zhuzhurk commented on a change in pull request #13924:
URL: https://github.com/apache/flink/pull/13924#discussion_r600316686



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
##########
@@ -405,13 +413,8 @@ public ResultSubpartitionView createSubpartitionView(
             checkState(isFinished(), "Trying to read unfinished blocking partition.");
 
             SortMergeSubpartitionReader reader =
-                    new SortMergeSubpartitionReader(
-                            subpartitionIndex,
-                            numDataBuffers[subpartitionIndex],
-                            networkBufferSize,
-                            this,
-                            availabilityListener,
-                            resultFile);
+                    partitionReader.createSubpartitionReader(
+                            this, availabilityListener, subpartitionIndex, resultFile);
             readers.add(reader);

Review comment:
       Could we let the `SortMergeResultPartitionReader` manage the `SortMergeSubpartitionReader`?
   `releaseReader` can be moved into `SortMergeResultPartitionReader`, which would then release the `SortMergeResultPartition` when all the readers are released.
   In this way, `SortMergeResultPartition` will not need to be aware of `SortMergeSubpartitionReader`.
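   A rough sketch of what I mean (the callback is hypothetical, just to illustrate the ownership):
   ```java
   // Inside SortMergeResultPartitionReader: remove the reader and release the owning
   // partition once the last subpartition reader is gone. Names are illustrative only.
   void releaseSubpartitionReader(SortMergeSubpartitionReader reader) {
       boolean allReleased;
       synchronized (lock) {
           allReaders.remove(reader);
           allReleased = allReaders.isEmpty();
           if (allReleased) {
               closeFileChannels();
           }
       }
       if (allReleased) {
           // a Runnable handed in by SortMergeResultPartition at construction time
           onAllReadersReleased.run();
       }
   }
   ```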

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
##########
@@ -120,8 +127,10 @@ public SortMergeResultPartition(
                 bufferCompressor,
                 bufferPoolFactory);
 
-        this.networkBufferSize = networkBufferSize;
-        this.numDataBuffers = new int[numSubpartitions];
+        this.networkBufferSize = readBufferPool.getBufferSize();
+        this.subpartitionOrder = getRandomSubpartitionOrder(numSubpartitions);

Review comment:
       Could we have some comments about why we need to randomize the order?
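   For example, if the motivation is what I guess it is (please correct the wording), something like:
   ```java
   // Randomize the subpartition read order so that concurrent downstream consumers do
   // not all drain the subpartitions in the same order, which spreads the disk I/O and
   // network load more evenly.
   ```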

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReader.java
##########
@@ -176,21 +202,28 @@ public void resumeConsumption() {
 
     @Override
     public Throwable getFailureCause() {
-        // we can never throw an error after this was created
-        return null;
+        synchronized (lock) {
+            return failureCause;
+        }
     }
 
     @Override
     public boolean isAvailable(int numCreditsAvailable) {
-        if (numCreditsAvailable > 0) {
-            return !buffersRead.isEmpty();
-        }
+        synchronized (lock) {
+            if (isReleased) {
+                return true;
+            }
 
-        return !buffersRead.isEmpty() && !buffersRead.peek().isBuffer();
+            if (numCreditsAvailable > 0) {
+                return !buffersRead.isEmpty();
+            }
+
+            return !buffersRead.isEmpty() && !buffersRead.peek().isBuffer();
+        }
     }
 
     @Override
     public int unsynchronizedGetNumberOfQueuedBuffers() {
-        return 0;
+        return buffersRead.size();

Review comment:
       Why don't we need a lock for it?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReader.java
##########
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.util.IOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Data reader for {@link SortMergeResultPartition} which can read data for all downstream tasks
+ * consuming the corresponding {@link SortMergeResultPartition}. It always tries to read shuffle
+ * data in order of file offset, which maximizes sequential reads and thus improves blocking
+ * shuffle performance.
+ */
+public class SortMergeResultPartitionReader implements Runnable, 
BufferRecycler {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(SortMergeResultPartitionReader.class);
+
+    /** Lock used to synchronize multi-thread access to thread-unsafe fields. 
*/
+    private final Object lock;
+
+    /** Buffer pool from which to allocate buffers for shuffle data reading. */
+    private final BatchShuffleReadBufferPool bufferPool;
+
+    /** Executor to run the shuffle data reading task. */
+    private final Executor ioExecutor;
+
+    /** Maximum number of buffers that can be allocated by this partition reader. */
+    private final int maxRequestedBuffers;
+
+    /** All failed subpartition readers to be released. */
+    @GuardedBy("lock")
+    private final Set<SortMergeSubpartitionReader> failedReaders = new 
HashSet<>();
+
+    /** All readers waiting to read data of different subpartitions. */
+    @GuardedBy("lock")
+    private final Set<SortMergeSubpartitionReader> allReaders = new 
HashSet<>();
+
+    /** File channel shared by all subpartitions to read data from. */
+    @GuardedBy("lock")
+    private FileChannel dataFileChannel;
+
+    /** File channel shared by all subpartitions to read index from. */
+    @GuardedBy("lock")
+    private FileChannel indexFileChannel;
+
+    /** Number of buffers already allocated and still not recycled by this 
partition reader. */
+    @GuardedBy("lock")
+    private int numRequestedBuffers;
+
+    /**
+     * Whether the data reading task is currently running or not. This flag is 
used when trying to
+     * submit the data reading task.
+     */
+    @GuardedBy("lock")
+    private boolean isRunning;
+
+    /** Whether this reader has been released or not. */
+    @GuardedBy("lock")
+    private boolean isReleased;
+
+    public SortMergeResultPartitionReader(
+            BatchShuffleReadBufferPool bufferPool, Executor ioExecutor, Object 
lock) {
+        this.lock = checkNotNull(lock);
+        this.bufferPool = checkNotNull(bufferPool);
+        this.ioExecutor = checkNotNull(ioExecutor);
+        // one partition reader can consume at most 32M buffers for data reading; it is only an
+        // empirical value which cannot be configured currently
+        this.maxRequestedBuffers = Math.max(1, 4 * 
bufferPool.getNumBuffersPerRequest());
+
+        bufferPool.initialize();
+    }
+
+    @Override
+    public synchronized void run() {
+        Set<SortMergeSubpartitionReader> finishedReaders = new HashSet<>();
+        Queue<SortMergeSubpartitionReader> availableReaders = 
getAvailableReaders();
+
+        Queue<MemorySegment> buffers = new ArrayDeque<>();
+        if (!availableReaders.isEmpty()) {
+            try {
+                buffers.addAll(bufferPool.requestBuffers());
+            } catch (Throwable throwable) {
+                // this can happen only when the buffer pool is destroyed or 
buffer request timeout
+                if (!(throwable instanceof TimeoutException) || 
numRequestedBuffers <= 0) {
+                    failSubpartitionReaders(availableReaders, throwable);
+                    LOG.error("Failed to request buffers for data reading.", 
throwable);
+                }
+            }
+        }
+
+        Throwable firstException = null;
+        while (!availableReaders.isEmpty() && !buffers.isEmpty()) {
+            Throwable exception = null;
+            SortMergeSubpartitionReader subpartitionReader = 
availableReaders.poll();
+
+            try {
+                if (!subpartitionReader.readBuffers(buffers, this)) {
+                    // there is no resource to release for finished readers 
currently
+                    finishedReaders.add(subpartitionReader);
+                }
+            } catch (Throwable throwable) {
+                exception = throwable;
+                firstException = firstException == null ? throwable : 
firstException;
+                LOG.debug("Failed to read shuffle data.", throwable);
+            }
+
+            if (exception != null) {
+                
failSubpartitionReaders(Collections.singletonList(subpartitionReader), 
exception);
+            }
+        }
+
+        int numBuffersRead = bufferPool.getNumBuffersPerRequest() - 
buffers.size();
+        if (buffers.size() > 0) {
+            try {
+                bufferPool.recycle(buffers);
+                buffers.clear();
+            } catch (Throwable throwable) {
+                // this should never happen so just log the error
+                LOG.error("Failed to release subpartition reader.", throwable);
+            }
+        }
+
+        removeFinishedAndFailedReaders(numBuffersRead, finishedReaders, 
failedReaders);
+
+        // only log the first exception to avoid too many logs for jobs of 
large parallelism
+        if (firstException != null) {
+            LOG.error("Encountered exception while reading shuffle data.", 
firstException);
+        }
+    }
+
+    private void failSubpartitionReaders(
+            Collection<SortMergeSubpartitionReader> readers, Throwable 
failureCause) {
+        synchronized (lock) {
+            failedReaders.addAll(readers);
+        }
+
+        for (SortMergeSubpartitionReader reader : readers) {
+            try {
+                reader.fail(failureCause);
+            } catch (Throwable throwable) {
+                // this should never happen so just log the error
+                LOG.error("Failed to release subpartition reader.", throwable);
+            }
+        }
+    }
+
+    private void removeFinishedAndFailedReaders(
+            int numBuffersRead,
+            Set<SortMergeSubpartitionReader> finishedReaders,
+            Set<SortMergeSubpartitionReader> failedReaders) {
+        synchronized (lock) {
+            for (SortMergeSubpartitionReader reader : finishedReaders) {
+                allReaders.remove(reader);
+            }
+            finishedReaders.clear();
+
+            for (SortMergeSubpartitionReader reader : failedReaders) {
+                allReaders.remove(reader);
+            }
+            failedReaders.clear();
+
+            if (allReaders.isEmpty()) {
+                closeFileChannels();
+            }
+
+            numRequestedBuffers += numBuffersRead;
+            isRunning = false;
+            mayTriggerReading();
+        }
+    }
+
+    private Queue<SortMergeSubpartitionReader> getAvailableReaders() {
+        List<SortMergeSubpartitionReader> readers;
+        synchronized (lock) {
+            if (!isReleased) {
+                return new PriorityQueue<>(allReaders);
+            }
+
+            readers = new ArrayList<>(allReaders);
+        }
+
+        failSubpartitionReaders(readers, new IllegalStateException("Partition 
has been released."));
+        return new ArrayDeque<>();
+    }
+
+    public SortMergeSubpartitionReader createSubpartitionReader(
+            SortMergeResultPartition resultPartition,
+            BufferAvailabilityListener availabilityListener,
+            int targetSubpartition,
+            PartitionedFile resultFile)
+            throws IOException {
+        synchronized (lock) {
+            checkState(!isReleased, "Partition is already released.");
+
+            PartitionedFileReader fileReader = createFileReader(resultFile, 
targetSubpartition);
+            SortMergeSubpartitionReader subpartitionReader =
+                    new SortMergeSubpartitionReader(
+                            resultPartition, availabilityListener, fileReader);
+            allReaders.add(subpartitionReader);
+
+            mayTriggerReading();
+            return subpartitionReader;
+        }
+    }
+
+    public void releaseSubpartitionReader(SortMergeSubpartitionReader 
subpartitionReader) {
+        synchronized (lock) {
+            if (allReaders.contains(subpartitionReader)) {
+                failedReaders.add(subpartitionReader);
+            }
+        }
+    }
+
+    private PartitionedFileReader createFileReader(
+            PartitionedFile resultFile, int targetSubpartition) throws 
IOException {
+        assert Thread.holdsLock(lock);
+
+        try {
+            if (allReaders.isEmpty()) {
+                openFileChannels(resultFile);
+            }
+            return new PartitionedFileReader(
+                    resultFile, targetSubpartition, dataFileChannel, 
indexFileChannel);
+        } catch (Throwable throwable) {
+            if (allReaders.isEmpty()) {
+                closeFileChannels();
+            }
+            throw throwable;
+        }
+    }
+
+    private void openFileChannels(PartitionedFile resultFile) throws 
IOException {
+        assert Thread.holdsLock(lock);
+
+        closeFileChannels();
+        dataFileChannel = openFileChannel(resultFile.getDataFilePath());
+        indexFileChannel = openFileChannel(resultFile.getIndexFilePath());
+    }
+
+    private void closeFileChannels() {
+        assert Thread.holdsLock(lock);
+
+        IOUtils.closeAllQuietly(dataFileChannel, indexFileChannel);
+        dataFileChannel = null;
+        indexFileChannel = null;
+    }
+
+    @Override
+    public void recycle(MemorySegment segment) {
+        synchronized (lock) {
+            bufferPool.recycle(segment);
+            --numRequestedBuffers;
+
+            mayTriggerReading();
+        }
+    }
+
+    private void mayTriggerReading() {
+        assert Thread.holdsLock(lock);
+
+        if (!isRunning
+                && !allReaders.isEmpty()
+                && numRequestedBuffers + bufferPool.getNumBuffersPerRequest()
+                        <= maxRequestedBuffers) {
+            isRunning = true;
+            ioExecutor.execute(this);
+        }
+    }
+
+    public void release() {
+        synchronized (lock) {
+            isReleased = true;
+        }
+    }
+
+    private FileChannel openFileChannel(Path path) throws IOException {

Review comment:
       Can be static.
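   For example, the existing helper body can be kept as is:
   ```java
   private static FileChannel openFileChannel(Path path) throws IOException {
       return FileChannel.open(path, StandardOpenOption.READ);
   }
   ```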

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileReader.java
##########
@@ -61,104 +56,69 @@
     /** Next data region to be read. */
     private int nextRegionToRead;
 
+    /** Next file offset to be read. */
+    private long nextOffsetToRead;
+
     /** Number of remaining buffers in the current data region read. */
     private int currentRegionRemainingBuffers;
 
-    /** Whether this partitioned file reader is closed. */
-    private boolean isClosed;
-
-    public PartitionedFileReader(PartitionedFile partitionedFile, int 
targetSubpartition)
+    public PartitionedFileReader(
+            PartitionedFile partitionedFile,
+            int targetSubpartition,
+            FileChannel dataFileChannel,
+            FileChannel indexFileChannel)
             throws IOException {
+        checkArgument(

Review comment:
       It's better to check them separately. And this check should come after their `checkNotNull` calls.
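   Something like:
   ```java
   this.dataFileChannel = checkNotNull(dataFileChannel);
   this.indexFileChannel = checkNotNull(indexFileChannel);
   checkArgument(dataFileChannel.isOpen(), "The data file channel must be opened.");
   checkArgument(indexFileChannel.isOpen(), "The index file channel must be opened.");
   ```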

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReader.java
##########
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.util.IOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Data reader for {@link SortMergeResultPartition} which can read data for all downstream tasks
+ * consuming the corresponding {@link SortMergeResultPartition}. It always tries to read shuffle
+ * data in order of file offset, which maximizes sequential reads and thus improves blocking
+ * shuffle performance.
+ */
+public class SortMergeResultPartitionReader implements Runnable, 
BufferRecycler {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(SortMergeResultPartitionReader.class);
+
+    /** Lock used to synchronize multi-thread access to thread-unsafe fields. 
*/
+    private final Object lock;
+
+    /** Buffer pool from which to allocate buffers for shuffle data reading. */
+    private final BatchShuffleReadBufferPool bufferPool;
+
+    /** Executor to run the shuffle data reading task. */
+    private final Executor ioExecutor;
+
+    /** Maximum number of buffers that can be allocated by this partition reader. */
+    private final int maxRequestedBuffers;
+
+    /** All failed subpartition readers to be released. */
+    @GuardedBy("lock")
+    private final Set<SortMergeSubpartitionReader> failedReaders = new 
HashSet<>();
+
+    /** All readers waiting to read data of different subpartitions. */
+    @GuardedBy("lock")
+    private final Set<SortMergeSubpartitionReader> allReaders = new 
HashSet<>();
+
+    /** File channel shared by all subpartitions to read data from. */
+    @GuardedBy("lock")
+    private FileChannel dataFileChannel;
+
+    /** File channel shared by all subpartitions to read index from. */
+    @GuardedBy("lock")
+    private FileChannel indexFileChannel;
+
+    /** Number of buffers already allocated and still not recycled by this 
partition reader. */
+    @GuardedBy("lock")
+    private int numRequestedBuffers;
+
+    /**
+     * Whether the data reading task is currently running or not. This flag is 
used when trying to
+     * submit the data reading task.
+     */
+    @GuardedBy("lock")
+    private boolean isRunning;
+
+    /** Whether this reader has been released or not. */
+    @GuardedBy("lock")
+    private boolean isReleased;
+
+    public SortMergeResultPartitionReader(
+            BatchShuffleReadBufferPool bufferPool, Executor ioExecutor, Object 
lock) {
+        this.lock = checkNotNull(lock);
+        this.bufferPool = checkNotNull(bufferPool);
+        this.ioExecutor = checkNotNull(ioExecutor);
+        // one partition reader can consume at most 32M buffers for data reading; it is only an
+        // empirical value which cannot be configured currently
+        this.maxRequestedBuffers = Math.max(1, 4 * 
bufferPool.getNumBuffersPerRequest());
+
+        bufferPool.initialize();
+    }
+
+    @Override
+    public synchronized void run() {
+        Set<SortMergeSubpartitionReader> finishedReaders = new HashSet<>();
+        Queue<SortMergeSubpartitionReader> availableReaders = 
getAvailableReaders();
+
+        Queue<MemorySegment> buffers = new ArrayDeque<>();
+        if (!availableReaders.isEmpty()) {
+            try {
+                buffers.addAll(bufferPool.requestBuffers());
+            } catch (Throwable throwable) {
+                // this can happen only when the buffer pool is destroyed or 
buffer request timeout
+                if (!(throwable instanceof TimeoutException) || 
numRequestedBuffers <= 0) {
+                    failSubpartitionReaders(availableReaders, throwable);
+                    LOG.error("Failed to request buffers for data reading.", 
throwable);
+                }
+            }
+        }
+
+        Throwable firstException = null;
+        while (!availableReaders.isEmpty() && !buffers.isEmpty()) {
+            Throwable exception = null;
+            SortMergeSubpartitionReader subpartitionReader = 
availableReaders.poll();
+
+            try {
+                if (!subpartitionReader.readBuffers(buffers, this)) {
+                    // there is no resource to release for finished readers 
currently
+                    finishedReaders.add(subpartitionReader);
+                }
+            } catch (Throwable throwable) {
+                exception = throwable;
+                firstException = firstException == null ? throwable : 
firstException;
+                LOG.debug("Failed to read shuffle data.", throwable);
+            }
+
+            if (exception != null) {
+                
failSubpartitionReaders(Collections.singletonList(subpartitionReader), 
exception);
+            }
+        }
+
+        int numBuffersRead = bufferPool.getNumBuffersPerRequest() - 
buffers.size();
+        if (buffers.size() > 0) {
+            try {
+                bufferPool.recycle(buffers);
+                buffers.clear();
+            } catch (Throwable throwable) {
+                // this should never happen so just log the error
+                LOG.error("Failed to release subpartition reader.", throwable);
+            }
+        }
+
+        removeFinishedAndFailedReaders(numBuffersRead, finishedReaders, 
failedReaders);
+
+        // only log the first exception to avoid too many logs for jobs of 
large parallelism
+        if (firstException != null) {
+            LOG.error("Encountered exception while reading shuffle data.", 
firstException);
+        }
+    }
+
+    private void failSubpartitionReaders(
+            Collection<SortMergeSubpartitionReader> readers, Throwable 
failureCause) {
+        synchronized (lock) {
+            failedReaders.addAll(readers);
+        }
+
+        for (SortMergeSubpartitionReader reader : readers) {
+            try {
+                reader.fail(failureCause);
+            } catch (Throwable throwable) {
+                // this should never happen so just log the error
+                LOG.error("Failed to release subpartition reader.", throwable);
+            }
+        }
+    }
+
+    private void removeFinishedAndFailedReaders(
+            int numBuffersRead,
+            Set<SortMergeSubpartitionReader> finishedReaders,
+            Set<SortMergeSubpartitionReader> failedReaders) {
+        synchronized (lock) {
+            for (SortMergeSubpartitionReader reader : finishedReaders) {
+                allReaders.remove(reader);
+            }
+            finishedReaders.clear();
+
+            for (SortMergeSubpartitionReader reader : failedReaders) {
+                allReaders.remove(reader);
+            }
+            failedReaders.clear();
+
+            if (allReaders.isEmpty()) {
+                closeFileChannels();
+            }
+
+            numRequestedBuffers += numBuffersRead;
+            isRunning = false;
+            mayTriggerReading();
+        }
+    }
+
+    private Queue<SortMergeSubpartitionReader> getAvailableReaders() {
+        List<SortMergeSubpartitionReader> readers;
+        synchronized (lock) {
+            if (!isReleased) {
+                return new PriorityQueue<>(allReaders);
+            }
+
+            readers = new ArrayList<>(allReaders);
+        }
+
+        failSubpartitionReaders(readers, new IllegalStateException("Partition 
has been released."));
+        return new ArrayDeque<>();
+    }
+
+    public SortMergeSubpartitionReader createSubpartitionReader(
+            SortMergeResultPartition resultPartition,
+            BufferAvailabilityListener availabilityListener,
+            int targetSubpartition,
+            PartitionedFile resultFile)
+            throws IOException {
+        synchronized (lock) {
+            checkState(!isReleased, "Partition is already released.");
+
+            PartitionedFileReader fileReader = createFileReader(resultFile, 
targetSubpartition);
+            SortMergeSubpartitionReader subpartitionReader =
+                    new SortMergeSubpartitionReader(
+                            resultPartition, availabilityListener, fileReader);
+            allReaders.add(subpartitionReader);
+
+            mayTriggerReading();
+            return subpartitionReader;
+        }
+    }
+
+    public void releaseSubpartitionReader(SortMergeSubpartitionReader 
subpartitionReader) {
+        synchronized (lock) {
+            if (allReaders.contains(subpartitionReader)) {
+                failedReaders.add(subpartitionReader);
+            }
+        }
+    }
+
+    private PartitionedFileReader createFileReader(
+            PartitionedFile resultFile, int targetSubpartition) throws 
IOException {
+        assert Thread.holdsLock(lock);
+
+        try {
+            if (allReaders.isEmpty()) {
+                openFileChannels(resultFile);
+            }
+            return new PartitionedFileReader(
+                    resultFile, targetSubpartition, dataFileChannel, 
indexFileChannel);
+        } catch (Throwable throwable) {
+            if (allReaders.isEmpty()) {
+                closeFileChannels();
+            }
+            throw throwable;
+        }
+    }
+
+    private void openFileChannels(PartitionedFile resultFile) throws 
IOException {
+        assert Thread.holdsLock(lock);
+
+        closeFileChannels();
+        dataFileChannel = openFileChannel(resultFile.getDataFilePath());
+        indexFileChannel = openFileChannel(resultFile.getIndexFilePath());
+    }
+
+    private void closeFileChannels() {
+        assert Thread.holdsLock(lock);
+
+        IOUtils.closeAllQuietly(dataFileChannel, indexFileChannel);
+        dataFileChannel = null;
+        indexFileChannel = null;
+    }
+
+    @Override
+    public void recycle(MemorySegment segment) {
+        synchronized (lock) {
+            bufferPool.recycle(segment);
+            --numRequestedBuffers;
+
+            mayTriggerReading();
+        }
+    }
+
+    private void mayTriggerReading() {
+        assert Thread.holdsLock(lock);
+
+        if (!isRunning
+                && !allReaders.isEmpty()
+                && numRequestedBuffers + bufferPool.getNumBuffersPerRequest()
+                        <= maxRequestedBuffers) {
+            isRunning = true;
+            ioExecutor.execute(this);
+        }
+    }
+
+    public void release() {

Review comment:
       Can be package private.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
##########
@@ -455,4 +458,28 @@ public int getNumberOfQueuedBuffers(int 
targetSubpartition) {
     PartitionedFile getResultFile() {
         return resultFile;
     }
+
+    @VisibleForTesting
+    SortMergeResultPartitionReader getPartitionReader() {
+        return partitionReader;
+    }
+
+    private int[] getRandomSubpartitionOrder(int numSubpartitions) {
+        Random random = new Random();
+
+        int shift = random.nextInt(numSubpartitions);

Review comment:
       IIUC, this shift does not make any difference after the following swapping.
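   I.e. a plain Fisher-Yates shuffle of the identity permutation already yields a uniformly random order:
   ```java
   int[] subpartitionReadOrder = new int[numSubpartitions];
   for (int channel = 0; channel < numSubpartitions; ++channel) {
       subpartitionReadOrder[channel] = channel; // no initial shift needed
   }
   // ... then the swap loop below is enough
   ```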

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReader.java
##########
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.util.IOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Data reader for {@link SortMergeResultPartition} which can read data for all downstream tasks
+ * consuming the corresponding {@link SortMergeResultPartition}. It always tries to read shuffle
+ * data in order of file offset, which maximizes sequential reads and thus improves blocking
+ * shuffle performance.
+ */
+public class SortMergeResultPartitionReader implements Runnable, 
BufferRecycler {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(SortMergeResultPartitionReader.class);
+
+    /** Lock used to synchronize multi-thread access to thread-unsafe fields. 
*/
+    private final Object lock;
+
+    /** Buffer pool from which to allocate buffers for shuffle data reading. */
+    private final BatchShuffleReadBufferPool bufferPool;
+
+    /** Executor to run the shuffle data reading task. */
+    private final Executor ioExecutor;
+
+    /** Maximum number of buffers that can be allocated by this partition reader. */
+    private final int maxRequestedBuffers;
+
+    /** All failed subpartition readers to be released. */
+    @GuardedBy("lock")
+    private final Set<SortMergeSubpartitionReader> failedReaders = new 
HashSet<>();
+
+    /** All readers waiting to read data of different subpartitions. */
+    @GuardedBy("lock")
+    private final Set<SortMergeSubpartitionReader> allReaders = new 
HashSet<>();
+
+    /** File channel shared by all subpartitions to read data from. */
+    @GuardedBy("lock")
+    private FileChannel dataFileChannel;
+
+    /** File channel shared by all subpartitions to read index from. */
+    @GuardedBy("lock")
+    private FileChannel indexFileChannel;
+
+    /** Number of buffers already allocated and still not recycled by this 
partition reader. */
+    @GuardedBy("lock")
+    private int numRequestedBuffers;
+
+    /**
+     * Whether the data reading task is currently running or not. This flag is 
used when trying to
+     * submit the data reading task.
+     */
+    @GuardedBy("lock")
+    private boolean isRunning;
+
+    /** Whether this reader has been released or not. */
+    @GuardedBy("lock")
+    private boolean isReleased;
+
+    public SortMergeResultPartitionReader(
+            BatchShuffleReadBufferPool bufferPool, Executor ioExecutor, Object 
lock) {
+        this.lock = checkNotNull(lock);
+        this.bufferPool = checkNotNull(bufferPool);
+        this.ioExecutor = checkNotNull(ioExecutor);
+        // one partition reader can consume at most 32M buffers for data reading; it is only an
+        // empirical value which cannot be configured currently
+        this.maxRequestedBuffers = Math.max(1, 4 * 
bufferPool.getNumBuffersPerRequest());

Review comment:
       I think `4` is the magic number that should be explained, rather than `32M`, which is a bit confusing.
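   E.g. a named constant would make the intent self-explanatory (the name is just a suggestion):
   ```java
   // A partition reader may hold at most this many batches of un-recycled buffers.
   private static final int MAX_CONCURRENT_BUFFER_REQUESTS = 4;

   this.maxRequestedBuffers =
           Math.max(1, MAX_CONCURRENT_BUFFER_REQUESTS * bufferPool.getNumBuffersPerRequest());
   ```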

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReader.java
##########
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.util.IOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Data reader for {@link SortMergeResultPartition} which can read data for all downstream tasks
+ * consuming the corresponding {@link SortMergeResultPartition}. It always tries to read shuffle
+ * data in order of file offset, which maximizes sequential reads and thus improves blocking
+ * shuffle performance.
+ */
+public class SortMergeResultPartitionReader implements Runnable, 
BufferRecycler {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(SortMergeResultPartitionReader.class);
+
+    /** Lock used to synchronize multi-thread access to thread-unsafe fields. 
*/
+    private final Object lock;
+
+    /** Buffer pool from which to allocate buffers for shuffle data reading. */
+    private final BatchShuffleReadBufferPool bufferPool;
+
+    /** Executor to run the shuffle data reading task. */
+    private final Executor ioExecutor;
+
+    /** Maximum number of buffers that can be allocated by this partition reader. */
+    private final int maxRequestedBuffers;
+
+    /** All failed subpartition readers to be released. */
+    @GuardedBy("lock")
+    private final Set<SortMergeSubpartitionReader> failedReaders = new 
HashSet<>();
+
+    /** All readers waiting to read data of different subpartitions. */
+    @GuardedBy("lock")
+    private final Set<SortMergeSubpartitionReader> allReaders = new 
HashSet<>();
+
+    /** File channel shared by all subpartitions to read data from. */
+    @GuardedBy("lock")
+    private FileChannel dataFileChannel;
+
+    /** File channel shared by all subpartitions to read index from. */
+    @GuardedBy("lock")
+    private FileChannel indexFileChannel;
+
+    /** Number of buffers already allocated and still not recycled by this 
partition reader. */
+    @GuardedBy("lock")
+    private int numRequestedBuffers;
+
+    /**
+     * Whether the data reading task is currently running or not. This flag is 
used when trying to
+     * submit the data reading task.
+     */
+    @GuardedBy("lock")
+    private boolean isRunning;
+
+    /** Whether this reader has been released or not. */
+    @GuardedBy("lock")
+    private boolean isReleased;
+
+    public SortMergeResultPartitionReader(
+            BatchShuffleReadBufferPool bufferPool, Executor ioExecutor, Object 
lock) {
+        this.lock = checkNotNull(lock);
+        this.bufferPool = checkNotNull(bufferPool);
+        this.ioExecutor = checkNotNull(ioExecutor);
+        // one partition reader can consume at most 32M buffers for data reading; it is only an
+        // empirical value which cannot be configured currently
+        this.maxRequestedBuffers = Math.max(1, 4 * 
bufferPool.getNumBuffersPerRequest());
+
+        bufferPool.initialize();

Review comment:
       Why is it needed to eagerly initialize the pool here?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
##########
@@ -455,4 +458,28 @@ public int getNumberOfQueuedBuffers(int 
targetSubpartition) {
     PartitionedFile getResultFile() {
         return resultFile;
     }
+
+    @VisibleForTesting
+    SortMergeResultPartitionReader getPartitionReader() {
+        return partitionReader;
+    }
+
+    private int[] getRandomSubpartitionOrder(int numSubpartitions) {
+        Random random = new Random();
+
+        int shift = random.nextInt(numSubpartitions);
+        int[] subpartitionReadOrder = new int[numSubpartitions];
+        for (int channel = 0; channel < numSubpartitions; ++channel) {
+            subpartitionReadOrder[channel] = (channel + shift) % 
numSubpartitions;
+        }
+
+        for (int channel = numSubpartitions; channel > 1; --channel) {
+            int channelToSwap = random.nextInt(channel);

Review comment:
       How about letting `channel` be the current channel and `channelToSwap` be `random.nextInt(channel + 1)`?
   I think it would be easier to understand.
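   I.e. something like:
   ```java
   for (int channel = numSubpartitions - 1; channel > 0; --channel) {
       int channelToSwap = random.nextInt(channel + 1);
       int temp = subpartitionReadOrder[channel];
       subpartitionReadOrder[channel] = subpartitionReadOrder[channelToSwap];
       subpartitionReadOrder[channelToSwap] = temp;
   }
   ```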

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileReader.java
##########
@@ -61,104 +56,69 @@
     /** Next data region to be read. */
     private int nextRegionToRead;
 
+    /** Next file offset to be read. */
+    private long nextOffsetToRead;
+
     /** Number of remaining buffers in the current data region read. */
     private int currentRegionRemainingBuffers;
 
-    /** Whether this partitioned file reader is closed. */
-    private boolean isClosed;
-
-    public PartitionedFileReader(PartitionedFile partitionedFile, int 
targetSubpartition)
+    public PartitionedFileReader(
+            PartitionedFile partitionedFile,
+            int targetSubpartition,
+            FileChannel dataFileChannel,
+            FileChannel indexFileChannel)
             throws IOException {
+        checkArgument(
+                dataFileChannel.isOpen() && indexFileChannel.isOpen(),
+                "Both data file channel and index file channel must be 
opened.");
+
         this.partitionedFile = checkNotNull(partitionedFile);
         this.targetSubpartition = targetSubpartition;
+        this.dataFileChannel = checkNotNull(dataFileChannel);
+        this.indexFileChannel = checkNotNull(indexFileChannel);
 
         this.indexEntryBuf = 
ByteBuffer.allocateDirect(PartitionedFile.INDEX_ENTRY_SIZE);
         BufferReaderWriterUtil.configureByteBuffer(indexEntryBuf);
-
-        this.dataFileChannel = 
openFileChannel(partitionedFile.getDataFilePath());
-        try {
-            this.indexFileChannel = 
openFileChannel(partitionedFile.getIndexFilePath());
-        } catch (Throwable throwable) {
-            IOUtils.closeQuietly(dataFileChannel);
-            throw throwable;
-        }
     }
 
-    private FileChannel openFileChannel(Path path) throws IOException {
-        return FileChannel.open(path, StandardOpenOption.READ);
-    }
-
-    private boolean moveToNextReadableRegion() throws IOException {
-        if (currentRegionRemainingBuffers > 0) {
-            return true;
-        }
-
-        while (nextRegionToRead < partitionedFile.getNumRegions()) {
+    private void moveToNextReadableRegion() throws IOException {
+        while (currentRegionRemainingBuffers <= 0
+                && nextRegionToRead < partitionedFile.getNumRegions()) {
             partitionedFile.getIndexEntry(
                     indexFileChannel, indexEntryBuf, nextRegionToRead, 
targetSubpartition);
-            long dataOffset = indexEntryBuf.getLong();
+            nextOffsetToRead = indexEntryBuf.getLong();
             currentRegionRemainingBuffers = indexEntryBuf.getInt();
             ++nextRegionToRead;
-
-            if (currentRegionRemainingBuffers > 0) {
-                dataFileChannel.position(dataOffset);
-                return true;
-            }
         }
-
-        return false;
     }
 
     /**
-     * Reads a buffer from the {@link PartitionedFile} and moves the read 
position forward.
+     * Reads a buffer from the current region of the target {@link 
PartitionedFile} and moves the
+     * read position forward.
      *
      * <p>Note: The caller is responsible for recycling the target buffer if 
any exception occurs.

Review comment:
       Maybe add some comments to explain the params and the return value of this public method?
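   For example (this is my reading of the current behavior, please correct it if wrong):
   ```java
   /**
    * @param target the memory segment to read the shuffle data into
    * @param recycler the recycler responsible for recycling the target buffer
    * @return a buffer read from the current region, or null if the current region has no
    *     remaining data
    */
   ```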

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileReader.java
##########
@@ -61,104 +56,69 @@
     /** Next data region to be read. */
     private int nextRegionToRead;
 
+    /** Next file offset to be read. */
+    private long nextOffsetToRead;
+
     /** Number of remaining buffers in the current data region read. */
     private int currentRegionRemainingBuffers;
 
-    /** Whether this partitioned file reader is closed. */
-    private boolean isClosed;
-
-    public PartitionedFileReader(PartitionedFile partitionedFile, int 
targetSubpartition)
+    public PartitionedFileReader(
+            PartitionedFile partitionedFile,
+            int targetSubpartition,
+            FileChannel dataFileChannel,
+            FileChannel indexFileChannel)
             throws IOException {

Review comment:
       I think there's no need to throw `IOException` now.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReader.java
##########
@@ -176,21 +202,28 @@ public void resumeConsumption() {
 
     @Override
     public Throwable getFailureCause() {
-        // we can never throw an error after this was created
-        return null;
+        synchronized (lock) {
+            return failureCause;
+        }
     }
 
     @Override
     public boolean isAvailable(int numCreditsAvailable) {
-        if (numCreditsAvailable > 0) {
-            return !buffersRead.isEmpty();
-        }
+        synchronized (lock) {
+            if (isReleased) {
+                return true;
+            }
 
-        return !buffersRead.isEmpty() && !buffersRead.peek().isBuffer();
+            if (numCreditsAvailable > 0) {
+                return !buffersRead.isEmpty();
+            }
+
+            return !buffersRead.isEmpty() && !buffersRead.peek().isBuffer();
+        }
     }
 
     @Override
     public int unsynchronizedGetNumberOfQueuedBuffers() {

Review comment:
       It's out of the scope of this change, but I think we need some Javadocs for the methods of `ResultSubpartitionView`. At the moment I can hardly understand the semantics of some of them without looking into their invocations.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileReader.java
##########
@@ -61,104 +56,69 @@
     /** Next data region to be read. */
     private int nextRegionToRead;
 
+    /** Next file offset to be read. */
+    private long nextOffsetToRead;
+
     /** Number of remaining buffers in the current data region read. */
     private int currentRegionRemainingBuffers;
 
-    /** Whether this partitioned file reader is closed. */
-    private boolean isClosed;
-
-    public PartitionedFileReader(PartitionedFile partitionedFile, int 
targetSubpartition)
+    public PartitionedFileReader(
+            PartitionedFile partitionedFile,
+            int targetSubpartition,
+            FileChannel dataFileChannel,
+            FileChannel indexFileChannel)
             throws IOException {
+        checkArgument(
+                dataFileChannel.isOpen() && indexFileChannel.isOpen(),
+                "Both data file channel and index file channel must be 
opened.");
+
         this.partitionedFile = checkNotNull(partitionedFile);
         this.targetSubpartition = targetSubpartition;
+        this.dataFileChannel = checkNotNull(dataFileChannel);
+        this.indexFileChannel = checkNotNull(indexFileChannel);
 
         this.indexEntryBuf = 
ByteBuffer.allocateDirect(PartitionedFile.INDEX_ENTRY_SIZE);
         BufferReaderWriterUtil.configureByteBuffer(indexEntryBuf);
-
-        this.dataFileChannel = 
openFileChannel(partitionedFile.getDataFilePath());
-        try {
-            this.indexFileChannel = 
openFileChannel(partitionedFile.getIndexFilePath());
-        } catch (Throwable throwable) {
-            IOUtils.closeQuietly(dataFileChannel);
-            throw throwable;
-        }
     }
 
-    private FileChannel openFileChannel(Path path) throws IOException {
-        return FileChannel.open(path, StandardOpenOption.READ);
-    }
-
-    private boolean moveToNextReadableRegion() throws IOException {
-        if (currentRegionRemainingBuffers > 0) {
-            return true;
-        }
-
-        while (nextRegionToRead < partitionedFile.getNumRegions()) {
+    private void moveToNextReadableRegion() throws IOException {
+        while (currentRegionRemainingBuffers <= 0
+                && nextRegionToRead < partitionedFile.getNumRegions()) {
             partitionedFile.getIndexEntry(
                     indexFileChannel, indexEntryBuf, nextRegionToRead, 
targetSubpartition);
-            long dataOffset = indexEntryBuf.getLong();
+            nextOffsetToRead = indexEntryBuf.getLong();
             currentRegionRemainingBuffers = indexEntryBuf.getInt();
             ++nextRegionToRead;
-
-            if (currentRegionRemainingBuffers > 0) {
-                dataFileChannel.position(dataOffset);
-                return true;
-            }
         }
-
-        return false;
     }
 
     /**
-     * Reads a buffer from the {@link PartitionedFile} and moves the read 
position forward.
+     * Reads a buffer from the current region of the target {@link 
PartitionedFile} and moves the
+     * read position forward.
      *
      * <p>Note: The caller is responsible for recycling the target buffer if 
any exception occurs.
      */
     @Nullable
-    public Buffer readBuffer(MemorySegment target, BufferRecycler recycler) 
throws IOException {
-        checkState(!isClosed, "File reader is already closed.");
-
-        if (moveToNextReadableRegion()) {
-            --currentRegionRemainingBuffers;
-            return readFromByteChannel(dataFileChannel, headerBuf, target, 
recycler);
+    public Buffer readCurrentRegion(MemorySegment target, BufferRecycler 
recycler)
+            throws IOException {
+        if (currentRegionRemainingBuffers == 0) {

Review comment:
       For such a non-private method, I think it's better to call `moveToNextReadableRegion()` up front, so that `PartitionedFileReader` does not rely on others to advance its offsets (see the sketch below).
   BTW, I think all the public methods of this class can be package private, and this class can also be package private.
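   Roughly (a sketch only, reusing the existing helpers):
   ```java
   @Nullable
   Buffer readCurrentRegion(MemorySegment target, BufferRecycler recycler) throws IOException {
       // advance internally instead of relying on callers to do it
       moveToNextReadableRegion();
       if (currentRegionRemainingBuffers == 0) {
           return null; // no more data for the target subpartition
       }
       dataFileChannel.position(nextOffsetToRead);
       Buffer buffer = readFromByteChannel(dataFileChannel, headerBuf, target, recycler);
       nextOffsetToRead = dataFileChannel.position();
       --currentRegionRemainingBuffers;
       return buffer;
   }
   ```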




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

