This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.5 by this push:
new 098bda4c3 [CELEBORN-914][FOLLOWUP] optimize write and sort logic for memory storage
098bda4c3 is described below
commit 098bda4c36e705d31b7c9ca19ac61b3f6e9718f8
Author: mingji <[email protected]>
AuthorDate: Fri Jun 21 16:17:59 2024 +0800
[CELEBORN-914][FOLLOWUP] optimize write and sort logic for memory storage
### What changes were proposed in this pull request?
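1. Optimize the writer's chunk-offset bookkeeping: parse batch headers only when a flush is larger than the shuffle chunk size
2. Optimize slicing of sorted memory files: merge consecutive blocks into slices of roughly the shuffle chunk size instead of one slice per block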
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
NO.
### How was this patch tested?
GA.
Closes #2581 from FMX/b914-1.
Authored-by: mingji <[email protected]>
Signed-off-by: Shuang <[email protected]>
(cherry picked from commit a9d1d64cf40db899e661e08523ebdb88e684a724)
Signed-off-by: Shuang <[email protected]>
---
.../common/util/ShuffleBlockInfoUtils.java | 36 ++++-
.../common/util/ShuffleBlockInfoUtilsTest.java | 146 +++++++++++++++++++++
.../deploy/worker/storage/PartitionDataWriter.java | 24 ++--
.../worker/storage/PartitionFilesSorter.java | 7 +-
4 files changed, 197 insertions(+), 16 deletions(-)
diff --git a/common/src/main/java/org/apache/celeborn/common/util/ShuffleBlockInfoUtils.java b/common/src/main/java/org/apache/celeborn/common/util/ShuffleBlockInfoUtils.java
index a7f25cc48..d62bf95d3 100644
--- a/common/src/main/java/org/apache/celeborn/common/util/ShuffleBlockInfoUtils.java
+++ b/common/src/main/java/org/apache/celeborn/common/util/ShuffleBlockInfoUtils.java
@@ -23,7 +23,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
public class ShuffleBlockInfoUtils {
@@ -31,6 +30,13 @@ public class ShuffleBlockInfoUtils {
public static class ShuffleBlockInfo {
public long offset;
public long length;
+
+ public ShuffleBlockInfo() {}
+
+ public ShuffleBlockInfo(long offset, long length) {
+ this.offset = offset;
+ this.length = length;
+ }
}
public static List<Long> getChunkOffsetsFromShuffleBlockInfos(
@@ -125,17 +131,35 @@ public class ShuffleBlockInfoUtils {
int endMapIndex,
Map<Integer, List<ShuffleBlockInfo>> indexMap,
CompositeByteBuf sortedByteBuf,
- CompositeByteBuf targetByteBuf) {
+ CompositeByteBuf targetByteBuf,
+ long shuffleChunkSize) {
+ int offset = 0;
+ int length = 0;
+ boolean blockBoundary = true;
for (int i = startMapIndex; i < endMapIndex; i++) {
List<ShuffleBlockInfo> blockInfos = indexMap.get(i);
if (blockInfos != null) {
for (ShuffleBlockInfo blockInfo : blockInfos) {
- ByteBuf slice = sortedByteBuf.slice((int) blockInfo.offset, (int)
blockInfo.length);
- // Do not retain this buffer because this buffer
- // will be released when the fileinfo is released
- targetByteBuf.addComponent(slice);
+ if (blockBoundary) {
+ offset = (int) blockInfo.offset;
+ blockBoundary = false;
+ }
+ length += (int) blockInfo.length;
+ if (length - offset > shuffleChunkSize) {
+ // Do not retain this buffer because this buffer
+ // will be released when the fileinfo is released
+ targetByteBuf.addComponent(sortedByteBuf.slice(offset, length));
+ blockBoundary = true;
+ length = 0;
+ }
}
}
}
+ // process last small block
+ if (length != 0) {
+ // Do not retain this buffer because this buffer
+ // will be released when the fileinfo is released
+ targetByteBuf.addComponent(sortedByteBuf.slice(offset, length));
+ }
}
}
diff --git a/common/src/test/java/org/apache/celeborn/common/util/ShuffleBlockInfoUtilsTest.java b/common/src/test/java/org/apache/celeborn/common/util/ShuffleBlockInfoUtilsTest.java
new file mode 100644
index 000000000..72d7e6af5
--- /dev/null
+++ b/common/src/test/java/org/apache/celeborn/common/util/ShuffleBlockInfoUtilsTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.util;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.buffer.Unpooled;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.celeborn.common.util.ShuffleBlockInfoUtils.ShuffleBlockInfo;
+
+public class ShuffleBlockInfoUtilsTest {
+ private CompositeByteBuf sortedByteBuf;
+ private CompositeByteBuf targetByteBuf;
+ private long shuffleChunkSize;
+
+ @Before
+ public void setUp() {
+ sortedByteBuf = Unpooled.compositeBuffer();
+ targetByteBuf = Unpooled.compositeBuffer();
+ shuffleChunkSize = 100L;
+ }
+
+ @Test
+ public void testSliceSortedBufferByMapRangeCase1() {
+ Map<Integer, List<ShuffleBlockInfo>> indexMap = new HashMap<>();
+
+ indexMap.put(0, Arrays.asList(new ShuffleBlockInfo(0, 50), new
ShuffleBlockInfo(50, 30)));
+ indexMap.put(1, Arrays.asList(new ShuffleBlockInfo(0, 20), new
ShuffleBlockInfo(20, 30)));
+
+ for (ShuffleBlockInfo blockInfo :
+
indexMap.values().stream().flatMap(List::stream).toArray(ShuffleBlockInfo[]::new))
{
+ byte[] data = new byte[(int) blockInfo.length];
+ Arrays.fill(data, (byte) blockInfo.offset); // Fill data with offset
value for simplicity
+ sortedByteBuf.addComponents(Unpooled.wrappedBuffer(data));
+ }
+
+ ShuffleBlockInfoUtils.sliceSortedBufferByMapRange(
+ 0, 1, indexMap, sortedByteBuf, targetByteBuf, shuffleChunkSize);
+
+ Assert.assertEquals(
+ "Unexpected number of components in target buffer", 1,
targetByteBuf.numComponents());
+ }
+
+ @Test
+ public void testSliceSortedBufferByMapRangeCase2() {
+ Map<Integer, List<ShuffleBlockInfo>> indexMap = new HashMap<>();
+
+ indexMap.put(0, Arrays.asList(new ShuffleBlockInfo(0, 50), new
ShuffleBlockInfo(50, 50)));
+ indexMap.put(1, Arrays.asList(new ShuffleBlockInfo(0, 20), new
ShuffleBlockInfo(20, 30)));
+
+ for (ShuffleBlockInfo blockInfo :
+
indexMap.values().stream().flatMap(List::stream).toArray(ShuffleBlockInfo[]::new))
{
+ byte[] data = new byte[(int) blockInfo.length];
+ Arrays.fill(data, (byte) blockInfo.offset); // Fill data with offset
value for simplicity
+ sortedByteBuf.addComponents(Unpooled.wrappedBuffer(data));
+ }
+
+ ShuffleBlockInfoUtils.sliceSortedBufferByMapRange(
+ 0, 1, indexMap, sortedByteBuf, targetByteBuf, shuffleChunkSize);
+
+ Assert.assertEquals(
+ "Unexpected number of components in target buffer", 1,
targetByteBuf.numComponents());
+ }
+
+ @Test
+ public void testSliceSortedBufferByMapRangeCase3() {
+ Map<Integer, List<ShuffleBlockInfo>> indexMap = new HashMap<>();
+
+ indexMap.put(0, Arrays.asList(new ShuffleBlockInfo(0, 50), new
ShuffleBlockInfo(50, 51)));
+ indexMap.put(1, Arrays.asList(new ShuffleBlockInfo(0, 20), new
ShuffleBlockInfo(20, 30)));
+
+ for (ShuffleBlockInfo blockInfo :
+
indexMap.values().stream().flatMap(List::stream).toArray(ShuffleBlockInfo[]::new))
{
+ byte[] data = new byte[(int) blockInfo.length];
+ Arrays.fill(data, (byte) blockInfo.offset); // Fill data with offset
value for simplicity
+ sortedByteBuf.addComponents(Unpooled.wrappedBuffer(data));
+ }
+
+ ShuffleBlockInfoUtils.sliceSortedBufferByMapRange(
+ 0, 1, indexMap, sortedByteBuf, targetByteBuf, shuffleChunkSize);
+
+ Assert.assertEquals(
+ "Unexpected number of components in target buffer", 1,
targetByteBuf.numComponents());
+ }
+
+ @Test
+ public void testSliceSortedBufferByMapRangeCase4() {
+ Map<Integer, List<ShuffleBlockInfo>> indexMap = new HashMap<>();
+
+ indexMap.put(
+ 0,
+ Arrays.asList(
+ new ShuffleBlockInfo(0, 50),
+ new ShuffleBlockInfo(50, 51),
+ new ShuffleBlockInfo(101, 49),
+ new ShuffleBlockInfo(150, 51)));
+ indexMap.put(1, Arrays.asList(new ShuffleBlockInfo(0, 20), new
ShuffleBlockInfo(20, 30)));
+
+ for (ShuffleBlockInfo blockInfo :
+
indexMap.values().stream().flatMap(List::stream).toArray(ShuffleBlockInfo[]::new))
{
+ byte[] data = new byte[(int) blockInfo.length];
+ Arrays.fill(data, (byte) blockInfo.offset); // Fill data with offset
value for simplicity
+ sortedByteBuf.addComponents(Unpooled.wrappedBuffer(data));
+ }
+
+ ShuffleBlockInfoUtils.sliceSortedBufferByMapRange(
+ 0, 1, indexMap, sortedByteBuf, targetByteBuf, shuffleChunkSize);
+
+ Assert.assertEquals(
+ "Unexpected number of components in target buffer", 2,
targetByteBuf.numComponents());
+ }
+
+ @Test
+ public void testSliceWithEmptyMap() {
+ Map<Integer, List<ShuffleBlockInfo>> indexMap = new HashMap<>();
+
+ ShuffleBlockInfoUtils.sliceSortedBufferByMapRange(
+ 0, 0, indexMap, sortedByteBuf, targetByteBuf, shuffleChunkSize);
+
+ Assert.assertTrue(
+ "Target buffer should remain empty with an empty indexMap",
+ targetByteBuf.numComponents() == 0);
+ }
+}
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
index a2f00b0a8..a39a4f9d7 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
@@ -102,6 +102,7 @@ public abstract class PartitionDataWriter implements DeviceObserver {
private final long hdfsFlusherBufferSize;
private Exception exception = null;
private boolean metricsCollectCriticalEnabled;
+ private long chunkSize;
public PartitionDataWriter(
StorageManager storageManager,
@@ -125,6 +126,7 @@ public abstract class PartitionDataWriter implements DeviceObserver {
this.localFlusherBufferSize = conf.workerFlusherBufferSize();
this.hdfsFlusherBufferSize = conf.workerHdfsFlusherBufferSize();
this.metricsCollectCriticalEnabled = conf.metricsCollectCriticalEnabled();
+ this.chunkSize = conf.shuffleChunkSize();
Tuple4<MemoryFileInfo, Flusher, DiskFileInfo, File> createFileResult =
storageManager.createFile(writerContext, supportInMemory);
@@ -216,16 +218,20 @@ public abstract class PartitionDataWriter implements DeviceObserver {
MemoryManager.instance().incrementDiskBuffer(numBytes);
// read flush buffer to generate correct chunk offsets
// data header layout (mapId, attemptId, nextBatchId, length)
- ByteBuffer headerBuf = ByteBuffer.allocate(16);
- while (dupBuf.isReadable()) {
- headerBuf.rewind();
- dupBuf.readBytes(headerBuf);
- byte[] batchHeader = headerBuf.array();
- int compressedSize = Platform.getInt(batchHeader,
Platform.BYTE_ARRAY_OFFSET + 12);
- dupBuf.skipBytes(compressedSize);
- diskFileInfo.updateBytesFlushed(compressedSize + 16);
+ if (numBytes > chunkSize) {
+ ByteBuffer headerBuf = ByteBuffer.allocate(16);
+ while (dupBuf.isReadable()) {
+ headerBuf.rewind();
+ dupBuf.readBytes(headerBuf);
+ byte[] batchHeader = headerBuf.array();
+ int compressedSize = Platform.getInt(batchHeader,
Platform.BYTE_ARRAY_OFFSET + 12);
+ dupBuf.skipBytes(compressedSize);
+ diskFileInfo.updateBytesFlushed(compressedSize + 16);
+ }
+ dupBuf.release();
+ } else {
+ diskFileInfo.updateBytesFlushed(numBytes);
}
- dupBuf.release();
} else {
if (!isMemoryShuffleFile.get()) {
notifier.numPendingFlushes.incrementAndGet();
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
index 151e1df4c..4cbd28b46 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
@@ -213,7 +213,12 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
.getStoragePooledByteBufAllocator()
.compositeBuffer(Integer.MAX_VALUE);
ShuffleBlockInfoUtils.sliceSortedBufferByMapRange(
- startMapIndex, endMapIndex, indexesMap,
memoryFileInfo.getSortedBuffer(), targetBuffer);
+ startMapIndex,
+ endMapIndex,
+ indexesMap,
+ memoryFileInfo.getSortedBuffer(),
+ targetBuffer,
+ shuffleChunkSize);
return new MemoryFileInfo(
memoryFileInfo.getUserIdentifier(),
memoryFileInfo.isPartitionSplitEnabled(),