[ 
https://issues.apache.org/jira/browse/FLINK-10941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701679#comment-16701679
 ] 

Qi commented on FLINK-10941:
----------------------------

Hi [~till.rohrmann] and [~zjwang],

Until the external shuffle service is implemented, I want to fix this issue by 
checking whether the Result Partitions in a TM are fully consumed before the TM 
is released. I've opened a PR [1], and it works well in our Flink environment 
(tested in batch jobs with 1000 TMs).
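
To illustrate the idea, here is a minimal sketch of the check. It is not the 
code in the PR: PartitionState, TaskManagerState and canBeReleased are 
hypothetical names used only for illustration, not Flink classes, and the 
sketch assumes a TM is released only after an idle timeout.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for one result partition held by a TM (not a Flink class). */
final class PartitionState {
    private final int totalSubpartitions;
    private int consumedSubpartitions;

    PartitionState(int totalSubpartitions) {
        this.totalSubpartitions = totalSubpartitions;
    }

    /** Records that a downstream task finished reading one subpartition. */
    void markSubpartitionConsumed() {
        if (consumedSubpartitions < totalSubpartitions) {
            consumedSubpartitions++;
        }
    }

    boolean isFullyConsumed() {
        return consumedSubpartitions == totalSubpartitions;
    }
}

/** Hypothetical view of a TM as seen by whatever component decides to release it. */
final class TaskManagerState {
    private final List<PartitionState> partitions = new ArrayList<>();
    private final long idleSinceMillis;

    TaskManagerState(long idleSinceMillis) {
        this.idleSinceMillis = idleSinceMillis;
    }

    void addPartition(PartitionState partition) {
        partitions.add(partition);
    }

    /**
     * A TM may be released only if it has been idle for at least the timeout
     * AND every result partition it holds has been fully consumed.
     */
    boolean canBeReleased(long nowMillis, long idleTimeoutMillis) {
        boolean idleLongEnough = nowMillis - idleSinceMillis >= idleTimeoutMillis;
        boolean allConsumed =
                partitions.stream().allMatch(PartitionState::isFullyConsumed);
        return idleLongEnough && allConsumed;
    }
}
```

With this check, a TM that ran a read task stays alive after it becomes idle 
until the write tasks have consumed all of its partitions.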

Could you please review the PR? Thank you!

[1] [https://github.com/apache/flink/pull/7186]

 

Regards,

Qi

> Slots prematurely released which still contain unconsumed data 
> ---------------------------------------------------------------
>
>                 Key: FLINK-10941
>                 URL: https://issues.apache.org/jira/browse/FLINK-10941
>             Project: Flink
>          Issue Type: Bug
>          Components: ResourceManager
>    Affects Versions: 1.5.5, 1.6.2, 1.7.0
>            Reporter: Qi
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.8.0
>
>
> Our case: Flink 1.5 in batch mode, with parallelism 32 for reading the data 
> source and parallelism 4 for writing to the data sink.
>  
> The read task worked perfectly with 32 TMs. However, when the job was 
> executing the write task, only 4 TMs were needed, so the other 28 TMs were 
> released. This caused a RemoteTransportException in the write task:
>  
> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: 
> Connection unexpectedly closed by remote task manager 
> 'the_previous_TM_used_by_read_task'. This might indicate that the remote task 
> manager was lost.
>       at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelInactive(PartitionRequestClientHandler.java:133)
>       at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
>       ...
>  
> After skimming the YarnFlinkResourceManager-related code, it seems to me that 
> Flink releases TMs when they’re idle, regardless of whether working TMs 
> still need them.
>  
> Put another way, Flink seems to prematurely release slots that contain 
> unconsumed data and thus eventually releases a TM, which then fails a 
> consuming task.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
