This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 75adb32 [FLINK-24035][network][refactor] Move the blocking allocation
of one floating buffer logic from the constructor of LocalBufferPool to
SingleInputGate#setupChannels()
75adb32 is described below
commit 75adb3214a4c4142e52e3af9d6f57da2e09b2849
Author: kevin.cyj <[email protected]>
AuthorDate: Tue Aug 31 20:26:56 2021 +0800
[FLINK-24035][network][refactor] Move the blocking allocation of one
floating buffer logic from the constructor of LocalBufferPool to
SingleInputGate#setupChannels()
This refactoring makes the code cleaner and easier to understand. In addition,
the output side does not need the blocking allocation of one floating buffer.
This closes #17075.
---
.../runtime/io/network/buffer/BufferPool.java | 8 +++++
.../runtime/io/network/buffer/LocalBufferPool.java | 30 ++++++++++++----
.../io/network/buffer/NetworkBufferPool.java | 5 +--
.../partition/consumer/SingleInputGate.java | 12 ++++++-
.../io/network/buffer/LocalBufferPoolTest.java | 42 ++++++++++++----------
.../runtime/io/network/buffer/NoOpBufferPool.java | 3 ++
.../io/network/buffer/UnpooledBufferPool.java | 3 ++
...editBasedPartitionRequestClientHandlerTest.java | 4 +--
.../NettyMessageClientDecoderDelegateTest.java | 2 +-
.../NettyMessageClientSideSerializationTest.java | 2 +-
.../partition/consumer/SingleInputGateBuilder.java | 6 ++--
11 files changed, 81 insertions(+), 36 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java
index 80d6e31..8474bf8 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java
@@ -18,10 +18,18 @@
package org.apache.flink.runtime.io.network.buffer;
+import java.io.IOException;
+
/** A dynamically sized buffer pool. */
public interface BufferPool extends BufferProvider, BufferRecycler {
/**
+ * Reserves the target number of segments to this pool. Will throw an
exception if it can not
+ * allocate enough segments.
+ */
+ void reserveSegments(int numberOfSegmentsToReserve) throws IOException;
+
+ /**
* Destroys this buffer pool.
*
* <p>If not all buffers are available, they are recycled lazily as soon
as they are recycled.
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
index 2e57e5c..600c30d 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -219,13 +219,9 @@ class LocalBufferPool implements BufferPool {
// Lock is only taken, because #checkAvailability asserts it. It's a
small penalty for
// thread safety.
synchronized (this.availableMemorySegments) {
- // Make sure that this buffer pool always has one buffer on
initialization. For input
- // side, it guarantees that the buffer listeners can get floating
buffers properly and
- // no deadlock will occur (see FLINK-24035 for more information).
For output side, it
- // means all result partitions will be available and ready for
output on initialization.
-
availableMemorySegments.add(networkBufferPool.requestMemorySegmentBlocking());
- ++numberOfRequestedMemorySegments;
- availabilityHelper.resetAvailable();
+ if (checkAvailability()) {
+ availabilityHelper.resetAvailable();
+ }
checkConsistentAvailability();
}
@@ -236,6 +232,26 @@ class LocalBufferPool implements BufferPool {
// ------------------------------------------------------------------------
@Override
+ public void reserveSegments(int numberOfSegmentsToReserve) throws
IOException {
+ checkArgument(
+ numberOfSegmentsToReserve <= numberOfRequiredMemorySegments,
+ "Can not reserve more segments than number of required
segments.");
+
+ CompletableFuture<?> toNotify = null;
+ synchronized (availableMemorySegments) {
+ checkState(!isDestroyed, "Buffer pool has been destroyed.");
+
+ if (numberOfRequestedMemorySegments < numberOfSegmentsToReserve) {
+ availableMemorySegments.addAll(
+ networkBufferPool.requestMemorySegmentsBlocking(
+ numberOfSegmentsToReserve -
numberOfRequestedMemorySegments));
+ toNotify = availabilityHelper.getUnavailableToResetAvailable();
+ }
+ }
+ mayNotifyAvailable(toNotify);
+ }
+
+ @Override
public boolean isDestroyed() {
synchronized (availableMemorySegments) {
return isDestroyed;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index ac8b369..945e37f 100755
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -156,8 +156,9 @@ public class NetworkBufferPool
}
}
- public MemorySegment requestMemorySegmentBlocking() throws IOException {
- return internalRequestMemorySegments(1).get(0);
+ public List<MemorySegment> requestMemorySegmentsBlocking(int
numberOfSegmentsToRequest)
+ throws IOException {
+ return internalRequestMemorySegments(numberOfSegmentsToRequest);
}
public void recycle(MemorySegment segment) {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 72afcca..8422e9e 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -257,10 +257,11 @@ public class SingleInputGate extends IndexedInputGate {
checkState(
this.bufferPool == null,
"Bug in input gate setup logic: Already registered buffer
pool.");
- setupChannels();
BufferPool bufferPool = bufferPoolFactory.get();
setBufferPool(bufferPool);
+
+ setupChannels();
}
@Override
@@ -457,6 +458,15 @@ public class SingleInputGate extends IndexedInputGate {
/** Assign the exclusive buffers to all remote input channels directly for
credit-based mode. */
@VisibleForTesting
public void setupChannels() throws IOException {
+ // Allocate enough exclusive and floating buffers to guarantee that
job can make progress.
+ // Note: An exception will be thrown if there is no buffer available
in the given timeout.
+
+ // First allocate a single floating buffer to avoid potential deadlock
when the exclusive
+ // buffer is 0. See FLINK-24035 for more information.
+ bufferPool.reserveSegments(1);
+
+ // Next allocate the exclusive buffers per channel when the number of
exclusive buffer is
+ // larger than 0.
synchronized (requestLock) {
for (InputChannel inputChannel : inputChannels.values()) {
inputChannel.setup();
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
index 1987286..37e685f 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
@@ -54,6 +54,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.times;
@@ -101,32 +102,37 @@ public class LocalBufferPoolTest extends TestLogger {
}
@Test
- public void testLocalBufferPoolInitialization() throws Exception {
+ public void testReserveSegments() throws Exception {
NetworkBufferPool networkBufferPool =
new NetworkBufferPool(2, memorySegmentSize,
Duration.ofSeconds(2));
+ try {
+ BufferPool bufferPool1 = networkBufferPool.createBufferPool(1, 2);
+ assertThrows(IllegalArgumentException.class, () ->
bufferPool1.reserveSegments(2));
- BufferPool localBufferPool = networkBufferPool.createBufferPool(1, 2);
- assertTrue(localBufferPool.isAvailable());
- assertEquals(1, localBufferPool.getNumberOfAvailableMemorySegments());
+ // request all buffers
+ ArrayList<Buffer> buffers = new ArrayList<>(2);
+ buffers.add(bufferPool1.requestBuffer());
+ buffers.add(bufferPool1.requestBuffer());
+ assertEquals(2, buffers.size());
- // request all buffers
- ArrayList<Buffer> buffers = new ArrayList<>(2);
- buffers.add(localBufferPool.requestBuffer());
- buffers.add(localBufferPool.requestBuffer());
- assertEquals(2, buffers.size());
+ BufferPool bufferPool2 = networkBufferPool.createBufferPool(1, 10);
+ assertThrows(IOException.class, () ->
bufferPool2.reserveSegments(1));
+ assertFalse(bufferPool2.isAvailable());
- try {
- networkBufferPool.createBufferPool(1, 10);
- } catch (IOException exception) {
- // this is expected
- return;
- } finally {
buffers.forEach(Buffer::recycleBuffer);
- localBufferPool.lazyDestroy();
+ bufferPool1.lazyDestroy();
+ bufferPool2.lazyDestroy();
+
+ BufferPool bufferPool3 = networkBufferPool.createBufferPool(2, 10);
+ assertEquals(1, bufferPool3.getNumberOfAvailableMemorySegments());
+ bufferPool3.reserveSegments(2);
+ assertEquals(2, bufferPool3.getNumberOfAvailableMemorySegments());
+
+ bufferPool3.lazyDestroy();
+ assertThrows(IllegalStateException.class, () ->
bufferPool3.reserveSegments(1));
+ } finally {
networkBufferPool.destroy();
}
-
- fail("Should throw IOException");
}
@Test
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NoOpBufferPool.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NoOpBufferPool.java
index a7cc030..bdd6b4b 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NoOpBufferPool.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NoOpBufferPool.java
@@ -28,6 +28,9 @@ import java.util.concurrent.CompletableFuture;
public class NoOpBufferPool implements BufferPool {
@Override
+ public void reserveSegments(int numberOfSegmentsToReserve) {}
+
+ @Override
public void lazyDestroy() {}
@Override
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/UnpooledBufferPool.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/UnpooledBufferPool.java
index 73b0804..eb4d019 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/UnpooledBufferPool.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/UnpooledBufferPool.java
@@ -29,6 +29,9 @@ public class UnpooledBufferPool implements BufferPool {
private static final int SEGMENT_SIZE = 1024;
@Override
+ public void reserveSegments(int numberOfSegmentsToReserve) {}
+
+ @Override
public void lazyDestroy() {}
@Override
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
index 6e8cab9..c53717e 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
@@ -602,7 +602,7 @@ public class CreditBasedPartitionRequestClientHandlerTest {
try {
inputGate.setInputChannels(inputChannel);
- inputGate.setupChannels();
+ inputGate.setup();
inputGate.requestPartitions();
handler.addInputChannel(inputChannel);
@@ -732,7 +732,7 @@ public class CreditBasedPartitionRequestClientHandlerTest {
SingleInputGate inputGate = createSingleInputGate(1,
networkBufferPool);
RemoteInputChannel inputChannel = new
InputChannelBuilder().buildRemoteChannel(inputGate);
inputGate.setInputChannels(inputChannel);
- inputGate.setupChannels();
+ inputGate.setup();
CreditBasedPartitionRequestClientHandler handler =
new CreditBasedPartitionRequestClientHandler();
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientDecoderDelegateTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientDecoderDelegateTest.java
index 67f9301..4931a2f 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientDecoderDelegateTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientDecoderDelegateTest.java
@@ -84,7 +84,7 @@ public class NettyMessageClientDecoderDelegateTest extends
TestLogger {
createRemoteInputChannel(
inputGate, new TestingPartitionRequestClient(),
NUMBER_OF_BUFFER_RESPONSES);
inputGate.setInputChannels(inputChannel);
- inputGate.setupChannels();
+ inputGate.setup();
inputChannel.requestSubpartition(0);
handler.addInputChannel(inputChannel);
inputChannelId = inputChannel.getInputChannelId();
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientSideSerializationTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientSideSerializationTest.java
index f3cc50f..cba3be2 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientSideSerializationTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientSideSerializationTest.java
@@ -86,7 +86,7 @@ public class NettyMessageClientSideSerializationTest extends
TestLogger {
createRemoteInputChannel(inputGate, new
TestingPartitionRequestClient());
inputChannel.requestSubpartition(0);
inputGate.setInputChannels(inputChannel);
- inputGate.setupChannels();
+ inputGate.setup();
CreditBasedPartitionRequestClientHandler handler =
new CreditBasedPartitionRequestClientHandler();
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
index 1bf5f27..2fff0c6 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
@@ -23,6 +23,7 @@ import
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NoOpBufferPool;
import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
import
org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -66,10 +67,7 @@ public class SingleInputGateBuilder {
@Nullable
private BiFunction<InputChannelBuilder, SingleInputGate, InputChannel>
channelFactory = null;
- private SupplierWithException<BufferPool, IOException> bufferPoolFactory =
- () -> {
- throw new UnsupportedOperationException();
- };
+ private SupplierWithException<BufferPool, IOException> bufferPoolFactory =
NoOpBufferPool::new;
public SingleInputGateBuilder setPartitionProducerStateProvider(
PartitionProducerStateProvider partitionProducerStateProvider) {