This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b92fa30541c3ed29be5b1f9758d4bbb1b0724be4
Author: Anton Kalashnikov <[email protected]>
AuthorDate: Wed Jul 28 18:18:10 2021 +0200

    [FLINK-23453][runtime] Prepared Gates and Channels classes for both 
providing information for the calculation of buffer size and receiving the 
recalculated buffer size.
---
 .../network/partition/consumer/BufferManager.java  |  6 +++
 .../partition/consumer/IndexedInputGate.java       |  4 ++
 .../network/partition/consumer/InputChannel.java   |  4 ++
 .../partition/consumer/LocalInputChannel.java      | 10 +++++
 .../partition/consumer/RecoveredInputChannel.java  | 12 +++++
 .../partition/consumer/RemoteInputChannel.java     | 22 ++++++++++
 .../partition/consumer/SingleInputGate.java        | 16 +++++++
 .../partition/consumer/UnknownInputChannel.java    | 10 +++++
 .../runtime/taskmanager/InputGateWithMetrics.java  | 10 +++++
 ...editBasedPartitionRequestClientHandlerTest.java | 51 +++++++++++++++++++++-
 .../partition/PipelinedSubpartitionTest.java       | 18 ++++++++
 .../partition/consumer/InputChannelTest.java       |  8 ++++
 .../partition/consumer/RemoteInputChannelTest.java | 30 +++++++++++++
 .../partition/consumer/TestInputChannel.java       | 12 +++++
 .../streaming/runtime/io/MockIndexedInputGate.java |  8 ++++
 .../flink/streaming/runtime/io/MockInputGate.java  |  8 ++++
 .../AlignedCheckpointsMassiveRandomTest.java       |  8 ++++
 17 files changed, 236 insertions(+), 1 deletion(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java
index f417f2d..8d8e774 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java
@@ -367,6 +367,12 @@ public class BufferManager implements BufferListener, 
BufferRecycler {
         return numRequiredBuffers;
     }
 
+    int getNumberOfRequiredBuffers() {
+        synchronized (bufferQueue) {
+            return numRequiredBuffers;
+        }
+    }
+
     @VisibleForTesting
     boolean unsynchronizedIsWaitingForFloatingBuffers() {
         return isWaitingForFloatingBuffers;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java
index 9ac4be7..32cdca6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java
@@ -64,4 +64,8 @@ public abstract class IndexedInputGate extends InputGate 
implements Checkpointab
     public void convertToPriorityEvent(int channelIndex, int sequenceNumber) 
throws IOException {
         getChannel(channelIndex).convertToPriorityEvent(sequenceNumber);
     }
+
+    public abstract int getBuffersInUseCount();
+
+    public abstract void announceBufferSize(int bufferSize);
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
index 474d192..3a37d20 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
@@ -211,6 +211,10 @@ public abstract class InputChannel {
     /** Releases all resources of the channel. */
     abstract void releaseAllResources() throws IOException;
 
+    abstract void announceBufferSize(int newBufferSize);
+
+    abstract int getBuffersInUseCount();
+
     // ------------------------------------------------------------------------
     // Error notification
     // ------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index 15121b2..62918d9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -337,6 +337,16 @@ public class LocalInputChannel extends InputChannel 
implements BufferAvailabilit
     }
 
     @Override
+    void announceBufferSize(int newBufferSize) {
+        // Not supported.
+    }
+
+    @Override
+    int getBuffersInUseCount() {
+        return subpartitionView.getNumberOfQueuedBuffers();
+    }
+
+    @Override
     public int unsynchronizedGetNumberOfQueuedBuffers() {
         ResultSubpartitionView view = subpartitionView;
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
index 9c8ba13..7f31116 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
@@ -201,6 +201,13 @@ public abstract class RecoveredInputChannel extends 
InputChannel implements Chan
     }
 
     @Override
+    int getBuffersInUseCount() {
+        synchronized (receivedBuffers) {
+            return receivedBuffers.size();
+        }
+    }
+
+    @Override
     public void resumeConsumption() {
         throw new UnsupportedOperationException("RecoveredInputChannel should 
never be blocked.");
     }
@@ -272,4 +279,9 @@ public abstract class RecoveredInputChannel extends 
InputChannel implements Chan
     public void checkpointStarted(CheckpointBarrier barrier) throws 
CheckpointException {
         throw new CheckpointException(CHECKPOINT_DECLINED_TASK_NOT_READY);
     }
+
+    @Override
+    void announceBufferSize(int newBufferSize) {
+        // Not supported.
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 3623f03..07de42c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -39,6 +39,7 @@ import 
org.apache.flink.runtime.io.network.logger.NetworkActionsLogger;
 import 
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.PrioritizedDeque;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.util.ExceptionUtils;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.Iterators;
 
@@ -285,6 +286,21 @@ public class RemoteInputChannel extends InputChannel {
         }
     }
 
+    @Override
+    int getBuffersInUseCount() {
+        return getNumberOfQueuedBuffers()
+                + Math.max(0, bufferManager.getNumberOfRequiredBuffers() - 
initialCredit);
+    }
+
+    @Override
+    void announceBufferSize(int newBufferSize) {
+        try {
+            notifyNewBufferSize(newBufferSize);
+        } catch (Throwable t) {
+            ExceptionUtils.rethrow(t);
+        }
+    }
+
     private void failPartitionRequest() {
         setError(new PartitionNotFoundException(partitionId));
     }
@@ -307,6 +323,12 @@ public class RemoteInputChannel extends InputChannel {
         partitionRequestClient.notifyCreditAvailable(this);
     }
 
+    private void notifyNewBufferSize(int newBufferSize) throws IOException {
+        checkPartitionRequestQueueInitialized();
+
+        partitionRequestClient.notifyNewBufferSize(this, newBufferSize);
+    }
+
     @VisibleForTesting
     public int getNumberOfAvailableBuffers() {
         return bufferManager.getNumberOfAvailableBuffers();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index a1b120d..a203d16 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -366,6 +366,22 @@ public class SingleInputGate extends IndexedInputGate {
         return unfinishedChannels;
     }
 
+    @Override
+    public int getBuffersInUseCount() {
+        int total = 0;
+        for (InputChannel channel : channels) {
+            total += Math.max(1, channel.getBuffersInUseCount());
+        }
+        return total;
+    }
+
+    @Override
+    public void announceBufferSize(int newBufferSize) {
+        for (InputChannel channel : channels) {
+            channel.announceBufferSize(newBufferSize);
+        }
+    }
+
     /**
      * Returns the type of this input channel's consumed result partition.
      *
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
index 9c204b9..88d91c7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
@@ -131,6 +131,16 @@ class UnknownInputChannel extends InputChannel implements 
ChannelStateHolder {
     }
 
     @Override
+    void announceBufferSize(int newBufferSize) {
+        // Not supported.
+    }
+
+    @Override
+    int getBuffersInUseCount() {
+        return 0;
+    }
+
+    @Override
     public String toString() {
         return "UnknownInputChannel [" + partitionId + "]";
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
index cbfa594..bc6ca20 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
@@ -93,6 +93,16 @@ public class InputGateWithMetrics extends IndexedInputGate {
     }
 
     @Override
+    public int getBuffersInUseCount() {
+        return inputGate.getBuffersInUseCount();
+    }
+
+    @Override
+    public void announceBufferSize(int bufferSize) {
+        inputGate.announceBufferSize(bufferSize);
+    }
+
+    @Override
     public boolean isFinished() {
         return inputGate.isFinished();
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
index 3978fc7..fc49356 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
@@ -64,13 +64,14 @@ import java.io.IOException;
 import static 
org.apache.flink.runtime.io.network.netty.PartitionRequestQueueTest.blockChannel;
 import static 
org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createRemoteInputChannel;
 import static 
org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
@@ -81,6 +82,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+/** Test for {@link CreditBasedPartitionRequestClientHandler}. */
 public class CreditBasedPartitionRequestClientHandlerTest {
 
     /**
@@ -674,6 +676,53 @@ public class CreditBasedPartitionRequestClientHandlerTest {
         }
     }
 
+    @Test
+    public void testAnnounceBufferSize() throws Exception {
+        final CreditBasedPartitionRequestClientHandler handler =
+                new CreditBasedPartitionRequestClientHandler();
+        final EmbeddedChannel channel = new EmbeddedChannel(handler);
+        final PartitionRequestClient client =
+                new NettyPartitionRequestClient(
+                        channel,
+                        handler,
+                        mock(ConnectionID.class),
+                        mock(PartitionRequestClientFactory.class));
+
+        final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 
32);
+        final SingleInputGate inputGate = createSingleInputGate(2, 
networkBufferPool);
+        final RemoteInputChannel[] inputChannels = new RemoteInputChannel[2];
+        inputChannels[0] = createRemoteInputChannel(inputGate, client);
+        inputChannels[1] = createRemoteInputChannel(inputGate, client);
+        try {
+            inputGate.setInputChannels(inputChannels);
+            final BufferPool bufferPool = 
networkBufferPool.createBufferPool(6, 6);
+            inputGate.setBufferPool(bufferPool);
+            inputGate.setupChannels();
+
+            inputChannels[0].requestSubpartition(0);
+            inputChannels[1].requestSubpartition(0);
+            channel.readOutbound();
+            channel.readOutbound();
+
+            inputGate.announceBufferSize(333);
+
+            channel.runPendingTasks();
+
+            NettyMessage.NewBufferSize readOutbound = channel.readOutbound();
+            assertThat(readOutbound, 
instanceOf(NettyMessage.NewBufferSize.class));
+            assertThat(readOutbound.receiverId, 
is(inputChannels[0].getInputChannelId()));
+            assertThat(readOutbound.bufferSize, is(333L));
+
+            readOutbound = channel.readOutbound();
+            assertThat(readOutbound.receiverId, 
is(inputChannels[1].getInputChannelId()));
+            assertThat(readOutbound.bufferSize, is(333L));
+
+        } finally {
+            releaseResource(inputGate, networkBufferPool);
+            channel.close();
+        }
+    }
+
     private void testReadBufferResponseWithReleasingOrRemovingChannel(
             boolean isRemoved, boolean readBeforeReleasingOrRemoving) throws 
Exception {
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index c59e24e..8ef0937 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -243,6 +243,8 @@ public class PipelinedSubpartitionTest extends 
SubpartitionTestBase {
         try {
             partition.add(buffer1);
             partition.add(buffer2);
+            assertEquals(2, partition.getNumberOfQueuedBuffers());
+
             // create the read view first
             ResultSubpartitionView view = null;
             if (createView) {
@@ -250,6 +252,7 @@ public class PipelinedSubpartitionTest extends 
SubpartitionTestBase {
             }
 
             partition.release();
+            assertEquals(0, partition.getNumberOfQueuedBuffers());
 
             assertTrue(partition.isReleased());
             if (createView) {
@@ -282,6 +285,21 @@ public class PipelinedSubpartitionTest extends 
SubpartitionTestBase {
         verifyViewReleasedAfterParentRelease(partition);
     }
 
+    @Test
+    public void testNumberOfQueueBuffers() throws Exception {
+        final PipelinedSubpartition subpartition = createSubpartition();
+
+        subpartition.add(createFilledFinishedBufferConsumer(4096));
+        assertEquals(1, subpartition.getNumberOfQueuedBuffers());
+
+        subpartition.add(createFilledFinishedBufferConsumer(4096));
+        assertEquals(2, subpartition.getNumberOfQueuedBuffers());
+
+        subpartition.getNextBuffer();
+
+        assertEquals(1, subpartition.getNumberOfQueuedBuffers());
+    }
+
     private void verifyViewReleasedAfterParentRelease(ResultSubpartition 
partition)
             throws Exception {
         // Add a bufferConsumer
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
index ba2c1f2..d6de0ea 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
@@ -156,5 +156,13 @@ public class InputChannelTest {
 
         @Override
         void releaseAllResources() throws IOException {}
+
+        @Override
+        void announceBufferSize(int newBufferSize) {}
+
+        @Override
+        int getBuffersInUseCount() {
+            return 0;
+        }
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index bea6b6c..ff20ebb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -1847,6 +1847,36 @@ public class RemoteInputChannelTest {
                                 }));
     }
 
+    @Test
+    public void testBuffersInUseCount() throws Exception {
+        // Setup
+        RemoteInputChannel remoteInputChannel = buildInputGateAndGetChannel();
+
+        final Buffer buffer = createBuffer(TestBufferFactory.BUFFER_SIZE);
+
+        // Receiving the buffer with backlog.
+        remoteInputChannel.onBuffer(buffer.retainBuffer(), 0, 1);
+        // 1 buffer + 1 backlog.
+        assertEquals(2, remoteInputChannel.getBuffersInUseCount());
+
+        remoteInputChannel.onBuffer(buffer.retainBuffer(), 1, 3);
+        // 2 buffer + 3 backlog.
+        assertEquals(5, remoteInputChannel.getBuffersInUseCount());
+
+        // 1 buffer + 3 backlog.
+        remoteInputChannel.getNextBuffer();
+        assertEquals(4, remoteInputChannel.getBuffersInUseCount());
+
+        // 0 buffer + 3 backlog.
+        remoteInputChannel.getNextBuffer();
+        assertEquals(3, remoteInputChannel.getBuffersInUseCount());
+
+        // 0 buffer + 3 backlog. Nothing changes from previous case because 
receivedBuffers was
+        // already empty.
+        remoteInputChannel.getNextBuffer();
+        assertEquals(3, remoteInputChannel.getBuffersInUseCount());
+    }
+
     /**
      * Requests the buffers from input channel and buffer pool first and then 
recycles them by a
      * callable task.
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
index a9ee13d..ae6753c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
@@ -60,6 +60,8 @@ public class TestInputChannel extends InputChannel {
 
     private int sequenceNumber;
 
+    private int currentBufferSize;
+
     public TestInputChannel(SingleInputGate inputGate, int channelIndex) {
         this(inputGate, channelIndex, true, false);
     }
@@ -187,6 +189,16 @@ public class TestInputChannel extends InputChannel {
     }
 
     @Override
+    void announceBufferSize(int newBufferSize) {
+        currentBufferSize = newBufferSize;
+    }
+
+    @Override
+    int getBuffersInUseCount() {
+        return buffers.size();
+    }
+
+    @Override
     public void resumeConsumption() {
         isBlocked = false;
     }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java
index 3ec3ca7..e834bc4 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java
@@ -120,4 +120,12 @@ public class MockIndexedInputGate extends IndexedInputGate 
{
     public List<InputChannelInfo> getUnfinishedChannels() {
         return Collections.emptyList();
     }
+
+    @Override
+    public int getBuffersInUseCount() {
+        return 0;
+    }
+
+    @Override
+    public void announceBufferSize(int bufferSize) {}
 }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
index 94ae708..7b9b0f3 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
@@ -158,6 +158,14 @@ public class MockInputGate extends IndexedInputGate {
         }
     }
 
+    @Override
+    public int getBuffersInUseCount() {
+        return 0;
+    }
+
+    @Override
+    public void announceBufferSize(int bufferSize) {}
+
     public Set<Integer> getBlockedChannels() {
         return blockedChannels;
     }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlignedCheckpointsMassiveRandomTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlignedCheckpointsMassiveRandomTest.java
index 6c0ffa2..033c0a3 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlignedCheckpointsMassiveRandomTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlignedCheckpointsMassiveRandomTest.java
@@ -235,6 +235,14 @@ public class AlignedCheckpointsMassiveRandomTest {
         @Override
         public void checkpointStopped(long cancelledCheckpointId) {}
 
+        @Override
+        public int getBuffersInUseCount() {
+            return 0;
+        }
+
+        @Override
+        public void announceBufferSize(int bufferSize) {}
+
         public void acknowledgeAllRecordsProcessed(InputChannelInfo 
channelInfo)
                 throws IOException {}
 

Reply via email to