This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new cf63eae4 [ISSUE-301][Subtask][Improvement][AQE] Merge continuous
ShuffleDataSegment into single one (#303)
cf63eae4 is described below
commit cf63eae4539264f694760fdf48d882d931ac8e20
Author: Junfan Zhang <[email protected]>
AuthorDate: Mon Nov 7 11:59:47 2022 +0800
[ISSUE-301][Subtask][Improvement][AQE] Merge continuous ShuffleDataSegment
into single one (#303)
### What changes were proposed in this pull request?
1. Merge continuous ShuffleDataSegment into single one
2. Throw exception when reading the discontinuous blocks into segments
directly
### Why are the changes needed?
Currently, the LocalOrderSegmentSplitter will split the index file into
multiple shuffleDataSegments. But the split scope is limited in the range
of local order.
For example:
The blocks are as follows:
```
block-a (taskId-1)
block-b (taskId-2)
block-c (taskId-1)
block-d (taskId-2)
```
When the reader wants to get the range of taskIds: [1, 3), the strategy
will return
two shuffleDataSegments. But we'd better merge them into a single one to
reduce the number of network interactions, because they are continuous.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
1. UTs
---
.../common/segment/LocalOrderSegmentSplitter.java | 12 ++-
.../segment/LocalOrderSegmentSplitterTest.java | 88 ++++++++++++++++++++++
2 files changed, 98 insertions(+), 2 deletions(-)
diff --git
a/common/src/main/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitter.java
b/common/src/main/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitter.java
index 77e02e06..0a4a669d 100644
---
a/common/src/main/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitter.java
+++
b/common/src/main/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitter.java
@@ -70,6 +70,7 @@ public class LocalOrderSegmentSplitter implements
SegmentSplitter {
long totalLen = 0;
long lastTaskAttemptId = -1;
+ long lastExpectedBlockIndex = -1;
/**
* One ShuffleDataSegment should meet following requirements:
@@ -79,6 +80,7 @@ public class LocalOrderSegmentSplitter implements
SegmentSplitter {
* 3. ShuffleDataSegment's blocks should be continuous
*
*/
+ int index = 0;
while (byteBuffer.hasRemaining()) {
try {
long offset = byteBuffer.getLong();
@@ -98,7 +100,8 @@ public class LocalOrderSegmentSplitter implements
SegmentSplitter {
break;
}
- if ((taskAttemptId < lastTaskAttemptId && bufferSegments.size() > 0)
|| bufferOffset >= readBufferSize) {
+ if ((taskAttemptId < lastTaskAttemptId && bufferSegments.size() > 0 &&
index - lastExpectedBlockIndex != 1)
+ || bufferOffset >= readBufferSize) {
ShuffleDataSegment sds = new ShuffleDataSegment(fileOffset,
bufferOffset, bufferSegments);
dataFileSegments.add(sds);
bufferSegments = Lists.newArrayList();
@@ -107,14 +110,20 @@ public class LocalOrderSegmentSplitter implements
SegmentSplitter {
}
if (expectTaskIds.contains(taskAttemptId)) {
+ if (bufferOffset != 0 && index - lastExpectedBlockIndex > 1) {
+ throw new RssException("There are discontinuous blocks which
should not happen when using LOCAL_ORDER.");
+ }
+
if (fileOffset == -1) {
fileOffset = offset;
}
bufferSegments.add(new BufferSegment(blockId, bufferOffset, length,
uncompressLength, crc, taskAttemptId));
bufferOffset += length;
+ lastExpectedBlockIndex = index;
}
lastTaskAttemptId = taskAttemptId;
+ index++;
} catch (BufferUnderflowException ue) {
throw new RssException("Read index data under flow", ue);
}
@@ -124,7 +133,6 @@ public class LocalOrderSegmentSplitter implements
SegmentSplitter {
ShuffleDataSegment sds = new ShuffleDataSegment(fileOffset,
bufferOffset, bufferSegments);
dataFileSegments.add(sds);
}
-
return dataFileSegments;
}
}
diff --git
a/common/src/test/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitterTest.java
b/common/src/test/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitterTest.java
index 27a29814..4df956d6 100644
---
a/common/src/test/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitterTest.java
+++
b/common/src/test/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitterTest.java
@@ -24,14 +24,102 @@ import org.apache.commons.lang3.tuple.Pair;
import org.junit.jupiter.api.Test;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
+import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.ShuffleDataSegment;
import org.apache.uniffle.common.ShuffleIndexResult;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
public class LocalOrderSegmentSplitterTest {
+ @Test
+ public void testSplitWithDiscontinuousBlocksShouldThrowException() {
+ Roaring64NavigableMap taskIds = Roaring64NavigableMap.bitmapOf(1, 2, 4);
+ LocalOrderSegmentSplitter splitter = new
LocalOrderSegmentSplitter(taskIds, 32);
+ byte[] data = generateData(
+ Pair.of(1, 1),
+ Pair.of(1, 2),
+ Pair.of(1, 3),
+ Pair.of(1, 4)
+ );
+ try {
+ splitter.split(new ShuffleIndexResult(data, -1));
+ fail();
+ } catch (Exception e) {
+ // ignore
+ }
+ }
+
+ @Test
+ public void testSplitForMergeContinuousSegments() {
+ /**
+ * case1: (32, 5) (16, 1) (10, 1) (16, 2) (6, 1) (8, 1) (10, 3) (9, 1)
+ *
+ * It will skip the (32, 5) and merge others into one dataSegment when no
exceeding the
+ * read buffer size.
+ */
+ Roaring64NavigableMap taskIds = Roaring64NavigableMap.bitmapOf(1, 2);
+ LocalOrderSegmentSplitter splitter = new
LocalOrderSegmentSplitter(taskIds, 1000);
+ byte[] data = generateData(
+ Pair.of(32, 5),
+ Pair.of(16, 1),
+ Pair.of(10, 1),
+ Pair.of(16, 2),
+ Pair.of(6, 1),
+ Pair.of(8, 1),
+ Pair.of(10, 3),
+ Pair.of(9, 1)
+ );
+ List<ShuffleDataSegment> dataSegments = splitter.split(new
ShuffleIndexResult(data, -1));
+ assertEquals(2, dataSegments.size());
+ assertEquals(32, dataSegments.get(0).getOffset());
+ assertEquals(56, dataSegments.get(0).getLength());
+
+ List<BufferSegment> bufferSegments =
dataSegments.get(0).getBufferSegments();
+ assertEquals(0, bufferSegments.get(0).getOffset());
+ assertEquals(16, bufferSegments.get(0).getLength());
+
+ assertEquals(16, bufferSegments.get(1).getOffset());
+ assertEquals(10, bufferSegments.get(1).getLength());
+
+ assertEquals(26, bufferSegments.get(2).getOffset());
+ assertEquals(16, bufferSegments.get(2).getLength());
+
+ assertEquals(42, bufferSegments.get(3).getOffset());
+ assertEquals(6, bufferSegments.get(3).getLength());
+
+ assertEquals(48, bufferSegments.get(4).getOffset());
+ assertEquals(8, bufferSegments.get(4).getLength());
+
+ assertEquals(98, dataSegments.get(1).getOffset());
+ assertEquals(9, dataSegments.get(1).getLength());
+ bufferSegments = dataSegments.get(1).getBufferSegments();
+ assertEquals(1, bufferSegments.size());
+ assertEquals(0, bufferSegments.get(0).getOffset());
+ assertEquals(9, bufferSegments.get(0).getLength());
+
+ /**
+ * case2: (16, 1) (15, 2) (1, 1) (6, 1)
+ *
+ * It will skip merging into one dataSegment when exceeding the
+ * read buffer size.
+ */
+ data = generateData(
+ Pair.of(16, 1),
+ Pair.of(15, 2),
+ Pair.of(1, 1),
+ Pair.of(6, 1)
+ );
+ dataSegments = new LocalOrderSegmentSplitter(taskIds, 32).split(new
ShuffleIndexResult(data, -1));
+ assertEquals(2, dataSegments.size());
+ assertEquals(0, dataSegments.get(0).getOffset());
+ assertEquals(32, dataSegments.get(0).getLength());
+ assertEquals(32, dataSegments.get(1).getOffset());
+ assertEquals(6, dataSegments.get(1).getLength());
+ }
+
@Test
public void testSplit() {
Roaring64NavigableMap taskIds = Roaring64NavigableMap.bitmapOf(1);