chestnut-c commented on code in PR #9552:
URL: https://github.com/apache/seatunnel/pull/9552#discussion_r2227542538
##########
seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageTest.java:
##########
@@ -177,4 +159,97 @@ public void testStreamJobWithCancel() throws
CheckpointStorageException, Interru
checkpointStorage.getAllCheckpoints(String.valueOf(jobId));
Assertions.assertEquals(0, allCheckpoints.size());
}
+
+ @Test
+ public void testBatchJobResetCheckpointStorage() throws
CheckpointStorageException {
+ long jobId = System.currentTimeMillis();
+ CheckpointConfig checkpointConfig =
+
server.getSeaTunnelConfig().getEngineConfig().getCheckpointConfig();
+
server.getSeaTunnelConfig().getEngineConfig().setCheckpointConfig(checkpointConfig);
+
+ // access checkpoint storage counter
+ AtomicInteger accessCounter = new AtomicInteger(0);
+ CheckpointStorage checkpointStorage =
+ new CheckpointStorage() {
+ @Override
+ public String storeCheckPoint(PipelineState pipelineState)
+ throws CheckpointStorageException {
+ accessCounter.incrementAndGet();
+ return "";
+ }
+
+ @Override
+ public void asyncStoreCheckPoint(PipelineState
pipelineState)
+ throws CheckpointStorageException {
+ accessCounter.incrementAndGet();
+ }
+
+ @Override
+ public List<PipelineState> getAllCheckpoints(String s)
+ throws CheckpointStorageException {
+ accessCounter.incrementAndGet();
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List<PipelineState> getLatestCheckpoint(String s)
+ throws CheckpointStorageException {
+ accessCounter.incrementAndGet();
+ return Collections.emptyList();
+ }
+
+ @Override
+ public PipelineState
getLatestCheckpointByJobIdAndPipelineId(
+ String s, String s1) throws
CheckpointStorageException {
+ accessCounter.incrementAndGet();
+ return null;
+ }
+
+ @Override
+ public List<PipelineState>
getCheckpointsByJobIdAndPipelineId(
+ String s, String s1) throws
CheckpointStorageException {
+ accessCounter.incrementAndGet();
+ return Collections.emptyList();
+ }
+
+ @Override
+ public void deleteCheckpoint(String s) {
+ accessCounter.incrementAndGet();
+ }
+
+ @Override
+ public PipelineState getCheckpoint(String s, String s1,
String s2)
+ throws CheckpointStorageException {
+ accessCounter.incrementAndGet();
+ return null;
+ }
+
+ @Override
+ public void deleteCheckpoint(String s, String s1, String
s2)
+ throws CheckpointStorageException {
+ accessCounter.incrementAndGet();
+ }
+
+ @Override
+ public void deleteCheckpoint(String s, String s1,
List<String> list)
+ throws CheckpointStorageException {
+ accessCounter.incrementAndGet();
+ }
+ };
+
+ // replace the checkpoint storage reused by the system
+ CheckpointService checkpointService = server.getCheckpointService();
+ ReflectionUtils.setField(checkpointService, "checkpointStorage",
checkpointStorage);
+
+ startJob(jobId, BATCH_CONF_WITHOUT_CHECKPOINT_INTERVAL_PATH, false);
+ await().atMost(120000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertEquals(
+
server.getCoordinatorService().getJobStatus(jobId),
+ JobStatus.FINISHED));
+
+ checkpointStorage.getAllCheckpoints(String.valueOf(jobId));
Review Comment:
This method is called here only so that the access to checkpointStorage is
counted. The returned result is not used, but the call itself is still
required.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]