This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push: new 7ae6448377 [core] process-time TagTimeExtractor supports custom zone (#5927) 7ae6448377 is described below commit 7ae644837725181ff7e68a6e52c505fe7373d4cb Author: yuzelin <33053040+yuze...@users.noreply.github.com> AuthorDate: Tue Jul 22 16:30:50 2025 +0800 [core] process-time TagTimeExtractor supports custom zone (#5927) --- .../layouts/shortcodes/generated/core_configuration.html | 6 ++++++ .../src/main/java/org/apache/paimon/CoreOptions.java | 16 ++++++++++++++++ .../java/org/apache/paimon/tag/TagTimeExtractor.java | 12 ++++++++---- 3 files changed, 30 insertions(+), 4 deletions(-) diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index f4276640e8..3d1bde2e94 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -971,6 +971,12 @@ This config option does not affect the default filesystem metastore.</td> <td><p>Enum</p></td> <td>Specify the order of sequence.field.<br /><br />Possible values:<ul><li>"ascending": specifies sequence.field sort order is ascending.</li><li>"descending": specifies sequence.field sort order is descending.</li></ul></td> </tr> + <tr> + <td><h5>sink.process-time-zone</h5></td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>The time zone to parse the long process time to TIMESTAMP value. The default value is JVM's default time zone. If you want to specify a time zone, you should either set a full name such as 'America/Los_Angeles' or a custom zone id such as 'GMT-08:00'. This option currently is used for extract tag name.</td> + </tr> <tr> <td><h5>sink.watermark-time-zone</h5></td> <td style="word-wrap: break-word;">"UTC"</td> diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index d41fdcbfa9..e5eeb17dc7 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -43,6 +43,7 @@ import javax.annotation.Nullable; import java.io.Serializable; import java.lang.reflect.Field; import java.time.Duration; +import java.time.ZoneId; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -1579,6 +1580,16 @@ public class CoreOptions implements Serializable { + " the value should be the user configured local time zone. The option value is either a full name" + " such as 'America/Los_Angeles', or a custom timezone id such as 'GMT-08:00'."); + public static final ConfigOption<String> SINK_PROCESS_TIME_ZONE = + key("sink.process-time-zone") + .stringType() + .noDefaultValue() + .withDescription( + "The time zone to parse the long process time to TIMESTAMP value. The default value is JVM's " + + "default time zone. If you want to specify a time zone, you should either set a " + + "full name such as 'America/Los_Angeles' or a custom zone id such as 'GMT-08:00'. " + + "This option currently is used for extract tag name."); + public static final ConfigOption<MemorySize> LOCAL_MERGE_BUFFER_SIZE = key("local-merge-buffer-size") .memoryType() @@ -2730,6 +2741,11 @@ public class CoreOptions implements Serializable { return options.get(SINK_WATERMARK_TIME_ZONE); } + public ZoneId sinkProcessTimeZone() { + String zoneId = options.get(SINK_PROCESS_TIME_ZONE); + return zoneId == null ? ZoneId.systemDefault() : ZoneId.of(zoneId); + } + public boolean forceCreatingSnapshot() { return options.get(COMMIT_FORCE_CREATE_SNAPSHOT); } diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExtractor.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExtractor.java index 5b046e752f..b94c613251 100644 --- a/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExtractor.java +++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExtractor.java @@ -36,12 +36,16 @@ public interface TagTimeExtractor { /** Extract time from snapshot time millis. */ class ProcessTimeExtractor implements TagTimeExtractor { + private final ZoneId processTimeZoneId; + + private ProcessTimeExtractor(ZoneId processTimeZoneId) { + this.processTimeZoneId = processTimeZoneId; + } + @Override public Optional<LocalDateTime> extract(long timeMilli, @Nullable Long watermark) { return Optional.of( - Instant.ofEpochMilli(timeMilli) - .atZone(ZoneId.systemDefault()) - .toLocalDateTime()); + Instant.ofEpochMilli(timeMilli).atZone(processTimeZoneId).toLocalDateTime()); } } @@ -82,7 +86,7 @@ public interface TagTimeExtractor { case BATCH: return null; case PROCESS_TIME: - return new ProcessTimeExtractor(); + return new ProcessTimeExtractor(options.sinkProcessTimeZone()); case WATERMARK: return new WatermarkExtractor(ZoneId.of(options.sinkWatermarkTimeZone())); default: