JesseAtSZ commented on PR #20091:
URL: https://github.com/apache/flink/pull/20091#issuecomment-1205216052

   I tried to write the test case in `CheckpointFailureManagerITCase` like this:
   
       @Test
       public void testFinalizeCheckpointFailureTriggerJobFailed() throws Exception {
           final StreamExecutionEnvironment env =
                   StreamExecutionEnvironment.getExecutionEnvironment();
           env.enableCheckpointing(500);
           env.setRestartStrategy(RestartStrategies.noRestart());
           CheckpointConfig checkpointConfig = env.getCheckpointConfig();
           checkpointConfig.setCheckpointStorage(
                   new JobManagerCheckpointStorage() {
                       private static final long serialVersionUID = 8134582566514272547L;

                       // Throw an exception when finalizing the checkpoint.
                       @Override
                       public CheckpointStorageAccess createCheckpointStorage(JobID jobId)
                               throws IOException {
                           return new MemoryBackendCheckpointStorageAccess(jobId, null, null, 100) {
                               @Override
                               public CheckpointStorageLocation initializeLocationForCheckpoint(
                                       long checkpointId) throws IOException {
                                   return new NonPersistentMetadataCheckpointStorageLocation(1000) {
                                       @Override
                                       public CheckpointMetadataOutputStream
                                               createMetadataOutputStream() throws IOException {
                                           throw new IOException("Artificial Exception");
                                       }
                                   };
                               }
                           };
                       }
                   });
           env.addSource(new StringGeneratingSourceFunction()).addSink(new DiscardingSink<>());
           JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
           try {
               // Assert that the job executes only one checkpoint and fails only once.
               TestUtils.submitJobAndWaitForResult(
                       cluster.getClusterClient(), jobGraph, getClass().getClassLoader());
           } catch (JobExecutionException jobException) {
               Optional<FlinkRuntimeException> throwable =
                       ExceptionUtils.findThrowable(jobException, FlinkRuntimeException.class);
               Assert.assertTrue(throwable.isPresent());
           }
           // Assert that the job failed only once.
           Assert.assertEquals(1, StringGeneratingSourceFunction.INITIALIZE_TIMES.get());
       }
   
   However, when the checkpoint runs, the methods of the original class in 
`flink-runtime/src/main/java/org/apache/flink/runtime/state/storage/JobManagerCheckpointStorage.java`
 are called instead of the overrides in my anonymous subclass of 
`JobManagerCheckpointStorage`, so my implementation has no effect. This seems 
to be related to `CheckpointStorageLoader.load` in 
`org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder#buildGraph`.
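
One possible explanation, which I have not verified against the Flink source: if the loader re-creates the storage from the user-supplied instance (for example through a configure-style method that builds a fresh base-class object), the anonymous subclass and its overrides would be dropped. The sketch below reproduces only that pattern in plain Java. `BaseStorage`, `configure`, and `load` are hypothetical stand-ins for illustration, not Flink classes or methods.

```java
public class ConfigureDropsSubclassDemo {

    /** Hypothetical stand-in for a configurable storage class (not a Flink class). */
    static class BaseStorage {
        String createLocation() {
            return "base";
        }

        // Assumed behavior: builds a fresh base-class instance,
        // so any subclass overrides on "this" are not carried over.
        BaseStorage configure() {
            return new BaseStorage();
        }
    }

    /** Hypothetical stand-in for a loader that reconfigures the user's instance. */
    static BaseStorage load(BaseStorage fromApplication) {
        return fromApplication.configure();
    }

    /** Passes an anonymous subclass through the loader and reports which method runs. */
    static String describeLoaded() {
        BaseStorage custom =
                new BaseStorage() {
                    @Override
                    String createLocation() {
                        return "overridden";
                    }
                };
        return load(custom).createLocation();
    }

    public static void main(String[] args) {
        // The override is lost because load() returns a new BaseStorage.
        System.out.println(describeLoaded());
    }
}
```

If this is what happens here, the failure would have to be injected somewhere that survives the reload, rather than through an anonymous subclass of the storage itself.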
   
   
   
   
   
   

