[
https://issues.apache.org/jira/browse/FLINK-10941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16692945#comment-16692945
]
Till Rohrmann commented on FLINK-10941:
---------------------------------------
I think the underlying problem is that the {{JobMaster}} prematurely releases
slots even though they still contain unconsumed data. The idea is that
resources (e.g. produced data) are bound to the lifecycle of a slot. Once the
slot is released, it is safe to use the slot's resources for something else (or
to release the underlying TM).
At the moment, the slot is considered to be in use as long as the deployed
{{Task}} has not finished. Once the {{Task}} finishes, the slot is freed and,
if it is not used for some time (idle timeout), it is released. Thus, one
solution to tackle this problem would be to let the {{Task}} only go into a
terminal state if its intermediate results have been persisted (e.g. committed
to an external shuffle service) or if its ephemeral data has been consumed.
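
To make the proposal concrete, here is a minimal sketch of the idea in plain
Java. It is not Flink code: {{SketchTask}}, {{SketchSlot}}, the
{{AWAITING_CONSUMPTION}} state and all method names are hypothetical and only
model the behaviour described above. The task reaches its terminal state only
after every produced partition has been consumed or persisted, and the slot's
idle timeout starts counting only from that point.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model for illustration only; these are NOT Flink's classes.
enum SketchTaskState { RUNNING, AWAITING_CONSUMPTION, FINISHED }

final class SketchTask {
    // Partitions produced by this task that are neither consumed nor persisted yet.
    private final Set<String> unconsumedPartitions = new HashSet<>();
    private SketchTaskState state = SketchTaskState.RUNNING;

    SketchTask(Set<String> producedPartitions) {
        unconsumedPartitions.addAll(producedPartitions);
    }

    // Called when the task's own computation is done.
    void executionCompleted() {
        if (state != SketchTaskState.RUNNING) {
            return;
        }
        // Only go terminal if nothing produced by this task is still pending.
        state = unconsumedPartitions.isEmpty()
                ? SketchTaskState.FINISHED
                : SketchTaskState.AWAITING_CONSUMPTION;
    }

    // Called when a partition was consumed or committed to external storage.
    void partitionConsumedOrPersisted(String partitionId) {
        unconsumedPartitions.remove(partitionId);
        if (state == SketchTaskState.AWAITING_CONSUMPTION && unconsumedPartitions.isEmpty()) {
            state = SketchTaskState.FINISHED;
        }
    }

    boolean isTerminal() {
        return state == SketchTaskState.FINISHED;
    }

    SketchTaskState getState() {
        return state;
    }
}

final class SketchSlot {
    private final SketchTask task;
    private final long idleTimeoutMillis;
    private long idleSinceMillis = -1; // -1 means the slot is still in use

    SketchSlot(SketchTask task, long idleTimeoutMillis) {
        this.task = task;
        this.idleTimeoutMillis = idleTimeoutMillis;
    }

    // Intended to be polled periodically. Returns true once the slot has been
    // idle (task terminal) for at least the idle timeout.
    boolean mayRelease(long nowMillis) {
        if (!task.isTerminal()) {
            idleSinceMillis = -1;
            return false;
        }
        if (idleSinceMillis < 0) {
            idleSinceMillis = nowMillis; // idle clock starts at the first poll after the task is terminal
        }
        return nowMillis - idleSinceMillis >= idleTimeoutMillis;
    }
}
```

With this model, a task that has completed its computation but still holds
unconsumed partitions keeps its slot in use, so neither the slot nor the
underlying TM becomes eligible for release while a consumer may still request
the data.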
> Slots prematurely released which still contain unconsumed data
> ---------------------------------------------------------------
>
> Key: FLINK-10941
> URL: https://issues.apache.org/jira/browse/FLINK-10941
> Project: Flink
> Issue Type: Bug
> Components: ResourceManager
> Affects Versions: 1.5.5, 1.6.2, 1.7.0
> Reporter: Qi
> Priority: Critical
>
> Our case is: Flink 1.5 in batch mode, with parallelism 32 for reading the
> data source and parallelism 4 for writing to the data sink.
>
> The read task worked perfectly with 32 TMs. However, when the job was
> executing the write task, only 4 TMs were needed, so the other 28 TMs were
> released. This caused a RemoteTransportException in the write task:
>
> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
> Connection unexpectedly closed by remote task manager
> 'the_previous_TM_used_by_read_task'. This might indicate that the remote task
> manager was lost.
> at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelInactive(PartitionRequestClientHandler.java:133)
> at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
> ...
>
> After skimming the YarnFlinkResourceManager-related code, it seems to me that
> Flink is releasing TMs when they’re idle, regardless of whether working TMs
> need them.
>
> Put another way, Flink seems to prematurely release slots which contain
> unconsumed data and, thus, eventually release a TM which then fails a
> consuming task.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)