zhijiangW commented on a change in pull request #11351: [FLINK-16404][runtime] 
Solve the potential deadlock problem when reducing exclusive buffers to zero
URL: https://github.com/apache/flink/pull/11351#discussion_r397650441
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
 ##########
 @@ -193,6 +199,16 @@ public void userEventTriggered(ChannelHandlerContext ctx, 
Object msg) throws Exc
                        if (triggerWrite) {
                                
writeAndFlushNextMessageIfPossible(ctx.channel());
                        }
+               } else if (msg instanceof ResumeConsumptionEvent) {
+                       RemoteInputChannel inputChannel = 
((ResumeConsumptionEvent) msg).inputChannel;
+                       inputChannel.unblockChannel();
+                       ResumeConsumption resumeConsumption = new 
ResumeConsumption(inputChannel.getInputChannelId());
+
+                       
ctx.channel().writeAndFlush(resumeConsumption).addListener((ChannelFutureListener)
 channelFuture -> {
 
 Review comment:
   It should make use of existing `writeAndFlushNextMessageIfPossible` for 
writing any messages on downstream side, otherwise we might miss some 
conditions such as `if (channelError.get() != null || !channel.isWritable()) ` 
inside previous `writeAndFlushNextMessageIfPossible`. And also bring much 
overhead work for maintaining two different paths.
   
   We can refactor the existing `inputChannelsWithCredit` as a more general 
outbound message queue to insert both `AddCredit` and `ResumeConsumption` 
messages. And define an abstract `ClientOutboundMessage` with `buildMessage` 
method to be implemented by `AddCredit` and `ResumeConsumption` separately 
during `writeAndFlushNextMessageIfPossible`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to