chestnut-c commented on code in PR #9552:
URL: https://github.com/apache/seatunnel/pull/9552#discussion_r2209357964
##########
seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageTest.java:
##########
@@ -177,4 +183,104 @@ public void testStreamJobWithCancel() throws
CheckpointStorageException, Interru
checkpointStorage.getAllCheckpoints(String.valueOf(jobId));
Assertions.assertEquals(0, allCheckpoints.size());
}
+
+ @Test
+ public void testBatchJobResetCheckpointStorage()
+ throws CheckpointStorageException, NoSuchFieldException,
IllegalAccessException {
+ long jobId = System.currentTimeMillis();
+ CheckpointConfig checkpointConfig =
+
server.getSeaTunnelConfig().getEngineConfig().getCheckpointConfig();
+
server.getSeaTunnelConfig().getEngineConfig().setCheckpointConfig(checkpointConfig);
+
+ // access checkpoint storage counter
+ AtomicInteger accessCnt = new AtomicInteger(0);
+ CheckpointStorage checkpointStorage =
+ new CheckpointStorage() {
+ @Override
+ public String storeCheckPoint(PipelineState pipelineState)
+ throws CheckpointStorageException {
+ accessCnt.incrementAndGet();
+ return "";
+ }
+
+ @Override
+ public void asyncStoreCheckPoint(PipelineState
pipelineState)
+ throws CheckpointStorageException {
+ accessCnt.incrementAndGet();
+ }
+
+ @Override
+ public List<PipelineState> getAllCheckpoints(String s)
+ throws CheckpointStorageException {
+ accessCnt.incrementAndGet();
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List<PipelineState> getLatestCheckpoint(String s)
+ throws CheckpointStorageException {
+ accessCnt.incrementAndGet();
+ return Collections.emptyList();
+ }
+
+ @Override
+ public PipelineState
getLatestCheckpointByJobIdAndPipelineId(
+ String s, String s1) throws
CheckpointStorageException {
+ accessCnt.incrementAndGet();
+ return null;
+ }
+
+ @Override
+ public List<PipelineState>
getCheckpointsByJobIdAndPipelineId(
+ String s, String s1) throws
CheckpointStorageException {
+ accessCnt.incrementAndGet();
+ return Collections.emptyList();
+ }
+
+ @Override
+ public void deleteCheckpoint(String s) {
+ accessCnt.incrementAndGet();
+ }
+
+ @Override
+ public PipelineState getCheckpoint(String s, String s1,
String s2)
+ throws CheckpointStorageException {
+ accessCnt.incrementAndGet();
+ return null;
+ }
+
+ @Override
+ public void deleteCheckpoint(String s, String s1, String
s2)
+ throws CheckpointStorageException {
+ accessCnt.incrementAndGet();
+ }
+
+ @Override
+ public void deleteCheckpoint(String s, String s1,
List<String> list)
+ throws CheckpointStorageException {
+ accessCnt.incrementAndGet();
+ }
+ };
+
+ // replace the checkpoint storage reused by the system
+ CheckpointService checkpointService = server.getCheckpointService();
+ Field checkpointStorageField =
+
checkpointService.getClass().getDeclaredField("checkpointStorage");
+ checkpointStorageField.setAccessible(true);
+ checkpointStorageField.set(checkpointService, checkpointStorage);
Review Comment:
ok thx @Hisoka-X
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]