This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 867f126d3a [Improve][Zeta] Add log for tryTriggerPendingCheckpoint because the wrong time of server (#7717)
867f126d3a is described below

commit 867f126d3a31fa1a9211a7df31059581c7ee3c5c
Author: dailai <[email protected]>
AuthorDate: Mon Sep 23 14:09:09 2024 +0800

    [Improve][Zeta] Add log for tryTriggerPendingCheckpoint because the wrong time of server (#7717)
---
 .../engine/server/checkpoint/CheckpointCoordinator.java        | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index 515f3ad871..d2a0b7270d 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -488,8 +488,14 @@ public class CheckpointCoordinator {
         }
         final long currentTimestamp = Instant.now().toEpochMilli();
         if (checkpointType.notFinalCheckpoint() && 
checkpointType.notSchemaChangeCheckpoint()) {
-            if (currentTimestamp - latestTriggerTimestamp.get()
-                            < coordinatorConfig.getCheckpointInterval()
+            long diffFromLastTimestamp = currentTimestamp - 
latestTriggerTimestamp.get();
+            if (diffFromLastTimestamp <= 0) {
+                LOG.error(
+                        "The time on your server may not be incremental which 
can lead checkpoint to stop. The latestTriggerTimestamp: ({}), but the 
currentTimestamp: ({})",
+                        latestTriggerTimestamp.get(),
+                        currentTimestamp);
+            }
+            if (diffFromLastTimestamp < 
coordinatorConfig.getCheckpointInterval()
                     || !isAllTaskReady) {
                 return;
             }

Reply via email to