This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a36fac6fceff59a237e90e55965abf76b39e3127
Author: Anton Kalashnikov <[email protected]>
AuthorDate: Wed Jul 28 17:44:25 2021 +0200

    [FLINK-23453][runtime] Message for notification about new buffer size(NewBufferSize) was added
---
 .../runtime/io/network/NetworkClientHandler.java   |  8 ++++
 .../runtime/io/network/PartitionRequestClient.java |  8 ++++
 .../CreditBasedPartitionRequestClientHandler.java  | 25 +++++++++++
 .../runtime/io/network/netty/NettyMessage.java     | 49 ++++++++++++++++++++++
 .../network/netty/NettyPartitionRequestClient.java |  5 +++
 .../netty/PartitionRequestServerHandler.java       |  4 ++
 .../io/network/TestingPartitionRequestClient.java  |  3 ++
 .../NettyMessageServerSideSerializationTest.java   | 11 +++++
 8 files changed, 113 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkClientHandler.java
index 0db6f56..d13a33b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkClientHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkClientHandler.java
@@ -49,6 +49,14 @@ public interface NetworkClientHandler extends ChannelHandler {
     void notifyCreditAvailable(final RemoteInputChannel inputChannel);
 
     /**
+     * The new size should be announced after it was calculated on the receiver side.
+     *
+     * @param inputChannel The input channel with new buffer size.
+     * @param bufferSize The new buffer size.
+     */
+    void notifyNewBufferSize(final RemoteInputChannel inputChannel, int bufferSize);
+
+    /**
      * Resumes data consumption from the producer after an exactly once checkpoint.
      *
      * @param inputChannel The input channel to resume data consumption.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionRequestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionRequestClient.java
index ad7e622..480d320 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionRequestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionRequestClient.java
@@ -50,6 +50,14 @@ public interface PartitionRequestClient {
     void notifyCreditAvailable(RemoteInputChannel inputChannel);
 
     /**
+     * Notifies new buffer size from one remote input channel.
+     *
+     * @param inputChannel The remote input channel who announces the new buffer size.
+     * @param bufferSize The new buffer size.
+     */
+    void notifyNewBufferSize(RemoteInputChannel inputChannel, int bufferSize);
+
+    /**
      * Requests to resume data consumption from one remote input channel.
      *
      * @param inputChannel The remote input channel who is ready to resume data consumption.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
index 41d9143..f8660e9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
@@ -131,6 +131,17 @@ class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdap
     }
 
     @Override
+    public void notifyNewBufferSize(final RemoteInputChannel inputChannel, int bufferSize) {
+        ctx.executor()
+                .execute(
+                        () ->
+                                ctx.pipeline()
+                                        .fireUserEventTriggered(
+                                                new NewBufferSizeMessage(
+                                                        inputChannel, bufferSize)));
+    }
+
+    @Override
     public void resumeConsumption(RemoteInputChannel inputChannel) {
         ctx.executor()
                 .execute(
@@ -458,6 +469,20 @@ class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdap
         }
     }
 
+    private static class NewBufferSizeMessage extends ClientOutboundMessage {
+        private final int bufferSize;
+
+        NewBufferSizeMessage(RemoteInputChannel inputChannel, int bufferSize) {
+            super(checkNotNull(inputChannel));
+            this.bufferSize = bufferSize;
+        }
+
+        @Override
+        public Object buildMessage() {
+            return new NettyMessage.NewBufferSize(bufferSize, inputChannel.getInputChannelId());
+        }
+    }
+
     private static class ResumeConsumptionMessage extends ClientOutboundMessage {
 
         ResumeConsumptionMessage(RemoteInputChannel inputChannel) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
index c41d8fd..3f62400 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
@@ -236,6 +236,9 @@ public abstract class NettyMessage {
                     case AckAllUserRecordsProcessed.ID:
                         decodedMsg = AckAllUserRecordsProcessed.readFrom(msg);
                         break;
+                    case NewBufferSize.ID:
+                        decodedMsg = NewBufferSize.readFrom(msg);
+                        break;
                     default:
                         throw new ProtocolException(
                                 "Received unknown message from producer: " + msg);
@@ -841,6 +844,52 @@ public abstract class NettyMessage {
         }
     }
 
+    /** Message to notify producer about new buffer size. */
+    static class NewBufferSize extends NettyMessage {
+
+        private static final byte ID = 10;
+
+        final long bufferSize;
+
+        final InputChannelID receiverId;
+
+        NewBufferSize(long bufferSize, InputChannelID receiverId) {
+            checkArgument(bufferSize > 0, "The new buffer size should be greater than 0");
+            this.bufferSize = bufferSize;
+            this.receiverId = receiverId;
+        }
+
+        @Override
+        void write(ChannelOutboundInvoker out, ChannelPromise promise, ByteBufAllocator allocator)
+                throws IOException {
+            ByteBuf result = null;
+
+            try {
+                result =
+                        allocateBuffer(
+                                allocator, ID, Long.BYTES + InputChannelID.getByteBufLength());
+                result.writeLong(bufferSize);
+                receiverId.writeTo(result);
+
+                out.write(result, promise);
+            } catch (Throwable t) {
+                handleException(result, null, t);
+            }
+        }
+
+        static NewBufferSize readFrom(ByteBuf buffer) {
+            long bufferSize = buffer.readLong();
+            InputChannelID receiverId = InputChannelID.fromByteBuf(buffer);
+
+            return new NewBufferSize(bufferSize, receiverId);
+        }
+
+        @Override
+        public String toString() {
+            return String.format("NewBufferSize(%s : %d)", receiverId, bufferSize);
+        }
+    }
+
     // ------------------------------------------------------------------------
 
     void writeToChannel(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java
index 7236bb2..4a99397 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java
@@ -199,6 +199,11 @@ public class NettyPartitionRequestClient implements PartitionRequestClient {
     }
 
     @Override
+    public void notifyNewBufferSize(RemoteInputChannel inputChannel, int bufferSize) {
+        clientHandler.notifyNewBufferSize(inputChannel, bufferSize);
+    }
+
+    @Override
     public void resumeConsumption(RemoteInputChannel inputChannel) {
         clientHandler.resumeConsumption(inputChannel);
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
index b43ac54..1c8af58 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.io.network.netty.NettyMessage.AckAllUserRecordsP
 import org.apache.flink.runtime.io.network.netty.NettyMessage.AddCredit;
 import org.apache.flink.runtime.io.network.netty.NettyMessage.CancelPartitionRequest;
 import org.apache.flink.runtime.io.network.netty.NettyMessage.CloseRequest;
+import org.apache.flink.runtime.io.network.netty.NettyMessage.NewBufferSize;
 import org.apache.flink.runtime.io.network.netty.NettyMessage.PartitionRequest;
 import org.apache.flink.runtime.io.network.netty.NettyMessage.ResumeConsumption;
 import org.apache.flink.runtime.io.network.netty.NettyMessage.TaskEventRequest;
@@ -127,6 +128,9 @@ class PartitionRequestServerHandler extends SimpleChannelInboundHandler<NettyMes
                 AckAllUserRecordsProcessed request = (AckAllUserRecordsProcessed) msg;
 
                 
outboundQueue.acknowledgeAllRecordsProcessed(request.receiverId);
+            } else if (msgClazz == NewBufferSize.class) {
+                NewBufferSize request = (NewBufferSize) msg;
+
             } else {
                 LOG.warn("Received unexpected client request: {}", msg);
             }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/TestingPartitionRequestClient.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/TestingPartitionRequestClient.java
index a9597a8..aa0f93f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/TestingPartitionRequestClient.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/TestingPartitionRequestClient.java
@@ -39,6 +39,9 @@ public class TestingPartitionRequestClient implements PartitionRequestClient {
     public void notifyCreditAvailable(RemoteInputChannel inputChannel) {}
 
     @Override
+    public void notifyNewBufferSize(RemoteInputChannel inputChannel, int bufferSize) {}
+
+    @Override
     public void resumeConsumption(RemoteInputChannel inputChannel) {}
 
     @Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageServerSideSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageServerSideSerializationTest.java
index 1e56195..66714de 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageServerSideSerializationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageServerSideSerializationTest.java
@@ -135,4 +135,15 @@ public class NettyMessageServerSideSerializationTest extends TestLogger {
 
         assertEquals(expected.receiverId, actual.receiverId);
     }
+
+    @Test
+    public void testNewBufferSize() {
+        NettyMessage.NewBufferSize expected =
+                new NettyMessage.NewBufferSize(
+                        random.nextInt(Integer.MAX_VALUE) + 1L, new InputChannelID());
+        NettyMessage.NewBufferSize actual = encodeAndDecode(expected, channel);
+
+        assertEquals(expected.bufferSize, actual.bufferSize);
+        assertEquals(expected.receiverId, actual.receiverId);
+    }
 }

Reply via email to