gaoyunhaii commented on a change in pull request #15192:
URL: https://github.com/apache/flink/pull/15192#discussion_r594938336
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
##########
@@ -70,15 +81,18 @@
/** Number of data buffers (excluding events) written for each subpartition. */
private final int[] numDataBuffers;
- /** A piece of unmanaged memory for data writing. */
- private final MemorySegment writeBuffer;
+ /** Buffers cut from the network buffer pool for data writing. */
+ private final List<MemorySegment> writeBuffers = new ArrayList<>();
Review comment:
`@GuardedBy("lock")`?
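For reference, `@GuardedBy` (in Flink it comes from `javax.annotation.concurrent`) only documents the locking contract; it does not enforce it at runtime. A minimal sketch of the pattern, with a locally declared stand-in annotation and a `String` list instead of `MemorySegment` so the snippet compiles standalone:

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.util.ArrayList;
import java.util.List;

public class GuardedWriteBuffers {
    // Stand-in for javax.annotation.concurrent.GuardedBy, declared locally
    // so this sketch has no external dependency.
    @Retention(RetentionPolicy.CLASS)
    @interface GuardedBy { String value(); }

    private final Object lock = new Object();

    // Documents that every read/write of this list must hold `lock`.
    @GuardedBy("lock")
    private final List<String> writeBuffers = new ArrayList<>();

    public void add(String segment) {
        synchronized (lock) {
            writeBuffers.add(segment);
        }
    }

    public int size() {
        synchronized (lock) {
            return writeBuffers.size();
        }
    }

    public static void main(String[] args) {
        GuardedWriteBuffers g = new GuardedWriteBuffers();
        g.add("seg-0");
        System.out.println(g.size()); // prints 1
    }
}
```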
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
##########
@@ -212,31 +269,46 @@ private void flushCurrentSortBuffer() throws IOException {
if (currentSortBuffer.hasRemaining()) {
fileWriter.startNewRegion();
+ List<BufferWithChannel> toWrite = new ArrayList<>();
+ Queue<MemorySegment> segments = getWriteBuffers();
+
while (currentSortBuffer.hasRemaining()) {
- BufferWithChannel bufferWithChannel =
- currentSortBuffer.copyIntoSegment(writeBuffer);
- Buffer buffer = bufferWithChannel.getBuffer();
- int subpartitionIndex = bufferWithChannel.getChannelIndex();
+ if (segments.isEmpty()) {
+ fileWriter.writeBuffers(toWrite);
+ toWrite.clear();
+ segments = getWriteBuffers();
+ }
- writeCompressedBufferIfPossible(buffer, subpartitionIndex);
+ BufferWithChannel bufferWithChannel =
+ currentSortBuffer.copyIntoSegment(checkNotNull(segments.poll()));
+ toWrite.add(compressBufferIfPossible(bufferWithChannel));
}
+
+ fileWriter.writeBuffers(toWrite);
}
currentSortBuffer.release();
}
- private void writeCompressedBufferIfPossible(Buffer buffer, int targetSubpartition)
- throws IOException {
- updateStatistics(buffer, targetSubpartition);
+ private Queue<MemorySegment> getWriteBuffers() {
+ synchronized (lock) {
+ checkState(!writeBuffers.isEmpty(), "Task has been canceled.");
+ return new ArrayDeque<>(writeBuffers);
+ }
+ }
- try {
- if (canBeCompressed(buffer)) {
- buffer = bufferCompressor.compressToIntermediateBuffer(buffer);
- }
- fileWriter.writeBuffer(buffer, targetSubpartition);
- } finally {
- buffer.recycleBuffer();
+ private BufferWithChannel compressBufferIfPossible(BufferWithChannel bufferWithChannel) {
+ Buffer buffer = bufferWithChannel.getBuffer();
+ int channelIndex = bufferWithChannel.getChannelIndex();
+
+ updateStatistics(buffer, channelIndex);
Review comment:
After this modification, the `updateStatistics` call here seems a bit
inconsistent with the name of this method (`compressBufferIfPossible`).
We might move the statistics update out to the call site.
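One possible shape of that refactoring, as a sketch with simplified stand-in types rather than the actual Flink classes: keep `compressBufferIfPossible` a pure transformation and update statistics in the loop that calls it.

```java
import java.util.ArrayList;
import java.util.List;

public class StatsOutsideCompress {
    // Simplified stand-in for Flink's BufferWithChannel; not the real type.
    record BufferWithChannel(byte[] bytes, int channelIndex) {}

    private final long[] numBytesOut = new long[2];

    // Pure transformation: the name now matches what the method does.
    private BufferWithChannel compressBufferIfPossible(BufferWithChannel b) {
        // (actual compression elided in this sketch)
        return b;
    }

    private void updateStatistics(BufferWithChannel b) {
        numBytesOut[b.channelIndex()] += b.bytes().length;
    }

    public long writeAll(List<BufferWithChannel> buffers) {
        List<BufferWithChannel> toWrite = new ArrayList<>();
        for (BufferWithChannel b : buffers) {
            // Statistics updated at the call site, outside the compress helper.
            updateStatistics(b);
            toWrite.add(compressBufferIfPossible(b));
        }
        return numBytesOut[0] + numBytesOut[1];
    }

    public static void main(String[] args) {
        StatsOutsideCompress s = new StatsOutsideCompress();
        long total = s.writeAll(List.of(
                new BufferWithChannel(new byte[3], 0),
                new BufferWithChannel(new byte[5], 1)));
        System.out.println(total); // prints 8
    }
}
```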
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
##########
@@ -289,8 +374,20 @@ public void finish() throws IOException {
super.finish();
}
+ private void releaseWriteBuffers() {
+ synchronized (lock) {
+ if (bufferPool != null) {
+ for (MemorySegment segment : writeBuffers) {
+ bufferPool.recycle(segment);
+ }
+ writeBuffers.clear();
+ }
+ }
+ }
+
@Override
public void close() {
+ releaseWriteBuffers();
releaseCurrentSortBuffer();
Review comment:
Although not fully related to this PR, the sort buffer currently gets
different treatment from the write buffers with respect to the lock. We
might either add a comment stating that close() is always called from the
main thread, or change currentSortBuffer to be volatile.
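The second option can be illustrated with a sketch (simplified stand-in types, not the actual Flink classes): declaring the field volatile guarantees that a close() running on another thread observes the most recently assigned buffer reference without taking the lock.

```java
public class VolatileSortBufferSketch {
    // Simplified stand-in for the sort buffer type.
    static class SortBuffer {
        volatile boolean released;
        void release() { released = true; }
    }

    // volatile: a reader on another thread always sees the latest
    // reference written here, even without holding a lock.
    private volatile SortBuffer currentSortBuffer = new SortBuffer();

    public void close() {
        SortBuffer buffer = currentSortBuffer;
        if (buffer != null) {
            buffer.release();
            currentSortBuffer = null;
        }
    }

    public static void main(String[] args) {
        VolatileSortBufferSketch p = new VolatileSortBufferSketch();
        SortBuffer b = p.currentSortBuffer;
        p.close();
        System.out.println(b.released + " " + (p.currentSortBuffer == null));
        // prints: true true
    }
}
```

Note that volatile alone only fixes visibility of the reference; compound check-then-release sequences racing with concurrent writers would still need the lock.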
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]