This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 1336275a3 [core] Support tag date formatter (#2566)
1336275a3 is described below
commit 1336275a318e56a5df4c5f87fe310f45df941a4e
Author: monster <[email protected]>
AuthorDate: Tue Jan 16 21:28:18 2024 +0800
[core] Support tag date formatter (#2566)
---
.../shortcodes/generated/core_configuration.html | 6 +++
.../main/java/org/apache/paimon/CoreOptions.java | 34 ++++++++++++++
.../org/apache/paimon/tag/TagPeriodHandler.java | 52 ++++++++++++++++++++--
.../java/org/apache/paimon/utils/TagManager.java | 4 --
.../paimon/table/FileStoreTableTestBase.java | 6 ---
.../org/apache/paimon/tag/TagAutoCreationTest.java | 36 +++++++++++++++
6 files changed, 124 insertions(+), 14 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 946681796..35e23e34c 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -605,6 +605,12 @@ This config option does not affect the default filesystem
metastore.</td>
<td><p>Enum</p></td>
<td>Whether to create tag automatically. And how to generate
tags.<br /><br />Possible values:<ul><li>"none": No automatically created
tags.</li><li>"process-time": Based on the time of the machine, create TAG once
the processing time passes period time plus delay.</li><li>"watermark": Based
on the watermark of the input, create TAG once the watermark passes period time
plus delay.</li><li>"batch": In the batch processing scenario, the tag
corresponding to the current snapsho [...]
</tr>
+ <tr>
+ <td><h5>tag.period-formatter</h5></td>
+ <td style="word-wrap: break-word;">with_dashes</td>
+ <td><p>Enum</p></td>
+ <td>The date format for tag periods.<br /><br />Possible
values:<ul><li>"with_dashes": Dates and hours with dashes, e.g., 'yyyy-MM-dd
HH'</li><li>"without_dashes": Dates and hours without dashes, e.g., 'yyyyMMdd
HH'</li></ul></td>
+ </tr>
<tr>
<td><h5>tag.callback.#.param</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 8337db566..5d9e00a57 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -951,6 +951,12 @@ public class CoreOptions implements Serializable {
"How long is the delay after the period ends
before creating a tag."
+ " This can allow some late data to enter
the Tag.");
+ public static final ConfigOption<TagPeriodFormatter> TAG_PERIOD_FORMATTER =
+ key("tag.period-formatter")
+ .enumType(TagPeriodFormatter.class)
+ .defaultValue(TagPeriodFormatter.WITH_DASHES)
+ .withDescription("The date format for tag periods.");
+
public static final ConfigOption<Integer> TAG_NUM_RETAINED_MAX =
key("tag.num-retained-max")
.intType()
@@ -1489,6 +1495,10 @@ public class CoreOptions implements Serializable {
return options.get(TAG_CREATION_DELAY);
}
+ public TagPeriodFormatter tagPeriodFormatter() {
+ return options.get(TAG_PERIOD_FORMATTER);
+ }
+
public Integer tagNumRetainedMax() {
return options.get(TAG_NUM_RETAINED_MAX);
}
@@ -2067,6 +2077,30 @@ public class CoreOptions implements Serializable {
}
}
+ /** The period format options for tag creation. */
+ public enum TagPeriodFormatter implements DescribedEnum {
+ WITH_DASHES("with_dashes", "Dates and hours with dashes, e.g.,
'yyyy-MM-dd HH'"),
+ WITHOUT_DASHES("without_dashes", "Dates and hours without dashes,
e.g., 'yyyyMMdd HH'");
+
+ private final String value;
+ private final String description;
+
+ TagPeriodFormatter(String value, String description) {
+ this.value = value;
+ this.description = description;
+ }
+
+ @Override
+ public String toString() {
+ return value;
+ }
+
+ @Override
+ public InlineElement getDescription() {
+ return text(description);
+ }
+ }
+
/** The period for tag creation. */
public enum TagCreationPeriod implements DescribedEnum {
DAILY("daily", "Generate a tag every day."),
diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagPeriodHandler.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagPeriodHandler.java
index 106b0d974..fa18eebea 100644
--- a/paimon-core/src/main/java/org/apache/paimon/tag/TagPeriodHandler.java
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagPeriodHandler.java
@@ -50,6 +50,16 @@ public interface TagPeriodHandler {
.toFormatter()
.withResolverStyle(ResolverStyle.LENIENT);
+ DateTimeFormatter HOUR_FORMATTER_WITHOUT_DASHES =
+ new DateTimeFormatterBuilder()
+ .appendValue(YEAR, 1, 10, SignStyle.NORMAL)
+ .appendValue(MONTH_OF_YEAR, 2, 2, SignStyle.NORMAL)
+ .appendValue(DAY_OF_MONTH, 2, 2, SignStyle.NORMAL)
+ .appendLiteral(" ")
+ .appendValue(HOUR_OF_DAY, 2, 2, SignStyle.NORMAL)
+ .toFormatter()
+ .withResolverStyle(ResolverStyle.LENIENT);
+
DateTimeFormatter DAY_FORMATTER =
new DateTimeFormatterBuilder()
.appendValue(YEAR, 1, 10, SignStyle.NORMAL)
@@ -60,6 +70,14 @@ public interface TagPeriodHandler {
.toFormatter()
.withResolverStyle(ResolverStyle.LENIENT);
+ DateTimeFormatter DAY_FORMATTER_WITHOUT_DASHES =
+ new DateTimeFormatterBuilder()
+ .appendValue(YEAR, 1, 10, SignStyle.NORMAL)
+ .appendValue(MONTH_OF_YEAR, 2, 2, SignStyle.NORMAL)
+ .appendValue(DAY_OF_MONTH, 2, 2, SignStyle.NORMAL)
+ .toFormatter()
+ .withResolverStyle(ResolverStyle.LENIENT);
+
void validateDelay(Duration delay);
LocalDateTime tagToTime(String tag);
@@ -111,6 +129,12 @@ public interface TagPeriodHandler {
/** Hourly {@link TagPeriodHandler}. */
class HourlyTagPeriodHandler extends BaseTagPeriodHandler {
+ CoreOptions.TagPeriodFormatter formatter;
+
+ public HourlyTagPeriodHandler(CoreOptions.TagPeriodFormatter
formatter) {
+ this.formatter = formatter;
+ }
+
static final Duration ONE_PERIOD = Duration.ofHours(1);
@Override
@@ -120,13 +144,26 @@ public interface TagPeriodHandler {
@Override
protected DateTimeFormatter formatter() {
- return HOUR_FORMATTER;
+ switch (formatter) {
+ case WITH_DASHES:
+ return HOUR_FORMATTER;
+ case WITHOUT_DASHES:
+ return HOUR_FORMATTER_WITHOUT_DASHES;
+ default:
+ throw new IllegalArgumentException("Unsupported date
format type");
+ }
}
}
/** Daily {@link TagPeriodHandler}. */
class DailyTagPeriodHandler extends BaseTagPeriodHandler {
+ CoreOptions.TagPeriodFormatter formatter;
+
+ public DailyTagPeriodHandler(CoreOptions.TagPeriodFormatter formatter)
{
+ this.formatter = formatter;
+ }
+
static final Duration ONE_PERIOD = Duration.ofDays(1);
@Override
@@ -136,7 +173,14 @@ public interface TagPeriodHandler {
@Override
protected DateTimeFormatter formatter() {
- return DAY_FORMATTER;
+ switch (formatter) {
+ case WITH_DASHES:
+ return DAY_FORMATTER;
+ case WITHOUT_DASHES:
+ return DAY_FORMATTER_WITHOUT_DASHES;
+ default:
+ throw new IllegalArgumentException("Unsupported date
format type");
+ }
}
@Override
@@ -165,9 +209,9 @@ public interface TagPeriodHandler {
static TagPeriodHandler create(CoreOptions options) {
switch (options.tagCreationPeriod()) {
case DAILY:
- return new DailyTagPeriodHandler();
+ return new DailyTagPeriodHandler(options.tagPeriodFormatter());
case HOURLY:
- return new HourlyTagPeriodHandler();
+ return new
HourlyTagPeriodHandler(options.tagPeriodFormatter());
case TWO_HOURS:
return new TwoHoursTagPeriodHandler();
default:
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
index a83e201d5..f011328b4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
@@ -70,10 +70,6 @@ public class TagManager {
public void createTag(Snapshot snapshot, String tagName, List<TagCallback>
callbacks) {
checkArgument(!StringUtils.isBlank(tagName), "Tag name '%s' is
blank.", tagName);
checkArgument(!tagExists(tagName), "Tag name '%s' already exists.",
tagName);
- checkArgument(
- !tagName.chars().allMatch(Character::isDigit),
- "Tag name cannot be pure numeric string but is '%s'.",
- tagName);
Path newTagPath = tagPath(tagName);
try {
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
index abb07083a..f8e864e8c 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
@@ -927,12 +927,6 @@ public abstract class FileStoreTableTestBase {
AssertionUtils.anyCauseMatches(
IllegalArgumentException.class,
String.format("Tag name '%s' is blank", "")));
-
- assertThatThrownBy(() -> table.createTag("10", 1))
- .satisfies(
- AssertionUtils.anyCauseMatches(
- IllegalArgumentException.class,
- "Tag name cannot be pure numeric string but is
'10'."));
}
@Test
diff --git a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java
index 3b753093b..9b0233ef4 100644
--- a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java
@@ -20,6 +20,7 @@ package org.apache.paimon.tag;
import org.apache.paimon.CoreOptions.TagCreationMode;
import org.apache.paimon.CoreOptions.TagCreationPeriod;
+import org.apache.paimon.CoreOptions.TagPeriodFormatter;
import org.apache.paimon.catalog.PrimaryKeyTableTestBase;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.options.Options;
@@ -40,6 +41,7 @@ import static org.apache.paimon.CoreOptions.TAG_AUTOMATIC_CREATION;
import static org.apache.paimon.CoreOptions.TAG_CREATION_DELAY;
import static org.apache.paimon.CoreOptions.TAG_CREATION_PERIOD;
import static org.apache.paimon.CoreOptions.TAG_NUM_RETAINED_MAX;
+import static org.apache.paimon.CoreOptions.TAG_PERIOD_FORMATTER;
import static org.assertj.core.api.Assertions.assertThat;
/** Test for tag automatic creation. */
@@ -246,6 +248,40 @@ public class TagAutoCreationTest extends PrimaryKeyTableTestBase {
.containsOnly("savepoint-11", "2023-07-18 13", "2023-07-18
14", "2023-07-18 15");
}
+ @Test
+ public void testTagDatePeriodFormatter() {
+ Options options = new Options();
+ options.set(TAG_AUTOMATIC_CREATION, TagCreationMode.WATERMARK);
+ options.set(TAG_CREATION_PERIOD, TagCreationPeriod.DAILY);
+ options.set(TAG_PERIOD_FORMATTER, TagPeriodFormatter.WITHOUT_DASHES);
+ FileStoreTable table = this.table.copy(options.toMap());
+ TableCommitImpl commit =
table.newCommit(commitUser).ignoreEmptyCommit(false);
+ TagManager tagManager = table.store().newTagManager();
+
+ commit.commit(new ManifestCommittable(0,
utcMills("2023-07-18T12:12:00")));
+ assertThat(tagManager.tags().values()).containsOnly("20230717");
+
+ commit.commit(new ManifestCommittable(1,
utcMills("2023-07-19T12:12:00")));
+ assertThat(tagManager.tags().values()).contains("20230717",
"20230718");
+ }
+
+ @Test
+ public void testTagHourlyPeriodFormatter() {
+ Options options = new Options();
+ options.set(TAG_AUTOMATIC_CREATION, TagCreationMode.WATERMARK);
+ options.set(TAG_CREATION_PERIOD, TagCreationPeriod.HOURLY);
+ options.set(TAG_PERIOD_FORMATTER, TagPeriodFormatter.WITHOUT_DASHES);
+ FileStoreTable table = this.table.copy(options.toMap());
+ TableCommitImpl commit =
table.newCommit(commitUser).ignoreEmptyCommit(false);
+ TagManager tagManager = table.store().newTagManager();
+
+ commit.commit(new ManifestCommittable(0,
utcMills("2023-07-18T12:12:00")));
+ assertThat(tagManager.tags().values()).containsOnly("20230718 11");
+
+ commit.commit(new ManifestCommittable(1,
utcMills("2023-07-18T13:13:00")));
+ assertThat(tagManager.tags().values()).contains("20230718 11",
"20230718 12");
+ }
+
private long localZoneMills(String timestamp) {
return LocalDateTime.parse(timestamp)
.atZone(ZoneId.systemDefault())