[
https://issues.apache.org/jira/browse/FLINK-21751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17378880#comment-17378880
]
Xintong Song edited comment on FLINK-21751 at 7/12/21, 3:31 AM:
----------------------------------------------------------------
[~chesnay], could you please check this instability? It seems to be caused by
exactly the problem that this ticket is expected to fix.
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=20267&view=logs&j=4dd4dbdd-1802-5eb7-a518-6acd9d24d0fc&t=8d6b4dd3-4ca1-5611-1743-57a7d76b395a&l=412
The test failed due to exceptions in the logs. I ran the following command
from {{flink-end-to-end-tests/test-scripts/common.sh}} on the logs, and it
points to the {{RecipientUnreachableException}} in the TM logs. The problem is
that the TM received extra slot requests from the RM after the tasks had
finished and the slots had been freed, while the JobMaster it tried to offer
the slots to had already shut down.
{code}
$ grep -rv "GroupCoordinatorNotAvailableException" . \
| grep -v "RetriableCommitFailedException" \
| grep -v "NoAvailableBrokersException" \
| grep -v "Async Kafka commit failed" \
| grep -v "DisconnectException" \
| grep -v "Cannot connect to ResourceManager right now" \
| grep -v "AskTimeoutException" \
| grep -v "WARN akka.remote.transport.netty.NettyTransport" \
| grep -v "WARN org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline" \
| grep -v 'INFO.*AWSErrorCode' \
| grep -v "RejectedExecutionException" \
| grep -v "CancellationException" \
| grep -v "An exception was thrown by an exception handler" \
| grep -v "Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.yarn.exceptions.YarnException" \
| grep -v "Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configuration" \
| grep -v "java.lang.NoClassDefFoundError: org/apache/hadoop/yarn/exceptions/YarnException" \
| grep -v "java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration" \
| grep -v "java.lang.Exception: Execution was suspended" \
| grep -v "java.io.InvalidClassException: org.apache.flink.formats.avro.typeutils.AvroSerializer" \
| grep -v "Caused by: java.lang.Exception: JobManager is shutting down" \
| grep -v "java.lang.Exception: Artificial failure" \
| grep -v "org.apache.flink.runtime.checkpoint.CheckpointException" \
| grep -v "org.elasticsearch.ElasticsearchException" \
| grep -v "Elasticsearch exception" \
| grep -v "org.apache.flink.runtime.JobException: Recovery is suppressed" \
| grep -v "WARN akka.remote.ReliableDeliverySupervisor" \
| grep -i "exception"
./flink-vsts-taskexecutor-0-fv-az217-107.log:org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException:
Could not send message [RemoteFencedMessage(00000000000000000000000000000000,
RemoteRpcInvocation(null.offerSlots(ResourceID, Collection, Time)))] from
sender [Actor[akka.tcp://[email protected]:38955/temp/$0b]] to recipient
[Actor[akka://flink/user/rpc/jobmanager_2#1483449133]], because the recipient
is unreachable. This can either mean that the recipient has been terminated or
that the remote RpcService is currently not reachable.
{code}
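For readers who want to reproduce the check outside the shell, the filtering idea above can be sketched in Python: keep every line that mentions "exception" (case-insensitively) unless it matches one of the known, tolerated patterns. This is only a rough sketch, not the actual logic of {{common.sh}}; the function name {{unexpected_exception_lines}}, the shortened pattern list, and the sample log lines are assumptions made for illustration.

```python
import re

# A few of the patterns excluded by the grep chain above (not the full list).
IGNORED_PATTERNS = [
    r"GroupCoordinatorNotAvailableException",
    r"AskTimeoutException",
    r"INFO.*AWSErrorCode",
    r"java.lang.Exception: Artificial failure",
]


def unexpected_exception_lines(lines, ignored=IGNORED_PATTERNS):
    """Return lines that mention 'exception' and match no ignored pattern."""
    compiled = [re.compile(p) for p in ignored]
    return [
        line
        for line in lines
        if re.search("exception", line, re.IGNORECASE)
        and not any(p.search(line) for p in compiled)
    ]


# Made-up sample lines, only to exercise the filter.
sample_log = [
    "WARN akka.pattern.AskTimeoutException: Ask timed out",
    "INFO Job finished",
    "org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException: "
    "Could not send message",
]
remaining = unexpected_exception_lines(sample_log)
print(remaining)
```

With the sample input, only the {{RecipientUnreachableException}} line survives the filter, which mirrors what the grep chain reported for this build.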
> Improve handling of freed slots if final requirement message is in flight
> -------------------------------------------------------------------------
>
> Key: FLINK-21751
> URL: https://issues.apache.org/jira/browse/FLINK-21751
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Coordination
> Reporter: Chesnay Schepler
> Assignee: Chesnay Schepler
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.13.0
>
>
> When a job shuts down, there is a race condition between slots being freed
> and requirements being set to 0. If the slot release arrives at the RM first,
> the RM will immediately try to re-allocate slots, since the requirements are
> not 0 yet.
> In practice this is unlikely to cause issues (because the trip from
> JobMaster->TM->RM should always take longer than JobMaster->RM), but this
> problem results in various test instabilities.
> Essentially there are 2 alternatives:
> a) enforce a strict order such that the requirement update must be
> acknowledged before slots are freed
> b) have the RM inform the TM if the job has finished, to clean up any pending
> slots.
> Neither option is ideal.
> a) implies that the JobMaster has to stick around longer to wait for the
> acknowledgement, and it also introduces a delay to all slot freeing operations.
> b) can easily lead to bugs in the future; if the TM is informed that the job
> has concluded, it must only cancel pending slots; it must not free all job
> resources, because other messages from the JM may still be in flight (for
> example, the partition promotions).