This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 6e773c2ee [core] Automatic Creation for tags (#1588)
6e773c2ee is described below
commit 6e773c2ee5283d68a8d68bacf529d1e06fcc011b
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Jul 19 11:38:15 2023 +0800
[core] Automatic Creation for tags (#1588)
---
docs/content/maintenance/manage-tags.md | 25 ++
.../shortcodes/generated/core_configuration.html | 30 ++
.../main/java/org/apache/paimon/CoreOptions.java | 113 +++++++
.../java/org/apache/paimon/AbstractFileStore.java | 16 +-
.../src/main/java/org/apache/paimon/FileStore.java | 7 +
.../paimon/table/AbstractFileStoreTable.java | 1 +
.../apache/paimon/table/sink/TableCommitImpl.java | 8 +
.../org/apache/paimon/tag/TagAutoCreation.java | 339 +++++++++++++++++++++
.../org/apache/paimon/tag/TagAutoCreationTest.java | 174 +++++++++++
9 files changed, 712 insertions(+), 1 deletion(-)
diff --git a/docs/content/maintenance/manage-tags.md
b/docs/content/maintenance/manage-tags.md
index f613b8984..c74ff0724 100644
--- a/docs/content/maintenance/manage-tags.md
+++ b/docs/content/maintenance/manage-tags.md
@@ -33,6 +33,31 @@ data files, and the historical data of expired snapshots
cannot be queried anymo
To solve this problem, you can create a tag based on a snapshot. The tag will
maintain the manifests and data files of the
snapshot. A typical usage is creating tags daily, then you can maintain the
historical data of each day for batch reading.
+## Automatic Creation
+
+Paimon supports creating tags automatically in the writing job.
+
+**Step 1: Choose Creation Mode**
+
+You can set `'tag.automatic-creation'` to `process-time` or `watermark`:
+- `process-time`: Create tags based on the time of the machine.
+- `watermark`: Create tags based on the watermark of the sink input. Snapshots without a watermark
+  do not trigger tag creation.
+
+{{< hint info >}}
+If you choose `watermark`, you may need to specify the time zone of the watermark: if the
+watermark is not in the UTC time zone, configure `'sink.watermark-time-zone'`.
+{{< /hint >}}
+
+**Step 2: Choose Creation Period**
+
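+Choose how often tags are generated by setting `'tag.creation-period'` to `'daily'`, `'hourly'`
+or `'two-hours'`. Tags are named after the start of a period, e.g. `2023-07-18` for a daily tag
+or `2023-07-18 11` for an hourly tag.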
+
+If you need to wait for late data, you can configure a delay with `'tag.creation-delay'`. The delay
+must be shorter than one creation period.
+
+**Step 3: Automatic deletion of tags**
+
+You can configure `'tag.num-retained-max'` to limit the number of retained tags; when it is
+exceeded, the oldest tags are deleted automatically.
+
## Create Tags
You can create a tag with given name (cannot be number) and snapshot ID.
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index f446cd108..36dba2b52 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -424,6 +424,12 @@ This config option does not affect the default filesystem
metastore.</td>
<td>String</td>
<td>The field that generates the sequence number for primary key
table, the sequence number determines which data is the most recent.</td>
</tr>
+ <tr>
+ <td><h5>sink.watermark-time-zone</h5></td>
+ <td style="word-wrap: break-word;">"UTC"</td>
+ <td>String</td>
+ <td>The time zone to parse the long watermark value to TIMESTAMP
value. The default value is 'UTC', which means the watermark is defined on
TIMESTAMP column or not defined. If the watermark is defined on TIMESTAMP_LTZ
column, the time zone of watermark is user configured time zone, the value
should be the user configured local time zone. The option value is either a
full name such as 'America/Los_Angeles', or a custom timezone id such as
'GMT-08:00'.</td>
+ </tr>
<tr>
<td><h5>snapshot.num-retained.max</h5></td>
<td style="word-wrap: break-word;">2147483647</td>
@@ -484,6 +490,30 @@ This config option does not affect the default filesystem
metastore.</td>
<td>Boolean</td>
<td>Whether to read the changes from overwrite in streaming
mode.</td>
</tr>
+ <tr>
+ <td><h5>tag.automatic-creation</h5></td>
+ <td style="word-wrap: break-word;">none</td>
+ <td><p>Enum</p></td>
+ <td>Whether to create tag automatically. And how to generate
tags.<br /><br />Possible values:<ul><li>"none": No automatically created
tags.</li><li>"process-time": Based on the time of the machine, create TAG once
the processing time passes period time plus delay.</li><li>"watermark": Based
on the watermark of the input, create TAG once the watermark passes period time
plus delay.</li></ul></td>
+ </tr>
+ <tr>
+ <td><h5>tag.creation-delay</h5></td>
+ <td style="word-wrap: break-word;">0 ms</td>
+ <td>Duration</td>
+ <td>How long is the delay after the period ends before creating a
tag. This can allow some late data to enter the Tag.</td>
+ </tr>
+ <tr>
+ <td><h5>tag.creation-period</h5></td>
+ <td style="word-wrap: break-word;">daily</td>
+ <td><p>Enum</p></td>
+ <td>What frequency is used to generate tags.<br /><br />Possible
values:<ul><li>"daily": Generate a tag every day.</li><li>"hourly": Generate a
tag every hour.</li><li>"two-hours": Generate a tag every two
hours.</li></ul></td>
+ </tr>
+ <tr>
+ <td><h5>tag.num-retained-max</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Integer</td>
+ <td>The maximum number of tags to retain.</td>
+ </tr>
<tr>
<td><h5>target-file-size</h5></td>
<td style="word-wrap: break-word;">128 mb</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 8fe0dbb65..3552bc21d 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -50,6 +50,7 @@ import static
org.apache.paimon.options.description.TextElement.text;
/** Core options for paimon. */
public class CoreOptions implements Serializable {
+
public static final String DEFAULT_VALUE_SUFFIX = "default-value";
public static final String FIELDS_PREFIX = "fields";
@@ -768,6 +769,44 @@ public class CoreOptions implements Serializable {
+ "you need to create this table as a
partitioned table in Hive metastore.\n"
+ "This config option does not affect the
default filesystem metastore.");
+    /** Whether tags are created automatically, and which time drives the creation. */
+    public static final ConfigOption<TagCreationMode> TAG_AUTOMATIC_CREATION =
+            key("tag.automatic-creation")
+                    .enumType(TagCreationMode.class)
+                    .defaultValue(TagCreationMode.NONE)
+                    .withDescription(
+                            "Whether to create tag automatically. And how to generate tags.");
+
+    /** How often tags are created automatically. */
+    public static final ConfigOption<TagCreationPeriod> TAG_CREATION_PERIOD =
+            key("tag.creation-period")
+                    .enumType(TagCreationPeriod.class)
+                    .defaultValue(TagCreationPeriod.DAILY)
+                    .withDescription("What frequency is used to generate tags.");
+
+    /** Extra waiting time after a period ends before its tag is created. */
+    public static final ConfigOption<Duration> TAG_CREATION_DELAY =
+            key("tag.creation-delay")
+                    .durationType()
+                    .defaultValue(Duration.ZERO)
+                    .withDescription(
+                            "How long is the delay after the period ends before creating a tag. "
+                                    + "This can allow some late data to enter the Tag.");
+
+    /** Upper bound on the number of retained tags; not set by default. */
+    public static final ConfigOption<Integer> TAG_NUM_RETAINED_MAX =
+            key("tag.num-retained-max")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription("The maximum number of tags to retain.");
+
+    /** Time zone used to turn a long watermark into a wall-clock timestamp. */
+    public static final ConfigOption<String> SINK_WATERMARK_TIME_ZONE =
+            key("sink.watermark-time-zone")
+                    .stringType()
+                    .defaultValue("UTC")
+                    .withDescription(
+                            "The time zone to parse the long watermark value to TIMESTAMP value. "
+                                    + "The default value is 'UTC', which means the watermark is "
+                                    + "defined on TIMESTAMP column or not defined. "
+                                    + "If the watermark is defined on TIMESTAMP_LTZ column, "
+                                    + "the time zone of watermark is user configured time zone, "
+                                    + "the value should be the user configured local time zone. "
+                                    + "The option value is either a full name such as "
+                                    + "'America/Los_Angeles', or a custom timezone id such as "
+                                    + "'GMT-08:00'.");
+
private final Options options;
public CoreOptions(Map<String, String> options) {
@@ -1104,6 +1143,26 @@ public class CoreOptions implements Serializable {
return options.get(METASTORE_PARTITIONED_TABLE);
}
+    /** Returns the configured {@link #TAG_AUTOMATIC_CREATION} mode. */
+    public TagCreationMode tagCreationMode() {
+        return this.options.get(TAG_AUTOMATIC_CREATION);
+    }
+
+    /** Returns the configured {@link #TAG_CREATION_PERIOD}. */
+    public TagCreationPeriod tagCreationPeriod() {
+        return this.options.get(TAG_CREATION_PERIOD);
+    }
+
+    /** Returns the configured {@link #TAG_CREATION_DELAY}. */
+    public Duration tagCreationDelay() {
+        return this.options.get(TAG_CREATION_DELAY);
+    }
+
+    /** Returns {@link #TAG_NUM_RETAINED_MAX}, or {@code null} when it is not configured. */
+    public Integer tagNumRetainedMax() {
+        return this.options.get(TAG_NUM_RETAINED_MAX);
+    }
+
+    /** Returns the configured {@link #SINK_WATERMARK_TIME_ZONE} id. */
+    public String sinkWatermarkTimeZone() {
+        return this.options.get(SINK_WATERMARK_TIME_ZONE);
+    }
+
public Map<String, String> getFieldDefaultValues() {
Map<String, String> defaultValues = new HashMap<>();
String fieldPrefix = FIELDS_PREFIX + ".";
@@ -1566,4 +1625,58 @@ public class CoreOptions implements Serializable {
return text(description);
}
}
+
+    /** How tags are created automatically, see {@link CoreOptions#TAG_AUTOMATIC_CREATION}. */
+    public enum TagCreationMode implements DescribedEnum {
+        NONE("none", "No automatically created tags."),
+        PROCESS_TIME(
+                "process-time",
+                "Based on the time of the machine, create TAG once the processing time "
+                        + "passes period time plus delay."),
+        WATERMARK(
+                "watermark",
+                "Based on the watermark of the input, create TAG once the watermark "
+                        + "passes period time plus delay.");
+
+        /** Value as written in the table options; also returned by {@link #toString()}. */
+        private final String optionValue;
+
+        /** Text returned by {@link #getDescription()}. */
+        private final String descriptionText;
+
+        TagCreationMode(String optionValue, String descriptionText) {
+            this.optionValue = optionValue;
+            this.descriptionText = descriptionText;
+        }
+
+        @Override
+        public String toString() {
+            return optionValue;
+        }
+
+        @Override
+        public InlineElement getDescription() {
+            return text(descriptionText);
+        }
+    }
+
+    /** How often tags are created, see {@link CoreOptions#TAG_CREATION_PERIOD}. */
+    public enum TagCreationPeriod implements DescribedEnum {
+        DAILY("daily", "Generate a tag every day."),
+        HOURLY("hourly", "Generate a tag every hour."),
+        TWO_HOURS("two-hours", "Generate a tag every two hours.");
+
+        /** Value as written in the table options; also returned by {@link #toString()}. */
+        private final String optionValue;
+
+        /** Text returned by {@link #getDescription()}. */
+        private final String descriptionText;
+
+        TagCreationPeriod(String optionValue, String descriptionText) {
+            this.optionValue = optionValue;
+            this.descriptionText = descriptionText;
+        }
+
+        @Override
+        public String toString() {
+            return optionValue;
+        }
+
+        @Override
+        public InlineElement getDescription() {
+            return text(descriptionText);
+        }
+    }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 8fd6ad216..b31dfebb4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -33,6 +33,7 @@ import org.apache.paimon.operation.SnapshotDeletion;
import org.apache.paimon.operation.TagDeletion;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.tag.TagAutoCreation;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.SegmentsCache;
@@ -170,7 +171,7 @@ public abstract class AbstractFileStore<T> implements
FileStore<T> {
options.snapshotTimeRetain().toMillis(),
snapshotManager(),
newSnapshotDeletion(),
- new TagManager(fileIO, options.path()));
+ newTagManager());
}
@Override
@@ -183,6 +184,11 @@ public abstract class AbstractFileStore<T> implements
FileStore<T> {
newIndexFileHandler());
}
+    /** Creates a {@link TagManager} rooted at the table path of this store. */
+    @Override
+    public TagManager newTagManager() {
+        return new TagManager(this.fileIO, this.options.path());
+    }
+
@Override
public TagDeletion newTagDeletion() {
return new TagDeletion(
@@ -196,6 +202,7 @@ public abstract class AbstractFileStore<T> implements
FileStore<T> {
public abstract Comparator<InternalRow> newKeyComparator();
@Override
+ @Nullable
public PartitionExpire newPartitionExpire(String commitUser) {
Duration partitionExpireTime = options.partitionExpireTime();
if (partitionExpireTime == null || partitionType().getFieldCount() ==
0) {
@@ -211,4 +218,11 @@ public abstract class AbstractFileStore<T> implements
FileStore<T> {
newScan(),
newCommit(commitUser));
}
+
+    /**
+     * Creates the automatic tag creator of this store from its options.
+     *
+     * @return the tag creator, or {@code null} if automatic tag creation is disabled
+     */
+    @Override
+    @Nullable
+    public TagAutoCreation newTagCreationManager() {
+        return TagAutoCreation.create(
+                this.options, this.snapshotManager(), this.newTagManager(), this.newTagDeletion());
+    }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
index 6d91d8485..918e665bc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
@@ -29,8 +29,10 @@ import org.apache.paimon.operation.PartitionExpire;
import org.apache.paimon.operation.SnapshotDeletion;
import org.apache.paimon.operation.TagDeletion;
import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.tag.TagAutoCreation;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TagManager;
import javax.annotation.Nullable;
@@ -67,8 +69,13 @@ public interface FileStore<T> extends Serializable {
SnapshotDeletion newSnapshotDeletion();
+ /** Creates a new {@link TagManager} for the tags of this store. */
+ TagManager newTagManager();
+
TagDeletion newTagDeletion();
@Nullable
PartitionExpire newPartitionExpire(String commitUser);
+
+ /**
+  * Creates the automatic tag creator of this store.
+  *
+  * @return the tag creator, or {@code null} if automatic tag creation is disabled
+  */
+ @Nullable
+ TagAutoCreation newTagCreationManager();
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 35bb0049a..4d9ba1115 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -258,6 +258,7 @@ public abstract class AbstractFileStoreTable implements
FileStoreTable {
createCommitCallbacks(),
coreOptions().writeOnly() ? null : store().newExpire(),
coreOptions().writeOnly() ? null :
store().newPartitionExpire(commitUser),
+ coreOptions().writeOnly() ? null :
store().newTagCreationManager(),
lockFactory.create(),
CoreOptions.fromMap(options()).consumerExpireTime(),
new ConsumerManager(fileIO, path));
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index 669359feb..e5ba0eba1 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -24,6 +24,7 @@ import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.operation.FileStoreExpire;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.operation.PartitionExpire;
+import org.apache.paimon.tag.TagAutoCreation;
import org.apache.paimon.utils.IOUtils;
import javax.annotation.Nullable;
@@ -50,6 +51,7 @@ public class TableCommitImpl implements InnerTableCommit {
private final List<CommitCallback> commitCallbacks;
@Nullable private final FileStoreExpire expire;
@Nullable private final PartitionExpire partitionExpire;
+ @Nullable private final TagAutoCreation tagAutoCreation;
private final Lock lock;
@Nullable private final Duration consumerExpireTime;
@@ -64,6 +66,7 @@ public class TableCommitImpl implements InnerTableCommit {
List<CommitCallback> commitCallbacks,
@Nullable FileStoreExpire expire,
@Nullable PartitionExpire partitionExpire,
+ @Nullable TagAutoCreation tagAutoCreation,
Lock lock,
@Nullable Duration consumerExpireTime,
ConsumerManager consumerManager) {
@@ -79,6 +82,7 @@ public class TableCommitImpl implements InnerTableCommit {
this.commitCallbacks = commitCallbacks;
this.expire = expire;
this.partitionExpire = partitionExpire;
+ this.tagAutoCreation = tagAutoCreation;
this.lock = lock;
this.consumerExpireTime = consumerExpireTime;
@@ -204,6 +208,10 @@ public class TableCommitImpl implements InnerTableCommit {
if (partitionExpire != null) {
partitionExpire.expire(partitionExpireIdentifier);
}
+
+ if (tagAutoCreation != null) {
+ tagAutoCreation.run();
+ }
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
new file mode 100644
index 000000000..4900d0c8f
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.tag;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.operation.TagDeletion;
+import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TagManager;
+
+import javax.annotation.Nullable;
+
+import java.time.DateTimeException;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+import java.time.format.ResolverStyle;
+import java.time.format.SignStyle;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.SortedMap;
+
+import static java.time.temporal.ChronoField.DAY_OF_MONTH;
+import static java.time.temporal.ChronoField.HOUR_OF_DAY;
+import static java.time.temporal.ChronoField.MONTH_OF_YEAR;
+import static java.time.temporal.ChronoField.YEAR;
+import static org.apache.paimon.Snapshot.FIRST_SNAPSHOT_ID;
+import static org.apache.paimon.shade.guava30.com.google.common.base.MoreObjects.firstNonNull;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** A manager to create tags automatically. */
+public class TagAutoCreation {
+
+ private static final DateTimeFormatter HOUR_FORMATTER =
+ new DateTimeFormatterBuilder()
+ .appendValue(YEAR, 1, 10, SignStyle.NORMAL)
+ .appendLiteral('-')
+ .appendValue(MONTH_OF_YEAR, 2, 2, SignStyle.NORMAL)
+ .appendLiteral('-')
+ .appendValue(DAY_OF_MONTH, 2, 2, SignStyle.NORMAL)
+ .appendLiteral(" ")
+ .appendValue(HOUR_OF_DAY, 2, 2, SignStyle.NORMAL)
+ .toFormatter()
+ .withResolverStyle(ResolverStyle.LENIENT);
+
+ private static final DateTimeFormatter DAY_FORMATTER =
+ new DateTimeFormatterBuilder()
+ .appendValue(YEAR, 1, 10, SignStyle.NORMAL)
+ .appendLiteral('-')
+ .appendValue(MONTH_OF_YEAR, 2, 2, SignStyle.NORMAL)
+ .appendLiteral('-')
+ .appendValue(DAY_OF_MONTH, 2, 2, SignStyle.NORMAL)
+ .toFormatter()
+ .withResolverStyle(ResolverStyle.LENIENT);
+
+ private final SnapshotManager snapshotManager;
+ private final TagManager tagManager;
+ private final TagDeletion tagDeletion;
+ private final TimeExtractor timeExtractor;
+ private final TagPeriodHandler periodHandler;
+ private final Duration delay;
+ private final Integer numRetainedMax;
+
+ private LocalDateTime nextTag;
+ private long nextSnapshot;
+
+ private TagAutoCreation(
+ SnapshotManager snapshotManager,
+ TagManager tagManager,
+ TagDeletion tagDeletion,
+ TimeExtractor timeExtractor,
+ TagPeriodHandler periodHandler,
+ Duration delay,
+ Integer numRetainedMax) {
+ this.snapshotManager = snapshotManager;
+ this.tagManager = tagManager;
+ this.tagDeletion = tagDeletion;
+ this.timeExtractor = timeExtractor;
+ this.periodHandler = periodHandler;
+ this.delay = delay;
+ this.numRetainedMax = numRetainedMax;
+
+ this.periodHandler.validateDelay(delay);
+
+ SortedMap<Snapshot, String> tags = tagManager.tags();
+
+ if (tags.isEmpty()) {
+ this.nextSnapshot =
+ firstNonNull(snapshotManager.earliestSnapshotId(),
FIRST_SNAPSHOT_ID);
+ } else {
+ Snapshot lastTag = tags.lastKey();
+ this.nextSnapshot = lastTag.id() + 1;
+
+ LocalDateTime time = periodHandler.tagToTime(tags.get(lastTag));
+ this.nextTag = periodHandler.nextTagTime(time);
+ }
+ }
+
+ public void run() {
+ while (true) {
+ if (snapshotManager.snapshotExists(nextSnapshot)) {
+ tryToTag(snapshotManager.snapshot(nextSnapshot));
+ nextSnapshot++;
+ } else {
+ break;
+ }
+ }
+ }
+
+ private void tryToTag(Snapshot snapshot) {
+ Optional<LocalDateTime> timeOptional = timeExtractor.extract(snapshot);
+ if (!timeOptional.isPresent()) {
+ return;
+ }
+
+ LocalDateTime time = timeOptional.get();
+ if (nextTag == null
+ || isAfterOrEqual(time.minus(delay),
periodHandler.nextTagTime(nextTag))) {
+ LocalDateTime thisTag = periodHandler.normalizeToTagTime(time);
+ String tagName = periodHandler.timeToTag(thisTag);
+ tagManager.createTag(snapshot, tagName);
+ nextTag = periodHandler.nextTagTime(thisTag);
+
+ if (numRetainedMax != null) {
+ SortedMap<Snapshot, String> tags = tagManager.tags();
+ if (tags.size() > numRetainedMax) {
+ int toDelete = tags.size() - numRetainedMax;
+ int i = 0;
+ for (String tag : tags.values()) {
+ tagManager.deleteTag(tag, tagDeletion,
snapshotManager);
+ i++;
+ if (i == toDelete) {
+ break;
+ }
+ }
+ }
+ }
+ }
+ }
+
+ private boolean isAfterOrEqual(LocalDateTime t1, LocalDateTime t2) {
+ return t1.isAfter(t2) || t1.isEqual(t2);
+ }
+
+ private interface TimeExtractor {
+
+ Optional<LocalDateTime> extract(Snapshot snapshot);
+ }
+
+ private static class ProcessTimeExtractor implements TimeExtractor {
+
+ @Override
+ public Optional<LocalDateTime> extract(Snapshot snapshot) {
+ return Optional.of(
+ Instant.ofEpochMilli(snapshot.timeMillis())
+ .atZone(ZoneId.systemDefault())
+ .toLocalDateTime());
+ }
+ }
+
+ private static class WatermarkExtractor implements TimeExtractor {
+
+ private final ZoneId watermarkZoneId;
+
+ private WatermarkExtractor(ZoneId watermarkZoneId) {
+ this.watermarkZoneId = watermarkZoneId;
+ }
+
+ @Override
+ public Optional<LocalDateTime> extract(Snapshot snapshot) {
+ Long watermark = snapshot.watermark();
+ if (watermark == null) {
+ return Optional.empty();
+ }
+
+ return Optional.of(
+
Instant.ofEpochMilli(watermark).atZone(watermarkZoneId).toLocalDateTime());
+ }
+ }
+
+ private interface TagPeriodHandler {
+
+ void validateDelay(Duration delay);
+
+ LocalDateTime tagToTime(String tag);
+
+ LocalDateTime normalizeToTagTime(LocalDateTime time);
+
+ String timeToTag(LocalDateTime time);
+
+ LocalDateTime nextTagTime(LocalDateTime time);
+ }
+
+ private abstract static class BaseTagPeriodHandler implements
TagPeriodHandler {
+
+ protected abstract Duration onePeriod();
+
+ protected abstract DateTimeFormatter formatter();
+
+ @Override
+ public void validateDelay(Duration delay) {
+ checkArgument(onePeriod().compareTo(delay) > 0);
+ }
+
+ @Override
+ public LocalDateTime tagToTime(String tag) {
+ return LocalDateTime.parse(tag, formatter());
+ }
+
+ @Override
+ public LocalDateTime normalizeToTagTime(LocalDateTime time) {
+ long mills = Timestamp.fromLocalDateTime(time).getMillisecond();
+ long periodMills = onePeriod().toMillis();
+ LocalDateTime normalized =
+ Timestamp.fromEpochMillis((mills / periodMills) *
periodMills)
+ .toLocalDateTime();
+ return normalized.minus(onePeriod());
+ }
+
+ @Override
+ public String timeToTag(LocalDateTime time) {
+ return time.format(formatter());
+ }
+
+ @Override
+ public LocalDateTime nextTagTime(LocalDateTime time) {
+ return time.plus(onePeriod());
+ }
+ }
+
+ private static class HourlyTagPeriodHandler extends BaseTagPeriodHandler {
+
+ private static final Duration ONE_PERIOD = Duration.ofHours(1);
+
+ @Override
+ protected Duration onePeriod() {
+ return ONE_PERIOD;
+ }
+
+ @Override
+ protected DateTimeFormatter formatter() {
+ return HOUR_FORMATTER;
+ }
+ }
+
+ private static class DailyTagPeriodHandler extends BaseTagPeriodHandler {
+
+ private static final Duration ONE_PERIOD = Duration.ofDays(1);
+
+ @Override
+ protected Duration onePeriod() {
+ return ONE_PERIOD;
+ }
+
+ @Override
+ protected DateTimeFormatter formatter() {
+ return DAY_FORMATTER;
+ }
+ }
+
+ private static class TwoHoursTagPeriodHandler extends BaseTagPeriodHandler
{
+
+ private static final Duration ONE_PERIOD = Duration.ofHours(2);
+
+ @Override
+ protected Duration onePeriod() {
+ return ONE_PERIOD;
+ }
+
+ @Override
+ protected DateTimeFormatter formatter() {
+ return HOUR_FORMATTER;
+ }
+ }
+
+ @Nullable
+ public static TagAutoCreation create(
+ CoreOptions options,
+ SnapshotManager snapshotManager,
+ TagManager tagManager,
+ TagDeletion tagDeletion) {
+ TimeExtractor timeExtractor;
+ switch (options.tagCreationMode()) {
+ case NONE:
+ return null;
+ case PROCESS_TIME:
+ timeExtractor = new ProcessTimeExtractor();
+ break;
+ case WATERMARK:
+ timeExtractor = new
WatermarkExtractor(ZoneId.of(options.sinkWatermarkTimeZone()));
+ break;
+ default:
+ throw new UnsupportedOperationException("Unsupported " +
options.tagCreationMode());
+ }
+
+ TagPeriodHandler periodHandler;
+ switch (options.tagCreationPeriod()) {
+ case DAILY:
+ periodHandler = new DailyTagPeriodHandler();
+ break;
+ case HOURLY:
+ periodHandler = new HourlyTagPeriodHandler();
+ break;
+ case TWO_HOURS:
+ periodHandler = new TwoHoursTagPeriodHandler();
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported " + options.tagCreationPeriod());
+ }
+
+ return new TagAutoCreation(
+ snapshotManager,
+ tagManager,
+ tagDeletion,
+ timeExtractor,
+ periodHandler,
+ options.tagCreationDelay(),
+ options.tagNumRetainedMax());
+ }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java
b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java
new file mode 100644
index 000000000..6c0ab0426
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.tag;
+
+import org.apache.paimon.CoreOptions.TagCreationMode;
+import org.apache.paimon.CoreOptions.TagCreationPeriod;
+import org.apache.paimon.catalog.PrimaryKeyTableTestBase;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.utils.TagManager;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+
+import static org.apache.paimon.CoreOptions.SINK_WATERMARK_TIME_ZONE;
+import static org.apache.paimon.CoreOptions.TAG_AUTOMATIC_CREATION;
+import static org.apache.paimon.CoreOptions.TAG_CREATION_DELAY;
+import static org.apache.paimon.CoreOptions.TAG_CREATION_PERIOD;
+import static org.apache.paimon.CoreOptions.TAG_NUM_RETAINED_MAX;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link TagAutoCreation}. */
+public class TagAutoCreationTest extends PrimaryKeyTableTestBase {
+
+    @Test
+    public void testTag() {
+        Options options = watermarkTagOptions(TagCreationPeriod.HOURLY);
+        options.set(TAG_NUM_RETAINED_MAX, 3);
+        FileStoreTable tagTable = this.table.copy(options.toMap());
+        TableCommitImpl commit = tagTable.newCommit(commitUser).ignoreEmptyCommit(false);
+        TagManager tagManager = tagTable.store().newTagManager();
+
+        // the first watermark immediately tags the previous hour
+        commitWithWatermark(commit, 0, utcMills("2023-07-18T12:12:00"));
+        assertThat(tagManager.tags().values()).containsOnly("2023-07-18 11");
+
+        // the current hour has not ended yet, so nothing new is tagged
+        commitWithWatermark(commit, 1, utcMills("2023-07-18T12:59:00"));
+        assertThat(tagManager.tags().values()).containsOnly("2023-07-18 11");
+
+        // the watermark lands exactly on the hour boundary
+        commitWithWatermark(commit, 2, utcMills("2023-07-18T13:00:00"));
+        assertThat(tagManager.tags().values()).containsOnly("2023-07-18 11", "2023-07-18 12");
+
+        // only three tags are retained, the oldest one gets expired
+        commitWithWatermark(commit, 3, utcMills("2023-07-18T14:00:00"));
+        commitWithWatermark(commit, 4, utcMills("2023-07-18T15:00:00"));
+        assertThat(tagManager.tags().values())
+                .containsOnly("2023-07-18 12", "2023-07-18 13", "2023-07-18 14");
+
+        // a fresh committer restores its progress from the existing tags
+        commit = tagTable.newCommit(commitUser).ignoreEmptyCommit(false);
+        commitWithWatermark(commit, 5, utcMills("2023-07-18T16:00:00"));
+        assertThat(tagManager.tags().values())
+                .containsOnly("2023-07-18 13", "2023-07-18 14", "2023-07-18 15");
+    }
+
+    @Test
+    public void testTagDelay() {
+        Options options = watermarkTagOptions(TagCreationPeriod.HOURLY);
+        options.set(TAG_CREATION_DELAY, Duration.ofSeconds(10));
+        FileStoreTable tagTable = this.table.copy(options.toMap());
+        TableCommitImpl commit = tagTable.newCommit(commitUser).ignoreEmptyCommit(false);
+        TagManager tagManager = tagTable.store().newTagManager();
+
+        // the very first tag ignores the delay
+        commitWithWatermark(commit, 0, utcMills("2023-07-18T12:00:09"));
+        assertThat(tagManager.tags().values()).containsOnly("2023-07-18 11");
+
+        // 13:00:09 is still inside the 10 second delay
+        commitWithWatermark(commit, 0, utcMills("2023-07-18T13:00:09"));
+        assertThat(tagManager.tags().values()).containsOnly("2023-07-18 11");
+
+        // 13:00:10 is exactly the end of the delay
+        commitWithWatermark(commit, 0, utcMills("2023-07-18T13:00:10"));
+        assertThat(tagManager.tags().values()).containsOnly("2023-07-18 11", "2023-07-18 12");
+    }
+
+    @Test
+    public void testTagSinkWatermark() {
+        Options options = watermarkTagOptions(TagCreationPeriod.HOURLY);
+        options.set(SINK_WATERMARK_TIME_ZONE, ZoneId.systemDefault().toString());
+        FileStoreTable tagTable = this.table.copy(options.toMap());
+        TableCommitImpl commit = tagTable.newCommit(commitUser).ignoreEmptyCommit(false);
+        TagManager tagManager = tagTable.store().newTagManager();
+
+        // watermarks are expressed in the local time zone
+        commitWithWatermark(commit, 0, localZoneMills("2023-07-18T12:00:09"));
+        assertThat(tagManager.tags().values()).containsOnly("2023-07-18 11");
+
+        commitWithWatermark(commit, 0, localZoneMills("2023-07-18T13:00:10"));
+        assertThat(tagManager.tags().values()).containsOnly("2023-07-18 11", "2023-07-18 12");
+    }
+
+    @Test
+    public void testTagTwoHour() {
+        Options options = watermarkTagOptions(TagCreationPeriod.TWO_HOURS);
+        FileStoreTable tagTable = this.table.copy(options.toMap());
+        TableCommitImpl commit = tagTable.newCommit(commitUser).ignoreEmptyCommit(false);
+        TagManager tagManager = tagTable.store().newTagManager();
+
+        // first tag covers the two hours starting at 10:00
+        commitWithWatermark(commit, 0, utcMills("2023-07-18T12:00:01"));
+        assertThat(tagManager.tags().values()).containsOnly("2023-07-18 10");
+
+        // 13:00 is still inside the 12:00-14:00 period
+        commitWithWatermark(commit, 0, utcMills("2023-07-18T13:00:01"));
+        assertThat(tagManager.tags().values()).containsOnly("2023-07-18 10");
+
+        // after 14:00 the 12:00 period gets its tag
+        commitWithWatermark(commit, 0, utcMills("2023-07-18T14:00:09"));
+        assertThat(tagManager.tags().values()).containsOnly("2023-07-18 10", "2023-07-18 12");
+    }
+
+    @Test
+    public void testTagDaily() {
+        Options options = watermarkTagOptions(TagCreationPeriod.DAILY);
+        FileStoreTable tagTable = this.table.copy(options.toMap());
+        TableCommitImpl commit = tagTable.newCommit(commitUser).ignoreEmptyCommit(false);
+        TagManager tagManager = tagTable.store().newTagManager();
+
+        // first tag is for the previous day
+        commitWithWatermark(commit, 0, utcMills("2023-07-18T12:00:01"));
+        assertThat(tagManager.tags().values()).containsOnly("2023-07-17");
+
+        // one day later the next day gets its tag
+        commitWithWatermark(commit, 0, utcMills("2023-07-19T12:00:01"));
+        assertThat(tagManager.tags().values()).containsOnly("2023-07-17", "2023-07-18");
+    }
+
+    /** Options enabling watermark-driven tag creation with the given period. */
+    private static Options watermarkTagOptions(TagCreationPeriod period) {
+        Options options = new Options();
+        options.set(TAG_AUTOMATIC_CREATION, TagCreationMode.WATERMARK);
+        options.set(TAG_CREATION_PERIOD, period);
+        return options;
+    }
+
+    /** Commits an empty committable carrying the given identifier and watermark. */
+    private static void commitWithWatermark(
+            TableCommitImpl commit, long identifier, long watermark) {
+        commit.commit(new ManifestCommittable(identifier, watermark));
+    }
+
+    /** Epoch millis of an ISO local date-time interpreted as UTC wall clock. */
+    private static long utcMills(String timestamp) {
+        LocalDateTime dateTime = LocalDateTime.parse(timestamp);
+        return Timestamp.fromLocalDateTime(dateTime).getMillisecond();
+    }
+
+    /** Epoch millis of an ISO local date-time interpreted in the system default zone. */
+    private static long localZoneMills(String timestamp) {
+        LocalDateTime dateTime = LocalDateTime.parse(timestamp);
+        return dateTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
+    }
+}