zhijiangW commented on a change in pull request #13595:
URL: https://github.com/apache/flink/pull/13595#discussion_r510014166



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.disk.FileChannelManager;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.util.function.SupplierWithException;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType;
+import static org.apache.flink.runtime.io.network.partition.SortBuffer.BufferWithChannel;
+import static org.apache.flink.util.Preconditions.checkElementIndex;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * {@link SortMergeResultPartition} appends records and events to {@link SortBuffer} and after the {@link SortBuffer}
+ * is full, all data in the {@link SortBuffer} will be copied and spilled to a {@link PartitionedFile} in subpartition
+ * index order sequentially. Large records that can not be appended to an empty {@link SortBuffer} will be spilled to
+ * the {@link PartitionedFile} separately.
+ */
+@NotThreadSafe
+public class SortMergeResultPartition extends ResultPartition {
+
+       private final Object lock = new Object();
+
+       /** All active readers which are consuming data from this result partition now. */
+       @GuardedBy("lock")
+       private final Set<SortMergeSubpartitionReader> readers = new HashSet<>();
+
+       /** {@link PartitionedFile} produced by this result partition. */
+       @GuardedBy("lock")
+       private PartitionedFile resultFile;
+
+       /** Used to generate random file channel ID. */
+       private final FileChannelManager channelManager;
+
+       /** Number of data buffers (excluding events) written for each subpartition. */
+       private final int[] numDataBuffers;
+
+       /** A piece of unmanaged memory for data writing. */
+       private final MemorySegment writeBuffer;
+
+       /** Size of network buffer and write buffer. */
+       private final int networkBufferSize;
+
+       /** Current {@link SortBuffer} to append records to. */
+       private SortBuffer currentSortBuffer;
+
+       /** File writer for this result partition. */
+       private PartitionedFileWriter fileWriter;
+
+       public SortMergeResultPartition(
+                       String owningTaskName,
+                       int partitionIndex,
+                       ResultPartitionID partitionId,
+                       ResultPartitionType partitionType,
+                       int numSubpartitions,
+                       int numTargetKeyGroups,
+                       int networkBufferSize,
+                       ResultPartitionManager partitionManager,
+                       FileChannelManager channelManager,
+                       @Nullable BufferCompressor bufferCompressor,
+                       SupplierWithException<BufferPool, IOException> bufferPoolFactory) {
+
+               super(
+                       owningTaskName,
+                       partitionIndex,
+                       partitionId,
+                       partitionType,
+                       numSubpartitions,
+                       numTargetKeyGroups,
+                       partitionManager,
+                       bufferCompressor,
+                       bufferPoolFactory);
+
+               this.channelManager = checkNotNull(channelManager);
+               this.networkBufferSize = networkBufferSize;
+               this.numDataBuffers = new int[numSubpartitions];
+               this.writeBuffer = MemorySegmentFactory.allocateUnpooledOffHeapMemory(networkBufferSize);
+       }
+
+       @Override
+       protected void releaseInternal() {
+               synchronized (lock) {
+                       isFinished = true; // to fail writing faster
+
+                       // delete the produced file only when no reader is reading now
+                       if (readers.isEmpty()) {
+                               if (resultFile != null) {
+                                       resultFile.deleteQuietly();
+                                       resultFile = null;
+                               }
+                       }
+               }
+       }
+
+       @Override
+       public void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException {
+               emit(record, targetSubpartition, DataType.DATA_BUFFER);
+       }
+
+       @Override
+       public void broadcastRecord(ByteBuffer record) throws IOException {
+               broadcast(record, DataType.DATA_BUFFER);
+       }
+
+       @Override
+       public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException {
+               Buffer buffer = EventSerializer.toBuffer(event, isPriorityEvent);
+               try {
+                       ByteBuffer serializedEvent = buffer.getNioBufferReadable();
+                       broadcast(serializedEvent, buffer.getDataType());
+               } finally {
+                       buffer.recycleBuffer();
+               }
+       }
+
+       private void broadcast(ByteBuffer record, DataType dataType) throws IOException {
+               for (int channelIndex = 0; channelIndex < numSubpartitions; ++channelIndex) {
+                       record.rewind();
+                       emit(record, channelIndex, dataType);
+               }
+       }
+
+       private void emit(ByteBuffer record, int targetSubpartition, DataType dataType) throws IOException {
+               checkInProduceState();
+
+               SortBuffer sortBuffer = getSortBuffer();
+               if (sortBuffer.append(record, targetSubpartition, dataType)) {
+                       return;
+               }
+
+               if (!sortBuffer.hasRemaining()) {
+                       // the record can not be appended to the free sort buffer because it is too large
+                       releaseCurrentSortBuffer();
+                       writeLargeRecord(record, targetSubpartition, dataType);
+                       return;
+               }
+
+               flushCurrentSortBuffer();
+               emit(record, targetSubpartition, dataType);
+       }
+
+       private void releaseCurrentSortBuffer() {
+               if (currentSortBuffer != null) {
+                       currentSortBuffer.release();
+                       currentSortBuffer = null;
+               }
+       }
+
+       private SortBuffer getSortBuffer() {
+               if (currentSortBuffer != null) {
+                       return currentSortBuffer;
+               }
+
+               currentSortBuffer = new PartitionSortedBuffer(bufferPool, numSubpartitions, networkBufferSize);
+               return currentSortBuffer;
+       }
+
+       private void flushCurrentSortBuffer() throws IOException {
+               if (currentSortBuffer == null || !currentSortBuffer.hasRemaining()) {
+                       releaseCurrentSortBuffer();
+                       return;
+               }
+
+               currentSortBuffer.finish();
+               PartitionedFileWriter fileWriter = getPartitionedFileWriter();
+
+               while (currentSortBuffer.hasRemaining()) {
+                       BufferWithChannel bufferWithChannel = currentSortBuffer.copyData(writeBuffer);
+                       Buffer buffer = bufferWithChannel.getBuffer();
+                       int subpartitionIndex = bufferWithChannel.getChannelIndex();
+
+                       writeCompressedBufferIfPossible(buffer, fileWriter, subpartitionIndex);
+                       updateStatistics(buffer, subpartitionIndex);
+               }
+
+               releaseCurrentSortBuffer();
+       }
+
+       private PartitionedFileWriter getPartitionedFileWriter() throws IOException {
+               if (fileWriter == null) {
+                       String basePath = channelManager.createChannel().getPath();
+                       fileWriter = new PartitionedFileWriter(basePath, numSubpartitions);
+                       fileWriter.open();
+               }
+
+               fileWriter.startNewRegion();
+               return fileWriter;
+       }
+
+       private void writeCompressedBufferIfPossible(
+                       Buffer buffer,
+                       PartitionedFileWriter fileWriter,
+                       int targetSubpartition) throws IOException {
+               if (canBeCompressed(buffer)) {
+                       buffer = bufferCompressor.compressToIntermediateBuffer(buffer);
+               }
+               fileWriter.writeBuffer(buffer, targetSubpartition);
+               buffer.recycleBuffer();
+       }
+
+       private void updateStatistics(Buffer buffer, int subpartitionIndex) {
+               numBuffersOut.inc();
+               numBytesOut.inc(buffer.readableBytes());
+               if (buffer.isBuffer()) {
+                       ++numDataBuffers[subpartitionIndex];
+               }
+       }
+
+       /**
+        * Spills the large record into the target {@link PartitionedFile} as a separate data region.
+        */
+       private void writeLargeRecord(ByteBuffer record, int targetSubpartition, DataType dataType) throws IOException {
+               PartitionedFileWriter fileWriter = getPartitionedFileWriter();
+
+               while (record.hasRemaining()) {
+                       int toCopy = Math.min(record.remaining(), writeBuffer.size());
+                       writeBuffer.put(0, record, toCopy);
+                       NetworkBuffer buffer = new NetworkBuffer(writeBuffer, (buf) -> {}, dataType, toCopy);
+
+                       writeCompressedBufferIfPossible(buffer, fileWriter, targetSubpartition);
+                       updateStatistics(buffer, targetSubpartition);
+               }
+       }
+
+       void releaseReader(SortMergeSubpartitionReader reader) {
+               synchronized (lock) {
+                       readers.remove(reader);
+
+                       // release the result partition if it has been marked as released
+                       if (readers.isEmpty() && isReleased()) {
+                               releaseInternal();
+                       }
+               }
+       }
+
+       @Override
+       public void finish() throws IOException {
+               checkInProduceState();
+
+               broadcastEvent(EndOfPartitionEvent.INSTANCE, false);
+               flushCurrentSortBuffer();
+
+               synchronized (lock) {
+                       checkState(!isReleased());
+
+                       resultFile = fileWriter.finish();
+                       fileWriter = null;
+
+                       LOG.info("New partitioned file produced: {}.", resultFile);
+               }
+
+               super.finish();
+       }
+
+       @Override
+       public void close() {
+               releaseCurrentSortBuffer();
+
+               if (fileWriter != null) {
+                       fileWriter.releaseQuietly();

Review comment:
       It also has a race condition for `fileWriter`, similar to the case above. We can resolve this issue by declaring `fileWriter` as a final variable, as commented in https://github.com/apache/flink/pull/13595/files#r509983867
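For illustration, here is a minimal sketch of the pattern the comment suggests: when the writer is a final field created eagerly, there is no check-then-act on a mutable reference, so a thread calling `close()` cannot race with the task thread lazily creating the writer. The `Writer` interface and `EagerWriterPartition` class below are hypothetical stand-ins for the Flink types, not the actual PR code:

```java
import java.io.IOException;

/**
 * Hypothetical sketch, not the actual Flink class: with a final writer field
 * assigned in the constructor there is no lazy-initialization branch, so a
 * thread calling close() cannot race with the thread creating the writer.
 */
class EagerWriterPartition implements AutoCloseable {

	/** Hypothetical stand-in for PartitionedFileWriter. */
	interface Writer extends AutoCloseable {
		void write(byte[] data) throws IOException;

		@Override
		void close();
	}

	// final: safely published by the constructor and visible to all threads
	private final Writer fileWriter;

	EagerWriterPartition(Writer fileWriter) {
		this.fileWriter = fileWriter;
	}

	void emit(byte[] data) throws IOException {
		fileWriter.write(data); // no null check or lazy creation needed
	}

	@Override
	public void close() {
		fileWriter.close(); // never null, so no extra synchronization here
	}
}
```

Applied to this PR, that would mean constructing the `PartitionedFileWriter` in the constructor rather than lazily in `getPartitionedFileWriter()`; whether that fits the intended lifecycle is for the author to judge.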



