xintongsong commented on code in PR #19960:
URL: https://github.com/apache/flink/pull/19960#discussion_r915383242


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/SubpartitionFileReader.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.util.Queue;
+
+public interface SubpartitionFileReader extends Comparable<SubpartitionFileReader> {

Review Comment:
   We should name this interface `HsSubpartitionFileReader`, and the implementation class `HsSubpartitionFileReaderImpl`.
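   
   A minimal sketch of the proposed rename (the current `HsSubpartitionFileReader` class would then become `HsSubpartitionFileReaderImpl implements HsSubpartitionFileReader`); the method set is taken from this PR:
   
   ```java
   import org.apache.flink.core.memory.MemorySegment;
   import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
   
   import java.io.IOException;
   import java.util.Queue;
   
   public interface HsSubpartitionFileReader extends Comparable<HsSubpartitionFileReader> {
   
       void readBuffers(Queue<MemorySegment> buffers, BufferRecycler recycler) throws IOException;
   
       void fail(Throwable failureCause);
   
       void prepareForScheduling();
   }
   ```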



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReader.java:
##########
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.Deque;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import static org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil.positionToNextBuffer;
+import static org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil.readFromByteChannel;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * This component is responsible for reading data from disk for a specific subpartition.
+ *
+ * <p>In order to access the disk as sequentially as possible, {@link HsSubpartitionFileReader} needs
+ * to be able to compare priorities.
+ *
+ * <p>Note: This class is not thread safe.
+ */
+public class HsSubpartitionFileReader implements SubpartitionFileReader {
+
+    private final ByteBuffer headerBuf = BufferReaderWriterUtil.allocatedHeaderBuffer();
+
+    private final int subpartitionId;
+
+    private final FileChannel dataFileChannel;
+
+    private final HsSubpartitionViewInternalOperations operations;
+
+    private final CachedRegionManager cachedRegionManager;
+
+    private final BufferIndexManager bufferIndexManager;
+
+    private final Deque<BufferIndexOrError> loadedBuffers = new LinkedBlockingDeque<>();
+
+    private volatile boolean isFailed;
+
+    public HsSubpartitionFileReader(
+            int subpartitionId,
+            FileChannel dataFileChannel,
+            HsSubpartitionViewInternalOperations operations,
+            HsFileDataIndex dataIndex,
+            int maxBufferReadAhead) {
+        this.subpartitionId = subpartitionId;
+        this.dataFileChannel = dataFileChannel;
+        this.operations = operations;
+        this.bufferIndexManager = new BufferIndexManager(maxBufferReadAhead);
+        this.cachedRegionManager = new CachedRegionManager(subpartitionId, dataIndex);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        HsSubpartitionFileReader that = (HsSubpartitionFileReader) o;
+        return subpartitionId == that.subpartitionId;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(subpartitionId);
+    }
+
+    /**
+     * Read subpartition data into buffers.
+     *
+     * <p>This transfers the ownership of used buffers to this class. It's this class'
+     * responsibility to release the buffers using the recycler when no longer needed.
+     *
+     * <p>Calling this method does not always use up all the provided buffers. It's this class'
+     * decision when to stop reading. Currently, it stops reading when: 1) buffers are used up, or
+     * 2) it reaches the end of the subpartition data within the region, or 3) enough data has been
+     * read ahead of the downstream consuming offset.
+     */
+    @Override
+    public synchronized void readBuffers(Queue<MemorySegment> buffers, BufferRecycler recycler)
+            throws IOException {
+        if (isFailed) {
+            throw new IOException("subpartition reader has already failed.");
+        }
+        int firstBufferToLoad = bufferIndexManager.getNextToLoad();
+        if (firstBufferToLoad < 0) {
+            return;
+        }
+
+        // If the lookup result is empty, it means that one of the following things has happened:
+        // 1) The target buffer has not been spilled to disk.
+        // 2) The target buffer has not been released from memory.
+        // So, just skip this round of reading.
+        int numRemainingBuffer = cachedRegionManager.getRemainingBuffersInRegion(firstBufferToLoad);
+        if (numRemainingBuffer == 0) {
+            return;
+        }
+        moveFileOffsetToBuffer(firstBufferToLoad);
+
+        int indexToLoad;
+        int numLoaded = 0;
+        while (!buffers.isEmpty()
+                && (indexToLoad = bufferIndexManager.getNextToLoad()) >= 0
+                && numRemainingBuffer-- > 0) {
+            MemorySegment segment = buffers.poll();
+            Buffer buffer;
+            try {
+                if ((buffer = readFromByteChannel(dataFileChannel, headerBuf, segment, recycler))
+                        == null) {
+                    buffers.add(segment);
+                    break;
+                }
+            } catch (Throwable throwable) {
+                buffers.add(segment);
+                throw throwable;
+            }
+
+            loadedBuffers.add(BufferIndexOrError.newBuffer(buffer, indexToLoad));
+            bufferIndexManager.updateLastLoaded(indexToLoad);
+            cachedRegionManager.advance(
+                    buffer.readableBytes() + BufferReaderWriterUtil.HEADER_LENGTH);
+            ++numLoaded;
+        }
+
+        if (loadedBuffers.size() <= numLoaded) {
+            operations.notifyDataAvailableFromDisk();
+        }
+    }
+
+    @Override
+    public synchronized void fail(Throwable failureCause) {
+        if (isFailed) {
+            return;
+        }
+        isFailed = true;
+        BufferIndexOrError bufferIndexOrError;
+        // empty from tail, in case the subpartition view consumes concurrently and gets the wrong order
+        while ((bufferIndexOrError = loadedBuffers.pollLast()) != null) {
+            if (bufferIndexOrError.getBuffer().isPresent()) {
+                checkNotNull(bufferIndexOrError.buffer).recycleBuffer();
+            }
+        }
+
+        loadedBuffers.add(BufferIndexOrError.newError(failureCause));
+        operations.notifyDataAvailableFromDisk();
+    }
+
+    @Override
+    public void prepareForScheduling() {
+        updateConsumptionProgress();
+    }
+
+    public Deque<BufferIndexOrError> getLoadedBuffers() {
+        return loadedBuffers;
+    }
+
+    // ------------------------------------------------------------------------
+    //  Internal Methods
+    // ------------------------------------------------------------------------
+
+    /** Refresh downstream consumption progress before scheduling another round of reading. */
+    private void updateConsumptionProgress() {
+        bufferIndexManager.updateLastConsumed(operations.getConsumingOffset());
+    }
+
+    private void moveFileOffsetToBuffer(int bufferIndex) throws IOException {
+        Tuple2<Integer, Long> indexAndOffset =
+                cachedRegionManager.getNumSkipAndFileOffset(bufferIndex);
+        dataFileChannel.position(indexAndOffset.f1);
+        for (int i = 0; i < indexAndOffset.f0; ++i) {
+            positionToNextBuffer(dataFileChannel, headerBuf);
+        }
+        cachedRegionManager.skipAll(dataFileChannel.position());
+    }
+
+    /** Returns Long.MAX_VALUE if it shouldn't load. */
+    private long getNextOffsetToLoad() {
+        int bufferIndex = bufferIndexManager.getNextToLoad();
+        if (bufferIndex < 0) {
+            return Long.MAX_VALUE;
+        } else {
+            return cachedRegionManager.getFileOffset(bufferIndex);
+        }
+    }
+
+    /** Provides priority calculation logic for the IO scheduler. */
+    @Override
+    public int compareTo(SubpartitionFileReader that) {
+        checkArgument(that instanceof HsSubpartitionFileReader);
+        return Long.compare(
+                getNextOffsetToLoad(), ((HsSubpartitionFileReader) that).getNextOffsetToLoad());
+    }
+
+    /** Indicates a buffer with index or an error. */
+    public static class BufferIndexOrError {
+        @Nullable private final Buffer buffer;
+        private final int index;
+        @Nullable private final Throwable throwable;
+
+        private BufferIndexOrError(
+                @Nullable Buffer buffer, int index, @Nullable Throwable throwable) {
+            this.buffer = buffer;
+            this.index = index;
+            this.throwable = throwable;
+        }
+
+        public Buffer.DataType getDataType() {
+            return buffer == null ? Buffer.DataType.NONE : buffer.getDataType();
+        }
+
+        private static BufferIndexOrError newError(Throwable throwable) {
+            return new BufferIndexOrError(null, -1, checkNotNull(throwable));
+        }
+
+        private static BufferIndexOrError newBuffer(Buffer buffer, int index) {
+            return new BufferIndexOrError(checkNotNull(buffer), index, null);
+        }
+
+        public Optional<Buffer> getBuffer() {
+            return Optional.ofNullable(buffer);
+        }
+
+        public Optional<Throwable> getThrowable() {
+            return Optional.ofNullable(throwable);
+        }
+
+        public int getIndex() {
+            return index;

Review Comment:
   We probably should check `buffer != null` here.
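   
   E.g., a small sketch of the suggested guard, using the `checkState` that is already statically imported in this file:
   
   ```java
   public int getIndex() {
       // an index is only meaningful for the buffer case, not the error case
       checkState(buffer != null, "Index is only supported when this holds a buffer.");
       return index;
   }
   ```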



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReader.java:
##########
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.Deque;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import static org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil.positionToNextBuffer;
+import static org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil.readFromByteChannel;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * This component is responsible for reading data from disk for a specific subpartition.
+ *
+ * <p>In order to access the disk as sequentially as possible, {@link HsSubpartitionFileReader} needs
+ * to be able to compare priorities.
+ *
+ * <p>Note: This class is not thread safe.
+ */
+public class HsSubpartitionFileReader implements SubpartitionFileReader {
+
+    private final ByteBuffer headerBuf = BufferReaderWriterUtil.allocatedHeaderBuffer();
+
+    private final int subpartitionId;
+
+    private final FileChannel dataFileChannel;
+
+    private final HsSubpartitionViewInternalOperations operations;
+
+    private final CachedRegionManager cachedRegionManager;
+
+    private final BufferIndexManager bufferIndexManager;
+
+    private final Deque<BufferIndexOrError> loadedBuffers = new LinkedBlockingDeque<>();
+
+    private volatile boolean isFailed;
+
+    public HsSubpartitionFileReader(
+            int subpartitionId,
+            FileChannel dataFileChannel,
+            HsSubpartitionViewInternalOperations operations,
+            HsFileDataIndex dataIndex,
+            int maxBufferReadAhead) {
+        this.subpartitionId = subpartitionId;
+        this.dataFileChannel = dataFileChannel;
+        this.operations = operations;
+        this.bufferIndexManager = new BufferIndexManager(maxBufferReadAhead);
+        this.cachedRegionManager = new CachedRegionManager(subpartitionId, dataIndex);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        HsSubpartitionFileReader that = (HsSubpartitionFileReader) o;
+        return subpartitionId == that.subpartitionId;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(subpartitionId);
+    }
+
+    /**
+     * Read subpartition data into buffers.
+     *
+     * <p>This transfers the ownership of used buffers to this class. It's this class'
+     * responsibility to release the buffers using the recycler when no longer needed.
+     *
+     * <p>Calling this method does not always use up all the provided buffers. It's this class'
+     * decision when to stop reading. Currently, it stops reading when: 1) buffers are used up, or
+     * 2) it reaches the end of the subpartition data within the region, or 3) enough data has been
+     * read ahead of the downstream consuming offset.
+     */
+    @Override
+    public synchronized void readBuffers(Queue<MemorySegment> buffers, BufferRecycler recycler)
+            throws IOException {
+        if (isFailed) {
+            throw new IOException("subpartition reader has already failed.");
+        }
+        int firstBufferToLoad = bufferIndexManager.getNextToLoad();
+        if (firstBufferToLoad < 0) {
+            return;
+        }
+
+        // If the lookup result is empty, it means that one of the following things has happened:
+        // 1) The target buffer has not been spilled to disk.
+        // 2) The target buffer has not been released from memory.
+        // So, just skip this round of reading.
+        int numRemainingBuffer = cachedRegionManager.getRemainingBuffersInRegion(firstBufferToLoad);
+        if (numRemainingBuffer == 0) {
+            return;
+        }
+        moveFileOffsetToBuffer(firstBufferToLoad);
+
+        int indexToLoad;
+        int numLoaded = 0;
+        while (!buffers.isEmpty()
+                && (indexToLoad = bufferIndexManager.getNextToLoad()) >= 0
+                && numRemainingBuffer-- > 0) {
+            MemorySegment segment = buffers.poll();
+            Buffer buffer;
+            try {
+                if ((buffer = readFromByteChannel(dataFileChannel, headerBuf, segment, recycler))
+                        == null) {
+                    buffers.add(segment);
+                    break;
+                }
+            } catch (Throwable throwable) {
+                buffers.add(segment);
+                throw throwable;
+            }
+
+            loadedBuffers.add(BufferIndexOrError.newBuffer(buffer, indexToLoad));
+            bufferIndexManager.updateLastLoaded(indexToLoad);
+            cachedRegionManager.advance(
+                    buffer.readableBytes() + BufferReaderWriterUtil.HEADER_LENGTH);
+            ++numLoaded;
+        }
+
+        if (loadedBuffers.size() <= numLoaded) {
+            operations.notifyDataAvailableFromDisk();
+        }
+    }
+
+    @Override
+    public synchronized void fail(Throwable failureCause) {
+        if (isFailed) {
+            return;
+        }
+        isFailed = true;
+        BufferIndexOrError bufferIndexOrError;
+        // empty from tail, in case the subpartition view consumes concurrently and gets the wrong order
+        while ((bufferIndexOrError = loadedBuffers.pollLast()) != null) {
+            if (bufferIndexOrError.getBuffer().isPresent()) {
+                checkNotNull(bufferIndexOrError.buffer).recycleBuffer();
+            }
+        }
+
+        loadedBuffers.add(BufferIndexOrError.newError(failureCause));
+        operations.notifyDataAvailableFromDisk();
+    }
+
+    @Override
+    public void prepareForScheduling() {
+        updateConsumptionProgress();
+    }
+
+    public Deque<BufferIndexOrError> getLoadedBuffers() {
+        return loadedBuffers;
+    }
+
+    // ------------------------------------------------------------------------
+    //  Internal Methods
+    // ------------------------------------------------------------------------
+
+    /** Refresh downstream consumption progress before scheduling another round of reading. */
+    private void updateConsumptionProgress() {
+        bufferIndexManager.updateLastConsumed(operations.getConsumingOffset());

Review Comment:
   Why not just move this statement into `prepareForScheduling`?
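   
   I.e., a sketch of the inlined version (behavior unchanged, assuming `updateConsumptionProgress` has no other callers):
   
   ```java
   @Override
   public void prepareForScheduling() {
       // refresh downstream consumption progress before scheduling another round of reading
       bufferIndexManager.updateLastConsumed(operations.getConsumingOffset());
   }
   ```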



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionReadSchedulerTest.java:
##########
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.util.TestLoggerExtension;
+import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.util.function.BiConsumerWithException;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Queue;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link HsResultPartitionReadScheduler}. */
+@ExtendWith(TestLoggerExtension.class)
+class HsResultPartitionReadSchedulerTest {
+    private static final int BUFFER_SIZE = 1024;
+
+    private static final int NUM_SUBPARTITIONS = 10;
+
+    private final byte[] dataBytes = new byte[BUFFER_SIZE];
+
+    private ManuallyTriggeredScheduledExecutor ioExecutor;
+
+    private BatchShuffleReadBufferPool bufferPool;
+
+    private FileChannel dataFileChannel;
+
+    private Path dataFilePath;
+
+    private HsResultPartitionReadScheduler readScheduler;
+
+    private TestingSubpartitionViewInternalOperation subpartitionViewOperation;
+
+    @BeforeEach
+    void before(@TempDir Path tempDir) throws IOException {
+        Random random = new Random();
+        random.nextBytes(dataBytes);
+        bufferPool = new BatchShuffleReadBufferPool(2 * BUFFER_SIZE, BUFFER_SIZE);
+        ioExecutor = new ManuallyTriggeredScheduledExecutor();
+        dataFilePath = Files.createFile(tempDir.resolve(".data"));
+        dataFileChannel = openFileChannel(dataFilePath);
+        readScheduler =
+                new HsResultPartitionReadScheduler(
+                        bufferPool,
+                        ioExecutor,
+                        new HsFileDataIndexImpl(NUM_SUBPARTITIONS),
+                        dataFilePath,
+                        HybridShuffleConfiguration.createConfiguration(
+                                bufferPool.getNumBuffersPerRequest(), NUM_SUBPARTITIONS));
+        subpartitionViewOperation = new TestingSubpartitionViewInternalOperation();
+    }
+
+    @AfterEach
+    void after() throws Exception {
+        bufferPool.destroy();
+        if (dataFileChannel != null) {
+            dataFileChannel.close();
+        }
+    }
+
+    // ----------------------- test run and register ---------------------------------------
+
+    @Test
+    void testRegisterReaderTriggerRun() throws Exception {
+        TestingHsSubpartitionFileReader.Factory factory =
+                new TestingHsSubpartitionFileReader.Factory();
+
+        assertThat(ioExecutor.numQueuedRunnables()).isZero();
+
+        readScheduler.registerNewSubpartition(0, subpartitionViewOperation, factory);
+
+        assertThat(ioExecutor.numQueuedRunnables()).isEqualTo(1);
+    }
+
+    @Test
+    void testBufferReleasedTriggerRun() throws Exception {
+        TestingHsSubpartitionFileReader.Factory factory =
+                new TestingHsSubpartitionFileReader.Factory();
+        Set<MemorySegment> memorySegmentSets = new HashSet<>();
+        AtomicReference<BufferRecycler> recycleRef = new AtomicReference<>(null);
+        factory.setReadBuffersConsumer(
+                (requestedBuffer, recycle) -> {
+                    while (!requestedBuffer.isEmpty()) {
+                        memorySegmentSets.add(requestedBuffer.poll());
+                    }
+                    recycleRef.set(recycle);
+                });
+
+        readScheduler.registerNewSubpartition(0, subpartitionViewOperation, factory);
+
+        ioExecutor.trigger();
+
+        assertThat(recycleRef).isNotNull();
+        assertThat(memorySegmentSets).hasSize(2);
+        assertThat(bufferPool.getAvailableBuffers()).isZero();
+
+        recycleRef.get().recycle(memorySegmentSets.iterator().next());
+
+        assertThat(ioExecutor.numQueuedRunnables()).isEqualTo(1);
+    }
+
+    /** Test that all unused buffers are released after the run method finishes. */
+    @Test
+    void testRunReleaseAllBuffers() throws Exception {
+        TestingHsSubpartitionFileReader.Factory factory =
+                new TestingHsSubpartitionFileReader.Factory();
+        AtomicBoolean prepareForSchedulingFinished = new AtomicBoolean(false);
+        factory.setPrepareForSchedulingRunnable(() -> prepareForSchedulingFinished.set(true));
+        factory.setReadBuffersConsumer(
+                (buffers, recycle) -> {
+                    assertThat(prepareForSchedulingFinished).isTrue();
+                    assertThat(buffers).hasSize(2);
+                    // poll one buffer, return another buffer to scheduler.
+                    buffers.poll();
+                    assertThat(bufferPool.getAvailableBuffers()).isEqualTo(0);
+                });
+
+        readScheduler.registerNewSubpartition(0, subpartitionViewOperation, factory);
+
+        ioExecutor.trigger();
+
+        // unused buffers should be recycled.
+        assertThat(bufferPool.getAvailableBuffers()).isEqualTo(1);

Review Comment:
   Here it can be `isEqualTo(BUFFER_POOL_SIZE - 1)`
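   
   A sketch of what that could look like; `BUFFER_POOL_SIZE` would be a new test constant matching the pool created in `before()` (2 buffers of `BUFFER_SIZE` bytes each):
   
   ```java
   private static final int BUFFER_POOL_SIZE = 2;
   
   // the reader polled exactly one of the requested buffers, so after the run
   // finishes every other buffer should be back in the pool:
   assertThat(bufferPool.getAvailableBuffers()).isEqualTo(BUFFER_POOL_SIZE - 1);
   ```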



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionReadSchedulerTest.java:
##########
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.util.TestLoggerExtension;
+import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.util.function.BiConsumerWithException;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Queue;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link HsResultPartitionReadScheduler}. */
+@ExtendWith(TestLoggerExtension.class)
+class HsResultPartitionReadSchedulerTest {
+    private static final int BUFFER_SIZE = 1024;
+
+    private static final int NUM_SUBPARTITIONS = 10;
+
+    private final byte[] dataBytes = new byte[BUFFER_SIZE];
+
+    private ManuallyTriggeredScheduledExecutor ioExecutor;
+
+    private BatchShuffleReadBufferPool bufferPool;
+
+    private FileChannel dataFileChannel;
+
+    private Path dataFilePath;
+
+    private HsResultPartitionReadScheduler readScheduler;
+
+    private TestingSubpartitionViewInternalOperation subpartitionViewOperation;
+
+    @BeforeEach
+    void before(@TempDir Path tempDir) throws IOException {
+        Random random = new Random();
+        random.nextBytes(dataBytes);
+        bufferPool = new BatchShuffleReadBufferPool(2 * BUFFER_SIZE, BUFFER_SIZE);
+        ioExecutor = new ManuallyTriggeredScheduledExecutor();
+        dataFilePath = Files.createFile(tempDir.resolve(".data"));
+        dataFileChannel = openFileChannel(dataFilePath);
+        readScheduler =
+                new HsResultPartitionReadScheduler(
+                        bufferPool,
+                        ioExecutor,
+                        new HsFileDataIndexImpl(NUM_SUBPARTITIONS),
+                        dataFilePath,
+                        HybridShuffleConfiguration.createConfiguration(
+                                bufferPool.getNumBuffersPerRequest(), NUM_SUBPARTITIONS));
+        subpartitionViewOperation = new TestingSubpartitionViewInternalOperation();
+    }
+
+    @AfterEach
+    void after() throws Exception {
+        bufferPool.destroy();
+        if (dataFileChannel != null) {
+            dataFileChannel.close();
+        }
+    }
+
+    // ----------------------- test run and register ---------------------------------------
+
+    @Test
+    void testRegisterReaderTriggerRun() throws Exception {
+        TestingHsSubpartitionFileReader.Factory factory =
+                new TestingHsSubpartitionFileReader.Factory();
+
+        assertThat(ioExecutor.numQueuedRunnables()).isZero();
+
+        readScheduler.registerNewSubpartition(0, subpartitionViewOperation, factory);
+
+        assertThat(ioExecutor.numQueuedRunnables()).isEqualTo(1);
+    }
+
+    @Test
+    void testBufferReleasedTriggerRun() throws Exception {
+        TestingHsSubpartitionFileReader.Factory factory =
+                new TestingHsSubpartitionFileReader.Factory();
+        Set<MemorySegment> memorySegmentSets = new HashSet<>();
+        AtomicReference<BufferRecycler> recycleRef = new AtomicReference<>(null);
+        factory.setReadBuffersConsumer(
+                (requestedBuffer, recycle) -> {
+                    while (!requestedBuffer.isEmpty()) {
+                        memorySegmentSets.add(requestedBuffer.poll());
+                    }
+                    recycleRef.set(recycle);
+                });
+
+        readScheduler.registerNewSubpartition(0, subpartitionViewOperation, factory);
+
+        ioExecutor.trigger();
+
+        assertThat(recycleRef).isNotNull();
+        assertThat(memorySegmentSets).hasSize(2);
+        assertThat(bufferPool.getAvailableBuffers()).isZero();
+
+        recycleRef.get().recycle(memorySegmentSets.iterator().next());
+
+        assertThat(ioExecutor.numQueuedRunnables()).isEqualTo(1);
+    }
+
+    /** Test that all unused buffers are released after the run method finishes. */
+    @Test
+    void testRunReleaseAllBuffers() throws Exception {

Review Comment:
   `testRunReleaseUnusedBuffers`



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/SubpartitionFileReader.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.util.Queue;
+
+public interface SubpartitionFileReader extends Comparable<SubpartitionFileReader> {

Review Comment:
   And we need JavaDoc for this interface and its methods.
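   
   E.g., a possible starting point (descriptions inferred from the implementation in this PR, wording to be adjusted):
   
   ```java
   /**
    * Reader that loads the spilled data of a single subpartition from the data file.
    *
    * <p>Readers are {@link Comparable} so that the IO scheduler can order them by the next file
    * offset to load and thereby access the disk as sequentially as possible.
    */
   public interface SubpartitionFileReader extends Comparable<SubpartitionFileReader> {
   
       /** Reads data into the given buffers; ownership of used segments transfers to the reader. */
       void readBuffers(Queue<MemorySegment> buffers, BufferRecycler recycler) throws IOException;
   
       /** Fails the reader, recycling already loaded buffers and propagating the failure cause. */
       void fail(Throwable failureCause);
   
       /** Invoked before each scheduling round, e.g. to refresh downstream consumption progress. */
       void prepareForScheduling();
   }
   ```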



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionReadSchedulerTest.java:
##########
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.util.TestLoggerExtension;
+import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.util.function.BiConsumerWithException;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Queue;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link HsResultPartitionReadScheduler}. */
+@ExtendWith(TestLoggerExtension.class)
+class HsResultPartitionReadSchedulerTest {
+    private static final int BUFFER_SIZE = 1024;
+
+    private static final int NUM_SUBPARTITIONS = 10;
+
+    private final byte[] dataBytes = new byte[BUFFER_SIZE];
+
+    private ManuallyTriggeredScheduledExecutor ioExecutor;
+
+    private BatchShuffleReadBufferPool bufferPool;
+
+    private FileChannel dataFileChannel;
+
+    private Path dataFilePath;
+
+    private HsResultPartitionReadScheduler readScheduler;
+
+    private TestingSubpartitionViewInternalOperation subpartitionViewOperation;
+
+    @BeforeEach
+    void before(@TempDir Path tempDir) throws IOException {
+        Random random = new Random();
+        random.nextBytes(dataBytes);
+        bufferPool = new BatchShuffleReadBufferPool(2 * BUFFER_SIZE, BUFFER_SIZE);
+        ioExecutor = new ManuallyTriggeredScheduledExecutor();
+        dataFilePath = Files.createFile(tempDir.resolve(".data"));
+        dataFileChannel = openFileChannel(dataFilePath);
+        readScheduler =
+                new HsResultPartitionReadScheduler(
+                        bufferPool,
+                        ioExecutor,
+                        new HsFileDataIndexImpl(NUM_SUBPARTITIONS),
+                        dataFilePath,
+                        HybridShuffleConfiguration.createConfiguration(
+                                bufferPool.getNumBuffersPerRequest(), NUM_SUBPARTITIONS));
+        subpartitionViewOperation = new TestingSubpartitionViewInternalOperation();
+    }
+
+    @AfterEach
+    void after() throws Exception {
+        bufferPool.destroy();
+        if (dataFileChannel != null) {
+            dataFileChannel.close();
+        }
+    }
+
+    // ----------------------- test run and register ---------------------------------------
+
+    @Test
+    void testRegisterReaderTriggerRun() throws Exception {
+        TestingHsSubpartitionFileReader.Factory factory =
+                new TestingHsSubpartitionFileReader.Factory();
+
+        assertThat(ioExecutor.numQueuedRunnables()).isZero();
+
+        readScheduler.registerNewSubpartition(0, subpartitionViewOperation, factory);
+
+        assertThat(ioExecutor.numQueuedRunnables()).isEqualTo(1);
+    }
+
+    @Test
+    void testBufferReleasedTriggerRun() throws Exception {
+        TestingHsSubpartitionFileReader.Factory factory =
+                new TestingHsSubpartitionFileReader.Factory();
+        Set<MemorySegment> memorySegmentSets = new HashSet<>();
+        AtomicReference<BufferRecycler> recycleRef = new AtomicReference<>(null);
+        factory.setReadBuffersConsumer(
+                (requestedBuffer, recycle) -> {
+                    while (!requestedBuffer.isEmpty()) {
+                        memorySegmentSets.add(requestedBuffer.poll());
+                    }
+                    recycleRef.set(recycle);
+                });
+
+        readScheduler.registerNewSubpartition(0, subpartitionViewOperation, factory);
+
+        ioExecutor.trigger();
+
+        assertThat(recycleRef).isNotNull();
+        assertThat(memorySegmentSets).hasSize(2);
+        assertThat(bufferPool.getAvailableBuffers()).isZero();
+
+        recycleRef.get().recycle(memorySegmentSets.iterator().next());
+
+        assertThat(ioExecutor.numQueuedRunnables()).isEqualTo(1);
+    }
+
+    /** Test that all unused buffers are released after the run method finishes. */
+    @Test
+    void testRunReleaseAllBuffers() throws Exception {
+        TestingHsSubpartitionFileReader.Factory factory =
+                new TestingHsSubpartitionFileReader.Factory();
+        AtomicBoolean prepareForSchedulingFinished = new AtomicBoolean(false);
+        factory.setPrepareForSchedulingRunnable(() -> prepareForSchedulingFinished.set(true));
+        factory.setReadBuffersConsumer(
+                (buffers, recycle) -> {
+                    assertThat(prepareForSchedulingFinished).isTrue();
+                    assertThat(buffers).hasSize(2);
+                    // poll one buffer, return another buffer to scheduler.
+                    buffers.poll();
+                    assertThat(bufferPool.getAvailableBuffers()).isEqualTo(0);
+                });
+
+        readScheduler.registerNewSubpartition(0, subpartitionViewOperation, factory);
+
+        ioExecutor.trigger();
+
+        // unused buffers should be recycled.
+        assertThat(bufferPool.getAvailableBuffers()).isEqualTo(1);
+    }
+
+    /** Test that the scheduler schedules readers in priority order. */
+    @Test
+    void testScheduleReadersOrdered() throws Exception {
+        TestingHsSubpartitionFileReader.Factory factory1 =
+                new TestingHsSubpartitionFileReader.Factory();
+        TestingHsSubpartitionFileReader.Factory factory2 =
+                new TestingHsSubpartitionFileReader.Factory();
+        AtomicBoolean readBuffersFinished1 = new AtomicBoolean(false);
+        AtomicBoolean readBuffersFinished2 = new AtomicBoolean(false);
+        factory1.setReadBuffersConsumer(
+                (buffers, recycle) -> {
+                    assertThat(readBuffersFinished2).isFalse();
+                    readBuffersFinished1.set(true);
+                });
+        factory2.setReadBuffersConsumer(
+                (buffers, recycle) -> {
+                    assertThat(readBuffersFinished1).isTrue();
+                    readBuffersFinished2.set(true);
+                });
+
+        factory1.setPrioritySupplier(() -> 1);
+        factory2.setPrioritySupplier(() -> 2);
+
+        readScheduler.registerNewSubpartition(0, subpartitionViewOperation, factory1);
+        readScheduler.registerNewSubpartition(1, subpartitionViewOperation, factory2);
+
+        // trigger run.
+        ioExecutor.trigger();
+
+        assertThat(readBuffersFinished2).isTrue();
+    }
+
+    @Test
+    void testRunRequestBufferTimeout() throws Exception {
+        Duration bufferRequestTimeout = Duration.ofSeconds(3);
+
+        // request all buffers first.
+        bufferPool.requestBuffers();
+        assertThat(bufferPool.getAvailableBuffers()).isZero();
+
+        readScheduler =
+                new HsResultPartitionReadScheduler(
+                        bufferPool,
+                        ioExecutor,
+                        new HsFileDataIndexImpl(NUM_SUBPARTITIONS),
+                        dataFilePath,
+                        HybridShuffleConfiguration.createConfiguration(
+                                bufferPool.getNumBuffersPerRequest(),
+                                NUM_SUBPARTITIONS,
+                                bufferRequestTimeout));
+
+        TestingHsSubpartitionFileReader.Factory factory =
+                new TestingHsSubpartitionFileReader.Factory();
+        AtomicBoolean prepareForSchedulingFinished = new AtomicBoolean(false);
+        AtomicReference<Throwable> cause = new AtomicReference<>();
+        factory.setPrepareForSchedulingRunnable(() -> prepareForSchedulingFinished.set(true));
+        factory.setFailConsumer((cause::set));
+
+        readScheduler.registerNewSubpartition(0, subpartitionViewOperation, factory);
+
+        ioExecutor.trigger();
+
+        assertThat(prepareForSchedulingFinished).isTrue();
+        assertThat(cause)
+                .hasValueSatisfying(
+                        throwable ->
+                                assertThat(throwable)
+                                        .isInstanceOf(TimeoutException.class)
+                                        .hasMessageContaining("Buffer request 
timeout"));
+    }
+
+    /**
+     * When {@link SubpartitionFileReader#readBuffers(Queue, BufferRecycler)} throws an IOException,
+     * the subpartition reader should fail.
+     */
+    @Test
+    void testRunReadBuffersThrowException() throws Exception {
+        TestingHsSubpartitionFileReader.Factory factory =
+                new TestingHsSubpartitionFileReader.Factory();
+        AtomicReference<Throwable> cause = new AtomicReference<>();
+        factory.setFailConsumer((cause::set));
+        factory.setReadBuffersConsumer(
+                (buffers, recycle) -> {
+                    throw new IOException("expected exception.");
+                });
+
+        readScheduler.registerNewSubpartition(0, subpartitionViewOperation, factory);
+
+        ioExecutor.trigger();
+
+        assertThat(cause)
+                .hasValueSatisfying(
+                        throwable ->
+                                assertThat(throwable)
+                                        .isInstanceOf(IOException.class)
+                                        .hasMessageContaining("expected 
exception."));
+    }
+
+    // ----------------------- test release ---------------------------------------
+
+    /** Test releasing the scheduler while a reader is reading buffers. */
+    @Test
+    void testReleasedWhenReading() throws Exception {
+        TestingHsSubpartitionFileReader.Factory factory =
+                new TestingHsSubpartitionFileReader.Factory();
+
+        AtomicReference<Throwable> cause = new AtomicReference<>(null);
+        factory.setFailConsumer((cause::set));
+        factory.setReadBuffersConsumer((buffers, recycle) -> readScheduler.release());

Review Comment:
   Depending on the internal implementation, this may cause a deadlock.
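   
   A hypothetical, self-contained illustration of the hazard (not the actual scheduler code): if `release()` waits for the in-flight read loop to finish, then calling it from inside `readBuffers()` on the IO thread can never return.
   
   ```java
   import java.util.concurrent.CountDownLatch;
   
   public class ReleaseDeadlockSketch {
       private final CountDownLatch readLoopDone = new CountDownLatch(1);
   
       void runReadLoop() {
           try {
               readBuffers(); // the reader callback runs on the IO thread
           } finally {
               readLoopDone.countDown();
           }
       }
   
       void readBuffers() {
           release(); // re-entrant call from the IO thread, as in this test
       }
   
       void release() {
           try {
               // waits for the very read loop that is calling us, so it never completes
               readLoopDone.await();
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
           }
       }
   
       public static void main(String[] args) {
           new ReleaseDeadlockSketch().runReadLoop(); // hangs forever
       }
   }
   ```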



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionReadSchedulerTest.java:
##########
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.util.TestLoggerExtension;
+import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.util.function.BiConsumerWithException;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Queue;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link HsResultPartitionReadScheduler}. */
+@ExtendWith(TestLoggerExtension.class)
+class HsResultPartitionReadSchedulerTest {
+    private static final int BUFFER_SIZE = 1024;
+
+    private static final int NUM_SUBPARTITIONS = 10;
+
+    private final byte[] dataBytes = new byte[BUFFER_SIZE];
+
+    private ManuallyTriggeredScheduledExecutor ioExecutor;
+
+    private BatchShuffleReadBufferPool bufferPool;
+
+    private FileChannel dataFileChannel;
+
+    private Path dataFilePath;
+
+    private HsResultPartitionReadScheduler readScheduler;
+
+    private TestingSubpartitionViewInternalOperation subpartitionViewOperation;
+
+    @BeforeEach
+    void before(@TempDir Path tempDir) throws IOException {
+        Random random = new Random();
+        random.nextBytes(dataBytes);
+        bufferPool = new BatchShuffleReadBufferPool(2 * BUFFER_SIZE, BUFFER_SIZE);
+        ioExecutor = new ManuallyTriggeredScheduledExecutor();
+        dataFilePath = Files.createFile(tempDir.resolve(".data"));
+        dataFileChannel = openFileChannel(dataFilePath);
+        readScheduler =
+                new HsResultPartitionReadScheduler(
+                        bufferPool,
+                        ioExecutor,
+                        new HsFileDataIndexImpl(NUM_SUBPARTITIONS),
+                        dataFilePath,
+                        HybridShuffleConfiguration.createConfiguration(
+                                bufferPool.getNumBuffersPerRequest(), NUM_SUBPARTITIONS));
+        subpartitionViewOperation = new TestingSubpartitionViewInternalOperation();
+    }
+
+    @AfterEach
+    void after() throws Exception {
+        bufferPool.destroy();
+        if (dataFileChannel != null) {
+            dataFileChannel.close();
+        }
+    }
+
+    // ----------------------- test run and register ---------------------------------------
+
+    @Test
+    void testRegisterReaderTriggerRun() throws Exception {
+        TestingHsSubpartitionFileReader.Factory factory =
+                new TestingHsSubpartitionFileReader.Factory();
+
+        assertThat(ioExecutor.numQueuedRunnables()).isZero();
+
+        readScheduler.registerNewSubpartition(0, subpartitionViewOperation, factory);
+
+        assertThat(ioExecutor.numQueuedRunnables()).isEqualTo(1);
+    }
+
+    @Test
+    void testBufferReleasedTriggerRun() throws Exception {
+        TestingHsSubpartitionFileReader.Factory factory =
+                new TestingHsSubpartitionFileReader.Factory();
+        Set<MemorySegment> memorySegmentSets = new HashSet<>();
+        AtomicReference<BufferRecycler> recycleRef = new AtomicReference<>(null);
+        factory.setReadBuffersConsumer(
+                (requestedBuffer, recycle) -> {
+                    while (!requestedBuffer.isEmpty()) {
+                        memorySegmentSets.add(requestedBuffer.poll());
+                    }
+                    recycleRef.set(recycle);
+                });
+
+        readScheduler.registerNewSubpartition(0, subpartitionViewOperation, factory);
+
+        ioExecutor.trigger();
+
+        assertThat(recycleRef).isNotNull();
+        assertThat(memorySegmentSets).hasSize(2);
+        assertThat(bufferPool.getAvailableBuffers()).isZero();
+
+        recycleRef.get().recycle(memorySegmentSets.iterator().next());
+
+        assertThat(ioExecutor.numQueuedRunnables()).isEqualTo(1);
+    }
+
+    /** Test that all unused buffers are released after the run method finishes. */
+    @Test
+    void testRunReleaseAllBuffers() throws Exception {
+        TestingHsSubpartitionFileReader.Factory factory =
+                new TestingHsSubpartitionFileReader.Factory();
+        AtomicBoolean prepareForSchedulingFinished = new AtomicBoolean(false);

Review Comment:
   `CompletableFuture` is preferred.
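   
   E.g., a sketch of the suggested change (assumes `java.util.concurrent.CompletableFuture` is imported; the assertion relies on AssertJ's `CompletableFuture` support):
   
   ```java
   CompletableFuture<Void> prepareForSchedulingFinished = new CompletableFuture<>();
   factory.setPrepareForSchedulingRunnable(() -> prepareForSchedulingFinished.complete(null));
   factory.setReadBuffersConsumer(
           (buffers, recycle) -> assertThat(prepareForSchedulingFinished).isDone());
   ```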



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReader.java:
##########
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.Deque;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import static org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil.positionToNextBuffer;
+import static org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil.readFromByteChannel;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * This component is responsible for reading data from disk for a specific subpartition.
+ *
+ * <p>In order to access the disk as sequentially as possible, {@link HsSubpartitionFileReader} needs
+ * to be able to compare priorities.
+ *
+ * <p>Note: This class is not thread safe.
+ */
+public class HsSubpartitionFileReader implements SubpartitionFileReader {
+
+    private final ByteBuffer headerBuf = BufferReaderWriterUtil.allocatedHeaderBuffer();
+
+    private final int subpartitionId;
+
+    private final FileChannel dataFileChannel;
+
+    private final HsSubpartitionViewInternalOperations operations;
+
+    private final CachedRegionManager cachedRegionManager;
+
+    private final BufferIndexManager bufferIndexManager;
+
+    private final Deque<BufferIndexOrError> loadedBuffers = new LinkedBlockingDeque<>();
+
+    private volatile boolean isFailed;
+
+    public HsSubpartitionFileReader(
+            int subpartitionId,
+            FileChannel dataFileChannel,
+            HsSubpartitionViewInternalOperations operations,
+            HsFileDataIndex dataIndex,
+            int maxBufferReadAhead) {
+        this.subpartitionId = subpartitionId;
+        this.dataFileChannel = dataFileChannel;
+        this.operations = operations;
+        this.bufferIndexManager = new BufferIndexManager(maxBufferReadAhead);
+        this.cachedRegionManager = new CachedRegionManager(subpartitionId, dataIndex);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        HsSubpartitionFileReader that = (HsSubpartitionFileReader) o;
+        return subpartitionId == that.subpartitionId;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(subpartitionId);
+    }
+
+    /**
+     * Read subpartition data into buffers.
+     *
+     * <p>This transfers the ownership of used buffers to this class. It's this class'
+     * responsibility to release the buffers using the recycler when no longer needed.
+     *
+     * <p>Calling this method does not always use up all the provided buffers. It's this class'
+     * decision when to stop reading. Currently, it stops reading when: 1) buffers are used up, or
+     * 2) it reaches the end of the subpartition data within the region, or 3) enough data has been
+     * read ahead of the downstream consuming offset.
+     */
+    @Override
+    public synchronized void readBuffers(Queue<MemorySegment> buffers, BufferRecycler recycler)
+            throws IOException {
+        if (isFailed) {
+            throw new IOException("subpartition reader has already failed.");
+        }
+        int firstBufferToLoad = bufferIndexManager.getNextToLoad();
+        if (firstBufferToLoad < 0) {
+            return;
+        }
+
+        // If the lookup result is empty, it means that one of the following things has happened:
+        // 1) The target buffer has not been spilled into disk.
+        // 2) The target buffer has not been released from memory.
+        // So, just skip this round reading.
+        int numRemainingBuffer =
+                cachedRegionManager.getRemainingBuffersInRegion(firstBufferToLoad);
+        if (numRemainingBuffer == 0) {
+            return;
+        }
+        moveFileOffsetToBuffer(firstBufferToLoad);
+
+        int indexToLoad;
+        int numLoaded = 0;
+        while (!buffers.isEmpty()
+                && (indexToLoad = bufferIndexManager.getNextToLoad()) >= 0
+                && numRemainingBuffer-- > 0) {
+            MemorySegment segment = buffers.poll();
+            Buffer buffer;
+            try {
+                if ((buffer = readFromByteChannel(dataFileChannel, headerBuf, segment, recycler))
+                        == null) {
+                    buffers.add(segment);
+                    break;
+                }
+            } catch (Throwable throwable) {
+                buffers.add(segment);
+                throw throwable;
+            }
+
+            loadedBuffers.add(BufferIndexOrError.newBuffer(buffer, indexToLoad));
+            bufferIndexManager.updateLastLoaded(indexToLoad);
+            cachedRegionManager.advance(
+                    buffer.readableBytes() + BufferReaderWriterUtil.HEADER_LENGTH);
+            ++numLoaded;
+        }
+
+        if (loadedBuffers.size() <= numLoaded) {
+            operations.notifyDataAvailableFromDisk();
+        }
+    }
+
+    @Override
+    public synchronized void fail(Throwable failureCause) {
+        if (isFailed) {
+            return;
+        }
+        isFailed = true;
+        BufferIndexOrError bufferIndexOrError;
+        // Empty from the tail, in case the subpartition view consumes concurrently and gets the
+        // wrong order.
+        while ((bufferIndexOrError = loadedBuffers.pollLast()) != null) {
+            if (bufferIndexOrError.getBuffer().isPresent()) {
+                checkNotNull(bufferIndexOrError.buffer).recycleBuffer();
+            }
+        }
+
+        loadedBuffers.add(BufferIndexOrError.newError(failureCause));
+        operations.notifyDataAvailableFromDisk();
+    }
+
+    @Override
+    public void prepareForScheduling() {
+        updateConsumptionProgress();
+    }
+
+    public Deque<BufferIndexOrError> getLoadedBuffers() {
+        return loadedBuffers;
+    }
+
+    // ------------------------------------------------------------------------
+    //  Internal Methods
+    // ------------------------------------------------------------------------
+
+    /** Refresh downstream consumption progress for another round of read scheduling. */
+    private void updateConsumptionProgress() {
+        bufferIndexManager.updateLastConsumed(operations.getConsumingOffset());
+    }
+
+    private void moveFileOffsetToBuffer(int bufferIndex) throws IOException {
+        Tuple2<Integer, Long> indexAndOffset =
+                cachedRegionManager.getNumSkipAndFileOffset(bufferIndex);
+        dataFileChannel.position(indexAndOffset.f1);
+        for (int i = 0; i < indexAndOffset.f0; ++i) {
+            positionToNextBuffer(dataFileChannel, headerBuf);
+        }
+        cachedRegionManager.skipAll(dataFileChannel.position());
+    }
+
+    /** Returns Long.MAX_VALUE if it shouldn't load. */
+    private long getNextOffsetToLoad() {
+        int bufferIndex = bufferIndexManager.getNextToLoad();
+        if (bufferIndex < 0) {
+            return Long.MAX_VALUE;
+        } else {
+            return cachedRegionManager.getFileOffset(bufferIndex);
+        }
+    }
+
+    /** Provides the priority calculation logic for the IO scheduler. */
+    @Override
+    public int compareTo(SubpartitionFileReader that) {

Review Comment:
   This is not an internal method.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionReadSchedulerTest.java:
##########
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.util.TestLoggerExtension;
+import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.util.function.BiConsumerWithException;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Queue;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link HsResultPartitionReadScheduler}. */
+@ExtendWith(TestLoggerExtension.class)
+class HsResultPartitionReadSchedulerTest {
+    private static final int BUFFER_SIZE = 1024;
+
+    private static final int NUM_SUBPARTITIONS = 10;
+
+    private final byte[] dataBytes = new byte[BUFFER_SIZE];
+
+    private ManuallyTriggeredScheduledExecutor ioExecutor;
+
+    private BatchShuffleReadBufferPool bufferPool;
+
+    private FileChannel dataFileChannel;
+
+    private Path dataFilePath;
+
+    private HsResultPartitionReadScheduler readScheduler;
+
+    private TestingSubpartitionViewInternalOperation subpartitionViewOperation;
+
+    @BeforeEach
+    void before(@TempDir Path tempDir) throws IOException {
+        Random random = new Random();
+        random.nextBytes(dataBytes);
+        bufferPool = new BatchShuffleReadBufferPool(2 * BUFFER_SIZE, BUFFER_SIZE);
+        ioExecutor = new ManuallyTriggeredScheduledExecutor();
+        dataFilePath = Files.createFile(tempDir.resolve(".data"));
+        dataFileChannel = openFileChannel(dataFilePath);
+        readScheduler =
+                new HsResultPartitionReadScheduler(
+                        bufferPool,
+                        ioExecutor,
+                        new HsFileDataIndexImpl(NUM_SUBPARTITIONS),
+                        dataFilePath,
+                        HybridShuffleConfiguration.createConfiguration(
+                                bufferPool.getNumBuffersPerRequest(), NUM_SUBPARTITIONS));
+        subpartitionViewOperation = new TestingSubpartitionViewInternalOperation();
+    }
+
+    @AfterEach
+    void after() throws Exception {
+        bufferPool.destroy();
+        if (dataFileChannel != null) {
+            dataFileChannel.close();
+        }
+    }
+
+    // ----------------------- test run and register ---------------------------------------
+
+    @Test
+    void testRegisterReaderTriggerRun() throws Exception {
+        TestingHsSubpartitionFileReader.Factory factory =
+                new TestingHsSubpartitionFileReader.Factory();
+
+        assertThat(ioExecutor.numQueuedRunnables()).isZero();
+
+        readScheduler.registerNewSubpartition(0, subpartitionViewOperation, factory);
+
+        assertThat(ioExecutor.numQueuedRunnables()).isEqualTo(1);

Review Comment:
   We can check whether the reader starts reading.
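   
   A minimal sketch of what that stronger assertion could look like, reusing the testing factory's `setReadBuffersConsumer` hook (purely illustrative):
   
   ```java
   // Flip a flag when the scheduler actually drives readBuffers on the reader.
   AtomicBoolean readStarted = new AtomicBoolean(false);
   factory.setReadBuffersConsumer((buffers, recycler) -> readStarted.set(true));
   
   readScheduler.registerNewSubpartition(0, subpartitionViewOperation, factory);
   
   // Execute the queued reading task and assert the reader was invoked.
   ioExecutor.trigger();
   assertThat(readStarted).isTrue();
   ```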



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionReadScheduler.java:
##########
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.util.FatalExitExceptionHandler;
+import org.apache.flink.util.IOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.time.Duration;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * IO scheduler for HsResultPartition, which schedules {@link HsSubpartitionFileReader}s for
+ * loading data w.r.t. their offset in the file.
+ */
+@ThreadSafe
+public class HsResultPartitionReadScheduler implements Runnable, BufferRecycler {
+    private static final Logger LOG =
+            LoggerFactory.getLogger(HsResultPartitionReadScheduler.class);
+
+    /** Executor to run the shuffle data reading task. */
+    private final Executor ioExecutor;
+
+    /** Maximum number of buffers that can be allocated by this partition reader. */
+    private final int maxRequestedBuffers;
+
+    /**
+     * Maximum time to wait when requesting read buffers from the buffer pool before throwing an
+     * exception.
+     */
+    private final Duration bufferRequestTimeout;
+
+    /** Lock used to synchronize multi-thread access to thread-unsafe fields. */
+    private final Object lock = new Object();
+
+    /**
+     * A {@link CompletableFuture} to be completed when this read scheduler including all resources
+     * is released.
+     */
+    @GuardedBy("lock")
+    private final CompletableFuture<?> releaseFuture = new CompletableFuture<>();
+
+    /** Buffer pool from which to allocate buffers for shuffle data reading. */
+    private final BatchShuffleReadBufferPool bufferPool;
+
+    private final Path dataFilePath;
+
+    private final HsFileDataIndex dataIndex;
+
+    private final HybridShuffleConfiguration hybridShuffleConfiguration;
+
+    /** All failed subpartition readers to be released. */
+    @GuardedBy("lock")
+    private final Set<SubpartitionFileReader> failedReaders = new HashSet<>();
+
+    /** All readers waiting to read data of different subpartitions. */
+    @GuardedBy("lock")
+    private final Set<SubpartitionFileReader> allReaders = new HashSet<>();
+
+    /**
+     * Whether the data reading task is currently running or not. This flag is used when trying to
+     * submit the data reading task.
+     */
+    @GuardedBy("lock")
+    private boolean isRunning;
+
+    /** Number of buffers already allocated and still not recycled by this partition reader. */
+    @GuardedBy("lock")
+    private volatile int numRequestedBuffers;
+
+    /** Whether this read scheduler has been released or not. */
+    @GuardedBy("lock")
+    private volatile boolean isReleased;
+
+    @GuardedBy("lock")
+    private FileChannel dataFileChannel;
+
+    public HsResultPartitionReadScheduler(
+            BatchShuffleReadBufferPool bufferPool,
+            Executor ioExecutor,
+            HsFileDataIndex dataIndex,
+            Path dataFilePath,
+            HybridShuffleConfiguration hybridShuffleConfiguration) {
+        this.hybridShuffleConfiguration = checkNotNull(hybridShuffleConfiguration);
+        this.dataIndex = checkNotNull(dataIndex);
+        this.dataFilePath = checkNotNull(dataFilePath);
+        this.bufferPool = checkNotNull(bufferPool);
+        this.ioExecutor = checkNotNull(ioExecutor);
+        this.maxRequestedBuffers = hybridShuffleConfiguration.getMaxRequestedBuffers();
+        this.bufferRequestTimeout =
+                checkNotNull(hybridShuffleConfiguration.getBufferRequestTimeout());
+    }
+
+    @Override
+    // Note, this method is synchronized on `this`, not `lock`. The purpose here is to prevent
+    // concurrent `run()` executions. Concurrent calls to other methods are allowed.
+    public synchronized void run() {
+        int numBuffersRead = tryRead();
+        endCurrentRoundOfReading(numBuffersRead);
+    }
+
+    /** This method is only called by the result partition to create a subpartitionFileReader. */
+    public SubpartitionFileReader registerNewSubpartition(
+            int subpartitionId,
+            HsSubpartitionViewInternalOperations operation,
+            SubpartitionFileReader.Factory fileReaderFactory)

Review Comment:
   The factory should be decided when creating the scheduler.
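   
   Roughly what that could look like (a sketch only; the elided bodies and the `createFileReader` parameters are assumptions, not the actual implementation):
   
   ```java
   // The reader factory becomes immutable state that is fixed at construction time.
   private final SubpartitionFileReader.Factory fileReaderFactory;
   
   public HsResultPartitionReadScheduler(
           BatchShuffleReadBufferPool bufferPool,
           Executor ioExecutor,
           HsFileDataIndex dataIndex,
           Path dataFilePath,
           HybridShuffleConfiguration hybridShuffleConfiguration,
           SubpartitionFileReader.Factory fileReaderFactory) {
       // ... existing assignments stay as they are ...
       this.fileReaderFactory = checkNotNull(fileReaderFactory);
   }
   
   // Registration then no longer carries a factory argument; it creates the reader
   // via the fixed factory and proceeds as before.
   public SubpartitionFileReader registerNewSubpartition(
           int subpartitionId, HsSubpartitionViewInternalOperations operation) {
       // ... create via fileReaderFactory.createFileReader(...), register, trigger run ...
   }
   ```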



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionReadSchedulerTest.java:
##########
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.util.TestLoggerExtension;
+import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.util.function.BiConsumerWithException;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Queue;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link HsResultPartitionReadScheduler}. */
+@ExtendWith(TestLoggerExtension.class)
+class HsResultPartitionReadSchedulerTest {
+    private static final int BUFFER_SIZE = 1024;
+
+    private static final int NUM_SUBPARTITIONS = 10;
+
+    private final byte[] dataBytes = new byte[BUFFER_SIZE];
+
+    private ManuallyTriggeredScheduledExecutor ioExecutor;
+
+    private BatchShuffleReadBufferPool bufferPool;
+
+    private FileChannel dataFileChannel;
+
+    private Path dataFilePath;
+
+    private HsResultPartitionReadScheduler readScheduler;
+
+    private TestingSubpartitionViewInternalOperation subpartitionViewOperation;
+
+    @BeforeEach
+    void before(@TempDir Path tempDir) throws IOException {
+        Random random = new Random();
+        random.nextBytes(dataBytes);
+        bufferPool = new BatchShuffleReadBufferPool(2 * BUFFER_SIZE, BUFFER_SIZE);
+        ioExecutor = new ManuallyTriggeredScheduledExecutor();
+        dataFilePath = Files.createFile(tempDir.resolve(".data"));
+        dataFileChannel = openFileChannel(dataFilePath);
+        readScheduler =
+                new HsResultPartitionReadScheduler(
+                        bufferPool,
+                        ioExecutor,
+                        new HsFileDataIndexImpl(NUM_SUBPARTITIONS),
+                        dataFilePath,
+                        HybridShuffleConfiguration.createConfiguration(
+                                bufferPool.getNumBuffersPerRequest(), NUM_SUBPARTITIONS));
+        subpartitionViewOperation = new TestingSubpartitionViewInternalOperation();
+    }
+
+    @AfterEach
+    void after() throws Exception {
+        bufferPool.destroy();
+        if (dataFileChannel != null) {
+            dataFileChannel.close();
+        }
+    }
+
+    // ----------------------- test run and register ---------------------------------------
+
+    @Test
+    void testRegisterReaderTriggerRun() throws Exception {
+        TestingHsSubpartitionFileReader.Factory factory =
+                new TestingHsSubpartitionFileReader.Factory();
+
+        assertThat(ioExecutor.numQueuedRunnables()).isZero();
+
+        readScheduler.registerNewSubpartition(0, subpartitionViewOperation, factory);
+
+        assertThat(ioExecutor.numQueuedRunnables()).isEqualTo(1);
+    }
+
+    @Test
+    void testBufferReleasedTriggerRun() throws Exception {
+        TestingHsSubpartitionFileReader.Factory factory =
+                new TestingHsSubpartitionFileReader.Factory();
+        Set<MemorySegment> memorySegmentSets = new HashSet<>();
+        AtomicReference<BufferRecycler> recycleRef = new AtomicReference<>(null);
+        factory.setReadBuffersConsumer(
+                (requestedBuffer, recycle) -> {
+                    while (!requestedBuffer.isEmpty()) {
+                        memorySegmentSets.add(requestedBuffer.poll());
+                    }
+                    recycleRef.set(recycle);
+                });
+
+        readScheduler.registerNewSubpartition(0, subpartitionViewOperation, factory);
+
+        ioExecutor.trigger();
+
+        assertThat(recycleRef).isNotNull();
+        assertThat(memorySegmentSets).hasSize(2);
+        assertThat(bufferPool.getAvailableBuffers()).isZero();
+
+        recycleRef.get().recycle(memorySegmentSets.iterator().next());
+
+        assertThat(ioExecutor.numQueuedRunnables()).isEqualTo(1);

Review Comment:
   We can check how many buffers the reader has read.
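   
   For instance, an illustrative sketch built on the hooks already present in this test (`AtomicInteger` import assumed):
   
   ```java
   // Count the buffers actually handed to the reader instead of only counting queued runnables.
   AtomicInteger numBuffersRead = new AtomicInteger(0);
   factory.setReadBuffersConsumer(
           (requestedBuffer, recycle) -> {
               while (!requestedBuffer.isEmpty()) {
                   memorySegmentSets.add(requestedBuffer.poll());
                   numBuffersRead.incrementAndGet();
               }
               recycleRef.set(recycle);
           });
   
   // ... register the subpartition, trigger the executor, recycle a buffer as above ...
   assertThat(numBuffersRead).hasValue(2);
   ```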



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionReadSchedulerTest.java:
##########
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.util.TestLoggerExtension;
+import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.util.function.BiConsumerWithException;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Queue;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link HsResultPartitionReadScheduler}. */
+@ExtendWith(TestLoggerExtension.class)
+class HsResultPartitionReadSchedulerTest {
+    private static final int BUFFER_SIZE = 1024;
+
+    private static final int NUM_SUBPARTITIONS = 10;
+
+    private final byte[] dataBytes = new byte[BUFFER_SIZE];
+
+    private ManuallyTriggeredScheduledExecutor ioExecutor;
+
+    private BatchShuffleReadBufferPool bufferPool;
+
+    private FileChannel dataFileChannel;
+
+    private Path dataFilePath;
+
+    private HsResultPartitionReadScheduler readScheduler;
+
+    private TestingSubpartitionViewInternalOperation subpartitionViewOperation;
+
+    @BeforeEach
+    void before(@TempDir Path tempDir) throws IOException {
+        Random random = new Random();
+        random.nextBytes(dataBytes);
+        bufferPool = new BatchShuffleReadBufferPool(2 * BUFFER_SIZE, BUFFER_SIZE);
+        ioExecutor = new ManuallyTriggeredScheduledExecutor();
+        dataFilePath = Files.createFile(tempDir.resolve(".data"));
+        dataFileChannel = openFileChannel(dataFilePath);
+        readScheduler =
+                new HsResultPartitionReadScheduler(
+                        bufferPool,
+                        ioExecutor,
+                        new HsFileDataIndexImpl(NUM_SUBPARTITIONS),
+                        dataFilePath,
+                        HybridShuffleConfiguration.createConfiguration(
+                                bufferPool.getNumBuffersPerRequest(), NUM_SUBPARTITIONS));
+        subpartitionViewOperation = new TestingSubpartitionViewInternalOperation();
+    }
+
+    @AfterEach
+    void after() throws Exception {
+        bufferPool.destroy();
+        if (dataFileChannel != null) {
+            dataFileChannel.close();
+        }
+    }
+
+    // ----------------------- test run and register ---------------------------------------
+
+    @Test
+    void testRegisterReaderTriggerRun() throws Exception {
+        TestingHsSubpartitionFileReader.Factory factory =
+                new TestingHsSubpartitionFileReader.Factory();
+
+        assertThat(ioExecutor.numQueuedRunnables()).isZero();
+
+        readScheduler.registerNewSubpartition(0, subpartitionViewOperation, factory);
+
+        assertThat(ioExecutor.numQueuedRunnables()).isEqualTo(1);
+    }
+
+    @Test
+    void testBufferReleasedTriggerRun() throws Exception {
+        TestingHsSubpartitionFileReader.Factory factory =
+                new TestingHsSubpartitionFileReader.Factory();
+        Set<MemorySegment> memorySegmentSets = new HashSet<>();
+        AtomicReference<BufferRecycler> recycleRef = new AtomicReference<>(null);
+        factory.setReadBuffersConsumer(
+                (requestedBuffer, recycle) -> {
+                    while (!requestedBuffer.isEmpty()) {
+                        memorySegmentSets.add(requestedBuffer.poll());
+                    }
+                    recycleRef.set(recycle);
+                });
+
+        readScheduler.registerNewSubpartition(0, subpartitionViewOperation, factory);
+
+        ioExecutor.trigger();
+
+        assertThat(recycleRef).isNotNull();
+        assertThat(memorySegmentSets).hasSize(2);
+        assertThat(bufferPool.getAvailableBuffers()).isZero();
+
+        recycleRef.get().recycle(memorySegmentSets.iterator().next());
+
+        assertThat(ioExecutor.numQueuedRunnables()).isEqualTo(1);
+    }
+
+    /** Test that all unused buffers are released after the run method finishes. */
+    @Test
+    void testRunReleaseAllBuffers() throws Exception {
+        TestingHsSubpartitionFileReader.Factory factory =
+                new TestingHsSubpartitionFileReader.Factory();
+        AtomicBoolean prepareForSchedulingFinished = new AtomicBoolean(false);
+        factory.setPrepareForSchedulingRunnable(() -> prepareForSchedulingFinished.set(true));
+        factory.setReadBuffersConsumer(
+                (buffers, recycle) -> {
+                    assertThat(prepareForSchedulingFinished).isTrue();
+                    assertThat(buffers).hasSize(2);

Review Comment:
   It would be better to have a constant `BUFFER_POOL_SIZE`, and use it like `hasSize(BUFFER_POOL_SIZE)` in the test cases.
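   
   Something like this (a sketch; the constant name follows the suggestion above):
   
   ```java
   // Name the pool capacity once so the pool setup and the assertions cannot drift apart.
   private static final int BUFFER_POOL_SIZE = 2;
   
   // In before():
   bufferPool = new BatchShuffleReadBufferPool(BUFFER_POOL_SIZE * BUFFER_SIZE, BUFFER_SIZE);
   
   // In the test body:
   assertThat(buffers).hasSize(BUFFER_POOL_SIZE);
   ```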



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionReadSchedulerTest.java:
##########
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.util.TestLoggerExtension;
+import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.util.function.BiConsumerWithException;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Queue;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link HsResultPartitionReadScheduler}. */
+@ExtendWith(TestLoggerExtension.class)
+class HsResultPartitionReadSchedulerTest {
+    private static final int BUFFER_SIZE = 1024;
+
+    private static final int NUM_SUBPARTITIONS = 10;
+
+    private final byte[] dataBytes = new byte[BUFFER_SIZE];
+
+    private ManuallyTriggeredScheduledExecutor ioExecutor;
+
+    private BatchShuffleReadBufferPool bufferPool;
+
+    private FileChannel dataFileChannel;
+
+    private Path dataFilePath;
+
+    private HsResultPartitionReadScheduler readScheduler;
+
+    private TestingSubpartitionViewInternalOperation subpartitionViewOperation;
+
+    @BeforeEach
+    void before(@TempDir Path tempDir) throws IOException {
+        Random random = new Random();
+        random.nextBytes(dataBytes);
+        bufferPool = new BatchShuffleReadBufferPool(2 * BUFFER_SIZE, BUFFER_SIZE);
+        ioExecutor = new ManuallyTriggeredScheduledExecutor();
+        dataFilePath = Files.createFile(tempDir.resolve(".data"));
+        dataFileChannel = openFileChannel(dataFilePath);
+        readScheduler =
+                new HsResultPartitionReadScheduler(
+                        bufferPool,
+                        ioExecutor,
+                        new HsFileDataIndexImpl(NUM_SUBPARTITIONS),
+                        dataFilePath,
+                        HybridShuffleConfiguration.createConfiguration(
+                                bufferPool.getNumBuffersPerRequest(), NUM_SUBPARTITIONS));
+        subpartitionViewOperation = new TestingSubpartitionViewInternalOperation();
+    }
+
+    @AfterEach
+    void after() throws Exception {
+        bufferPool.destroy();
+        if (dataFileChannel != null) {
+            dataFileChannel.close();
+        }
+    }
+
+    // ----------------------- test run and register ---------------------------------------
+
+    @Test
+    void testRegisterReaderTriggerRun() throws Exception {
+        TestingHsSubpartitionFileReader.Factory factory =
+                new TestingHsSubpartitionFileReader.Factory();
+
+        assertThat(ioExecutor.numQueuedRunnables()).isZero();
+
+        readScheduler.registerNewSubpartition(0, subpartitionViewOperation, factory);
+
+        assertThat(ioExecutor.numQueuedRunnables()).isEqualTo(1);
+    }
+
+    @Test
+    void testBufferReleasedTriggerRun() throws Exception {
+        TestingHsSubpartitionFileReader.Factory factory =
+                new TestingHsSubpartitionFileReader.Factory();
+        Set<MemorySegment> memorySegmentSets = new HashSet<>();
+        AtomicReference<BufferRecycler> recycleRef = new AtomicReference<>(null);

Review Comment:
   `CompletableFuture<BufferRecycler>` is probably better.
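   
   E.g., an illustrative sketch of that change:
   
   ```java
   // A future states the "recycler was handed over" expectation explicitly.
   CompletableFuture<BufferRecycler> recyclerFuture = new CompletableFuture<>();
   factory.setReadBuffersConsumer(
           (requestedBuffer, recycle) -> {
               while (!requestedBuffer.isEmpty()) {
                   memorySegmentSets.add(requestedBuffer.poll());
               }
               recyclerFuture.complete(recycle);
           });
   
   readScheduler.registerNewSubpartition(0, subpartitionViewOperation, factory);
   ioExecutor.trigger();
   
   assertThat(recyclerFuture).isCompleted();
   recyclerFuture.get().recycle(memorySegmentSets.iterator().next());
   ```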



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionReadSchedulerTest.java:
##########
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.util.TestLoggerExtension;
+import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.util.function.BiConsumerWithException;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Queue;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link HsResultPartitionReadScheduler}. */
+@ExtendWith(TestLoggerExtension.class)
+class HsResultPartitionReadSchedulerTest {
+    private static final int BUFFER_SIZE = 1024;
+
+    private static final int NUM_SUBPARTITIONS = 10;
+
+    private final byte[] dataBytes = new byte[BUFFER_SIZE];
+
+    private ManuallyTriggeredScheduledExecutor ioExecutor;
+
+    private BatchShuffleReadBufferPool bufferPool;
+
+    private FileChannel dataFileChannel;
+
+    private Path dataFilePath;
+
+    private HsResultPartitionReadScheduler readScheduler;
+
+    private TestingSubpartitionViewInternalOperation subpartitionViewOperation;
+
+    @BeforeEach
+    void before(@TempDir Path tempDir) throws IOException {
+        Random random = new Random();
+        random.nextBytes(dataBytes);
+        bufferPool = new BatchShuffleReadBufferPool(2 * BUFFER_SIZE, BUFFER_SIZE);
+        ioExecutor = new ManuallyTriggeredScheduledExecutor();
+        dataFilePath = Files.createFile(tempDir.resolve(".data"));
+        dataFileChannel = openFileChannel(dataFilePath);
+        readScheduler =
+                new HsResultPartitionReadScheduler(
+                        bufferPool,
+                        ioExecutor,
+                        new HsFileDataIndexImpl(NUM_SUBPARTITIONS),
+                        dataFilePath,
+                        HybridShuffleConfiguration.createConfiguration(
+                                bufferPool.getNumBuffersPerRequest(), NUM_SUBPARTITIONS));
+        subpartitionViewOperation = new TestingSubpartitionViewInternalOperation();
+    }
+
+    @AfterEach
+    void after() throws Exception {
+        bufferPool.destroy();
+        if (dataFileChannel != null) {
+            dataFileChannel.close();
+        }
+    }
+
+    // ----------------------- test run and register ---------------------------------------
+
+    @Test
+    void testRegisterReaderTriggerRun() throws Exception {
+        TestingHsSubpartitionFileReader.Factory factory =
+                new TestingHsSubpartitionFileReader.Factory();
+
+        assertThat(ioExecutor.numQueuedRunnables()).isZero();
+
+        readScheduler.registerNewSubpartition(0, subpartitionViewOperation, factory);
+
+        assertThat(ioExecutor.numQueuedRunnables()).isEqualTo(1);
+    }
+
+    @Test
+    void testBufferReleasedTriggerRun() throws Exception {
+        TestingHsSubpartitionFileReader.Factory factory =
+                new TestingHsSubpartitionFileReader.Factory();
+        Set<MemorySegment> memorySegmentSets = new HashSet<>();
+        AtomicReference<BufferRecycler> recycleRef = new AtomicReference<>(null);
+        factory.setReadBuffersConsumer(
+                (requestedBuffer, recycle) -> {
+                    while (!requestedBuffer.isEmpty()) {
+                        memorySegmentSets.add(requestedBuffer.poll());
+                    }
+                    recycleRef.set(recycle);
+                });
+
+        readScheduler.registerNewSubpartition(0, subpartitionViewOperation, factory);
+
+        ioExecutor.trigger();
+
+        assertThat(recycleRef).isNotNull();
+        assertThat(memorySegmentSets).hasSize(2);
+        assertThat(bufferPool.getAvailableBuffers()).isZero();
+
+        recycleRef.get().recycle(memorySegmentSets.iterator().next());
+
+        assertThat(ioExecutor.numQueuedRunnables()).isEqualTo(1);
+    }
+
+    /** Test that all unused buffers are released after the run method finishes. */
+    @Test
+    void testRunReleaseAllBuffers() throws Exception {
+        TestingHsSubpartitionFileReader.Factory factory =
+                new TestingHsSubpartitionFileReader.Factory();
+        AtomicBoolean prepareForSchedulingFinished = new AtomicBoolean(false);
+        factory.setPrepareForSchedulingRunnable(() -> prepareForSchedulingFinished.set(true));
+        factory.setReadBuffersConsumer(
+                (buffers, recycle) -> {
+                    assertThat(prepareForSchedulingFinished).isTrue();
+                    assertThat(buffers).hasSize(2);
+                    // poll one buffer, return another buffer to scheduler.
+                    buffers.poll();
+                    assertThat(bufferPool.getAvailableBuffers()).isEqualTo(0);
+                });
+
+        readScheduler.registerNewSubpartition(0, subpartitionViewOperation, factory);
+
+        ioExecutor.trigger();
+
+        // unused buffers should be recycled.
+        assertThat(bufferPool.getAvailableBuffers()).isEqualTo(1);
+    }
+
+    /** Test scheduler will schedule readers in order. */
+    @Test
+    void testScheduleReadersOrdered() throws Exception {
+        TestingHsSubpartitionFileReader.Factory factory1 =
+                new TestingHsSubpartitionFileReader.Factory();
+        TestingHsSubpartitionFileReader.Factory factory2 =
+                new TestingHsSubpartitionFileReader.Factory();
+        AtomicBoolean readBuffersFinished1 = new AtomicBoolean(false);
+        AtomicBoolean readBuffersFinished2 = new AtomicBoolean(false);
+        factory1.setReadBuffersConsumer(
+                (buffers, recycle) -> {
+                    assertThat(readBuffersFinished2).isFalse();
+                    readBuffersFinished1.set(true);
+                });
+        factory2.setReadBuffersConsumer(
+                (buffers, recycle) -> {
+                    assertThat(readBuffersFinished1).isTrue();
+                    readBuffersFinished2.set(true);
+                });
+
+        factory1.setPrioritySupplier(() -> 1);
+        factory2.setPrioritySupplier(() -> 2);
+
+        readScheduler.registerNewSubpartition(0, subpartitionViewOperation, factory1);
+        readScheduler.registerNewSubpartition(1, subpartitionViewOperation, factory2);
+
+        // trigger run.
+        ioExecutor.trigger();
+
+        assertThat(readBuffersFinished2).isTrue();
+    }
+
+    @Test
+    void testRunRequestBufferTimeout() throws Exception {
+        Duration bufferRequestTimeout = Duration.ofSeconds(3);
+
+        // request all buffers first.
+        bufferPool.requestBuffers();
+        assertThat(bufferPool.getAvailableBuffers()).isZero();
+
+        readScheduler =
+                new HsResultPartitionReadScheduler(
+                        bufferPool,
+                        ioExecutor,
+                        new HsFileDataIndexImpl(NUM_SUBPARTITIONS),
+                        dataFilePath,
+                        HybridShuffleConfiguration.createConfiguration(
+                                bufferPool.getNumBuffersPerRequest(),
+                                NUM_SUBPARTITIONS,
+                                bufferRequestTimeout));
+
+        TestingHsSubpartitionFileReader.Factory factory =
+                new TestingHsSubpartitionFileReader.Factory();
+        AtomicBoolean prepareForSchedulingFinished = new AtomicBoolean(false);
+        AtomicReference<Throwable> cause = new AtomicReference<>();
+        factory.setPrepareForSchedulingRunnable(() -> prepareForSchedulingFinished.set(true));
+        factory.setFailConsumer((cause::set));
+
+        readScheduler.registerNewSubpartition(0, subpartitionViewOperation, factory);
+
+        ioExecutor.trigger();
+
+        assertThat(prepareForSchedulingFinished).isTrue();
+        assertThat(cause)
+                .hasValueSatisfying(
+                        throwable ->
+                                assertThat(throwable)
+                                        .isInstanceOf(TimeoutException.class)
+                                        .hasMessageContaining("Buffer request 
timeout"));
+    }
+
+    /**
+     * When {@link SubpartitionFileReader#readBuffers(Queue, BufferRecycler)} throws an
+     * IOException, the subpartition reader should fail.
+     */
+    @Test
+    void testRunReadBuffersThrowException() throws Exception {
+        TestingHsSubpartitionFileReader.Factory factory =
+                new TestingHsSubpartitionFileReader.Factory();
+        AtomicReference<Throwable> cause = new AtomicReference<>();
+        factory.setFailConsumer((cause::set));
+        factory.setReadBuffersConsumer(
+                (buffers, recycle) -> {
+                    throw new IOException("expected exception.");
+                });
+
+        readScheduler.registerNewSubpartition(0, subpartitionViewOperation, factory);
+
+        ioExecutor.trigger();
+
+        assertThat(cause)
+                .hasValueSatisfying(
+                        throwable ->
+                                assertThat(throwable)
+                                        .isInstanceOf(IOException.class)
+                                        .hasMessageContaining("expected 
exception."));
+    }
+
+    // ----------------------- test release ---------------------------------------
+
+    /** Test scheduler release when reader is reading buffers. */
+    @Test
+    void testReleasedWhenReading() throws Exception {
+        TestingHsSubpartitionFileReader.Factory factory =
+                new TestingHsSubpartitionFileReader.Factory();
+
+        AtomicReference<Throwable> cause = new AtomicReference<>(null);
+        factory.setFailConsumer((cause::set));
+        factory.setReadBuffersConsumer((buffers, recycle) -> readScheduler.release());
+
+        readScheduler.registerNewSubpartition(0, subpartitionViewOperation, factory);
+
+        ioExecutor.trigger();
+
+        assertThat(cause)
+                .hasValueSatisfying(
+                        throwable ->
+                                assertThat(throwable)
+                                        .isInstanceOf(IllegalStateException.class)
+                                        .hasMessageContaining(
+                                                "Result partition has been 
already released."));
+
+        assertThat(ioExecutor.numQueuedRunnables()).isZero();
+    }
+
+    /** Test the scheduler was released but receives a new subpartition reader registration. */
+    @Test
+    void testRegisterSubpartitionReaderAfterSchedulerReleased() {
+        TestingHsSubpartitionFileReader.Factory factory =
+                new TestingHsSubpartitionFileReader.Factory();
+        readScheduler.release();
+        assertThatThrownBy(
+                        () -> {
+                            readScheduler.registerNewSubpartition(
+                                    0, subpartitionViewOperation, factory);
+                            ioExecutor.trigger();
+                        })
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining("HsResultPartitionReadScheduler is 
already released.");
+    }
+
+    private static FileChannel openFileChannel(Path path) throws IOException {
+        return FileChannel.open(path, StandardOpenOption.READ);
+    }
+
+    private static class TestingHsSubpartitionFileReader implements SubpartitionFileReader {
+        private final Runnable prepareForSchedulingRunnable;
+
+        private final BiConsumerWithException<Queue<MemorySegment>, BufferRecycler, IOException>
+                readBuffersConsumer;
+
+        private final Consumer<Throwable> failConsumer;
+
+        private final Supplier<Integer> prioritySupplier;
+
+        public TestingHsSubpartitionFileReader(
+                Runnable prepareForSchedulingRunnable,
+                BiConsumerWithException<Queue<MemorySegment>, BufferRecycler, IOException>
+                        readBuffersConsumer,
+                Consumer<Throwable> failConsumer,
+                Supplier<Integer> prioritySupplier) {
+            this.prepareForSchedulingRunnable = prepareForSchedulingRunnable;
+            this.readBuffersConsumer = readBuffersConsumer;
+            this.failConsumer = failConsumer;
+            this.prioritySupplier = prioritySupplier;
+        }
+
+        @Override
+        public void prepareForScheduling() {
+            prepareForSchedulingRunnable.run();
+        }
+
+        @Override
+        public void readBuffers(Queue<MemorySegment> buffers, BufferRecycler recycler)
+                throws IOException {
+            readBuffersConsumer.accept(buffers, recycler);
+        }
+
+        @Override
+        public void fail(Throwable failureCause) {
+            failConsumer.accept(failureCause);
+        }
+
+        @Override
+        public int compareTo(SubpartitionFileReader that) {
+            checkArgument(that instanceof TestingHsSubpartitionFileReader);
+            return Integer.compare(
+                    prioritySupplier.get(),
+                    ((TestingHsSubpartitionFileReader) that).prioritySupplier.get());
+        }
+
+        public static class Factory implements SubpartitionFileReader.Factory {

Review Comment:
   It's probably easier for this factory to just poll from a queue of readers on each `createFileReader`. E.g., we can construct `TestingHsSubpartitionFileReader` with a fixed priority that is used for comparing, without a priority supplier.
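   
   A rough sketch of that shape (the `createFileReader` signature and the `ArrayDeque` import are assumptions here):
   
   ```java
   // Queue-backed factory: tests enqueue pre-configured readers (each built with a fixed
   // priority for compareTo), and createFileReader simply hands out the next one.
   public static class Factory implements SubpartitionFileReader.Factory {
       private final Queue<SubpartitionFileReader> readers = new ArrayDeque<>();
   
       public Factory addReader(SubpartitionFileReader reader) {
           readers.add(reader);
           return this;
       }
   
       @Override
       public SubpartitionFileReader createFileReader(
               int subpartitionId, HsSubpartitionViewInternalOperations operations) {
           return checkNotNull(readers.poll(), "No reader enqueued for this call.");
       }
   }
   ```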



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
