This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 0223348b6c [core] Fix TagAutoCreation.forceCreatingSnapshot to use SINK_PROCESS_TIME_ZONE (#7600)
0223348b6c is described below

commit 0223348b6c62ee6697ac56fe23afb5d3cfac80f9
Author: yuzelin <[email protected]>
AuthorDate: Tue Apr 7 11:43:43 2026 +0800

    [core] Fix TagAutoCreation.forceCreatingSnapshot to use SINK_PROCESS_TIME_ZONE (#7600)
    
    TagAutoCreation.forceCreatingSnapshot in the ProcessTimeExtractor branch
    uses LocalDateTime.now() (machine timezone) to determine whether to
    force creating a snapshot. When sink.process-time-zone is configured
    differently from the machine timezone (e.g. UTC on an Asia/Shanghai
    machine), the tag creation time is incorrect — it triggers at the
    machine's midnight instead of the configured timezone's midnight.
    
    The fix passes sink.process-time-zone into TagAutoCreation and
    evaluates LocalDateTime.now(sinkProcessTimeZone) instead.
---
 .../org/apache/paimon/tag/TagAutoCreation.java     | 19 ++++++---
 .../org/apache/paimon/tag/TagAutoManagerTest.java  | 45 ++++++++++++++++++++++
 2 files changed, 59 insertions(+), 5 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
index 08dfe25378..5f72763c70 100644
--- a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
@@ -20,6 +20,7 @@ package org.apache.paimon.tag;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
+import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.operation.TagDeletion;
 import org.apache.paimon.table.sink.TagCallback;
 import org.apache.paimon.tag.TagTimeExtractor.ProcessTimeExtractor;
@@ -60,6 +61,7 @@ public class TagAutoCreation {
     private final List<TagCallback> callbacks;
     private final Duration idlenessTimeout;
     private final boolean automaticCompletion;
+    private final ZoneId sinkProcessTimeZone;
 
     private LocalDateTime nextTag;
     private long nextSnapshot;
@@ -75,7 +77,8 @@ public class TagAutoCreation {
             @Nullable Duration defaultTimeRetained,
             Duration idlenessTimeout,
             boolean automaticCompletion,
-            List<TagCallback> callbacks) {
+            List<TagCallback> callbacks,
+            ZoneId sinkProcessTimeZone) {
         this.snapshotManager = snapshotManager;
         this.tagManager = tagManager;
         this.tagDeletion = tagDeletion;
@@ -87,6 +90,7 @@ public class TagAutoCreation {
         this.callbacks = callbacks;
         this.idlenessTimeout = idlenessTimeout;
         this.automaticCompletion = automaticCompletion;
+        this.sinkProcessTimeZone = sinkProcessTimeZone;
 
         this.periodHandler.validateDelay(delay);
 
@@ -123,13 +127,17 @@ public class TagAutoCreation {
 
             return isAfterOrEqual(LocalDateTime.now().minus(idlenessTimeout), snapshotTime);
         } else if (timeExtractor instanceof ProcessTimeExtractor) {
-            return nextTag == null
-                    || isAfterOrEqual(
-                            LocalDateTime.now().minus(delay), periodHandler.nextTagTime(nextTag));
+            return forceCreatingSnapshotProcessTime(LocalDateTime.now(sinkProcessTimeZone));
         }
         return false;
     }
 
+    @VisibleForTesting
+    boolean forceCreatingSnapshotProcessTime(LocalDateTime now) {
+        return nextTag == null
+                || isAfterOrEqual(now.minus(delay), periodHandler.nextTagTime(nextTag));
+    }
+
     public void run() {
         while (true) {
             if (snapshotManager.snapshotExists(nextSnapshot)) {
@@ -230,6 +238,7 @@ public class TagAutoCreation {
                 options.tagDefaultTimeRetained(),
                 options.snapshotWatermarkIdleTimeout(),
                 options.tagAutomaticCompletion(),
-                callbacks);
+                callbacks,
+                options.sinkProcessTimeZone());
     }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java
index c8954f7b31..ef0e1627ee 100644
--- a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java
@@ -36,6 +36,7 @@ import java.time.LocalDateTime;
 import java.time.ZoneId;
 import java.util.Collections;
 
+import static org.apache.paimon.CoreOptions.SINK_PROCESS_TIME_ZONE;
 import static org.apache.paimon.CoreOptions.SINK_WATERMARK_TIME_ZONE;
 import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MAX;
 import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MIN;
@@ -519,6 +520,50 @@ public class TagAutoManagerTest extends PrimaryKeyTableTestBase {
         assertThat(tagManager.allTagNames()).containsOnly("20230718");
     }
 
+    @Test
+    public void testForceCreatingSnapshotProcessTime() throws Exception {
+        // sink.process-time-zone=UTC, machine timezone=Asia/Shanghai.
+        // Daily tag should trigger at UTC 00:00 (Shanghai 08:00), not Shanghai 00:00.
+
+        Options options = new Options();
+        options.set(TAG_AUTOMATIC_CREATION, TagCreationMode.PROCESS_TIME);
+        options.set(TAG_CREATION_PERIOD, TagCreationPeriod.DAILY);
+        options.set(SINK_PROCESS_TIME_ZONE, "UTC");
+
+        FileStoreTable table = this.table.copy(options.toMap());
+
+        // Commit a snapshot to set nextTag
+        TableCommitImpl commit = table.newCommit(commitUser).ignoreEmptyCommit(false);
+        commit.commit(new ManifestCommittable(0));
+        commit.close();
+
+        TagAutoCreation tagAutoCreation =
+                TagAutoCreation.create(
+                        table.coreOptions(),
+                        table.snapshotManager(),
+                        table.store().newTagManager(),
+                        table.store().newTagDeletion(),
+                        Collections.emptyList());
+
+        // threshold = tagTime + 2 days (nextTag + 1 period)
+        TagManager tagManager = table.store().newTagManager();
+        String createdTag = tagManager.allTagNames().get(0);
+        LocalDateTime tagTime = LocalDateTime.parse(createdTag + "T00:00:00");
+        LocalDateTime thresholdUtc = tagTime.plusDays(2);
+
+        // Shanghai 00:00 = UTC 16:00 previous day, before threshold -> false
+        LocalDateTime shanghaiMidnightAsUtc = thresholdUtc.minusHours(8);
+        assertThat(tagAutoCreation.forceCreatingSnapshotProcessTime(shanghaiMidnightAsUtc))
+                .isFalse();
+
+        // UTC 00:00 = threshold -> true
+        assertThat(tagAutoCreation.forceCreatingSnapshotProcessTime(thresholdUtc)).isTrue();
+
+        // After threshold -> true
+        assertThat(tagAutoCreation.forceCreatingSnapshotProcessTime(thresholdUtc.plusHours(1)))
+                .isTrue();
+    }
+
     private long localZoneMills(String timestamp) {
         return LocalDateTime.parse(timestamp)
                 .atZone(ZoneId.systemDefault())

Reply via email to