Copilot commented on code in PR #9552:
URL: https://github.com/apache/seatunnel/pull/9552#discussion_r2227089174


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java:
##########
@@ -81,9 +78,9 @@ public class CheckpointManager {
 
     private final CheckpointStorage checkpointStorage;
 
-    private final JobMaster jobMaster;
+    private final CheckpointConfig checkpointConfig;
 

Review Comment:
   [nitpick] `checkpointConfig` is now declared where `jobMaster` used to be. 
Consider moving the `checkpointConfig` declaration next to the other 
config-related fields so the field ordering stays consistent.



##########
seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageTest.java:
##########
@@ -177,4 +159,97 @@ public void testStreamJobWithCancel() throws CheckpointStorageException, Interru
                 checkpointStorage.getAllCheckpoints(String.valueOf(jobId));
         Assertions.assertEquals(0, allCheckpoints.size());
     }
+
+    @Test
+    public void testBatchJobResetCheckpointStorage() throws CheckpointStorageException {
+        long jobId = System.currentTimeMillis();
+        CheckpointConfig checkpointConfig =
+                server.getSeaTunnelConfig().getEngineConfig().getCheckpointConfig();
+        server.getSeaTunnelConfig().getEngineConfig().setCheckpointConfig(checkpointConfig);
+
+        // access checkpoint storage counter
+        AtomicInteger accessCounter = new AtomicInteger(0);
+        CheckpointStorage checkpointStorage =
+                new CheckpointStorage() {
+                    @Override
+                    public String storeCheckPoint(PipelineState pipelineState)
+                            throws CheckpointStorageException {
+                        accessCounter.incrementAndGet();
+                        return "";
+                    }
+
+                    @Override
+                    public void asyncStoreCheckPoint(PipelineState pipelineState)
+                            throws CheckpointStorageException {
+                        accessCounter.incrementAndGet();
+                    }
+
+                    @Override
+                    public List<PipelineState> getAllCheckpoints(String s)
+                            throws CheckpointStorageException {
+                        accessCounter.incrementAndGet();
+                        return Collections.emptyList();
+                    }
+
+                    @Override
+                    public List<PipelineState> getLatestCheckpoint(String s)
+                            throws CheckpointStorageException {
+                        accessCounter.incrementAndGet();
+                        return Collections.emptyList();
+                    }
+
+                    @Override
+                    public PipelineState getLatestCheckpointByJobIdAndPipelineId(
+                            String s, String s1) throws CheckpointStorageException {
+                        accessCounter.incrementAndGet();
+                        return null;
+                    }
+
+                    @Override
+                    public List<PipelineState> getCheckpointsByJobIdAndPipelineId(
+                            String s, String s1) throws CheckpointStorageException {
+                        accessCounter.incrementAndGet();
+                        return Collections.emptyList();
+                    }
+
+                    @Override
+                    public void deleteCheckpoint(String s) {
+                        accessCounter.incrementAndGet();
+                    }
+
+                    @Override
+                    public PipelineState getCheckpoint(String s, String s1, String s2)
+                            throws CheckpointStorageException {
+                        accessCounter.incrementAndGet();
+                        return null;
+                    }
+
+                    @Override
+                    public void deleteCheckpoint(String s, String s1, String s2)
+                            throws CheckpointStorageException {
+                        accessCounter.incrementAndGet();
+                    }
+
+                    @Override
+                    public void deleteCheckpoint(String s, String s1, List<String> list)
+                            throws CheckpointStorageException {
+                        accessCounter.incrementAndGet();
+                    }
+                };
+
+        // replace the checkpoint storage reused by the system
+        CheckpointService checkpointService = server.getCheckpointService();
+        ReflectionUtils.setField(checkpointService, "checkpointStorage", checkpointStorage);
+
+        startJob(jobId, BATCH_CONF_WITHOUT_CHECKPOINT_INTERVAL_PATH, false);
+        await().atMost(120000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () ->
+                                Assertions.assertEquals(
+                                        server.getCoordinatorService().getJobStatus(jobId),
+                                        JobStatus.FINISHED));
+
+        checkpointStorage.getAllCheckpoints(String.valueOf(jobId));

Review Comment:
   This line appears to be testing side effects but the return value is 
ignored. Consider either asserting on the return value or adding a comment to 
clarify that this call is intentionally testing the access counter increment.
   ```suggestion
           List<PipelineState> checkpoints = checkpointStorage.getAllCheckpoints(String.valueOf(jobId));
           Assertions.assertTrue(checkpoints.isEmpty(), "Expected no checkpoints to be returned.");
   ```
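
   If the call is meant to exercise the access counter, both the return value 
and the counter can be checked together. The following is only a minimal, 
self-contained sketch of that pattern: `Storage`, `CountingStorage` and 
`AccessCounterSketch` are hypothetical stand-ins, not the real 
`CheckpointStorage` API, and the assumption that the counter should be 0 before 
the direct call is mine, not something the PR states.

   ```java
   import java.util.Collections;
   import java.util.List;
   import java.util.concurrent.atomic.AtomicInteger;

   class AccessCounterSketch {

       /** Hypothetical stand-in for CheckpointStorage; only one method is modelled. */
       interface Storage {
           List<String> getAllCheckpoints(String jobId);
       }

       /** Stub that records every access, like the anonymous class in the test. */
       static final class CountingStorage implements Storage {
           final AtomicInteger accessCounter = new AtomicInteger(0);

           @Override
           public List<String> getAllCheckpoints(String jobId) {
               accessCounter.incrementAndGet();
               return Collections.emptyList();
           }
       }

       public static void main(String[] args) {
           CountingStorage storage = new CountingStorage();

           // Assumption: the job under test never touched the storage.
           if (storage.accessCounter.get() != 0) {
               throw new AssertionError("storage was accessed unexpectedly");
           }

           // A direct call must bump the counter and return an empty list.
           List<String> checkpoints = storage.getAllCheckpoints("job-1");
           if (!checkpoints.isEmpty()) {
               throw new AssertionError("expected no checkpoints");
           }
           if (storage.accessCounter.get() != 1) {
               throw new AssertionError("expected exactly one access");
           }
       }
   }
   ```

   Asserting on the counter before and after the direct call makes it explicit 
which accesses come from the job and which come from the test itself.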



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
