This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 035c89c020 [core] Introduce batch partition methods in catalog (#4866)
035c89c020 is described below

commit 035c89c020654b7915e023752e6dd9df2fa9141c
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Jan 8 21:52:29 2025 +0800

    [core] Introduce batch partition methods in catalog (#4866)
---
 .../java/org/apache/paimon/AbstractFileStore.java  |   6 +-
 .../org/apache/paimon/catalog/AbstractCatalog.java |  40 ++--
 .../org/apache/paimon/catalog/CachingCatalog.java  |  15 +-
 .../java/org/apache/paimon/catalog/Catalog.java    | 111 +++++------
 .../org/apache/paimon/catalog/CatalogUtils.java    |   2 +-
 .../org/apache/paimon/catalog/DelegateCatalog.java |  36 ++--
 .../paimon/operation/AbstractFileStoreScan.java    |   6 +
 .../org/apache/paimon/operation/FileStoreScan.java |   2 +
 .../apache/paimon/operation/ManifestsReader.java   |  11 ++
 .../apache/paimon/privilege/PrivilegedCatalog.java |  28 ++-
 .../java/org/apache/paimon/rest/RESTCatalog.java   |  64 ++-----
 .../paimon/table/FallbackReadFileStoreTable.java   |   8 +
 .../paimon/table/source/AbstractDataTableScan.java |   6 +
 .../apache/paimon/table/source/InnerTableScan.java |   4 +
 .../table/source/snapshot/SnapshotReader.java      |   2 +
 .../table/source/snapshot/SnapshotReaderImpl.java  |   6 +
 .../apache/paimon/table/system/AuditLogTable.java  |  12 ++
 .../org/apache/paimon/rest/RESTCatalogTest.java    | 102 ----------
 .../java/org/apache/paimon/flink/FlinkCatalog.java |   8 +-
 .../java/org/apache/paimon/hive/HiveCatalog.java   | 209 +++++++++++++++++++--
 .../org/apache/paimon/hive/HiveCatalogTest.java    |  55 ++++++
 21 files changed, 457 insertions(+), 276 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 953096c0b5..22987c6292 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -232,7 +232,11 @@ abstract class AbstractFileStore<T> implements 
FileStore<T> {
     }
 
     protected ManifestsReader newManifestsReader(boolean forWrite) {
-        return new ManifestsReader(partitionType, snapshotManager(), 
manifestListFactory(forWrite));
+        return new ManifestsReader(
+                partitionType,
+                options.partitionDefaultName(),
+                snapshotManager(),
+                manifestListFactory(forWrite));
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index 439f456efb..a6790004a1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -50,10 +50,8 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -160,32 +158,11 @@ public abstract class AbstractCatalog implements Catalog {
     protected abstract Database getDatabaseImpl(String name) throws 
DatabaseNotExistException;
 
     @Override
-    public void createPartition(Identifier identifier, Map<String, String> 
partitionSpec)
-            throws TableNotExistException {
-        Identifier tableIdentifier =
-                Identifier.create(identifier.getDatabaseName(), 
identifier.getTableName());
-        FileStoreTable table = (FileStoreTable) getTable(tableIdentifier);
-
-        if (table.partitionKeys().isEmpty() || 
!table.coreOptions().partitionedTableInMetastore()) {
-            return;
-        }
-
-        MetastoreClient.Factory metastoreFactory =
-                table.catalogEnvironment().metastoreClientFactory();
-        if (metastoreFactory == null) {
-            throw new UnsupportedOperationException(
-                    "The catalog must have metastore to create partition.");
-        }
-
-        try (MetastoreClient client = metastoreFactory.create()) {
-            client.addPartition(new LinkedHashMap<>(partitionSpec));
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
+    public void createPartitions(Identifier identifier, List<Map<String, 
String>> partitions)
+            throws TableNotExistException {}
 
     @Override
-    public void dropPartition(Identifier identifier, Map<String, String> 
partitionSpec)
+    public void dropPartitions(Identifier identifier, List<Map<String, 
String>> partitions)
             throws TableNotExistException {
         checkNotSystemTable(identifier, "dropPartition");
         Table table = getTable(identifier);
@@ -195,11 +172,18 @@ public abstract class AbstractCatalog implements Catalog {
                         .store()
                         .newCommit(
                                 
createCommitUser(fileStoreTable.coreOptions().toConfiguration()))) {
-            commit.dropPartitions(
-                    Collections.singletonList(partitionSpec), 
BatchWriteBuilder.COMMIT_IDENTIFIER);
+            commit.dropPartitions(partitions, 
BatchWriteBuilder.COMMIT_IDENTIFIER);
         }
     }
 
+    @Override
+    public void alterPartitions(Identifier identifier, List<Partition> 
partitions)
+            throws TableNotExistException {}
+
+    @Override
+    public void markDonePartitions(Identifier identifier, List<Map<String, 
String>> partitions)
+            throws TableNotExistException {}
+
     @Override
     public List<Partition> listPartitions(Identifier identifier) throws 
TableNotExistException {
         return listPartitionsFromFileSystem(getTable(identifier));
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
index 4796276972..23408e5692 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
@@ -295,9 +295,18 @@ public class CachingCatalog extends DelegateCatalog {
     }
 
     @Override
-    public void dropPartition(Identifier identifier, Map<String, String> 
partitions)
-            throws TableNotExistException, PartitionNotExistException {
-        wrapped.dropPartition(identifier, partitions);
+    public void dropPartitions(Identifier identifier, List<Map<String, 
String>> partitions)
+            throws TableNotExistException {
+        wrapped.dropPartitions(identifier, partitions);
+        if (partitionCache != null) {
+            partitionCache.invalidate(identifier);
+        }
+    }
+
+    @Override
+    public void alterPartitions(Identifier identifier, List<Partition> 
partitions)
+            throws TableNotExistException {
+        wrapped.alterPartitions(identifier, partitions);
         if (partitionCache != null) {
             partitionCache.invalidate(identifier);
         }
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index e90d3c79c5..e7d07d6dc4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -73,6 +73,8 @@ public interface Catalog extends AutoCloseable {
     /** Return a boolean that indicates whether this catalog is 
case-sensitive. */
     boolean caseSensitive();
 
+    // ======================= database methods ===============================
+
     /**
      * Get the names of all databases in this catalog.
      *
@@ -139,6 +141,8 @@ public interface Catalog extends AutoCloseable {
     void alterDatabase(String name, List<PropertyChange> changes, boolean 
ignoreIfNotExists)
             throws DatabaseNotExistException;
 
+    // ======================= table methods ===============================
+
     /**
      * Return a {@link Table} identified by the given {@link Identifier}.
      *
@@ -231,52 +235,81 @@ public interface Catalog extends AutoCloseable {
     default void invalidateTable(Identifier identifier) {}
 
     /**
-     * Create the partition of the specify table.
+     * Modify an existing table from a {@link SchemaChange}.
+     *
+     * <p>NOTE: System tables can not be altered.
+     *
+     * @param identifier path of the table to be modified
+     * @param change the schema change
+     * @param ignoreIfNotExists flag to specify behavior when the table does 
not exist: if set to
+     *     false, throw an exception, if set to true, do nothing.
+     * @throws TableNotExistException if the table does not exist
+     */
+    default void alterTable(Identifier identifier, SchemaChange change, 
boolean ignoreIfNotExists)
+            throws TableNotExistException, ColumnAlreadyExistException, 
ColumnNotExistException {
+        alterTable(identifier, Collections.singletonList(change), 
ignoreIfNotExists);
+    }
+
+    // ======================= partition methods 
===============================
+
+    /**
+     * Create partitions of the specify table.
      *
      * <p>Only catalog with metastore can support this method, and only table 
with
      * 'metastore.partitioned-table' can support this method.
      *
-     * @param identifier path of the table to drop partition
-     * @param partitionSpec the partition to be created
+     * @param identifier path of the table to create partitions
+     * @param partitions partitions to be created
      * @throws TableNotExistException if the table does not exist
      */
-    void createPartition(Identifier identifier, Map<String, String> 
partitionSpec)
+    void createPartitions(Identifier identifier, List<Map<String, String>> 
partitions)
             throws TableNotExistException;
 
     /**
-     * Drop the partition of the specify table.
+     * Drop partitions of the specify table.
      *
-     * @param identifier path of the table to drop partition
-     * @param partition the partition to be deleted
+     * @param identifier path of the table to drop partitions
+     * @param partitions partitions to be deleted
      * @throws TableNotExistException if the table does not exist
-     * @throws PartitionNotExistException if the partition does not exist
      */
-    void dropPartition(Identifier identifier, Map<String, String> partition)
-            throws TableNotExistException, PartitionNotExistException;
+    void dropPartitions(Identifier identifier, List<Map<String, String>> 
partitions)
+            throws TableNotExistException;
 
     /**
-     * Get Partition of all partitions of the table.
+     * Alter partitions of the specify table.
      *
-     * @param identifier path of the table to list partitions
+     * <p>Only catalog with metastore can support this method, and only table 
with
+     * 'metastore.partitioned-table' can support this method.
+     *
+     * @param identifier path of the table to alter partitions
+     * @param partitions partitions to be altered
      * @throws TableNotExistException if the table does not exist
      */
-    List<Partition> listPartitions(Identifier identifier) throws 
TableNotExistException;
+    void alterPartitions(Identifier identifier, List<Partition> partitions)
+            throws TableNotExistException;
 
     /**
-     * Modify an existing table from a {@link SchemaChange}.
+     * Mark partitions done of the specify table.
      *
-     * <p>NOTE: System tables can not be altered.
+     * <p>Only catalog with metastore can support this method, and only table 
with
+     * 'metastore.partitioned-table' can support this method.
      *
-     * @param identifier path of the table to be modified
-     * @param change the schema change
-     * @param ignoreIfNotExists flag to specify behavior when the table does 
not exist: if set to
-     *     false, throw an exception, if set to true, do nothing.
+     * @param identifier path of the table to mark done partitions
+     * @param partitions partitions to be marked done
      * @throws TableNotExistException if the table does not exist
      */
-    default void alterTable(Identifier identifier, SchemaChange change, 
boolean ignoreIfNotExists)
-            throws TableNotExistException, ColumnAlreadyExistException, 
ColumnNotExistException {
-        alterTable(identifier, Collections.singletonList(change), 
ignoreIfNotExists);
-    }
+    void markDonePartitions(Identifier identifier, List<Map<String, String>> 
partitions)
+            throws TableNotExistException;
+
+    /**
+     * Get Partition of all partitions of the table.
+     *
+     * @param identifier path of the table to list partitions
+     * @throws TableNotExistException if the table does not exist
+     */
+    List<Partition> listPartitions(Identifier identifier) throws 
TableNotExistException;
+
+    // ======================= view methods ===============================
 
     /**
      * Return a {@link View} identified by the given {@link Identifier}.
@@ -340,6 +373,8 @@ public interface Catalog extends AutoCloseable {
         throw new UnsupportedOperationException();
     }
 
+    // ======================= repair methods ===============================
+
     /**
      * Repair the entire Catalog, repair the metadata in the metastore 
consistent with the metadata
      * in the filesystem, register missing tables in the metastore.
@@ -508,36 +543,6 @@ public interface Catalog extends AutoCloseable {
         }
     }
 
-    /** Exception for trying to operate on a partition that doesn't exist. */
-    class PartitionNotExistException extends Exception {
-
-        private static final String MSG = "Partition %s do not exist in the 
table %s.";
-
-        private final Identifier identifier;
-
-        private final Map<String, String> partitionSpec;
-
-        public PartitionNotExistException(
-                Identifier identifier, Map<String, String> partitionSpec) {
-            this(identifier, partitionSpec, null);
-        }
-
-        public PartitionNotExistException(
-                Identifier identifier, Map<String, String> partitionSpec, 
Throwable cause) {
-            super(String.format(MSG, partitionSpec, identifier.getFullName()), 
cause);
-            this.identifier = identifier;
-            this.partitionSpec = partitionSpec;
-        }
-
-        public Identifier identifier() {
-            return identifier;
-        }
-
-        public Map<String, String> partitionSpec() {
-            return partitionSpec;
-        }
-    }
-
     /** Exception for trying to alter a column that already exists. */
     class ColumnAlreadyExistException extends Exception {
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
index cddb76e683..fabfa50fc4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
@@ -131,7 +131,7 @@ public class CatalogUtils {
         InternalRowPartitionComputer computer =
                 new InternalRowPartitionComputer(
                         options.get(PARTITION_DEFAULT_NAME),
-                        table.rowType(),
+                        table.rowType().project(table.partitionKeys()),
                         table.partitionKeys().toArray(new String[0]),
                         options.get(PARTITION_GENERATE_LEGCY_NAME));
         List<PartitionEntry> partitionEntries =
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
index e2d1a94cfa..23c50e9986 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
@@ -119,6 +119,30 @@ public class DelegateCatalog implements Catalog {
         wrapped.alterTable(identifier, changes, ignoreIfNotExists);
     }
 
+    @Override
+    public void createPartitions(Identifier identifier, List<Map<String, 
String>> partitions)
+            throws TableNotExistException {
+        wrapped.createPartitions(identifier, partitions);
+    }
+
+    @Override
+    public void dropPartitions(Identifier identifier, List<Map<String, 
String>> partitions)
+            throws TableNotExistException {
+        wrapped.dropPartitions(identifier, partitions);
+    }
+
+    @Override
+    public void alterPartitions(Identifier identifier, List<Partition> 
partitions)
+            throws TableNotExistException {
+        wrapped.alterPartitions(identifier, partitions);
+    }
+
+    @Override
+    public void markDonePartitions(Identifier identifier, List<Map<String, 
String>> partitions)
+            throws TableNotExistException {
+        wrapped.markDonePartitions(identifier, partitions);
+    }
+
     @Override
     public Table getTable(Identifier identifier) throws TableNotExistException 
{
         return wrapped.getTable(identifier);
@@ -152,18 +176,6 @@ public class DelegateCatalog implements Catalog {
         wrapped.renameView(fromView, toView, ignoreIfNotExists);
     }
 
-    @Override
-    public void createPartition(Identifier identifier, Map<String, String> 
partitions)
-            throws TableNotExistException {
-        wrapped.createPartition(identifier, partitions);
-    }
-
-    @Override
-    public void dropPartition(Identifier identifier, Map<String, String> 
partitions)
-            throws TableNotExistException, PartitionNotExistException {
-        wrapped.dropPartition(identifier, partitions);
-    }
-
     @Override
     public List<Partition> listPartitions(Identifier identifier) throws 
TableNotExistException {
         return wrapped.listPartitions(identifier);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index 4f8a1f3264..27ba4703b9 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -118,6 +118,12 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
         return this;
     }
 
+    @Override
+    public FileStoreScan withPartitionsFilter(List<Map<String, String>> 
partitions) {
+        manifestsReader.withPartitionsFilter(partitions);
+        return this;
+    }
+
     @Override
     public FileStoreScan withPartitionFilter(PartitionPredicate predicate) {
         manifestsReader.withPartitionFilter(predicate);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
index 179d16de6c..99ae3ef47d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
@@ -53,6 +53,8 @@ public interface FileStoreScan {
 
     FileStoreScan withPartitionFilter(List<BinaryRow> partitions);
 
+    FileStoreScan withPartitionsFilter(List<Map<String, String>> partitions);
+
     FileStoreScan withPartitionFilter(PartitionPredicate predicate);
 
     FileStoreScan withBucket(int bucket);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java
index 5ee468af20..2eaa3646f7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java
@@ -34,13 +34,17 @@ import javax.annotation.concurrent.ThreadSafe;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
+import static 
org.apache.paimon.partition.PartitionPredicate.createBinaryPartitions;
+
 /** A util class to read manifest files. */
 @ThreadSafe
 public class ManifestsReader {
 
     private final RowType partitionType;
+    private final String partitionDefaultValue;
     private final SnapshotManager snapshotManager;
     private final ManifestList.Factory manifestListFactory;
 
@@ -48,9 +52,11 @@ public class ManifestsReader {
 
     public ManifestsReader(
             RowType partitionType,
+            String partitionDefaultValue,
             SnapshotManager snapshotManager,
             ManifestList.Factory manifestListFactory) {
         this.partitionType = partitionType;
+        this.partitionDefaultValue = partitionDefaultValue;
         this.snapshotManager = snapshotManager;
         this.manifestListFactory = manifestListFactory;
     }
@@ -65,6 +71,11 @@ public class ManifestsReader {
         return this;
     }
 
+    public ManifestsReader withPartitionsFilter(List<Map<String, String>> 
partitions) {
+        return withPartitionFilter(
+                createBinaryPartitions(partitions, partitionType, 
partitionDefaultValue));
+    }
+
     public ManifestsReader withPartitionFilter(PartitionPredicate predicate) {
         this.partitionFilter = predicate;
         return this;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java
index 35822471a2..6be09fa9b9 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java
@@ -25,6 +25,7 @@ import org.apache.paimon.catalog.PropertyChange;
 import org.apache.paimon.options.ConfigOption;
 import org.apache.paimon.options.ConfigOptions;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.Partition;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.table.FileStoreTable;
@@ -143,10 +144,31 @@ public class PrivilegedCatalog extends DelegateCatalog {
     }
 
     @Override
-    public void dropPartition(Identifier identifier, Map<String, String> 
partitions)
-            throws TableNotExistException, PartitionNotExistException {
+    public void createPartitions(Identifier identifier, List<Map<String, 
String>> partitions)
+            throws TableNotExistException {
+        privilegeManager.getPrivilegeChecker().assertCanInsert(identifier);
+        wrapped.createPartitions(identifier, partitions);
+    }
+
+    @Override
+    public void dropPartitions(Identifier identifier, List<Map<String, 
String>> partitions)
+            throws TableNotExistException {
+        privilegeManager.getPrivilegeChecker().assertCanInsert(identifier);
+        wrapped.dropPartitions(identifier, partitions);
+    }
+
+    @Override
+    public void alterPartitions(Identifier identifier, List<Partition> 
partitions)
+            throws TableNotExistException {
+        privilegeManager.getPrivilegeChecker().assertCanInsert(identifier);
+        wrapped.alterPartitions(identifier, partitions);
+    }
+
+    @Override
+    public void markDonePartitions(Identifier identifier, List<Map<String, 
String>> partitions)
+            throws TableNotExistException {
         privilegeManager.getPrivilegeChecker().assertCanInsert(identifier);
-        wrapped.dropPartition(identifier, partitions);
+        wrapped.markDonePartitions(identifier, partitions);
     }
 
     public void createPrivilegedUser(String user, String password) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index c547656e7c..8659fbf655 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -41,9 +41,7 @@ import 
org.apache.paimon.rest.exceptions.NoSuchResourceException;
 import org.apache.paimon.rest.requests.AlterDatabaseRequest;
 import org.apache.paimon.rest.requests.AlterTableRequest;
 import org.apache.paimon.rest.requests.CreateDatabaseRequest;
-import org.apache.paimon.rest.requests.CreatePartitionRequest;
 import org.apache.paimon.rest.requests.CreateTableRequest;
-import org.apache.paimon.rest.requests.DropPartitionRequest;
 import org.apache.paimon.rest.requests.RenameTableRequest;
 import org.apache.paimon.rest.responses.AlterDatabaseResponse;
 import org.apache.paimon.rest.responses.ConfigResponse;
@@ -53,7 +51,6 @@ import org.apache.paimon.rest.responses.GetTableResponse;
 import org.apache.paimon.rest.responses.ListDatabasesResponse;
 import org.apache.paimon.rest.responses.ListPartitionsResponse;
 import org.apache.paimon.rest.responses.ListTablesResponse;
-import org.apache.paimon.rest.responses.PartitionResponse;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.TableSchema;
@@ -62,7 +59,6 @@ import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.object.ObjectTable;
-import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.Preconditions;
 
@@ -84,7 +80,6 @@ import java.util.concurrent.ScheduledExecutorService;
 
 import static org.apache.paimon.CoreOptions.METASTORE_PARTITIONED_TABLE;
 import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemDatabase;
-import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemTable;
 import static org.apache.paimon.catalog.CatalogUtils.isSystemDatabase;
 import static 
org.apache.paimon.catalog.CatalogUtils.listPartitionsFromFileSystem;
 import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE;
@@ -362,56 +357,27 @@ public class RESTCatalog implements Catalog {
     }
 
     @Override
-    public void createPartition(Identifier identifier, Map<String, String> 
partitionSpec)
+    public void createPartitions(Identifier identifier, List<Map<String, 
String>> partitions)
             throws TableNotExistException {
-        Table table = getTable(identifier);
-        Options options = Options.fromMap(table.options());
-        if (!options.get(METASTORE_PARTITIONED_TABLE)) {
-            return;
-        }
-
-        try {
-            CreatePartitionRequest request = new 
CreatePartitionRequest(identifier, partitionSpec);
-            client.post(
-                    resourcePaths.partitions(
-                            identifier.getDatabaseName(), 
identifier.getTableName()),
-                    request,
-                    PartitionResponse.class,
-                    headers());
-        } catch (NoSuchResourceException e) {
-            throw new TableNotExistException(identifier);
-        } catch (ForbiddenException e) {
-            throw new TableNoPermissionException(identifier, e);
-        }
+        throw new UnsupportedOperationException();
     }
 
     @Override
-    public void dropPartition(Identifier identifier, Map<String, String> 
partition)
-            throws TableNotExistException, PartitionNotExistException {
-        checkNotSystemTable(identifier, "dropPartition");
+    public void dropPartitions(Identifier identifier, List<Map<String, 
String>> partitions)
+            throws TableNotExistException {
+        throw new UnsupportedOperationException();
+    }
 
-        Table table = getTable(identifier);
-        Options options = Options.fromMap(table.options());
-        if (options.get(METASTORE_PARTITIONED_TABLE)) {
-            try {
-                client.delete(
-                        resourcePaths.partitions(
-                                identifier.getDatabaseName(), 
identifier.getTableName()),
-                        new DropPartitionRequest(partition),
-                        headers());
-            } catch (NoSuchResourceException ignore) {
-                throw new PartitionNotExistException(identifier, partition);
-            } catch (ForbiddenException e) {
-                throw new TableNoPermissionException(identifier, e);
-            }
-        }
+    @Override
+    public void alterPartitions(Identifier identifier, List<Partition> 
partitions)
+            throws TableNotExistException {
+        throw new UnsupportedOperationException();
+    }
 
-        try (BatchTableCommit commit =
-                
table.newBatchWriteBuilder().withOverwrite(partition).newCommit()) {
-            commit.commit(Collections.emptyList());
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
+    @Override
+    public void markDonePartitions(Identifier identifier, List<Map<String, 
String>> partitions)
+            throws TableNotExistException {
+        throw new UnsupportedOperationException();
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
index e3e290f060..eb405a5d5d 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
@@ -33,6 +33,7 @@ import org.apache.paimon.table.source.DataFilePlan;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.DataTableScan;
 import org.apache.paimon.table.source.InnerTableRead;
+import org.apache.paimon.table.source.InnerTableScan;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.table.source.TableScan;
@@ -258,6 +259,13 @@ public class FallbackReadFileStoreTable extends 
DelegatedFileStoreTable {
             return this;
         }
 
+        @Override
+        public InnerTableScan withPartitionsFilter(List<Map<String, String>> 
partitions) {
+            mainScan.withPartitionsFilter(partitions);
+            fallbackScan.withPartitionsFilter(partitions);
+            return this;
+        }
+
         @Override
         public Scan withBucketFilter(Filter<Integer> bucketFilter) {
             mainScan.withBucketFilter(bucketFilter);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
index a5810bfc24..59b11281cc 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
@@ -101,6 +101,12 @@ public abstract class AbstractDataTableScan implements 
DataTableScan {
         return this;
     }
 
+    @Override
+    public AbstractDataTableScan withPartitionsFilter(List<Map<String, 
String>> partitions) {
+        snapshotReader.withPartitionsFilter(partitions);
+        return this;
+    }
+
     @Override
     public AbstractDataTableScan withLevelFilter(Filter<Integer> levelFilter) {
         snapshotReader.withLevelFilter(levelFilter);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
index c2425ff16f..f7d609187d 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
@@ -39,6 +39,10 @@ public interface InnerTableScan extends TableScan {
         return this;
     }
 
+    default InnerTableScan withPartitionsFilter(List<Map<String, String>> partitions) {
+        return this;
+    }
+
     default InnerTableScan withPartitionFilter(List<BinaryRow> partitions) {
         return this;
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
index f3e0a92b8f..3329ab95fc 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
@@ -73,6 +73,8 @@ public interface SnapshotReader {
 
     SnapshotReader withPartitionFilter(List<BinaryRow> partitions);
 
+    SnapshotReader withPartitionsFilter(List<Map<String, String>> partitions);
+
     SnapshotReader withMode(ScanMode scanMode);
 
     SnapshotReader withLevelFilter(Filter<Integer> levelFilter);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index 43a6d3c872..032738c38c 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -192,6 +192,12 @@ public class SnapshotReaderImpl implements SnapshotReader {
         return this;
     }
 
+    @Override
+    public SnapshotReader withPartitionsFilter(List<Map<String, String>> partitions) {
+        scan.withPartitionsFilter(partitions);
+        return this;
+    }
+
     @Override
     public SnapshotReader withFilter(Predicate predicate) {
         List<String> partitionKeys = tableSchema.partitionKeys();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index 1cb967f8d1..005535094e 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -313,6 +313,12 @@ public class AuditLogTable implements DataTable, 
ReadonlyTable {
             return this;
         }
 
+        @Override
+        public SnapshotReader withPartitionsFilter(List<Map<String, String>> partitions) {
+            wrapped.withPartitionsFilter(partitions);
+            return this;
+        }
+
         @Override
         public SnapshotReader withMode(ScanMode scanMode) {
             wrapped.withMode(scanMode);
@@ -446,6 +452,12 @@ public class AuditLogTable implements DataTable, 
ReadonlyTable {
             return this;
         }
 
+        @Override
+        public InnerTableScan withPartitionsFilter(List<Map<String, String>> partitions) {
+            batchScan.withPartitionsFilter(partitions);
+            return this;
+        }
+
         @Override
         public InnerTableScan withBucketFilter(Filter<Integer> bucketFilter) {
             batchScan.withBucketFilter(bucketFilter);
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
index 627b02c1e3..344807b4c9 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
@@ -50,9 +50,7 @@ import org.junit.rules.TemporaryFolder;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThrows;
@@ -317,106 +315,6 @@ public class RESTCatalogTest {
                 () -> restCatalog.dropTable(Identifier.create(databaseName, 
tableName), false));
     }
 
-    @Test
-    public void testCreatePartition() throws Exception {
-        String databaseName = MockRESTMessage.databaseName();
-        GetTableResponse response = MockRESTMessage.getTableResponse();
-        mockResponse(mapper.writeValueAsString(response), 200);
-
-        Map<String, String> partitionSpec = new HashMap<>();
-        partitionSpec.put("p1", "v1");
-        
mockResponse(mapper.writeValueAsString(MockRESTMessage.partitionResponse()), 
200);
-        assertDoesNotThrow(
-                () ->
-                        restCatalog.createPartition(
-                                Identifier.create(databaseName, "table"), 
partitionSpec));
-    }
-
-    @Test
-    public void testCreatePartitionWhenTableNotExist() throws Exception {
-        String databaseName = MockRESTMessage.databaseName();
-        Map<String, String> partitionSpec = new HashMap<>();
-        partitionSpec.put("p1", "v1");
-        mockResponse("", 404);
-        assertThrows(
-                Catalog.TableNotExistException.class,
-                () ->
-                        restCatalog.createPartition(
-                                Identifier.create(databaseName, "table"), 
partitionSpec));
-    }
-
-    @Test
-    public void testCreatePartitionWhenTableNoPermissionException() throws 
Exception {
-        String databaseName = MockRESTMessage.databaseName();
-        Map<String, String> partitionSpec = new HashMap<>();
-        partitionSpec.put("p1", "v1");
-        mockResponse("", 403);
-        assertThrows(
-                Catalog.TableNoPermissionException.class,
-                () ->
-                        restCatalog.createPartition(
-                                Identifier.create(databaseName, "table"), 
partitionSpec));
-    }
-
-    @Test
-    public void testDropPartition() throws Exception {
-        String databaseName = MockRESTMessage.databaseName();
-        Map<String, String> partitionSpec = new HashMap<>();
-        GetTableResponse response = MockRESTMessage.getTableResponse();
-        partitionSpec.put(response.getSchema().primaryKeys().get(0), "1");
-        mockResponse(mapper.writeValueAsString(""), 200);
-        mockResponse(mapper.writeValueAsString(response), 200);
-        assertThrows(
-                RuntimeException.class,
-                () ->
-                        restCatalog.dropPartition(
-                                Identifier.create(databaseName, "table"), 
partitionSpec));
-    }
-
-    @Test
-    public void testDropPartitionWhenPartitionNoExist() throws Exception {
-        String databaseName = MockRESTMessage.databaseName();
-        GetTableResponse response = 
MockRESTMessage.getTableResponseEnablePartition();
-        mockResponse(mapper.writeValueAsString(response), 200);
-
-        Map<String, String> partitionSpec = new HashMap<>();
-        partitionSpec.put(response.getSchema().primaryKeys().get(0), "1");
-        mockResponse(mapper.writeValueAsString(""), 404);
-        assertThrows(
-                Catalog.PartitionNotExistException.class,
-                () ->
-                        restCatalog.dropPartition(
-                                Identifier.create(databaseName, "table"), 
partitionSpec));
-    }
-
-    @Test
-    public void testDropPartitionWhenTableNoPermission() throws Exception {
-        String databaseName = MockRESTMessage.databaseName();
-        Map<String, String> partitionSpec = new HashMap<>();
-        GetTableResponse response = MockRESTMessage.getTableResponse();
-        partitionSpec.put(response.getSchema().primaryKeys().get(0), "1");
-        mockResponse(mapper.writeValueAsString(""), 403);
-        assertThrows(
-                Catalog.TableNoPermissionException.class,
-                () ->
-                        restCatalog.dropPartition(
-                                Identifier.create(databaseName, "table"), 
partitionSpec));
-    }
-
-    @Test
-    public void testDropPartitionWhenTableNoExist() throws Exception {
-        String databaseName = MockRESTMessage.databaseName();
-        Map<String, String> partitionSpec = new HashMap<>();
-        GetTableResponse response = MockRESTMessage.getTableResponse();
-        partitionSpec.put(response.getSchema().primaryKeys().get(0), "1");
-        mockResponse("", 404);
-        assertThrows(
-                Catalog.TableNotExistException.class,
-                () ->
-                        restCatalog.dropPartition(
-                                Identifier.create(databaseName, "table"), 
partitionSpec));
-    }
-
     @Test
     public void testListPartitionsWhenMetastorePartitionedIsTrue() throws 
Exception {
         String databaseName = MockRESTMessage.databaseName();
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index ec3c4a47a6..7d19db3177 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -1421,7 +1421,8 @@ public class FlinkCatalog extends AbstractCatalog {
 
         try {
             Identifier identifier = toIdentifier(tablePath);
-            catalog.createPartition(identifier, 
partitionSpec.getPartitionSpec());
+            catalog.createPartitions(
+                    identifier, 
Collections.singletonList(partitionSpec.getPartitionSpec()));
         } catch (Catalog.TableNotExistException e) {
             throw new CatalogException(e);
         }
@@ -1440,11 +1441,10 @@ public class FlinkCatalog extends AbstractCatalog {
 
         try {
             Identifier identifier = toIdentifier(tablePath);
-            catalog.dropPartition(identifier, 
partitionSpec.getPartitionSpec());
+            catalog.dropPartitions(
+                    identifier, 
Collections.singletonList(partitionSpec.getPartitionSpec()));
         } catch (Catalog.TableNotExistException e) {
             throw new CatalogException(e);
-        } catch (Catalog.PartitionNotExistException e) {
-            throw new PartitionNotExistException(getName(), tablePath, 
partitionSpec);
         }
     }
 
diff --git 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index a213909beb..5afb60e84f 100644
--- 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -50,6 +50,7 @@ import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.InternalRowPartitionComputer;
 import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.PartitionPathUtils;
 import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.view.View;
 import org.apache.paimon.view.ViewImpl;
@@ -66,6 +67,7 @@ import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
@@ -105,6 +107,7 @@ import static 
org.apache.paimon.catalog.CatalogUtils.checkNotBranch;
 import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemDatabase;
 import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemTable;
 import static org.apache.paimon.catalog.CatalogUtils.isSystemDatabase;
+import static org.apache.paimon.catalog.CatalogUtils.listPartitionsFromFileSystem;
 import static org.apache.paimon.hive.HiveCatalogLock.acquireTimeout;
 import static org.apache.paimon.hive.HiveCatalogLock.checkMaxSleep;
 import static org.apache.paimon.hive.HiveCatalogOptions.HADOOP_CONF_DIR;
@@ -145,6 +148,8 @@ public class HiveCatalog extends AbstractCatalog {
     public static final String HIVE_SITE_FILE = "hive-site.xml";
     private static final String HIVE_EXTERNAL_TABLE_PROP = "EXTERNAL";
     private static final int DEFAULT_TABLE_BATCH_SIZE = 300;
+    private static final String HIVE_LAST_UPDATE_TIME_PROP = "transient_lastDdlTime";
+
     private final HiveConf hiveConf;
     private final String clientClassName;
     private final Options options;
@@ -344,40 +349,177 @@ public class HiveCatalog extends AbstractCatalog {
         }
     }
 
+    private boolean metastorePartitioned(TableSchema schema) {
+        CoreOptions options = CoreOptions.fromMap(schema.options());
+        return (!schema.partitionKeys().isEmpty() && 
options.partitionedTableInMetastore())
+                || options.tagToPartitionField() != null;
+    }
+
+    @Override
+    public void createPartitions(Identifier identifier, List<Map<String, 
String>> partitions)
+            throws TableNotExistException {
+        Identifier tableIdentifier =
+                Identifier.create(identifier.getDatabaseName(), 
identifier.getTableName());
+        Table hmsTable = getHmsTable(tableIdentifier);
+        Path location = getTableLocation(tableIdentifier, hmsTable);
+        TableSchema schema = getDataTableSchema(tableIdentifier, hmsTable);
+
+        if (!metastorePartitioned(schema)) {
+            return;
+        }
+
+        int currentTime = (int) (System.currentTimeMillis() / 1000);
+
+        try {
+            List<Partition> hivePartitions =
+                    toHivePartitions(
+                            identifier,
+                            location.toString(),
+                            hmsTable.getSd(),
+                            partitions,
+                            currentTime);
+            clients.execute(client -> client.add_partitions(hivePartitions, 
true, false));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void dropPartitions(Identifier identifier, List<Map<String, 
String>> partitions)
+            throws TableNotExistException {
+        TableSchema schema = getDataTableSchema(identifier);
+        CoreOptions options = CoreOptions.fromMap(schema.options());
+        boolean tagToPart = options.tagToPartitionField() != null;
+        if (metastorePartitioned(schema)) {
+            List<Map<String, String>> metaPartitions =
+                    tagToPart
+                            ? partitions
+                            : 
removePartitionsExistsInOtherBranches(identifier, partitions);
+            for (Map<String, String> part : metaPartitions) {
+                List<String> partitionValues = new ArrayList<>(part.values());
+                try {
+                    clients.execute(
+                            client ->
+                                    client.dropPartition(
+                                            identifier.getDatabaseName(),
+                                            identifier.getTableName(),
+                                            partitionValues,
+                                            false));
+                } catch (NoSuchObjectException e) {
+                    // do nothing if the partition not exists
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+        if (!tagToPart) {
+            super.dropPartitions(identifier, partitions);
+        }
+    }
+
     @Override
-    public void dropPartition(Identifier identifier, Map<String, String> 
partitionSpec)
+    public void alterPartitions(
+            Identifier identifier, List<org.apache.paimon.partition.Partition> 
partitions)
             throws TableNotExistException {
         TableSchema tableSchema = getDataTableSchema(identifier);
         if (!tableSchema.partitionKeys().isEmpty()
-                && new 
CoreOptions(tableSchema.options()).partitionedTableInMetastore()
-                && !partitionExistsInOtherBranches(identifier, partitionSpec)) 
{
+                && new 
CoreOptions(tableSchema.options()).partitionedTableInMetastore()) {
+            for (org.apache.paimon.partition.Partition partition : partitions) 
{
+                Map<String, String> spec = partition.spec();
+                List<String> partitionValues =
+                        tableSchema.partitionKeys().stream()
+                                .map(spec::get)
+                                .collect(Collectors.toList());
+
+                Map<String, String> statistic = new HashMap<>();
+                statistic.put(NUM_FILES_PROP, 
String.valueOf(partition.fileCount()));
+                statistic.put(TOTAL_SIZE_PROP, 
String.valueOf(partition.fileSizeInBytes()));
+                statistic.put(NUM_ROWS_PROP, 
String.valueOf(partition.recordCount()));
+
+                String modifyTimeSeconds = String.valueOf(partition.lastFileCreationTime() / 1000);
+                statistic.put(LAST_UPDATE_TIME_PROP, modifyTimeSeconds);
+
+                // just for being compatible with hive metastore
+                statistic.put(HIVE_LAST_UPDATE_TIME_PROP, modifyTimeSeconds);
+
+                try {
+                    Partition hivePartition =
+                            clients.run(
+                                    client ->
+                                            client.getPartition(
+                                                    
identifier.getDatabaseName(),
+                                                    identifier.getObjectName(),
+                                                    partitionValues));
+                    hivePartition.setValues(partitionValues);
+                    hivePartition.setLastAccessTime(
+                            (int) (partition.lastFileCreationTime() / 1000));
+                    hivePartition.getParameters().putAll(statistic);
+                    clients.execute(
+                            client ->
+                                    client.alter_partition(
+                                            identifier.getDatabaseName(),
+                                            identifier.getObjectName(),
+                                            hivePartition));
+                } catch (NoSuchObjectException e) {
+                    // do nothing if the partition not exists
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+    }
+
+    @Override
+    public List<org.apache.paimon.partition.Partition> 
listPartitions(Identifier identifier)
+            throws TableNotExistException {
+        FileStoreTable table = (FileStoreTable) getTable(identifier);
+        String tagToPartitionField = table.coreOptions().tagToPartitionField();
+        if (tagToPartitionField != null) {
             try {
-                // Do not close client, it is for HiveCatalog
-                @SuppressWarnings("resource")
-                HiveMetastoreClient metastoreClient =
-                        new HiveMetastoreClient(
-                                new Identifier(
-                                        identifier.getDatabaseName(), 
identifier.getTableName()),
-                                clients);
-                metastoreClient.dropPartition(new 
LinkedHashMap<>(partitionSpec));
+                List<Partition> partitions =
+                        clients.run(
+                                client ->
+                                        client.listPartitions(
+                                                identifier.getDatabaseName(),
+                                                identifier.getTableName(),
+                                                Short.MAX_VALUE));
+                return partitions.stream()
+                        .map(
+                                part ->
+                                        new 
org.apache.paimon.partition.Partition(
+                                                Collections.singletonMap(
+                                                        tagToPartitionField,
+                                                        
part.getValues().get(0)),
+                                                1L,
+                                                1L,
+                                                1L,
+                                                System.currentTimeMillis()))
+                        .collect(Collectors.toList());
             } catch (Exception e) {
                 throw new RuntimeException(e);
             }
         }
-        super.dropPartition(identifier, partitionSpec);
+        return listPartitionsFromFileSystem(table);
     }
 
-    private boolean partitionExistsInOtherBranches(
-            Identifier identifier, Map<String, String> partitionSpec)
-            throws TableNotExistException {
+    private List<Map<String, String>> removePartitionsExistsInOtherBranches(
+            Identifier identifier, List<Map<String, String>> inputs) throws 
TableNotExistException {
         FileStoreTable mainTable =
                 (FileStoreTable)
                         getTable(
                                 new Identifier(
                                         identifier.getDatabaseName(), 
identifier.getTableName()));
+
+        InternalRowPartitionComputer partitionComputer =
+                new InternalRowPartitionComputer(
+                        mainTable.coreOptions().partitionDefaultName(),
+                        mainTable.rowType().project(mainTable.partitionKeys()),
+                        mainTable.partitionKeys().toArray(new String[0]),
+                        mainTable.coreOptions().legacyPartitionName());
         List<String> branchNames = new 
ArrayList<>(mainTable.branchManager().branches());
         branchNames.add(DEFAULT_MAIN_BRANCH);
 
+        Set<Map<String, String>> inputsToRemove = new HashSet<>(inputs);
         for (String branchName : branchNames) {
             if (branchName.equals(identifier.getBranchNameOrDefault())) {
                 continue;
@@ -389,12 +531,13 @@ public class HiveCatalog extends AbstractCatalog {
                 continue;
             }
 
-            FileStoreTable table = mainTable.switchToBranch(branchName);
-            if 
(!table.newScan().withPartitionFilter(partitionSpec).listPartitions().isEmpty())
 {
-                return true;
-            }
+            mainTable.switchToBranch(branchName).newScan()
+                    .withPartitionsFilter(new ArrayList<>(inputsToRemove)).listPartitions().stream()
+                    .map(partitionComputer::generatePartValues)
+                    .forEach(inputsToRemove::remove);
         }
-        return false;
+
+        return new ArrayList<>(inputsToRemove);
     }
 
     @Override
@@ -1464,4 +1607,30 @@ public class HiveCatalog extends AbstractCatalog {
             return DEFAULT_TABLE_BATCH_SIZE;
         }
     }
+
+    private List<Partition> toHivePartitions(
+            Identifier identifier,
+            String tablePath,
+            StorageDescriptor sd,
+            List<Map<String, String>> partitions,
+            int currentTime) {
+        List<Partition> hivePartitions = new ArrayList<>();
+        for (Map<String, String> partitionSpec : partitions) {
+            Partition hivePartition = new Partition();
+            StorageDescriptor newSd = new StorageDescriptor(sd);
+            newSd.setLocation(
+                    tablePath
+                            + "/"
+                            + PartitionPathUtils.generatePartitionPath(
+                                    new LinkedHashMap<>(partitionSpec)));
+            hivePartition.setDbName(identifier.getDatabaseName());
+            hivePartition.setTableName(identifier.getTableName());
+            hivePartition.setValues(new ArrayList<>(partitionSpec.values()));
+            hivePartition.setSd(newSd);
+            hivePartition.setCreateTime(currentTime);
+            hivePartition.setLastAccessTime(currentTime);
+            hivePartitions.add(hivePartition);
+        }
+        return hivePartitions;
+    }
 }
diff --git 
a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java
 
b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java
index e733ec16c8..d96fac808c 100644
--- 
a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java
+++ 
b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java
@@ -25,6 +25,7 @@ import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.client.ClientPool;
 import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.Partition;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.types.DataField;
@@ -55,6 +56,8 @@ import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static 
org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTORECONNECTURLKEY;
+import static org.apache.paimon.CoreOptions.METASTORE_PARTITIONED_TABLE;
+import static org.apache.paimon.CoreOptions.METASTORE_TAG_TO_PARTITION;
 import static org.apache.paimon.hive.HiveCatalog.PAIMON_TABLE_IDENTIFIER;
 import static org.apache.paimon.hive.HiveCatalog.TABLE_TYPE_PROP;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -448,4 +451,56 @@ public class HiveCatalogTest extends CatalogTestBase {
 
         externalWarehouseCatalog.close();
     }
+
+    @Test
+    public void testTagToPartitionTable() throws Exception {
+        String databaseName = "testTagToPartitionTable";
+        catalog.dropDatabase(databaseName, true, true);
+        catalog.createDatabase(databaseName, true);
+        Identifier identifier = Identifier.create(databaseName, "table");
+        catalog.createTable(
+                identifier,
+                Schema.newBuilder()
+                        .option(METASTORE_TAG_TO_PARTITION.key(), "dt")
+                        .column("col", DataTypes.INT())
+                        .column("dt", DataTypes.STRING())
+                        .build(),
+                true);
+
+        catalog.createPartitions(
+                identifier,
+                Arrays.asList(
+                        Collections.singletonMap("dt", "20250101"),
+                        Collections.singletonMap("dt", "20250102")));
+        assertThat(catalog.listPartitions(identifier).stream().map(Partition::spec))
+                .containsExactlyInAnyOrder(
+                        Collections.singletonMap("dt", "20250102"),
+                        Collections.singletonMap("dt", "20250101"));
+    }
+
+    @Test
+    public void testPartitionTable() throws Exception {
+        String databaseName = "testPartitionTable";
+        catalog.dropDatabase(databaseName, true, true);
+        catalog.createDatabase(databaseName, true);
+        Identifier identifier = Identifier.create(databaseName, "table");
+        catalog.createTable(
+                identifier,
+                Schema.newBuilder()
+                        .option(METASTORE_PARTITIONED_TABLE.key(), "true")
+                        .column("col", DataTypes.INT())
+                        .column("dt", DataTypes.STRING())
+                        .partitionKeys("dt")
+                        .build(),
+                true);
+
+        catalog.createPartitions(
+                identifier,
+                Arrays.asList(
+                        Collections.singletonMap("dt", "20250101"),
+                        Collections.singletonMap("dt", "20250102")));
+
+        // hive catalog list partitions from filesystem, so here return empty.
+        assertThat(catalog.listPartitions(identifier)).isEmpty();
+    }
 }

Reply via email to