This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 28ebec5bb [hive] Introduce metastore.tag-to-partition for Hive 
metastore (#2134)
28ebec5bb is described below

commit 28ebec5bb1ca7cf313d7271273c7568219e7a28e
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Oct 25 14:58:37 2023 +0800

    [hive] Introduce metastore.tag-to-partition for Hive metastore (#2134)
---
 docs/content/{project => migration}/_index.md      |   7 +-
 docs/content/migration/upsert-to-partitioned.md    | 174 +++++++++++++++++
 docs/content/project/_index.md                     |   2 +-
 .../shortcodes/generated/core_configuration.html   |  24 +++
 .../generated/spark_connector_configuration.html   |  12 +-
 .../main/java/org/apache/paimon/CoreOptions.java   |  58 +++++-
 .../java/org/apache/paimon/data/JoinedRow.java     |   8 +
 .../paimon/metastore/AddPartitionTagCallback.java  |  51 +++++
 .../apache/paimon/metastore/MetastoreClient.java   |   3 +
 .../paimon/metastore/TagPreviewCommitCallback.java |  52 +++++
 .../paimon/table/AbstractFileStoreTable.java       |  71 +++++--
 .../sink/TagCallback.java}                         |  21 +-
 .../org/apache/paimon/tag/TagAutoCreation.java     | 216 +--------------------
 .../org/apache/paimon/tag/TagPeriodHandler.java    | 177 +++++++++++++++++
 .../java/org/apache/paimon/tag/TagPreview.java     |  94 +++++++++
 .../org/apache/paimon/tag/TagTimeExtractor.java    |  91 +++++++++
 .../apache/paimon/utils/PartitionPathUtils.java    |  56 ++++++
 .../org/apache/paimon/utils/SnapshotManager.java   |  17 +-
 .../paimon/catalog/PrimaryKeyTableTestBase.java    |   6 +
 .../org/apache/paimon/tag/TagAutoCreationTest.java |   5 -
 .../java/org/apache/paimon/tag/TagPreviewTest.java | 101 ++++++++++
 .../apache/paimon/utils/SnapshotManagerTest.java   |   3 +-
 .../java/org/apache/paimon/hive/HiveCatalog.java   |  20 +-
 .../apache/paimon/hive/HiveMetastoreClient.java    |  10 +-
 .../java/org/apache/paimon/hive/HiveSchema.java    |  25 ++-
 .../paimon/hive/mapred/PaimonInputFormat.java      |  56 ++++--
 .../paimon/hive/mapred/PaimonRecordReader.java     |  20 +-
 .../org/apache/paimon/hive/utils/HiveUtils.java    |  23 ++-
 .../apache/paimon/hive/HiveCatalogITCaseBase.java  |  85 ++++++++
 .../paimon/hive/mapred/PaimonRecordReaderTest.java |   3 +-
 30 files changed, 1205 insertions(+), 286 deletions(-)

diff --git a/docs/content/project/_index.md b/docs/content/migration/_index.md
similarity index 87%
copy from docs/content/project/_index.md
copy to docs/content/migration/_index.md
index dca9a5637..3da339e38 100644
--- a/docs/content/project/_index.md
+++ b/docs/content/migration/_index.md
@@ -1,10 +1,9 @@
 ---
-title: Project
-icon: <i class="fa fa-sitemap title maindish" aria-hidden="true"></i>
+title: Migration
+icon: <i class="fa fa-briefcase title maindish" aria-hidden="true"></i>
 bold: true
 bookCollapseSection: true
-sectionBreak: true
-weight: 7
+weight: 8
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
diff --git a/docs/content/migration/upsert-to-partitioned.md 
b/docs/content/migration/upsert-to-partitioned.md
new file mode 100644
index 000000000..8b8830c8f
--- /dev/null
+++ b/docs/content/migration/upsert-to-partitioned.md
@@ -0,0 +1,174 @@
+---
+title: "Upsert To Partitioned"
+weight: 1
+type: docs
+aliases:
+- /migration/upsert-to-partitioned.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Upsert To Partitioned
+
+The [Tag Management]({{< ref "maintenance/manage-tags" >}}) will maintain the 
manifests and data files of the snapshot.
+A typical usage is to create tags daily, so that you can keep the historical 
data of each day for batch reading.
+
+When using primary key tables, a non-partitioned approach is often used to 
maintain updates, in order to mirror and
+synchronize tables from upstream database tables. This allows users to query 
the latest data. Traditional Hive
+data warehouses work differently: offline data warehouses require an immutable 
view every day to ensure the idempotence
+of calculations. So we created a Tag mechanism to output these views.
+
+However, users of traditional Hive data warehouses are more accustomed to 
using partitions to specify the Tag to query,
+and are more accustomed to using Hive computing engines.
+
+So, we introduce `'metastore.tag-to-partition'` and 
`'metastore.tag-to-partition.preview'` to map a non-partitioned
+primary key table to a partitioned table in Hive metastore, and to map the 
partition field to the name of the Tag, so that the table is
+fully compatible with Hive.
+
+## Example for Tag to Partition
+
+**Step 1: Create table and tag in Flink SQL**
+
+{{< tabs "Create table and tag in Flink SQL 1" >}}
+{{< tab "Flink" >}}
+```sql
+CREATE CATALOG my_hive WITH (
+    'type' = 'paimon',
+    'metastore' = 'hive',
+    'uri' = 'thrift://<hive-metastore-host-name>:<port>',
+    -- 'hive-conf-dir' = '...', this is recommended in the kerberos environment
+    -- 'hadoop-conf-dir' = '...', this is recommended in the kerberos 
environment
+    'warehouse' = 'hdfs:///path/to/warehouse'
+);
+
+USE CATALOG my_hive;
+
+CREATE TABLE mydb.T (
+    pk INT,
+    col1 STRING,
+    col2 STRING
+) WITH (
+    'bucket' = '-1',
+    'metastore.tag-to-partition' = 'dt'
+);
+
+INSERT INTO t VALUES (1, '10', '100'), (2, '20', '200');
+
+-- create tag '2023-10-16' for snapshot 1
+CALL sys.create_tag('mydb.T', '2023-10-16', 1);
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
+**Step 2: Query table in Hive with Partition Pruning**
+
+{{< tabs "Query table in Hive with Partition Pruning 1" >}}
+{{< tab "Hive" >}}
+```sql
+SHOW PARTITIONS T;
+/*
+OK
+dt=2023-10-16
+*/
+
+SELECT * FROM T WHERE dt='2023-10-16';
+/*
+OK
+1 10 100 2023-10-16
+2 20 200 2023-10-16
+*/
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
+## Example for Tag Preview
+
+The above example can only query tags that have already been created, but 
Paimon is a real-time data lake, and you also
+need to query the latest data. Therefore, Paimon provides a preview feature:
+
+**Step 1: Create table and tag in Flink SQL**
+
+{{< tabs "Create table and tag in Flink SQL 2" >}}
+{{< tab "Flink" >}}
+```sql
+CREATE CATALOG my_hive WITH (
+    'type' = 'paimon',
+    'metastore' = 'hive',
+    'uri' = 'thrift://<hive-metastore-host-name>:<port>',
+    -- 'hive-conf-dir' = '...', this is recommended in the kerberos environment
+    -- 'hadoop-conf-dir' = '...', this is recommended in the kerberos 
environment
+    'warehouse' = 'hdfs:///path/to/warehouse'
+);
+
+USE CATALOG my_hive;
+
+CREATE TABLE mydb.T (
+    pk INT,
+    col1 STRING,
+    col2 STRING
+) WITH (
+    'bucket' = '-1',
+    'metastore.tag-to-partition' = 'dt',
+    -- preview tag creation mode process-time
+    -- paimon will create partitions early based on process-time
+    'metastore.tag-to-partition.preview' = 'process-time'
+);
+
+INSERT INTO t VALUES (1, '10', '100'), (2, '20', '200');
+
+-- create tag '2023-10-16' for snapshot 1
+CALL sys.create_tag('mydb.T', '2023-10-16', 1);
+
+-- new data in '2023-10-17'
+INSERT INTO t VALUES (3, '30', '300'), (4, '40', '400');
+
+-- the data for '2023-10-17' has not been fully written yet, so there's no need 
to create a tag for now
+-- but the data is already visible to Hive
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
+**Step 2: Query table in Hive with Partition Pruning**
+
+{{< tabs "Query table in Hive with Partition Pruning 2" >}}
+{{< tab "Hive" >}}
+```sql
+SHOW PARTITIONS T;
+/*
+OK
+dt=2023-10-16
+dt=2023-10-17
+*/
+
+SELECT * FROM T WHERE dt='2023-10-17';
+-- preview tag '2023-10-17'
+/*
+OK
+1 10 100 2023-10-17
+2 20 200 2023-10-17
+3 30 300 2023-10-17
+4 40 400 2023-10-17
+*/
+```
+
+{{< /tab >}}
+{{< /tabs >}}
diff --git a/docs/content/project/_index.md b/docs/content/project/_index.md
index dca9a5637..84aee3057 100644
--- a/docs/content/project/_index.md
+++ b/docs/content/project/_index.md
@@ -4,7 +4,7 @@ icon: <i class="fa fa-sitemap title maindish" 
aria-hidden="true"></i>
 bold: true
 bookCollapseSection: true
 sectionBreak: true
-weight: 7
+weight: 9
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index 253e6414c..0be1a41c6 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -305,6 +305,18 @@ Mainly to resolve data skew on primary keys. We recommend 
starting with 64 mb wh
 For example, if you want to list all partitions of a Paimon table in Hive, you 
need to create this table as a partitioned table in Hive metastore.
 This config option does not affect the default filesystem metastore.</td>
         </tr>
+        <tr>
+            <td><h5>metastore.tag-to-partition</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Whether to create this table as a partitioned table for 
mapping non-partitioned table tags in metastore. This allows the Hive engine to 
view this table in a partitioned table view and use partitioning field to read 
specific partitions (specific tags).</td>
+        </tr>
+        <tr>
+            <td><h5>metastore.tag-to-partition.preview</h5></td>
+            <td style="word-wrap: break-word;">none</td>
+            <td><p>Enum</p></td>
+            <td>Whether to preview tag of generated snapshots in metastore. 
This allows the Hive engine to query specific tag before creation.<br /><br 
/>Possible values:<ul><li>"none": No automatically created 
tags.</li><li>"process-time": Based on the time of the machine, create TAG once 
the processing time passes period time plus delay.</li><li>"watermark": Based 
on the watermark of the input, create TAG once the watermark passes period time 
plus delay.</li></ul></td>
+        </tr>
         <tr>
             <td><h5>num-levels</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
@@ -557,6 +569,18 @@ This config option does not affect the default filesystem 
metastore.</td>
             <td><p>Enum</p></td>
             <td>Whether to create tag automatically. And how to generate 
tags.<br /><br />Possible values:<ul><li>"none": No automatically created 
tags.</li><li>"process-time": Based on the time of the machine, create TAG once 
the processing time passes period time plus delay.</li><li>"watermark": Based 
on the watermark of the input, create TAG once the watermark passes period time 
plus delay.</li></ul></td>
         </tr>
+        <tr>
+            <td><h5>tag.callback.#.param</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Parameter string for the constructor of class #. Callback 
class should parse the parameter by itself.</td>
+        </tr>
+        <tr>
+            <td><h5>tag.callbacks</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>A list of commit callback classes to be called after a 
successful tag. Class names are connected with comma (example: 
com.test.CallbackA,com.sample.CallbackB).</td>
+        </tr>
         <tr>
             <td><h5>tag.creation-delay</h5></td>
             <td style="word-wrap: break-word;">0 ms</td>
diff --git 
a/docs/layouts/shortcodes/generated/spark_connector_configuration.html 
b/docs/layouts/shortcodes/generated/spark_connector_configuration.html
index 2e6bf03e4..00ca2ba17 100644
--- a/docs/layouts/shortcodes/generated/spark_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/spark_connector_configuration.html
@@ -26,6 +26,12 @@ under the License.
         </tr>
     </thead>
     <tbody>
+        <tr>
+            <td><h5>read.changelog</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to read row in the form of changelog (add rowkind 
column in row to represent its change type).</td>
+        </tr>
         <tr>
             <td><h5>read.stream.maxBytesPerTrigger</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
@@ -56,12 +62,6 @@ under the License.
             <td>Long</td>
             <td>The minimum number of rows returned in a single batch, which 
used to create MinRowsReadLimit with read.stream.maxTriggerDelayMs 
together.</td>
         </tr>
-        <tr>
-            <td><h5>read.changelog</h5></td>
-            <td style="word-wrap: break-word;">false</td>
-            <td>Boolean</td>
-            <td>Whether to read row in the form of changelog (add rowkind 
column in row to represent its change type).</td>
-        </tr>
         <tr>
             <td><h5>write.merge-schema</h5></td>
             <td style="word-wrap: break-word;">false</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 84890acbf..88243fc6b 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -33,6 +33,8 @@ import org.apache.paimon.options.description.InlineElement;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.StringUtils;
 
+import javax.annotation.Nullable;
+
 import java.io.Serializable;
 import java.lang.reflect.Field;
 import java.time.Duration;
@@ -790,6 +792,23 @@ public class CoreOptions implements Serializable {
                             "Parameter string for the constructor of class #. "
                                     + "Callback class should parse the 
parameter by itself.");
 
+    public static final ConfigOption<String> TAG_CALLBACKS =
+            key("tag.callbacks")
+                    .stringType()
+                    .defaultValue("")
+                    .withDescription(
+                            "A list of commit callback classes to be called 
after a successful tag. "
+                                    + "Class names are connected with comma "
+                                    + "(example: 
com.test.CallbackA,com.sample.CallbackB).");
+
+    public static final ConfigOption<String> TAG_CALLBACK_PARAM =
+            key("tag.callback.#.param")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Parameter string for the constructor of class #. "
+                                    + "Callback class should parse the 
parameter by itself.");
+
     public static final ConfigOption<Boolean> METASTORE_PARTITIONED_TABLE =
             key("metastore.partitioned-table")
                     .booleanType()
@@ -800,6 +819,23 @@ public class CoreOptions implements Serializable {
                                     + "you need to create this table as a 
partitioned table in Hive metastore.\n"
                                     + "This config option does not affect the 
default filesystem metastore.");
 
+    public static final ConfigOption<String> METASTORE_TAG_TO_PARTITION =
+            key("metastore.tag-to-partition")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Whether to create this table as a partitioned 
table for mapping non-partitioned table tags in metastore. "
+                                    + "This allows the Hive engine to view 
this table in a partitioned table view and "
+                                    + "use partitioning field to read specific 
partitions (specific tags).");
+
+    public static final ConfigOption<TagCreationMode> 
METASTORE_TAG_TO_PARTITION_PREVIEW =
+            key("metastore.tag-to-partition.preview")
+                    .enumType(TagCreationMode.class)
+                    .defaultValue(TagCreationMode.NONE)
+                    .withDescription(
+                            "Whether to preview tag of generated snapshots in 
metastore. "
+                                    + "This allows the Hive engine to query 
specific tag before creation.");
+
     public static final ConfigOption<TagCreationMode> TAG_AUTOMATIC_CREATION =
             key("tag.automatic-creation")
                     .enumType(TagCreationMode.class)
@@ -1270,6 +1306,15 @@ public class CoreOptions implements Serializable {
         return options.get(METASTORE_PARTITIONED_TABLE);
     }
 
+    @Nullable
+    public String tagToPartitionField() {
+        return options.get(METASTORE_TAG_TO_PARTITION);
+    }
+
+    public TagCreationMode tagToPartitionPreview() {
+        return options.get(METASTORE_TAG_TO_PARTITION_PREVIEW);
+    }
+
     public TagCreationMode tagCreationMode() {
         return options.get(TAG_AUTOMATIC_CREATION);
     }
@@ -1305,14 +1350,23 @@ public class CoreOptions implements Serializable {
     }
 
     public Map<String, String> commitCallbacks() {
+        return callbacks(COMMIT_CALLBACKS, COMMIT_CALLBACK_PARAM);
+    }
+
+    public Map<String, String> tagCallbacks() {
+        return callbacks(TAG_CALLBACKS, TAG_CALLBACK_PARAM);
+    }
+
+    private Map<String, String> callbacks(
+            ConfigOption<String> callbacks, ConfigOption<String> 
callbackParam) {
         Map<String, String> result = new HashMap<>();
-        for (String className : options.get(COMMIT_CALLBACKS).split(",")) {
+        for (String className : options.get(callbacks).split(",")) {
             className = className.trim();
             if (className.length() == 0) {
                 continue;
             }
 
-            String param = 
options.get(COMMIT_CALLBACK_PARAM.key().replace("#", className));
+            String param = options.get(callbackParam.key().replace("#", 
className));
             result.put(className, param);
         }
         return result;
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/JoinedRow.java 
b/paimon-common/src/main/java/org/apache/paimon/data/JoinedRow.java
index e9126cd7c..526886598 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/JoinedRow.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/JoinedRow.java
@@ -87,6 +87,14 @@ public class JoinedRow implements InternalRow {
         return this;
     }
 
+    public InternalRow row1() {
+        return row1;
+    }
+
+    public InternalRow row2() {
+        return row2;
+    }
+
     // 
---------------------------------------------------------------------------------------------
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionTagCallback.java
 
b/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionTagCallback.java
new file mode 100644
index 000000000..33f5ed5a9
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionTagCallback.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.metastore;
+
+import org.apache.paimon.table.sink.TagCallback;
+
+import java.util.LinkedHashMap;
+
+/** A {@link TagCallback} to add newly created partitions to metastore. */
+public class AddPartitionTagCallback implements TagCallback {
+
+    private final MetastoreClient client;
+    private final String partitionField;
+
+    public AddPartitionTagCallback(MetastoreClient client, String 
partitionField) {
+        this.client = client;
+        this.partitionField = partitionField;
+    }
+
+    @Override
+    public void notifyCreation(String tagName) {
+        LinkedHashMap<String, String> partitionSpec = new LinkedHashMap<>();
+        partitionSpec.put(partitionField, tagName);
+        try {
+            client.addPartition(partitionSpec);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        client.close();
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java 
b/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java
index 2ad601d2e..615e78330 100644
--- a/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java
+++ b/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java
@@ -21,6 +21,7 @@ package org.apache.paimon.metastore;
 import org.apache.paimon.data.BinaryRow;
 
 import java.io.Serializable;
+import java.util.LinkedHashMap;
 
 /**
  * A metastore client related to a table. All methods of this interface 
operate on the same specific
@@ -30,6 +31,8 @@ public interface MetastoreClient extends AutoCloseable {
 
     void addPartition(BinaryRow partition) throws Exception;
 
+    void addPartition(LinkedHashMap<String, String> partitionSpec) throws 
Exception;
+
     /** Factory to create {@link MetastoreClient}. */
     interface Factory extends Serializable {
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/metastore/TagPreviewCommitCallback.java
 
b/paimon-core/src/main/java/org/apache/paimon/metastore/TagPreviewCommitCallback.java
new file mode 100644
index 000000000..e0aa8597b
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/metastore/TagPreviewCommitCallback.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.metastore;
+
+import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.table.sink.CommitCallback;
+import org.apache.paimon.tag.TagPreview;
+
+import java.util.List;
+import java.util.Optional;
+
+/** A {@link CommitCallback} to add partitions to metastore for tag preview. */
+public class TagPreviewCommitCallback implements CommitCallback {
+
+    private final AddPartitionTagCallback tagCallback;
+    private final TagPreview tagPreview;
+
+    public TagPreviewCommitCallback(AddPartitionTagCallback tagCallback, 
TagPreview tagPreview) {
+        this.tagCallback = tagCallback;
+        this.tagPreview = tagPreview;
+    }
+
+    @Override
+    public void call(List<ManifestCommittable> committables) {
+        long currentMillis = System.currentTimeMillis();
+        for (ManifestCommittable c : committables) {
+            Optional<String> tagOptional = 
tagPreview.extractTag(currentMillis, c.watermark());
+            tagOptional.ifPresent(tagCallback::notifyCreation);
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        tagCallback.close();
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 875e2fed7..596b62109 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -26,6 +26,9 @@ import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.metastore.AddPartitionCommitCallback;
+import org.apache.paimon.metastore.AddPartitionTagCallback;
+import org.apache.paimon.metastore.MetastoreClient;
+import org.apache.paimon.metastore.TagPreviewCommitCallback;
 import org.apache.paimon.operation.DefaultValueAssigner;
 import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.options.Options;
@@ -39,6 +42,7 @@ import 
org.apache.paimon.table.sink.DynamicBucketRowKeyExtractor;
 import org.apache.paimon.table.sink.FixedBucketRowKeyExtractor;
 import org.apache.paimon.table.sink.RowKeyExtractor;
 import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.table.sink.TagCallback;
 import org.apache.paimon.table.sink.UnawareBucketRowKeyExtractor;
 import org.apache.paimon.table.source.InnerStreamTableScan;
 import org.apache.paimon.table.source.InnerStreamTableScanImpl;
@@ -51,6 +55,8 @@ import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
 import org.apache.paimon.table.source.snapshot.SnapshotReaderImpl;
 import 
org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanner;
+import org.apache.paimon.tag.TagPreview;
+import org.apache.paimon.utils.IOUtils;
 import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.TagManager;
@@ -58,6 +64,7 @@ import org.apache.paimon.utils.TagManager;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -267,20 +274,53 @@ public abstract class AbstractFileStoreTable implements 
FileStoreTable {
 
     private List<CommitCallback> createCommitCallbacks() {
         List<CommitCallback> callbacks = new 
ArrayList<>(loadCommitCallbacks());
-        if (coreOptions().partitionedTableInMetastore()
-                && catalogEnvironment.metastoreClientFactory() != null
+        CoreOptions options = coreOptions();
+        MetastoreClient.Factory metastoreClientFactory =
+                catalogEnvironment.metastoreClientFactory();
+        if (options.partitionedTableInMetastore()
+                && metastoreClientFactory != null
                 && tableSchema.partitionKeys().size() > 0) {
+            callbacks.add(new 
AddPartitionCommitCallback(metastoreClientFactory.create()));
+        }
+        TagPreview tagPreview = TagPreview.create(options);
+        if (options.tagToPartitionField() != null
+                && tagPreview != null
+                && metastoreClientFactory != null
+                && tableSchema.partitionKeys().isEmpty()) {
+            TagPreviewCommitCallback callback =
+                    new TagPreviewCommitCallback(
+                            new AddPartitionTagCallback(
+                                    metastoreClientFactory.create(), 
options.tagToPartitionField()),
+                            tagPreview);
+            callbacks.add(callback);
+        }
+        return callbacks;
+    }
+
+    private List<TagCallback> createTagCallbacks() {
+        List<TagCallback> callbacks = new ArrayList<>(loadTagCallbacks());
+        String partitionField = coreOptions().tagToPartitionField();
+        MetastoreClient.Factory metastoreClientFactory =
+                catalogEnvironment.metastoreClientFactory();
+        if (partitionField != null && metastoreClientFactory != null) {
             callbacks.add(
-                    new AddPartitionCommitCallback(
-                            
catalogEnvironment.metastoreClientFactory().create()));
+                    new 
AddPartitionTagCallback(metastoreClientFactory.create(), partitionField));
         }
         return callbacks;
     }
 
+    private List<TagCallback> loadTagCallbacks() {
+        return loadCallbacks(coreOptions().tagCallbacks(), TagCallback.class);
+    }
+
     private List<CommitCallback> loadCommitCallbacks() {
-        List<CommitCallback> result = new ArrayList<>();
+        return loadCallbacks(coreOptions().commitCallbacks(), 
CommitCallback.class);
+    }
+
+    @SuppressWarnings("unchecked")
+    private <T> List<T> loadCallbacks(Map<String, String> clazzParamMaps, 
Class<T> expectClass) {
+        List<T> result = new ArrayList<>();
 
-        Map<String, String> clazzParamMaps = coreOptions().commitCallbacks();
         for (Map.Entry<String, String> classParamEntry : 
clazzParamMaps.entrySet()) {
             String className = classParamEntry.getKey();
             String param = classParamEntry.getValue();
@@ -293,15 +333,14 @@ public abstract class AbstractFileStoreTable implements 
FileStoreTable {
             }
 
             Preconditions.checkArgument(
-                    CommitCallback.class.isAssignableFrom(clazz),
-                    "Class " + clazz + " must implement " + 
CommitCallback.class);
+                    expectClass.isAssignableFrom(clazz),
+                    "Class " + clazz + " must implement " + expectClass);
 
             try {
                 if (param == null) {
-                    result.add((CommitCallback) clazz.newInstance());
+                    result.add((T) clazz.newInstance());
                 } else {
-                    result.add(
-                            (CommitCallback) 
clazz.getConstructor(String.class).newInstance(param));
+                    result.add((T) 
clazz.getConstructor(String.class).newInstance(param));
                 }
             } catch (Exception e) {
                 throw new RuntimeException(
@@ -414,6 +453,16 @@ public abstract class AbstractFileStoreTable implements 
FileStoreTable {
 
         Snapshot snapshot = snapshotManager.snapshot(fromSnapshotId);
         tagManager().createTag(snapshot, tagName);
+
+        List<TagCallback> callbacks = Collections.emptyList();
+        try {
+            callbacks = createTagCallbacks();
+            callbacks.forEach(callback -> callback.notifyCreation(tagName));
+        } finally {
+            for (TagCallback tagCallback : callbacks) {
+                IOUtils.closeQuietly(tagCallback);
+            }
+        }
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TagCallback.java
similarity index 63%
copy from 
paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java
copy to paimon-core/src/main/java/org/apache/paimon/table/sink/TagCallback.java
index 2ad601d2e..397b341d9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TagCallback.java
@@ -16,23 +16,14 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.metastore;
-
-import org.apache.paimon.data.BinaryRow;
-
-import java.io.Serializable;
+package org.apache.paimon.table.sink;
 
 /**
- * A metastore client related to a table. All methods of this interface 
operate on the same specific
- * table.
+ * This callback will be called after tag operations.
+ *
+ * <p>NOTE: No guarantee that this callback must be called.
  */
-public interface MetastoreClient extends AutoCloseable {
-
-    void addPartition(BinaryRow partition) throws Exception;
-
-    /** Factory to create {@link MetastoreClient}. */
-    interface Factory extends Serializable {
+public interface TagCallback extends AutoCloseable {
 
-        MetastoreClient create();
-    }
+    void notifyCreation(String tagName);
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java 
b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
index 81014ef6d..6ec7f1cf2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
@@ -20,7 +20,6 @@ package org.apache.paimon.tag;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
-import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.operation.TagDeletion;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.TagManager;
@@ -28,54 +27,20 @@ import org.apache.paimon.utils.TagManager;
 import javax.annotation.Nullable;
 
 import java.time.Duration;
-import java.time.Instant;
-import java.time.LocalDate;
 import java.time.LocalDateTime;
-import java.time.ZoneId;
-import java.time.format.DateTimeFormatter;
-import java.time.format.DateTimeFormatterBuilder;
-import java.time.format.ResolverStyle;
-import java.time.format.SignStyle;
 import java.util.Optional;
 import java.util.SortedMap;
 
-import static java.time.temporal.ChronoField.DAY_OF_MONTH;
-import static java.time.temporal.ChronoField.HOUR_OF_DAY;
-import static java.time.temporal.ChronoField.MONTH_OF_YEAR;
-import static java.time.temporal.ChronoField.YEAR;
 import static org.apache.paimon.Snapshot.FIRST_SNAPSHOT_ID;
 import static 
org.apache.paimon.shade.guava30.com.google.common.base.MoreObjects.firstNonNull;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** A manager to create tags automatically. */
 public class TagAutoCreation {
 
-    private static final DateTimeFormatter HOUR_FORMATTER =
-            new DateTimeFormatterBuilder()
-                    .appendValue(YEAR, 1, 10, SignStyle.NORMAL)
-                    .appendLiteral('-')
-                    .appendValue(MONTH_OF_YEAR, 2, 2, SignStyle.NORMAL)
-                    .appendLiteral('-')
-                    .appendValue(DAY_OF_MONTH, 2, 2, SignStyle.NORMAL)
-                    .appendLiteral(" ")
-                    .appendValue(HOUR_OF_DAY, 2, 2, SignStyle.NORMAL)
-                    .toFormatter()
-                    .withResolverStyle(ResolverStyle.LENIENT);
-
-    private static final DateTimeFormatter DAY_FORMATTER =
-            new DateTimeFormatterBuilder()
-                    .appendValue(YEAR, 1, 10, SignStyle.NORMAL)
-                    .appendLiteral('-')
-                    .appendValue(MONTH_OF_YEAR, 2, 2, SignStyle.NORMAL)
-                    .appendLiteral('-')
-                    .appendValue(DAY_OF_MONTH, 2, 2, SignStyle.NORMAL)
-                    .toFormatter()
-                    .withResolverStyle(ResolverStyle.LENIENT);
-
     private final SnapshotManager snapshotManager;
     private final TagManager tagManager;
     private final TagDeletion tagDeletion;
-    private final TimeExtractor timeExtractor;
+    private final TagTimeExtractor timeExtractor;
     private final TagPeriodHandler periodHandler;
     private final Duration delay;
     private final Integer numRetainedMax;
@@ -87,7 +52,7 @@ public class TagAutoCreation {
             SnapshotManager snapshotManager,
             TagManager tagManager,
             TagDeletion tagDeletion,
-            TimeExtractor timeExtractor,
+            TagTimeExtractor timeExtractor,
             TagPeriodHandler periodHandler,
             Duration delay,
             Integer numRetainedMax) {
@@ -133,7 +98,8 @@ public class TagAutoCreation {
     }
 
     private void tryToTag(Snapshot snapshot) {
-        Optional<LocalDateTime> timeOptional = timeExtractor.extract(snapshot);
+        Optional<LocalDateTime> timeOptional =
+                timeExtractor.extract(snapshot.timeMillis(), 
snapshot.watermark());
         if (!timeOptional.isPresent()) {
             return;
         }
@@ -141,7 +107,7 @@ public class TagAutoCreation {
         LocalDateTime time = timeOptional.get();
         if (nextTag == null
                 || isAfterOrEqual(time.minus(delay), 
periodHandler.nextTagTime(nextTag))) {
-            LocalDateTime thisTag = periodHandler.normalizeToTagTime(time);
+            LocalDateTime thisTag = periodHandler.normalizeToPreviousTag(time);
             String tagName = periodHandler.timeToTag(thisTag);
             tagManager.createTag(snapshot, tagName);
             nextTag = periodHandler.nextTagTime(thisTag);
@@ -167,184 +133,22 @@ public class TagAutoCreation {
         return t1.isAfter(t2) || t1.isEqual(t2);
     }
 
-    private interface TimeExtractor {
-
-        Optional<LocalDateTime> extract(Snapshot snapshot);
-    }
-
-    private static class ProcessTimeExtractor implements TimeExtractor {
-
-        @Override
-        public Optional<LocalDateTime> extract(Snapshot snapshot) {
-            return Optional.of(
-                    Instant.ofEpochMilli(snapshot.timeMillis())
-                            .atZone(ZoneId.systemDefault())
-                            .toLocalDateTime());
-        }
-    }
-
-    private static class WatermarkExtractor implements TimeExtractor {
-
-        private final ZoneId watermarkZoneId;
-
-        private WatermarkExtractor(ZoneId watermarkZoneId) {
-            this.watermarkZoneId = watermarkZoneId;
-        }
-
-        @Override
-        public Optional<LocalDateTime> extract(Snapshot snapshot) {
-            Long watermark = snapshot.watermark();
-            if (watermark == null) {
-                return Optional.empty();
-            }
-
-            return Optional.of(
-                    
Instant.ofEpochMilli(watermark).atZone(watermarkZoneId).toLocalDateTime());
-        }
-    }
-
-    private interface TagPeriodHandler {
-
-        void validateDelay(Duration delay);
-
-        LocalDateTime tagToTime(String tag);
-
-        LocalDateTime normalizeToTagTime(LocalDateTime time);
-
-        String timeToTag(LocalDateTime time);
-
-        LocalDateTime nextTagTime(LocalDateTime time);
-    }
-
-    private abstract static class BaseTagPeriodHandler implements 
TagPeriodHandler {
-
-        protected abstract Duration onePeriod();
-
-        protected abstract DateTimeFormatter formatter();
-
-        @Override
-        public void validateDelay(Duration delay) {
-            checkArgument(onePeriod().compareTo(delay) > 0);
-        }
-
-        @Override
-        public LocalDateTime tagToTime(String tag) {
-            return LocalDateTime.parse(tag, formatter());
-        }
-
-        @Override
-        public LocalDateTime normalizeToTagTime(LocalDateTime time) {
-            long mills = Timestamp.fromLocalDateTime(time).getMillisecond();
-            long periodMills = onePeriod().toMillis();
-            LocalDateTime normalized =
-                    Timestamp.fromEpochMillis((mills / periodMills) * 
periodMills)
-                            .toLocalDateTime();
-            return normalized.minus(onePeriod());
-        }
-
-        @Override
-        public String timeToTag(LocalDateTime time) {
-            return time.format(formatter());
-        }
-
-        @Override
-        public LocalDateTime nextTagTime(LocalDateTime time) {
-            return time.plus(onePeriod());
-        }
-    }
-
-    private static class HourlyTagPeriodHandler extends BaseTagPeriodHandler {
-
-        private static final Duration ONE_PERIOD = Duration.ofHours(1);
-
-        @Override
-        protected Duration onePeriod() {
-            return ONE_PERIOD;
-        }
-
-        @Override
-        protected DateTimeFormatter formatter() {
-            return HOUR_FORMATTER;
-        }
-    }
-
-    private static class DailyTagPeriodHandler extends BaseTagPeriodHandler {
-
-        private static final Duration ONE_PERIOD = Duration.ofDays(1);
-
-        @Override
-        protected Duration onePeriod() {
-            return ONE_PERIOD;
-        }
-
-        @Override
-        protected DateTimeFormatter formatter() {
-            return DAY_FORMATTER;
-        }
-
-        @Override
-        public LocalDateTime tagToTime(String tag) {
-            return LocalDate.parse(tag, formatter()).atStartOfDay();
-        }
-    }
-
-    private static class TwoHoursTagPeriodHandler extends BaseTagPeriodHandler 
{
-
-        private static final Duration ONE_PERIOD = Duration.ofHours(2);
-
-        @Override
-        protected Duration onePeriod() {
-            return ONE_PERIOD;
-        }
-
-        @Override
-        protected DateTimeFormatter formatter() {
-            return HOUR_FORMATTER;
-        }
-    }
-
     @Nullable
     public static TagAutoCreation create(
             CoreOptions options,
             SnapshotManager snapshotManager,
             TagManager tagManager,
             TagDeletion tagDeletion) {
-        TimeExtractor timeExtractor;
-        switch (options.tagCreationMode()) {
-            case NONE:
-                return null;
-            case PROCESS_TIME:
-                timeExtractor = new ProcessTimeExtractor();
-                break;
-            case WATERMARK:
-                timeExtractor = new 
WatermarkExtractor(ZoneId.of(options.sinkWatermarkTimeZone()));
-                break;
-            default:
-                throw new UnsupportedOperationException("Unsupported " + 
options.tagCreationMode());
-        }
-
-        TagPeriodHandler periodHandler;
-        switch (options.tagCreationPeriod()) {
-            case DAILY:
-                periodHandler = new DailyTagPeriodHandler();
-                break;
-            case HOURLY:
-                periodHandler = new HourlyTagPeriodHandler();
-                break;
-            case TWO_HOURS:
-                periodHandler = new TwoHoursTagPeriodHandler();
-                break;
-            default:
-                throw new UnsupportedOperationException(
-                        "Unsupported " + options.tagCreationPeriod());
+        TagTimeExtractor extractor = 
TagTimeExtractor.createForAutoTag(options);
+        if (extractor == null) {
+            return null;
         }
-
         return new TagAutoCreation(
                 snapshotManager,
                 tagManager,
                 tagDeletion,
-                timeExtractor,
-                periodHandler,
+                extractor,
+                TagPeriodHandler.create(options),
                 options.tagCreationDelay(),
                 options.tagNumRetainedMax());
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/tag/TagPeriodHandler.java 
b/paimon-core/src/main/java/org/apache/paimon/tag/TagPeriodHandler.java
new file mode 100644
index 000000000..bf7af0497
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagPeriodHandler.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.tag;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.Timestamp;
+
+import java.time.Duration;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+import java.time.format.ResolverStyle;
+import java.time.format.SignStyle;
+
+import static java.time.temporal.ChronoField.DAY_OF_MONTH;
+import static java.time.temporal.ChronoField.HOUR_OF_DAY;
+import static java.time.temporal.ChronoField.MONTH_OF_YEAR;
+import static java.time.temporal.ChronoField.YEAR;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/**
+ * Converts between times and the names of periodic tags.
+ *
+ * <p>A handler is bound to one period length and one tag name format. {@link #create} selects the
+ * handler matching {@code options.tagCreationPeriod()}.
+ */
+public interface TagPeriodHandler {
+
+    /**
+     * Tag name format with hour precision: year, two-digit month, two-digit day, a space and a
+     * two-digit hour, for example {@code 2023-10-25 14}.
+     */
+    DateTimeFormatter HOUR_FORMATTER =
+            new DateTimeFormatterBuilder()
+                    .appendValue(YEAR, 1, 10, SignStyle.NORMAL)
+                    .appendLiteral('-')
+                    .appendValue(MONTH_OF_YEAR, 2, 2, SignStyle.NORMAL)
+                    .appendLiteral('-')
+                    .appendValue(DAY_OF_MONTH, 2, 2, SignStyle.NORMAL)
+                    .appendLiteral(" ")
+                    .appendValue(HOUR_OF_DAY, 2, 2, SignStyle.NORMAL)
+                    .toFormatter()
+                    .withResolverStyle(ResolverStyle.LENIENT);
+
+    /**
+     * Tag name format with day precision: year, two-digit month and two-digit day, for example
+     * {@code 2023-10-25}.
+     */
+    DateTimeFormatter DAY_FORMATTER =
+            new DateTimeFormatterBuilder()
+                    .appendValue(YEAR, 1, 10, SignStyle.NORMAL)
+                    .appendLiteral('-')
+                    .appendValue(MONTH_OF_YEAR, 2, 2, SignStyle.NORMAL)
+                    .appendLiteral('-')
+                    .appendValue(DAY_OF_MONTH, 2, 2, SignStyle.NORMAL)
+                    .toFormatter()
+                    .withResolverStyle(ResolverStyle.LENIENT);
+
+    /** Checks that {@code delay} can be used together with this handler's period. */
+    void validateDelay(Duration delay);
+
+    /** Parses a tag name into the tag time it stands for. */
+    LocalDateTime tagToTime(String tag);
+
+    /** Returns the tag time of the period that precedes the period containing {@code time}. */
+    LocalDateTime normalizeToPreviousTag(LocalDateTime time);
+
+    /** Formats a tag time as a tag name. */
+    String timeToTag(LocalDateTime time);
+
+    /** Returns the tag time one period after {@code time}. */
+    LocalDateTime nextTagTime(LocalDateTime time);
+
+    /**
+     * Base implementation of {@link TagPeriodHandler}, driven by a fixed period length and a tag
+     * name formatter supplied by the subclass.
+     */
+    abstract class BaseTagPeriodHandler implements TagPeriodHandler {
+
+        /** Length of one tag period. */
+        protected abstract Duration onePeriod();
+
+        /** Formatter used both to print and to parse tag names. */
+        protected abstract DateTimeFormatter formatter();
+
+        /** Requires {@code delay} to be strictly shorter than one period. */
+        @Override
+        public void validateDelay(Duration delay) {
+            checkArgument(
+                    onePeriod().compareTo(delay) > 0,
+                    "Tag creation delay "
+                            + delay
+                            + " must be shorter than the tag period "
+                            + onePeriod());
+        }
+
+        @Override
+        public LocalDateTime tagToTime(String tag) {
+            return LocalDateTime.parse(tag, formatter());
+        }
+
+        /**
+         * Rounds {@code time} down to a multiple of the period (measured on the millisecond value
+         * of {@link Timestamp#fromLocalDateTime}) and then steps back one period.
+         */
+        @Override
+        public LocalDateTime normalizeToPreviousTag(LocalDateTime time) {
+            long mills = Timestamp.fromLocalDateTime(time).getMillisecond();
+            long periodMills = onePeriod().toMillis();
+            // floorDiv rounds towards negative infinity, so a negative millisecond value is
+            // rounded down to the start of its period as well, instead of up towards zero.
+            long periodStart = Math.floorDiv(mills, periodMills) * periodMills;
+            return Timestamp.fromEpochMillis(periodStart).toLocalDateTime().minus(onePeriod());
+        }
+
+        @Override
+        public String timeToTag(LocalDateTime time) {
+            return time.format(formatter());
+        }
+
+        @Override
+        public LocalDateTime nextTagTime(LocalDateTime time) {
+            return time.plus(onePeriod());
+        }
+    }
+
+    /** Hourly {@link TagPeriodHandler}: one-hour period, hour-precision tag names. */
+    class HourlyTagPeriodHandler extends BaseTagPeriodHandler {
+
+        static final Duration ONE_PERIOD = Duration.ofHours(1);
+
+        @Override
+        protected Duration onePeriod() {
+            return ONE_PERIOD;
+        }
+
+        @Override
+        protected DateTimeFormatter formatter() {
+            return HOUR_FORMATTER;
+        }
+    }
+
+    /** Daily {@link TagPeriodHandler}: one-day period, day-precision tag names. */
+    class DailyTagPeriodHandler extends BaseTagPeriodHandler {
+
+        static final Duration ONE_PERIOD = Duration.ofDays(1);
+
+        @Override
+        protected Duration onePeriod() {
+            return ONE_PERIOD;
+        }
+
+        @Override
+        protected DateTimeFormatter formatter() {
+            return DAY_FORMATTER;
+        }
+
+        /** The day format carries no time of day, so the tag is parsed as a date at 00:00. */
+        @Override
+        public LocalDateTime tagToTime(String tag) {
+            return LocalDate.parse(tag, formatter()).atStartOfDay();
+        }
+    }
+
+    /** Two Hours {@link TagPeriodHandler}: two-hour period, hour-precision tag names. */
+    class TwoHoursTagPeriodHandler extends BaseTagPeriodHandler {
+
+        static final Duration ONE_PERIOD = Duration.ofHours(2);
+
+        @Override
+        protected Duration onePeriod() {
+            return ONE_PERIOD;
+        }
+
+        @Override
+        protected DateTimeFormatter formatter() {
+            return HOUR_FORMATTER;
+        }
+    }
+
+    /**
+     * Creates the handler for {@code options.tagCreationPeriod()}.
+     *
+     * @throws UnsupportedOperationException if the configured period has no handler
+     */
+    static TagPeriodHandler create(CoreOptions options) {
+        switch (options.tagCreationPeriod()) {
+            case DAILY:
+                return new DailyTagPeriodHandler();
+            case HOURLY:
+                return new HourlyTagPeriodHandler();
+            case TWO_HOURS:
+                return new TwoHoursTagPeriodHandler();
+            default:
+                throw new UnsupportedOperationException(
+                        "Unsupported " + options.tagCreationPeriod());
+        }
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagPreview.java 
b/paimon-core/src/main/java/org/apache/paimon/tag/TagPreview.java
new file mode 100644
index 000000000..671e8841d
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagPreview.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.tag;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TagManager;
+
+import javax.annotation.Nullable;
+
+import java.time.LocalDateTime;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.Optional;
+
+import static java.util.Collections.singletonMap;
+import static org.apache.paimon.CoreOptions.SCAN_SNAPSHOT_ID;
+import static org.apache.paimon.CoreOptions.SCAN_TAG_NAME;
+
+/** A class of tag preview to find suitable snapshots. */
+public class TagPreview {
+
+    private final TagTimeExtractor timeExtractor;
+    private final TagPeriodHandler periodHandler;
+
+    private TagPreview(CoreOptions options) {
+        this.timeExtractor = TagTimeExtractor.createForTagPreview(options);
+        this.periodHandler = TagPeriodHandler.create(options);
+    }
+
+    public static TagPreview create(CoreOptions options) {
+        if (options.tagToPartitionPreview() != 
CoreOptions.TagCreationMode.NONE) {
+            return new TagPreview(options);
+        }
+        return null;
+    }
+
+    public Optional<String> extractTag(long timeMilli, @Nullable Long 
watermark) {
+        Optional<LocalDateTime> timeOptional = 
timeExtractor.extract(timeMilli, watermark);
+        if (!timeOptional.isPresent()) {
+            return Optional.empty();
+        }
+        LocalDateTime currentTag =
+                
periodHandler.nextTagTime(periodHandler.normalizeToPreviousTag(timeOptional.get()));
+        String tag = periodHandler.timeToTag(currentTag);
+        return Optional.of(tag);
+    }
+
+    public Map<String, String> timeTravel(FileStoreTable table, String tag) {
+        TagManager tagManager = table.tagManager();
+        if (tagManager.tagExists(tag)) {
+            return singletonMap(SCAN_TAG_NAME.key(), tag);
+        }
+
+        SnapshotManager snapshotManager = table.snapshotManager();
+        Snapshot snapshot =
+                snapshotManager.traversalSnapshotsFromLatestSafely(
+                        s ->
+                                extractTag(s.timeMillis(), s.watermark())
+                                        .map(t -> t.compareTo(tag) <= 0)
+                                        .orElse(false));
+        if (snapshot != null) {
+            return singletonMap(SCAN_SNAPSHOT_ID.key(), 
String.valueOf(snapshot.id()));
+        }
+
+        Optional<String> findTag =
+                tagManager.tags().values().stream()
+                        .filter(t -> t.compareTo(tag) <= 0)
+                        .max(Comparator.naturalOrder());
+        if (findTag.isPresent()) {
+            return singletonMap(SCAN_TAG_NAME.key(), findTag.get());
+        }
+
+        throw new RuntimeException("Cannot find snapshot or tag for tag name: 
" + tag);
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExtractor.java 
b/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExtractor.java
new file mode 100644
index 000000000..dadfde3a7
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExtractor.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.tag;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+
+import javax.annotation.Nullable;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.Optional;
+
+/**
+ * Extractor to extract tag time from {@link Snapshot}.
+ *
+ * <p>Callers pass the snapshot's commit time and watermark; the implementation decides which of
+ * the two determines the time used for tagging.
+ */
+public interface TagTimeExtractor {
+
+    /**
+     * Extracts the time used for tagging.
+     *
+     * @param timeMilli snapshot time in milliseconds
+     * @param watermark snapshot watermark, may be {@code null}
+     * @return the extracted time, or empty when this extractor cannot determine one
+     */
+    Optional<LocalDateTime> extract(long timeMilli, @Nullable Long watermark);
+
+    /**
+     * Extract time from snapshot time millis, converted in the system default time zone. The
+     * watermark is ignored.
+     */
+    class ProcessTimeExtractor implements TagTimeExtractor {
+
+        @Override
+        public Optional<LocalDateTime> extract(long timeMilli, @Nullable Long watermark) {
+            return Optional.of(
+                    Instant.ofEpochMilli(timeMilli)
+                            .atZone(ZoneId.systemDefault())
+                            .toLocalDateTime());
+        }
+    }
+
+    /**
+     * Extract time from snapshot watermark, converted in the configured watermark time zone. The
+     * snapshot time is ignored; a {@code null} watermark yields an empty result.
+     */
+    class WatermarkExtractor implements TagTimeExtractor {
+
+        private final ZoneId watermarkZoneId;
+
+        private WatermarkExtractor(ZoneId watermarkZoneId) {
+            this.watermarkZoneId = watermarkZoneId;
+        }
+
+        @Override
+        public Optional<LocalDateTime> extract(long timeMilli, @Nullable Long watermark) {
+            if (watermark == null) {
+                return Optional.empty();
+            }
+
+            return Optional.of(
+                    Instant.ofEpochMilli(watermark).atZone(watermarkZoneId).toLocalDateTime());
+        }
+    }
+
+    /** Creates the extractor for {@code options.tagCreationMode()}; {@code null} for NONE. */
+    @Nullable
+    static TagTimeExtractor createForAutoTag(CoreOptions options) {
+        return create(options.tagCreationMode(), options);
+    }
+
+    /** Creates the extractor for {@code options.tagToPartitionPreview()}; {@code null} for NONE. */
+    @Nullable
+    static TagTimeExtractor createForTagPreview(CoreOptions options) {
+        return create(options.tagToPartitionPreview(), options);
+    }
+
+    /**
+     * Creates the extractor for {@code mode}.
+     *
+     * @param mode the mode to create an extractor for
+     * @param options supplies the watermark time zone for the watermark mode
+     * @return the extractor, or {@code null} when {@code mode} is NONE
+     * @throws UnsupportedOperationException if {@code mode} has no extractor
+     */
+    @Nullable
+    static TagTimeExtractor create(CoreOptions.TagCreationMode mode, CoreOptions options) {
+        switch (mode) {
+            case NONE:
+                return null;
+            case PROCESS_TIME:
+                return new ProcessTimeExtractor();
+            case WATERMARK:
+                return new WatermarkExtractor(ZoneId.of(options.sinkWatermarkTimeZone()));
+            default:
+                // report the mode that was actually requested; it is not necessarily the
+                // auto-tag creation mode of the options
+                throw new UnsupportedOperationException("Unsupported " + mode);
+        }
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/PartitionPathUtils.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/PartitionPathUtils.java
index 30e359ffd..7c1a5b6db 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/PartitionPathUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/PartitionPathUtils.java
@@ -25,10 +25,14 @@ import java.util.BitSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 /** Utils for file system. */
 public class PartitionPathUtils {
 
+    private static final Pattern PARTITION_NAME_PATTERN = 
Pattern.compile("([^/]+)=([^/]+)");
+
     private static final BitSet CHAR_TO_ESCAPE = new BitSet(128);
 
     static {
@@ -151,4 +155,56 @@ public class PartitionPathUtils {
         }
         sb.append(Integer.toHexString(c).toUpperCase());
     }
+
+    /**
+     * Reverses percent-escaping in a path name: every {@code %XY} whose two following characters
+     * parse as a non-negative hexadecimal number is replaced by the character with that code.
+     * Anything else, including a {@code %} without two following characters, is copied unchanged.
+     *
+     * @param path the escaped path name
+     * @return the unescaped path name
+     */
+    public static String unescapePathName(String path) {
+        final int length = path.length();
+        StringBuilder unescaped = new StringBuilder();
+        int pos = 0;
+        while (pos < length) {
+            char current = path.charAt(pos);
+            int decoded = -1;
+            if (current == '%' && pos + 2 < length) {
+                try {
+                    decoded = Integer.parseInt(path.substring(pos + 1, pos + 3), 16);
+                } catch (Exception ignored) {
+                    // not a hex pair: the '%' is kept as a literal character below
+                }
+            }
+            if (decoded >= 0) {
+                unescaped.append((char) decoded);
+                pos += 3;
+            } else {
+                unescaped.append(current);
+                pos++;
+            }
+        }
+        return unescaped.toString();
+    }
+
+    /**
+     * Builds a partition spec from the {@code key=value} components of a path.
+     *
+     * <p>Every path component whose name has the form {@code key=value} contributes one entry,
+     * with key and value unescaped; other components are skipped. Entries are ordered from the
+     * component closest to the root down to the leaf.
+     *
+     * @param currPath partition file path.
+     * @return Sequential partition specs.
+     */
+    public static LinkedHashMap<String, String> extractPartitionSpecFromPath(Path currPath) {
+        // collected leaf-first while climbing towards the root
+        List<String[]> leafToRoot = new ArrayList<>();
+        Path cursor = currPath;
+        do {
+            Matcher matcher = PARTITION_NAME_PATTERN.matcher(cursor.getName());
+            if (matcher.matches()) {
+                String key = unescapePathName(matcher.group(1));
+                String value = unescapePathName(matcher.group(2));
+                leafToRoot.add(new String[] {key, value});
+            }
+            cursor = cursor.getParent();
+        } while (cursor != null && !cursor.getName().isEmpty());
+
+        // replay in reverse so that the resulting map is ordered root-first
+        LinkedHashMap<String, String> spec = new LinkedHashMap<>();
+        for (int idx = leafToRoot.size() - 1; idx >= 0; idx--) {
+            String[] keyValue = leafToRoot.get(idx);
+            spec.put(keyValue[0], keyValue[1]);
+        }
+        return spec;
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index 0c8832ca3..80f4a11b0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -37,7 +37,6 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BinaryOperator;
-import java.util.function.Function;
 import java.util.function.Predicate;
 
 import static org.apache.paimon.utils.FileUtils.listVersionedFiles;
@@ -258,14 +257,15 @@ public class SnapshotManager implements Serializable {
      * because the committer may delete obsolete snapshots, which may cause 
the writer to encounter
      * unreadable snapshots.
      */
-    public void traversalSnapshotsFromLatestSafely(Function<Snapshot, Boolean> 
consumer) {
+    @Nullable
+    public Snapshot traversalSnapshotsFromLatestSafely(Filter<Snapshot> 
checker) {
         Long latestId = latestSnapshotId();
         if (latestId == null) {
-            return;
+            return null;
         }
         Long earliestId = earliestSnapshotId();
         if (earliestId == null) {
-            return;
+            return null;
         }
 
         for (long id = latestId; id >= earliestId; id--) {
@@ -275,7 +275,7 @@ public class SnapshotManager implements Serializable {
             } catch (Exception e) {
                 Long newEarliestId = earliestSnapshotId();
                 if (newEarliestId == null) {
-                    return;
+                    return null;
                 }
 
                 // this is a valid snapshot, should not throw exception
@@ -284,13 +284,14 @@ public class SnapshotManager implements Serializable {
                 }
 
                 // ok, this is an expired snapshot
-                return;
+                return null;
             }
 
-            if (consumer.apply(snapshot)) {
-                return;
+            if (checker.test(snapshot)) {
+                return snapshot;
             }
         }
+        return null;
     }
 
     private @Nullable Long findLatest() throws IOException {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/catalog/PrimaryKeyTableTestBase.java
 
b/paimon-core/src/test/java/org/apache/paimon/catalog/PrimaryKeyTableTestBase.java
index 8e3118856..529f84b6e 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/catalog/PrimaryKeyTableTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/catalog/PrimaryKeyTableTestBase.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.catalog;
 
+import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
@@ -30,6 +31,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.IOException;
+import java.time.LocalDateTime;
 import java.util.UUID;
 import java.util.function.Predicate;
 
@@ -76,4 +78,8 @@ public abstract class PrimaryKeyTableTestBase {
     protected Options tableOptions() {
         return new Options();
     }
+
+    /**
+     * Parses {@code timestamp} with {@link LocalDateTime#parse(CharSequence)} and returns the
+     * millisecond value of the corresponding {@link Timestamp}.
+     */
+    protected static long utcMills(String timestamp) {
+        LocalDateTime parsed = LocalDateTime.parse(timestamp);
+        return Timestamp.fromLocalDateTime(parsed).getMillisecond();
+    }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java 
b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java
index 2d7b771e0..5ce5ba9c5 100644
--- a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java
@@ -21,7 +21,6 @@ package org.apache.paimon.tag;
 import org.apache.paimon.CoreOptions.TagCreationMode;
 import org.apache.paimon.CoreOptions.TagCreationPeriod;
 import org.apache.paimon.catalog.PrimaryKeyTableTestBase;
-import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.table.FileStoreTable;
@@ -189,10 +188,6 @@ public class TagAutoCreationTest extends PrimaryKeyTableTestBase {
         commit.close();
     }
 
-    private long utcMills(String timestamp) {
-        return Timestamp.fromLocalDateTime(LocalDateTime.parse(timestamp)).getMillisecond();
-    }
-
     private long localZoneMills(String timestamp) {
         return LocalDateTime.parse(timestamp)
                 .atZone(ZoneId.systemDefault())
diff --git a/paimon-core/src/test/java/org/apache/paimon/tag/TagPreviewTest.java b/paimon-core/src/test/java/org/apache/paimon/tag/TagPreviewTest.java
new file mode 100644
index 000000000..36a4cf569
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/tag/TagPreviewTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.tag;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.CoreOptions.TagCreationMode;
+import org.apache.paimon.catalog.PrimaryKeyTableTestBase;
+import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.sink.TableCommitImpl;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static java.util.Collections.singletonMap;
+import static org.apache.paimon.CoreOptions.METASTORE_TAG_TO_PARTITION_PREVIEW;
+import static org.apache.paimon.CoreOptions.SCAN_SNAPSHOT_ID;
+import static org.apache.paimon.CoreOptions.SCAN_TAG_NAME;
+import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MAX;
+import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MIN;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link TagPreview}. */
+public class TagPreviewTest extends PrimaryKeyTableTestBase {
+
+    @Test
+    public void testExtractTag() {
+        TagPreview preview = create();
+        Optional<String> tag = preview.extractTag(-1, utcMills("2023-07-18T12:12:00"));
+        assertThat(tag).hasValue("2023-07-18");
+    }
+
+    @Test
+    public void testTimeTravel() {
+        TagPreview preview = create();
+        Map<String, String> dynamicOptions = new HashMap<>();
+        dynamicOptions.put(SNAPSHOT_NUM_RETAINED_MIN.key(), "3");
+        dynamicOptions.put(SNAPSHOT_NUM_RETAINED_MAX.key(), "3");
+        TableCommitImpl commit =
+                table.copy(dynamicOptions).newCommit(commitUser).ignoreEmptyCommit(false);
+
+        commit.commit(new ManifestCommittable(0, utcMills("2023-07-18T12:12:00")));
+        assertThat(preview.timeTravel(table, "2023-07-18"))
+                .containsAllEntriesOf(singletonMap(SCAN_SNAPSHOT_ID.key(), "1"));
+
+        commit.commit(new ManifestCommittable(0, utcMills("2023-07-18T13:12:00")));
+        assertThat(preview.timeTravel(table, "2023-07-18"))
+                .containsAllEntriesOf(singletonMap(SCAN_SNAPSHOT_ID.key(), "2"));
+
+        commit.commit(new ManifestCommittable(0, utcMills("2023-07-19T12:12:00")));
+        assertThat(preview.timeTravel(table, "2023-07-18"))
+                .containsAllEntriesOf(singletonMap(SCAN_SNAPSHOT_ID.key(), "2"));
+        assertThat(preview.timeTravel(table, "2023-07-19"))
+                .containsAllEntriesOf(singletonMap(SCAN_SNAPSHOT_ID.key(), "3"));
+        assertThat(preview.timeTravel(table, "2023-07-20"))
+                .containsAllEntriesOf(singletonMap(SCAN_SNAPSHOT_ID.key(), "3"));
+
+        table.createTag("2023-07-17", 1);
+        table.createTag("2023-07-18", 2);
+        assertThat(preview.timeTravel(table, "2023-07-18"))
+                .containsAllEntriesOf(singletonMap(SCAN_TAG_NAME.key(), "2023-07-18"));
+
+        // expire 2023-07-19
+        for (int i = 0; i < 5; i++) {
+            commit.commit(new ManifestCommittable(0, utcMills("2023-07-21T12:12:00")));
+        }
+        table.createTag("2023-07-20", 7);
+        assertThat(preview.timeTravel(table, "2023-07-20"))
+                .containsAllEntriesOf(singletonMap(SCAN_TAG_NAME.key(), "2023-07-20"));
+
+        assertThat(preview.timeTravel(table, "2023-07-19"))
+                .containsAllEntriesOf(singletonMap(SCAN_TAG_NAME.key(), "2023-07-18"));
+    }
+
+    private TagPreview create() {
+        Options options = new Options();
+        options.set(METASTORE_TAG_TO_PARTITION_PREVIEW, TagCreationMode.WATERMARK);
+        TagPreview preview = TagPreview.create(new CoreOptions(options));
+        assertThat(preview).isNotNull();
+        return preview;
+    }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
index 33c81b146..5e689a826 100644
--- a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
@@ -30,7 +30,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Function;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.fail;
@@ -136,7 +135,7 @@ public class SnapshotManagerTest {
                 });
 
         // test safely
-        Function<Snapshot, Boolean> func =
+        Filter<Snapshot> func =
                 snapshot -> {
                     try {
                         Thread.sleep(100);
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 59aaebe65..f4ca05751 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -34,6 +34,7 @@ import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.TableType;
 import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
 
 import org.apache.flink.table.hive.LegacyHiveClasses;
 import org.apache.hadoop.conf.Configuration;
@@ -79,6 +80,7 @@ import static org.apache.paimon.hive.HiveCatalogLock.checkMaxSleep;
 import static org.apache.paimon.hive.HiveCatalogOptions.LOCATION_IN_PROPERTIES;
 import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
 import static org.apache.paimon.options.CatalogOptions.TABLE_TYPE;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
 import static org.apache.paimon.utils.Preconditions.checkState;
 import static org.apache.paimon.utils.StringUtils.isNullOrWhitespaceOnly;
 
@@ -476,7 +478,8 @@ public class HiveCatalog extends AbstractCatalog {
         serDeInfo.setSerializationLib(SERDE_CLASS_NAME);
         sd.setSerdeInfo(serDeInfo);
 
-        if (new CoreOptions(schema.options()).partitionedTableInMetastore()) {
+        CoreOptions options = new CoreOptions(schema.options());
+        if (options.partitionedTableInMetastore() && schema.partitionKeys().size() > 0) {
             Map<String, DataField> fieldMap =
                     schema.fields().stream()
                             .collect(Collectors.toMap(DataField::name, Function.identity()));
@@ -495,6 +498,21 @@ public class HiveCatalog extends AbstractCatalog {
             }
             sd.setCols(normalFields);
         } else {
+            if (options.tagToPartitionField() != null) {
+                // map a non-partitioned table to a partitioned table
+                // partition field is tag field which is offered by user
+                checkArgument(
+                        schema.partitionKeys().isEmpty(),
+                        "Partition table can not use timeTravelToPartitionField.");
+                table.setPartitionKeys(
+                        Collections.singletonList(
+                                convertToFieldSchema(
+                                        new DataField(
+                                                0,
+                                                options.tagToPartitionField(),
+                                                DataTypes.STRING()))));
+            }
+
             sd.setCols(
                     schema.fields().stream()
                             .map(this::convertToFieldSchema)
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java
index 66bada2da..d24944b34 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java
@@ -60,10 +60,12 @@ public class HiveMetastoreClient implements MetastoreClient {
 
     @Override
     public void addPartition(BinaryRow partition) throws Exception {
-        LinkedHashMap<String, String> partitionMap =
-                partitionComputer.generatePartValues(partition);
-        List<String> partitionValues = new ArrayList<>(partitionMap.values());
+        addPartition(partitionComputer.generatePartValues(partition));
+    }
 
+    @Override
+    public void addPartition(LinkedHashMap<String, String> partitionSpec) throws Exception {
+        List<String> partitionValues = new ArrayList<>(partitionSpec.values());
         try {
             client.getPartition(
                     identifier.getDatabaseName(), identifier.getObjectName(), partitionValues);
@@ -74,7 +76,7 @@ public class HiveMetastoreClient implements MetastoreClient {
             newSd.setLocation(
                     sd.getLocation()
                             + "/"
-                            + PartitionPathUtils.generatePartitionPath(partitionMap));
+                            + PartitionPathUtils.generatePartitionPath(partitionSpec));
 
             Partition hivePartition = new Partition();
             hivePartition.setDbName(identifier.getDatabaseName());
diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/HiveSchema.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/HiveSchema.java
index 431ca9da4..3e5671878 100644
--- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/HiveSchema.java
+++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/HiveSchema.java
@@ -59,6 +59,8 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
 /** Column names, types and comments of a Hive table. */
 public class HiveSchema {
 
@@ -152,24 +154,32 @@ public class HiveSchema {
                     "Extract schema with exists DDL and exists paimon table, table location:[{}].",
                     location);
 
+            TableSchema paimonSchema = tableSchema.get();
+            String tagToPartField =
+                    paimonSchema.options().get(CoreOptions.METASTORE_TAG_TO_PARTITION.key());
+
             boolean isPartitionedTable =
                     partitionTypeInfos.size() > 0
                             // for some Hive compatible system
                             || properties.containsKey("TABLE_TOTAL_PARTITIONS");
-            checkFieldsMatched(columnNames, typeInfos, tableSchema.get(), isPartitionedTable);
-            checkPartitionMatched(partitionKeys, partitionTypeInfos, tableSchema.get());
+            checkFieldsMatched(columnNames, typeInfos, paimonSchema, isPartitionedTable);
+            checkPartitionMatched(partitionKeys, partitionTypeInfos, paimonSchema, tagToPartField);
 
             // Use paimon table data types and column comments when the paimon table exists.
             // Using paimon data types first because hive's TypeInfoFactory.timestampTypeInfo
             // doesn't contain precision and thus may cause casting problems
             Map<String, DataField> paimonFields =
-                    tableSchema.get().fields().stream()
+                    paimonSchema.fields().stream()
                             .collect(
                                     Collectors.toMap(
                                             dataField -> dataField.name().toLowerCase(),
                                             Function.identity()));
             for (int i = 0; i < columnNames.size(); i++) {
                 String columnName = columnNames.get(i).toLowerCase();
+                if (Objects.equals(columnName, tagToPartField)) {
+                    // ignore tagToPartField, it should just be a string type
+                    continue;
+                }
                 dataTypes.set(i, paimonFields.get(columnName).type());
                 comments.set(i, paimonFields.get(columnName).description());
             }
@@ -267,11 +277,18 @@ public class HiveSchema {
     private static void checkPartitionMatched(
             List<String> hivePartitionKeys,
             List<TypeInfo> hivePartitionTypeInfos,
-            TableSchema tableSchema) {
+            TableSchema tableSchema,
+            @Nullable String tagToPartField) {
         if (hivePartitionKeys.isEmpty()) {
             // only partitioned Hive table needs to consider this part
             return;
         }
+        if (tagToPartField != null) {
+            // fast check path for tagToPartField
+            checkArgument(tableSchema.partitionKeys().isEmpty());
+            checkArgument(hivePartitionKeys.equals(Collections.singletonList(tagToPartField)));
+            return;
+        }
 
         List<String> schemaPartitionKeys = tableSchema.partitionKeys();
         List<TypeInfo> schemaPartitionTypeInfos =
diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
index ed9848a67..314b50d70 100644
--- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
+++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.hive.mapred;
 
+import org.apache.paimon.fs.Path;
 import org.apache.paimon.hive.RowDataContainer;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
@@ -25,7 +26,9 @@ import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.InnerTableScan;
 import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.tag.TagPreview;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.PartitionPathUtils;
 
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.io.IOConstants;
@@ -38,15 +41,20 @@ import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 
+import static java.util.Collections.singletonMap;
+import static org.apache.paimon.CoreOptions.SCAN_TAG_NAME;
 import static org.apache.paimon.hive.utils.HiveUtils.createFileStoreTable;
 import static org.apache.paimon.hive.utils.HiveUtils.createPredicate;
 
@@ -70,19 +78,37 @@ public class PaimonInputFormat implements InputFormat<Void, RowDataContainer> {
         // the location of Hive.
         String locations = jobConf.get(FileInputFormat.INPUT_DIR);
 
+        @Nullable String tagToPartField = table.coreOptions().tagToPartitionField();
+        @Nullable TagPreview tagPreview = TagPreview.create(table.coreOptions());
+
         List<PaimonInputSplit> splits = new ArrayList<>();
         // locations may contain multiple partitions
         for (String location : locations.split(",")) {
-            List<Predicate> predicatePerPartition = new ArrayList<>(predicates);
-            createPartitionPredicate(
-                            table.schema().logicalRowType(),
-                            table.schema().partitionKeys(),
-                            location)
-                    .ifPresent(predicatePerPartition::add);
-
-            InnerTableScan scan = table.newScan();
-            if (predicatePerPartition.size() > 0) {
-                scan.withFilter(PredicateBuilder.and(predicatePerPartition));
+            InnerTableScan scan;
+            if (tagToPartField != null) {
+                // the location should be pruned by partition predicate
+                // we can extract tag name from location, and use time travel to scan
+                String tag = extractTagName(location, tagToPartField);
+                Map<String, String> dynamicOptions =
+                        tagPreview == null
+                                ? singletonMap(SCAN_TAG_NAME.key(), tag)
+                                : tagPreview.timeTravel(table, tag);
+                scan = table.copy(dynamicOptions).newScan();
+                if (predicates.size() > 0) {
+                    scan.withFilter(PredicateBuilder.and(predicates));
+                }
+            } else {
+                List<Predicate> predicatePerPartition = new ArrayList<>(predicates);
+                createPartitionPredicate(
+                                table.schema().logicalRowType(),
+                                table.schema().partitionKeys(),
+                                location)
+                        .ifPresent(predicatePerPartition::add);
+
+                scan = table.newScan();
+                if (predicatePerPartition.size() > 0) {
+                    scan.withFilter(PredicateBuilder.and(predicatePerPartition));
+                }
             }
             scan.plan()
                     .splits()
@@ -129,7 +155,8 @@ public class PaimonInputFormat implements InputFormat<Void, RowDataContainer> {
                 split,
                 paimonColumns,
                 getHiveColumns(jobConf).orElse(paimonColumns),
-                Arrays.asList(getSelectedColumns(jobConf)));
+                Arrays.asList(getSelectedColumns(jobConf)),
+                table.coreOptions().tagToPartitionField());
     }
 
     private Optional<List<String>> getHiveColumns(JobConf jobConf) {
@@ -156,4 +183,11 @@ public class PaimonInputFormat implements InputFormat<Void, RowDataContainer> {
                 .distinct()
                 .toArray(String[]::new);
     }
+
+    /** Extract tag name from location, partition field should be tag name. */
+    public static String extractTagName(String location, String tagToPartField) {
+        LinkedHashMap<String, String> partSpec =
+                PartitionPathUtils.extractPartitionSpecFromPath(new Path(location));
+        return partSpec.get(tagToPartField);
+    }
 }
diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonRecordReader.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonRecordReader.java
index b50feacdf..4e30b83c7 100644
--- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonRecordReader.java
+++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonRecordReader.java
@@ -19,7 +19,10 @@
 package org.apache.paimon.hive.mapred;
 
 import org.apache.paimon.KeyValue;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.JoinedRow;
 import org.apache.paimon.hive.RowDataContainer;
 import org.apache.paimon.reader.RecordReaderIterator;
 import org.apache.paimon.table.source.ReadBuilder;
@@ -47,6 +50,7 @@ public class PaimonRecordReader implements RecordReader<Void, RowDataContainer>
 
     // project from selectedColumns to hiveColumns
     @Nullable private final ProjectedRow reusedProjectedRow;
+    @Nullable private final JoinedRow addTagToPartFieldRow;
 
     private float progress;
 
@@ -60,7 +64,8 @@ public class PaimonRecordReader implements RecordReader<Void, RowDataContainer>
             PaimonInputSplit split,
             List<String> paimonColumns,
             List<String> hiveColumns,
-            List<String> selectedColumns)
+            List<String> selectedColumns,
+            @Nullable String tagToPartField)
             throws IOException {
         if (!paimonColumns.equals(selectedColumns)) {
             readBuilder.withProjection(
@@ -78,6 +83,16 @@ public class PaimonRecordReader implements RecordReader<Void, RowDataContainer>
         this.iterator =
                 new RecordReaderIterator<>(readBuilder.newRead().createReader(split.split()));
         this.splitLength = split.getLength();
+        if (tagToPartField != null) {
+            // in case of reading partition field
+            // add last field (partition field from tag name) to row
+            String tag =
+                    PaimonInputFormat.extractTagName(split.getPath().getName(), tagToPartField);
+            addTagToPartFieldRow = new JoinedRow();
+            addTagToPartFieldRow.replace(null, GenericRow.of(BinaryString.fromString(tag)));
+        } else {
+            addTagToPartFieldRow = null;
+        }
         this.progress = 0;
     }
 
@@ -94,6 +109,9 @@ public class PaimonRecordReader implements RecordReader<Void, RowDataContainer>
             } else {
                 value.set(rowData);
             }
+            if (addTagToPartFieldRow != null) {
+                value.set(addTagToPartFieldRow.replace(value.get(), addTagToPartFieldRow.row2()));
+            }
             return true;
         }
     }
diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveUtils.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveUtils.java
index af7f7a051..a97f283ad 100644
--- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveUtils.java
+++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveUtils.java
@@ -36,8 +36,10 @@ import org.apache.hadoop.mapred.JobConf;
 
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.options.OptionsUtils.PAIMON_PREFIX;
@@ -58,15 +60,28 @@ public class HiveUtils {
         if (sarg == null) {
             return Optional.empty();
         }
+        Set<String> readColumnNames = null;
+        if (limitToReadColumnNames) {
+            readColumnNames =
+                    Arrays.stream(ColumnProjectionUtils.getReadColumnNames(jobConf))
+                            .collect(Collectors.toSet());
+        }
+        String tagToPartField =
+                tableSchema.options().get(CoreOptions.METASTORE_TAG_TO_PARTITION.key());
+        if (tagToPartField != null) {
+            // exclude tagToPartField, this should be done in Hive partition prune
+            // cannot find the field in paimon table schema
+            if (readColumnNames == null) {
+                readColumnNames = new HashSet<>(tableSchema.fieldNames());
+            }
+            readColumnNames.remove(tagToPartField);
+        }
         SearchArgumentToPredicateConverter converter =
                 new SearchArgumentToPredicateConverter(
                         sarg,
                         tableSchema.fieldNames(),
                         tableSchema.logicalRowType().getFieldTypes(),
-                        limitToReadColumnNames
-                                ? Arrays.stream(ColumnProjectionUtils.getReadColumnNames(jobConf))
-                                        .collect(Collectors.toSet())
-                                : null);
+                        readColumnNames);
         return converter.convert();
     }
 
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
index f31463cbf..c1b010d9f 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
@@ -21,11 +21,13 @@ package org.apache.paimon.hive;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogLock;
 import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.DataCatalogTable;
 import org.apache.paimon.flink.FlinkCatalog;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.hive.annotation.Minio;
 import org.apache.paimon.hive.runner.PaimonEmbeddedHiveRunner;
 import org.apache.paimon.s3.MinioTestContainer;
+import org.apache.paimon.table.Table;
 
 import com.klarna.hiverunner.HiveShell;
 import com.klarna.hiverunner.annotations.HiveSQL;
@@ -35,6 +37,7 @@ import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
@@ -909,6 +912,88 @@ public abstract class HiveCatalogITCaseBase {
                 .containsExactlyInAnyOrder("1\t10", "2\t20");
     }
 
+    @Test
+    public void testAddPartitionsForTag() throws Exception {
+        tEnv.executeSql(
+                String.join(
+                        "\n",
+                        "CREATE TABLE t (",
+                        "    k INT,",
+                        "    v BIGINT,",
+                        "    PRIMARY KEY (k) NOT ENFORCED",
+                        ") WITH (",
+                        "    'bucket' = '2',",
+                        "    'metastore.tag-to-partition' = 'dt'",
+                        ")"));
+        tEnv.executeSql("INSERT INTO t VALUES (1, 10), (2, 20)").await();
+
+        // TODO modify to CALL after Flink 1.18
+        Table table =
+                ((DataCatalogTable)
+                                tEnv.getCatalog(tEnv.getCurrentCatalog())
+                                        .get()
+                                        .getTable(new ObjectPath(tEnv.getCurrentDatabase(), "t")))
+                        .table();
+        table.createTag("2023-10-16", 1);
+
+        assertThat(hiveShell.executeQuery("SHOW PARTITIONS t"))
+                .containsExactlyInAnyOrder("dt=2023-10-16");
+
+        assertThat(hiveShell.executeQuery("SELECT k, v FROM t WHERE dt='2023-10-16'"))
+                .containsExactlyInAnyOrder("1\t10", "2\t20");
+
+        assertThat(hiveShell.executeQuery("SELECT * FROM t WHERE dt='2023-10-16'"))
+                .containsExactlyInAnyOrder("1\t10\t2023-10-16", "2\t20\t2023-10-16");
+
+        // another tag
+
+        tEnv.executeSql("INSERT INTO t VALUES (3, 30), (4, 40)").await();
+        table.createTag("2023-10-17", 2);
+
+        assertThat(hiveShell.executeQuery("SELECT * FROM t"))
+                .containsExactlyInAnyOrder(
+                        "1\t10\t2023-10-16",
+                        "2\t20\t2023-10-16",
+                        "1\t10\t2023-10-17",
+                        "2\t20\t2023-10-17",
+                        "3\t30\t2023-10-17",
+                        "4\t40\t2023-10-17");
+    }
+
+    @Test
+    public void testAddPartitionsForTagPreview() throws Exception {
+        tEnv.executeSql(
+                String.join(
+                        "\n",
+                        "CREATE TABLE t (",
+                        "    k INT,",
+                        "    v BIGINT,",
+                        "    PRIMARY KEY (k) NOT ENFORCED",
+                        ") WITH (",
+                        "    'bucket' = '2',",
+                        "    'metastore.tag-to-partition' = 'dt',",
+                        "    'metastore.tag-to-partition.preview' = 'process-time'",
+                        ")"));
+
+        tEnv.executeSql("INSERT INTO t VALUES (1, 10), (2, 20)").await();
+
+        List<String> result = hiveShell.executeQuery("SHOW PARTITIONS t");
+        assertThat(result).hasSize(1);
+        String tag = result.get(0).split("=")[1];
+
+        assertThat(hiveShell.executeQuery(String.format("SELECT k, v FROM t WHERE dt='%s'", tag)))
+                .containsExactlyInAnyOrder("1\t10", "2\t20");
+
+        tEnv.executeSql("INSERT INTO t VALUES (3, 30), (4, 40)").await();
+        if (hiveShell.executeQuery("SHOW PARTITIONS t").size() == 1) {
+            // no new partition
+            assertThat(
+                            hiveShell.executeQuery(
+                                    String.format("SELECT k, v FROM t WHERE dt='%s'", tag)))
+                    .containsExactlyInAnyOrder("1\t10", "2\t20", "3\t30", "4\t40");
+        }
+    }
+
     protected List<Row> collect(String sql) throws Exception {
         List<Row> result = new ArrayList<>();
         try (CloseableIterator<Row> it = tEnv.executeSql(sql).collect()) {
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonRecordReaderTest.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonRecordReaderTest.java
index dc2700004..738958869 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonRecordReaderTest.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonRecordReaderTest.java
@@ -157,7 +157,8 @@ public class PaimonRecordReaderTest {
                         new PaimonInputSplit(tempDir.toString(), dataSplit),
                         originalColumns,
                         originalColumns,
-                        selectedColumns);
+                        selectedColumns,
+                        null);
             }
         }
         throw new IllegalArgumentException(


Reply via email to