[ 
https://issues.apache.org/jira/browse/FLINK-10941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16709805#comment-16709805
 ] 

ASF GitHub Bot commented on FLINK-10941:
----------------------------------------

zhijiangW commented on a change in pull request #7186: [FLINK-10941] Keep slots 
which contain unconsumed result partitions
URL: https://github.com/apache/flink/pull/7186#discussion_r238983295
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 ##########
 @@ -134,10 +138,17 @@ public void cancel(InputChannelID receiverId) {
                ctx.pipeline().fireUserEventTriggered(receiverId);
        }
 
-       public void close() {
+       public void close() throws IOException {
                if (ctx != null) {
                        ctx.channel().close();
                }
+
+               LOG.info("Close all {} readers pending for close.", 
readersToClose.size());
 
 Review comment:
   Sorry for the late response; I have been a bit busy these days.
   
   I agree with your comments above. So whether the task executor can be 
released depends on whether there are still active channels on that executor. 
The task executor can only exit after all the TCP connections have been closed 
gracefully.
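   Just to make that invariant concrete, a minimal sketch (the class and method 
names here are made up for illustration, not the actual Flink code):

   ```java
   import java.util.concurrent.atomic.AtomicInteger;

   /**
    * Hypothetical helper: the task executor may only be released once no
    * network channel towards a downstream consumer is active any more.
    */
   public class ActiveChannelTracker {

       private final AtomicInteger activeChannels = new AtomicInteger(0);

       public void channelOpened() {
           activeChannels.incrementAndGet();
       }

       public void channelClosed() {
           activeChannels.decrementAndGet();
       }

       /** Release is only allowed after all TCP connections were closed gracefully. */
       public boolean canReleaseTaskExecutor() {
           return activeChannels.get() == 0;
       }
   }
   ```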
   
   In theory, as soon as the downstream side has received all the data from the 
network, the upstream side can be released normally; there is no need to wait 
until the downstream side has completely processed the received data. But we 
have no existing ack mechanism to notify the upstream side of this, so it is 
easiest to rely on the close request for now. Releasing the upstream's 
resources based on the downstream's consumption may also bring extra benefits 
in future failover scenarios with persistent output files on the upstream side, 
because the upstream would not need to restart when consumption fails on the 
downstream side.
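   To show the difference between the two triggers in one place, a hypothetical 
interface (the names are invented for illustration, this is not an existing 
Flink API):

   ```java
   /** Sketch of the two possible release triggers discussed above. */
   public interface PartitionReleaseTrigger {

       /**
        * With an explicit ack mechanism, the downstream would call this once
        * all data has been received over the network, and the upstream could
        * release its result partitions right away.
        */
       void onAllDataReceivedAck();

       /**
        * Without such an ack mechanism, the upstream has to wait for the close
        * request that the downstream sends after it has finished consuming.
        */
       void onCloseRequest();
   }
   ```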
   
   But I wonder a little whether relying on the close request might have side 
effects in the future. For example, if 10 downstream tasks reuse the same TCP 
connection, and 9 of them finish early while one tail task takes much longer to 
finish, then all 10 partition views can only be released together once the last 
downstream task has finished. That said, delaying the release of partition 
views probably has no bad effects at the moment.
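   The scenario I have in mind, as a small runnable sketch (made-up names, not 
the actual Flink classes):

   ```java
   import java.util.HashSet;
   import java.util.Set;

   /**
    * Hypothetical: readers sharing one TCP connection; the partition views can
    * only be released once the last pending reader has finished.
    */
   public class SharedConnectionReaders {

       private final Set<String> pendingReaders = new HashSet<>();

       public void register(String readerId) {
           pendingReaders.add(readerId);
       }

       /** Returns true only when the last pending reader has finished. */
       public boolean finish(String readerId) {
           pendingReaders.remove(readerId);
           return pendingReaders.isEmpty();
       }

       public static void main(String[] args) {
           SharedConnectionReaders readers = new SharedConnectionReaders();
           for (int i = 0; i < 10; i++) {
               readers.register("task-" + i);
           }
           // 9 tasks finish early, but the views cannot be released yet ...
           for (int i = 0; i < 9; i++) {
               System.out.println("release views? " + readers.finish("task-" + i)); // false
           }
           // ... only the tail task finishing releases all 10 views together.
           System.out.println("release views? " + readers.finish("task-9")); // true
       }
   }
   ```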
   
   I will continue reviewing the other parts of this PR; it may take a few days 
on my side. :)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Slots prematurely released which still contain unconsumed data 
> ---------------------------------------------------------------
>
>                 Key: FLINK-10941
>                 URL: https://issues.apache.org/jira/browse/FLINK-10941
>             Project: Flink
>          Issue Type: Bug
>          Components: ResourceManager
>    Affects Versions: 1.5.5, 1.6.2, 1.7.0
>            Reporter: Qi
>            Assignee: Qi
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.8.0
>
>
> Our case: Flink 1.5 in batch mode, with parallelism 32 for reading the data 
> source and parallelism 4 for writing the data sink.
>  
> The read task worked perfectly with 32 TMs. However, when the job was 
> executing the write task, only 4 TMs were needed, so the other 28 TMs were 
> released. This caused a RemoteTransportException in the write task:
>  
> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: 
> Connection unexpectedly closed by remote task manager 
> 'the_previous_TM_used_by_read_task'. This might indicate that the remote task 
> manager was lost.
>       at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelInactive(PartitionRequestClientHandler.java:133)
>       at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
>       ...
>  
> After skimming the YarnFlinkResourceManager-related code, it seems to me that 
> Flink releases TMs as soon as they are idle, regardless of whether the still 
> working TMs need them.
>  
> Put another way, Flink seems to prematurely release slots which still contain 
> unconsumed data and thus eventually releases a TM, which then fails a 
> consuming task.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
