[ 
https://issues.apache.org/jira/browse/FLINK-17464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17098982#comment-17098982
 ] 

Till Rohrmann edited comment on FLINK-17464 at 5/4/20, 2:35 PM:
----------------------------------------------------------------

Thanks for reporting this issue [~johnlon]. Your description of Flink's 
behaviour is correct. The reasoning behind this behaviour is that Flink must 
not lose jobs due to transient exceptions. When recovering jobs, Flink needs to 
interact with external systems and it can happen that certain operations fail. 
If Flink encounters a problem, it takes the conservative approach to fail the 
complete process so that a new process can take over and try again to recover 
the persisted jobs. This will work if the encounter problem eventually 
disappears.

Unfortunately, it won't work when the problem repeats deterministically as in 
your case or if someone would meddle around with some internal state of Flink 
(e.g. removing persisted blobs belonging to a submitted job). This problem even 
more problematic in case of a session cluster where other jobs are affected by 
one faulty job.

Ideally, one would like to distinguish between transient exceptions and 
deterministic ones. If this were possible, then one could retry for the former 
ones and fail the jobs in case one encounters the latter ones. Since this is in 
general a hard problem for which I don't know a good solution, I guess it is a 
good proposal to make the failure behaviour in case of recoveries configurable. 
As you have suggested such a sandbox mode could simply transition the job into 
a failed state instead of failing the whole process. 

The drawback of such a mode would be that you might fail some jobs which might 
be recoverable if retried a bit more.


was (Author: till.rohrmann):
Thanks for reporting this issue [~johnlon]. Your description of Flink's 
behaviour is correct. The reasoning behind this behaviour is that Flink must 
not lose jobs due to ephemeral exceptions. When recovering jobs, Flink needs to 
interact with external systems and it can happen that certain operations fail. 
If Flink encounters a problem, it takes the conservative approach to fail the 
complete process so that a new process can take over and try again to recover 
the persisted jobs. This will work if the encounter problem eventually 
disappears.

Unfortunately, it won't work when the problem repeats deterministically as in 
your case or if someone would meddle around with some internal state of Flink 
(e.g. removing persisted blobs belonging to a submitted job). This problem even 
more problematic in case of a session cluster where other jobs are affected by 
one faulty job.

Ideally, one would like to distinguish between ephemeral exceptions and 
deterministic ones. If this were possible, then one could retry for the former 
ones and fail the jobs in case one encounters the latter ones. Since this is in 
general a hard problem for which I don't know a good solution, I guess it is a 
good proposal to make the failure behaviour in case of recoveries configurable. 
As you have suggested such a sandbox mode could simply transition the job into 
a failed state instead of failing the whole process. 

The drawback of such a mode would be that you might fail some jobs which might 
be recoverable if retried a bit more.

> Stanalone HA Cluster crash with non-recoverable cluster state - need to wipe 
> cluster to recover service
> -------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-17464
>                 URL: https://issues.apache.org/jira/browse/FLINK-17464
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.10.0
>            Reporter: John Lonergan
>            Priority: Critical
>
> When recovering job graphs after a failover of the JobManager, or after a 
> restart of the cluster, the HA Cluster can get into a state where it cannot 
> be restarted and the only resoluton we have identified is to destroy the 
> Zookkeeper job graph store.
> This happens when any job graph that is being recovered throws an exception 
> during recovery on the master. 
> Whilst we encountered this issues on a sink that extends "InitialiseOnMaster" 
> we believe the vulnerability is generic in nature and the unrecolverable 
> problems encountered will occur if the application code throws any exception 
> for any reason during recovery on the main line. 
> These application exceptions propagate up to the JobManager ClusterEntryPoint 
> class at which point the JM leader does a system.exit. If there are remaining 
> JobManagers then they will also follow leader election and also encounter the 
> same sequence of events. Ultimately all JM's exit and then all TM's fail 
> also. 
> The entire cluster is destroyed.
> Because these events happen during job graph recovery then merely attempt a 
> restart of the cluster will fail leaving the only option as destroying the 
> job graph state. 
> If one is running a shared cluster with many jobs then this is effectively a 
> DOS and results in prolonged down time as code or data changes are necessary 
> to work around the issue.
> --
> Of course if the same exception were to be thrown during job submission using 
> the CLI, then we would not see the cluster crashing nor the cluster being 
> corrupted; the job would merely fail.
> Our feeling is that the job graph recovery process ought to behave in a 
> similar fashion to the job submission processes.
> If a job submission fails then the job is recorded as failed and there is no 
> further impact on the cluster. However, if job recovery fails then the entire 
> cluster is taken down, and may as we have seen, become inoperable.
> We feel that a failure to restore a single job graph ought merely to result 
> in the job being recorded as failed. It should not result in a cluster-wide 
> impact.
> We do not understand the logic of the design in this space. However, if the 
> existing logic was for the benefit of single job clusters then this is a poor 
> result for multi job clusters. In which case we ought to be able to configure 
> a cluster for "multi-job mode" so that job graph recovery is "sandboxed"  and 
> doesn't take out the entire cluster.
> ---
> It is easy to demonstrate the problem using the built in Flink streaming Word 
> Count example.
> In order for this to work you configure the job to write a single output file 
> and also write this to HDFS not to a local disk. 
> You will note that the class FileOutputFormat extends InitializeOnMaster and 
> the initializeGlobal() function executes only when the file is on HDFS, not 
> on local disk.
> When this functon runs it will generate an exception if the output already 
> exists.
> Therefore to demonstrate the issues do the following:
> - configure the job to write a single file to HDFS
> - configure the job to to read a large file so that the job takes some time 
> to execute and we have time to complete the next few steps bnefore the job 
> finishes.
> - run the job on a HA cluster with two JM nodes
> - wait for the job to start and the output file to be created
> - kill the leader JM before the job has finished 
> - observe JM failover occuring ... 
> - recovery during failover will NOT suceed because the recovery of the Word 
> Count job will fail due to the presence of the output file
> - observe all JM's and TM's ultimately terminating
> Once the cluster has outright failed then try and restart it.
> During restart the cluster will detect the presence of job graphs in Zk and 
> attempt to restore them. This however, is doomed due to the same 
> vulnerability that causes the global outage above.
> -------
> For operability Flink needs a mod such that the job graph recovery process is 
> entirely sandboxed and failure of a given job during job graph recovery ought 
> to result merely in a failed job and not a failed cluster.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to