This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 7fa1d32a9 [CELEBORN-1374] Refactor SortBuffer and PartitionSortedBuffer
7fa1d32a9 is described below
commit 7fa1d32a98ed98c147490d470fc687a971b21715
Author: SteNicholas <[email protected]>
AuthorDate: Tue Apr 9 15:47:57 2024 +0800
[CELEBORN-1374] Refactor SortBuffer and PartitionSortedBuffer
### What changes were proposed in this pull request?
Refactor `SortBuffer` and `PartitionSortedBuffer` with introduction of
`DataBuffer` and `SortBasedDataBuffer`.
### Why are the changes needed?
`SortBuffer` and `PartitionSortedBuffer` are refactored in
https://github.com/apache/flink/pull/18505. Celeborn Flink should also refactor
`SortBuffer` and `PartitionSortedBuffer` to sync the interface changes in
Flink. Meanwhile, `SortBuffer` and `PartitionSortedBuffer` should distinguish
channel and subpartition for https://github.com/apache/flink/pull/23927.
### Does this PR introduce _any_ user-facing change?
- `SortBuffer` is renamed to `DataBuffer`.
- `PartitionSortedBuffer` is renamed to `SortBasedDataBuffer`.
- `SortBuffer.BufferWithChannel` is renamed to `BufferWithSubpartition`.
### How was this patch tested?
UT and IT.
Closes #2448 from SteNicholas/CELEBORN-1374.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
LICENSE | 4 +-
.../RemoteShuffleResultPartitionDelegation.java | 136 ++++++++---------
.../flink/buffer/BufferWithSubpartition.java | 43 ++++++
.../celeborn/plugin/flink/buffer/DataBuffer.java | 74 +++++++++
...nSortedBuffer.java => SortBasedDataBuffer.java} | 100 +++++++------
.../celeborn/plugin/flink/buffer/SortBuffer.java | 92 ------------
...rSuiteJ.java => SortBasedDataBufferSuiteJ.java} | 165 +++++++++++----------
.../plugin/flink/RemoteShuffleResultPartition.java | 27 ++--
.../flink/RemoteShuffleResultPartitionSuiteJ.java | 8 +-
.../plugin/flink/RemoteShuffleResultPartition.java | 26 ++--
.../flink/RemoteShuffleResultPartitionSuiteJ.java | 8 +-
.../plugin/flink/RemoteShuffleResultPartition.java | 28 ++--
.../flink/RemoteShuffleResultPartitionSuiteJ.java | 8 +-
.../plugin/flink/RemoteShuffleResultPartition.java | 28 ++--
.../flink/RemoteShuffleResultPartitionSuiteJ.java | 8 +-
.../plugin/flink/RemoteShuffleResultPartition.java | 28 ++--
.../flink/RemoteShuffleResultPartitionSuiteJ.java | 8 +-
17 files changed, 396 insertions(+), 395 deletions(-)
diff --git a/LICENSE b/LICENSE
index 30b865ffc..3c8858ec9 100644
--- a/LICENSE
+++ b/LICENSE
@@ -253,6 +253,6 @@ Remote Shuffle Service for Flink
./client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/BufferRecycler.java
./client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/utils/BufferUtils.java
./client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/CreditListener.java
-./client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/PartitionSortedBuffer.java
-./client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/SortBuffer.java
+./client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/DataBuffer.java
+./client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/SortBasedDataBuffer.java
./client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/TransferBufferPool.java
diff --git
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionDelegation.java
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionDelegation.java
index 703fcf10f..1bd4285ee 100644
---
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionDelegation.java
+++
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionDelegation.java
@@ -34,8 +34,9 @@ import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.celeborn.plugin.flink.buffer.PartitionSortedBuffer;
-import org.apache.celeborn.plugin.flink.buffer.SortBuffer;
+import org.apache.celeborn.plugin.flink.buffer.BufferWithSubpartition;
+import org.apache.celeborn.plugin.flink.buffer.DataBuffer;
+import org.apache.celeborn.plugin.flink.buffer.SortBasedDataBuffer;
import org.apache.celeborn.plugin.flink.utils.BufferUtils;
import org.apache.celeborn.plugin.flink.utils.Utils;
@@ -46,11 +47,11 @@ public class RemoteShuffleResultPartitionDelegation {
/** Size of network buffer and write buffer. */
public int networkBufferSize;
- /** {@link SortBuffer} for records sent by broadcastRecord. */
- public SortBuffer broadcastSortBuffer;
+ /** {@link DataBuffer} for records sent by broadcastRecord. */
+ public DataBuffer broadcastDataBuffer;
- /** {@link SortBuffer} for records sent by emitRecord. */
- public SortBuffer unicastSortBuffer;
+ /** {@link DataBuffer} for records sent by emitRecord. */
+ public DataBuffer unicastDataBuffer;
/** Utility to spill data to shuffle workers. */
public RemoteShuffleOutputGate outputGate;
@@ -58,17 +59,17 @@ public class RemoteShuffleResultPartitionDelegation {
/** Whether notifyEndOfData has been called or not. */
private boolean endOfDataNotified;
- private int numSubpartitions;
+ private final int numSubpartitions;
private BufferPool bufferPool;
private BufferCompressor bufferCompressor;
private Function<Buffer, Boolean> canBeCompressed;
private Runnable checkProducerState;
- private BiConsumer<SortBuffer.BufferWithChannel, Boolean> statisticsConsumer;
+ private final BiConsumer<BufferWithSubpartition, Boolean> statisticsConsumer;
public RemoteShuffleResultPartitionDelegation(
int networkBufferSize,
RemoteShuffleOutputGate outputGate,
- BiConsumer<SortBuffer.BufferWithChannel, Boolean> statisticsConsumer,
+ BiConsumer<BufferWithSubpartition, Boolean> statisticsConsumer,
int numSubpartitions) {
this.networkBufferSize = networkBufferSize;
this.outputGate = outputGate;
@@ -105,92 +106,92 @@ public class RemoteShuffleResultPartitionDelegation {
targetSubpartition == 0, "Target subpartition index can only be 0
when broadcast.");
}
- SortBuffer sortBuffer = isBroadcast ? getBroadcastSortBuffer() :
getUnicastSortBuffer();
- if (sortBuffer.append(record, targetSubpartition, dataType)) {
+ DataBuffer dataBuffer = isBroadcast ? getBroadcastDataBuffer() :
getUnicastDataBuffer();
+ if (dataBuffer.append(record, targetSubpartition, dataType)) {
return;
}
try {
- if (!sortBuffer.hasRemaining()) {
- // the record can not be appended to the free sort buffer because it
is too large
- sortBuffer.finish();
- sortBuffer.release();
+ if (!dataBuffer.hasRemaining()) {
+ // the record can not be appended to the free data buffer because it
is too large
+ dataBuffer.finish();
+ dataBuffer.release();
writeLargeRecord(record, targetSubpartition, dataType, isBroadcast);
return;
}
- flushSortBuffer(sortBuffer, isBroadcast);
+ flushDataBuffer(dataBuffer, isBroadcast);
} catch (InterruptedException e) {
- LOG.error("Failed to flush the sort buffer.", e);
+ LOG.error("Failed to flush the data buffer.", e);
Utils.rethrowAsRuntimeException(e);
}
emit(record, targetSubpartition, dataType, isBroadcast);
}
@VisibleForTesting
- public SortBuffer getUnicastSortBuffer() throws IOException {
- flushBroadcastSortBuffer();
+ public DataBuffer getUnicastDataBuffer() throws IOException {
+ flushBroadcastDataBuffer();
- if (unicastSortBuffer != null && !unicastSortBuffer.isFinished()) {
- return unicastSortBuffer;
+ if (unicastDataBuffer != null && !unicastDataBuffer.isFinished()) {
+ return unicastDataBuffer;
}
- unicastSortBuffer =
- new PartitionSortedBuffer(bufferPool, numSubpartitions,
networkBufferSize, null);
- return unicastSortBuffer;
+ unicastDataBuffer =
+ new SortBasedDataBuffer(bufferPool, numSubpartitions,
networkBufferSize, null);
+ return unicastDataBuffer;
}
- public SortBuffer getBroadcastSortBuffer() throws IOException {
- flushUnicastSortBuffer();
+ public DataBuffer getBroadcastDataBuffer() throws IOException {
+ flushUnicastDataBuffer();
- if (broadcastSortBuffer != null && !broadcastSortBuffer.isFinished()) {
- return broadcastSortBuffer;
+ if (broadcastDataBuffer != null && !broadcastDataBuffer.isFinished()) {
+ return broadcastDataBuffer;
}
- broadcastSortBuffer =
- new PartitionSortedBuffer(bufferPool, numSubpartitions,
networkBufferSize, null);
- return broadcastSortBuffer;
+ broadcastDataBuffer =
+ new SortBasedDataBuffer(bufferPool, numSubpartitions,
networkBufferSize, null);
+ return broadcastDataBuffer;
}
- public void flushBroadcastSortBuffer() throws IOException {
- flushSortBuffer(broadcastSortBuffer, true);
+ public void flushBroadcastDataBuffer() throws IOException {
+ flushDataBuffer(broadcastDataBuffer, true);
}
- public void flushUnicastSortBuffer() throws IOException {
- flushSortBuffer(unicastSortBuffer, false);
+ public void flushUnicastDataBuffer() throws IOException {
+ flushDataBuffer(unicastDataBuffer, false);
}
@VisibleForTesting
- void flushSortBuffer(SortBuffer sortBuffer, boolean isBroadcast) throws
IOException {
- if (sortBuffer == null || sortBuffer.isReleased()) {
+ void flushDataBuffer(DataBuffer dataBuffer, boolean isBroadcast) throws
IOException {
+ if (dataBuffer == null || dataBuffer.isReleased()) {
return;
}
- sortBuffer.finish();
- if (sortBuffer.hasRemaining()) {
+ dataBuffer.finish();
+ if (dataBuffer.hasRemaining()) {
try {
outputGate.regionStart(isBroadcast);
- while (sortBuffer.hasRemaining()) {
+ while (dataBuffer.hasRemaining()) {
MemorySegment segment =
outputGate.getBufferPool().requestMemorySegmentBlocking();
- SortBuffer.BufferWithChannel bufferWithChannel;
+ BufferWithSubpartition bufferWithSubpartition;
try {
- bufferWithChannel =
- sortBuffer.copyIntoSegment(
+ bufferWithSubpartition =
+ dataBuffer.getNextBuffer(
segment, outputGate.getBufferPool(),
BufferUtils.HEADER_LENGTH);
} catch (Throwable t) {
outputGate.getBufferPool().recycle(segment);
throw new FlinkRuntimeException("Shuffle write failure.", t);
}
- Buffer buffer = bufferWithChannel.getBuffer();
- int subpartitionIndex = bufferWithChannel.getChannelIndex();
- statisticsConsumer.accept(bufferWithChannel, isBroadcast);
+ Buffer buffer = bufferWithSubpartition.getBuffer();
+ int subpartitionIndex =
bufferWithSubpartition.getSubpartitionIndex();
+ statisticsConsumer.accept(bufferWithSubpartition, isBroadcast);
writeCompressedBufferIfPossible(buffer, subpartitionIndex);
}
outputGate.regionFinish();
} catch (InterruptedException e) {
- throw new IOException("Failed to flush the sort buffer, broadcast=" +
isBroadcast, e);
+ throw new IOException("Failed to flush the data buffer, broadcast=" +
isBroadcast, e);
}
}
- releaseSortBuffer(sortBuffer);
+ releaseDataBuffer(dataBuffer);
}
public void writeCompressedBufferIfPossible(Buffer buffer, int
targetSubpartition)
@@ -234,9 +235,9 @@ public class RemoteShuffleResultPartitionDelegation {
dataType,
toCopy + BufferUtils.HEADER_LENGTH);
- SortBuffer.BufferWithChannel bufferWithChannel =
- new SortBuffer.BufferWithChannel(buffer, targetSubpartition);
- statisticsConsumer.accept(bufferWithChannel, isBroadcast);
+ BufferWithSubpartition bufferWithSubpartition =
+ new BufferWithSubpartition(buffer, targetSubpartition);
+ statisticsConsumer.accept(bufferWithSubpartition, isBroadcast);
writeCompressedBufferIfPossible(buffer, targetSubpartition);
}
outputGate.regionFinish();
@@ -246,17 +247,17 @@ public class RemoteShuffleResultPartitionDelegation {
emit(record, 0, dataType, true);
}
- public void releaseSortBuffer(SortBuffer sortBuffer) {
- if (sortBuffer != null) {
- sortBuffer.release();
+ public void releaseDataBuffer(DataBuffer dataBuffer) {
+ if (dataBuffer != null) {
+ dataBuffer.release();
}
}
public void finish() throws IOException {
Utils.checkState(
- unicastSortBuffer == null || unicastSortBuffer.isReleased(),
- "The unicast sort buffer should be either null or released.");
- flushBroadcastSortBuffer();
+ unicastDataBuffer == null || unicastDataBuffer.isReleased(),
+ "The unicast data buffer should be either null or released.");
+ flushBroadcastDataBuffer();
try {
outputGate.finish();
} catch (InterruptedException e) {
@@ -265,22 +266,21 @@ public class RemoteShuffleResultPartitionDelegation {
}
public synchronized void close(Runnable closeHandler) {
- Throwable closeException = null;
+ Throwable closeException;
closeException =
checkException(
- () -> releaseSortBuffer(unicastSortBuffer),
- closeException,
- "Failed to release unicast sort buffer.");
+ () -> releaseDataBuffer(unicastDataBuffer),
+ null,
+ "Failed to release unicast data buffer.");
closeException =
checkException(
- () -> releaseSortBuffer(broadcastSortBuffer),
+ () -> releaseDataBuffer(broadcastDataBuffer),
closeException,
- "Failed to release broadcast sort buffer.");
+ "Failed to release broadcast data buffer.");
closeException =
- checkException(
- () -> closeHandler.run(), closeException, "Failed to call
super#close() method.");
+ checkException(closeHandler, closeException, "Failed to call
super#close() method.");
try {
outputGate.close();
@@ -307,10 +307,10 @@ public class RemoteShuffleResultPartitionDelegation {
public void flushAll() {
try {
- flushUnicastSortBuffer();
- flushBroadcastSortBuffer();
+ flushUnicastDataBuffer();
+ flushBroadcastDataBuffer();
} catch (Throwable t) {
- LOG.error("Failed to flush the current sort buffer.", t);
+ LOG.error("Failed to flush the current data buffer.", t);
Utils.rethrowAsRuntimeException(t);
}
}
diff --git
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/BufferWithSubpartition.java
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/BufferWithSubpartition.java
new file mode 100644
index 000000000..2d2f767a9
--- /dev/null
+++
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/BufferWithSubpartition.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink.buffer;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+/** Buffer and the corresponding subpartition index. */
+public class BufferWithSubpartition {
+
+ private final Buffer buffer;
+
+ private final int subpartitionIndex;
+
+ public BufferWithSubpartition(Buffer buffer, int subpartitionIndex) {
+ this.buffer = checkNotNull(buffer);
+ this.subpartitionIndex = subpartitionIndex;
+ }
+
+ public Buffer getBuffer() {
+ return buffer;
+ }
+
+ public int getSubpartitionIndex() {
+ return subpartitionIndex;
+ }
+}
diff --git
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/DataBuffer.java
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/DataBuffer.java
new file mode 100644
index 000000000..562db900f
--- /dev/null
+++
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/DataBuffer.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink.buffer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import javax.annotation.Nullable;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+
+/**
+ * Data of different subpartitions can be appended to a {@link DataBuffer} and
after the {@link
+ * DataBuffer} is full or finished, the appended data can be copied from it in
subpartition index
+ * order.
+ *
+ * <p>The lifecycle of a {@link DataBuffer} can be: new, write, [read, reset,
write], finish, read,
+ * release. There can be multiple [read, reset, write] operations before
finish.
+ */
+public interface DataBuffer {
+
+ /**
+ * Appends data of the specified subpartition to this {@link DataBuffer} and
returns true if this
+ * {@link DataBuffer} is full.
+ */
+ boolean append(ByteBuffer source, int targetSubpartition, Buffer.DataType
dataType)
+ throws IOException;
+
+ /**
+ * Copies data in this {@link DataBuffer} to the target {@link
MemorySegment} in subpartition
+ * index order and returns {@link BufferWithSubpartition} which contains the
copied data and the
+ * corresponding subpartition index.
+ */
+ BufferWithSubpartition getNextBuffer(
+ @Nullable MemorySegment transitBuffer, BufferRecycler recycler, int
offset);
+
+ /** Returns the total number of records written to this {@link DataBuffer}.
*/
+ long numTotalRecords();
+
+ /** Returns the total number of bytes written to this {@link DataBuffer}. */
+ long numTotalBytes();
+
+ /** Returns true if not all data appended to this {@link DataBuffer} is
consumed. */
+ boolean hasRemaining();
+
+ /** Finishes this {@link DataBuffer} which means no record can be appended
anymore. */
+ void finish();
+
+ /** Whether this {@link DataBuffer} is finished or not. */
+ boolean isFinished();
+
+ /** Releases this {@link DataBuffer} which releases all resources. */
+ void release();
+
+ /** Whether this {@link DataBuffer} is released or not. */
+ boolean isReleased();
+}
diff --git
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/PartitionSortedBuffer.java
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/SortBasedDataBuffer.java
similarity index 79%
rename from
client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/PartitionSortedBuffer.java
rename to
client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/SortBasedDataBuffer.java
index c11e5d373..fafc2c8cd 100644
---
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/PartitionSortedBuffer.java
+++
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/SortBasedDataBuffer.java
@@ -38,19 +38,20 @@ import
org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.util.FlinkRuntimeException;
/**
- * A {@link SortBuffer} implementation which sorts all appended records only
by subpartition index.
+ * A {@link DataBuffer} implementation which sorts all appended records only
by subpartition index.
* Records of the same subpartition keep the appended order.
*
* <p>It maintains a list of {@link MemorySegment}s as a joint buffer. Data
will be appended to the
* joint buffer sequentially. When writing a record, an index entry will be
appended first. An index
- * entry consists of 4 fields: 4 bytes for record length, 4 bytes for {@link
DataType} and 8 bytes
- * for address pointing to the next index entry of the same channel which will
be used to index the
- * next record to read when coping data from this {@link SortBuffer}. For
simplicity, no index entry
- * can span multiple segments. The corresponding record data is seated right
after its index entry
- * and different from the index entry, records have variable length thus may
span multiple segments.
+ * entry consists of 4 fields: 4 bytes for record length, 4 bytes for {@link
Buffer.DataType} and 8
+ * bytes for address pointing to the next index entry of the same subpartition
which will be used to
+ * index the next record to read when coping data from this {@link
DataBuffer}. For simplicity, no
+ * index entry can span multiple segments. The corresponding record data is
seated right after its
+ * index entry and different from the index entry, records have variable
length thus may span
+ * multiple segments.
*/
@NotThreadSafe
-public class PartitionSortedBuffer implements SortBuffer {
+public class SortBasedDataBuffer implements DataBuffer {
/**
* Size of an index entry: 4 bytes for record length, 4 bytes for data type
and 8 bytes for
@@ -71,25 +72,25 @@ public class PartitionSortedBuffer implements SortBuffer {
private final long[] lastIndexEntryAddresses;
/** Size of buffers requested from buffer pool. All buffers must be of the
same size. */
private final int bufferSize;
- /** Data of different subpartitions in this sort buffer will be read in this
order. */
+ /** Data of different subpartitions in this data buffer will be read in this
order. */
private final int[] subpartitionReadOrder;
//
---------------------------------------------------------------------------------------------
// Statistics and states
//
---------------------------------------------------------------------------------------------
- /** Total number of bytes already appended to this sort buffer. */
+ /** Total number of bytes already appended to this data buffer. */
private long numTotalBytes;
- /** Total number of records already appended to this sort buffer. */
+ /** Total number of records already appended to this data buffer. */
private long numTotalRecords;
- /** Total number of bytes already read from this sort buffer. */
+ /** Total number of bytes already read from this data buffer. */
private long numTotalBytesRead;
- /** Whether this sort buffer is finished. One can only read a finished sort
buffer. */
+ /** Whether this data buffer is finished. One can only read a finished data
buffer. */
private boolean isFinished;
//
---------------------------------------------------------------------------------------------
// For writing
//
---------------------------------------------------------------------------------------------
- /** Whether this sort buffer is released. A released sort buffer can not be
used. */
+ /** Whether this data buffer is released. A released data buffer can not be
used. */
private boolean isReleased;
/** Array index in the segment list of the current available buffer for
writing. */
private int writeSegmentIndex;
@@ -105,10 +106,10 @@ public class PartitionSortedBuffer implements SortBuffer {
/** Record bytes remaining after last copy, which must be read first in next
copy. */
private int recordRemainingBytes;
- /** Used to index the current available channel to read data from. */
+ /** Used to index the current available subpartition to read data from. */
private int readOrderIndex = -1;
- public PartitionSortedBuffer(
+ public SortBasedDataBuffer(
BufferPool bufferPool,
int numSubpartitions,
int bufferSize,
@@ -120,7 +121,7 @@ public class PartitionSortedBuffer implements SortBuffer {
this.firstIndexEntryAddresses = new long[numSubpartitions];
this.lastIndexEntryAddresses = new long[numSubpartitions];
- // initialized with -1 means the corresponding channel has no data.
+ // initialized with -1 means the corresponding subpartition has no data.
Arrays.fill(firstIndexEntryAddresses, -1L);
Arrays.fill(lastIndexEntryAddresses, -1L);
@@ -129,18 +130,18 @@ public class PartitionSortedBuffer implements SortBuffer {
checkArgument(customReadOrder.length == numSubpartitions, "Illegal data
read order.");
System.arraycopy(customReadOrder, 0, this.subpartitionReadOrder, 0,
numSubpartitions);
} else {
- for (int channel = 0; channel < numSubpartitions; ++channel) {
- this.subpartitionReadOrder[channel] = channel;
+ for (int subpartition = 0; subpartition < numSubpartitions;
++subpartition) {
+ this.subpartitionReadOrder[subpartition] = subpartition;
}
}
}
@Override
- public boolean append(ByteBuffer source, int targetChannel, DataType
dataType)
+ public boolean append(ByteBuffer source, int targetSubpartition, DataType
dataType)
throws IOException {
checkArgument(source.hasRemaining(), "Cannot append empty data.");
- checkState(!isFinished, "Sort buffer is already finished.");
- checkState(!isReleased, "Sort buffer is already released.");
+ checkState(!isFinished, "Data buffer is already finished.");
+ checkState(!isReleased, "Data buffer is already released.");
int totalBytes = source.remaining();
@@ -150,7 +151,7 @@ public class PartitionSortedBuffer implements SortBuffer {
}
// write the index entry and record or event data
- writeIndex(targetChannel, totalBytes, dataType);
+ writeIndex(targetSubpartition, totalBytes, dataType);
writeRecord(source);
++numTotalRecords;
@@ -159,7 +160,7 @@ public class PartitionSortedBuffer implements SortBuffer {
return true;
}
- private void writeIndex(int channelIndex, int numRecordBytes, DataType
dataType) {
+ private void writeIndex(int subpartitionIndex, int numRecordBytes, DataType
dataType) {
MemorySegment segment = buffers.get(writeSegmentIndex);
// record length takes the high 32 bits and data type takes the low 32 bits
@@ -168,15 +169,15 @@ public class PartitionSortedBuffer implements SortBuffer {
// segment index takes the high 32 bits and segment offset takes the low
32 bits
long indexEntryAddress = ((long) writeSegmentIndex << 32) |
writeSegmentOffset;
- long lastIndexEntryAddress = lastIndexEntryAddresses[channelIndex];
- lastIndexEntryAddresses[channelIndex] = indexEntryAddress;
+ long lastIndexEntryAddress = lastIndexEntryAddresses[subpartitionIndex];
+ lastIndexEntryAddresses[subpartitionIndex] = indexEntryAddress;
if (lastIndexEntryAddress >= 0) {
- // link the previous index entry of the given channel to the new index
entry
+ // link the previous index entry of the given subpartition to the new
index entry
segment = buffers.get(getSegmentIndexFromPointer(lastIndexEntryAddress));
segment.putLong(getSegmentOffsetFromPointer(lastIndexEntryAddress) + 8,
indexEntryAddress);
} else {
- firstIndexEntryAddresses[channelIndex] = indexEntryAddress;
+ firstIndexEntryAddresses[subpartitionIndex] = indexEntryAddress;
}
// move the writer position forward to write the corresponding record
@@ -232,7 +233,7 @@ public class PartitionSortedBuffer implements SortBuffer {
if (isReleased) {
bufferPool.recycle(segment);
- throw new IllegalStateException("Sort buffer is already released.");
+ throw new IllegalStateException("Data buffer is already released.");
}
buffers.add(segment);
@@ -262,15 +263,15 @@ public class PartitionSortedBuffer implements SortBuffer {
}
@Override
- public BufferWithChannel copyIntoSegment(
- MemorySegment target, BufferRecycler recycler, int offset) {
+ public BufferWithSubpartition getNextBuffer(
+ MemorySegment transitBuffer, BufferRecycler recycler, int offset) {
checkState(hasRemaining(), "No data remaining.");
- checkState(isFinished, "Should finish the sort buffer first before coping
any data.");
- checkState(!isReleased, "Sort buffer is already released.");
+ checkState(isFinished, "Should finish the data buffer first before coping
any data.");
+ checkState(!isReleased, "Data buffer is already released.");
int numBytesCopied = 0;
DataType bufferDataType = DataType.DATA_BUFFER;
- int channelIndex = subpartitionReadOrder[readOrderIndex];
+ int subpartitionIndex = subpartitionReadOrder[readOrderIndex];
do {
int sourceSegmentIndex =
getSegmentIndexFromPointer(readIndexEntryAddress);
@@ -292,27 +293,32 @@ public class PartitionSortedBuffer implements SortBuffer {
sourceSegmentOffset += INDEX_ENTRY_SIZE;
// throws if the event is too big to be accommodated by a buffer.
- if (bufferDataType.isEvent() && target.size() - offset < length) {
+ if (bufferDataType.isEvent() && transitBuffer.size() - offset < length) {
throw new FlinkRuntimeException("Event is too big to be accommodated
by a buffer");
}
numBytesCopied +=
copyRecordOrEvent(
- target, numBytesCopied + offset, sourceSegmentIndex,
sourceSegmentOffset, length);
+ transitBuffer,
+ numBytesCopied + offset,
+ sourceSegmentIndex,
+ sourceSegmentOffset,
+ length);
if (recordRemainingBytes == 0) {
- // move to next channel if the current channel has been finished
- if (readIndexEntryAddress == lastIndexEntryAddresses[channelIndex]) {
+ // move to next subpartition if the current subpartition has been
finished
+ if (readIndexEntryAddress ==
lastIndexEntryAddresses[subpartitionIndex]) {
updateReadChannelAndIndexEntryAddress();
break;
}
readIndexEntryAddress = nextReadIndexEntryAddress;
}
- } while (numBytesCopied < target.size() - offset &&
bufferDataType.isBuffer());
+ } while (numBytesCopied < transitBuffer.size() - offset &&
bufferDataType.isBuffer());
numTotalBytesRead += numBytesCopied;
- Buffer buffer = new NetworkBuffer(target, recycler, bufferDataType,
numBytesCopied + offset);
- return new BufferWithChannel(buffer, channelIndex);
+ Buffer buffer =
+ new NetworkBuffer(transitBuffer, recycler, bufferDataType,
numBytesCopied + offset);
+ return new BufferWithSubpartition(buffer, subpartitionIndex);
}
private int copyRecordOrEvent(
@@ -354,10 +360,10 @@ public class PartitionSortedBuffer implements SortBuffer {
}
private void updateReadChannelAndIndexEntryAddress() {
- // skip the channels without any data
+ // skip the subpartitions without any data
while (++readOrderIndex < firstIndexEntryAddresses.length) {
- int channelIndex = subpartitionReadOrder[readOrderIndex];
- if ((readIndexEntryAddress = firstIndexEntryAddresses[channelIndex]) >=
0) {
+ int subpartitionIndex = subpartitionReadOrder[readOrderIndex];
+ if ((readIndexEntryAddress =
firstIndexEntryAddresses[subpartitionIndex]) >= 0) {
break;
}
}
@@ -372,12 +378,12 @@ public class PartitionSortedBuffer implements SortBuffer {
}
@Override
- public long numRecords() {
+ public long numTotalRecords() {
return numTotalRecords;
}
@Override
- public long numBytes() {
+ public long numTotalBytes() {
return numTotalBytes;
}
@@ -388,7 +394,7 @@ public class PartitionSortedBuffer implements SortBuffer {
@Override
public void finish() {
- checkState(!isFinished, SortBuffer.class.getCanonicalName() + " is already
finished.");
+ checkState(!isFinished, DataBuffer.class.getCanonicalName() + " is already
finished.");
isFinished = true;
@@ -403,7 +409,7 @@ public class PartitionSortedBuffer implements SortBuffer {
@Override
public void release() {
- // the sort buffer can be released by other threads
+ // the data buffer can be released by other threads
if (isReleased) {
return;
}
diff --git
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/SortBuffer.java
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/SortBuffer.java
deleted file mode 100644
index 7318d6bb1..000000000
---
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/SortBuffer.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.celeborn.plugin.flink.buffer;
-
-import static org.apache.celeborn.plugin.flink.utils.Utils.checkNotNull;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
-
-/**
- * Data of different channels can be appended to a {@link SortBuffer}., after
appending finished,
- * data can be copied from it in channel index order.
- */
-public interface SortBuffer {
-
- /**
- * Appends data of the specified channel to this {@link SortBuffer} and
returns true if all bytes
- * of the source buffer is copied to this {@link SortBuffer} successfully,
otherwise if returns
- * false, nothing will be copied.
- */
- boolean append(ByteBuffer source, int targetChannel, Buffer.DataType
dataType) throws IOException;
-
- /**
- * Copies data from this {@link SortBuffer} to the target {@link
MemorySegment} in channel index
- * order and returns {@link BufferWithChannel} which contains the copied
data and the
- * corresponding channel index.
- */
- BufferWithChannel copyIntoSegment(MemorySegment target, BufferRecycler
recycler, int offset);
-
- /** Returns the number of records written to this {@link SortBuffer}. */
- long numRecords();
-
- /** Returns the number of bytes written to this {@link SortBuffer}. */
- long numBytes();
-
- /** Returns true if there is still data can be consumed in this {@link
SortBuffer}. */
- boolean hasRemaining();
-
- /** Finishes this {@link SortBuffer} which means no record can be appended
anymore. */
- void finish();
-
- /** Whether this {@link SortBuffer} is finished or not. */
- boolean isFinished();
-
- /** Releases this {@link SortBuffer} which releases all resources. */
- void release();
-
- /** Whether this {@link SortBuffer} is released or not. */
- boolean isReleased();
-
- /** Buffer and the corresponding channel index returned to reader. */
- class BufferWithChannel {
-
- private final Buffer buffer;
-
- private final int channelIndex;
-
- public BufferWithChannel(Buffer buffer, int channelIndex) {
- this.buffer = checkNotNull(buffer);
- this.channelIndex = channelIndex;
- }
-
- /** Get {@link Buffer}. */
- public Buffer getBuffer() {
- return buffer;
- }
-
- /** Get channel index. */
- public int getChannelIndex() {
- return channelIndex;
- }
- }
-}
diff --git
a/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/PartitionSortedBufferSuiteJ.java
b/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/SortBasedDataBufferSuiteJ.java
similarity index 66%
rename from
client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/PartitionSortedBufferSuiteJ.java
rename to
client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/SortBasedDataBufferSuiteJ.java
index b1f5ced91..e68ebda99 100644
---
a/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/PartitionSortedBufferSuiteJ.java
+++
b/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/SortBasedDataBufferSuiteJ.java
@@ -38,18 +38,19 @@ import
org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.junit.Test;
-import org.apache.celeborn.plugin.flink.buffer.PartitionSortedBuffer;
-import org.apache.celeborn.plugin.flink.buffer.SortBuffer;
+import org.apache.celeborn.plugin.flink.buffer.BufferWithSubpartition;
+import org.apache.celeborn.plugin.flink.buffer.DataBuffer;
+import org.apache.celeborn.plugin.flink.buffer.SortBasedDataBuffer;
-public class PartitionSortedBufferSuiteJ {
+public class SortBasedDataBufferSuiteJ {
@Test
- public void testWriteAndReadSortBuffer() throws Exception {
+ public void testWriteAndReadDataBuffer() throws Exception {
int numSubpartitions = 10;
int bufferSize = 1024;
int bufferPoolSize = 1000;
Random random = new Random(1111);
- // used to store data written to and read from sort buffer for correctness
check
+ // used to store data written to and read from data buffer for correctness
check
Queue<DataAndType>[] dataWritten = new Queue[numSubpartitions];
Queue<Buffer>[] buffersRead = new Queue[numSubpartitions];
for (int i = 0; i < numSubpartitions; ++i) {
@@ -62,10 +63,10 @@ public class PartitionSortedBufferSuiteJ {
Arrays.fill(numBytesWritten, 0);
Arrays.fill(numBytesRead, 0);
- // fill the sort buffer with randomly generated data
+ // fill the data buffer with randomly generated data
int totalBytesWritten = 0;
- SortBuffer sortBuffer =
- createSortBuffer(
+ DataBuffer dataBuffer =
+ createDataBuffer(
bufferPoolSize,
bufferSize,
numSubpartitions,
@@ -86,8 +87,8 @@ public class PartitionSortedBufferSuiteJ {
boolean isBuffer = random.nextBoolean() || recordSize > bufferSize;
Buffer.DataType dataType =
isBuffer ? Buffer.DataType.DATA_BUFFER :
Buffer.DataType.EVENT_BUFFER;
- if (!sortBuffer.append(record, subpartition, dataType)) {
- sortBuffer.finish();
+ if (!dataBuffer.append(record, subpartition, dataType)) {
+ dataBuffer.finish();
break;
}
record.rewind();
@@ -96,17 +97,17 @@ public class PartitionSortedBufferSuiteJ {
totalBytesWritten += recordSize;
}
- // read all data from the sort buffer
- while (sortBuffer.hasRemaining()) {
+ // read all data from the data buffer
+ while (dataBuffer.hasRemaining()) {
MemorySegment readBuffer =
MemorySegmentFactory.allocateUnpooledSegment(bufferSize);
- SortBuffer.BufferWithChannel bufferAndChannel =
- sortBuffer.copyIntoSegment(readBuffer, ignore -> {}, 0);
- int subpartition = bufferAndChannel.getChannelIndex();
- buffersRead[subpartition].add(bufferAndChannel.getBuffer());
- numBytesRead[subpartition] +=
bufferAndChannel.getBuffer().readableBytes();
+ BufferWithSubpartition bufferWithSubpartition =
+ dataBuffer.getNextBuffer(readBuffer, ignore -> {}, 0);
+ int subpartition = bufferWithSubpartition.getSubpartitionIndex();
+ buffersRead[subpartition].add(bufferWithSubpartition.getBuffer());
+ numBytesRead[subpartition] +=
bufferWithSubpartition.getBuffer().readableBytes();
}
- assertEquals(totalBytesWritten, sortBuffer.numBytes());
+ assertEquals(totalBytesWritten, dataBuffer.numTotalBytes());
checkWriteReadResult(numSubpartitions, numBytesWritten, numBytesRead,
dataWritten, buffersRead);
}
@@ -161,68 +162,68 @@ public class PartitionSortedBufferSuiteJ {
ByteBuffer.allocate(128), null, ByteBuffer.allocate(1536), null,
ByteBuffer.allocate(1024)
};
- SortBuffer sortBuffer = createSortBuffer(bufferPoolSize, bufferSize,
numSubpartitions);
+ DataBuffer dataBuffer = createDataBuffer(bufferPoolSize, bufferSize,
numSubpartitions);
for (int subpartition = 0; subpartition < numSubpartitions;
++subpartition) {
ByteBuffer record = subpartitionRecords[subpartition];
if (record != null) {
- sortBuffer.append(record, subpartition, Buffer.DataType.DATA_BUFFER);
+ dataBuffer.append(record, subpartition, Buffer.DataType.DATA_BUFFER);
record.rewind();
}
}
- sortBuffer.finish();
+ dataBuffer.finish();
- checkReadResult(sortBuffer, subpartitionRecords[0], 0, bufferSize);
+ checkReadResult(dataBuffer, subpartitionRecords[0], 0, bufferSize);
ByteBuffer expected1 = subpartitionRecords[2].duplicate();
expected1.limit(bufferSize);
- checkReadResult(sortBuffer, expected1.slice(), 2, bufferSize);
+ checkReadResult(dataBuffer, expected1.slice(), 2, bufferSize);
ByteBuffer expected2 = subpartitionRecords[2].duplicate();
expected2.position(bufferSize);
- checkReadResult(sortBuffer, expected2.slice(), 2, bufferSize);
+ checkReadResult(dataBuffer, expected2.slice(), 2, bufferSize);
- checkReadResult(sortBuffer, subpartitionRecords[4], 4, bufferSize);
+ checkReadResult(dataBuffer, subpartitionRecords[4], 4, bufferSize);
}
private void checkReadResult(
- SortBuffer sortBuffer, ByteBuffer expectedBuffer, int expectedChannel,
int bufferSize) {
+ DataBuffer dataBuffer, ByteBuffer expectedBuffer, int
expectedSubpartition, int bufferSize) {
MemorySegment segment =
MemorySegmentFactory.allocateUnpooledSegment(bufferSize);
- SortBuffer.BufferWithChannel bufferWithChannel =
- sortBuffer.copyIntoSegment(segment, ignore -> {}, 0);
- assertEquals(expectedChannel, bufferWithChannel.getChannelIndex());
- assertEquals(expectedBuffer,
bufferWithChannel.getBuffer().getNioBufferReadable());
+ BufferWithSubpartition bufferWithSubpartition =
+ dataBuffer.getNextBuffer(segment, ignore -> {}, 0);
+ assertEquals(expectedSubpartition,
bufferWithSubpartition.getSubpartitionIndex());
+ assertEquals(expectedBuffer,
bufferWithSubpartition.getBuffer().getNioBufferReadable());
}
@Test(expected = IllegalArgumentException.class)
public void testWriteEmptyData() throws Exception {
int bufferSize = 1024;
- SortBuffer sortBuffer = createSortBuffer(1, bufferSize, 1);
+ DataBuffer dataBuffer = createDataBuffer(1, bufferSize, 1);
ByteBuffer record = ByteBuffer.allocate(1);
record.position(1);
- sortBuffer.append(record, 0, Buffer.DataType.DATA_BUFFER);
+ dataBuffer.append(record, 0, Buffer.DataType.DATA_BUFFER);
}
@Test(expected = IllegalStateException.class)
- public void testWriteFinishedSortBuffer() throws Exception {
+ public void testWriteFinishedDataBuffer() throws Exception {
int bufferSize = 1024;
- SortBuffer sortBuffer = createSortBuffer(1, bufferSize, 1);
- sortBuffer.finish();
+ DataBuffer dataBuffer = createDataBuffer(1, bufferSize, 1);
+ dataBuffer.finish();
- sortBuffer.append(ByteBuffer.allocate(1), 0, Buffer.DataType.DATA_BUFFER);
+ dataBuffer.append(ByteBuffer.allocate(1), 0, Buffer.DataType.DATA_BUFFER);
}
@Test(expected = IllegalStateException.class)
- public void testWriteReleasedSortBuffer() throws Exception {
+ public void testWriteReleasedDataBuffer() throws Exception {
int bufferSize = 1024;
- SortBuffer sortBuffer = createSortBuffer(1, bufferSize, 1);
- sortBuffer.release();
+ DataBuffer dataBuffer = createDataBuffer(1, bufferSize, 1);
+ dataBuffer.release();
- sortBuffer.append(ByteBuffer.allocate(1), 0, Buffer.DataType.DATA_BUFFER);
+ dataBuffer.append(ByteBuffer.allocate(1), 0, Buffer.DataType.DATA_BUFFER);
}
@Test
@@ -230,15 +231,15 @@ public class PartitionSortedBufferSuiteJ {
int bufferPoolSize = 10;
int bufferSize = 1024;
- SortBuffer sortBuffer = createSortBuffer(bufferPoolSize, bufferSize, 1);
+ DataBuffer dataBuffer = createDataBuffer(bufferPoolSize, bufferSize, 1);
for (int i = 1; i < bufferPoolSize; ++i) {
- appendAndCheckResult(sortBuffer, bufferSize, true, bufferSize * i, i,
true);
+ appendAndCheckResult(dataBuffer, bufferSize, true, bufferSize * i, i,
true);
}
// append should fail for insufficient capacity
int numRecords = bufferPoolSize - 1;
- appendAndCheckResult(sortBuffer, bufferSize, false, bufferSize *
numRecords, numRecords, true);
+ appendAndCheckResult(dataBuffer, bufferSize, false, bufferSize *
numRecords, numRecords, true);
}
@Test
@@ -246,13 +247,13 @@ public class PartitionSortedBufferSuiteJ {
int bufferPoolSize = 10;
int bufferSize = 1024;
- SortBuffer sortBuffer = createSortBuffer(bufferPoolSize, bufferSize, 1);
+ DataBuffer dataBuffer = createDataBuffer(bufferPoolSize, bufferSize, 1);
// append should fail for insufficient capacity
- appendAndCheckResult(sortBuffer, bufferPoolSize * bufferSize, false, 0, 0,
false);
+ appendAndCheckResult(dataBuffer, bufferPoolSize * bufferSize, false, 0, 0,
false);
}
private void appendAndCheckResult(
- SortBuffer sortBuffer,
+ DataBuffer dataBuffer,
int recordSize,
boolean isSuccessful,
long numBytes,
@@ -261,54 +262,54 @@ public class PartitionSortedBufferSuiteJ {
throws IOException {
ByteBuffer largeRecord = ByteBuffer.allocate(recordSize);
- assertEquals(isSuccessful, sortBuffer.append(largeRecord, 0,
Buffer.DataType.DATA_BUFFER));
- assertEquals(numBytes, sortBuffer.numBytes());
- assertEquals(numRecords, sortBuffer.numRecords());
- assertEquals(hasRemaining, sortBuffer.hasRemaining());
+ assertEquals(isSuccessful, dataBuffer.append(largeRecord, 0,
Buffer.DataType.DATA_BUFFER));
+ assertEquals(numBytes, dataBuffer.numTotalBytes());
+ assertEquals(numRecords, dataBuffer.numTotalRecords());
+ assertEquals(hasRemaining, dataBuffer.hasRemaining());
}
@Test(expected = IllegalStateException.class)
- public void testReadUnfinishedSortBuffer() throws Exception {
+ public void testReadUnfinishedDataBuffer() throws Exception {
int bufferSize = 1024;
- SortBuffer sortBuffer = createSortBuffer(1, bufferSize, 1);
- sortBuffer.append(ByteBuffer.allocate(1), 0, Buffer.DataType.DATA_BUFFER);
+ DataBuffer dataBuffer = createDataBuffer(1, bufferSize, 1);
+ dataBuffer.append(ByteBuffer.allocate(1), 0, Buffer.DataType.DATA_BUFFER);
- assertTrue(sortBuffer.hasRemaining());
- sortBuffer.copyIntoSegment(
+ assertTrue(dataBuffer.hasRemaining());
+ dataBuffer.getNextBuffer(
MemorySegmentFactory.allocateUnpooledSegment(bufferSize), ignore ->
{}, 0);
}
@Test(expected = IllegalStateException.class)
- public void testReadReleasedSortBuffer() throws Exception {
+ public void testReadReleasedDataBuffer() throws Exception {
int bufferSize = 1024;
- SortBuffer sortBuffer = createSortBuffer(1, bufferSize, 1);
- sortBuffer.append(ByteBuffer.allocate(1), 0, Buffer.DataType.DATA_BUFFER);
- sortBuffer.finish();
- assertTrue(sortBuffer.hasRemaining());
+ DataBuffer dataBuffer = createDataBuffer(1, bufferSize, 1);
+ dataBuffer.append(ByteBuffer.allocate(1), 0, Buffer.DataType.DATA_BUFFER);
+ dataBuffer.finish();
+ assertTrue(dataBuffer.hasRemaining());
- sortBuffer.release();
- assertFalse(sortBuffer.hasRemaining());
+ dataBuffer.release();
+ assertFalse(dataBuffer.hasRemaining());
- sortBuffer.copyIntoSegment(
+ dataBuffer.getNextBuffer(
MemorySegmentFactory.allocateUnpooledSegment(bufferSize), ignore ->
{}, 0);
}
@Test(expected = IllegalStateException.class)
- public void testReadEmptySortBuffer() throws Exception {
+ public void testReadEmptyDataBuffer() throws Exception {
int bufferSize = 1024;
- SortBuffer sortBuffer = createSortBuffer(1, bufferSize, 1);
- sortBuffer.finish();
+ DataBuffer dataBuffer = createDataBuffer(1, bufferSize, 1);
+ dataBuffer.finish();
- assertFalse(sortBuffer.hasRemaining());
- sortBuffer.copyIntoSegment(
+ assertFalse(dataBuffer.hasRemaining());
+ dataBuffer.getNextBuffer(
MemorySegmentFactory.allocateUnpooledSegment(bufferSize), ignore ->
{}, 0);
}
@Test
- public void testReleaseSortBuffer() throws Exception {
+ public void testReleaseDataBuffer() throws Exception {
int bufferPoolSize = 10;
int bufferSize = 1024;
int recordSize = (bufferPoolSize - 1) * bufferSize;
@@ -316,34 +317,34 @@ public class PartitionSortedBufferSuiteJ {
NetworkBufferPool globalPool = new NetworkBufferPool(bufferPoolSize,
bufferSize);
BufferPool bufferPool = globalPool.createBufferPool(bufferPoolSize,
bufferPoolSize);
- SortBuffer sortBuffer = new PartitionSortedBuffer(bufferPool, 1,
bufferSize, null);
- sortBuffer.append(ByteBuffer.allocate(recordSize), 0,
Buffer.DataType.DATA_BUFFER);
+ DataBuffer dataBuffer = new SortBasedDataBuffer(bufferPool, 1, bufferSize,
null);
+ dataBuffer.append(ByteBuffer.allocate(recordSize), 0,
Buffer.DataType.DATA_BUFFER);
assertEquals(bufferPoolSize, bufferPool.bestEffortGetNumOfUsedBuffers());
- assertTrue(sortBuffer.hasRemaining());
- assertEquals(1, sortBuffer.numRecords());
- assertEquals(recordSize, sortBuffer.numBytes());
+ assertTrue(dataBuffer.hasRemaining());
+ assertEquals(1, dataBuffer.numTotalRecords());
+ assertEquals(recordSize, dataBuffer.numTotalBytes());
// should release all data and resources
- sortBuffer.release();
+ dataBuffer.release();
assertEquals(0, bufferPool.bestEffortGetNumOfUsedBuffers());
- assertFalse(sortBuffer.hasRemaining());
- assertEquals(0, sortBuffer.numRecords());
- assertEquals(0, sortBuffer.numBytes());
+ assertFalse(dataBuffer.hasRemaining());
+ assertEquals(0, dataBuffer.numTotalRecords());
+ assertEquals(0, dataBuffer.numTotalBytes());
}
- private SortBuffer createSortBuffer(int bufferPoolSize, int bufferSize, int
numSubpartitions)
+ private DataBuffer createDataBuffer(int bufferPoolSize, int bufferSize, int
numSubpartitions)
throws IOException {
- return createSortBuffer(bufferPoolSize, bufferSize, numSubpartitions,
null);
+ return createDataBuffer(bufferPoolSize, bufferSize, numSubpartitions,
null);
}
- private SortBuffer createSortBuffer(
+ private DataBuffer createDataBuffer(
int bufferPoolSize, int bufferSize, int numSubpartitions, int[]
customReadOrder)
throws IOException {
NetworkBufferPool globalPool = new NetworkBufferPool(bufferPoolSize,
bufferSize);
BufferPool bufferPool = globalPool.createBufferPool(bufferPoolSize,
bufferPoolSize);
- return new PartitionSortedBuffer(bufferPool, numSubpartitions, bufferSize,
customReadOrder);
+ return new SortBasedDataBuffer(bufferPool, numSubpartitions, bufferSize,
customReadOrder);
}
public static int[] getRandomSubpartitionOrder(int numSubpartitions) {
diff --git
a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
index d28e78285..fa8b8692a 100644
---
a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
+++
b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
@@ -40,16 +40,16 @@ import
org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.util.function.SupplierWithException;
-import org.apache.celeborn.plugin.flink.buffer.SortBuffer;
+import org.apache.celeborn.plugin.flink.buffer.BufferWithSubpartition;
+import org.apache.celeborn.plugin.flink.buffer.DataBuffer;
import org.apache.celeborn.plugin.flink.utils.BufferUtils;
import org.apache.celeborn.plugin.flink.utils.Utils;
/**
- * A {@link ResultPartition} which appends records and events to {@link
SortBuffer} and after the
- * {@link SortBuffer} is full, all data in the {@link SortBuffer} will be
copied and spilled to the
+ * A {@link ResultPartition} which appends records and events to {@link
DataBuffer} and after the
+ * {@link DataBuffer} is full, all data in the {@link DataBuffer} will be
copied and spilled to the
* remote shuffle service in subpartition index order sequentially. Large
records that can not be
- * appended to an empty {@link
org.apache.flink.runtime.io.network.partition.SortBuffer} will be
- * spilled directly.
+ * appended to an empty {@link DataBuffer} will be spilled directly.
*/
public class RemoteShuffleResultPartition extends ResultPartition {
@@ -81,10 +81,7 @@ public class RemoteShuffleResultPartition extends
ResultPartition {
delegation =
new RemoteShuffleResultPartitionDelegation(
- networkBufferSize,
- outputGate,
- (bufferWithChannel, isBroadcast) ->
updateStatistics(bufferWithChannel, isBroadcast),
- numSubpartitions);
+ networkBufferSize, outputGate, this::updateStatistics,
numSubpartitions);
}
@Override
@@ -92,10 +89,7 @@ public class RemoteShuffleResultPartition extends
ResultPartition {
super.setup();
BufferUtils.reserveNumRequiredBuffers(bufferPool, 1);
delegation.setup(
- bufferPool,
- bufferCompressor,
- buffer -> canBeCompressed(buffer),
- () -> checkInProduceState());
+ bufferPool, bufferCompressor, this::canBeCompressed,
this::checkInProduceState);
}
@Override
@@ -129,7 +123,7 @@ public class RemoteShuffleResultPartition extends
ResultPartition {
@Override
public synchronized void close() {
- delegation.close(() -> super.close());
+ delegation.close(super::close);
}
@Override
@@ -199,11 +193,10 @@ public class RemoteShuffleResultPartition extends
ResultPartition {
return delegation;
}
- public void updateStatistics(
- SortBuffer.BufferWithChannel bufferWithChannel, boolean isBroadcast) {
+ public void updateStatistics(BufferWithSubpartition bufferWithSubpartition,
boolean isBroadcast) {
numBuffersOut.inc(isBroadcast ? numSubpartitions : 1);
long readableBytes =
- (long) bufferWithChannel.getBuffer().readableBytes() -
BufferUtils.HEADER_LENGTH;
+ (long) bufferWithSubpartition.getBuffer().readableBytes() -
BufferUtils.HEADER_LENGTH;
numBytesOut.inc(isBroadcast ? readableBytes * numSubpartitions :
readableBytes);
}
}
diff --git
a/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
b/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
index 34bd3bab4..1a96a9bb6 100644
---
a/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
+++
b/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
@@ -62,7 +62,7 @@ import org.junit.Test;
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.plugin.flink.buffer.BufferPacker;
-import org.apache.celeborn.plugin.flink.buffer.SortBuffer;
+import org.apache.celeborn.plugin.flink.buffer.DataBuffer;
import org.apache.celeborn.plugin.flink.readclient.FlinkShuffleClientImpl;
import org.apache.celeborn.plugin.flink.utils.BufferUtils;
@@ -129,10 +129,10 @@ public class RemoteShuffleResultPartitionSuiteJ {
doNothing().when(remoteShuffleOutputGate).regionStart(anyBoolean());
doNothing().when(remoteShuffleOutputGate).regionFinish();
when(remoteShuffleOutputGate.getBufferPool()).thenReturn(bufferPool.get(1).get());
- SortBuffer sortBuffer =
remoteShuffleResultPartition.getDelegation().getUnicastSortBuffer();
+ DataBuffer dataBuffer =
remoteShuffleResultPartition.getDelegation().getUnicastDataBuffer();
ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[] {1, 2, 3});
- sortBuffer.append(byteBuffer, 0, Buffer.DataType.DATA_BUFFER);
- remoteShuffleResultPartition.getDelegation().flushSortBuffer(sortBuffer,
true);
+ dataBuffer.append(byteBuffer, 0, Buffer.DataType.DATA_BUFFER);
+ remoteShuffleResultPartition.getDelegation().flushDataBuffer(dataBuffer,
true);
}
private List<SupplierWithException<BufferPool, IOException>>
createBufferPoolFactory() {
diff --git
a/client-flink/flink-1.15/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
b/client-flink/flink-1.15/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
index 3b25a4a0a..3e35b788f 100644
---
a/client-flink/flink-1.15/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
+++
b/client-flink/flink-1.15/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
@@ -36,15 +36,16 @@ import
org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.partition.*;
import org.apache.flink.util.function.SupplierWithException;
-import org.apache.celeborn.plugin.flink.buffer.SortBuffer;
+import org.apache.celeborn.plugin.flink.buffer.BufferWithSubpartition;
+import org.apache.celeborn.plugin.flink.buffer.DataBuffer;
import org.apache.celeborn.plugin.flink.utils.BufferUtils;
import org.apache.celeborn.plugin.flink.utils.Utils;
/**
- * A {@link ResultPartition} which appends records and events to {@link
SortBuffer} and after the
- * {@link SortBuffer} is full, all data in the {@link SortBuffer} will be
copied and spilled to the
+ * A {@link ResultPartition} which appends records and events to {@link
DataBuffer} and after the
+ * {@link DataBuffer} is full, all data in the {@link DataBuffer} will be
copied and spilled to the
* remote shuffle service in subpartition index order sequentially. Large
records that can not be
- * appended to an empty {@link SortBuffer} will be spilled directly.
+ * appended to an empty {@link DataBuffer} will be spilled directly.
*/
public class RemoteShuffleResultPartition extends ResultPartition {
@@ -76,10 +77,7 @@ public class RemoteShuffleResultPartition extends
ResultPartition {
delegation =
new RemoteShuffleResultPartitionDelegation(
- networkBufferSize,
- outputGate,
- (bufferWithChannel, isBroadcast) ->
updateStatistics(bufferWithChannel, isBroadcast),
- numSubpartitions);
+ networkBufferSize, outputGate, this::updateStatistics,
numSubpartitions);
}
@Override
@@ -87,10 +85,7 @@ public class RemoteShuffleResultPartition extends
ResultPartition {
super.setup();
BufferUtils.reserveNumRequiredBuffers(bufferPool, 1);
delegation.setup(
- bufferPool,
- bufferCompressor,
- buffer -> canBeCompressed(buffer),
- () -> checkInProduceState());
+ bufferPool, bufferCompressor, this::canBeCompressed,
this::checkInProduceState);
}
@Override
@@ -124,7 +119,7 @@ public class RemoteShuffleResultPartition extends
ResultPartition {
@Override
public synchronized void close() {
- delegation.close(() -> super.close());
+ delegation.close(super::close);
}
@Override
@@ -199,11 +194,10 @@ public class RemoteShuffleResultPartition extends
ResultPartition {
return delegation;
}
- public void updateStatistics(
- SortBuffer.BufferWithChannel bufferWithChannel, boolean isBroadcast) {
+ public void updateStatistics(BufferWithSubpartition bufferWithSubpartition,
boolean isBroadcast) {
numBuffersOut.inc(isBroadcast ? numSubpartitions : 1);
long readableBytes =
- (long) bufferWithChannel.getBuffer().readableBytes() -
BufferUtils.HEADER_LENGTH;
+ (long) bufferWithSubpartition.getBuffer().readableBytes() -
BufferUtils.HEADER_LENGTH;
numBytesProduced.inc(readableBytes);
numBytesOut.inc(isBroadcast ? readableBytes * numSubpartitions :
readableBytes);
}
diff --git
a/client-flink/flink-1.15/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
b/client-flink/flink-1.15/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
index 34bd3bab4..1a96a9bb6 100644
---
a/client-flink/flink-1.15/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
+++
b/client-flink/flink-1.15/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
@@ -62,7 +62,7 @@ import org.junit.Test;
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.plugin.flink.buffer.BufferPacker;
-import org.apache.celeborn.plugin.flink.buffer.SortBuffer;
+import org.apache.celeborn.plugin.flink.buffer.DataBuffer;
import org.apache.celeborn.plugin.flink.readclient.FlinkShuffleClientImpl;
import org.apache.celeborn.plugin.flink.utils.BufferUtils;
@@ -129,10 +129,10 @@ public class RemoteShuffleResultPartitionSuiteJ {
doNothing().when(remoteShuffleOutputGate).regionStart(anyBoolean());
doNothing().when(remoteShuffleOutputGate).regionFinish();
when(remoteShuffleOutputGate.getBufferPool()).thenReturn(bufferPool.get(1).get());
- SortBuffer sortBuffer =
remoteShuffleResultPartition.getDelegation().getUnicastSortBuffer();
+ DataBuffer dataBuffer =
remoteShuffleResultPartition.getDelegation().getUnicastDataBuffer();
ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[] {1, 2, 3});
- sortBuffer.append(byteBuffer, 0, Buffer.DataType.DATA_BUFFER);
- remoteShuffleResultPartition.getDelegation().flushSortBuffer(sortBuffer,
true);
+ dataBuffer.append(byteBuffer, 0, Buffer.DataType.DATA_BUFFER);
+ remoteShuffleResultPartition.getDelegation().flushDataBuffer(dataBuffer,
true);
}
private List<SupplierWithException<BufferPool, IOException>>
createBufferPoolFactory() {
diff --git
a/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
b/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
index c888b6ee9..37ff33903 100644
---
a/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
+++
b/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
@@ -37,15 +37,16 @@ import
org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.partition.*;
import org.apache.flink.util.function.SupplierWithException;
-import org.apache.celeborn.plugin.flink.buffer.SortBuffer;
+import org.apache.celeborn.plugin.flink.buffer.BufferWithSubpartition;
+import org.apache.celeborn.plugin.flink.buffer.DataBuffer;
import org.apache.celeborn.plugin.flink.utils.BufferUtils;
import org.apache.celeborn.plugin.flink.utils.Utils;
/**
- * A {@link ResultPartition} which appends records and events to {@link
SortBuffer} and after the
- * {@link SortBuffer} is full, all data in the {@link SortBuffer} will be
copied and spilled to the
+ * A {@link ResultPartition} which appends records and events to {@link
DataBuffer} and after the
+ * {@link DataBuffer} is full, all data in the {@link DataBuffer} will be
copied and spilled to the
* remote shuffle service in subpartition index order sequentially. Large
records that can not be
- * appended to an empty {@link SortBuffer} will be spilled directly.
+ * appended to an empty {@link DataBuffer} will be spilled directly.
*/
public class RemoteShuffleResultPartition extends ResultPartition {
@@ -77,10 +78,7 @@ public class RemoteShuffleResultPartition extends
ResultPartition {
delegation =
new RemoteShuffleResultPartitionDelegation(
- networkBufferSize,
- outputGate,
- (bufferWithChannel, isBroadcast) ->
updateStatistics(bufferWithChannel, isBroadcast),
- numSubpartitions);
+ networkBufferSize, outputGate, this::updateStatistics,
numSubpartitions);
}
@Override
@@ -88,10 +86,7 @@ public class RemoteShuffleResultPartition extends
ResultPartition {
super.setup();
BufferUtils.reserveNumRequiredBuffers(bufferPool, 1);
delegation.setup(
- bufferPool,
- bufferCompressor,
- buffer -> canBeCompressed(buffer),
- () -> checkInProduceState());
+ bufferPool, bufferCompressor, this::canBeCompressed,
this::checkInProduceState);
}
@Override
@@ -136,7 +131,7 @@ public class RemoteShuffleResultPartition extends
ResultPartition {
@Override
public synchronized void close() {
- delegation.close(() -> super.close());
+ delegation.close(super::close);
}
@Override
@@ -211,15 +206,14 @@ public class RemoteShuffleResultPartition extends
ResultPartition {
return delegation;
}
- public void updateStatistics(
- SortBuffer.BufferWithChannel bufferWithChannel, boolean isBroadcast) {
+ public void updateStatistics(BufferWithSubpartition bufferWithSubpartition,
boolean isBroadcast) {
numBuffersOut.inc(isBroadcast ? numSubpartitions : 1);
long readableBytes =
- (long) bufferWithChannel.getBuffer().readableBytes() -
BufferUtils.HEADER_LENGTH;
+ (long) bufferWithSubpartition.getBuffer().readableBytes() -
BufferUtils.HEADER_LENGTH;
if (isBroadcast) {
resultPartitionBytes.incAll(readableBytes);
} else {
- resultPartitionBytes.inc(bufferWithChannel.getChannelIndex(),
readableBytes);
+ resultPartitionBytes.inc(bufferWithSubpartition.getSubpartitionIndex(),
readableBytes);
}
numBytesOut.inc(isBroadcast ? readableBytes * numSubpartitions :
readableBytes);
}
diff --git
a/client-flink/flink-1.17/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
b/client-flink/flink-1.17/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
index 34bd3bab4..1a96a9bb6 100644
---
a/client-flink/flink-1.17/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
+++
b/client-flink/flink-1.17/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
@@ -62,7 +62,7 @@ import org.junit.Test;
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.plugin.flink.buffer.BufferPacker;
-import org.apache.celeborn.plugin.flink.buffer.SortBuffer;
+import org.apache.celeborn.plugin.flink.buffer.DataBuffer;
import org.apache.celeborn.plugin.flink.readclient.FlinkShuffleClientImpl;
import org.apache.celeborn.plugin.flink.utils.BufferUtils;
@@ -129,10 +129,10 @@ public class RemoteShuffleResultPartitionSuiteJ {
doNothing().when(remoteShuffleOutputGate).regionStart(anyBoolean());
doNothing().when(remoteShuffleOutputGate).regionFinish();
when(remoteShuffleOutputGate.getBufferPool()).thenReturn(bufferPool.get(1).get());
- SortBuffer sortBuffer =
remoteShuffleResultPartition.getDelegation().getUnicastSortBuffer();
+ DataBuffer dataBuffer =
remoteShuffleResultPartition.getDelegation().getUnicastDataBuffer();
ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[] {1, 2, 3});
- sortBuffer.append(byteBuffer, 0, Buffer.DataType.DATA_BUFFER);
- remoteShuffleResultPartition.getDelegation().flushSortBuffer(sortBuffer,
true);
+ dataBuffer.append(byteBuffer, 0, Buffer.DataType.DATA_BUFFER);
+ remoteShuffleResultPartition.getDelegation().flushDataBuffer(dataBuffer,
true);
}
private List<SupplierWithException<BufferPool, IOException>>
createBufferPoolFactory() {
diff --git
a/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
index c888b6ee9..37ff33903 100644
---
a/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
+++
b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
@@ -37,15 +37,16 @@ import
org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.partition.*;
import org.apache.flink.util.function.SupplierWithException;
-import org.apache.celeborn.plugin.flink.buffer.SortBuffer;
+import org.apache.celeborn.plugin.flink.buffer.BufferWithSubpartition;
+import org.apache.celeborn.plugin.flink.buffer.DataBuffer;
import org.apache.celeborn.plugin.flink.utils.BufferUtils;
import org.apache.celeborn.plugin.flink.utils.Utils;
/**
- * A {@link ResultPartition} which appends records and events to {@link
SortBuffer} and after the
- * {@link SortBuffer} is full, all data in the {@link SortBuffer} will be
copied and spilled to the
+ * A {@link ResultPartition} which appends records and events to {@link
DataBuffer} and after the
+ * {@link DataBuffer} is full, all data in the {@link DataBuffer} will be
copied and spilled to the
* remote shuffle service in subpartition index order sequentially. Large
records that can not be
- * appended to an empty {@link SortBuffer} will be spilled directly.
+ * appended to an empty {@link DataBuffer} will be spilled directly.
*/
public class RemoteShuffleResultPartition extends ResultPartition {
@@ -77,10 +78,7 @@ public class RemoteShuffleResultPartition extends
ResultPartition {
delegation =
new RemoteShuffleResultPartitionDelegation(
- networkBufferSize,
- outputGate,
- (bufferWithChannel, isBroadcast) ->
updateStatistics(bufferWithChannel, isBroadcast),
- numSubpartitions);
+ networkBufferSize, outputGate, this::updateStatistics,
numSubpartitions);
}
@Override
@@ -88,10 +86,7 @@ public class RemoteShuffleResultPartition extends
ResultPartition {
super.setup();
BufferUtils.reserveNumRequiredBuffers(bufferPool, 1);
delegation.setup(
- bufferPool,
- bufferCompressor,
- buffer -> canBeCompressed(buffer),
- () -> checkInProduceState());
+ bufferPool, bufferCompressor, this::canBeCompressed,
this::checkInProduceState);
}
@Override
@@ -136,7 +131,7 @@ public class RemoteShuffleResultPartition extends
ResultPartition {
@Override
public synchronized void close() {
- delegation.close(() -> super.close());
+ delegation.close(super::close);
}
@Override
@@ -211,15 +206,14 @@ public class RemoteShuffleResultPartition extends
ResultPartition {
return delegation;
}
- public void updateStatistics(
- SortBuffer.BufferWithChannel bufferWithChannel, boolean isBroadcast) {
+ public void updateStatistics(BufferWithSubpartition bufferWithSubpartition,
boolean isBroadcast) {
numBuffersOut.inc(isBroadcast ? numSubpartitions : 1);
long readableBytes =
- (long) bufferWithChannel.getBuffer().readableBytes() -
BufferUtils.HEADER_LENGTH;
+ (long) bufferWithSubpartition.getBuffer().readableBytes() -
BufferUtils.HEADER_LENGTH;
if (isBroadcast) {
resultPartitionBytes.incAll(readableBytes);
} else {
- resultPartitionBytes.inc(bufferWithChannel.getChannelIndex(),
readableBytes);
+ resultPartitionBytes.inc(bufferWithSubpartition.getSubpartitionIndex(),
readableBytes);
}
numBytesOut.inc(isBroadcast ? readableBytes * numSubpartitions :
readableBytes);
}
diff --git
a/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
b/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
index 34bd3bab4..1a96a9bb6 100644
---
a/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
+++
b/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
@@ -62,7 +62,7 @@ import org.junit.Test;
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.plugin.flink.buffer.BufferPacker;
-import org.apache.celeborn.plugin.flink.buffer.SortBuffer;
+import org.apache.celeborn.plugin.flink.buffer.DataBuffer;
import org.apache.celeborn.plugin.flink.readclient.FlinkShuffleClientImpl;
import org.apache.celeborn.plugin.flink.utils.BufferUtils;
@@ -129,10 +129,10 @@ public class RemoteShuffleResultPartitionSuiteJ {
doNothing().when(remoteShuffleOutputGate).regionStart(anyBoolean());
doNothing().when(remoteShuffleOutputGate).regionFinish();
when(remoteShuffleOutputGate.getBufferPool()).thenReturn(bufferPool.get(1).get());
- SortBuffer sortBuffer =
remoteShuffleResultPartition.getDelegation().getUnicastSortBuffer();
+ DataBuffer dataBuffer =
remoteShuffleResultPartition.getDelegation().getUnicastDataBuffer();
ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[] {1, 2, 3});
- sortBuffer.append(byteBuffer, 0, Buffer.DataType.DATA_BUFFER);
- remoteShuffleResultPartition.getDelegation().flushSortBuffer(sortBuffer,
true);
+ dataBuffer.append(byteBuffer, 0, Buffer.DataType.DATA_BUFFER);
+ remoteShuffleResultPartition.getDelegation().flushDataBuffer(dataBuffer,
true);
}
private List<SupplierWithException<BufferPool, IOException>>
createBufferPoolFactory() {
diff --git
a/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
b/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
index c888b6ee9..37ff33903 100644
---
a/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
+++
b/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
@@ -37,15 +37,16 @@ import
org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.partition.*;
import org.apache.flink.util.function.SupplierWithException;
-import org.apache.celeborn.plugin.flink.buffer.SortBuffer;
+import org.apache.celeborn.plugin.flink.buffer.BufferWithSubpartition;
+import org.apache.celeborn.plugin.flink.buffer.DataBuffer;
import org.apache.celeborn.plugin.flink.utils.BufferUtils;
import org.apache.celeborn.plugin.flink.utils.Utils;
/**
- * A {@link ResultPartition} which appends records and events to {@link
SortBuffer} and after the
- * {@link SortBuffer} is full, all data in the {@link SortBuffer} will be
copied and spilled to the
+ * A {@link ResultPartition} which appends records and events to {@link
DataBuffer} and after the
+ * {@link DataBuffer} is full, all data in the {@link DataBuffer} will be
copied and spilled to the
* remote shuffle service in subpartition index order sequentially. Large
records that can not be
- * appended to an empty {@link SortBuffer} will be spilled directly.
+ * appended to an empty {@link DataBuffer} will be spilled directly.
*/
public class RemoteShuffleResultPartition extends ResultPartition {
@@ -77,10 +78,7 @@ public class RemoteShuffleResultPartition extends
ResultPartition {
delegation =
new RemoteShuffleResultPartitionDelegation(
- networkBufferSize,
- outputGate,
- (bufferWithChannel, isBroadcast) ->
updateStatistics(bufferWithChannel, isBroadcast),
- numSubpartitions);
+ networkBufferSize, outputGate, this::updateStatistics,
numSubpartitions);
}
@Override
@@ -88,10 +86,7 @@ public class RemoteShuffleResultPartition extends
ResultPartition {
super.setup();
BufferUtils.reserveNumRequiredBuffers(bufferPool, 1);
delegation.setup(
- bufferPool,
- bufferCompressor,
- buffer -> canBeCompressed(buffer),
- () -> checkInProduceState());
+ bufferPool, bufferCompressor, this::canBeCompressed,
this::checkInProduceState);
}
@Override
@@ -136,7 +131,7 @@ public class RemoteShuffleResultPartition extends
ResultPartition {
@Override
public synchronized void close() {
- delegation.close(() -> super.close());
+ delegation.close(super::close);
}
@Override
@@ -211,15 +206,14 @@ public class RemoteShuffleResultPartition extends
ResultPartition {
return delegation;
}
- public void updateStatistics(
- SortBuffer.BufferWithChannel bufferWithChannel, boolean isBroadcast) {
+ public void updateStatistics(BufferWithSubpartition bufferWithSubpartition,
boolean isBroadcast) {
numBuffersOut.inc(isBroadcast ? numSubpartitions : 1);
long readableBytes =
- (long) bufferWithChannel.getBuffer().readableBytes() -
BufferUtils.HEADER_LENGTH;
+ (long) bufferWithSubpartition.getBuffer().readableBytes() -
BufferUtils.HEADER_LENGTH;
if (isBroadcast) {
resultPartitionBytes.incAll(readableBytes);
} else {
- resultPartitionBytes.inc(bufferWithChannel.getChannelIndex(),
readableBytes);
+ resultPartitionBytes.inc(bufferWithSubpartition.getSubpartitionIndex(),
readableBytes);
}
numBytesOut.inc(isBroadcast ? readableBytes * numSubpartitions :
readableBytes);
}
diff --git
a/client-flink/flink-1.19/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
b/client-flink/flink-1.19/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
index df3c25191..dffb6239e 100644
---
a/client-flink/flink-1.19/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
+++
b/client-flink/flink-1.19/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
@@ -62,7 +62,7 @@ import org.junit.Test;
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.plugin.flink.buffer.BufferPacker;
-import org.apache.celeborn.plugin.flink.buffer.SortBuffer;
+import org.apache.celeborn.plugin.flink.buffer.DataBuffer;
import org.apache.celeborn.plugin.flink.readclient.FlinkShuffleClientImpl;
import org.apache.celeborn.plugin.flink.utils.BufferUtils;
@@ -129,10 +129,10 @@ public class RemoteShuffleResultPartitionSuiteJ {
doNothing().when(remoteShuffleOutputGate).regionStart(anyBoolean());
doNothing().when(remoteShuffleOutputGate).regionFinish();
when(remoteShuffleOutputGate.getBufferPool()).thenReturn(bufferPool.get(1).get());
- SortBuffer sortBuffer =
remoteShuffleResultPartition.getDelegation().getUnicastSortBuffer();
+ DataBuffer dataBuffer =
remoteShuffleResultPartition.getDelegation().getUnicastDataBuffer();
ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[] {1, 2, 3});
- sortBuffer.append(byteBuffer, 0, Buffer.DataType.DATA_BUFFER);
- remoteShuffleResultPartition.getDelegation().flushSortBuffer(sortBuffer,
true);
+ dataBuffer.append(byteBuffer, 0, Buffer.DataType.DATA_BUFFER);
+ remoteShuffleResultPartition.getDelegation().flushDataBuffer(dataBuffer,
true);
}
private List<SupplierWithException<BufferPool, IOException>>
createBufferPoolFactory() {