This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 28ebec5bb [hive] Introduce metastore.tag-to-partition for Hive
metastore (#2134)
28ebec5bb is described below
commit 28ebec5bb1ca7cf313d7271273c7568219e7a28e
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Oct 25 14:58:37 2023 +0800
[hive] Introduce metastore.tag-to-partition for Hive metastore (#2134)
---
docs/content/{project => migration}/_index.md | 7 +-
docs/content/migration/upsert-to-partitioned.md | 174 +++++++++++++++++
docs/content/project/_index.md | 2 +-
.../shortcodes/generated/core_configuration.html | 24 +++
.../generated/spark_connector_configuration.html | 12 +-
.../main/java/org/apache/paimon/CoreOptions.java | 58 +++++-
.../java/org/apache/paimon/data/JoinedRow.java | 8 +
.../paimon/metastore/AddPartitionTagCallback.java | 51 +++++
.../apache/paimon/metastore/MetastoreClient.java | 3 +
.../paimon/metastore/TagPreviewCommitCallback.java | 52 +++++
.../paimon/table/AbstractFileStoreTable.java | 71 +++++--
.../sink/TagCallback.java} | 21 +-
.../org/apache/paimon/tag/TagAutoCreation.java | 216 +--------------------
.../org/apache/paimon/tag/TagPeriodHandler.java | 177 +++++++++++++++++
.../java/org/apache/paimon/tag/TagPreview.java | 94 +++++++++
.../org/apache/paimon/tag/TagTimeExtractor.java | 91 +++++++++
.../apache/paimon/utils/PartitionPathUtils.java | 56 ++++++
.../org/apache/paimon/utils/SnapshotManager.java | 17 +-
.../paimon/catalog/PrimaryKeyTableTestBase.java | 6 +
.../org/apache/paimon/tag/TagAutoCreationTest.java | 5 -
.../java/org/apache/paimon/tag/TagPreviewTest.java | 101 ++++++++++
.../apache/paimon/utils/SnapshotManagerTest.java | 3 +-
.../java/org/apache/paimon/hive/HiveCatalog.java | 20 +-
.../apache/paimon/hive/HiveMetastoreClient.java | 10 +-
.../java/org/apache/paimon/hive/HiveSchema.java | 25 ++-
.../paimon/hive/mapred/PaimonInputFormat.java | 56 ++++--
.../paimon/hive/mapred/PaimonRecordReader.java | 20 +-
.../org/apache/paimon/hive/utils/HiveUtils.java | 23 ++-
.../apache/paimon/hive/HiveCatalogITCaseBase.java | 85 ++++++++
.../paimon/hive/mapred/PaimonRecordReaderTest.java | 3 +-
30 files changed, 1205 insertions(+), 286 deletions(-)
diff --git a/docs/content/project/_index.md b/docs/content/migration/_index.md
similarity index 87%
copy from docs/content/project/_index.md
copy to docs/content/migration/_index.md
index dca9a5637..3da339e38 100644
--- a/docs/content/project/_index.md
+++ b/docs/content/migration/_index.md
@@ -1,10 +1,9 @@
---
-title: Project
-icon: <i class="fa fa-sitemap title maindish" aria-hidden="true"></i>
+title: Migration
+icon: <i class="fa fa-briefcase title maindish" aria-hidden="true"></i>
bold: true
bookCollapseSection: true
-sectionBreak: true
-weight: 7
+weight: 8
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
diff --git a/docs/content/migration/upsert-to-partitioned.md
b/docs/content/migration/upsert-to-partitioned.md
new file mode 100644
index 000000000..8b8830c8f
--- /dev/null
+++ b/docs/content/migration/upsert-to-partitioned.md
@@ -0,0 +1,174 @@
+---
+title: "Upsert To Partitioned"
+weight: 1
+type: docs
+aliases:
+- /migration/upsert-to-partitioned.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Upsert To Partitioned
+
+The [Tag Management]({{< ref "maintenance/manage-tags" >}}) will maintain the
manifests and data files of the snapshot.
+A typical usage is creating tags daily, then you can maintain the historical
data of each day for batch reading.
+
+When using primary key tables, a non-partitioned approach is often used to
maintain updates, in order to mirror and
+synchronize tables from upstream database tables. This allows users to query
the latest data. Traditional Hive
+data warehouses work differently: offline data warehouses require an immutable
view every day to ensure the idempotence
+of calculations. So we created a Tag mechanism to output these views.
+
+However, traditional Hive data warehouse users are more accustomed to
using partitions to specify which Tag to query,
+and to using Hive computing engines.
+
+So, we introduce `'metastore.tag-to-partition'` and
`'metastore.tag-to-partition.preview'` to map a non-partitioned
+primary key table to a partitioned table in Hive metastore, and to map the
partition field to the name of the Tag, so that the table is
+fully compatible with Hive.
+
+## Example for Tag to Partition
+
+**Step 1: Create table and tag in Flink SQL**
+
+{{< tabs "Create table and tag in Flink SQL 1" >}}
+{{< tab "Flink" >}}
+```sql
+CREATE CATALOG my_hive WITH (
+ 'type' = 'paimon',
+ 'metastore' = 'hive',
+ 'uri' = 'thrift://<hive-metastore-host-name>:<port>',
+ -- 'hive-conf-dir' = '...', this is recommended in the kerberos environment
+ -- 'hadoop-conf-dir' = '...', this is recommended in the kerberos
environment
+ 'warehouse' = 'hdfs:///path/to/warehouse'
+);
+
+USE CATALOG my_hive;
+
+CREATE TABLE mydb.T (
+ pk INT,
+ col1 STRING,
+ col2 STRING
+) WITH (
+ 'bucket' = '-1',
+ 'metastore.tag-to-partition' = 'dt'
+);
+
+INSERT INTO t VALUES (1, '10', '100'), (2, '20', '200');
+
+-- create tag '2023-10-16' for snapshot 1
+CALL sys.create_tag('mydb.T', '2023-10-16', 1);
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
+**Step 2: Query table in Hive with Partition Pruning**
+
+{{< tabs "Query table in Hive with Partition Pruning 1" >}}
+{{< tab "Hive" >}}
+```sql
+SHOW PARTITIONS T;
+/*
+OK
+dt=2023-10-16
+*/
+
+SELECT * FROM T WHERE dt='2023-10-16';
+/*
+OK
+1 10 100 2023-10-16
+2 20 200 2023-10-16
+*/
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
+## Example for Tag Preview
+
+The above example can only query tags that have already been created, but
Paimon is a real-time data lake, and you also
+need to query the latest data. Therefore, Paimon provides a preview feature:
+
+**Step 1: Create table and tag in Flink SQL**
+
+{{< tabs "Create table and tag in Flink SQL 2" >}}
+{{< tab "Flink" >}}
+```sql
+CREATE CATALOG my_hive WITH (
+ 'type' = 'paimon',
+ 'metastore' = 'hive',
+ 'uri' = 'thrift://<hive-metastore-host-name>:<port>',
+ -- 'hive-conf-dir' = '...', this is recommended in the kerberos environment
+ -- 'hadoop-conf-dir' = '...', this is recommended in the kerberos
environment
+ 'warehouse' = 'hdfs:///path/to/warehouse'
+);
+
+USE CATALOG my_hive;
+
+CREATE TABLE mydb.T (
+ pk INT,
+ col1 STRING,
+ col2 STRING
+) WITH (
+ 'bucket' = '-1',
+ 'metastore.tag-to-partition' = 'dt',
+ -- preview tag creation mode process-time
+ -- paimon will create partitions early based on process-time
+ 'metastore.tag-to-partition.preview' = 'process-time'
+);
+
+INSERT INTO t VALUES (1, '10', '100'), (2, '20', '200');
+
+-- create tag '2023-10-16' for snapshot 1
+CALL sys.create_tag('mydb.T', '2023-10-16', 1);
+
+-- new data in '2023-10-17'
+INSERT INTO t VALUES (3, '30', '300'), (4, '40', '400');
+
+-- haven't finished writing the data for '2023-10-17' yet, so there's no need
to create a tag for now
+-- but the data is already visible to Hive
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
+**Step 2: Query table in Hive with Partition Pruning**
+
+{{< tabs "Query table in Hive with Partition Pruning 2" >}}
+{{< tab "Hive" >}}
+```sql
+SHOW PARTITIONS T;
+/*
+OK
+dt=2023-10-16
+dt=2023-10-17
+*/
+
+SELECT * FROM T WHERE dt='2023-10-17';
+-- preview tag '2023-10-17'
+/*
+OK
+1 10 100 2023-10-17
+2 20 200 2023-10-17
+3 30 300 2023-10-17
+4 40 400 2023-10-17
+*/
+```
+
+{{< /tab >}}
+{{< /tabs >}}
diff --git a/docs/content/project/_index.md b/docs/content/project/_index.md
index dca9a5637..84aee3057 100644
--- a/docs/content/project/_index.md
+++ b/docs/content/project/_index.md
@@ -4,7 +4,7 @@ icon: <i class="fa fa-sitemap title maindish"
aria-hidden="true"></i>
bold: true
bookCollapseSection: true
sectionBreak: true
-weight: 7
+weight: 9
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index 253e6414c..0be1a41c6 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -305,6 +305,18 @@ Mainly to resolve data skew on primary keys. We recommend
starting with 64 mb wh
For example, if you want to list all partitions of a Paimon table in Hive, you
need to create this table as a partitioned table in Hive metastore.
This config option does not affect the default filesystem metastore.</td>
</tr>
+ <tr>
+ <td><h5>metastore.tag-to-partition</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Whether to create this table as a partitioned table for
mapping non-partitioned table tags in metastore. This allows the Hive engine to
view this table in a partitioned table view and use partitioning field to read
specific partitions (specific tags).</td>
+ </tr>
+ <tr>
+ <td><h5>metastore.tag-to-partition.preview</h5></td>
+ <td style="word-wrap: break-word;">none</td>
+ <td><p>Enum</p></td>
+ <td>Whether to preview tag of generated snapshots in metastore.
This allows the Hive engine to query specific tag before creation.<br /><br
/>Possible values:<ul><li>"none": No automatically created
tags.</li><li>"process-time": Based on the time of the machine, create TAG once
the processing time passes period time plus delay.</li><li>"watermark": Based
on the watermark of the input, create TAG once the watermark passes period time
plus delay.</li></ul></td>
+ </tr>
<tr>
<td><h5>num-levels</h5></td>
<td style="word-wrap: break-word;">(none)</td>
@@ -557,6 +569,18 @@ This config option does not affect the default filesystem
metastore.</td>
<td><p>Enum</p></td>
<td>Whether to create tag automatically. And how to generate
tags.<br /><br />Possible values:<ul><li>"none": No automatically created
tags.</li><li>"process-time": Based on the time of the machine, create TAG once
the processing time passes period time plus delay.</li><li>"watermark": Based
on the watermark of the input, create TAG once the watermark passes period time
plus delay.</li></ul></td>
</tr>
+ <tr>
+ <td><h5>tag.callback.#.param</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Parameter string for the constructor of class #. Callback
class should parse the parameter by itself.</td>
+ </tr>
+ <tr>
+ <td><h5>tag.callbacks</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>A list of commit callback classes to be called after a
successful tag. Class names are connected with comma (example:
com.test.CallbackA,com.sample.CallbackB).</td>
+ </tr>
<tr>
<td><h5>tag.creation-delay</h5></td>
<td style="word-wrap: break-word;">0 ms</td>
diff --git
a/docs/layouts/shortcodes/generated/spark_connector_configuration.html
b/docs/layouts/shortcodes/generated/spark_connector_configuration.html
index 2e6bf03e4..00ca2ba17 100644
--- a/docs/layouts/shortcodes/generated/spark_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/spark_connector_configuration.html
@@ -26,6 +26,12 @@ under the License.
</tr>
</thead>
<tbody>
+ <tr>
+ <td><h5>read.changelog</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Whether to read row in the form of changelog (add rowkind
column in row to represent its change type).</td>
+ </tr>
<tr>
<td><h5>read.stream.maxBytesPerTrigger</h5></td>
<td style="word-wrap: break-word;">(none)</td>
@@ -56,12 +62,6 @@ under the License.
<td>Long</td>
<td>The minimum number of rows returned in a single batch, which
used to create MinRowsReadLimit with read.stream.maxTriggerDelayMs
together.</td>
</tr>
- <tr>
- <td><h5>read.changelog</h5></td>
- <td style="word-wrap: break-word;">false</td>
- <td>Boolean</td>
- <td>Whether to read row in the form of changelog (add rowkind
column in row to represent its change type).</td>
- </tr>
<tr>
<td><h5>write.merge-schema</h5></td>
<td style="word-wrap: break-word;">false</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 84890acbf..88243fc6b 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -33,6 +33,8 @@ import org.apache.paimon.options.description.InlineElement;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.StringUtils;
+import javax.annotation.Nullable;
+
import java.io.Serializable;
import java.lang.reflect.Field;
import java.time.Duration;
@@ -790,6 +792,23 @@ public class CoreOptions implements Serializable {
"Parameter string for the constructor of class #. "
+ "Callback class should parse the
parameter by itself.");
+ public static final ConfigOption<String> TAG_CALLBACKS =
+ key("tag.callbacks")
+ .stringType()
+ .defaultValue("")
+ .withDescription(
+ "A list of commit callback classes to be called
after a successful tag. "
+ + "Class names are connected with comma "
+ + "(example:
com.test.CallbackA,com.sample.CallbackB).");
+
+ public static final ConfigOption<String> TAG_CALLBACK_PARAM =
+ key("tag.callback.#.param")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Parameter string for the constructor of class #. "
+ + "Callback class should parse the
parameter by itself.");
+
public static final ConfigOption<Boolean> METASTORE_PARTITIONED_TABLE =
key("metastore.partitioned-table")
.booleanType()
@@ -800,6 +819,23 @@ public class CoreOptions implements Serializable {
+ "you need to create this table as a
partitioned table in Hive metastore.\n"
+ "This config option does not affect the
default filesystem metastore.");
+ public static final ConfigOption<String> METASTORE_TAG_TO_PARTITION =
+ key("metastore.tag-to-partition")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Whether to create this table as a partitioned
table for mapping non-partitioned table tags in metastore. "
+ + "This allows the Hive engine to view
this table in a partitioned table view and "
+ + "use partitioning field to read specific
partitions (specific tags).");
+
+ public static final ConfigOption<TagCreationMode>
METASTORE_TAG_TO_PARTITION_PREVIEW =
+ key("metastore.tag-to-partition.preview")
+ .enumType(TagCreationMode.class)
+ .defaultValue(TagCreationMode.NONE)
+ .withDescription(
+ "Whether to preview tag of generated snapshots in
metastore. "
+ + "This allows the Hive engine to query
specific tag before creation.");
+
public static final ConfigOption<TagCreationMode> TAG_AUTOMATIC_CREATION =
key("tag.automatic-creation")
.enumType(TagCreationMode.class)
@@ -1270,6 +1306,15 @@ public class CoreOptions implements Serializable {
return options.get(METASTORE_PARTITIONED_TABLE);
}
+ @Nullable
+ public String tagToPartitionField() {
+ return options.get(METASTORE_TAG_TO_PARTITION);
+ }
+
+ public TagCreationMode tagToPartitionPreview() {
+ return options.get(METASTORE_TAG_TO_PARTITION_PREVIEW);
+ }
+
public TagCreationMode tagCreationMode() {
return options.get(TAG_AUTOMATIC_CREATION);
}
@@ -1305,14 +1350,23 @@ public class CoreOptions implements Serializable {
}
public Map<String, String> commitCallbacks() {
+ return callbacks(COMMIT_CALLBACKS, COMMIT_CALLBACK_PARAM);
+ }
+
+ public Map<String, String> tagCallbacks() {
+ return callbacks(TAG_CALLBACKS, TAG_CALLBACK_PARAM);
+ }
+
+ private Map<String, String> callbacks(
+ ConfigOption<String> callbacks, ConfigOption<String>
callbackParam) {
Map<String, String> result = new HashMap<>();
- for (String className : options.get(COMMIT_CALLBACKS).split(",")) {
+ for (String className : options.get(callbacks).split(",")) {
className = className.trim();
if (className.length() == 0) {
continue;
}
- String param =
options.get(COMMIT_CALLBACK_PARAM.key().replace("#", className));
+ String param = options.get(callbackParam.key().replace("#",
className));
result.put(className, param);
}
return result;
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/JoinedRow.java
b/paimon-common/src/main/java/org/apache/paimon/data/JoinedRow.java
index e9126cd7c..526886598 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/JoinedRow.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/JoinedRow.java
@@ -87,6 +87,14 @@ public class JoinedRow implements InternalRow {
return this;
}
+ public InternalRow row1() {
+ return row1;
+ }
+
+ public InternalRow row2() {
+ return row2;
+ }
+
//
---------------------------------------------------------------------------------------------
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionTagCallback.java
b/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionTagCallback.java
new file mode 100644
index 000000000..33f5ed5a9
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionTagCallback.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.metastore;
+
+import org.apache.paimon.table.sink.TagCallback;
+
+import java.util.LinkedHashMap;
+
+/** A {@link TagCallback} to add newly created partitions to metastore. */
+public class AddPartitionTagCallback implements TagCallback {
+
+ private final MetastoreClient client;
+ private final String partitionField;
+
+ public AddPartitionTagCallback(MetastoreClient client, String
partitionField) {
+ this.client = client;
+ this.partitionField = partitionField;
+ }
+
+ @Override
+ public void notifyCreation(String tagName) {
+ LinkedHashMap<String, String> partitionSpec = new LinkedHashMap<>();
+ partitionSpec.put(partitionField, tagName);
+ try {
+ client.addPartition(partitionSpec);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ client.close();
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java
b/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java
index 2ad601d2e..615e78330 100644
--- a/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java
+++ b/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java
@@ -21,6 +21,7 @@ package org.apache.paimon.metastore;
import org.apache.paimon.data.BinaryRow;
import java.io.Serializable;
+import java.util.LinkedHashMap;
/**
* A metastore client related to a table. All methods of this interface
operate on the same specific
@@ -30,6 +31,8 @@ public interface MetastoreClient extends AutoCloseable {
void addPartition(BinaryRow partition) throws Exception;
+ void addPartition(LinkedHashMap<String, String> partitionSpec) throws
Exception;
+
/** Factory to create {@link MetastoreClient}. */
interface Factory extends Serializable {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/metastore/TagPreviewCommitCallback.java
b/paimon-core/src/main/java/org/apache/paimon/metastore/TagPreviewCommitCallback.java
new file mode 100644
index 000000000..e0aa8597b
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/metastore/TagPreviewCommitCallback.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.metastore;
+
+import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.table.sink.CommitCallback;
+import org.apache.paimon.tag.TagPreview;
+
+import java.util.List;
+import java.util.Optional;
+
+/** A {@link CommitCallback} to add partitions to metastore for tag preview. */
+public class TagPreviewCommitCallback implements CommitCallback {
+
+ private final AddPartitionTagCallback tagCallback;
+ private final TagPreview tagPreview;
+
+ public TagPreviewCommitCallback(AddPartitionTagCallback tagCallback,
TagPreview tagPreview) {
+ this.tagCallback = tagCallback;
+ this.tagPreview = tagPreview;
+ }
+
+ @Override
+ public void call(List<ManifestCommittable> committables) {
+ long currentMillis = System.currentTimeMillis();
+ for (ManifestCommittable c : committables) {
+ Optional<String> tagOptional =
tagPreview.extractTag(currentMillis, c.watermark());
+ tagOptional.ifPresent(tagCallback::notifyCreation);
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ tagCallback.close();
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 875e2fed7..596b62109 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -26,6 +26,9 @@ import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.metastore.AddPartitionCommitCallback;
+import org.apache.paimon.metastore.AddPartitionTagCallback;
+import org.apache.paimon.metastore.MetastoreClient;
+import org.apache.paimon.metastore.TagPreviewCommitCallback;
import org.apache.paimon.operation.DefaultValueAssigner;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.options.Options;
@@ -39,6 +42,7 @@ import
org.apache.paimon.table.sink.DynamicBucketRowKeyExtractor;
import org.apache.paimon.table.sink.FixedBucketRowKeyExtractor;
import org.apache.paimon.table.sink.RowKeyExtractor;
import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.table.sink.TagCallback;
import org.apache.paimon.table.sink.UnawareBucketRowKeyExtractor;
import org.apache.paimon.table.source.InnerStreamTableScan;
import org.apache.paimon.table.source.InnerStreamTableScanImpl;
@@ -51,6 +55,8 @@ import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.table.source.snapshot.SnapshotReaderImpl;
import
org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanner;
+import org.apache.paimon.tag.TagPreview;
+import org.apache.paimon.utils.IOUtils;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
@@ -58,6 +64,7 @@ import org.apache.paimon.utils.TagManager;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -267,20 +274,53 @@ public abstract class AbstractFileStoreTable implements
FileStoreTable {
private List<CommitCallback> createCommitCallbacks() {
List<CommitCallback> callbacks = new
ArrayList<>(loadCommitCallbacks());
- if (coreOptions().partitionedTableInMetastore()
- && catalogEnvironment.metastoreClientFactory() != null
+ CoreOptions options = coreOptions();
+ MetastoreClient.Factory metastoreClientFactory =
+ catalogEnvironment.metastoreClientFactory();
+ if (options.partitionedTableInMetastore()
+ && metastoreClientFactory != null
&& tableSchema.partitionKeys().size() > 0) {
+ callbacks.add(new
AddPartitionCommitCallback(metastoreClientFactory.create()));
+ }
+ TagPreview tagPreview = TagPreview.create(options);
+ if (options.tagToPartitionField() != null
+ && tagPreview != null
+ && metastoreClientFactory != null
+ && tableSchema.partitionKeys().isEmpty()) {
+ TagPreviewCommitCallback callback =
+ new TagPreviewCommitCallback(
+ new AddPartitionTagCallback(
+ metastoreClientFactory.create(),
options.tagToPartitionField()),
+ tagPreview);
+ callbacks.add(callback);
+ }
+ return callbacks;
+ }
+
+ private List<TagCallback> createTagCallbacks() {
+ List<TagCallback> callbacks = new ArrayList<>(loadTagCallbacks());
+ String partitionField = coreOptions().tagToPartitionField();
+ MetastoreClient.Factory metastoreClientFactory =
+ catalogEnvironment.metastoreClientFactory();
+ if (partitionField != null && metastoreClientFactory != null) {
callbacks.add(
- new AddPartitionCommitCallback(
-
catalogEnvironment.metastoreClientFactory().create()));
+ new
AddPartitionTagCallback(metastoreClientFactory.create(), partitionField));
}
return callbacks;
}
+ private List<TagCallback> loadTagCallbacks() {
+ return loadCallbacks(coreOptions().tagCallbacks(), TagCallback.class);
+ }
+
private List<CommitCallback> loadCommitCallbacks() {
- List<CommitCallback> result = new ArrayList<>();
+ return loadCallbacks(coreOptions().commitCallbacks(),
CommitCallback.class);
+ }
+
+ @SuppressWarnings("unchecked")
+ private <T> List<T> loadCallbacks(Map<String, String> clazzParamMaps,
Class<T> expectClass) {
+ List<T> result = new ArrayList<>();
- Map<String, String> clazzParamMaps = coreOptions().commitCallbacks();
for (Map.Entry<String, String> classParamEntry :
clazzParamMaps.entrySet()) {
String className = classParamEntry.getKey();
String param = classParamEntry.getValue();
@@ -293,15 +333,14 @@ public abstract class AbstractFileStoreTable implements
FileStoreTable {
}
Preconditions.checkArgument(
- CommitCallback.class.isAssignableFrom(clazz),
- "Class " + clazz + " must implement " +
CommitCallback.class);
+ expectClass.isAssignableFrom(clazz),
+ "Class " + clazz + " must implement " + expectClass);
try {
if (param == null) {
- result.add((CommitCallback) clazz.newInstance());
+ result.add((T) clazz.newInstance());
} else {
- result.add(
- (CommitCallback)
clazz.getConstructor(String.class).newInstance(param));
+ result.add((T)
clazz.getConstructor(String.class).newInstance(param));
}
} catch (Exception e) {
throw new RuntimeException(
@@ -414,6 +453,16 @@ public abstract class AbstractFileStoreTable implements
FileStoreTable {
Snapshot snapshot = snapshotManager.snapshot(fromSnapshotId);
tagManager().createTag(snapshot, tagName);
+
+ List<TagCallback> callbacks = Collections.emptyList();
+ try {
+ callbacks = createTagCallbacks();
+ callbacks.forEach(callback -> callback.notifyCreation(tagName));
+ } finally {
+ for (TagCallback tagCallback : callbacks) {
+ IOUtils.closeQuietly(tagCallback);
+ }
+ }
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TagCallback.java
similarity index 63%
copy from
paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java
copy to paimon-core/src/main/java/org/apache/paimon/table/sink/TagCallback.java
index 2ad601d2e..397b341d9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TagCallback.java
@@ -16,23 +16,14 @@
* limitations under the License.
*/
-package org.apache.paimon.metastore;
-
-import org.apache.paimon.data.BinaryRow;
-
-import java.io.Serializable;
+package org.apache.paimon.table.sink;
/**
- * A metastore client related to a table. All methods of this interface
operate on the same specific
- * table.
+ * This callback will be called after tag operations.
+ *
+ * <p>NOTE: No guarantee that this callback must be called.
*/
-public interface MetastoreClient extends AutoCloseable {
-
- void addPartition(BinaryRow partition) throws Exception;
-
- /** Factory to create {@link MetastoreClient}. */
- interface Factory extends Serializable {
+public interface TagCallback extends AutoCloseable {
- MetastoreClient create();
- }
+ void notifyCreation(String tagName);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
index 81014ef6d..6ec7f1cf2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
@@ -20,7 +20,6 @@ package org.apache.paimon.tag;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
-import org.apache.paimon.data.Timestamp;
import org.apache.paimon.operation.TagDeletion;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
@@ -28,54 +27,20 @@ import org.apache.paimon.utils.TagManager;
import javax.annotation.Nullable;
import java.time.Duration;
-import java.time.Instant;
-import java.time.LocalDate;
import java.time.LocalDateTime;
-import java.time.ZoneId;
-import java.time.format.DateTimeFormatter;
-import java.time.format.DateTimeFormatterBuilder;
-import java.time.format.ResolverStyle;
-import java.time.format.SignStyle;
import java.util.Optional;
import java.util.SortedMap;
-import static java.time.temporal.ChronoField.DAY_OF_MONTH;
-import static java.time.temporal.ChronoField.HOUR_OF_DAY;
-import static java.time.temporal.ChronoField.MONTH_OF_YEAR;
-import static java.time.temporal.ChronoField.YEAR;
import static org.apache.paimon.Snapshot.FIRST_SNAPSHOT_ID;
import static
org.apache.paimon.shade.guava30.com.google.common.base.MoreObjects.firstNonNull;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
/** A manager to create tags automatically. */
public class TagAutoCreation {
- private static final DateTimeFormatter HOUR_FORMATTER =
- new DateTimeFormatterBuilder()
- .appendValue(YEAR, 1, 10, SignStyle.NORMAL)
- .appendLiteral('-')
- .appendValue(MONTH_OF_YEAR, 2, 2, SignStyle.NORMAL)
- .appendLiteral('-')
- .appendValue(DAY_OF_MONTH, 2, 2, SignStyle.NORMAL)
- .appendLiteral(" ")
- .appendValue(HOUR_OF_DAY, 2, 2, SignStyle.NORMAL)
- .toFormatter()
- .withResolverStyle(ResolverStyle.LENIENT);
-
- private static final DateTimeFormatter DAY_FORMATTER =
- new DateTimeFormatterBuilder()
- .appendValue(YEAR, 1, 10, SignStyle.NORMAL)
- .appendLiteral('-')
- .appendValue(MONTH_OF_YEAR, 2, 2, SignStyle.NORMAL)
- .appendLiteral('-')
- .appendValue(DAY_OF_MONTH, 2, 2, SignStyle.NORMAL)
- .toFormatter()
- .withResolverStyle(ResolverStyle.LENIENT);
-
private final SnapshotManager snapshotManager;
private final TagManager tagManager;
private final TagDeletion tagDeletion;
- private final TimeExtractor timeExtractor;
+ private final TagTimeExtractor timeExtractor;
private final TagPeriodHandler periodHandler;
private final Duration delay;
private final Integer numRetainedMax;
@@ -87,7 +52,7 @@ public class TagAutoCreation {
SnapshotManager snapshotManager,
TagManager tagManager,
TagDeletion tagDeletion,
- TimeExtractor timeExtractor,
+ TagTimeExtractor timeExtractor,
TagPeriodHandler periodHandler,
Duration delay,
Integer numRetainedMax) {
@@ -133,7 +98,8 @@ public class TagAutoCreation {
}
private void tryToTag(Snapshot snapshot) {
- Optional<LocalDateTime> timeOptional = timeExtractor.extract(snapshot);
+ Optional<LocalDateTime> timeOptional =
+ timeExtractor.extract(snapshot.timeMillis(),
snapshot.watermark());
if (!timeOptional.isPresent()) {
return;
}
@@ -141,7 +107,7 @@ public class TagAutoCreation {
LocalDateTime time = timeOptional.get();
if (nextTag == null
|| isAfterOrEqual(time.minus(delay),
periodHandler.nextTagTime(nextTag))) {
- LocalDateTime thisTag = periodHandler.normalizeToTagTime(time);
+ LocalDateTime thisTag = periodHandler.normalizeToPreviousTag(time);
String tagName = periodHandler.timeToTag(thisTag);
tagManager.createTag(snapshot, tagName);
nextTag = periodHandler.nextTagTime(thisTag);
@@ -167,184 +133,22 @@ public class TagAutoCreation {
return t1.isAfter(t2) || t1.isEqual(t2);
}
- private interface TimeExtractor {
-
- Optional<LocalDateTime> extract(Snapshot snapshot);
- }
-
- private static class ProcessTimeExtractor implements TimeExtractor {
-
- @Override
- public Optional<LocalDateTime> extract(Snapshot snapshot) {
- return Optional.of(
- Instant.ofEpochMilli(snapshot.timeMillis())
- .atZone(ZoneId.systemDefault())
- .toLocalDateTime());
- }
- }
-
- private static class WatermarkExtractor implements TimeExtractor {
-
- private final ZoneId watermarkZoneId;
-
- private WatermarkExtractor(ZoneId watermarkZoneId) {
- this.watermarkZoneId = watermarkZoneId;
- }
-
- @Override
- public Optional<LocalDateTime> extract(Snapshot snapshot) {
- Long watermark = snapshot.watermark();
- if (watermark == null) {
- return Optional.empty();
- }
-
- return Optional.of(
-
Instant.ofEpochMilli(watermark).atZone(watermarkZoneId).toLocalDateTime());
- }
- }
-
- private interface TagPeriodHandler {
-
- void validateDelay(Duration delay);
-
- LocalDateTime tagToTime(String tag);
-
- LocalDateTime normalizeToTagTime(LocalDateTime time);
-
- String timeToTag(LocalDateTime time);
-
- LocalDateTime nextTagTime(LocalDateTime time);
- }
-
- private abstract static class BaseTagPeriodHandler implements
TagPeriodHandler {
-
- protected abstract Duration onePeriod();
-
- protected abstract DateTimeFormatter formatter();
-
- @Override
- public void validateDelay(Duration delay) {
- checkArgument(onePeriod().compareTo(delay) > 0);
- }
-
- @Override
- public LocalDateTime tagToTime(String tag) {
- return LocalDateTime.parse(tag, formatter());
- }
-
- @Override
- public LocalDateTime normalizeToTagTime(LocalDateTime time) {
- long mills = Timestamp.fromLocalDateTime(time).getMillisecond();
- long periodMills = onePeriod().toMillis();
- LocalDateTime normalized =
- Timestamp.fromEpochMillis((mills / periodMills) *
periodMills)
- .toLocalDateTime();
- return normalized.minus(onePeriod());
- }
-
- @Override
- public String timeToTag(LocalDateTime time) {
- return time.format(formatter());
- }
-
- @Override
- public LocalDateTime nextTagTime(LocalDateTime time) {
- return time.plus(onePeriod());
- }
- }
-
- private static class HourlyTagPeriodHandler extends BaseTagPeriodHandler {
-
- private static final Duration ONE_PERIOD = Duration.ofHours(1);
-
- @Override
- protected Duration onePeriod() {
- return ONE_PERIOD;
- }
-
- @Override
- protected DateTimeFormatter formatter() {
- return HOUR_FORMATTER;
- }
- }
-
- private static class DailyTagPeriodHandler extends BaseTagPeriodHandler {
-
- private static final Duration ONE_PERIOD = Duration.ofDays(1);
-
- @Override
- protected Duration onePeriod() {
- return ONE_PERIOD;
- }
-
- @Override
- protected DateTimeFormatter formatter() {
- return DAY_FORMATTER;
- }
-
- @Override
- public LocalDateTime tagToTime(String tag) {
- return LocalDate.parse(tag, formatter()).atStartOfDay();
- }
- }
-
- private static class TwoHoursTagPeriodHandler extends BaseTagPeriodHandler
{
-
- private static final Duration ONE_PERIOD = Duration.ofHours(2);
-
- @Override
- protected Duration onePeriod() {
- return ONE_PERIOD;
- }
-
- @Override
- protected DateTimeFormatter formatter() {
- return HOUR_FORMATTER;
- }
- }
-
@Nullable
public static TagAutoCreation create(
CoreOptions options,
SnapshotManager snapshotManager,
TagManager tagManager,
TagDeletion tagDeletion) {
- TimeExtractor timeExtractor;
- switch (options.tagCreationMode()) {
- case NONE:
- return null;
- case PROCESS_TIME:
- timeExtractor = new ProcessTimeExtractor();
- break;
- case WATERMARK:
- timeExtractor = new
WatermarkExtractor(ZoneId.of(options.sinkWatermarkTimeZone()));
- break;
- default:
- throw new UnsupportedOperationException("Unsupported " +
options.tagCreationMode());
- }
-
- TagPeriodHandler periodHandler;
- switch (options.tagCreationPeriod()) {
- case DAILY:
- periodHandler = new DailyTagPeriodHandler();
- break;
- case HOURLY:
- periodHandler = new HourlyTagPeriodHandler();
- break;
- case TWO_HOURS:
- periodHandler = new TwoHoursTagPeriodHandler();
- break;
- default:
- throw new UnsupportedOperationException(
- "Unsupported " + options.tagCreationPeriod());
+ TagTimeExtractor extractor =
TagTimeExtractor.createForAutoTag(options);
+ if (extractor == null) {
+ return null;
}
-
return new TagAutoCreation(
snapshotManager,
tagManager,
tagDeletion,
- timeExtractor,
- periodHandler,
+ extractor,
+ TagPeriodHandler.create(options),
options.tagCreationDelay(),
options.tagNumRetainedMax());
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/tag/TagPeriodHandler.java
b/paimon-core/src/main/java/org/apache/paimon/tag/TagPeriodHandler.java
new file mode 100644
index 000000000..bf7af0497
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagPeriodHandler.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.tag;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.Timestamp;
+
+import java.time.Duration;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+import java.time.format.ResolverStyle;
+import java.time.format.SignStyle;
+
+import static java.time.temporal.ChronoField.DAY_OF_MONTH;
+import static java.time.temporal.ChronoField.HOUR_OF_DAY;
+import static java.time.temporal.ChronoField.MONTH_OF_YEAR;
+import static java.time.temporal.ChronoField.YEAR;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** Handle time for tag. */
+public interface TagPeriodHandler {
+
+ DateTimeFormatter HOUR_FORMATTER =
+ new DateTimeFormatterBuilder()
+ .appendValue(YEAR, 1, 10, SignStyle.NORMAL)
+ .appendLiteral('-')
+ .appendValue(MONTH_OF_YEAR, 2, 2, SignStyle.NORMAL)
+ .appendLiteral('-')
+ .appendValue(DAY_OF_MONTH, 2, 2, SignStyle.NORMAL)
+ .appendLiteral(" ")
+ .appendValue(HOUR_OF_DAY, 2, 2, SignStyle.NORMAL)
+ .toFormatter()
+ .withResolverStyle(ResolverStyle.LENIENT);
+
+ DateTimeFormatter DAY_FORMATTER =
+ new DateTimeFormatterBuilder()
+ .appendValue(YEAR, 1, 10, SignStyle.NORMAL)
+ .appendLiteral('-')
+ .appendValue(MONTH_OF_YEAR, 2, 2, SignStyle.NORMAL)
+ .appendLiteral('-')
+ .appendValue(DAY_OF_MONTH, 2, 2, SignStyle.NORMAL)
+ .toFormatter()
+ .withResolverStyle(ResolverStyle.LENIENT);
+
+ void validateDelay(Duration delay);
+
+ LocalDateTime tagToTime(String tag);
+
+ LocalDateTime normalizeToPreviousTag(LocalDateTime time);
+
+ String timeToTag(LocalDateTime time);
+
+ LocalDateTime nextTagTime(LocalDateTime time);
+
+ /** Base implementation of {@link TagPeriodHandler}. */
+ abstract class BaseTagPeriodHandler implements TagPeriodHandler {
+
+ protected abstract Duration onePeriod();
+
+ protected abstract DateTimeFormatter formatter();
+
+ @Override
+ public void validateDelay(Duration delay) {
+ checkArgument(onePeriod().compareTo(delay) > 0);
+ }
+
+ @Override
+ public LocalDateTime tagToTime(String tag) {
+ return LocalDateTime.parse(tag, formatter());
+ }
+
+ @Override
+ public LocalDateTime normalizeToPreviousTag(LocalDateTime time) {
+ long mills = Timestamp.fromLocalDateTime(time).getMillisecond();
+ long periodMills = onePeriod().toMillis();
+ LocalDateTime normalized =
+ Timestamp.fromEpochMillis((mills / periodMills) *
periodMills)
+ .toLocalDateTime();
+ return normalized.minus(onePeriod());
+ }
+
+ @Override
+ public String timeToTag(LocalDateTime time) {
+ return time.format(formatter());
+ }
+
+ @Override
+ public LocalDateTime nextTagTime(LocalDateTime time) {
+ return time.plus(onePeriod());
+ }
+ }
+
+ /** Hourly {@link TagPeriodHandler}. */
+ class HourlyTagPeriodHandler extends BaseTagPeriodHandler {
+
+ static final Duration ONE_PERIOD = Duration.ofHours(1);
+
+ @Override
+ protected Duration onePeriod() {
+ return ONE_PERIOD;
+ }
+
+ @Override
+ protected DateTimeFormatter formatter() {
+ return HOUR_FORMATTER;
+ }
+ }
+
+ /** Daily {@link TagPeriodHandler}. */
+ class DailyTagPeriodHandler extends BaseTagPeriodHandler {
+
+ static final Duration ONE_PERIOD = Duration.ofDays(1);
+
+ @Override
+ protected Duration onePeriod() {
+ return ONE_PERIOD;
+ }
+
+ @Override
+ protected DateTimeFormatter formatter() {
+ return DAY_FORMATTER;
+ }
+
+ @Override
+ public LocalDateTime tagToTime(String tag) {
+ return LocalDate.parse(tag, formatter()).atStartOfDay();
+ }
+ }
+
+ /** Two Hours {@link TagPeriodHandler}. */
+ class TwoHoursTagPeriodHandler extends BaseTagPeriodHandler {
+
+ static final Duration ONE_PERIOD = Duration.ofHours(2);
+
+ @Override
+ protected Duration onePeriod() {
+ return ONE_PERIOD;
+ }
+
+ @Override
+ protected DateTimeFormatter formatter() {
+ return HOUR_FORMATTER;
+ }
+ }
+
+ static TagPeriodHandler create(CoreOptions options) {
+ switch (options.tagCreationPeriod()) {
+ case DAILY:
+ return new DailyTagPeriodHandler();
+ case HOURLY:
+ return new HourlyTagPeriodHandler();
+ case TWO_HOURS:
+ return new TwoHoursTagPeriodHandler();
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported " + options.tagCreationPeriod());
+ }
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagPreview.java
b/paimon-core/src/main/java/org/apache/paimon/tag/TagPreview.java
new file mode 100644
index 000000000..671e8841d
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagPreview.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.tag;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TagManager;
+
+import javax.annotation.Nullable;
+
+import java.time.LocalDateTime;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.Optional;
+
+import static java.util.Collections.singletonMap;
+import static org.apache.paimon.CoreOptions.SCAN_SNAPSHOT_ID;
+import static org.apache.paimon.CoreOptions.SCAN_TAG_NAME;
+
+/** A class of tag preview to find suitable snapshots. */
+public class TagPreview {
+
+ private final TagTimeExtractor timeExtractor;
+ private final TagPeriodHandler periodHandler;
+
+ private TagPreview(CoreOptions options) {
+ this.timeExtractor = TagTimeExtractor.createForTagPreview(options);
+ this.periodHandler = TagPeriodHandler.create(options);
+ }
+
+ public static TagPreview create(CoreOptions options) {
+ if (options.tagToPartitionPreview() !=
CoreOptions.TagCreationMode.NONE) {
+ return new TagPreview(options);
+ }
+ return null;
+ }
+
+ public Optional<String> extractTag(long timeMilli, @Nullable Long
watermark) {
+ Optional<LocalDateTime> timeOptional =
timeExtractor.extract(timeMilli, watermark);
+ if (!timeOptional.isPresent()) {
+ return Optional.empty();
+ }
+ LocalDateTime currentTag =
+
periodHandler.nextTagTime(periodHandler.normalizeToPreviousTag(timeOptional.get()));
+ String tag = periodHandler.timeToTag(currentTag);
+ return Optional.of(tag);
+ }
+
+ public Map<String, String> timeTravel(FileStoreTable table, String tag) {
+ TagManager tagManager = table.tagManager();
+ if (tagManager.tagExists(tag)) {
+ return singletonMap(SCAN_TAG_NAME.key(), tag);
+ }
+
+ SnapshotManager snapshotManager = table.snapshotManager();
+ Snapshot snapshot =
+ snapshotManager.traversalSnapshotsFromLatestSafely(
+ s ->
+ extractTag(s.timeMillis(), s.watermark())
+ .map(t -> t.compareTo(tag) <= 0)
+ .orElse(false));
+ if (snapshot != null) {
+ return singletonMap(SCAN_SNAPSHOT_ID.key(),
String.valueOf(snapshot.id()));
+ }
+
+ Optional<String> findTag =
+ tagManager.tags().values().stream()
+ .filter(t -> t.compareTo(tag) <= 0)
+ .max(Comparator.naturalOrder());
+ if (findTag.isPresent()) {
+ return singletonMap(SCAN_TAG_NAME.key(), findTag.get());
+ }
+
+ throw new RuntimeException("Cannot find snapshot or tag for tag name:
" + tag);
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExtractor.java
b/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExtractor.java
new file mode 100644
index 000000000..dadfde3a7
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExtractor.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.tag;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+
+import javax.annotation.Nullable;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.Optional;
+
+/** Extractor to extract tag time from {@link Snapshot}. */
+public interface TagTimeExtractor {
+
+    /**
+     * Extracts the time used for tagging.
+     *
+     * @param timeMilli snapshot commit time in milliseconds
+     * @param watermark snapshot watermark in milliseconds, may be {@code null}
+     * @return the extracted time, or empty when this extractor has no usable input
+     */
+    Optional<LocalDateTime> extract(long timeMilli, @Nullable Long watermark);
+
+    /** Extract time from snapshot time millis. */
+    class ProcessTimeExtractor implements TagTimeExtractor {
+
+        /** Converts {@code timeMilli} using the system default zone; never returns empty. */
+        @Override
+        public Optional<LocalDateTime> extract(long timeMilli, @Nullable Long watermark) {
+            return Optional.of(
+                    Instant.ofEpochMilli(timeMilli)
+                            .atZone(ZoneId.systemDefault())
+                            .toLocalDateTime());
+        }
+    }
+
+    /** Extract time from snapshot watermark. */
+    class WatermarkExtractor implements TagTimeExtractor {
+
+        private final ZoneId watermarkZoneId;
+
+        private WatermarkExtractor(ZoneId watermarkZoneId) {
+            this.watermarkZoneId = watermarkZoneId;
+        }
+
+        /** Converts the watermark in the configured zone; empty when there is no watermark. */
+        @Override
+        public Optional<LocalDateTime> extract(long timeMilli, @Nullable Long watermark) {
+            if (watermark == null) {
+                return Optional.empty();
+            }
+
+            return Optional.of(
+                    Instant.ofEpochMilli(watermark).atZone(watermarkZoneId).toLocalDateTime());
+        }
+    }
+
+    /** Creates the extractor configured by {@code options.tagCreationMode()}. */
+    @Nullable
+    static TagTimeExtractor createForAutoTag(CoreOptions options) {
+        return create(options.tagCreationMode(), options);
+    }
+
+    /** Creates the extractor configured by {@code options.tagToPartitionPreview()}. */
+    @Nullable
+    static TagTimeExtractor createForTagPreview(CoreOptions options) {
+        return create(options.tagToPartitionPreview(), options);
+    }
+
+    /**
+     * Creates the extractor for the given mode.
+     *
+     * @param mode mode to create the extractor for
+     * @param options supplies the watermark time zone for {@code WATERMARK} mode
+     * @return the extractor, or {@code null} for {@code NONE}
+     * @throws UnsupportedOperationException if {@code mode} has no extractor
+     */
+    @Nullable
+    static TagTimeExtractor create(CoreOptions.TagCreationMode mode, CoreOptions options) {
+        switch (mode) {
+            case NONE:
+                return null;
+            case PROCESS_TIME:
+                return new ProcessTimeExtractor();
+            case WATERMARK:
+                return new WatermarkExtractor(ZoneId.of(options.sinkWatermarkTimeZone()));
+            default:
+                // Report the mode that was actually switched on; it is not necessarily
+                // options.tagCreationMode() (see createForTagPreview).
+                throw new UnsupportedOperationException("Unsupported " + mode);
+        }
+    }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/PartitionPathUtils.java
b/paimon-core/src/main/java/org/apache/paimon/utils/PartitionPathUtils.java
index 30e359ffd..7c1a5b6db 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/PartitionPathUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/PartitionPathUtils.java
@@ -25,10 +25,14 @@ import java.util.BitSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
/** Utils for file system. */
public class PartitionPathUtils {
+ private static final Pattern PARTITION_NAME_PATTERN =
Pattern.compile("([^/]+)=([^/]+)");
+
private static final BitSet CHAR_TO_ESCAPE = new BitSet(128);
static {
@@ -151,4 +155,56 @@ public class PartitionPathUtils {
}
sb.append(Integer.toHexString(c).toUpperCase());
}
+
+    /**
+     * Reverses percent-escaping in a path name: every {@code %XX} sequence, where {@code XX} is
+     * exactly two hexadecimal digits, is replaced by the character with that code. Everything
+     * else, including a {@code %} that is not followed by two hex digits, is copied unchanged.
+     *
+     * @param path the escaped path name
+     * @return the path name with all well-formed escape sequences decoded
+     */
+    public static String unescapePathName(String path) {
+        StringBuilder sb = new StringBuilder(path.length());
+        for (int i = 0; i < path.length(); i++) {
+            char c = path.charAt(i);
+            if (c == '%' && i + 2 < path.length()) {
+                // Check each character on its own: Integer.parseInt would also accept a leading
+                // sign, so "%+1" would wrongly decode to U+0001.
+                int high = Character.digit(path.charAt(i + 1), 16);
+                int low = Character.digit(path.charAt(i + 2), 16);
+                if (high >= 0 && low >= 0) {
+                    sb.append((char) ((high << 4) | low));
+                    i += 2;
+                    continue;
+                }
+            }
+            sb.append(c);
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Builds the partition spec encoded in a path by collecting every {@code key=value} path
+     * component, walking from the given path up through its parents.
+     *
+     * @param currPath partition file path.
+     * @return partition specs ordered from the outermost directory to the innermost one.
+     */
+    public static LinkedHashMap<String, String> extractPartitionSpecFromPath(Path currPath) {
+        // Collected leaf-first, because the walk goes from the given path towards the root.
+        List<String[]> leafToRoot = new ArrayList<>();
+        Path cursor = currPath;
+        while (true) {
+            Matcher matcher = PARTITION_NAME_PATTERN.matcher(cursor.getName());
+            if (matcher.matches()) {
+                String key = unescapePathName(matcher.group(1));
+                String value = unescapePathName(matcher.group(2));
+                leafToRoot.add(new String[] {key, value});
+            }
+            cursor = cursor.getParent();
+            if (cursor == null || cursor.getName().isEmpty()) {
+                break;
+            }
+        }
+
+        // Insert in reverse so the resulting map is ordered root-first.
+        LinkedHashMap<String, String> spec = new LinkedHashMap<>();
+        for (int idx = leafToRoot.size() - 1; idx >= 0; idx--) {
+            String[] pair = leafToRoot.get(idx);
+            spec.put(pair[0], pair[1]);
+        }
+        return spec;
+    }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index 0c8832ca3..80f4a11b0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -37,7 +37,6 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.BinaryOperator;
-import java.util.function.Function;
import java.util.function.Predicate;
import static org.apache.paimon.utils.FileUtils.listVersionedFiles;
@@ -258,14 +257,15 @@ public class SnapshotManager implements Serializable {
* because the committer may delete obsolete snapshots, which may cause
the writer to encounter
* unreadable snapshots.
*/
- public void traversalSnapshotsFromLatestSafely(Function<Snapshot, Boolean>
consumer) {
+ @Nullable
+ public Snapshot traversalSnapshotsFromLatestSafely(Filter<Snapshot>
checker) {
Long latestId = latestSnapshotId();
if (latestId == null) {
- return;
+ return null;
}
Long earliestId = earliestSnapshotId();
if (earliestId == null) {
- return;
+ return null;
}
for (long id = latestId; id >= earliestId; id--) {
@@ -275,7 +275,7 @@ public class SnapshotManager implements Serializable {
} catch (Exception e) {
Long newEarliestId = earliestSnapshotId();
if (newEarliestId == null) {
- return;
+ return null;
}
// this is a valid snapshot, should not throw exception
@@ -284,13 +284,14 @@ public class SnapshotManager implements Serializable {
}
// ok, this is an expired snapshot
- return;
+ return null;
}
- if (consumer.apply(snapshot)) {
- return;
+ if (checker.test(snapshot)) {
+ return snapshot;
}
}
+ return null;
}
private @Nullable Long findLatest() throws IOException {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/catalog/PrimaryKeyTableTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/catalog/PrimaryKeyTableTestBase.java
index 8e3118856..529f84b6e 100644
---
a/paimon-core/src/test/java/org/apache/paimon/catalog/PrimaryKeyTableTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/catalog/PrimaryKeyTableTestBase.java
@@ -18,6 +18,7 @@
package org.apache.paimon.catalog;
+import org.apache.paimon.data.Timestamp;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
@@ -30,6 +31,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.io.TempDir;
import java.io.IOException;
+import java.time.LocalDateTime;
import java.util.UUID;
import java.util.function.Predicate;
@@ -76,4 +78,8 @@ public abstract class PrimaryKeyTableTestBase {
protected Options tableOptions() {
return new Options();
}
+
+    /**
+     * Parses a local date-time string in the format accepted by {@link LocalDateTime#parse} and
+     * returns the millisecond value of the corresponding {@link Timestamp}.
+     */
+    protected static long utcMills(String timestamp) {
+        LocalDateTime parsed = LocalDateTime.parse(timestamp);
+        return Timestamp.fromLocalDateTime(parsed).getMillisecond();
+    }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java
b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java
index 2d7b771e0..5ce5ba9c5 100644
--- a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java
@@ -21,7 +21,6 @@ package org.apache.paimon.tag;
import org.apache.paimon.CoreOptions.TagCreationMode;
import org.apache.paimon.CoreOptions.TagCreationPeriod;
import org.apache.paimon.catalog.PrimaryKeyTableTestBase;
-import org.apache.paimon.data.Timestamp;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
@@ -189,10 +188,6 @@ public class TagAutoCreationTest extends
PrimaryKeyTableTestBase {
commit.close();
}
- private long utcMills(String timestamp) {
- return
Timestamp.fromLocalDateTime(LocalDateTime.parse(timestamp)).getMillisecond();
- }
-
private long localZoneMills(String timestamp) {
return LocalDateTime.parse(timestamp)
.atZone(ZoneId.systemDefault())
diff --git
a/paimon-core/src/test/java/org/apache/paimon/tag/TagPreviewTest.java
b/paimon-core/src/test/java/org/apache/paimon/tag/TagPreviewTest.java
new file mode 100644
index 000000000..36a4cf569
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/tag/TagPreviewTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.tag;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.CoreOptions.TagCreationMode;
+import org.apache.paimon.catalog.PrimaryKeyTableTestBase;
+import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.sink.TableCommitImpl;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static java.util.Collections.singletonMap;
+import static org.apache.paimon.CoreOptions.METASTORE_TAG_TO_PARTITION_PREVIEW;
+import static org.apache.paimon.CoreOptions.SCAN_SNAPSHOT_ID;
+import static org.apache.paimon.CoreOptions.SCAN_TAG_NAME;
+import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MAX;
+import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MIN;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Test for {@link TagPreview}.
+ *
+ * <p>Covers deriving a tag name from a watermark, and resolving a tag name to scan options via an
+ * exact tag, a matching snapshot, or the closest earlier tag. The preview under test is created
+ * with {@code METASTORE_TAG_TO_PARTITION_PREVIEW} set to {@code WATERMARK}.
+ */
+public class TagPreviewTest extends PrimaryKeyTableTestBase {
+
+ @Test
+ public void testExtractTag() {
+ TagPreview preview = create();
+ Optional<String> tag = preview.extractTag(-1,
utcMills("2023-07-18T12:12:00"));
+ assertThat(tag).hasValue("2023-07-18");
+ }
+
+ @Test
+ public void testTimeTravel() {
+ TagPreview preview = create();
+ Map<String, String> dynamicOptions = new HashMap<>();
+ dynamicOptions.put(SNAPSHOT_NUM_RETAINED_MIN.key(), "3");
+ dynamicOptions.put(SNAPSHOT_NUM_RETAINED_MAX.key(), "3");
+ TableCommitImpl commit =
+
table.copy(dynamicOptions).newCommit(commitUser).ignoreEmptyCommit(false);
+
+ commit.commit(new ManifestCommittable(0,
utcMills("2023-07-18T12:12:00")));
+ assertThat(preview.timeTravel(table, "2023-07-18"))
+ .containsAllEntriesOf(singletonMap(SCAN_SNAPSHOT_ID.key(),
"1"));
+
+ commit.commit(new ManifestCommittable(0,
utcMills("2023-07-18T13:12:00")));
+ assertThat(preview.timeTravel(table, "2023-07-18"))
+ .containsAllEntriesOf(singletonMap(SCAN_SNAPSHOT_ID.key(),
"2"));
+
+ commit.commit(new ManifestCommittable(0,
utcMills("2023-07-19T12:12:00")));
+ assertThat(preview.timeTravel(table, "2023-07-18"))
+ .containsAllEntriesOf(singletonMap(SCAN_SNAPSHOT_ID.key(),
"2"));
+ assertThat(preview.timeTravel(table, "2023-07-19"))
+ .containsAllEntriesOf(singletonMap(SCAN_SNAPSHOT_ID.key(),
"3"));
+ assertThat(preview.timeTravel(table, "2023-07-20"))
+ .containsAllEntriesOf(singletonMap(SCAN_SNAPSHOT_ID.key(),
"3"));
+
+ table.createTag("2023-07-17", 1);
+ table.createTag("2023-07-18", 2);
+ assertThat(preview.timeTravel(table, "2023-07-18"))
+ .containsAllEntriesOf(singletonMap(SCAN_TAG_NAME.key(),
"2023-07-18"));
+
+ // expire 2023-07-19: five more commits while retention is capped at 3 snapshots (presumably this expires the earlier snapshots)
+ for (int i = 0; i < 5; i++) {
+ commit.commit(new ManifestCommittable(0,
utcMills("2023-07-21T12:12:00")));
+ }
+ table.createTag("2023-07-20", 7);
+ assertThat(preview.timeTravel(table, "2023-07-20"))
+ .containsAllEntriesOf(singletonMap(SCAN_TAG_NAME.key(),
"2023-07-20"));
+
+ assertThat(preview.timeTravel(table, "2023-07-19"))
+ .containsAllEntriesOf(singletonMap(SCAN_TAG_NAME.key(),
"2023-07-18"));
+ }
+
+ private TagPreview create() {
+ Options options = new Options();
+ options.set(METASTORE_TAG_TO_PARTITION_PREVIEW,
TagCreationMode.WATERMARK);
+ TagPreview preview = TagPreview.create(new CoreOptions(options));
+ assertThat(preview).isNotNull();
+ return preview;
+ }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
index 33c81b146..5e689a826 100644
--- a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
@@ -30,7 +30,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Function;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
@@ -136,7 +135,7 @@ public class SnapshotManagerTest {
});
// test safely
- Function<Snapshot, Boolean> func =
+ Filter<Snapshot> func =
snapshot -> {
try {
Thread.sleep(100);
diff --git
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 59aaebe65..f4ca05751 100644
---
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -34,6 +34,7 @@ import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.TableType;
import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
import org.apache.flink.table.hive.LegacyHiveClasses;
import org.apache.hadoop.conf.Configuration;
@@ -79,6 +80,7 @@ import static
org.apache.paimon.hive.HiveCatalogLock.checkMaxSleep;
import static org.apache.paimon.hive.HiveCatalogOptions.LOCATION_IN_PROPERTIES;
import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
import static org.apache.paimon.options.CatalogOptions.TABLE_TYPE;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkState;
import static org.apache.paimon.utils.StringUtils.isNullOrWhitespaceOnly;
@@ -476,7 +478,8 @@ public class HiveCatalog extends AbstractCatalog {
serDeInfo.setSerializationLib(SERDE_CLASS_NAME);
sd.setSerdeInfo(serDeInfo);
- if (new CoreOptions(schema.options()).partitionedTableInMetastore()) {
+ CoreOptions options = new CoreOptions(schema.options());
+ if (options.partitionedTableInMetastore() &&
schema.partitionKeys().size() > 0) {
Map<String, DataField> fieldMap =
schema.fields().stream()
.collect(Collectors.toMap(DataField::name,
Function.identity()));
@@ -495,6 +498,21 @@ public class HiveCatalog extends AbstractCatalog {
}
sd.setCols(normalFields);
} else {
+ if (options.tagToPartitionField() != null) {
+ // map a non-partitioned table to a partitioned table
+ // partition field is tag field which is offered by user
+ checkArgument(
+ schema.partitionKeys().isEmpty(),
+ "Partition table can not use
timeTravelToPartitionField.");
+ table.setPartitionKeys(
+ Collections.singletonList(
+ convertToFieldSchema(
+ new DataField(
+ 0,
+ options.tagToPartitionField(),
+ DataTypes.STRING()))));
+ }
+
sd.setCols(
schema.fields().stream()
.map(this::convertToFieldSchema)
diff --git
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java
index 66bada2da..d24944b34 100644
---
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java
+++
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java
@@ -60,10 +60,12 @@ public class HiveMetastoreClient implements MetastoreClient
{
@Override
public void addPartition(BinaryRow partition) throws Exception {
- LinkedHashMap<String, String> partitionMap =
- partitionComputer.generatePartValues(partition);
- List<String> partitionValues = new ArrayList<>(partitionMap.values());
+ addPartition(partitionComputer.generatePartValues(partition));
+ }
+ @Override
+ public void addPartition(LinkedHashMap<String, String> partitionSpec)
throws Exception {
+ List<String> partitionValues = new ArrayList<>(partitionSpec.values());
try {
client.getPartition(
identifier.getDatabaseName(), identifier.getObjectName(),
partitionValues);
@@ -74,7 +76,7 @@ public class HiveMetastoreClient implements MetastoreClient {
newSd.setLocation(
sd.getLocation()
+ "/"
- +
PartitionPathUtils.generatePartitionPath(partitionMap));
+ +
PartitionPathUtils.generatePartitionPath(partitionSpec));
Partition hivePartition = new Partition();
hivePartition.setDbName(identifier.getDatabaseName());
diff --git
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/HiveSchema.java
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/HiveSchema.java
index 431ca9da4..3e5671878 100644
---
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/HiveSchema.java
+++
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/HiveSchema.java
@@ -59,6 +59,8 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
/** Column names, types and comments of a Hive table. */
public class HiveSchema {
@@ -152,24 +154,32 @@ public class HiveSchema {
"Extract schema with exists DDL and exists paimon table,
table location:[{}].",
location);
+ TableSchema paimonSchema = tableSchema.get();
+ String tagToPartField =
+
paimonSchema.options().get(CoreOptions.METASTORE_TAG_TO_PARTITION.key());
+
boolean isPartitionedTable =
partitionTypeInfos.size() > 0
// for some Hive compatible system
||
properties.containsKey("TABLE_TOTAL_PARTITIONS");
- checkFieldsMatched(columnNames, typeInfos, tableSchema.get(),
isPartitionedTable);
- checkPartitionMatched(partitionKeys, partitionTypeInfos,
tableSchema.get());
+ checkFieldsMatched(columnNames, typeInfos, paimonSchema,
isPartitionedTable);
+ checkPartitionMatched(partitionKeys, partitionTypeInfos,
paimonSchema, tagToPartField);
// Use paimon table data types and column comments when the paimon
table exists.
// Using paimon data types first because hive's
TypeInfoFactory.timestampTypeInfo
// doesn't contain precision and thus may cause casting problems
Map<String, DataField> paimonFields =
- tableSchema.get().fields().stream()
+ paimonSchema.fields().stream()
.collect(
Collectors.toMap(
dataField ->
dataField.name().toLowerCase(),
Function.identity()));
for (int i = 0; i < columnNames.size(); i++) {
String columnName = columnNames.get(i).toLowerCase();
+ if (Objects.equals(columnName, tagToPartField)) {
+ // ignore tagToPartField, it should just be a string type
+ continue;
+ }
dataTypes.set(i, paimonFields.get(columnName).type());
comments.set(i, paimonFields.get(columnName).description());
}
@@ -267,11 +277,18 @@ public class HiveSchema {
private static void checkPartitionMatched(
List<String> hivePartitionKeys,
List<TypeInfo> hivePartitionTypeInfos,
- TableSchema tableSchema) {
+ TableSchema tableSchema,
+ @Nullable String tagToPartField) {
if (hivePartitionKeys.isEmpty()) {
// only partitioned Hive table needs to consider this part
return;
}
+ if (tagToPartField != null) {
+ // fast check path for tagToPartField
+ checkArgument(tableSchema.partitionKeys().isEmpty());
+
checkArgument(hivePartitionKeys.equals(Collections.singletonList(tagToPartField)));
+ return;
+ }
List<String> schemaPartitionKeys = tableSchema.partitionKeys();
List<TypeInfo> schemaPartitionTypeInfos =
diff --git
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
index ed9848a67..314b50d70 100644
---
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
+++
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
@@ -18,6 +18,7 @@
package org.apache.paimon.hive.mapred;
+import org.apache.paimon.fs.Path;
import org.apache.paimon.hive.RowDataContainer;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
@@ -25,7 +26,9 @@ import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.tag.TagPreview;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.PartitionPathUtils;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.io.IOConstants;
@@ -38,15 +41,20 @@ import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import static java.util.Collections.singletonMap;
+import static org.apache.paimon.CoreOptions.SCAN_TAG_NAME;
import static org.apache.paimon.hive.utils.HiveUtils.createFileStoreTable;
import static org.apache.paimon.hive.utils.HiveUtils.createPredicate;
@@ -70,19 +78,37 @@ public class PaimonInputFormat implements InputFormat<Void,
RowDataContainer> {
// the location of Hive.
String locations = jobConf.get(FileInputFormat.INPUT_DIR);
+ @Nullable String tagToPartField =
table.coreOptions().tagToPartitionField();
+ @Nullable TagPreview tagPreview =
TagPreview.create(table.coreOptions());
+
List<PaimonInputSplit> splits = new ArrayList<>();
// locations may contain multiple partitions
for (String location : locations.split(",")) {
- List<Predicate> predicatePerPartition = new
ArrayList<>(predicates);
- createPartitionPredicate(
- table.schema().logicalRowType(),
- table.schema().partitionKeys(),
- location)
- .ifPresent(predicatePerPartition::add);
-
- InnerTableScan scan = table.newScan();
- if (predicatePerPartition.size() > 0) {
- scan.withFilter(PredicateBuilder.and(predicatePerPartition));
+ InnerTableScan scan;
+ if (tagToPartField != null) {
+ // the location should be pruned by partition predicate
+ // we can extract tag name from location, and use time travel
to scan
+ String tag = extractTagName(location, tagToPartField);
+ Map<String, String> dynamicOptions =
+ tagPreview == null
+ ? singletonMap(SCAN_TAG_NAME.key(), tag)
+ : tagPreview.timeTravel(table, tag);
+ scan = table.copy(dynamicOptions).newScan();
+ if (predicates.size() > 0) {
+ scan.withFilter(PredicateBuilder.and(predicates));
+ }
+ } else {
+ List<Predicate> predicatePerPartition = new
ArrayList<>(predicates);
+ createPartitionPredicate(
+ table.schema().logicalRowType(),
+ table.schema().partitionKeys(),
+ location)
+ .ifPresent(predicatePerPartition::add);
+
+ scan = table.newScan();
+ if (predicatePerPartition.size() > 0) {
+
scan.withFilter(PredicateBuilder.and(predicatePerPartition));
+ }
}
scan.plan()
.splits()
@@ -129,7 +155,8 @@ public class PaimonInputFormat implements InputFormat<Void,
RowDataContainer> {
split,
paimonColumns,
getHiveColumns(jobConf).orElse(paimonColumns),
- Arrays.asList(getSelectedColumns(jobConf)));
+ Arrays.asList(getSelectedColumns(jobConf)),
+ table.coreOptions().tagToPartitionField());
}
private Optional<List<String>> getHiveColumns(JobConf jobConf) {
@@ -156,4 +183,11 @@ public class PaimonInputFormat implements
InputFormat<Void, RowDataContainer> {
.distinct()
.toArray(String[]::new);
}
+
+ /** Extract tag name from location, partition field should be tag name. */
+ public static String extractTagName(String location, String
tagToPartField) {
+ LinkedHashMap<String, String> partSpec =
+ PartitionPathUtils.extractPartitionSpecFromPath(new
Path(location));
+ return partSpec.get(tagToPartField);
+ }
}
diff --git
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonRecordReader.java
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonRecordReader.java
index b50feacdf..4e30b83c7 100644
---
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonRecordReader.java
+++
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonRecordReader.java
@@ -19,7 +19,10 @@
package org.apache.paimon.hive.mapred;
import org.apache.paimon.KeyValue;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.JoinedRow;
import org.apache.paimon.hive.RowDataContainer;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.table.source.ReadBuilder;
@@ -47,6 +50,7 @@ public class PaimonRecordReader implements RecordReader<Void,
RowDataContainer>
// project from selectedColumns to hiveColumns
@Nullable private final ProjectedRow reusedProjectedRow;
+ @Nullable private final JoinedRow addTagToPartFieldRow;
private float progress;
@@ -60,7 +64,8 @@ public class PaimonRecordReader implements RecordReader<Void,
RowDataContainer>
PaimonInputSplit split,
List<String> paimonColumns,
List<String> hiveColumns,
- List<String> selectedColumns)
+ List<String> selectedColumns,
+ @Nullable String tagToPartField)
throws IOException {
if (!paimonColumns.equals(selectedColumns)) {
readBuilder.withProjection(
@@ -78,6 +83,16 @@ public class PaimonRecordReader implements
RecordReader<Void, RowDataContainer>
this.iterator =
new
RecordReaderIterator<>(readBuilder.newRead().createReader(split.split()));
this.splitLength = split.getLength();
+ if (tagToPartField != null) {
+ // in case of reading partition field
+ // add last field (partition field from tag name) to row
+ String tag =
+
PaimonInputFormat.extractTagName(split.getPath().getName(), tagToPartField);
+ addTagToPartFieldRow = new JoinedRow();
+ addTagToPartFieldRow.replace(null,
GenericRow.of(BinaryString.fromString(tag)));
+ } else {
+ addTagToPartFieldRow = null;
+ }
this.progress = 0;
}
@@ -94,6 +109,9 @@ public class PaimonRecordReader implements
RecordReader<Void, RowDataContainer>
} else {
value.set(rowData);
}
+ if (addTagToPartFieldRow != null) {
+ value.set(addTagToPartFieldRow.replace(value.get(),
addTagToPartFieldRow.row2()));
+ }
return true;
}
}
diff --git
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveUtils.java
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveUtils.java
index af7f7a051..a97f283ad 100644
---
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveUtils.java
+++
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveUtils.java
@@ -36,8 +36,10 @@ import org.apache.hadoop.mapred.JobConf;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.paimon.options.OptionsUtils.PAIMON_PREFIX;
@@ -58,15 +60,28 @@ public class HiveUtils {
if (sarg == null) {
return Optional.empty();
}
+ Set<String> readColumnNames = null;
+ if (limitToReadColumnNames) {
+ readColumnNames =
+
Arrays.stream(ColumnProjectionUtils.getReadColumnNames(jobConf))
+ .collect(Collectors.toSet());
+ }
+ String tagToPartField =
+
tableSchema.options().get(CoreOptions.METASTORE_TAG_TO_PARTITION.key());
+ if (tagToPartField != null) {
+ // exclude tagToPartField, this should be done in Hive partition
prune
+ // cannot find the field in paimon table schema
+ if (readColumnNames == null) {
+ readColumnNames = new HashSet<>(tableSchema.fieldNames());
+ }
+ readColumnNames.remove(tagToPartField);
+ }
SearchArgumentToPredicateConverter converter =
new SearchArgumentToPredicateConverter(
sarg,
tableSchema.fieldNames(),
tableSchema.logicalRowType().getFieldTypes(),
- limitToReadColumnNames
- ?
Arrays.stream(ColumnProjectionUtils.getReadColumnNames(jobConf))
- .collect(Collectors.toSet())
- : null);
+ readColumnNames);
return converter.convert();
}
diff --git
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
index f31463cbf..c1b010d9f 100644
---
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
+++
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
@@ -21,11 +21,13 @@ package org.apache.paimon.hive;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogLock;
import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.DataCatalogTable;
import org.apache.paimon.flink.FlinkCatalog;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.hive.annotation.Minio;
import org.apache.paimon.hive.runner.PaimonEmbeddedHiveRunner;
import org.apache.paimon.s3.MinioTestContainer;
+import org.apache.paimon.table.Table;
import com.klarna.hiverunner.HiveShell;
import com.klarna.hiverunner.annotations.HiveSQL;
@@ -35,6 +37,7 @@ import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
@@ -909,6 +912,88 @@ public abstract class HiveCatalogITCaseBase {
.containsExactlyInAnyOrder("1\t10", "2\t20");
}
+ @Test
+ public void testAddPartitionsForTag() throws Exception {
+ tEnv.executeSql(
+ String.join(
+ "\n",
+ "CREATE TABLE t (",
+ " k INT,",
+ " v BIGINT,",
+ " PRIMARY KEY (k) NOT ENFORCED",
+ ") WITH (",
+ " 'bucket' = '2',",
+ " 'metastore.tag-to-partition' = 'dt'",
+ ")"));
+ tEnv.executeSql("INSERT INTO t VALUES (1, 10), (2, 20)").await();
+
+ // TODO modify to CALL after Flink 1.18
+ Table table =
+ ((DataCatalogTable)
+ tEnv.getCatalog(tEnv.getCurrentCatalog())
+ .get()
+ .getTable(new
ObjectPath(tEnv.getCurrentDatabase(), "t")))
+ .table();
+ table.createTag("2023-10-16", 1);
+
+ assertThat(hiveShell.executeQuery("SHOW PARTITIONS t"))
+ .containsExactlyInAnyOrder("dt=2023-10-16");
+
+ assertThat(hiveShell.executeQuery("SELECT k, v FROM t WHERE
dt='2023-10-16'"))
+ .containsExactlyInAnyOrder("1\t10", "2\t20");
+
+ assertThat(hiveShell.executeQuery("SELECT * FROM t WHERE
dt='2023-10-16'"))
+ .containsExactlyInAnyOrder("1\t10\t2023-10-16",
"2\t20\t2023-10-16");
+
+ // another tag
+
+ tEnv.executeSql("INSERT INTO t VALUES (3, 30), (4, 40)").await();
+ table.createTag("2023-10-17", 2);
+
+ assertThat(hiveShell.executeQuery("SELECT * FROM t"))
+ .containsExactlyInAnyOrder(
+ "1\t10\t2023-10-16",
+ "2\t20\t2023-10-16",
+ "1\t10\t2023-10-17",
+ "2\t20\t2023-10-17",
+ "3\t30\t2023-10-17",
+ "4\t40\t2023-10-17");
+ }
+
+ @Test
+ public void testAddPartitionsForTagPreview() throws Exception {
+ tEnv.executeSql(
+ String.join(
+ "\n",
+ "CREATE TABLE t (",
+ " k INT,",
+ " v BIGINT,",
+ " PRIMARY KEY (k) NOT ENFORCED",
+ ") WITH (",
+ " 'bucket' = '2',",
+ " 'metastore.tag-to-partition' = 'dt',",
+ " 'metastore.tag-to-partition.preview' =
'process-time'",
+ ")"));
+
+ tEnv.executeSql("INSERT INTO t VALUES (1, 10), (2, 20)").await();
+
+ List<String> result = hiveShell.executeQuery("SHOW PARTITIONS t");
+ assertThat(result).hasSize(1);
+ String tag = result.get(0).split("=")[1];
+
+ assertThat(hiveShell.executeQuery(String.format("SELECT k, v FROM t
WHERE dt='%s'", tag)))
+ .containsExactlyInAnyOrder("1\t10", "2\t20");
+
+ tEnv.executeSql("INSERT INTO t VALUES (3, 30), (4, 40)").await();
+ if (hiveShell.executeQuery("SHOW PARTITIONS t").size() == 1) {
+ // no new partition
+ assertThat(
+ hiveShell.executeQuery(
+ String.format("SELECT k, v FROM t WHERE
dt='%s'", tag)))
+ .containsExactlyInAnyOrder("1\t10", "2\t20", "3\t30",
"4\t40");
+ }
+ }
+
protected List<Row> collect(String sql) throws Exception {
List<Row> result = new ArrayList<>();
try (CloseableIterator<Row> it = tEnv.executeSql(sql).collect()) {
diff --git
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonRecordReaderTest.java
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonRecordReaderTest.java
index dc2700004..738958869 100644
---
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonRecordReaderTest.java
+++
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonRecordReaderTest.java
@@ -157,7 +157,8 @@ public class PaimonRecordReaderTest {
new PaimonInputSplit(tempDir.toString(), dataSplit),
originalColumns,
originalColumns,
- selectedColumns);
+ selectedColumns,
+ null);
}
}
throw new IllegalArgumentException(