chestnut-c commented on code in PR #9552:
URL: https://github.com/apache/seatunnel/pull/9552#discussion_r2227542538


##########
seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageTest.java:
##########
@@ -177,4 +159,97 @@ public void testStreamJobWithCancel() throws 
CheckpointStorageException, Interru
                 checkpointStorage.getAllCheckpoints(String.valueOf(jobId));
         Assertions.assertEquals(0, allCheckpoints.size());
     }
+
+    @Test
+    public void testBatchJobResetCheckpointStorage() throws 
CheckpointStorageException {
+        long jobId = System.currentTimeMillis();
+        CheckpointConfig checkpointConfig =
+                
server.getSeaTunnelConfig().getEngineConfig().getCheckpointConfig();
+        
server.getSeaTunnelConfig().getEngineConfig().setCheckpointConfig(checkpointConfig);
+
+        // access checkpoint storage counter
+        AtomicInteger accessCounter = new AtomicInteger(0);
+        CheckpointStorage checkpointStorage =
+                new CheckpointStorage() {
+                    @Override
+                    public String storeCheckPoint(PipelineState pipelineState)
+                            throws CheckpointStorageException {
+                        accessCounter.incrementAndGet();
+                        return "";
+                    }
+
+                    @Override
+                    public void asyncStoreCheckPoint(PipelineState 
pipelineState)
+                            throws CheckpointStorageException {
+                        accessCounter.incrementAndGet();
+                    }
+
+                    @Override
+                    public List<PipelineState> getAllCheckpoints(String s)
+                            throws CheckpointStorageException {
+                        accessCounter.incrementAndGet();
+                        return Collections.emptyList();
+                    }
+
+                    @Override
+                    public List<PipelineState> getLatestCheckpoint(String s)
+                            throws CheckpointStorageException {
+                        accessCounter.incrementAndGet();
+                        return Collections.emptyList();
+                    }
+
+                    @Override
+                    public PipelineState 
getLatestCheckpointByJobIdAndPipelineId(
+                            String s, String s1) throws 
CheckpointStorageException {
+                        accessCounter.incrementAndGet();
+                        return null;
+                    }
+
+                    @Override
+                    public List<PipelineState> 
getCheckpointsByJobIdAndPipelineId(
+                            String s, String s1) throws 
CheckpointStorageException {
+                        accessCounter.incrementAndGet();
+                        return Collections.emptyList();
+                    }
+
+                    @Override
+                    public void deleteCheckpoint(String s) {
+                        accessCounter.incrementAndGet();
+                    }
+
+                    @Override
+                    public PipelineState getCheckpoint(String s, String s1, 
String s2)
+                            throws CheckpointStorageException {
+                        accessCounter.incrementAndGet();
+                        return null;
+                    }
+
+                    @Override
+                    public void deleteCheckpoint(String s, String s1, String 
s2)
+                            throws CheckpointStorageException {
+                        accessCounter.incrementAndGet();
+                    }
+
+                    @Override
+                    public void deleteCheckpoint(String s, String s1, 
List<String> list)
+                            throws CheckpointStorageException {
+                        accessCounter.incrementAndGet();
+                    }
+                };
+
+        // replace the checkpoint storage reused by the system
+        CheckpointService checkpointService = server.getCheckpointService();
+        ReflectionUtils.setField(checkpointService, "checkpointStorage", 
checkpointStorage);
+
+        startJob(jobId, BATCH_CONF_WITHOUT_CHECKPOINT_INTERVAL_PATH, false);
+        await().atMost(120000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () ->
+                                Assertions.assertEquals(
+                                        
server.getCoordinatorService().getJobStatus(jobId),
+                                        JobStatus.FINISHED));
+
+        checkpointStorage.getAllCheckpoints(String.valueOf(jobId));

Review Comment:
   The method is currently called to count the number of times 
checkpointStorage is accessed. It has nothing to do with the returned result, 
but it still needs to be called.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to