This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new cf63eae4 [ISSUE-301][Subtask][Improvement][AQE] Merge continuous ShuffleDataSegment into single one (#303)
cf63eae4 is described below

commit cf63eae4539264f694760fdf48d882d931ac8e20
Author: Junfan Zhang <[email protected]>
AuthorDate: Mon Nov 7 11:59:47 2022 +0800

    [ISSUE-301][Subtask][Improvement][AQE] Merge continuous ShuffleDataSegment into single one (#303)
    
    ### What changes were proposed in this pull request?
    1. Merge continuous ShuffleDataSegments into a single one.
    2. Throw an exception when discontinuous blocks would otherwise be read
    into the same segment.
    
    ### Why are the changes needed?
    Currently, LocalOrderSegmentSplitter splits the index file into
    multiple ShuffleDataSegments, but it starts a new segment at every
    local-order boundary (whenever the taskAttemptId decreases), even when
    the blocks on both sides of that boundary are contiguous.
    
    For example, given the following blocks:
    ```
    block-a (taskId-1)
    block-b (taskId-2)
    block-c (taskId-1)
    block-d (taskId-2)
    ```
    
    When the reader requests the taskId range [1, 3), the splitter returns
    two ShuffleDataSegments. Because these blocks are contiguous, it is better
    to merge them into a single segment to reduce the number of network
    round trips.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    1. Unit tests (LocalOrderSegmentSplitterTest).
---
 .../common/segment/LocalOrderSegmentSplitter.java  | 12 ++-
 .../segment/LocalOrderSegmentSplitterTest.java     | 88 ++++++++++++++++++++++
 2 files changed, 98 insertions(+), 2 deletions(-)

diff --git 
a/common/src/main/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitter.java
 
b/common/src/main/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitter.java
index 77e02e06..0a4a669d 100644
--- 
a/common/src/main/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitter.java
+++ 
b/common/src/main/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitter.java
@@ -70,6 +70,7 @@ public class LocalOrderSegmentSplitter implements 
SegmentSplitter {
     long totalLen = 0;
 
     long lastTaskAttemptId = -1;
+    long lastExpectedBlockIndex = -1;
 
     /**
      * One ShuffleDataSegment should meet following requirements:
@@ -79,6 +80,7 @@ public class LocalOrderSegmentSplitter implements 
SegmentSplitter {
      * 3. ShuffleDataSegment's blocks should be continuous
      *
      */
+    int index = 0;
     while (byteBuffer.hasRemaining()) {
       try {
         long offset = byteBuffer.getLong();
@@ -98,7 +100,8 @@ public class LocalOrderSegmentSplitter implements 
SegmentSplitter {
           break;
         }
 
-        if ((taskAttemptId < lastTaskAttemptId && bufferSegments.size() > 0) 
|| bufferOffset >= readBufferSize) {
+        if ((taskAttemptId < lastTaskAttemptId && bufferSegments.size() > 0 && 
index - lastExpectedBlockIndex != 1)
+            || bufferOffset >= readBufferSize) {
           ShuffleDataSegment sds = new ShuffleDataSegment(fileOffset, 
bufferOffset, bufferSegments);
           dataFileSegments.add(sds);
           bufferSegments = Lists.newArrayList();
@@ -107,14 +110,20 @@ public class LocalOrderSegmentSplitter implements 
SegmentSplitter {
         }
 
         if (expectTaskIds.contains(taskAttemptId)) {
+          if (bufferOffset != 0 && index - lastExpectedBlockIndex > 1) {
+            throw new RssException("There are discontinuous blocks which 
should not happen when using LOCAL_ORDER.");
+          }
+
           if (fileOffset == -1) {
             fileOffset = offset;
           }
           bufferSegments.add(new BufferSegment(blockId, bufferOffset, length, 
uncompressLength, crc, taskAttemptId));
           bufferOffset += length;
+          lastExpectedBlockIndex = index;
         }
 
         lastTaskAttemptId = taskAttemptId;
+        index++;
       } catch (BufferUnderflowException ue) {
         throw new RssException("Read index data under flow", ue);
       }
@@ -124,7 +133,6 @@ public class LocalOrderSegmentSplitter implements 
SegmentSplitter {
       ShuffleDataSegment sds = new ShuffleDataSegment(fileOffset, 
bufferOffset, bufferSegments);
       dataFileSegments.add(sds);
     }
-
     return dataFileSegments;
   }
 }
diff --git 
a/common/src/test/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitterTest.java
 
b/common/src/test/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitterTest.java
index 27a29814..4df956d6 100644
--- 
a/common/src/test/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitterTest.java
+++ 
b/common/src/test/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitterTest.java
@@ -24,14 +24,102 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.junit.jupiter.api.Test;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 
+import org.apache.uniffle.common.BufferSegment;
 import org.apache.uniffle.common.ShuffleDataSegment;
 import org.apache.uniffle.common.ShuffleIndexResult;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 
 public class LocalOrderSegmentSplitterTest {
 
+  @Test
+  public void testSplitWithDiscontinuousBlocksShouldThrowException() {
+    Roaring64NavigableMap taskIds = Roaring64NavigableMap.bitmapOf(1, 2, 4);
+    LocalOrderSegmentSplitter splitter = new 
LocalOrderSegmentSplitter(taskIds, 32);
+    byte[] data = generateData(
+        Pair.of(1, 1),
+        Pair.of(1, 2),
+        Pair.of(1, 3),
+        Pair.of(1, 4)
+    );
+    try {
+      splitter.split(new ShuffleIndexResult(data, -1));
+      fail();
+    } catch (Exception e) {
+      // ignore
+    }
+  }
+
+  @Test
+  public void testSplitForMergeContinuousSegments() {
+    /**
+     * case1: (32, 5) (16, 1) (10, 1) (16, 2) (6, 1) (8, 1) (10, 3) (9, 1)
+     *
+     * It will skip the (32, 5) and merge others into one dataSegment when no 
exceeding the
+     * read buffer size.
+     */
+    Roaring64NavigableMap taskIds = Roaring64NavigableMap.bitmapOf(1, 2);
+    LocalOrderSegmentSplitter splitter = new 
LocalOrderSegmentSplitter(taskIds, 1000);
+    byte[] data = generateData(
+        Pair.of(32, 5),
+        Pair.of(16, 1),
+        Pair.of(10, 1),
+        Pair.of(16, 2),
+        Pair.of(6, 1),
+        Pair.of(8, 1),
+        Pair.of(10, 3),
+        Pair.of(9, 1)
+    );
+    List<ShuffleDataSegment> dataSegments = splitter.split(new 
ShuffleIndexResult(data, -1));
+    assertEquals(2, dataSegments.size());
+    assertEquals(32, dataSegments.get(0).getOffset());
+    assertEquals(56, dataSegments.get(0).getLength());
+
+    List<BufferSegment> bufferSegments = 
dataSegments.get(0).getBufferSegments();
+    assertEquals(0, bufferSegments.get(0).getOffset());
+    assertEquals(16, bufferSegments.get(0).getLength());
+
+    assertEquals(16, bufferSegments.get(1).getOffset());
+    assertEquals(10, bufferSegments.get(1).getLength());
+
+    assertEquals(26, bufferSegments.get(2).getOffset());
+    assertEquals(16, bufferSegments.get(2).getLength());
+
+    assertEquals(42, bufferSegments.get(3).getOffset());
+    assertEquals(6, bufferSegments.get(3).getLength());
+
+    assertEquals(48, bufferSegments.get(4).getOffset());
+    assertEquals(8, bufferSegments.get(4).getLength());
+
+    assertEquals(98, dataSegments.get(1).getOffset());
+    assertEquals(9, dataSegments.get(1).getLength());
+    bufferSegments = dataSegments.get(1).getBufferSegments();
+    assertEquals(1, bufferSegments.size());
+    assertEquals(0, bufferSegments.get(0).getOffset());
+    assertEquals(9, bufferSegments.get(0).getLength());
+
+    /**
+     * case2: (16, 1) (16, 2) (6, 1)
+     *
+     * It will skip merging into one dataSegment when exceeding the
+     * read buffer size.
+     */
+    data = generateData(
+        Pair.of(16, 1),
+        Pair.of(15, 2),
+        Pair.of(1, 1),
+        Pair.of(6, 1)
+    );
+    dataSegments = new LocalOrderSegmentSplitter(taskIds, 32).split(new 
ShuffleIndexResult(data, -1));
+    assertEquals(2, dataSegments.size());
+    assertEquals(0, dataSegments.get(0).getOffset());
+    assertEquals(32, dataSegments.get(0).getLength());
+    assertEquals(32, dataSegments.get(1).getOffset());
+    assertEquals(6, dataSegments.get(1).getLength());
+  }
+
   @Test
   public void testSplit() {
     Roaring64NavigableMap taskIds = Roaring64NavigableMap.bitmapOf(1);

Reply via email to