zhijiangW commented on a change in pull request #13595:
URL: https://github.com/apache/flink/pull/13595#discussion_r509983669



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.disk.FileChannelManager;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.util.function.SupplierWithException;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType;
+import static org.apache.flink.runtime.io.network.partition.SortBuffer.BufferWithChannel;
+import static org.apache.flink.util.Preconditions.checkElementIndex;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * {@link SortMergeResultPartition} appends records and events to {@link SortBuffer} and after the {@link SortBuffer}
+ * is full, all data in the {@link SortBuffer} will be copied and spilled to a {@link PartitionedFile} in subpartition
+ * index order sequentially. Large records that can not be appended to an empty {@link SortBuffer} will be spilled to
+ * the {@link PartitionedFile} separately.
+ */
+@NotThreadSafe
+public class SortMergeResultPartition extends ResultPartition {
+
+       private final Object lock = new Object();
+
+       /** All active readers which are consuming data from this result partition now. */
+       @GuardedBy("lock")
+       private final Set<SortMergeSubpartitionReader> readers = new HashSet<>();
+
+       /** {@link PartitionedFile} produced by this result partition. */
+       @GuardedBy("lock")
+       private PartitionedFile resultFile;
+
+       /** Used to generate random file channel ID. */
+       private final FileChannelManager channelManager;
+
+       /** Number of data buffers (excluding events) written for each subpartition. */
+       private final int[] numDataBuffers;
+
+       /** A piece of unmanaged memory for data writing. */
+       private final MemorySegment writeBuffer;
+
+       /** Size of network buffer and write buffer. */
+       private final int networkBufferSize;
+
+       /** Current {@link SortBuffer} to append records to. */
+       private SortBuffer currentSortBuffer;
+
+       /** File writer for this result partition. */
+       private PartitionedFileWriter fileWriter;
+
+       public SortMergeResultPartition(
+                       String owningTaskName,
+                       int partitionIndex,
+                       ResultPartitionID partitionId,
+                       ResultPartitionType partitionType,
+                       int numSubpartitions,
+                       int numTargetKeyGroups,
+                       int networkBufferSize,
+                       ResultPartitionManager partitionManager,
+                       FileChannelManager channelManager,
+                       @Nullable BufferCompressor bufferCompressor,
+                       SupplierWithException<BufferPool, IOException> bufferPoolFactory) {
+
+               super(
+                       owningTaskName,
+                       partitionIndex,
+                       partitionId,
+                       partitionType,
+                       numSubpartitions,
+                       numTargetKeyGroups,
+                       partitionManager,
+                       bufferCompressor,
+                       bufferPoolFactory);
+
+               this.channelManager = checkNotNull(channelManager);
+               this.networkBufferSize = networkBufferSize;
+               this.numDataBuffers = new int[numSubpartitions];
+               this.writeBuffer = MemorySegmentFactory.allocateUnpooledOffHeapMemory(networkBufferSize);
+       }
+
+       @Override
+       protected void releaseInternal() {
+               synchronized (lock) {
+                       isFinished = true; // to fail writing faster
+
+                       // delete the produced file only when no reader is reading now
+                       if (readers.isEmpty()) {
+                               if (resultFile != null) {
+                                       resultFile.deleteQuietly();
+                                       resultFile = null;
+                               }
+                       }
+               }
+       }
+
+       @Override
+       public void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException {
+               emit(record, targetSubpartition, DataType.DATA_BUFFER);
+       }
+
+       @Override
+       public void broadcastRecord(ByteBuffer record) throws IOException {
+               broadcast(record, DataType.DATA_BUFFER);
+       }
+
+       @Override
+       public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException {
+               Buffer buffer = EventSerializer.toBuffer(event, isPriorityEvent);
+               try {
+                       ByteBuffer serializedEvent = buffer.getNioBufferReadable();
+                       broadcast(serializedEvent, buffer.getDataType());
+               } finally {
+                       buffer.recycleBuffer();
+               }
+       }
+
+       private void broadcast(ByteBuffer record, DataType dataType) throws IOException {
+               for (int channelIndex = 0; channelIndex < numSubpartitions; ++channelIndex) {
+                       record.rewind();
+                       emit(record, channelIndex, dataType);
+               }
+       }
+
+       private void emit(ByteBuffer record, int targetSubpartition, DataType dataType) throws IOException {
+               checkInProduceState();
+
+               SortBuffer sortBuffer = getSortBuffer();
+               if (sortBuffer.append(record, targetSubpartition, dataType)) {
+                       return;
+               }
+
+               if (!sortBuffer.hasRemaining()) {
+                       // the record can not be appended to the free sort buffer because it is too large
+                       releaseCurrentSortBuffer();
+                       writeLargeRecord(record, targetSubpartition, dataType);
+                       return;
+               }
+
+               flushCurrentSortBuffer();
+               emit(record, targetSubpartition, dataType);
+       }
+
+       private void releaseCurrentSortBuffer() {
+               if (currentSortBuffer != null) {
+                       currentSortBuffer.release();
+                       currentSortBuffer = null;
+               }
+       }
+
+       private SortBuffer getSortBuffer() {
+               if (currentSortBuffer != null) {
+                       return currentSortBuffer;
+               }
+
+               currentSortBuffer = new PartitionSortedBuffer(bufferPool, numSubpartitions, networkBufferSize);
+               return currentSortBuffer;
+       }
+
+       private void flushCurrentSortBuffer() throws IOException {
+               if (currentSortBuffer == null || !currentSortBuffer.hasRemaining()) {
+                       releaseCurrentSortBuffer();
+                       return;
+               }
+
+               currentSortBuffer.finish();
+               PartitionedFileWriter fileWriter = getPartitionedFileWriter();
+
+               while (currentSortBuffer.hasRemaining()) {
+                       BufferWithChannel bufferWithChannel = currentSortBuffer.copyData(writeBuffer);
+                       Buffer buffer = bufferWithChannel.getBuffer();
+                       int subpartitionIndex = bufferWithChannel.getChannelIndex();
+
+                       writeCompressedBufferIfPossible(buffer, fileWriter, subpartitionIndex);
+                       updateStatistics(buffer, subpartitionIndex);
+               }
+
+               releaseCurrentSortBuffer();
+       }
+
+       private PartitionedFileWriter getPartitionedFileWriter() throws IOException {
+               if (fileWriter == null) {
+                       String basePath = channelManager.createChannel().getPath();
+                       fileWriter = new PartitionedFileWriter(basePath, numSubpartitions);
+                       fileWriter.open();
+               }
+
+               fileWriter.startNewRegion();
+               return fileWriter;
+       }
+
+       private void writeCompressedBufferIfPossible(
+                       Buffer buffer,
+                       PartitionedFileWriter fileWriter,

Review comment:
       The `fileWriter` does not need to be passed in as an argument, since it can be obtained directly from `getPartitionedFileWriter()`; that would simplify the callers a bit. See the sketch below.
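
   Roughly what I have in mind (just a sketch: the method body is cut off in this diff, so `writeBuffer`, `canBeCompressed` and `compressToIntermediateBuffer` are my guesses at the relevant calls, and it assumes `startNewRegion()` moves out of `getPartitionedFileWriter()` so that repeated lookups stay side-effect free):

   ```
   private void writeCompressedBufferIfPossible(Buffer buffer, int targetSubpartition) throws IOException {
       // look the writer up internally instead of threading it through every caller
       PartitionedFileWriter fileWriter = getPartitionedFileWriter();

       Buffer toWrite = buffer;
       if (canBeCompressed(buffer)) {
           toWrite = checkNotNull(bufferCompressor).compressToIntermediateBuffer(buffer);
       }
       fileWriter.writeBuffer(toWrite, targetSubpartition);
   }
   ```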

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.disk.FileChannelManager;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.util.function.SupplierWithException;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType;
+import static org.apache.flink.runtime.io.network.partition.SortBuffer.BufferWithChannel;
+import static org.apache.flink.util.Preconditions.checkElementIndex;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * {@link SortMergeResultPartition} appends records and events to {@link SortBuffer} and after the {@link SortBuffer}
+ * is full, all data in the {@link SortBuffer} will be copied and spilled to a {@link PartitionedFile} in subpartition
+ * index order sequentially. Large records that can not be appended to an empty {@link SortBuffer} will be spilled to
+ * the {@link PartitionedFile} separately.
+ */
+@NotThreadSafe
+public class SortMergeResultPartition extends ResultPartition {
+
+       private final Object lock = new Object();
+
+       /** All active readers which are consuming data from this result partition now. */
+       @GuardedBy("lock")
+       private final Set<SortMergeSubpartitionReader> readers = new HashSet<>();
+
+       /** {@link PartitionedFile} produced by this result partition. */
+       @GuardedBy("lock")
+       private PartitionedFile resultFile;
+
+       /** Used to generate random file channel ID. */
+       private final FileChannelManager channelManager;
+
+       /** Number of data buffers (excluding events) written for each subpartition. */
+       private final int[] numDataBuffers;
+
+       /** A piece of unmanaged memory for data writing. */
+       private final MemorySegment writeBuffer;
+
+       /** Size of network buffer and write buffer. */
+       private final int networkBufferSize;
+
+       /** Current {@link SortBuffer} to append records to. */
+       private SortBuffer currentSortBuffer;
+
+       /** File writer for this result partition. */
+       private PartitionedFileWriter fileWriter;
+
+       public SortMergeResultPartition(
+                       String owningTaskName,
+                       int partitionIndex,
+                       ResultPartitionID partitionId,
+                       ResultPartitionType partitionType,
+                       int numSubpartitions,
+                       int numTargetKeyGroups,
+                       int networkBufferSize,
+                       ResultPartitionManager partitionManager,
+                       FileChannelManager channelManager,
+                       @Nullable BufferCompressor bufferCompressor,
+                       SupplierWithException<BufferPool, IOException> bufferPoolFactory) {
+
+               super(
+                       owningTaskName,
+                       partitionIndex,
+                       partitionId,
+                       partitionType,
+                       numSubpartitions,
+                       numTargetKeyGroups,
+                       partitionManager,
+                       bufferCompressor,
+                       bufferPoolFactory);
+
+               this.channelManager = checkNotNull(channelManager);
+               this.networkBufferSize = networkBufferSize;
+               this.numDataBuffers = new int[numSubpartitions];
+               this.writeBuffer = MemorySegmentFactory.allocateUnpooledOffHeapMemory(networkBufferSize);
+       }
+
+       @Override
+       protected void releaseInternal() {
+               synchronized (lock) {
+                       isFinished = true; // to fail writing faster
+
+                       // delete the produced file only when no reader is reading now
+                       if (readers.isEmpty()) {
+                               if (resultFile != null) {
+                                       resultFile.deleteQuietly();
+                                       resultFile = null;
+                               }
+                       }
+               }
+       }
+
+       @Override
+       public void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException {
+               emit(record, targetSubpartition, DataType.DATA_BUFFER);
+       }
+
+       @Override
+       public void broadcastRecord(ByteBuffer record) throws IOException {
+               broadcast(record, DataType.DATA_BUFFER);
+       }
+
+       @Override
+       public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException {
+               Buffer buffer = EventSerializer.toBuffer(event, isPriorityEvent);
+               try {
+                       ByteBuffer serializedEvent = buffer.getNioBufferReadable();
+                       broadcast(serializedEvent, buffer.getDataType());
+               } finally {
+                       buffer.recycleBuffer();
+               }
+       }
+
+       private void broadcast(ByteBuffer record, DataType dataType) throws IOException {
+               for (int channelIndex = 0; channelIndex < numSubpartitions; ++channelIndex) {
+                       record.rewind();
+                       emit(record, channelIndex, dataType);
+               }
+       }
+
+       private void emit(ByteBuffer record, int targetSubpartition, DataType dataType) throws IOException {
+               checkInProduceState();
+
+               SortBuffer sortBuffer = getSortBuffer();
+               if (sortBuffer.append(record, targetSubpartition, dataType)) {
+                       return;
+               }
+
+               if (!sortBuffer.hasRemaining()) {
+                       // the record can not be appended to the free sort buffer because it is too large
+                       releaseCurrentSortBuffer();
+                       writeLargeRecord(record, targetSubpartition, dataType);
+                       return;
+               }
+
+               flushCurrentSortBuffer();
+               emit(record, targetSubpartition, dataType);

Review comment:
       I think the following logic is not very easy to understand and it also has some overhead in practice.
   
   ```
   if (!sortBuffer.hasRemaining()) {
       // the record can not be appended to the free sort buffer because it is too large
       releaseCurrentSortBuffer();
       writeLargeRecord(record, targetSubpartition, dataType);
       return;
   }
   
   flushCurrentSortBuffer();
   emit(record, targetSubpartition, dataType);
   ```
   
   Here we have to flush the current buffer and call `emit` again just to find out whether the unfilled sort buffer can hold the large record or not. During `sortBuffer.append` it would request all the segments from the `LocalBufferPool` until exhausted before returning `false`, which is what triggers the large-record path.
   
   I can think of two options for improvement here.
   
   - Make `SortMergeResultPartition` a more light-weight component that only dispatches records/events; the `SortBuffer` can then decide internally when to cache records, when to flush them, and when to spill large records directly. There would be no need to expose so many interfaces to `SortMergeResultPartition`.
   
   - Let `SortBuffer` provide another method exposing its cache threshold, which can easily be derived from the internal `LocalBufferPool`. Then `SortMergeResultPartition` can decide upfront whether to spill the large record directly, avoiding the unnecessary request of all the segments; see the sketch after this list.
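
   A rough sketch of that second option (hedged: `maxRecordCapacity` is a hypothetical name, and the exact threshold computation would live inside `PartitionSortedBuffer`):

   ```
   // In SortBuffer -- hypothetical new method:
   /** Maximum record size in bytes this sort buffer can cache, e.g. the number of
    *  segments guaranteed by its LocalBufferPool times the network buffer size. */
   long maxRecordCapacity();

   // SortMergeResultPartition#emit can then route large records upfront:
   private void emit(ByteBuffer record, int targetSubpartition, DataType dataType) throws IOException {
       checkInProduceState();

       SortBuffer sortBuffer = getSortBuffer();
       if (record.remaining() > sortBuffer.maxRecordCapacity()) {
           // spill directly; no need to first request segments until the pool is exhausted
           flushCurrentSortBuffer();
           writeLargeRecord(record, targetSubpartition, dataType);
           return;
       }

       if (!sortBuffer.append(record, targetSubpartition, dataType)) {
           // the buffer is merely full of earlier records: flush it and retry once
           flushCurrentSortBuffer();
           emit(record, targetSubpartition, dataType);
       }
   }
   ```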

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.disk.FileChannelManager;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.util.function.SupplierWithException;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType;
+import static org.apache.flink.runtime.io.network.partition.SortBuffer.BufferWithChannel;
+import static org.apache.flink.util.Preconditions.checkElementIndex;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * {@link SortMergeResultPartition} appends records and events to {@link SortBuffer} and after the {@link SortBuffer}
+ * is full, all data in the {@link SortBuffer} will be copied and spilled to a {@link PartitionedFile} in subpartition
+ * index order sequentially. Large records that can not be appended to an empty {@link SortBuffer} will be spilled to
+ * the {@link PartitionedFile} separately.
+ */
+@NotThreadSafe
+public class SortMergeResultPartition extends ResultPartition {
+
+       private final Object lock = new Object();
+
+       /** All active readers which are consuming data from this result partition now. */
+       @GuardedBy("lock")
+       private final Set<SortMergeSubpartitionReader> readers = new HashSet<>();
+
+       /** {@link PartitionedFile} produced by this result partition. */
+       @GuardedBy("lock")
+       private PartitionedFile resultFile;
+
+       /** Used to generate random file channel ID. */
+       private final FileChannelManager channelManager;
+
+       /** Number of data buffers (excluding events) written for each subpartition. */
+       private final int[] numDataBuffers;
+
+       /** A piece of unmanaged memory for data writing. */
+       private final MemorySegment writeBuffer;
+
+       /** Size of network buffer and write buffer. */
+       private final int networkBufferSize;
+
+       /** Current {@link SortBuffer} to append records to. */
+       private SortBuffer currentSortBuffer;
+
+       /** File writer for this result partition. */
+       private PartitionedFileWriter fileWriter;
+
+       public SortMergeResultPartition(
+                       String owningTaskName,
+                       int partitionIndex,
+                       ResultPartitionID partitionId,
+                       ResultPartitionType partitionType,
+                       int numSubpartitions,
+                       int numTargetKeyGroups,
+                       int networkBufferSize,
+                       ResultPartitionManager partitionManager,
+                       FileChannelManager channelManager,
+                       @Nullable BufferCompressor bufferCompressor,
+                       SupplierWithException<BufferPool, IOException> bufferPoolFactory) {
+
+               super(
+                       owningTaskName,
+                       partitionIndex,
+                       partitionId,
+                       partitionType,
+                       numSubpartitions,
+                       numTargetKeyGroups,
+                       partitionManager,
+                       bufferCompressor,
+                       bufferPoolFactory);
+
+               this.channelManager = checkNotNull(channelManager);
+               this.networkBufferSize = networkBufferSize;
+               this.numDataBuffers = new int[numSubpartitions];
+               this.writeBuffer = MemorySegmentFactory.allocateUnpooledOffHeapMemory(networkBufferSize);
+       }
+
+       @Override
+       protected void releaseInternal() {
+               synchronized (lock) {
+                       isFinished = true; // to fail writing faster
+
+                       // delete the produced file only when no reader is reading now
+                       if (readers.isEmpty()) {
+                               if (resultFile != null) {
+                                       resultFile.deleteQuietly();
+                                       resultFile = null;
+                               }
+                       }
+               }
+       }
+
+       @Override
+       public void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException {
+               emit(record, targetSubpartition, DataType.DATA_BUFFER);
+       }
+
+       @Override
+       public void broadcastRecord(ByteBuffer record) throws IOException {
+               broadcast(record, DataType.DATA_BUFFER);
+       }
+
+       @Override
+       public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException {
+               Buffer buffer = EventSerializer.toBuffer(event, isPriorityEvent);
+               try {
+                       ByteBuffer serializedEvent = buffer.getNioBufferReadable();
+                       broadcast(serializedEvent, buffer.getDataType());
+               } finally {
+                       buffer.recycleBuffer();
+               }
+       }
+
+       private void broadcast(ByteBuffer record, DataType dataType) throws IOException {
+               for (int channelIndex = 0; channelIndex < numSubpartitions; ++channelIndex) {
+                       record.rewind();
+                       emit(record, channelIndex, dataType);
+               }
+       }
+
+       private void emit(ByteBuffer record, int targetSubpartition, DataType dataType) throws IOException {
+               checkInProduceState();
+
+               SortBuffer sortBuffer = getSortBuffer();
+               if (sortBuffer.append(record, targetSubpartition, dataType)) {
+                       return;
+               }
+
+               if (!sortBuffer.hasRemaining()) {
+                       // the record can not be appended to the free sort buffer because it is too large
+                       releaseCurrentSortBuffer();

Review comment:
       I guess this call is not really necessary, because the current sort buffer can still be used for the follow-up small records after flushing the large record directly to the file.
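
   Just a sketch of what I mean (assuming a sort buffer whose `append` returned `false` while still empty is left in a reusable state):

   ```
   if (!sortBuffer.hasRemaining()) {
       // the record is too large even for an empty sort buffer: spill it directly;
       // the still-empty sort buffer keeps serving the subsequent small records
       writeLargeRecord(record, targetSubpartition, dataType);
       return;
   }
   ```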

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.disk.FileChannelManager;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.util.function.SupplierWithException;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType;
+import static org.apache.flink.runtime.io.network.partition.SortBuffer.BufferWithChannel;
+import static org.apache.flink.util.Preconditions.checkElementIndex;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * {@link SortMergeResultPartition} appends records and events to {@link SortBuffer} and after the {@link SortBuffer}
+ * is full, all data in the {@link SortBuffer} will be copied and spilled to a {@link PartitionedFile} in subpartition
+ * index order sequentially. Large records that can not be appended to an empty {@link SortBuffer} will be spilled to
+ * the {@link PartitionedFile} separately.
+ */
+@NotThreadSafe
+public class SortMergeResultPartition extends ResultPartition {
+
+       private final Object lock = new Object();
+
+       /** All active readers which are consuming data from this result partition now. */
+       @GuardedBy("lock")
+       private final Set<SortMergeSubpartitionReader> readers = new HashSet<>();
+
+       /** {@link PartitionedFile} produced by this result partition. */
+       @GuardedBy("lock")
+       private PartitionedFile resultFile;
+
+       /** Used to generate random file channel ID. */
+       private final FileChannelManager channelManager;
+
+       /** Number of data buffers (excluding events) written for each subpartition. */
+       private final int[] numDataBuffers;
+
+       /** A piece of unmanaged memory for data writing. */
+       private final MemorySegment writeBuffer;
+
+       /** Size of network buffer and write buffer. */
+       private final int networkBufferSize;
+
+       /** Current {@link SortBuffer} to append records to. */
+       private SortBuffer currentSortBuffer;
+
+       /** File writer for this result partition. */
+       private PartitionedFileWriter fileWriter;

Review comment:
       I guess this variable could be declared `final` and set up in the constructor, after removing the `null` assignment in the `finish()` method below. Then we would not need to handle the `fileWriter == null` condition in the various usages.
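
   A sketch of what I mean (caveat: the file channel would then be created eagerly for every partition, and the constructor would have to propagate the `IOException` from opening the writer):

   ```
   /** File writer for this result partition, created eagerly in the constructor. */
   private final PartitionedFileWriter fileWriter;

   // ... at the end of the constructor:
   this.fileWriter = new PartitionedFileWriter(
           channelManager.createChannel().getPath(), numSubpartitions);
   this.fileWriter.open();
   // finish() then only calls fileWriter.finish() and keeps the field set,
   // so the fileWriter == null branches disappear.
   ```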




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

