This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ae6448377 [core] process-time TagTimeExtractor supports custom zone (#5927)
7ae6448377 is described below

commit 7ae644837725181ff7e68a6e52c505fe7373d4cb
Author: yuzelin <33053040+yuze...@users.noreply.github.com>
AuthorDate: Tue Jul 22 16:30:50 2025 +0800

    [core] process-time TagTimeExtractor supports custom zone (#5927)
---
 .../layouts/shortcodes/generated/core_configuration.html |  6 ++++++
 .../src/main/java/org/apache/paimon/CoreOptions.java     | 16 ++++++++++++++++
 .../java/org/apache/paimon/tag/TagTimeExtractor.java     | 12 ++++++++----
 3 files changed, 30 insertions(+), 4 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index f4276640e8..3d1bde2e94 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -971,6 +971,12 @@ This config option does not affect the default filesystem metastore.</td>
             <td><p>Enum</p></td>
             <td>Specify the order of sequence.field.<br /><br />Possible 
values:<ul><li>"ascending": specifies sequence.field sort order is 
ascending.</li><li>"descending": specifies sequence.field sort order is 
descending.</li></ul></td>
         </tr>
+        <tr>
+            <td><h5>sink.process-time-zone</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>The time zone to parse the long process time to TIMESTAMP 
value. The default value is JVM's default time zone. If you want to specify a 
time zone, you should either set a full name such as 'America/Los_Angeles' or a 
custom zone id such as 'GMT-08:00'. This option currently is used for extract 
tag name.</td>
+        </tr>
         <tr>
             <td><h5>sink.watermark-time-zone</h5></td>
             <td style="word-wrap: break-word;">"UTC"</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index d41fdcbfa9..e5eeb17dc7 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -43,6 +43,7 @@ import javax.annotation.Nullable;
 import java.io.Serializable;
 import java.lang.reflect.Field;
 import java.time.Duration;
+import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -1579,6 +1580,16 @@ public class CoreOptions implements Serializable {
                                     + " the value should be the user 
configured local time zone. The option value is either a full name"
                                     + " such as 'America/Los_Angeles', or a 
custom timezone id such as 'GMT-08:00'.");
 
+    public static final ConfigOption<String> SINK_PROCESS_TIME_ZONE =
+            key("sink.process-time-zone")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The time zone to parse the long process time to 
TIMESTAMP value. The default value is JVM's "
+                                    + "default time zone. If you want to 
specify a time zone, you should either set a "
+                                    + "full name such as 'America/Los_Angeles' 
or a custom zone id such as 'GMT-08:00'. "
+                                    + "This option currently is used for 
extract tag name.");
+
     public static final ConfigOption<MemorySize> LOCAL_MERGE_BUFFER_SIZE =
             key("local-merge-buffer-size")
                     .memoryType()
@@ -2730,6 +2741,11 @@ public class CoreOptions implements Serializable {
         return options.get(SINK_WATERMARK_TIME_ZONE);
     }
 
+    public ZoneId sinkProcessTimeZone() {
+        String zoneId = options.get(SINK_PROCESS_TIME_ZONE);
+        return zoneId == null ? ZoneId.systemDefault() : ZoneId.of(zoneId);
+    }
+
     public boolean forceCreatingSnapshot() {
         return options.get(COMMIT_FORCE_CREATE_SNAPSHOT);
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExtractor.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExtractor.java
index 5b046e752f..b94c613251 100644
--- a/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExtractor.java
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExtractor.java
@@ -36,12 +36,16 @@ public interface TagTimeExtractor {
     /** Extract time from snapshot time millis. */
     class ProcessTimeExtractor implements TagTimeExtractor {
 
+        private final ZoneId processTimeZoneId;
+
+        private ProcessTimeExtractor(ZoneId processTimeZoneId) {
+            this.processTimeZoneId = processTimeZoneId;
+        }
+
         @Override
         public Optional<LocalDateTime> extract(long timeMilli, @Nullable Long 
watermark) {
             return Optional.of(
-                    Instant.ofEpochMilli(timeMilli)
-                            .atZone(ZoneId.systemDefault())
-                            .toLocalDateTime());
+                    
Instant.ofEpochMilli(timeMilli).atZone(processTimeZoneId).toLocalDateTime());
         }
     }
 
@@ -82,7 +86,7 @@ public interface TagTimeExtractor {
             case BATCH:
                 return null;
             case PROCESS_TIME:
-                return new ProcessTimeExtractor();
+                return new ProcessTimeExtractor(options.sinkProcessTimeZone());
             case WATERMARK:
                 return new 
WatermarkExtractor(ZoneId.of(options.sinkWatermarkTimeZone()));
             default:

Reply via email to