JesseAtSZ commented on PR #20091:
URL: https://github.com/apache/flink/pull/20091#issuecomment-1205216052
I have tried to write the test case in `CheckpointFailureManagerITCase` like
this:

    @Test
    public void testFinalizeCheckpointFailureTriggerJobFailed() throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(500);
        env.setRestartStrategy(RestartStrategies.noRestart());
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.setCheckpointStorage(
                new JobManagerCheckpointStorage() {
                    private static final long serialVersionUID = 8134582566514272547L;

                    // Throw an exception when finalizing the checkpoint.
                    @Override
                    public CheckpointStorageAccess createCheckpointStorage(JobID jobId)
                            throws IOException {
                        return new MemoryBackendCheckpointStorageAccess(jobId, null, null, 100) {
                            @Override
                            public CheckpointStorageLocation initializeLocationForCheckpoint(
                                    long checkpointId) throws IOException {
                                return new NonPersistentMetadataCheckpointStorageLocation(1000) {
                                    @Override
                                    public CheckpointMetadataOutputStream
                                            createMetadataOutputStream() throws IOException {
                                        throw new IOException("Artificial Exception");
                                    }
                                };
                            }
                        };
                    }
                });
        env.addSource(new StringGeneratingSourceFunction()).addSink(new DiscardingSink<>());
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
        try {
            // Assert that the job executes only one checkpoint and fails only once.
            TestUtils.submitJobAndWaitForResult(
                    cluster.getClusterClient(), jobGraph, getClass().getClassLoader());
        } catch (JobExecutionException jobException) {
            Optional<FlinkRuntimeException> throwable =
                    ExceptionUtils.findThrowable(jobException, FlinkRuntimeException.class);
            Assert.assertTrue(throwable.isPresent());
        }
        // Assert that the job failed only once.
        Assert.assertEquals(1, StringGeneratingSourceFunction.INITIALIZE_TIMES.get());
    }

However, when the checkpoint is executed, the methods of the stock class in
`flink-runtime/src/main/java/org/apache/flink/runtime/state/storage/JobManagerCheckpointStorage.java`
are called, so my anonymous `JobManagerCheckpointStorage` subclass has no
effect. This seems to be related to `CheckpointStorageLoader.load` in
`org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder#buildGraph`.
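
One possible explanation can be sketched with hypothetical stand-in classes rather than the real Flink types. If the loader rebuilds application-defined storage through a configure-style method that returns a fresh base-class instance, the overrides of an anonymous subclass are silently dropped. Whether `CheckpointStorageLoader.load` actually behaves this way is an assumption that has not been confirmed here; `Storage`, `configure`, `load`, `describe`, and `failingStorage` below are illustrative names only.

```java
import java.io.IOException;

public class ConfigureCopySketch {

    /** Hypothetical stand-in for a checkpoint storage; NOT the Flink class. */
    public static class Storage {
        public String createMetadataOutputStream() throws IOException {
            return "stock stream";
        }

        /** Assumed behavior: reconfiguring returns a new base-class instance. */
        public Storage configure() {
            return new Storage();
        }
    }

    /** Hypothetical stand-in for a loader that reconfigures application storage. */
    public static Storage load(Storage fromApplication) {
        return fromApplication.configure();
    }

    /** Builds an anonymous subclass whose metadata stream always fails. */
    public static Storage failingStorage(final boolean overrideConfigure) {
        return new Storage() {
            @Override
            public String createMetadataOutputStream() throws IOException {
                throw new IOException("Artificial Exception");
            }

            @Override
            public Storage configure() {
                // Returning `this` keeps the overrides; the inherited copy loses them.
                return overrideConfigure ? this : super.configure();
            }
        };
    }

    /** Reports which implementation ends up being called after loading. */
    public static String describe(Storage storage) {
        try {
            return storage.createMetadataOutputStream();
        } catch (IOException e) {
            return "failed: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(load(failingStorage(false)))); // stock stream
        System.out.println(describe(load(failingStorage(true))));  // failed: Artificial Exception
    }
}
```

If the real loader does work like this, overriding the corresponding reconfiguration hook in the test's anonymous class, so that it returns the same instance, might keep the artificial failure in place. That would need to be checked against the actual Flink sources.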