[
https://issues.apache.org/jira/browse/FLINK-21990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ming li updated FLINK-21990:
----------------------------
Description:
If the source in {{SourceStreamTask}} implements {{CheckpointedFunction}} and
an exception is thrown in its {{snapshotState}} method, the
{{SourceStreamTask}} hangs indefinitely.
The main reason is that the checkpoint is executed in the mailbox. When
{{CheckpointedFunction#snapshotState}} of the source throws an exception,
{{StreamTask#cleanUpInvoke}} is called, which waits for the
{{LegacySourceFunctionThread}} of the source to finish. However, the source
thread does not end by itself (the user's source has to be told to stop), so
the {{Task}} hangs at this point, and the JobMaster is not aware of it.
{code:java}
protected void cleanUpInvoke() throws Exception {
    // wait for the end of the source
    getCompletionFuture().exceptionally(unused -> null).join();
    // clean up everything we initialized
    isRunning = false;
    ...
}
{code}
I think we should call the {{cancel}} method of the source first, and then
wait for the source thread to end.
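To make the proposal concrete, here is a minimal plain-Java sketch of the ordering problem. It does not use any Flink classes: {{LoopingSource}}, {{startSource}} and {{cleanUp}} are hypothetical stand-ins for the user source, the source thread with its completion future, and {{cleanUpInvoke}}. A timeout is used only so that the hanging case can be observed instead of blocking forever.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class CancelBeforeJoinSketch {

    /** Hypothetical stand-in for a user source: run() loops until cancel() is called. */
    static final class LoopingSource {
        private volatile boolean running = true;

        void run() {
            while (running) {
                Thread.yield();
            }
        }

        void cancel() {
            running = false;
        }
    }

    /** Runs the source on its own thread; the returned future completes when run() returns. */
    static CompletableFuture<Void> startSource(LoopingSource source) {
        CompletableFuture<Void> completion = new CompletableFuture<>();
        Thread thread = new Thread(() -> {
            try {
                source.run();
                completion.complete(null);
            } catch (Throwable t) {
                completion.completeExceptionally(t);
            }
        }, "source-thread-sketch");
        thread.setDaemon(true); // do not keep the JVM alive in the hanging case
        thread.start();
        return completion;
    }

    /**
     * Models the cleanup step. Returns true if the source thread ended within the
     * timeout, false if the wait would have kept blocking.
     */
    static boolean cleanUp(
            LoopingSource source,
            CompletableFuture<Void> completion,
            boolean cancelFirst,
            long timeoutMillis) throws Exception {
        if (cancelFirst) {
            source.cancel(); // proposed ordering: stop the source before waiting
        }
        try {
            completion.exceptionally(unused -> null).get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        LoopingSource hanging = new LoopingSource();
        CompletableFuture<Void> hangingFuture = startSource(hanging);
        System.out.println("wait only:        finished=" + cleanUp(hanging, hangingFuture, false, 200));
        hanging.cancel(); // release the thread used for the demonstration

        LoopingSource cancelled = new LoopingSource();
        CompletableFuture<Void> cancelledFuture = startSource(cancelled);
        System.out.println("cancel then wait: finished=" + cleanUp(cancelled, cancelledFuture, true, 2000));
    }
}
```

With this model, the wait-only call is expected to run into the timeout, while the cancel-then-wait call should return promptly. Whether the real fix belongs in {{cleanUpInvoke}} itself or elsewhere in {{SourceStreamTask}} is left open here.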
The following is my test code; the test branch is Flink's master branch.
{code:java}
@Test
public void testSourceFailure() throws Exception {
    final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(2000L);
    env.setRestartStrategy(RestartStrategies.noRestart());
    env.addSource(new FailedSource()).addSink(new DiscardingSink<>());
    JobGraph jobGraph =
            StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
    try {
        // assert that the job executes the checkpoint only once and fails only once.
        TestUtils.submitJobAndWaitForResult(
                cluster.getClusterClient(), jobGraph, getClass().getClassLoader());
    } catch (JobExecutionException jobException) {
        Optional<FlinkRuntimeException> throwable =
                ExceptionUtils.findThrowable(jobException, FlinkRuntimeException.class);
        Assert.assertTrue(throwable.isPresent());
        Assert.assertEquals(
                CheckpointFailureManager.EXCEEDED_CHECKPOINT_TOLERABLE_FAILURE_MESSAGE,
                throwable.get().getMessage());
    }
    // assert that the job only failed once.
    Assert.assertEquals(1, StringGeneratingSourceFunction.INITIALIZE_TIMES.get());
}

private static class FailedSource extends RichParallelSourceFunction<String>
        implements CheckpointedFunction {
    // volatile: cancel() is called from a different thread than run()
    private transient volatile boolean running;

    @Override
    public void open(Configuration parameters) throws Exception {
        running = true;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // emits records until cancel() is called
        while (running) {
            ctx.collect("test");
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // fail every checkpoint to trigger the hang
        throw new RuntimeException("source failed");
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {}
}
{code}
was:
If the source in {{SourceStreamTask}} implements {{CheckpointedFunction}} and
an exception is thrown in its {{snapshotState}} method, the
{{SourceStreamTask}} hangs indefinitely.
The main reason is that the checkpoint is executed in the mailbox. When
{{CheckpointedFunction#snapshotState}} of the source throws an exception,
{{StreamTask#cleanUpInvoke}} is called, which waits for the
{{LegacySourceFunctionThread}} of the source to finish. However, the source
thread does not end by itself (the user's source has to be told to stop), so
the {{Task}} hangs at this point, and the JobMaster is not aware of it.
{code:java}
protected void cleanUpInvoke() throws Exception {
    // wait for the end of the source
    getCompletionFuture().exceptionally(unused -> null).join();
    // clean up everything we initialized
    isRunning = false;
    ...
}
{code}
I think we should call the {{cancel}} method of the source first, and then
wait for the source thread to end.
> SourceStreamTask will always hang if the CheckpointedFunction#snapshotState
> throws an exception.
> ------------------------------------------------------------------------------------------------
>
> Key: FLINK-21990
> URL: https://issues.apache.org/jira/browse/FLINK-21990
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.11.0
> Reporter: ming li
> Priority: Major