zhuzhurk commented on a change in pull request #15199:
URL: https://github.com/apache/flink/pull/15199#discussion_r597382774



##########
File path: 
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
##########
@@ -513,6 +513,30 @@
                                     + " the configured min/max size, the min/max size will be used. The exact size of Network Memory can be"
                                     + " explicitly specified by setting the min/max size to the same value.");
 
+    /**
+     * Memory used by blocking shuffle for shuffle data read (currently only used by sort-merge
+     * shuffle). The minimum valid value can be configured is 32M.

Review comment:
       I'm not sure why we must require it to be at least 32m. Even FRAMEWORK_OFF_HEAP_MEMORY is not required to be larger than 32m.
   Or maybe the smallest allowed value should be BatchShuffleReadBufferPool.NUM_BYTES_PER_REQUEST, which is 8m?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/BatchShuffleReadIOExecutor.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.runtime.util.Hardware;
+
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A fixed-size {@link Executor} pool used by batch shuffle for shuffle data read (currently only
+ * used by sort-merge blocking shuffle.
+ */
+@Internal
+public class BatchShuffleReadIOExecutor implements Executor {
+
+    /** Minimum valid number of executor threads. */
+    public static final int MIN_NUM_THREADS = 4;

Review comment:
       Is it necessary to add a lower bound on the number of threads?
   IIUC, this in turn results in a 32m requirement on the shuffle read memory size. Maybe 32m as the default value of NETWORK_BATCH_SHUFFLE_READ_MEMORY is enough for a good out-of-the-box experience?
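
   The arithmetic behind the 32m figure can be sketched as follows. This is only an illustration: it assumes that every read thread holds one full request of `NUM_BYTES_PER_REQUEST` bytes at a time, which is a reading of the diff rather than something the PR states. The class name `MinReadMemory` and the method `requiredBytes` are hypothetical.

```java
// Sketch only: reproduces the arithmetic behind the 32m lower bound, assuming each
// read thread needs one full request worth of buffers at a time.
final class MinReadMemory {

    // Values copied from the diff under review.
    static final int MIN_NUM_THREADS = 4;
    static final int NUM_BYTES_PER_REQUEST = 8 * 1024 * 1024;

    /** Memory needed so that {@code numThreads} threads can each hold one request. */
    static long requiredBytes(int numThreads) {
        return (long) numThreads * NUM_BYTES_PER_REQUEST;
    }

    public static void main(String[] args) {
        // With the lower bound of 4 threads the pool needs 32m.
        System.out.println(requiredBytes(MIN_NUM_THREADS) / (1024 * 1024) + "m");
        // Without a lower bound on threads, a single request (8m) would be the floor.
        System.out.println(requiredBytes(1) / (1024 * 1024) + "m");
    }
}
```

   Under that assumption, dropping `MIN_NUM_THREADS` would lower the floor to a single request, and 32m would then only be a default rather than a hard minimum.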

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPool.java
##########
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A fixed-size {@link MemorySegment} pool used by batch shuffle for shuffle 
data read (currently
+ * only used by sort-merge blocking shuffle).
+ */
+@Internal
+public class BatchShuffleReadBufferPool {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(BatchShuffleReadBufferPool.class);
+
+    /** Minimum total memory size in bytes of this buffer pool. */
+    public static final int MIN_TOTAL_BYTES = 32 * 1024 * 1024;
+
+    /**
+     * Memory size in bytes can be allocated from this buffer pool for a 
single request (8M is for
+     * better sequential read).
+     */
+    public static final int NUM_BYTES_PER_REQUEST = 8 * 1024 * 1024;
+
+    /** Total direct memory size in bytes can can be allocated and used by 
this buffer pool. */
+    private final long totalBytes;
+
+    /**
+     * Maximum time to wait in milliseconds when requesting read buffers from 
this buffer pool
+     * before throwing an exception.
+     */
+    private final long requestTimeout;
+
+    /** The number of total buffers in this buffer pool. */
+    private final int numTotalBuffers;
+
+    /** Size of each buffer in bytes in this buffer pool. */
+    private final int bufferSize;
+
+    /** The number of buffers to be returned for a single request. */
+    private final int numBuffersPerRequest;
+
+    /**
+     * The maximum number of buffers can be allocated from this buffer pool 
for a single buffer
+     * requester.
+     */
+    private final int maxBuffersPerRequester;
+
+    /** All available buffers in this buffer pool currently. */
+    @GuardedBy("buffers")
+    private final Queue<MemorySegment> buffers = new ArrayDeque<>();
+
+    /** Account for all the buffers requested per requester. */
+    @GuardedBy("buffers")
+    private final Map<Object, Counter> numBuffersAllocated = new HashMap<>();
+
+    /** Whether this buffer pool has been destroyed or not. */
+    @GuardedBy("buffers")
+    private boolean destroyed;
+
+    /** Whether this buffer pool has been initialized or not. */
+    @GuardedBy("buffers")
+    private boolean initialized;
+
+    public BatchShuffleReadBufferPool(long totalBytes, int bufferSize) {
+        // 5 min default buffer request timeout
+        this(totalBytes, bufferSize, 5 * 60 * 1000);
+    }
+
+    public BatchShuffleReadBufferPool(long totalBytes, int bufferSize, long 
requestTimeout) {
+        checkArgument(totalBytes > 0, "Total memory size must be positive.");
+        checkArgument(bufferSize > 0, "Size of buffer must be positive.");
+        checkArgument(requestTimeout > 0, "Request timeout must be positive.");
+
+        this.totalBytes = totalBytes;
+        this.bufferSize = bufferSize;
+        this.requestTimeout = requestTimeout;
+
+        this.numTotalBuffers = (int) Math.min(totalBytes / bufferSize, 
Integer.MAX_VALUE);
+        this.numBuffersPerRequest =
+                Math.min(numTotalBuffers, Math.max(1, NUM_BYTES_PER_REQUEST / 
bufferSize));
+        this.maxBuffersPerRequester =
+                Math.max(
+                        4 * numBuffersPerRequest,
+                        numTotalBuffers / 
BatchShuffleReadIOExecutor.MAX_NUM_THREADS);
+    }
+
+    public long getTotalBytes() {

Review comment:
       I can see that most of these getters only serve testing purposes or are not used at all. Shall we drop the unused ones and mark the ones kept for testing with `@VisibleForTesting`?
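
   A minimal sketch of what this could look like. The class `PoolGettersSketch` is hypothetical, and the annotation declared at the bottom is a local stand-in so the snippet compiles on its own; in Flink the real one is `org.apache.flink.annotation.VisibleForTesting`. Which getters are test-only is an assumption here and would need to be checked against the actual call sites.

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Target;

final class PoolGettersSketch {

    private final int numTotalBuffers;
    private final int bufferSize;

    PoolGettersSketch(long totalBytes, int bufferSize) {
        this.bufferSize = bufferSize;
        this.numTotalBuffers = (int) Math.min(totalBytes / bufferSize, Integer.MAX_VALUE);
    }

    // Called from production code (NettyShuffleServiceFactory in this PR), so it stays as is.
    int getNumTotalBuffers() {
        return numTotalBuffers;
    }

    // Assumed to be read only by tests, so it is marked accordingly.
    @VisibleForTesting
    int getBufferSize() {
        return bufferSize;
    }

    public static void main(String[] args) {
        PoolGettersSketch pool = new PoolGettersSketch(32L * 1024 * 1024, 32 * 1024);
        System.out.println(pool.getNumTotalBuffers() + " buffers of " + pool.getBufferSize() + " bytes");
    }
}

// Local stand-in for org.apache.flink.annotation.VisibleForTesting.
@Documented
@Target({ElementType.TYPE, ElementType.METHOD, ElementType.FIELD, ElementType.CONSTRUCTOR})
@interface VisibleForTesting {}
```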

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPool.java
##########
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A fixed-size {@link MemorySegment} pool used by batch shuffle for shuffle 
data read (currently
+ * only used by sort-merge blocking shuffle).
+ */
+@Internal
+public class BatchShuffleReadBufferPool {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(BatchShuffleReadBufferPool.class);
+
+    /** Minimum total memory size in bytes of this buffer pool. */
+    public static final int MIN_TOTAL_BYTES = 32 * 1024 * 1024;
+
+    /**
+     * Memory size in bytes can be allocated from this buffer pool for a 
single request (8M is for
+     * better sequential read).
+     */
+    public static final int NUM_BYTES_PER_REQUEST = 8 * 1024 * 1024;
+
+    /** Total direct memory size in bytes can can be allocated and used by 
this buffer pool. */
+    private final long totalBytes;
+
+    /**
+     * Maximum time to wait in milliseconds when requesting read buffers from 
this buffer pool
+     * before throwing an exception.
+     */
+    private final long requestTimeout;
+
+    /** The number of total buffers in this buffer pool. */
+    private final int numTotalBuffers;
+
+    /** Size of each buffer in bytes in this buffer pool. */
+    private final int bufferSize;
+
+    /** The number of buffers to be returned for a single request. */
+    private final int numBuffersPerRequest;
+
+    /**
+     * The maximum number of buffers can be allocated from this buffer pool 
for a single buffer
+     * requester.
+     */
+    private final int maxBuffersPerRequester;
+
+    /** All available buffers in this buffer pool currently. */
+    @GuardedBy("buffers")
+    private final Queue<MemorySegment> buffers = new ArrayDeque<>();
+
+    /** Account for all the buffers requested per requester. */
+    @GuardedBy("buffers")
+    private final Map<Object, Counter> numBuffersAllocated = new HashMap<>();

Review comment:
       I think `IdentityHashMap` is better in this case.
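
   The difference matters when a requester type overrides `equals()`/`hashCode()`. The sketch below uses a hypothetical `Requester` class to show it; whether the real requesters in this PR override these methods is not visible in the diff.

```java
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;

final class RequesterAccounting {

    /** Hypothetical requester whose equals()/hashCode() compare by name. */
    static final class Requester {
        final String name;

        Requester(String name) {
            this.name = name;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof Requester && ((Requester) o).name.equals(name);
        }

        @Override
        public int hashCode() {
            return name.hashCode();
        }
    }

    /** Registers two equal-but-distinct requesters and returns how many keys the map holds. */
    static int distinctKeys(Map<Object, Integer> accounting) {
        accounting.put(new Requester("reader"), 1);
        accounting.put(new Requester("reader"), 1);
        return accounting.size();
    }

    public static void main(String[] args) {
        // HashMap merges the two requesters because they are equal.
        System.out.println(distinctKeys(new HashMap<>()));
        // IdentityHashMap compares keys with ==, so each instance is tracked separately.
        System.out.println(distinctKeys(new IdentityHashMap<>()));
    }
}
```

   With `IdentityHashMap`, the per-requester accounting cannot be merged by accident, whatever `equals()` the requester defines.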

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPool.java
##########
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A fixed-size {@link MemorySegment} pool used by batch shuffle for shuffle 
data read (currently
+ * only used by sort-merge blocking shuffle).
+ */
+@Internal
+public class BatchShuffleReadBufferPool {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(BatchShuffleReadBufferPool.class);
+
+    /** Minimum total memory size in bytes of this buffer pool. */
+    public static final int MIN_TOTAL_BYTES = 32 * 1024 * 1024;
+
+    /**
+     * Memory size in bytes can be allocated from this buffer pool for a 
single request (8M is for
+     * better sequential read).
+     */
+    public static final int NUM_BYTES_PER_REQUEST = 8 * 1024 * 1024;
+
+    /** Total direct memory size in bytes can can be allocated and used by 
this buffer pool. */
+    private final long totalBytes;
+
+    /**
+     * Maximum time to wait in milliseconds when requesting read buffers from 
this buffer pool
+     * before throwing an exception.
+     */
+    private final long requestTimeout;
+
+    /** The number of total buffers in this buffer pool. */
+    private final int numTotalBuffers;
+
+    /** Size of each buffer in bytes in this buffer pool. */
+    private final int bufferSize;
+
+    /** The number of buffers to be returned for a single request. */
+    private final int numBuffersPerRequest;
+
+    /**
+     * The maximum number of buffers can be allocated from this buffer pool 
for a single buffer
+     * requester.
+     */
+    private final int maxBuffersPerRequester;
+
+    /** All available buffers in this buffer pool currently. */
+    @GuardedBy("buffers")
+    private final Queue<MemorySegment> buffers = new ArrayDeque<>();
+
+    /** Account for all the buffers requested per requester. */
+    @GuardedBy("buffers")
+    private final Map<Object, Counter> numBuffersAllocated = new HashMap<>();
+
+    /** Whether this buffer pool has been destroyed or not. */
+    @GuardedBy("buffers")
+    private boolean destroyed;
+
+    /** Whether this buffer pool has been initialized or not. */
+    @GuardedBy("buffers")
+    private boolean initialized;
+
+    public BatchShuffleReadBufferPool(long totalBytes, int bufferSize) {
+        // 5 min default buffer request timeout
+        this(totalBytes, bufferSize, 5 * 60 * 1000);
+    }
+
+    public BatchShuffleReadBufferPool(long totalBytes, int bufferSize, long 
requestTimeout) {
+        checkArgument(totalBytes > 0, "Total memory size must be positive.");
+        checkArgument(bufferSize > 0, "Size of buffer must be positive.");
+        checkArgument(requestTimeout > 0, "Request timeout must be positive.");
+
+        this.totalBytes = totalBytes;
+        this.bufferSize = bufferSize;
+        this.requestTimeout = requestTimeout;
+
+        this.numTotalBuffers = (int) Math.min(totalBytes / bufferSize, 
Integer.MAX_VALUE);
+        this.numBuffersPerRequest =
+                Math.min(numTotalBuffers, Math.max(1, NUM_BYTES_PER_REQUEST / 
bufferSize));
+        this.maxBuffersPerRequester =
+                Math.max(
+                        4 * numBuffersPerRequest,
+                        numTotalBuffers / 
BatchShuffleReadIOExecutor.MAX_NUM_THREADS);
+    }
+
+    public long getTotalBytes() {
+        return totalBytes;
+    }
+
+    public long getRequestTimeout() {
+        return requestTimeout;
+    }
+
+    public int getNumBuffersPerRequest() {
+        return numBuffersPerRequest;
+    }
+
+    public int getMaxBuffersPerRequester() {
+        return maxBuffersPerRequester;
+    }
+
+    public int getBufferSize() {
+        return bufferSize;
+    }
+
+    public int getNumTotalBuffers() {
+        return numTotalBuffers;
+    }
+
+    public int getAvailableBuffers() {
+        synchronized (buffers) {
+            return buffers.size();
+        }
+    }
+
+    /** Initializes this buffer pool which allocates all the buffers. */
+    public void initialize() {
+        LOG.info(
+                "Initializing batch shuffle IO buffer pool: numBuffers={}, 
bufferSize={}.",
+                numTotalBuffers,
+                bufferSize);
+
+        checkState(
+                totalBytes >= bufferSize,
+                String.format(
+                        "Illegal configuration, config value for %s must be no smaller than %s,"
+                                + " please increase %s to at least %s bytes.",
+                        TaskManagerOptions.NETWORK_BATCH_SHUFFLE_READ_MEMORY.key(),
+                        TaskManagerOptions.MEMORY_SEGMENT_SIZE.key(),

Review comment:
       I guess `TaskManagerOptions.MEMORY_SEGMENT_SIZE.key()` should be the 
last but one param of this `String.format`?
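
   Plain `%s` placeholders are filled strictly in argument order, so the order of the arguments decides which key ends up where in the message. One way to make the mapping explicit is to use indexed placeholders, sketched below. The key strings are placeholders for illustration, not the real option keys, and this is not necessarily how the PR should resolve it.

```java
final class FormatArgumentOrder {

    // Placeholder keys for illustration; the real values come from TaskManagerOptions.
    static final String READ_MEMORY_KEY = "read-memory-option";
    static final String SEGMENT_SIZE_KEY = "segment-size-option";

    static String message(int bufferSize) {
        // %1$s and %2$s pin each placeholder to an argument, so %1$s can appear twice
        // without passing the same key twice.
        return String.format(
                "Illegal configuration, config value for %1$s must be no smaller than %2$s,"
                        + " please increase %1$s to at least %3$d bytes.",
                READ_MEMORY_KEY, SEGMENT_SIZE_KEY, bufferSize);
    }

    public static void main(String[] args) {
        System.out.println(message(32 * 1024));
    }
}
```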

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPool.java
##########
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A fixed-size {@link MemorySegment} pool used by batch shuffle for shuffle 
data read (currently
+ * only used by sort-merge blocking shuffle).
+ */
+@Internal
+public class BatchShuffleReadBufferPool {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(BatchShuffleReadBufferPool.class);
+
+    /** Minimum total memory size in bytes of this buffer pool. */
+    public static final int MIN_TOTAL_BYTES = 32 * 1024 * 1024;
+
+    /**
+     * Memory size in bytes can be allocated from this buffer pool for a 
single request (8M is for
+     * better sequential read).
+     */
+    public static final int NUM_BYTES_PER_REQUEST = 8 * 1024 * 1024;
+
+    /** Total direct memory size in bytes can can be allocated and used by 
this buffer pool. */
+    private final long totalBytes;
+
+    /**
+     * Maximum time to wait in milliseconds when requesting read buffers from 
this buffer pool
+     * before throwing an exception.
+     */
+    private final long requestTimeout;
+
+    /** The number of total buffers in this buffer pool. */
+    private final int numTotalBuffers;
+
+    /** Size of each buffer in bytes in this buffer pool. */
+    private final int bufferSize;
+
+    /** The number of buffers to be returned for a single request. */
+    private final int numBuffersPerRequest;
+
+    /**
+     * The maximum number of buffers can be allocated from this buffer pool 
for a single buffer
+     * requester.
+     */
+    private final int maxBuffersPerRequester;
+
+    /** All available buffers in this buffer pool currently. */
+    @GuardedBy("buffers")
+    private final Queue<MemorySegment> buffers = new ArrayDeque<>();
+
+    /** Account for all the buffers requested per requester. */
+    @GuardedBy("buffers")
+    private final Map<Object, Counter> numBuffersAllocated = new HashMap<>();
+
+    /** Whether this buffer pool has been destroyed or not. */
+    @GuardedBy("buffers")
+    private boolean destroyed;
+
+    /** Whether this buffer pool has been initialized or not. */
+    @GuardedBy("buffers")
+    private boolean initialized;
+
+    public BatchShuffleReadBufferPool(long totalBytes, int bufferSize) {
+        // 5 min default buffer request timeout
+        this(totalBytes, bufferSize, 5 * 60 * 1000);
+    }
+
+    public BatchShuffleReadBufferPool(long totalBytes, int bufferSize, long 
requestTimeout) {
+        checkArgument(totalBytes > 0, "Total memory size must be positive.");
+        checkArgument(bufferSize > 0, "Size of buffer must be positive.");
+        checkArgument(requestTimeout > 0, "Request timeout must be positive.");
+
+        this.totalBytes = totalBytes;
+        this.bufferSize = bufferSize;
+        this.requestTimeout = requestTimeout;
+
+        this.numTotalBuffers = (int) Math.min(totalBytes / bufferSize, 
Integer.MAX_VALUE);
+        this.numBuffersPerRequest =
+                Math.min(numTotalBuffers, Math.max(1, NUM_BYTES_PER_REQUEST / 
bufferSize));
+        this.maxBuffersPerRequester =
+                Math.max(
+                        4 * numBuffersPerRequest,
+                        numTotalBuffers / 
BatchShuffleReadIOExecutor.MAX_NUM_THREADS);
+    }
+
+    public long getTotalBytes() {
+        return totalBytes;
+    }
+
+    public long getRequestTimeout() {
+        return requestTimeout;
+    }
+
+    public int getNumBuffersPerRequest() {
+        return numBuffersPerRequest;
+    }
+
+    public int getMaxBuffersPerRequester() {
+        return maxBuffersPerRequester;
+    }
+
+    public int getBufferSize() {
+        return bufferSize;
+    }
+
+    public int getNumTotalBuffers() {
+        return numTotalBuffers;
+    }
+
+    public int getAvailableBuffers() {
+        synchronized (buffers) {
+            return buffers.size();
+        }
+    }
+
+    /** Initializes this buffer pool which allocates all the buffers. */
+    public void initialize() {
+        LOG.info(
+                "Initializing batch shuffle IO buffer pool: numBuffers={}, 
bufferSize={}.",
+                numTotalBuffers,
+                bufferSize);
+
+        checkState(
+                totalBytes >= bufferSize,
+                String.format(
+                        "Illegal configuration, config value for %s must be no 
smaller than %s,"
+                                + " please increase %s to at least %s bytes.",
+                        
TaskManagerOptions.NETWORK_BATCH_SHUFFLE_READ_MEMORY.key(),
+                        TaskManagerOptions.MEMORY_SEGMENT_SIZE.key(),
+                        
TaskManagerOptions.NETWORK_BATCH_SHUFFLE_READ_MEMORY.key(),
+                        bufferSize));
+
+        synchronized (buffers) {
+            checkState(!destroyed, "Buffer pool is already destroyed.");
+
+            if (initialized) {
+                return;
+            }
+            initialized = true;
+
+            try {
+                for (int i = 0; i < numTotalBuffers; ++i) {
+                    buffers.add(MemorySegmentFactory.allocateUnpooledOffHeapMemory(bufferSize));
+                }
+            } catch (OutOfMemoryError outOfMemoryError) {
+                int allocated = buffers.size();
+                buffers.clear();

Review comment:
       I'm not quite sure here: is `buffers.clear()` enough, or do we need to explicitly return the buffers?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPool.java
##########
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A fixed-size {@link MemorySegment} pool used by batch shuffle for shuffle 
data read (currently
+ * only used by sort-merge blocking shuffle).
+ */
+@Internal

Review comment:
       I think the `@Internal` annotation is not needed because `flink-runtime` is not an API module.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
##########
@@ -119,13 +121,34 @@ static NettyShuffleEnvironment createNettyShuffleEnvironment(
                         config.networkBufferSize(),
                         config.getRequestSegmentsTimeout());
 
+        // we create a separated buffer pool here for batch shuffle instead of reusing the network
+        // buffer pool directly to avoid potential side effects of memory contention, for example,
+        // dead lock or "insufficient network buffer" error
+        BatchShuffleReadBufferPool batchShuffleReadBufferPool =
+                new BatchShuffleReadBufferPool(
+                        config.batchShuffleReadMemoryBytes(), config.networkBufferSize());
+
+        // we create a separated IO executor pool here for batch shuffle instead of reusing the
+        // TaskManager IO executor pool directly to avoid the potential side effects of execution
+        // contention, for example, too long IO or waiting time leading to starvation or timeout
+        int numThreads =
+                batchShuffleReadBufferPool.getNumTotalBuffers()
+                        / Math.max(1, batchShuffleReadBufferPool.getNumBuffersPerRequest());

Review comment:
       The `Math.max` is not needed, I guess.
   Maybe we can also have a method `batchShuffleReadBufferPool.getMaxAcceptableConcurrentRequests()` and use its result directly as `numThreads`?
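
   A rough sketch of such a method, with the two fields derived as in the constructor quoted in this PR. The class name `ConcurrentRequestsSketch` is hypothetical, and so is the method itself, since it does not exist in the diff. One caveat, based only on the quoted constructor: `numBuffersPerRequest` becomes 0 when `totalBytes` is smaller than `bufferSize`, because that case is rejected only later in `initialize()`. The sketch therefore keeps a guard inside the method instead of at the call site.

```java
final class ConcurrentRequestsSketch {

    // Value copied from the diff under review.
    static final int NUM_BYTES_PER_REQUEST = 8 * 1024 * 1024;

    private final int numTotalBuffers;
    private final int numBuffersPerRequest;

    ConcurrentRequestsSketch(long totalBytes, int bufferSize) {
        // Same derivation as the constructor in the diff.
        this.numTotalBuffers = (int) Math.min(totalBytes / bufferSize, Integer.MAX_VALUE);
        this.numBuffersPerRequest =
                Math.min(numTotalBuffers, Math.max(1, NUM_BYTES_PER_REQUEST / bufferSize));
    }

    /** Hypothetical accessor named after the suggestion in the review comment. */
    int getMaxAcceptableConcurrentRequests() {
        // numBuffersPerRequest is 0 only when the pool cannot hold a single buffer.
        return numBuffersPerRequest == 0 ? 0 : numTotalBuffers / numBuffersPerRequest;
    }

    public static void main(String[] args) {
        // 32m of memory with 32k buffers: 1024 buffers, 256 per request.
        System.out.println(
                new ConcurrentRequestsSketch(32L * 1024 * 1024, 32 * 1024)
                        .getMaxAcceptableConcurrentRequests());
        // Less memory than one buffer: no request can be served.
        System.out.println(
                new ConcurrentRequestsSketch(16L * 1024, 32 * 1024)
                        .getMaxAcceptableConcurrentRequests());
    }
}
```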

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPool.java
##########
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A fixed-size {@link MemorySegment} pool used by batch shuffle for shuffle 
data read (currently
+ * only used by sort-merge blocking shuffle).
+ */
+@Internal
+public class BatchShuffleReadBufferPool {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(BatchShuffleReadBufferPool.class);
+
+    /** Minimum total memory size in bytes of this buffer pool. */
+    public static final int MIN_TOTAL_BYTES = 32 * 1024 * 1024;
+
+    /**
+     * Memory size in bytes can be allocated from this buffer pool for a 
single request (8M is for
+     * better sequential read).
+     */
+    public static final int NUM_BYTES_PER_REQUEST = 8 * 1024 * 1024;
+
+    /** Total direct memory size in bytes can can be allocated and used by 
this buffer pool. */
+    private final long totalBytes;
+
+    /**
+     * Maximum time to wait in milliseconds when requesting read buffers from 
this buffer pool
+     * before throwing an exception.
+     */
+    private final long requestTimeout;
+
+    /** The number of total buffers in this buffer pool. */
+    private final int numTotalBuffers;
+
+    /** Size of each buffer in bytes in this buffer pool. */
+    private final int bufferSize;
+
+    /** The number of buffers to be returned for a single request. */
+    private final int numBuffersPerRequest;
+
+    /**
+     * The maximum number of buffers can be allocated from this buffer pool 
for a single buffer
+     * requester.
+     */
+    private final int maxBuffersPerRequester;
+
+    /** All available buffers in this buffer pool currently. */
+    @GuardedBy("buffers")
+    private final Queue<MemorySegment> buffers = new ArrayDeque<>();
+
+    /** Account for all the buffers requested per requester. */
+    @GuardedBy("buffers")
+    private final Map<Object, Counter> numBuffersAllocated = new HashMap<>();
+
+    /** Whether this buffer pool has been destroyed or not. */
+    @GuardedBy("buffers")
+    private boolean destroyed;
+
+    /** Whether this buffer pool has been initialized or not. */
+    @GuardedBy("buffers")
+    private boolean initialized;
+
+    public BatchShuffleReadBufferPool(long totalBytes, int bufferSize) {
+        // 5 min default buffer request timeout
+        this(totalBytes, bufferSize, 5 * 60 * 1000);
+    }
+
+    public BatchShuffleReadBufferPool(long totalBytes, int bufferSize, long 
requestTimeout) {

Review comment:
       My understanding is that this constructor exists so that tests can 
exercise the case where a buffer request times out. However, I could not find 
any test covering `requestTimeout` other than the illegal-argument check. And 
if we did not expose this constructor, there would be no way to pass an illegal 
`requestTimeout` in the first place.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPool.java
##########
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A fixed-size {@link MemorySegment} pool used by batch shuffle for shuffle 
data read (currently
+ * only used by sort-merge blocking shuffle).
+ */
+@Internal
+public class BatchShuffleReadBufferPool {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(BatchShuffleReadBufferPool.class);
+
+    /** Minimum total memory size in bytes of this buffer pool. */
+    public static final int MIN_TOTAL_BYTES = 32 * 1024 * 1024;
+
+    /**
+     * Memory size in bytes can be allocated from this buffer pool for a 
single request (8M is for
+     * better sequential read).
+     */
+    public static final int NUM_BYTES_PER_REQUEST = 8 * 1024 * 1024;
+
+    /** Total direct memory size in bytes can can be allocated and used by 
this buffer pool. */
+    private final long totalBytes;
+
+    /**
+     * Maximum time to wait in milliseconds when requesting read buffers from 
this buffer pool
+     * before throwing an exception.
+     */
+    private final long requestTimeout;
+
+    /** The number of total buffers in this buffer pool. */
+    private final int numTotalBuffers;
+
+    /** Size of each buffer in bytes in this buffer pool. */
+    private final int bufferSize;
+
+    /** The number of buffers to be returned for a single request. */
+    private final int numBuffersPerRequest;
+
+    /**
+     * The maximum number of buffers can be allocated from this buffer pool 
for a single buffer
+     * requester.
+     */
+    private final int maxBuffersPerRequester;
+
+    /** All available buffers in this buffer pool currently. */
+    @GuardedBy("buffers")
+    private final Queue<MemorySegment> buffers = new ArrayDeque<>();
+
+    /** Account for all the buffers requested per requester. */
+    @GuardedBy("buffers")
+    private final Map<Object, Counter> numBuffersAllocated = new HashMap<>();
+
+    /** Whether this buffer pool has been destroyed or not. */
+    @GuardedBy("buffers")
+    private boolean destroyed;
+
+    /** Whether this buffer pool has been initialized or not. */
+    @GuardedBy("buffers")
+    private boolean initialized;
+
+    public BatchShuffleReadBufferPool(long totalBytes, int bufferSize) {
+        // 5 min default buffer request timeout
+        this(totalBytes, bufferSize, 5 * 60 * 1000);
+    }
+
+    public BatchShuffleReadBufferPool(long totalBytes, int bufferSize, long 
requestTimeout) {
+        checkArgument(totalBytes > 0, "Total memory size must be positive.");
+        checkArgument(bufferSize > 0, "Size of buffer must be positive.");
+        checkArgument(requestTimeout > 0, "Request timeout must be positive.");
+
+        this.totalBytes = totalBytes;
+        this.bufferSize = bufferSize;
+        this.requestTimeout = requestTimeout;
+
+        this.numTotalBuffers = (int) Math.min(totalBytes / bufferSize, 
Integer.MAX_VALUE);

Review comment:
       Is it possible to make `numTotalBuffers` a `long` so that we do not need 
this `Math.min`?
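
For reference, a minimal sketch of the two options being compared. The class and method names are hypothetical and not part of the PR; only the clamping expression is copied from the constructor above.

```java
public class BufferCountSketch {

    /** Same expression as in the PR: the quotient is clamped to Integer.MAX_VALUE. */
    static int clampedCount(long totalBytes, int bufferSize) {
        return (int) Math.min(totalBytes / bufferSize, Integer.MAX_VALUE);
    }

    /** Suggested alternative: keep the count as a long, so no clamp is needed. */
    static long longCount(long totalBytes, int bufferSize) {
        return totalBytes / bufferSize;
    }

    public static void main(String[] args) {
        long totalBytes = 32L * 1024 * 1024; // 32 MiB pool
        int bufferSize = 32 * 1024; // 32 KiB buffers (assumed for illustration)
        System.out.println(clampedCount(totalBytes, bufferSize));
        System.out.println(longCount(totalBytes, bufferSize));
    }
}
```

One caveat with the `long` variant: `Queue.size()` returns an `int`, so a count above `Integer.MAX_VALUE` could not be reported accurately by `getAvailableBuffers()` anyway.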

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPool.java
##########
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A fixed-size {@link MemorySegment} pool used by batch shuffle for shuffle 
data read (currently
+ * only used by sort-merge blocking shuffle).
+ */
+@Internal
+public class BatchShuffleReadBufferPool {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(BatchShuffleReadBufferPool.class);
+
+    /** Minimum total memory size in bytes of this buffer pool. */
+    public static final int MIN_TOTAL_BYTES = 32 * 1024 * 1024;
+
+    /**
+     * Memory size in bytes can be allocated from this buffer pool for a 
single request (8M is for
+     * better sequential read).
+     */
+    public static final int NUM_BYTES_PER_REQUEST = 8 * 1024 * 1024;
+
+    /** Total direct memory size in bytes can can be allocated and used by 
this buffer pool. */
+    private final long totalBytes;
+
+    /**
+     * Maximum time to wait in milliseconds when requesting read buffers from 
this buffer pool
+     * before throwing an exception.
+     */
+    private final long requestTimeout;
+
+    /** The number of total buffers in this buffer pool. */
+    private final int numTotalBuffers;
+
+    /** Size of each buffer in bytes in this buffer pool. */
+    private final int bufferSize;
+
+    /** The number of buffers to be returned for a single request. */
+    private final int numBuffersPerRequest;
+
+    /**
+     * The maximum number of buffers can be allocated from this buffer pool 
for a single buffer
+     * requester.
+     */
+    private final int maxBuffersPerRequester;
+
+    /** All available buffers in this buffer pool currently. */
+    @GuardedBy("buffers")
+    private final Queue<MemorySegment> buffers = new ArrayDeque<>();
+
+    /** Account for all the buffers requested per requester. */
+    @GuardedBy("buffers")
+    private final Map<Object, Counter> numBuffersAllocated = new HashMap<>();
+
+    /** Whether this buffer pool has been destroyed or not. */
+    @GuardedBy("buffers")
+    private boolean destroyed;
+
+    /** Whether this buffer pool has been initialized or not. */
+    @GuardedBy("buffers")
+    private boolean initialized;
+
+    public BatchShuffleReadBufferPool(long totalBytes, int bufferSize) {
+        // 5 min default buffer request timeout
+        this(totalBytes, bufferSize, 5 * 60 * 1000);
+    }
+
+    public BatchShuffleReadBufferPool(long totalBytes, int bufferSize, long 
requestTimeout) {
+        checkArgument(totalBytes > 0, "Total memory size must be positive.");
+        checkArgument(bufferSize > 0, "Size of buffer must be positive.");
+        checkArgument(requestTimeout > 0, "Request timeout must be positive.");
+
+        this.totalBytes = totalBytes;
+        this.bufferSize = bufferSize;
+        this.requestTimeout = requestTimeout;
+
+        this.numTotalBuffers = (int) Math.min(totalBytes / bufferSize, 
Integer.MAX_VALUE);
+        this.numBuffersPerRequest =
+                Math.min(numTotalBuffers, Math.max(1, NUM_BYTES_PER_REQUEST / 
bufferSize));
+        this.maxBuffersPerRequester =
+                Math.max(
+                        4 * numBuffersPerRequest,
+                        numTotalBuffers / 
BatchShuffleReadIOExecutor.MAX_NUM_THREADS);
+    }
+
+    public long getTotalBytes() {
+        return totalBytes;
+    }
+
+    public long getRequestTimeout() {
+        return requestTimeout;
+    }
+
+    public int getNumBuffersPerRequest() {
+        return numBuffersPerRequest;
+    }
+
+    public int getMaxBuffersPerRequester() {
+        return maxBuffersPerRequester;
+    }
+
+    public int getBufferSize() {
+        return bufferSize;
+    }
+
+    public int getNumTotalBuffers() {
+        return numTotalBuffers;
+    }
+
+    public int getAvailableBuffers() {
+        synchronized (buffers) {
+            return buffers.size();
+        }
+    }
+
+    /** Initializes this buffer pool which allocates all the buffers. */
+    public void initialize() {
+        LOG.info(
+                "Initializing batch shuffle IO buffer pool: numBuffers={}, 
bufferSize={}.",
+                numTotalBuffers,
+                bufferSize);
+
+        checkState(
+                totalBytes >= bufferSize,

Review comment:
       This is inconsistent with the logic `this.numTotalBuffers = (int) 
Math.min(totalBytes / bufferSize, Integer.MAX_VALUE)` in the constructor, which 
accepts a `totalBytes` smaller than `bufferSize` and yields zero buffers. 
Maybe move this check to the constructor and simplify the calculation of 
`numTotalBuffers`?
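
A rough sketch of what validating in the constructor could look like. `PoolConfigSketch` and `rejects` are hypothetical names, and plain `IllegalArgumentException` stands in for Flink's `checkArgument` to keep the example self-contained.

```java
public class PoolConfigSketch {
    final long totalBytes;
    final int bufferSize;
    final int numTotalBuffers;

    PoolConfigSketch(long totalBytes, int bufferSize) {
        if (bufferSize <= 0) {
            throw new IllegalArgumentException("Size of buffer must be positive.");
        }
        // Moved here from initialize(): fail fast on an unusable configuration.
        if (totalBytes < bufferSize) {
            throw new IllegalArgumentException(
                    String.format(
                            "Total memory size (%d bytes) must be no smaller than "
                                    + "the buffer size (%d bytes).",
                            totalBytes, bufferSize));
        }
        this.totalBytes = totalBytes;
        this.bufferSize = bufferSize;
        // After the check above the quotient is always at least 1.
        this.numTotalBuffers = (int) Math.min(totalBytes / bufferSize, Integer.MAX_VALUE);
    }

    /** Helper for illustration: true if the constructor rejects the arguments. */
    static boolean rejects(long totalBytes, int bufferSize) {
        try {
            new PoolConfigSketch(totalBytes, bufferSize);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }
}
```

With the check in the constructor, a pool object with zero buffers can never exist, so `initialize()` would no longer need its own `checkState`.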

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/BatchShuffleReadIOExecutor.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.runtime.util.Hardware;
+
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A fixed-size {@link Executor} pool used by batch shuffle for shuffle data 
read (currently only
+ * used by sort-merge blocking shuffle.
+ */
+@Internal
+public class BatchShuffleReadIOExecutor implements Executor {

Review comment:
       I'm wondering whether we need to implement such an executor at all, 
instead of directly using a standard one:
   `executors = Executors.newFixedThreadPool(numThreads, threadFactory)`
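
A minimal, self-contained sketch of the plain JDK approach. The class name, thread-name prefix, and `runTasks` helper are assumptions made for illustration; the PR itself uses Flink's `ExecutorThreadFactory`, which is replaced here by a small lambda so the example has no Flink dependency.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class FixedPoolSketch {

    /** Creates a fixed-size pool of named daemon threads. */
    static ExecutorService newReadExecutor(int numThreads) {
        AtomicInteger threadIndex = new AtomicInteger();
        ThreadFactory threadFactory =
                runnable -> {
                    Thread thread =
                            new Thread(
                                    runnable,
                                    "batch-shuffle-read-" + threadIndex.incrementAndGet());
                    thread.setDaemon(true);
                    return thread;
                };
        return Executors.newFixedThreadPool(numThreads, threadFactory);
    }

    /** Submits numTasks trivial tasks and returns how many completed. */
    static int runTasks(int numThreads, int numTasks) {
        ExecutorService executor = newReadExecutor(numThreads);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < numTasks; i++) {
            executor.execute(completed::incrementAndGet);
        }
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for tasks.", e);
        }
        return completed.get();
    }
}
```

If the custom class only adds lifecycle checks on top of this, `ExecutorService` already provides `shutdown()` and `isShutdown()` for that purpose.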

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPool.java
##########
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A fixed-size {@link MemorySegment} pool used by batch shuffle for shuffle 
data read (currently
+ * only used by sort-merge blocking shuffle).
+ */
+@Internal
+public class BatchShuffleReadBufferPool {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(BatchShuffleReadBufferPool.class);
+
+    /** Minimum total memory size in bytes of this buffer pool. */
+    public static final int MIN_TOTAL_BYTES = 32 * 1024 * 1024;
+
+    /**
+     * Memory size in bytes can be allocated from this buffer pool for a 
single request (8M is for
+     * better sequential read).
+     */
+    public static final int NUM_BYTES_PER_REQUEST = 8 * 1024 * 1024;
+
+    /** Total direct memory size in bytes can can be allocated and used by 
this buffer pool. */
+    private final long totalBytes;
+
+    /**
+     * Maximum time to wait in milliseconds when requesting read buffers from 
this buffer pool
+     * before throwing an exception.
+     */
+    private final long requestTimeout;
+
+    /** The number of total buffers in this buffer pool. */
+    private final int numTotalBuffers;
+
+    /** Size of each buffer in bytes in this buffer pool. */
+    private final int bufferSize;
+
+    /** The number of buffers to be returned for a single request. */
+    private final int numBuffersPerRequest;
+
+    /**
+     * The maximum number of buffers can be allocated from this buffer pool 
for a single buffer
+     * requester.
+     */
+    private final int maxBuffersPerRequester;
+
+    /** All available buffers in this buffer pool currently. */
+    @GuardedBy("buffers")
+    private final Queue<MemorySegment> buffers = new ArrayDeque<>();
+
+    /** Account for all the buffers requested per requester. */
+    @GuardedBy("buffers")
+    private final Map<Object, Counter> numBuffersAllocated = new HashMap<>();
+
+    /** Whether this buffer pool has been destroyed or not. */
+    @GuardedBy("buffers")
+    private boolean destroyed;
+
+    /** Whether this buffer pool has been initialized or not. */
+    @GuardedBy("buffers")
+    private boolean initialized;
+
+    public BatchShuffleReadBufferPool(long totalBytes, int bufferSize) {
+        // 5 min default buffer request timeout
+        this(totalBytes, bufferSize, 5 * 60 * 1000);
+    }
+
+    public BatchShuffleReadBufferPool(long totalBytes, int bufferSize, long 
requestTimeout) {
+        checkArgument(totalBytes > 0, "Total memory size must be positive.");
+        checkArgument(bufferSize > 0, "Size of buffer must be positive.");
+        checkArgument(requestTimeout > 0, "Request timeout must be positive.");
+
+        this.totalBytes = totalBytes;
+        this.bufferSize = bufferSize;
+        this.requestTimeout = requestTimeout;
+
+        this.numTotalBuffers = (int) Math.min(totalBytes / bufferSize, 
Integer.MAX_VALUE);
+        this.numBuffersPerRequest =
+                Math.min(numTotalBuffers, Math.max(1, NUM_BYTES_PER_REQUEST / 
bufferSize));
+        this.maxBuffersPerRequester =
+                Math.max(
+                        4 * numBuffersPerRequest,
+                        numTotalBuffers / 
BatchShuffleReadIOExecutor.MAX_NUM_THREADS);
+    }
+
+    public long getTotalBytes() {
+        return totalBytes;
+    }
+
+    public long getRequestTimeout() {
+        return requestTimeout;
+    }
+
+    public int getNumBuffersPerRequest() {
+        return numBuffersPerRequest;
+    }
+
+    public int getMaxBuffersPerRequester() {
+        return maxBuffersPerRequester;
+    }
+
+    public int getBufferSize() {
+        return bufferSize;
+    }
+
+    public int getNumTotalBuffers() {
+        return numTotalBuffers;
+    }
+
+    public int getAvailableBuffers() {
+        synchronized (buffers) {
+            return buffers.size();
+        }
+    }
+
+    /** Initializes this buffer pool which allocates all the buffers. */
+    public void initialize() {
+        LOG.info(
+                "Initializing batch shuffle IO buffer pool: numBuffers={}, 
bufferSize={}.",
+                numTotalBuffers,
+                bufferSize);
+
+        checkState(
+                totalBytes >= bufferSize,
+                String.format(
+                        "Illegal configuration, config value for %s must be no 
smaller than %s,"
+                                + " please increase %s to at least %s bytes.",
+                        
TaskManagerOptions.NETWORK_BATCH_SHUFFLE_READ_MEMORY.key(),
+                        TaskManagerOptions.MEMORY_SEGMENT_SIZE.key(),
+                        
TaskManagerOptions.NETWORK_BATCH_SHUFFLE_READ_MEMORY.key(),
+                        bufferSize));
+
+        synchronized (buffers) {
+            checkState(!destroyed, "Buffer pool is already destroyed.");
+
+            if (initialized) {
+                return;
+            }
+            initialized = true;
+
+            try {
+                for (int i = 0; i < numTotalBuffers; ++i) {
+                    
buffers.add(MemorySegmentFactory.allocateUnpooledOffHeapMemory(bufferSize));
+                }
+            } catch (OutOfMemoryError outOfMemoryError) {
+                int allocated = buffers.size();
+                buffers.clear();
+                throw new OutOfMemoryError(
+                        String.format(
+                                "Can't allocate enough direct buffer for batch 
shuffle read buffer "
+                                        + "pool (bytes allocated: %d, bytes 
still needed: %d). To "
+                                        + "avoid the exception, you need to do 
one of the following"
+                                        + " adjustments: 1) If you have ever 
decreased %s, you need"
+                                        + " to undo the decrement; 2) If you 
ever increased %s, you"
+                                        + " should also increase %s; 3) If 
neither the above cases,"
+                                        + " it usually means some other parts 
of your application "
+                                        + "have consumed too many direct 
memory and the value of %s"
+                                        + " should be increased.",
+                                allocated * bufferSize,
+                                (numTotalBuffers - allocated) * bufferSize,
+                                
TaskManagerOptions.FRAMEWORK_OFF_HEAP_MEMORY.key(),
+                                
TaskManagerOptions.NETWORK_BATCH_SHUFFLE_READ_MEMORY.key(),
+                                
TaskManagerOptions.FRAMEWORK_OFF_HEAP_MEMORY.key(),
+                                
TaskManagerOptions.TASK_OFF_HEAP_MEMORY.key()));
+            }
+        }
+    }
+
+    /**
+     * Requests a collection of buffers (determined by {@link 
#numBuffersPerRequest}) from this
+     * buffer pool. Exception will be thrown if no enough buffers can be 
allocated in the given
+     * timeout and the corresponding requester is not holding any allocated 
buffers currently.
+     */
+    public List<MemorySegment> requestBuffers(Object owner) throws Exception {
+        checkArgument(owner != null, "Owner must be not null.");
+
+        List<MemorySegment> allocated = new ArrayList<>(numBuffersPerRequest);
+        synchronized (buffers) {
+            checkState(!destroyed, "Buffer pool is already destroyed.");
+
+            if (!initialized) {
+                initialize();
+            }
+
+            long startTime = System.currentTimeMillis();
+            Counter counter = numBuffersAllocated.get(owner);
+            while (buffers.size() < numBuffersPerRequest
+                    || (counter != null
+                            && counter.get() + numBuffersPerRequest > 
maxBuffersPerRequester)) {
+                checkState(!destroyed, "Buffer pool is already destroyed.");
+
+                buffers.wait(requestTimeout);

Review comment:
       It seems `buffers` is not notified when `numBuffersAllocated` is updated, 
so a requester blocked on its per-requester limit may not wake up until the 
timeout expires.
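
To illustrate the concern, here is a simplified sketch of the wait/notify protocol. The recycle path is not visible in this hunk, so `NotifySketch` and its methods are hypothetical stand-ins and not the PR's code. The point is that every update to state used in the wait condition should be followed by `notifyAll()` on the same monitor.

```java
import java.util.HashMap;
import java.util.Map;

public class NotifySketch {
    private final Object lock = new Object();
    private final Map<Object, Integer> heldPerOwner = new HashMap<>();
    private final int maxPerOwner;
    private int available;

    NotifySketch(int available, int maxPerOwner) {
        this.available = available;
        this.maxPerOwner = maxPerOwner;
    }

    /** Takes one buffer for the owner; returns false if the timeout elapses first. */
    boolean request(Object owner, long timeoutMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        synchronized (lock) {
            // The loop re-checks both conditions after every wakeup.
            while (available == 0 || heldPerOwner.getOrDefault(owner, 0) >= maxPerOwner) {
                long remainingMillis = (deadline - System.nanoTime()) / 1_000_000L;
                if (remainingMillis <= 0) {
                    return false;
                }
                try {
                    lock.wait(remainingMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            available--;
            heldPerOwner.merge(owner, 1, Integer::sum);
            return true;
        }
    }

    /** Returns one buffer and wakes waiters, since both wait conditions may have changed. */
    void recycle(Object owner) {
        synchronized (lock) {
            available++;
            heldPerOwner.computeIfPresent(owner, (key, held) -> held > 1 ? held - 1 : null);
            lock.notifyAll();
        }
    }
}
```

The sketch also waits only for the remaining time on each iteration, so repeated wakeups cannot extend the total wait beyond the configured timeout.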

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPool.java
##########
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A fixed-size {@link MemorySegment} pool used by batch shuffle for shuffle 
data read (currently
+ * only used by sort-merge blocking shuffle).
+ */
+@Internal
+public class BatchShuffleReadBufferPool {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(BatchShuffleReadBufferPool.class);
+
+    /** Minimum total memory size in bytes of this buffer pool. */
+    public static final int MIN_TOTAL_BYTES = 32 * 1024 * 1024;
+
+    /**
+     * Memory size in bytes can be allocated from this buffer pool for a 
single request (8M is for
+     * better sequential read).
+     */
+    public static final int NUM_BYTES_PER_REQUEST = 8 * 1024 * 1024;
+
+    /** Total direct memory size in bytes can can be allocated and used by 
this buffer pool. */
+    private final long totalBytes;
+
+    /**
+     * Maximum time to wait in milliseconds when requesting read buffers from 
this buffer pool
+     * before throwing an exception.
+     */
+    private final long requestTimeout;
+
+    /** The number of total buffers in this buffer pool. */
+    private final int numTotalBuffers;
+
+    /** Size of each buffer in bytes in this buffer pool. */
+    private final int bufferSize;
+
+    /** The number of buffers to be returned for a single request. */
+    private final int numBuffersPerRequest;
+
+    /**
+     * The maximum number of buffers can be allocated from this buffer pool 
for a single buffer
+     * requester.
+     */
+    private final int maxBuffersPerRequester;
+
+    /** All available buffers in this buffer pool currently. */
+    @GuardedBy("buffers")
+    private final Queue<MemorySegment> buffers = new ArrayDeque<>();
+
+    /** Account for all the buffers requested per requester. */
+    @GuardedBy("buffers")
+    private final Map<Object, Counter> numBuffersAllocated = new HashMap<>();
+
+    /** Whether this buffer pool has been destroyed or not. */
+    @GuardedBy("buffers")
+    private boolean destroyed;
+
+    /** Whether this buffer pool has been initialized or not. */
+    @GuardedBy("buffers")
+    private boolean initialized;
+
+    public BatchShuffleReadBufferPool(long totalBytes, int bufferSize) {
+        // 5 min default buffer request timeout
+        this(totalBytes, bufferSize, 5 * 60 * 1000);
+    }
+
+    public BatchShuffleReadBufferPool(long totalBytes, int bufferSize, long 
requestTimeout) {
+        checkArgument(totalBytes > 0, "Total memory size must be positive.");
+        checkArgument(bufferSize > 0, "Size of buffer must be positive.");
+        checkArgument(requestTimeout > 0, "Request timeout must be positive.");
+
+        this.totalBytes = totalBytes;
+        this.bufferSize = bufferSize;
+        this.requestTimeout = requestTimeout;
+
+        this.numTotalBuffers = (int) Math.min(totalBytes / bufferSize, 
Integer.MAX_VALUE);
+        this.numBuffersPerRequest =
+                Math.min(numTotalBuffers, Math.max(1, NUM_BYTES_PER_REQUEST / 
bufferSize));
+        this.maxBuffersPerRequester =
+                Math.max(
+                        4 * numBuffersPerRequest,

Review comment:
       Why do we expect one requester to have at least 4 concurrent requests at 
the same time?
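
A worked example of the arithmetic may help here. The formulas below are copied from the constructor; the class name is hypothetical, and the 32 KiB buffer size and the `maxNumThreads` values are assumptions, since `BatchShuffleReadIOExecutor.MAX_NUM_THREADS` is not shown in this hunk.

```java
public class RequesterLimitSketch {
    static final int NUM_BYTES_PER_REQUEST = 8 * 1024 * 1024;

    static int numTotalBuffers(long totalBytes, int bufferSize) {
        return (int) Math.min(totalBytes / bufferSize, Integer.MAX_VALUE);
    }

    static int numBuffersPerRequest(long totalBytes, int bufferSize) {
        return Math.min(
                numTotalBuffers(totalBytes, bufferSize),
                Math.max(1, NUM_BYTES_PER_REQUEST / bufferSize));
    }

    /** maxNumThreads stands in for BatchShuffleReadIOExecutor.MAX_NUM_THREADS. */
    static int maxBuffersPerRequester(long totalBytes, int bufferSize, int maxNumThreads) {
        return Math.max(
                4 * numBuffersPerRequest(totalBytes, bufferSize),
                numTotalBuffers(totalBytes, bufferSize) / maxNumThreads);
    }

    public static void main(String[] args) {
        long minPool = 32L * 1024 * 1024; // MIN_TOTAL_BYTES
        int bufferSize = 32 * 1024; // assumed buffer size
        System.out.println(numTotalBuffers(minPool, bufferSize));
        System.out.println(numBuffersPerRequest(minPool, bufferSize));
        System.out.println(maxBuffersPerRequester(minPool, bufferSize, 4));
    }
}
```

Under these assumptions a 32 MiB pool holds 1024 buffers, one request takes 256 of them, and the `4 * numBuffersPerRequest` term lets a single requester hold all 1024, which is the whole pool.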

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPool.java
##########
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A fixed-size {@link MemorySegment} pool used by batch shuffle for shuffle 
data read (currently
+ * only used by sort-merge blocking shuffle).
+ */
+@Internal
+public class BatchShuffleReadBufferPool {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(BatchShuffleReadBufferPool.class);
+
+    /** Minimum total memory size in bytes of this buffer pool. */
+    public static final int MIN_TOTAL_BYTES = 32 * 1024 * 1024;
+
+    /**
+     * Memory size in bytes can be allocated from this buffer pool for a 
single request (8M is for
+     * better sequential read).
+     */
+    public static final int NUM_BYTES_PER_REQUEST = 8 * 1024 * 1024;
+
+    /** Total direct memory size in bytes can can be allocated and used by 
this buffer pool. */
+    private final long totalBytes;
+
+    /**
+     * Maximum time to wait in milliseconds when requesting read buffers from 
this buffer pool
+     * before throwing an exception.
+     */
+    private final long requestTimeout;
+
+    /** The number of total buffers in this buffer pool. */
+    private final int numTotalBuffers;
+
+    /** Size of each buffer in bytes in this buffer pool. */
+    private final int bufferSize;
+
+    /** The number of buffers to be returned for a single request. */
+    private final int numBuffersPerRequest;
+
+    /**
+     * The maximum number of buffers can be allocated from this buffer pool 
for a single buffer
+     * requester.
+     */
+    private final int maxBuffersPerRequester;
+
+    /** All available buffers in this buffer pool currently. */
+    @GuardedBy("buffers")
+    private final Queue<MemorySegment> buffers = new ArrayDeque<>();
+
+    /** Account for all the buffers requested per requester. */
+    @GuardedBy("buffers")
+    private final Map<Object, Counter> numBuffersAllocated = new HashMap<>();
+
+    /** Whether this buffer pool has been destroyed or not. */
+    @GuardedBy("buffers")
+    private boolean destroyed;
+
+    /** Whether this buffer pool has been initialized or not. */
+    @GuardedBy("buffers")
+    private boolean initialized;
+
+    public BatchShuffleReadBufferPool(long totalBytes, int bufferSize) {
+        // 5 min default buffer request timeout
+        this(totalBytes, bufferSize, 5 * 60 * 1000);
+    }
+
+    public BatchShuffleReadBufferPool(long totalBytes, int bufferSize, long 
requestTimeout) {
+        checkArgument(totalBytes > 0, "Total memory size must be positive.");
+        checkArgument(bufferSize > 0, "Size of buffer must be positive.");
+        checkArgument(requestTimeout > 0, "Request timeout must be positive.");
+
+        this.totalBytes = totalBytes;
+        this.bufferSize = bufferSize;
+        this.requestTimeout = requestTimeout;
+
+        this.numTotalBuffers = (int) Math.min(totalBytes / bufferSize, 
Integer.MAX_VALUE);
+        this.numBuffersPerRequest =
+                Math.min(numTotalBuffers, Math.max(1, NUM_BYTES_PER_REQUEST / 
bufferSize));
+        this.maxBuffersPerRequester =
+                Math.max(
+                        4 * numBuffersPerRequest,
+                        numTotalBuffers / 
BatchShuffleReadIOExecutor.MAX_NUM_THREADS);
+    }
+
+    public long getTotalBytes() {
+        return totalBytes;
+    }
+
+    public long getRequestTimeout() {
+        return requestTimeout;
+    }
+
+    public int getNumBuffersPerRequest() {
+        return numBuffersPerRequest;
+    }
+
+    public int getMaxBuffersPerRequester() {
+        return maxBuffersPerRequester;
+    }
+
+    public int getBufferSize() {
+        return bufferSize;
+    }
+
+    public int getNumTotalBuffers() {
+        return numTotalBuffers;
+    }
+
+    public int getAvailableBuffers() {
+        synchronized (buffers) {
+            return buffers.size();
+        }
+    }
+
+    /** Initializes this buffer pool which allocates all the buffers. */
+    public void initialize() {
+        LOG.info(
+                "Initializing batch shuffle IO buffer pool: numBuffers={}, 
bufferSize={}.",
+                numTotalBuffers,
+                bufferSize);
+
+        checkState(
+                totalBytes >= bufferSize,
+                String.format(
+                        "Illegal configuration, config value for %s must be no 
smaller than %s,"
+                                + " please increase %s to at least %s bytes.",
+                        
TaskManagerOptions.NETWORK_BATCH_SHUFFLE_READ_MEMORY.key(),
+                        TaskManagerOptions.MEMORY_SEGMENT_SIZE.key(),
+                        
TaskManagerOptions.NETWORK_BATCH_SHUFFLE_READ_MEMORY.key(),
+                        bufferSize));
+
+        synchronized (buffers) {
+            checkState(!destroyed, "Buffer pool is already destroyed.");
+
+            if (initialized) {
+                return;
+            }
+            initialized = true;
+
+            try {
+                for (int i = 0; i < numTotalBuffers; ++i) {
+                    
buffers.add(MemorySegmentFactory.allocateUnpooledOffHeapMemory(bufferSize));
+                }
+            } catch (OutOfMemoryError outOfMemoryError) {
+                int allocated = buffers.size();
+                buffers.clear();
+                throw new OutOfMemoryError(
+                        String.format(
+                                "Can't allocate enough direct buffer for batch 
shuffle read buffer "
+                                        + "pool (bytes allocated: %d, bytes 
still needed: %d). To "
+                                        + "avoid the exception, you need to do 
one of the following"
+                                        + " adjustments: 1) If you have ever 
decreased %s, you need"
+                                        + " to undo the decrement; 2) If you 
ever increased %s, you"
+                                        + " should also increase %s; 3) If 
neither the above cases,"
+                                        + " it usually means some other parts 
of your application "
+                                        + "have consumed too many direct 
memory and the value of %s"
+                                        + " should be increased.",
+                                allocated * bufferSize,
+                                (numTotalBuffers - allocated) * bufferSize,
+                                
TaskManagerOptions.FRAMEWORK_OFF_HEAP_MEMORY.key(),
+                                
TaskManagerOptions.NETWORK_BATCH_SHUFFLE_READ_MEMORY.key(),
+                                
TaskManagerOptions.FRAMEWORK_OFF_HEAP_MEMORY.key(),
+                                
TaskManagerOptions.TASK_OFF_HEAP_MEMORY.key()));
+            }
+        }
+    }
+
+    /**
+     * Requests a collection of buffers (determined by {@link 
#numBuffersPerRequest}) from this
+     * buffer pool. Exception will be thrown if no enough buffers can be 
allocated in the given
+     * timeout and the corresponding requester is not holding any allocated 
buffers currently.
+     */
+    public List<MemorySegment> requestBuffers(Object owner) throws Exception {
+        checkArgument(owner != null, "Owner must be not null.");
+
+        List<MemorySegment> allocated = new ArrayList<>(numBuffersPerRequest);
+        synchronized (buffers) {
+            checkState(!destroyed, "Buffer pool is already destroyed.");
+
+            if (!initialized) {
+                initialize();
+            }
+
+            long startTime = System.currentTimeMillis();
+            Counter counter = numBuffersAllocated.get(owner);
+            while (buffers.size() < numBuffersPerRequest
+                    || (counter != null
+                            && counter.get() + numBuffersPerRequest > 
maxBuffersPerRequester)) {
+                checkState(!destroyed, "Buffer pool is already destroyed.");
+
+                buffers.wait(requestTimeout);
+                counter = numBuffersAllocated.get(owner);
+
+                if (counter == null && System.currentTimeMillis() - startTime 
>= requestTimeout) {

Review comment:
       Should an exception also be thrown if `counter != null && counter.get() + 
numBuffersPerRequest <= maxBuffersPerRequester`?
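
To make the question concrete, here is a minimal sketch of the two timeout conditions as pure functions. `timesOutAsQuoted` follows the `if` quoted above. `timesOutWhenUnderCap` is a hypothetical variant that also covers an owner still below its cap. The class and method names are illustrative and not part of the PR, and the `Integer held` parameter is an assumption standing in for the `Counter` lookup (`null` means the owner holds no buffers).

```java
/** Illustrative only: the names here are hypothetical and not part of the PR. */
class TimeoutConditionSketch {

    /** Condition as quoted: only an owner holding no buffers (held == null) can time out. */
    static boolean timesOutAsQuoted(Integer held, long waitedMs, long timeoutMs) {
        return held == null && waitedMs >= timeoutMs;
    }

    /**
     * Hypothetical variant: an owner that holds buffers but is still under its cap
     * is treated like an owner holding none, so it can time out as well.
     */
    static boolean timesOutWhenUnderCap(
            Integer held, int perRequest, int maxPerRequester, long waitedMs, long timeoutMs) {
        boolean underCap = held == null || held + perRequest <= maxPerRequester;
        return underCap && waitedMs >= timeoutMs;
    }

    public static void main(String[] args) {
        // Owner holds 4 buffers, asks for 2 more, cap is 8, and has waited the full timeout.
        System.out.println(timesOutAsQuoted(4, 1000, 1000));
        System.out.println(timesOutWhenUnderCap(4, 2, 8, 1000, 1000));
    }
}
```

In both variants an owner that is already at its cap keeps waiting rather than failing.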

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPool.java
##########
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A fixed-size {@link MemorySegment} pool used by batch shuffle for shuffle 
data read (currently
+ * only used by sort-merge blocking shuffle).
+ */
+@Internal
+public class BatchShuffleReadBufferPool {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(BatchShuffleReadBufferPool.class);
+
+    /** Minimum total memory size in bytes of this buffer pool. */
+    public static final int MIN_TOTAL_BYTES = 32 * 1024 * 1024;
+
+    /**
+     * Memory size in bytes can be allocated from this buffer pool for a 
single request (8M is for
+     * better sequential read).
+     */
+    public static final int NUM_BYTES_PER_REQUEST = 8 * 1024 * 1024;
+
+    /** Total direct memory size in bytes can can be allocated and used by 
this buffer pool. */
+    private final long totalBytes;
+
+    /**
+     * Maximum time to wait in milliseconds when requesting read buffers from 
this buffer pool
+     * before throwing an exception.
+     */
+    private final long requestTimeout;
+
+    /** The number of total buffers in this buffer pool. */
+    private final int numTotalBuffers;
+
+    /** Size of each buffer in bytes in this buffer pool. */
+    private final int bufferSize;
+
+    /** The number of buffers to be returned for a single request. */
+    private final int numBuffersPerRequest;
+
+    /**
+     * The maximum number of buffers can be allocated from this buffer pool 
for a single buffer
+     * requester.
+     */
+    private final int maxBuffersPerRequester;
+
+    /** All available buffers in this buffer pool currently. */
+    @GuardedBy("buffers")
+    private final Queue<MemorySegment> buffers = new ArrayDeque<>();
+
+    /** Account for all the buffers requested per requester. */
+    @GuardedBy("buffers")
+    private final Map<Object, Counter> numBuffersAllocated = new HashMap<>();
+
+    /** Whether this buffer pool has been destroyed or not. */
+    @GuardedBy("buffers")
+    private boolean destroyed;
+
+    /** Whether this buffer pool has been initialized or not. */
+    @GuardedBy("buffers")
+    private boolean initialized;
+
+    public BatchShuffleReadBufferPool(long totalBytes, int bufferSize) {
+        // 5 min default buffer request timeout
+        this(totalBytes, bufferSize, 5 * 60 * 1000);
+    }
+
+    public BatchShuffleReadBufferPool(long totalBytes, int bufferSize, long 
requestTimeout) {
+        checkArgument(totalBytes > 0, "Total memory size must be positive.");
+        checkArgument(bufferSize > 0, "Size of buffer must be positive.");
+        checkArgument(requestTimeout > 0, "Request timeout must be positive.");
+
+        this.totalBytes = totalBytes;
+        this.bufferSize = bufferSize;
+        this.requestTimeout = requestTimeout;
+
+        this.numTotalBuffers = (int) Math.min(totalBytes / bufferSize, 
Integer.MAX_VALUE);
+        this.numBuffersPerRequest =
+                Math.min(numTotalBuffers, Math.max(1, NUM_BYTES_PER_REQUEST / 
bufferSize));
+        this.maxBuffersPerRequester =
+                Math.max(

Review comment:
       IIUC, this should be `Math.min`
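
A small worked example may help compare the two readings. The sketch below repeats the constructor arithmetic from the diff and evaluates the cap with both `Math.max` and `Math.min`. The thread count is passed in as a parameter because the value of `BatchShuffleReadIOExecutor.MAX_NUM_THREADS` is not shown here; the `4` used in `main` is an assumed value, and the class and method names are illustrative.

```java
/** Illustrative only: compares the two candidate per-requester caps. */
class PerRequesterCapSketch {

    static final int NUM_BYTES_PER_REQUEST = 8 * 1024 * 1024;

    static int numTotalBuffers(long totalBytes, int bufferSize) {
        return (int) Math.min(totalBytes / bufferSize, Integer.MAX_VALUE);
    }

    static int numBuffersPerRequest(int numTotalBuffers, int bufferSize) {
        return Math.min(numTotalBuffers, Math.max(1, NUM_BYTES_PER_REQUEST / bufferSize));
    }

    /** Cap as written in the diff. */
    static int capWithMax(int numTotalBuffers, int perRequest, int maxNumThreads) {
        return Math.max(4 * perRequest, numTotalBuffers / maxNumThreads);
    }

    /** Cap as suggested in the comment. */
    static int capWithMin(int numTotalBuffers, int perRequest, int maxNumThreads) {
        return Math.min(4 * perRequest, numTotalBuffers / maxNumThreads);
    }

    public static void main(String[] args) {
        int bufferSize = 32 * 1024;                                // assumed 32 KiB segments
        int total = numTotalBuffers(32L * 1024 * 1024, bufferSize); // 32 MiB pool
        int perRequest = numBuffersPerRequest(total, bufferSize);
        int threads = 4;                                           // assumed thread count
        System.out.println("total=" + total + ", perRequest=" + perRequest);
        System.out.println("max cap=" + capWithMax(total, perRequest, threads));
        System.out.println("min cap=" + capWithMin(total, perRequest, threads));
    }
}
```

With these assumed inputs the pool has 1024 buffers and a request takes 256, so the `Math.max` form allows one requester to hold all 1024 buffers while the `Math.min` form limits it to 256.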

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPool.java
##########
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A fixed-size {@link MemorySegment} pool used by batch shuffle for shuffle 
data read (currently
+ * only used by sort-merge blocking shuffle).
+ */
+@Internal
+public class BatchShuffleReadBufferPool {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(BatchShuffleReadBufferPool.class);
+
+    /** Minimum total memory size in bytes of this buffer pool. */
+    public static final int MIN_TOTAL_BYTES = 32 * 1024 * 1024;
+
+    /**
+     * Memory size in bytes can be allocated from this buffer pool for a 
single request (8M is for
+     * better sequential read).
+     */
+    public static final int NUM_BYTES_PER_REQUEST = 8 * 1024 * 1024;
+
+    /** Total direct memory size in bytes can can be allocated and used by 
this buffer pool. */
+    private final long totalBytes;
+
+    /**
+     * Maximum time to wait in milliseconds when requesting read buffers from 
this buffer pool
+     * before throwing an exception.
+     */
+    private final long requestTimeout;
+
+    /** The number of total buffers in this buffer pool. */
+    private final int numTotalBuffers;
+
+    /** Size of each buffer in bytes in this buffer pool. */
+    private final int bufferSize;
+
+    /** The number of buffers to be returned for a single request. */
+    private final int numBuffersPerRequest;
+
+    /**
+     * The maximum number of buffers can be allocated from this buffer pool 
for a single buffer
+     * requester.
+     */
+    private final int maxBuffersPerRequester;
+
+    /** All available buffers in this buffer pool currently. */
+    @GuardedBy("buffers")
+    private final Queue<MemorySegment> buffers = new ArrayDeque<>();
+
+    /** Account for all the buffers requested per requester. */
+    @GuardedBy("buffers")
+    private final Map<Object, Counter> numBuffersAllocated = new HashMap<>();
+
+    /** Whether this buffer pool has been destroyed or not. */
+    @GuardedBy("buffers")
+    private boolean destroyed;
+
+    /** Whether this buffer pool has been initialized or not. */
+    @GuardedBy("buffers")
+    private boolean initialized;
+
+    public BatchShuffleReadBufferPool(long totalBytes, int bufferSize) {
+        // 5 min default buffer request timeout
+        this(totalBytes, bufferSize, 5 * 60 * 1000);
+    }
+
+    public BatchShuffleReadBufferPool(long totalBytes, int bufferSize, long 
requestTimeout) {
+        checkArgument(totalBytes > 0, "Total memory size must be positive.");
+        checkArgument(bufferSize > 0, "Size of buffer must be positive.");
+        checkArgument(requestTimeout > 0, "Request timeout must be positive.");
+
+        this.totalBytes = totalBytes;
+        this.bufferSize = bufferSize;
+        this.requestTimeout = requestTimeout;
+
+        this.numTotalBuffers = (int) Math.min(totalBytes / bufferSize, 
Integer.MAX_VALUE);
+        this.numBuffersPerRequest =
+                Math.min(numTotalBuffers, Math.max(1, NUM_BYTES_PER_REQUEST / 
bufferSize));
+        this.maxBuffersPerRequester =
+                Math.max(
+                        4 * numBuffersPerRequest,
+                        numTotalBuffers / 
BatchShuffleReadIOExecutor.MAX_NUM_THREADS);

Review comment:
       Maybe it should be `numTotalBuffers / numThreads`? And given that 
`numThreads` equals `numTotalBuffers / numBuffersPerRequest` (i.e. 
`maxAcceptableConcurrentRequests`), the buffer pool does not need to be aware of 
`BatchShuffleReadIOExecutor`.
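
A rough sketch of that suggestion, assuming `numThreads` really is `numTotalBuffers / numBuffersPerRequest`: the cap can then be computed from the pool's own fields without referencing the executor. The class and method names are hypothetical, and `numBuffersPerRequest` is assumed to be positive and no larger than `numTotalBuffers`.

```java
/** Illustrative only: derives the cap from pool-local values, as the comment suggests. */
class LocalCapSketch {

    /** Number of requests the pool can serve at once (assumes perRequest > 0). */
    static int maxAcceptableConcurrentRequests(int numTotalBuffers, int numBuffersPerRequest) {
        return Math.max(1, numTotalBuffers / numBuffersPerRequest);
    }

    /** Same shape as the cap in the diff, with numThreads derived locally. */
    static int maxBuffersPerRequester(int numTotalBuffers, int numBuffersPerRequest) {
        int numThreads = maxAcceptableConcurrentRequests(numTotalBuffers, numBuffersPerRequest);
        return Math.max(4 * numBuffersPerRequest, numTotalBuffers / numThreads);
    }

    public static void main(String[] args) {
        // 1024 buffers in total, 256 per request.
        System.out.println(maxAcceptableConcurrentRequests(1024, 256));
        System.out.println(maxBuffersPerRequester(1024, 256));
    }
}
```

One consequence worth noting: under this assumption `numTotalBuffers / numThreads` stays below `2 * numBuffersPerRequest`, so the `Math.max` always resolves to `4 * numBuffersPerRequest`.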




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

