[
https://issues.apache.org/jira/browse/FLINK-21990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17310341#comment-17310341
]
Jiayi Liao commented on FLINK-21990:
------------------------------------
[~pnowojski] We ran into this problem when upgrading Flink from 1.9 to 1.11.
It looks serious to us, and it can be reproduced with the test case in the
issue description. Could you spare some time to take a look?
> SourceStreamTask will always hang if the CheckpointedFunction#snapshotState
> throws an exception.
> ------------------------------------------------------------------------------------------------
>
> Key: FLINK-21990
> URL: https://issues.apache.org/jira/browse/FLINK-21990
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.11.0, 1.12.0
> Reporter: ming li
> Priority: Major
>
> If the source in {{SourceStreamTask}} implements {{CheckpointedFunction}} and
> its {{snapshotState}} method throws an exception, the {{SourceStreamTask}}
> hangs indefinitely.
> The main reason is that the checkpoint is executed in the mailbox. When
> {{CheckpointedFunction#snapshotState}} of the source throws an exception,
> {{StreamTask#cleanUpInvoke}} is called, which waits for the source's
> {{LegacySourceFunctionThread}} to finish. However, the source thread does not
> end by itself (the user code controls when it ends), so the {{Task}} hangs at
> this point, and the JobMaster is not aware of it.
> {code:java}
> protected void cleanUpInvoke() throws Exception {
>     // wait for the end of the source
>     getCompletionFuture().exceptionally(unused -> null).join();
>     // clean up everything we initialized
>     isRunning = false;
>     ...
> }{code}
> I think we should first call the {{cancel}} method of the source and only
> then wait for the source thread to end.
> The following is my test code; I ran it against Flink's master branch.
> {code:java}
> @Test
> public void testSourceFailure() throws Exception {
>     final StreamExecutionEnvironment env =
>             StreamExecutionEnvironment.getExecutionEnvironment();
>     env.enableCheckpointing(2000L);
>     env.setRestartStrategy(RestartStrategies.noRestart());
>     env.addSource(new FailedSource()).addSink(new DiscardingSink<>());
>     JobGraph jobGraph =
>             StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
>     try {
>         // Assert that the job executes the checkpoint only once and fails only once.
>         TestUtils.submitJobAndWaitForResult(
>                 cluster.getClusterClient(), jobGraph, getClass().getClassLoader());
>     } catch (JobExecutionException jobException) {
>         Optional<FlinkRuntimeException> throwable =
>                 ExceptionUtils.findThrowable(jobException, FlinkRuntimeException.class);
>         Assert.assertTrue(throwable.isPresent());
>         Assert.assertEquals(
>                 CheckpointFailureManager.EXCEEDED_CHECKPOINT_TOLERABLE_FAILURE_MESSAGE,
>                 throwable.get().getMessage());
>     }
>     // Assert that the job only failed once.
>     Assert.assertEquals(1, StringGeneratingSourceFunction.INITIALIZE_TIMES.get());
> }
> private static class FailedSource extends RichParallelSourceFunction<String>
>         implements CheckpointedFunction {
>     // volatile because cancel() is called from a different thread than run()
>     private transient volatile boolean running;
>
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         running = true;
>     }
>
>     @Override
>     public void run(SourceContext<String> ctx) throws Exception {
>         // Emits records until cancel() is called; never ends by itself.
>         while (running) {
>             ctx.collect("test");
>         }
>     }
>
>     @Override
>     public void cancel() {
>         running = false;
>     }
>
>     @Override
>     public void snapshotState(FunctionSnapshotContext context) throws Exception {
>         // Fail every snapshot to trigger the behavior described in this issue.
>         throw new RuntimeException("source failed");
>     }
>
>     @Override
>     public void initializeState(FunctionInitializationContext context) throws Exception {}
> }
> {code}
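For readers who want to see the described mechanism in isolation, here is a minimal plain-Java sketch that does not use Flink at all. It models a source thread that only stops when cancel() is called and compares waiting on its completion future with and without cancelling first. The names CancelBeforeJoinSketch, LoopingSource, startSourceThread, waitOnly and cancelThenWait are hypothetical and exist only for this illustration. The timeouts are added so that the sketch terminates, whereas the join() quoted in the description has none. This is not the actual Flink code or the eventual fix, only an illustration of the "cancel first, then wait" idea under those assumptions.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class CancelBeforeJoinSketch {

    /** Hypothetical stand-in for a user source whose run() loops until cancel() is called. */
    static final class LoopingSource {
        private volatile boolean running = true;

        void run() throws InterruptedException {
            while (running) {
                Thread.sleep(1); // stands in for emitting records
            }
        }

        void cancel() {
            running = false;
        }
    }

    /** Runs the source on its own thread and returns a future that completes when run() returns. */
    static CompletableFuture<Void> startSourceThread(LoopingSource source) {
        CompletableFuture<Void> completion = new CompletableFuture<>();
        Thread thread = new Thread(() -> {
            try {
                source.run();
                completion.complete(null);
            } catch (Throwable t) {
                completion.completeExceptionally(t);
            }
        }, "source-thread-sketch");
        thread.setDaemon(true); // do not keep the JVM alive if the source is never cancelled
        thread.start();
        return completion;
    }

    /** Waits for the source without cancelling it; returns false if the timeout expires first. */
    static boolean waitOnly(CompletableFuture<Void> completion, long timeoutMillis)
            throws Exception {
        try {
            completion.exceptionally(unused -> null).get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false;
        }
    }

    /** Cancels the source first and then waits, as proposed in the issue description. */
    static boolean cancelThenWait(
            LoopingSource source, CompletableFuture<Void> completion, long timeoutMillis)
            throws Exception {
        source.cancel();
        return waitOnly(completion, timeoutMillis);
    }

    public static void main(String[] args) throws Exception {
        LoopingSource source = new LoopingSource();
        CompletableFuture<Void> completion = startSourceThread(source);

        // Without cancel() the source never finishes, so only the timeout ends the wait.
        System.out.println("finished without cancel: " + waitOnly(completion, 200));

        // After cancel() the loop exits and the completion future is fulfilled.
        System.out.println("finished after cancel: " + cancelThenWait(source, completion, 5000));
    }
}
```

In this sketch the first wait times out because nothing ever sets running to false, which mirrors the hang in the report. The second wait returns once cancel() has been called. A real fix would also have to consider sources whose cancel() does not stop run() promptly, which this sketch does not cover.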