This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new a9d1d64cf [CELEBORN-914][FOLLOWUP] optimize write and sort logic for memory storage
a9d1d64cf is described below

commit a9d1d64cf40db899e661e08523ebdb88e684a724
Author: mingji <[email protected]>
AuthorDate: Fri Jun 21 16:17:59 2024 +0800

    [CELEBORN-914][FOLLOWUP] optimize write and sort logic for memory storage
    
    ### What changes were proposed in this pull request?
    1. Optimize writer check offsets logic
    2. Optimize sort memory file logic
    
    ### Why are the changes needed?
    
    ### Does this PR introduce _any_ user-facing change?
    NO.
    
    ### How was this patch tested?
    GA.
    
    Closes #2581 from FMX/b914-1.
    
    Authored-by: mingji <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 .../common/util/ShuffleBlockInfoUtils.java         |  36 ++++-
 .../common/util/ShuffleBlockInfoUtilsTest.java     | 146 +++++++++++++++++++++
 .../deploy/worker/storage/PartitionDataWriter.java |  24 ++--
 .../worker/storage/PartitionFilesSorter.java       |   7 +-
 4 files changed, 197 insertions(+), 16 deletions(-)

diff --git a/common/src/main/java/org/apache/celeborn/common/util/ShuffleBlockInfoUtils.java b/common/src/main/java/org/apache/celeborn/common/util/ShuffleBlockInfoUtils.java
index a7f25cc48..d62bf95d3 100644
--- a/common/src/main/java/org/apache/celeborn/common/util/ShuffleBlockInfoUtils.java
+++ b/common/src/main/java/org/apache/celeborn/common/util/ShuffleBlockInfoUtils.java
@@ -23,7 +23,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import io.netty.buffer.ByteBuf;
 import io.netty.buffer.CompositeByteBuf;
 
 public class ShuffleBlockInfoUtils {
@@ -31,6 +30,13 @@ public class ShuffleBlockInfoUtils {
   public static class ShuffleBlockInfo {
     public long offset;
     public long length;
+
+    public ShuffleBlockInfo() {}
+
+    public ShuffleBlockInfo(long offset, long length) {
+      this.offset = offset;
+      this.length = length;
+    }
   }
 
   public static List<Long> getChunkOffsetsFromShuffleBlockInfos(
@@ -125,17 +131,35 @@ public class ShuffleBlockInfoUtils {
       int endMapIndex,
       Map<Integer, List<ShuffleBlockInfo>> indexMap,
       CompositeByteBuf sortedByteBuf,
-      CompositeByteBuf targetByteBuf) {
+      CompositeByteBuf targetByteBuf,
+      long shuffleChunkSize) {
+    int offset = 0;
+    int length = 0;
+    boolean blockBoundary = true;
     for (int i = startMapIndex; i < endMapIndex; i++) {
       List<ShuffleBlockInfo> blockInfos = indexMap.get(i);
       if (blockInfos != null) {
         for (ShuffleBlockInfo blockInfo : blockInfos) {
-          ByteBuf slice = sortedByteBuf.slice((int) blockInfo.offset, (int) blockInfo.length);
-          // Do not retain this buffer because this buffer
-          // will be released when the fileinfo is released
-          targetByteBuf.addComponent(slice);
+          if (blockBoundary) {
+            offset = (int) blockInfo.offset;
+            blockBoundary = false;
+          }
+          length += (int) blockInfo.length;
+          if (length - offset > shuffleChunkSize) {
+            // Do not retain this buffer because this buffer
+            // will be released when the fileinfo is released
+            targetByteBuf.addComponent(sortedByteBuf.slice(offset, length));
+            blockBoundary = true;
+            length = 0;
+          }
         }
       }
     }
+    // process last small block
+    if (length != 0) {
+      // Do not retain this buffer because this buffer
+      // will be released when the fileinfo is released
+      targetByteBuf.addComponent(sortedByteBuf.slice(offset, length));
+    }
   }
 }
diff --git a/common/src/test/java/org/apache/celeborn/common/util/ShuffleBlockInfoUtilsTest.java b/common/src/test/java/org/apache/celeborn/common/util/ShuffleBlockInfoUtilsTest.java
new file mode 100644
index 000000000..72d7e6af5
--- /dev/null
+++ b/common/src/test/java/org/apache/celeborn/common/util/ShuffleBlockInfoUtilsTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.util;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.buffer.Unpooled;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.celeborn.common.util.ShuffleBlockInfoUtils.ShuffleBlockInfo;
+
+public class ShuffleBlockInfoUtilsTest {
+  private CompositeByteBuf sortedByteBuf;
+  private CompositeByteBuf targetByteBuf;
+  private long shuffleChunkSize;
+
+  @Before
+  public void setUp() {
+    sortedByteBuf = Unpooled.compositeBuffer();
+    targetByteBuf = Unpooled.compositeBuffer();
+    shuffleChunkSize = 100L;
+  }
+
+  @Test
+  public void testSliceSortedBufferByMapRangeCase1() {
+    Map<Integer, List<ShuffleBlockInfo>> indexMap = new HashMap<>();
+
+    indexMap.put(0, Arrays.asList(new ShuffleBlockInfo(0, 50), new ShuffleBlockInfo(50, 30)));
+    indexMap.put(1, Arrays.asList(new ShuffleBlockInfo(0, 20), new ShuffleBlockInfo(20, 30)));
+
+    for (ShuffleBlockInfo blockInfo :
+        indexMap.values().stream().flatMap(List::stream).toArray(ShuffleBlockInfo[]::new)) {
+      byte[] data = new byte[(int) blockInfo.length];
+      Arrays.fill(data, (byte) blockInfo.offset); // Fill data with offset value for simplicity
+      sortedByteBuf.addComponents(Unpooled.wrappedBuffer(data));
+    }
+
+    ShuffleBlockInfoUtils.sliceSortedBufferByMapRange(
+        0, 1, indexMap, sortedByteBuf, targetByteBuf, shuffleChunkSize);
+
+    Assert.assertEquals(
+        "Unexpected number of components in target buffer", 1, targetByteBuf.numComponents());
+  }
+
+  @Test
+  public void testSliceSortedBufferByMapRangeCase2() {
+    Map<Integer, List<ShuffleBlockInfo>> indexMap = new HashMap<>();
+
+    indexMap.put(0, Arrays.asList(new ShuffleBlockInfo(0, 50), new ShuffleBlockInfo(50, 50)));
+    indexMap.put(1, Arrays.asList(new ShuffleBlockInfo(0, 20), new ShuffleBlockInfo(20, 30)));
+
+    for (ShuffleBlockInfo blockInfo :
+        indexMap.values().stream().flatMap(List::stream).toArray(ShuffleBlockInfo[]::new)) {
+      byte[] data = new byte[(int) blockInfo.length];
+      Arrays.fill(data, (byte) blockInfo.offset); // Fill data with offset value for simplicity
+      sortedByteBuf.addComponents(Unpooled.wrappedBuffer(data));
+    }
+
+    ShuffleBlockInfoUtils.sliceSortedBufferByMapRange(
+        0, 1, indexMap, sortedByteBuf, targetByteBuf, shuffleChunkSize);
+
+    Assert.assertEquals(
+        "Unexpected number of components in target buffer", 1, targetByteBuf.numComponents());
+  }
+
+  @Test
+  public void testSliceSortedBufferByMapRangeCase3() {
+    Map<Integer, List<ShuffleBlockInfo>> indexMap = new HashMap<>();
+
+    indexMap.put(0, Arrays.asList(new ShuffleBlockInfo(0, 50), new ShuffleBlockInfo(50, 51)));
+    indexMap.put(1, Arrays.asList(new ShuffleBlockInfo(0, 20), new ShuffleBlockInfo(20, 30)));
+
+    for (ShuffleBlockInfo blockInfo :
+        indexMap.values().stream().flatMap(List::stream).toArray(ShuffleBlockInfo[]::new)) {
+      byte[] data = new byte[(int) blockInfo.length];
+      Arrays.fill(data, (byte) blockInfo.offset); // Fill data with offset value for simplicity
+      sortedByteBuf.addComponents(Unpooled.wrappedBuffer(data));
+    }
+
+    ShuffleBlockInfoUtils.sliceSortedBufferByMapRange(
+        0, 1, indexMap, sortedByteBuf, targetByteBuf, shuffleChunkSize);
+
+    Assert.assertEquals(
+        "Unexpected number of components in target buffer", 1, targetByteBuf.numComponents());
+  }
+
+  @Test
+  public void testSliceSortedBufferByMapRangeCase4() {
+    Map<Integer, List<ShuffleBlockInfo>> indexMap = new HashMap<>();
+
+    indexMap.put(
+        0,
+        Arrays.asList(
+            new ShuffleBlockInfo(0, 50),
+            new ShuffleBlockInfo(50, 51),
+            new ShuffleBlockInfo(101, 49),
+            new ShuffleBlockInfo(150, 51)));
+    indexMap.put(1, Arrays.asList(new ShuffleBlockInfo(0, 20), new ShuffleBlockInfo(20, 30)));
+
+    for (ShuffleBlockInfo blockInfo :
+        indexMap.values().stream().flatMap(List::stream).toArray(ShuffleBlockInfo[]::new)) {
+      byte[] data = new byte[(int) blockInfo.length];
+      Arrays.fill(data, (byte) blockInfo.offset); // Fill data with offset value for simplicity
+      sortedByteBuf.addComponents(Unpooled.wrappedBuffer(data));
+    }
+
+    ShuffleBlockInfoUtils.sliceSortedBufferByMapRange(
+        0, 1, indexMap, sortedByteBuf, targetByteBuf, shuffleChunkSize);
+
+    Assert.assertEquals(
+        "Unexpected number of components in target buffer", 2, targetByteBuf.numComponents());
+  }
+
+  @Test
+  public void testSliceWithEmptyMap() {
+    Map<Integer, List<ShuffleBlockInfo>> indexMap = new HashMap<>();
+
+    ShuffleBlockInfoUtils.sliceSortedBufferByMapRange(
+        0, 0, indexMap, sortedByteBuf, targetByteBuf, shuffleChunkSize);
+
+    Assert.assertTrue(
+        "Target buffer should remain empty with an empty indexMap",
+        targetByteBuf.numComponents() == 0);
+  }
+}
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
index a2f00b0a8..a39a4f9d7 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
@@ -102,6 +102,7 @@ public abstract class PartitionDataWriter implements DeviceObserver {
   private final long hdfsFlusherBufferSize;
   private Exception exception = null;
   private boolean metricsCollectCriticalEnabled;
+  private long chunkSize;
 
   public PartitionDataWriter(
       StorageManager storageManager,
@@ -125,6 +126,7 @@ public abstract class PartitionDataWriter implements DeviceObserver {
     this.localFlusherBufferSize = conf.workerFlusherBufferSize();
     this.hdfsFlusherBufferSize = conf.workerHdfsFlusherBufferSize();
     this.metricsCollectCriticalEnabled = conf.metricsCollectCriticalEnabled();
+    this.chunkSize = conf.shuffleChunkSize();
 
     Tuple4<MemoryFileInfo, Flusher, DiskFileInfo, File> createFileResult =
         storageManager.createFile(writerContext, supportInMemory);
@@ -216,16 +218,20 @@ public abstract class PartitionDataWriter implements DeviceObserver {
           MemoryManager.instance().incrementDiskBuffer(numBytes);
           // read flush buffer to generate correct chunk offsets
           // data header layout (mapId, attemptId, nextBatchId, length)
-          ByteBuffer headerBuf = ByteBuffer.allocate(16);
-          while (dupBuf.isReadable()) {
-            headerBuf.rewind();
-            dupBuf.readBytes(headerBuf);
-            byte[] batchHeader = headerBuf.array();
-            int compressedSize = Platform.getInt(batchHeader, Platform.BYTE_ARRAY_OFFSET + 12);
-            dupBuf.skipBytes(compressedSize);
-            diskFileInfo.updateBytesFlushed(compressedSize + 16);
+          if (numBytes > chunkSize) {
+            ByteBuffer headerBuf = ByteBuffer.allocate(16);
+            while (dupBuf.isReadable()) {
+              headerBuf.rewind();
+              dupBuf.readBytes(headerBuf);
+              byte[] batchHeader = headerBuf.array();
+              int compressedSize = Platform.getInt(batchHeader, Platform.BYTE_ARRAY_OFFSET + 12);
+              dupBuf.skipBytes(compressedSize);
+              diskFileInfo.updateBytesFlushed(compressedSize + 16);
+            }
+            dupBuf.release();
+          } else {
+            diskFileInfo.updateBytesFlushed(numBytes);
           }
-          dupBuf.release();
         } else {
           if (!isMemoryShuffleFile.get()) {
             notifier.numPendingFlushes.incrementAndGet();
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
index 151e1df4c..4cbd28b46 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
@@ -213,7 +213,12 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
               .getStoragePooledByteBufAllocator()
               .compositeBuffer(Integer.MAX_VALUE);
       ShuffleBlockInfoUtils.sliceSortedBufferByMapRange(
-          startMapIndex, endMapIndex, indexesMap, memoryFileInfo.getSortedBuffer(), targetBuffer);
+          startMapIndex,
+          endMapIndex,
+          indexesMap,
+          memoryFileInfo.getSortedBuffer(),
+          targetBuffer,
+          shuffleChunkSize);
       return new MemoryFileInfo(
           memoryFileInfo.getUserIdentifier(),
           memoryFileInfo.isPartitionSplitEnabled(),

Reply via email to