This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit a36fac6fceff59a237e90e55965abf76b39e3127 Author: Anton Kalashnikov <[email protected]> AuthorDate: Wed Jul 28 17:44:25 2021 +0200 [FLINK-23453][runtime] Message for notification about new buffer size(NewBufferSize) was added --- .../runtime/io/network/NetworkClientHandler.java | 8 ++++ .../runtime/io/network/PartitionRequestClient.java | 8 ++++ .../CreditBasedPartitionRequestClientHandler.java | 25 +++++++++++ .../runtime/io/network/netty/NettyMessage.java | 49 ++++++++++++++++++++++ .../network/netty/NettyPartitionRequestClient.java | 5 +++ .../netty/PartitionRequestServerHandler.java | 4 ++ .../io/network/TestingPartitionRequestClient.java | 3 ++ .../NettyMessageServerSideSerializationTest.java | 11 +++++ 8 files changed, 113 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkClientHandler.java index 0db6f56..d13a33b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkClientHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkClientHandler.java @@ -49,6 +49,14 @@ public interface NetworkClientHandler extends ChannelHandler { void notifyCreditAvailable(final RemoteInputChannel inputChannel); /** + * The new size should be announced after it was calculated on the receiver side. + * + * @param inputChannel The input channel with new buffer size. + * @param bufferSize The new buffer size. + */ + void notifyNewBufferSize(final RemoteInputChannel inputChannel, int bufferSize); + + /** * Resumes data consumption from the producer after an exactly once checkpoint. * * @param inputChannel The input channel to resume data consumption. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionRequestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionRequestClient.java index ad7e622..480d320 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionRequestClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionRequestClient.java @@ -50,6 +50,14 @@ public interface PartitionRequestClient { void notifyCreditAvailable(RemoteInputChannel inputChannel); /** + * Notifies new buffer size from one remote input channel. + * + * @param inputChannel The remote input channel who announces the new buffer size. + * @param bufferSize The new buffer size. + */ + void notifyNewBufferSize(RemoteInputChannel inputChannel, int bufferSize); + + /** * Requests to resume data consumption from one remote input channel. * * @param inputChannel The remote input channel who is ready to resume data consumption. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java index 41d9143..f8660e9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java @@ -131,6 +131,17 @@ class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdap } @Override + public void notifyNewBufferSize(final RemoteInputChannel inputChannel, int bufferSize) { + ctx.executor() + .execute( + () -> + ctx.pipeline() + .fireUserEventTriggered( + new NewBufferSizeMessage( + inputChannel, bufferSize))); + } + + @Override public void resumeConsumption(RemoteInputChannel inputChannel) { ctx.executor() .execute( @@ -458,6 +469,20 @@ class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdap } } + private static class NewBufferSizeMessage extends ClientOutboundMessage { + private final int bufferSize; + + NewBufferSizeMessage(RemoteInputChannel inputChannel, int bufferSize) { + super(checkNotNull(inputChannel)); + this.bufferSize = bufferSize; + } + + @Override + public Object buildMessage() { + return new NettyMessage.NewBufferSize(bufferSize, inputChannel.getInputChannelId()); + } + } + private static class ResumeConsumptionMessage extends ClientOutboundMessage { ResumeConsumptionMessage(RemoteInputChannel inputChannel) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java index c41d8fd..3f62400 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java @@ -236,6 +236,9 @@ public abstract class NettyMessage { case AckAllUserRecordsProcessed.ID: decodedMsg = AckAllUserRecordsProcessed.readFrom(msg); break; + case NewBufferSize.ID: + decodedMsg = NewBufferSize.readFrom(msg); + break; default: throw new ProtocolException( "Received unknown message from producer: " + msg); @@ -841,6 +844,52 @@ public abstract class NettyMessage { } } + /** Message to notify producer about new buffer size. */ + static class NewBufferSize extends NettyMessage { + + private static final byte ID = 10; + + final long bufferSize; + + final InputChannelID receiverId; + + NewBufferSize(long bufferSize, InputChannelID receiverId) { + checkArgument(bufferSize > 0, "The new buffer size should be greater than 0"); + this.bufferSize = bufferSize; + this.receiverId = receiverId; + } + + @Override + void write(ChannelOutboundInvoker out, ChannelPromise promise, ByteBufAllocator allocator) + throws IOException { + ByteBuf result = null; + + try { + result = + allocateBuffer( + allocator, ID, Long.BYTES + InputChannelID.getByteBufLength()); + result.writeLong(bufferSize); + receiverId.writeTo(result); + + out.write(result, promise); + } catch (Throwable t) { + handleException(result, null, t); + } + } + + static NewBufferSize readFrom(ByteBuf buffer) { + long bufferSize = buffer.readLong(); + InputChannelID receiverId = InputChannelID.fromByteBuf(buffer); + + return new NewBufferSize(bufferSize, receiverId); + } + + @Override + public String toString() { + return String.format("NewBufferSize(%s : %d)", receiverId, bufferSize); + } + } + // ------------------------------------------------------------------------ void writeToChannel( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java index 7236bb2..4a99397 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java @@ -199,6 +199,11 @@ public class NettyPartitionRequestClient implements PartitionRequestClient { } @Override + public void notifyNewBufferSize(RemoteInputChannel inputChannel, int bufferSize) { + clientHandler.notifyNewBufferSize(inputChannel, bufferSize); + } + + @Override public void resumeConsumption(RemoteInputChannel inputChannel) { clientHandler.resumeConsumption(inputChannel); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java index b43ac54..1c8af58 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.io.network.netty.NettyMessage.AckAllUserRecordsP import org.apache.flink.runtime.io.network.netty.NettyMessage.AddCredit; import org.apache.flink.runtime.io.network.netty.NettyMessage.CancelPartitionRequest; import org.apache.flink.runtime.io.network.netty.NettyMessage.CloseRequest; +import org.apache.flink.runtime.io.network.netty.NettyMessage.NewBufferSize; import org.apache.flink.runtime.io.network.netty.NettyMessage.PartitionRequest; import org.apache.flink.runtime.io.network.netty.NettyMessage.ResumeConsumption; import org.apache.flink.runtime.io.network.netty.NettyMessage.TaskEventRequest; @@ -127,6 +128,9 @@ class PartitionRequestServerHandler extends SimpleChannelInboundHandler<NettyMes AckAllUserRecordsProcessed request = (AckAllUserRecordsProcessed) msg; outboundQueue.acknowledgeAllRecordsProcessed(request.receiverId); + } else if (msgClazz == NewBufferSize.class) { + NewBufferSize request = (NewBufferSize) msg; + } else { LOG.warn("Received unexpected client request: {}", msg); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/TestingPartitionRequestClient.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/TestingPartitionRequestClient.java index a9597a8..aa0f93f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/TestingPartitionRequestClient.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/TestingPartitionRequestClient.java @@ -39,6 +39,9 @@ public class TestingPartitionRequestClient implements PartitionRequestClient { public void notifyCreditAvailable(RemoteInputChannel inputChannel) {} @Override + public void notifyNewBufferSize(RemoteInputChannel inputChannel, int bufferSize) {} + + @Override public void resumeConsumption(RemoteInputChannel inputChannel) {} @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageServerSideSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageServerSideSerializationTest.java index 1e56195..66714de 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageServerSideSerializationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageServerSideSerializationTest.java @@ -135,4 +135,15 @@ public class NettyMessageServerSideSerializationTest extends TestLogger { assertEquals(expected.receiverId, actual.receiverId); } + + @Test + public void testNewBufferSize() { + NettyMessage.NewBufferSize expected = + new NettyMessage.NewBufferSize( + random.nextInt(Integer.MAX_VALUE) + 1L, new InputChannelID()); + NettyMessage.NewBufferSize actual = encodeAndDecode(expected, channel); + + assertEquals(expected.bufferSize, actual.bufferSize); + assertEquals(expected.receiverId, actual.receiverId); + } }
