This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit b92fa30541c3ed29be5b1f9758d4bbb1b0724be4 Author: Anton Kalashnikov <[email protected]> AuthorDate: Wed Jul 28 18:18:10 2021 +0200 [FLINK-23453][runtime] Prepared Gates and Channels classes for both providing information for the calculation of buffer size and receiving the recalculated buffer size. --- .../network/partition/consumer/BufferManager.java | 6 +++ .../partition/consumer/IndexedInputGate.java | 4 ++ .../network/partition/consumer/InputChannel.java | 4 ++ .../partition/consumer/LocalInputChannel.java | 10 +++++ .../partition/consumer/RecoveredInputChannel.java | 12 +++++ .../partition/consumer/RemoteInputChannel.java | 22 ++++++++++ .../partition/consumer/SingleInputGate.java | 16 +++++++ .../partition/consumer/UnknownInputChannel.java | 10 +++++ .../runtime/taskmanager/InputGateWithMetrics.java | 10 +++++ ...editBasedPartitionRequestClientHandlerTest.java | 51 +++++++++++++++++++++- .../partition/PipelinedSubpartitionTest.java | 18 ++++++++ .../partition/consumer/InputChannelTest.java | 8 ++++ .../partition/consumer/RemoteInputChannelTest.java | 30 +++++++++++++ .../partition/consumer/TestInputChannel.java | 12 +++++ .../streaming/runtime/io/MockIndexedInputGate.java | 8 ++++ .../flink/streaming/runtime/io/MockInputGate.java | 8 ++++ .../AlignedCheckpointsMassiveRandomTest.java | 8 ++++ 17 files changed, 236 insertions(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java index f417f2d..8d8e774 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java @@ -367,6 +367,12 @@ public class BufferManager implements BufferListener, BufferRecycler { return numRequiredBuffers; } + int getNumberOfRequiredBuffers() 
{ + synchronized (bufferQueue) { + return numRequiredBuffers; + } + } + @VisibleForTesting boolean unsynchronizedIsWaitingForFloatingBuffers() { return isWaitingForFloatingBuffers; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java index 9ac4be7..32cdca6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java @@ -64,4 +64,8 @@ public abstract class IndexedInputGate extends InputGate implements Checkpointab public void convertToPriorityEvent(int channelIndex, int sequenceNumber) throws IOException { getChannel(channelIndex).convertToPriorityEvent(sequenceNumber); } + + public abstract int getBuffersInUseCount(); + + public abstract void announceBufferSize(int bufferSize); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java index 474d192..3a37d20 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java @@ -211,6 +211,10 @@ public abstract class InputChannel { /** Releases all resources of the channel. 
*/ abstract void releaseAllResources() throws IOException; + abstract void announceBufferSize(int newBufferSize); + + abstract int getBuffersInUseCount(); + // ------------------------------------------------------------------------ // Error notification // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java index 15121b2..62918d9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java @@ -337,6 +337,16 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit } @Override + void announceBufferSize(int newBufferSize) { + // Not supported. + } + + @Override + int getBuffersInUseCount() { + return subpartitionView.getNumberOfQueuedBuffers(); + } + + @Override public int unsynchronizedGetNumberOfQueuedBuffers() { ResultSubpartitionView view = subpartitionView; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java index 9c8ba13..7f31116 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java @@ -201,6 +201,13 @@ public abstract class RecoveredInputChannel extends InputChannel implements Chan } @Override + int getBuffersInUseCount() { + synchronized (receivedBuffers) { + return receivedBuffers.size(); + } + } + + @Override public void resumeConsumption() { throw new 
UnsupportedOperationException("RecoveredInputChannel should never be blocked."); } @@ -272,4 +279,9 @@ public abstract class RecoveredInputChannel extends InputChannel implements Chan public void checkpointStarted(CheckpointBarrier barrier) throws CheckpointException { throw new CheckpointException(CHECKPOINT_DECLINED_TASK_NOT_READY); } + + @Override + void announceBufferSize(int newBufferSize) { + // Not supported. + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java index 3623f03..07de42c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java @@ -39,6 +39,7 @@ import org.apache.flink.runtime.io.network.logger.NetworkActionsLogger; import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException; import org.apache.flink.runtime.io.network.partition.PrioritizedDeque; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.util.ExceptionUtils; import org.apache.flink.shaded.guava30.com.google.common.collect.Iterators; @@ -285,6 +286,21 @@ public class RemoteInputChannel extends InputChannel { } } + @Override + int getBuffersInUseCount() { + return getNumberOfQueuedBuffers() + + Math.max(0, bufferManager.getNumberOfRequiredBuffers() - initialCredit); + } + + @Override + void announceBufferSize(int newBufferSize) { + try { + notifyNewBufferSize(newBufferSize); + } catch (Throwable t) { + ExceptionUtils.rethrow(t); + } + } + private void failPartitionRequest() { setError(new PartitionNotFoundException(partitionId)); } @@ -307,6 +323,12 @@ public class RemoteInputChannel extends InputChannel { partitionRequestClient.notifyCreditAvailable(this); } + private void 
notifyNewBufferSize(int newBufferSize) throws IOException { + checkPartitionRequestQueueInitialized(); + + partitionRequestClient.notifyNewBufferSize(this, newBufferSize); + } + @VisibleForTesting public int getNumberOfAvailableBuffers() { return bufferManager.getNumberOfAvailableBuffers(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java index a1b120d..a203d16 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java @@ -366,6 +366,22 @@ public class SingleInputGate extends IndexedInputGate { return unfinishedChannels; } + @Override + public int getBuffersInUseCount() { + int total = 0; + for (InputChannel channel : channels) { + total += Math.max(1, channel.getBuffersInUseCount()); + } + return total; + } + + @Override + public void announceBufferSize(int newBufferSize) { + for (InputChannel channel : channels) { + channel.announceBufferSize(newBufferSize); + } + } + /** * Returns the type of this input channel's consumed result partition. * diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java index 9c204b9..88d91c7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java @@ -131,6 +131,16 @@ class UnknownInputChannel extends InputChannel implements ChannelStateHolder { } @Override + void announceBufferSize(int newBufferSize) { + // Not supported. 
+ } + + @Override + int getBuffersInUseCount() { + return 0; + } + + @Override public String toString() { return "UnknownInputChannel [" + partitionId + "]"; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java index cbfa594..bc6ca20 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java @@ -93,6 +93,16 @@ public class InputGateWithMetrics extends IndexedInputGate { } @Override + public int getBuffersInUseCount() { + return inputGate.getBuffersInUseCount(); + } + + @Override + public void announceBufferSize(int bufferSize) { + inputGate.announceBufferSize(bufferSize); + } + + @Override public boolean isFinished() { return inputGate.isFinished(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java index 3978fc7..fc49356 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java @@ -64,13 +64,14 @@ import java.io.IOException; import static org.apache.flink.runtime.io.network.netty.PartitionRequestQueueTest.blockChannel; import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createRemoteInputChannel; import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.instanceOf; import 
static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; @@ -81,6 +82,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +/** Test for {@link CreditBasedPartitionRequestClientHandler}. */ public class CreditBasedPartitionRequestClientHandlerTest { /** @@ -674,6 +676,53 @@ public class CreditBasedPartitionRequestClientHandlerTest { } } + @Test + public void testAnnounceBufferSize() throws Exception { + final CreditBasedPartitionRequestClientHandler handler = + new CreditBasedPartitionRequestClientHandler(); + final EmbeddedChannel channel = new EmbeddedChannel(handler); + final PartitionRequestClient client = + new NettyPartitionRequestClient( + channel, + handler, + mock(ConnectionID.class), + mock(PartitionRequestClientFactory.class)); + + final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32); + final SingleInputGate inputGate = createSingleInputGate(2, networkBufferPool); + final RemoteInputChannel[] inputChannels = new RemoteInputChannel[2]; + inputChannels[0] = createRemoteInputChannel(inputGate, client); + inputChannels[1] = createRemoteInputChannel(inputGate, client); + try { + inputGate.setInputChannels(inputChannels); + final BufferPool bufferPool = networkBufferPool.createBufferPool(6, 6); + inputGate.setBufferPool(bufferPool); + inputGate.setupChannels(); + + inputChannels[0].requestSubpartition(0); + inputChannels[1].requestSubpartition(0); + channel.readOutbound(); + channel.readOutbound(); + + inputGate.announceBufferSize(333); + + channel.runPendingTasks(); + + NettyMessage.NewBufferSize readOutbound = channel.readOutbound(); + 
assertThat(readOutbound, instanceOf(NettyMessage.NewBufferSize.class)); + assertThat(readOutbound.receiverId, is(inputChannels[0].getInputChannelId())); + assertThat(readOutbound.bufferSize, is(333L)); + + readOutbound = channel.readOutbound(); + assertThat(readOutbound.receiverId, is(inputChannels[1].getInputChannelId())); + assertThat(readOutbound.bufferSize, is(333L)); + + } finally { + releaseResource(inputGate, networkBufferPool); + channel.close(); + } + } + private void testReadBufferResponseWithReleasingOrRemovingChannel( boolean isRemoved, boolean readBeforeReleasingOrRemoving) throws Exception { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java index c59e24e..8ef0937 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java @@ -243,6 +243,8 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase { try { partition.add(buffer1); partition.add(buffer2); + assertEquals(2, partition.getNumberOfQueuedBuffers()); + // create the read view first ResultSubpartitionView view = null; if (createView) { @@ -250,6 +252,7 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase { } partition.release(); + assertEquals(0, partition.getNumberOfQueuedBuffers()); assertTrue(partition.isReleased()); if (createView) { @@ -282,6 +285,21 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase { verifyViewReleasedAfterParentRelease(partition); } + @Test + public void testNumberOfQueueBuffers() throws Exception { + final PipelinedSubpartition subpartition = createSubpartition(); + + subpartition.add(createFilledFinishedBufferConsumer(4096)); + assertEquals(1, 
subpartition.getNumberOfQueuedBuffers()); + + subpartition.add(createFilledFinishedBufferConsumer(4096)); + assertEquals(2, subpartition.getNumberOfQueuedBuffers()); + + subpartition.getNextBuffer(); + + assertEquals(1, subpartition.getNumberOfQueuedBuffers()); + } + private void verifyViewReleasedAfterParentRelease(ResultSubpartition partition) throws Exception { // Add a bufferConsumer diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java index ba2c1f2..d6de0ea 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java @@ -156,5 +156,13 @@ public class InputChannelTest { @Override void releaseAllResources() throws IOException {} + + @Override + void announceBufferSize(int newBufferSize) {} + + @Override + int getBuffersInUseCount() { + return 0; + } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java index bea6b6c..ff20ebb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java @@ -1847,6 +1847,36 @@ public class RemoteInputChannelTest { })); } + @Test + public void testBuffersInUseCount() throws Exception { + // Setup + RemoteInputChannel remoteInputChannel = buildInputGateAndGetChannel(); + + final Buffer buffer = createBuffer(TestBufferFactory.BUFFER_SIZE); + + // Receiving the buffer with backlog. 
+ remoteInputChannel.onBuffer(buffer.retainBuffer(), 0, 1); + // 1 buffer + 1 backlog. + assertEquals(2, remoteInputChannel.getBuffersInUseCount()); + + remoteInputChannel.onBuffer(buffer.retainBuffer(), 1, 3); + // 2 buffer + 3 backlog. + assertEquals(5, remoteInputChannel.getBuffersInUseCount()); + + // 1 buffer + 3 backlog. + remoteInputChannel.getNextBuffer(); + assertEquals(4, remoteInputChannel.getBuffersInUseCount()); + + // 0 buffer + 3 backlog. + remoteInputChannel.getNextBuffer(); + assertEquals(3, remoteInputChannel.getBuffersInUseCount()); + + // 0 buffer + 3 backlog. Nothing changes from previous case because receivedBuffers was + // already empty. + remoteInputChannel.getNextBuffer(); + assertEquals(3, remoteInputChannel.getBuffersInUseCount()); + } + /** * Requests the buffers from input channel and buffer pool first and then recycles them by a * callable task. diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java index a9ee13d..ae6753c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java @@ -60,6 +60,8 @@ public class TestInputChannel extends InputChannel { private int sequenceNumber; + private int currentBufferSize; + public TestInputChannel(SingleInputGate inputGate, int channelIndex) { this(inputGate, channelIndex, true, false); } @@ -187,6 +189,16 @@ public class TestInputChannel extends InputChannel { } @Override + void announceBufferSize(int newBufferSize) { + currentBufferSize = newBufferSize; + } + + @Override + int getBuffersInUseCount() { + return buffers.size(); + } + + @Override public void resumeConsumption() { isBlocked = false; } diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java index 3ec3ca7..e834bc4 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java @@ -120,4 +120,12 @@ public class MockIndexedInputGate extends IndexedInputGate { public List<InputChannelInfo> getUnfinishedChannels() { return Collections.emptyList(); } + + @Override + public int getBuffersInUseCount() { + return 0; + } + + @Override + public void announceBufferSize(int bufferSize) {} } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java index 94ae708..7b9b0f3 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java @@ -158,6 +158,14 @@ public class MockInputGate extends IndexedInputGate { } } + @Override + public int getBuffersInUseCount() { + return 0; + } + + @Override + public void announceBufferSize(int bufferSize) {} + public Set<Integer> getBlockedChannels() { return blockedChannels; } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlignedCheckpointsMassiveRandomTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlignedCheckpointsMassiveRandomTest.java index 6c0ffa2..033c0a3 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlignedCheckpointsMassiveRandomTest.java +++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlignedCheckpointsMassiveRandomTest.java @@ -235,6 +235,14 @@ public class AlignedCheckpointsMassiveRandomTest { @Override public void checkpointStopped(long cancelledCheckpointId) {} + @Override + public int getBuffersInUseCount() { + return 0; + } + + @Override + public void announceBufferSize(int bufferSize) {} + public void acknowledgeAllRecordsProcessed(InputChannelInfo channelInfo) throws IOException {}
