This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new a444e84be64 [feature](hive)add 'HMSCommitter' to support inserting data into hive table (#32283) (#32362)
a444e84be64 is described below

commit a444e84be6419cfc7df3b11919dbb022ae85ccc5
Author: Mingyu Chen <[email protected]>
AuthorDate: Mon Mar 18 10:59:32 2024 +0800

    [feature](hive)add 'HMSCommitter' to support inserting data into hive table (#32283) (#32362)
    
    bp #32283
    Co-authored-by: wuwenchi <[email protected]>
---
 fe/fe-core/pom.xml                                 |   4 +
 .../apache/doris/datasource/ExternalCatalog.java   |   5 +
 .../doris/datasource/hive/HMSCachedClient.java     |  16 +
 .../apache/doris/datasource/hive/HMSCommitter.java | 668 +++++++++++++++++++++
 .../datasource/hive/HiveColumnStatistics.java      |  30 +
 .../datasource/hive/HiveCommonStatistics.java      |  44 ++
 .../doris/datasource/hive/HiveMetadataOps.java     |  38 ++
 .../doris/datasource/hive/HivePartition.java       |  16 +
 .../datasource/hive/HivePartitionStatistics.java   | 117 ++++
 .../hive/HivePartitionWithStatistics.java          |  42 ++
 .../org/apache/doris/datasource/hive/HiveUtil.java |  64 +-
 .../hive/PostgreSQLJdbcHMSCachedClient.java        |  26 +
 .../datasource/hive/ThriftHMSCachedClient.java     | 137 +++++
 .../main/java/org/apache/doris/fs/FileSystem.java  |  21 +
 .../apache/doris/fs/remote/dfs/DFSFileSystem.java  |  89 ++-
 .../doris/datasource/hive/HmsCommitTest.java       | 247 ++++++++
 fe/pom.xml                                         |   6 +
 17 files changed, 1558 insertions(+), 12 deletions(-)
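
For orientation before the diff: the write path added by this commit goes through HiveMetadataOps.commit(dbName, tableName, updates), which looks up the metastore Table and hands the THivePartitionUpdate list to the new HMSCommitter. The committer merges updates per partition, runs its prepare/doCommit steps, and rolls back file-system and metastore changes on failure. A minimal usage sketch, assuming an existing HiveMetadataOps instance and updates produced by the backend writers (the helper name below is hypothetical):

    // "ops" is the catalog's HiveMetadataOps; commit() internally builds an HMSCommitter.
    List<THivePartitionUpdate> updates = collectPartitionUpdatesFromWriters(); // hypothetical helper
    ops.commit("test_db", "test_table", updates);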

diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml
index 1ce305a06e0..761b34e8d48 100644
--- a/fe/fe-core/pom.xml
+++ b/fe/fe-core/pom.xml
@@ -753,6 +753,10 @@ under the License.
             <groupId>org.immutables</groupId>
             <artifactId>value</artifactId>
         </dependency>
+        <dependency>
+            <groupId>io.airlift</groupId>
+            <artifactId>concurrent</artifactId>
+        </dependency>
     </dependencies>
     <repositories>
         <!-- for huawei obs sdk -->
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index 783f8a0fdfb..aa6ef1f14eb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -135,6 +135,11 @@ public abstract class ExternalCatalog
         return conf;
     }
 
+    // only for test
+    public void setInitialized() {
+        initialized = true;
+    }
+
     /**
      * set some default properties when creating catalog
      * @return list of database names in this catalog
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java
index c26de66058b..b55ed7bdf0e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.metastore.api.Table;
 
 import java.util.List;
 import java.util.Map;
+import java.util.function.Function;
 
 /**
  * A hive metastore client pool for a specific catalog with hive configuration.
@@ -90,4 +91,19 @@ public interface HMSCachedClient {
     void dropTable(String dbName, String tableName);
 
     void createTable(TableMetadata catalogTable, boolean ignoreIfExists);
+
+    void updateTableStatistics(
+            String dbName,
+            String tableName,
+            Function<HivePartitionStatistics, HivePartitionStatistics> update);
+
+    void updatePartitionStatistics(
+            String dbName,
+            String tableName,
+            String partitionName,
+            Function<HivePartitionStatistics, HivePartitionStatistics> update);
+
+    void addPartitions(String dbName, String tableName, List<HivePartitionWithStatistics> partitions);
+
+    void dropPartition(String dbName, String tableName, List<String> partitionValues, boolean deleteData);
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCommitter.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCommitter.java
new file mode 100644
index 00000000000..64abb985fcf
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCommitter.java
@@ -0,0 +1,668 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/SemiTransactionalHiveMetastore.java
+// and modified by Doris
+
+package org.apache.doris.datasource.hive;
+
+import org.apache.doris.backup.Status;
+import org.apache.doris.common.Pair;
+import org.apache.doris.fs.remote.RemoteFile;
+import org.apache.doris.fs.remote.RemoteFileSystem;
+import org.apache.doris.thrift.THivePartitionUpdate;
+import org.apache.doris.thrift.TUpdateMode;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import io.airlift.concurrent.MoreFutures;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.StringJoiner;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+public class HMSCommitter {
+    private static final Logger LOG = LogManager.getLogger(HMSCommitter.class);
+    private final HiveMetadataOps hiveOps;
+    private final RemoteFileSystem fs;
+    private final Table table;
+
+    // update statistics for unpartitioned tables or existing partitions
+    private final List<UpdateStatisticsTask> updateStatisticsTasks = new ArrayList<>();
+    Executor updateStatisticsExecutor = Executors.newFixedThreadPool(16);
+
+    // add new partition
+    private final AddPartitionsTask addPartitionsTask = new AddPartitionsTask();
+    private static final int PARTITION_COMMIT_BATCH_SIZE = 20;
+
+    // for file system rename operation
+    // whether to cancel the file system tasks
+    private final AtomicBoolean fileSystemTaskCancelled = new AtomicBoolean(false);
+    // file system tasks that are executed asynchronously, including rename_file, rename_dir
+    private final List<CompletableFuture<?>> asyncFileSystemTaskFutures = new ArrayList<>();
+    // when aborted, we need to delete all files under this path, and even the directory itself
+    private final Queue<DirectoryCleanUpTask> directoryCleanUpTasksForAbort = new ConcurrentLinkedQueue<>();
+    // when aborted, we need to restore the directory
+    private final List<RenameDirectoryTask> renameDirectoryTasksForAbort = new ArrayList<>();
+    Executor fileSystemExecutor = Executors.newFixedThreadPool(16);
+
+    public HMSCommitter(HiveMetadataOps hiveOps, RemoteFileSystem fs, Table table) {
+        this.hiveOps = hiveOps;
+        this.fs = fs;
+        this.table = table;
+    }
+
+    public void commit(List<THivePartitionUpdate> hivePUs) {
+        try {
+            prepare(mergePartitions(hivePUs));
+            doCommit();
+        } catch (Throwable t) {
+            LOG.warn("Failed to commit for {}.{}, abort it.", table.getDbName(), table.getTableName());
+            try {
+                cancelUnStartedAsyncFileSystemTask();
+                undoUpdateStatisticsTasks();
+                undoAddPartitionsTask();
+                waitForAsyncFileSystemTaskSuppressThrowable();
+                runDirectoryClearUpTasksForAbort();
+                runRenameDirTasksForAbort();
+            } catch (Throwable e) {
+                t.addSuppressed(new Exception("Failed to roll back after commit failure", e));
+            }
+            throw t;
+        }
+    }
+
+    public void prepare(List<THivePartitionUpdate> hivePUs) {
+
+        List<Pair<THivePartitionUpdate, HivePartitionStatistics>> insertExistsPartitions = new ArrayList<>();
+
+        for (THivePartitionUpdate pu : hivePUs) {
+            TUpdateMode updateMode = pu.getUpdateMode();
+            HivePartitionStatistics hivePartitionStatistics = HivePartitionStatistics.fromCommonStatistics(
+                    pu.getRowCount(),
+                    pu.getFileNamesSize(),
+                    pu.getFileSize());
+            if (table.getPartitionKeysSize() == 0) {
+                Preconditions.checkArgument(hivePUs.size() == 1,
+                        "When updating a non-partitioned table, multiple partitions should not be written");
+                switch (updateMode) {
+                    case APPEND:
+                        prepareAppendTable(pu, hivePartitionStatistics);
+                        break;
+                    case OVERWRITE:
+                        prepareOverwriteTable(pu, hivePartitionStatistics);
+                        break;
+                    default:
+                        throw new RuntimeException("Not support mode:[" + updateMode + "] in unPartitioned table");
+                }
+            } else {
+                switch (updateMode) {
+                    case NEW:
+                        prepareCreateNewPartition(pu, hivePartitionStatistics);
+                        break;
+                    case APPEND:
+                        insertExistsPartitions.add(Pair.of(pu, hivePartitionStatistics));
+                        break;
+                    case OVERWRITE:
+                        prepareOverwritePartition(pu, hivePartitionStatistics);
+                        break;
+                    default:
+                        throw new RuntimeException("Not support mode:[" + updateMode + "] in partitioned table");
+                }
+            }
+        }
+
+        if (!insertExistsPartitions.isEmpty()) {
+            prepareInsertExistPartition(insertExistsPartitions);
+        }
+    }
+
+    public List<THivePartitionUpdate> mergePartitions(List<THivePartitionUpdate> hivePUs) {
+        Map<String, THivePartitionUpdate> mm = new HashMap<>();
+        for (THivePartitionUpdate pu : hivePUs) {
+            if (mm.containsKey(pu.getName())) {
+                THivePartitionUpdate old = mm.get(pu.getName());
+                old.setFileSize(old.getFileSize() + pu.getFileSize());
+                old.setRowCount(old.getRowCount() + pu.getRowCount());
+                old.getFileNames().addAll(pu.getFileNames());
+            } else {
+                mm.put(pu.getName(), pu);
+            }
+        }
+        return new ArrayList<>(mm.values());
+    }
+
+    public void doCommit() {
+        waitForAsyncFileSystemTasks();
+        doAddPartitionsTask();
+        doUpdateStatisticsTasks();
+    }
+
+    public void rollback() {
+
+    }
+
+    public void cancelUnStartedAsyncFileSystemTask() {
+        fileSystemTaskCancelled.set(true);
+    }
+
+    private void undoUpdateStatisticsTasks() {
+        ImmutableList.Builder<CompletableFuture<?>> undoUpdateFutures = ImmutableList.builder();
+        for (UpdateStatisticsTask task : updateStatisticsTasks) {
+            undoUpdateFutures.add(CompletableFuture.runAsync(() -> {
+                try {
+                    task.undo(hiveOps);
+                } catch (Throwable throwable) {
+                    LOG.warn("Failed to rollback: {}", task.getDescription(), throwable);
+                }
+            }, updateStatisticsExecutor));
+        }
+
+        for (CompletableFuture<?> undoUpdateFuture : undoUpdateFutures.build()) {
+            MoreFutures.getFutureValue(undoUpdateFuture);
+        }
+    }
+
+    private void undoAddPartitionsTask() {
+        if (addPartitionsTask.isEmpty()) {
+            return;
+        }
+
+        HivePartition firstPartition = addPartitionsTask.getPartitions().get(0).getPartition();
+        String dbName = firstPartition.getDbName();
+        String tableName = firstPartition.getTblName();
+        List<List<String>> rollbackFailedPartitions = addPartitionsTask.rollback(hiveOps);
+        if (!rollbackFailedPartitions.isEmpty()) {
+            LOG.warn("Failed to rollback: add_partition for partition values {}.{}.{}",
+                    dbName, tableName, rollbackFailedPartitions);
+        }
+    }
+
+    private void waitForAsyncFileSystemTaskSuppressThrowable() {
+        for (CompletableFuture<?> future : asyncFileSystemTaskFutures) {
+            try {
+                future.get();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            } catch (Throwable t) {
+                // ignore
+            }
+        }
+    }
+
+    public void prepareAppendTable(THivePartitionUpdate pu, HivePartitionStatistics ps) {
+        String targetPath = pu.getLocation().getTargetPath();
+        String writePath = pu.getLocation().getWritePath();
+        if (!targetPath.equals(writePath)) {
+            fs.asyncRename(
+                    fileSystemExecutor,
+                    asyncFileSystemTaskFutures,
+                    fileSystemTaskCancelled,
+                    writePath,
+                    targetPath,
+                    pu.getFileNames());
+        }
+        directoryCleanUpTasksForAbort.add(new DirectoryCleanUpTask(targetPath, false));
+        updateStatisticsTasks.add(
+            new UpdateStatisticsTask(
+                table.getDbName(),
+                table.getTableName(),
+                Optional.empty(),
+                ps,
+                true
+            ));
+    }
+
+    public void prepareOverwriteTable(THivePartitionUpdate pu, HivePartitionStatistics ps) {
+
+    }
+
+    public void prepareCreateNewPartition(THivePartitionUpdate pu, HivePartitionStatistics ps) {
+
+        String targetPath = pu.getLocation().getTargetPath();
+        String writePath = pu.getLocation().getWritePath();
+
+        if (!targetPath.equals(writePath)) {
+            fs.asyncRenameDir(
+                    fileSystemExecutor,
+                    asyncFileSystemTaskFutures,
+                    fileSystemTaskCancelled,
+                    writePath,
+                    targetPath,
+                    () -> directoryCleanUpTasksForAbort.add(new DirectoryCleanUpTask(targetPath, true)));
+        }
+
+        StorageDescriptor sd = table.getSd();
+
+        HivePartition hivePartition = new HivePartition(
+                table.getDbName(),
+                table.getTableName(),
+                false,
+                sd.getInputFormat(),
+                pu.getLocation().getTargetPath(),
+                HiveUtil.toPartitionValues(pu.getName()),
+                Maps.newHashMap(),
+                sd.getOutputFormat(),
+                sd.getSerdeInfo().getSerializationLib(),
+                hiveOps.getClient().getSchema(table.getDbName(), table.getTableName())
+        );
+        HivePartitionWithStatistics partitionWithStats =
+                new HivePartitionWithStatistics(pu.getName(), hivePartition, ps);
+        addPartitionsTask.addPartition(partitionWithStats);
+    }
+
+    public void prepareInsertExistPartition(List<Pair<THivePartitionUpdate, HivePartitionStatistics>> partitions) {
+        for (List<Pair<THivePartitionUpdate, HivePartitionStatistics>> partitionBatch :
+                    Iterables.partition(partitions, 100)) {
+            List<String> partitionNames = partitionBatch.stream()
+                    .map(pair -> pair.first.getName())
+                    .collect(Collectors.toList());
+
+            Map<String, Partition> partitionsByNamesMap = HiveUtil.convertToNamePartitionMap(
+                    partitionNames,
+                    hiveOps.getClient().getPartitions(table.getDbName(), table.getTableName(), partitionNames));
+
+            for (int i = 0; i < partitionsByNamesMap.size(); i++) {
+                String partitionName = partitionNames.get(i);
+                if (partitionsByNamesMap.get(partitionName) == null) {
+                    // Prevent this partition from being deleted by other engines
+                    throw new RuntimeException("Not found partition: " + partitionName);
+                }
+
+                THivePartitionUpdate pu = partitionBatch.get(i).first;
+                HivePartitionStatistics updateStats = partitionBatch.get(i).second;
+
+                String writePath = pu.getLocation().getWritePath();
+                String targetPath = pu.getLocation().getTargetPath();
+                directoryCleanUpTasksForAbort.add(new DirectoryCleanUpTask(targetPath, false));
+
+                if (!targetPath.equals(writePath)) {
+                    fs.asyncRename(
+                            fileSystemExecutor,
+                            asyncFileSystemTaskFutures,
+                            fileSystemTaskCancelled,
+                            writePath,
+                            targetPath,
+                            pu.getFileNames());
+                }
+
+                updateStatisticsTasks.add(
+                    new UpdateStatisticsTask(
+                            table.getDbName(),
+                            table.getTableName(),
+                            Optional.of(pu.getName()),
+                            updateStats,
+                            true));
+            }
+        }
+    }
+
+
+    public void prepareOverwritePartition(THivePartitionUpdate pu, HivePartitionStatistics ps) {
+
+    }
+
+
+    private void waitForAsyncFileSystemTasks() {
+        for (CompletableFuture<?> future : asyncFileSystemTaskFutures) {
+            MoreFutures.getFutureValue(future, RuntimeException.class);
+        }
+    }
+
+    private void doAddPartitionsTask() {
+        if (!addPartitionsTask.isEmpty()) {
+            addPartitionsTask.run(hiveOps);
+        }
+    }
+
+    private void doUpdateStatisticsTasks() {
+        ImmutableList.Builder<CompletableFuture<?>> updateStatsFutures = ImmutableList.builder();
+        List<String> failedTaskDescriptions = new ArrayList<>();
+        List<Throwable> suppressedExceptions = new ArrayList<>();
+        for (UpdateStatisticsTask task : updateStatisticsTasks) {
+            updateStatsFutures.add(CompletableFuture.runAsync(() -> {
+                try {
+                    task.run(hiveOps);
+                } catch (Throwable t) {
+                    synchronized (suppressedExceptions) {
+                        addSuppressedExceptions(suppressedExceptions, t, failedTaskDescriptions, task.getDescription());
+                    }
+                }
+            }, updateStatisticsExecutor));
+        }
+
+        for (CompletableFuture<?> executeUpdateFuture : updateStatsFutures.build()) {
+            MoreFutures.getFutureValue(executeUpdateFuture);
+        }
+        if (!suppressedExceptions.isEmpty()) {
+            StringBuilder message = new StringBuilder();
+            message.append("Failed to execute some statistics update tasks: ");
+            Joiner.on("; ").appendTo(message, failedTaskDescriptions);
+            RuntimeException exception = new RuntimeException(message.toString());
+            suppressedExceptions.forEach(exception::addSuppressed);
+            throw exception;
+        }
+    }
+
+    private static void addSuppressedExceptions(
+            List<Throwable> suppressedExceptions,
+            Throwable t,
+            List<String> descriptions,
+            String description) {
+        descriptions.add(description);
+        // A limit is needed to avoid having a huge exception object. 5 was chosen arbitrarily.
+        if (suppressedExceptions.size() < 5) {
+            suppressedExceptions.add(t);
+        }
+    }
+
+    private static class AddPartition {
+
+    }
+
+    private static class UpdateStatisticsTask {
+        private final String dbName;
+        private final String tableName;
+        private final Optional<String> partitionName;
+        private final HivePartitionStatistics updatePartitionStat;
+        private final boolean merge;
+
+        private boolean done;
+
+        public UpdateStatisticsTask(String dbName, String tableName, Optional<String> partitionName,
+                                    HivePartitionStatistics statistics, boolean merge) {
+            this.dbName = Objects.requireNonNull(dbName, "dbName is null");
+            this.tableName = Objects.requireNonNull(tableName, "tableName is null");
+            this.partitionName = Objects.requireNonNull(partitionName, "partitionName is null");
+            this.updatePartitionStat = Objects.requireNonNull(statistics, "statistics is null");
+            this.merge = merge;
+        }
+
+        public void run(HiveMetadataOps hiveOps) {
+            if (partitionName.isPresent()) {
+                hiveOps.updatePartitionStatistics(dbName, tableName, partitionName.get(), this::updateStatistics);
+            } else {
+                hiveOps.updateTableStatistics(dbName, tableName, this::updateStatistics);
+            }
+            done = true;
+        }
+
+        public void undo(HiveMetadataOps hmsOps) {
+            if (!done) {
+                return;
+            }
+            if (partitionName.isPresent()) {
+                hmsOps.updatePartitionStatistics(dbName, tableName, partitionName.get(), this::resetStatistics);
+            } else {
+                hmsOps.updateTableStatistics(dbName, tableName, this::resetStatistics);
+            }
+        }
+
+        public String getDescription() {
+            if (partitionName.isPresent()) {
+                return "alter partition parameters " + tableName + " " + partitionName.get();
+            } else {
+                return "alter table parameters " +  tableName;
+            }
+        }
+
+        private HivePartitionStatistics updateStatistics(HivePartitionStatistics currentStats) {
+            return merge ? HivePartitionStatistics.merge(currentStats, updatePartitionStat) : updatePartitionStat;
+        }
+
+        private HivePartitionStatistics resetStatistics(HivePartitionStatistics currentStatistics) {
+            return HivePartitionStatistics
+                    .reduce(currentStatistics, updatePartitionStat, HivePartitionStatistics.ReduceOperator.SUBTRACT);
+        }
+    }
+
+    public static class AddPartitionsTask {
+        private final List<HivePartitionWithStatistics> partitions = new ArrayList<>();
+        private final List<List<String>> createdPartitionValues = new ArrayList<>();
+
+        public boolean isEmpty() {
+            return partitions.isEmpty();
+        }
+
+        public List<HivePartitionWithStatistics> getPartitions() {
+            return partitions;
+        }
+
+        public void addPartition(HivePartitionWithStatistics partition) {
+            partitions.add(partition);
+        }
+
+        public void run(HiveMetadataOps hiveOps) {
+            HivePartition firstPartition = partitions.get(0).getPartition();
+            String dbName = firstPartition.getDbName();
+            String tableName = firstPartition.getTblName();
+            List<List<HivePartitionWithStatistics>> batchedPartitions =
+                    Lists.partition(partitions, PARTITION_COMMIT_BATCH_SIZE);
+            for (List<HivePartitionWithStatistics> batch : batchedPartitions) {
+                try {
+                    hiveOps.addPartitions(dbName, tableName, batch);
+                    for (HivePartitionWithStatistics partition : batch) {
+                        createdPartitionValues.add(partition.getPartition().getPartitionValues());
+                    }
+                } catch (Throwable t) {
+                    LOG.error("Failed to add partition", t);
+                    throw t;
+                }
+            }
+            partitions.clear();
+        }
+
+        public List<List<String>> rollback(HiveMetadataOps hiveOps) {
+            HivePartition firstPartition = partitions.get(0).getPartition();
+            String dbName = firstPartition.getDbName();
+            String tableName = firstPartition.getTblName();
+            List<List<String>> rollbackFailedPartitions = new ArrayList<>();
+            for (List<String> createdPartitionValue : createdPartitionValues) {
+                try {
+                    hiveOps.dropPartition(dbName, tableName, createdPartitionValue, false);
+                } catch (Throwable t) {
+                    LOG.warn("Failed to drop partition on {}.{}.{} when rollback",
+                            dbName, tableName, createdPartitionValue);
+                    rollbackFailedPartitions.add(createdPartitionValue);
+                }
+            }
+            return rollbackFailedPartitions;
+        }
+    }
+
+    private static class DirectoryCleanUpTask {
+        private final Path path;
+        private final boolean deleteEmptyDir;
+
+        public DirectoryCleanUpTask(String path, boolean deleteEmptyDir) {
+            this.path = new Path(path);
+            this.deleteEmptyDir = deleteEmptyDir;
+        }
+
+        public Path getPath() {
+            return path;
+        }
+
+        public boolean isDeleteEmptyDir() {
+            return deleteEmptyDir;
+        }
+
+        @Override
+        public String toString() {
+            return new StringJoiner(", ", DirectoryCleanUpTask.class.getSimpleName() + "[", "]")
+                .add("path=" + path)
+                .add("deleteEmptyDir=" + deleteEmptyDir)
+                .toString();
+        }
+    }
+
+    public static class DeleteRecursivelyResult {
+        private final boolean dirNoLongerExists;
+        private final List<String> notDeletedEligibleItems;
+
+        public DeleteRecursivelyResult(boolean dirNoLongerExists, List<String> notDeletedEligibleItems) {
+            this.dirNoLongerExists = dirNoLongerExists;
+            this.notDeletedEligibleItems = notDeletedEligibleItems;
+        }
+
+        public boolean dirNotExists() {
+            return dirNoLongerExists;
+        }
+
+        public List<String> getNotDeletedEligibleItems() {
+            return notDeletedEligibleItems;
+        }
+    }
+
+    private void runDirectoryClearUpTasksForAbort() {
+        for (DirectoryCleanUpTask cleanUpTask : directoryCleanUpTasksForAbort) {
+            recursiveDeleteItems(cleanUpTask.getPath(), cleanUpTask.isDeleteEmptyDir());
+        }
+    }
+
+    private static class RenameDirectoryTask {
+        private final String renameFrom;
+        private final String renameTo;
+
+        public RenameDirectoryTask(String renameFrom, String renameTo) {
+            this.renameFrom = renameFrom;
+            this.renameTo = renameTo;
+        }
+
+        public String getRenameFrom() {
+            return renameFrom;
+        }
+
+        public String getRenameTo() {
+            return renameTo;
+        }
+
+        @Override
+        public String toString() {
+            return new StringJoiner(", ", RenameDirectoryTask.class.getSimpleName() + "[", "]")
+                .add("renameFrom:" + renameFrom)
+                .add("renameTo:" + renameTo)
+                .toString();
+        }
+    }
+
+    private void runRenameDirTasksForAbort() {
+        // TODO abort
+    }
+
+
+    private void recursiveDeleteItems(Path directory, boolean deleteEmptyDir) {
+        DeleteRecursivelyResult deleteResult = recursiveDeleteFiles(directory, deleteEmptyDir);
+
+        if (!deleteResult.getNotDeletedEligibleItems().isEmpty()) {
+            LOG.error("Failed to delete directory {}. Some eligible items can't be deleted: {}.",
+                    directory.toString(), deleteResult.getNotDeletedEligibleItems());
+        } else if (deleteEmptyDir && !deleteResult.dirNotExists()) {
+            LOG.error("Failed to delete directory {} because the directory is not empty", directory.toString());
+        }
+    }
+
+    public DeleteRecursivelyResult recursiveDeleteFiles(Path directory, boolean deleteEmptyDir) {
+        try {
+            if (!fs.exists(directory.getName()).ok()) {
+                return new DeleteRecursivelyResult(true, ImmutableList.of());
+            }
+        } catch (Exception e) {
+            ImmutableList.Builder<String> notDeletedEligibleItems = ImmutableList.builder();
+            notDeletedEligibleItems.add(directory.toString() + "/*");
+            return new DeleteRecursivelyResult(false, notDeletedEligibleItems.build());
+        }
+
+        return doRecursiveDeleteFiles(directory, deleteEmptyDir);
+    }
+
+    private DeleteRecursivelyResult doRecursiveDeleteFiles(Path directory, boolean deleteEmptyDir) {
+        List<RemoteFile> remoteFiles = new ArrayList<>();
+
+        Status status = fs.list(directory.getName(), remoteFiles);
+        if (!status.ok()) {
+            ImmutableList.Builder<String> notDeletedEligibleItems = ImmutableList.builder();
+            notDeletedEligibleItems.add(directory + "/*");
+            return new DeleteRecursivelyResult(false, notDeletedEligibleItems.build());
+        }
+
+        boolean isEmptyDir = true;
+        List<String> notDeletedEligibleItems = new ArrayList<>();
+        for (RemoteFile file : remoteFiles) {
+            if (file.isFile()) {
+                Path filePath = file.getPath();
+                isEmptyDir = false;
+                // TODO Check if this file was created by this query
+                if (!deleteIfExists(filePath)) {
+                    notDeletedEligibleItems.add(filePath.toString());
+                }
+            } else if (file.isDirectory()) {
+                DeleteRecursivelyResult subResult = doRecursiveDeleteFiles(file.getPath(), deleteEmptyDir);
+                if (!subResult.dirNotExists()) {
+                    isEmptyDir = false;
+                }
+                if (!subResult.getNotDeletedEligibleItems().isEmpty()) {
+                    notDeletedEligibleItems.addAll(subResult.getNotDeletedEligibleItems());
+                }
+            } else {
+                isEmptyDir = false;
+                notDeletedEligibleItems.add(file.getPath().toString());
+            }
+        }
+
+        if (isEmptyDir && deleteEmptyDir) {
+            Verify.verify(notDeletedEligibleItems.isEmpty());
+            if (!deleteIfExists(directory)) {
+                return new DeleteRecursivelyResult(false, ImmutableList.of(directory + "/"));
+            }
+            // all items of the location have been deleted.
+            return new DeleteRecursivelyResult(true, ImmutableList.of());
+        }
+
+        return new DeleteRecursivelyResult(false, notDeletedEligibleItems);
+    }
+
+    public boolean deleteIfExists(Path path) {
+        Status status = fs.delete(path.getName());
+        if (status.ok()) {
+            return true;
+        }
+        return !fs.exists(path.getName()).ok();
+    }
+}
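
A note on mergePartitions() above: several THivePartitionUpdate entries can arrive for the same partition (one per writer), and the committer folds them into a single entry per partition name by summing fileSize and rowCount and concatenating fileNames. A small illustration, assuming the usual Thrift-generated setters on THivePartitionUpdate (only the getters and setRowCount/setFileSize appear in this diff) and an existing committer instance:

    // "committer" is an HMSCommitter built from an existing hiveOps, fs and table.
    THivePartitionUpdate u1 = new THivePartitionUpdate();
    u1.setName("nation=cn");
    u1.setRowCount(10);
    u1.setFileSize(100);
    u1.setFileNames(Lists.newArrayList("f1"));
    THivePartitionUpdate u2 = new THivePartitionUpdate();
    u2.setName("nation=cn");
    u2.setRowCount(5);
    u2.setFileSize(50);
    u2.setFileNames(Lists.newArrayList("f2"));
    // One merged update for "nation=cn": rowCount=15, fileSize=150, fileNames=[f1, f2]
    List<THivePartitionUpdate> merged = committer.mergePartitions(Lists.newArrayList(u1, u2));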
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveColumnStatistics.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveColumnStatistics.java
new file mode 100644
index 00000000000..96cb6b5728b
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveColumnStatistics.java
@@ -0,0 +1,30 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.hive;
+
+
+public class HiveColumnStatistics {
+
+    private long totalSizeBytes;
+    private long numNulls;
+    private long ndv;
+    private final double min = Double.NEGATIVE_INFINITY;
+    private final double max = Double.POSITIVE_INFINITY;
+
+    // TODO add hive column statistics
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveCommonStatistics.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveCommonStatistics.java
new file mode 100644
index 00000000000..3d8fb2512aa
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveCommonStatistics.java
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.hive;
+
+public class HiveCommonStatistics {
+    public static final HiveCommonStatistics EMPTY = new HiveCommonStatistics(0L, 0L, 0L);
+
+    private final long rowCount;
+    private final long fileCount;
+    private final long totalFileBytes;
+
+    public HiveCommonStatistics(long rowCount, long fileCount, long totalFileBytes) {
+        this.fileCount = fileCount;
+        this.rowCount = rowCount;
+        this.totalFileBytes = totalFileBytes;
+    }
+
+    public long getRowCount() {
+        return rowCount;
+    }
+
+    public long getFileCount() {
+        return fileCount;
+    }
+
+    public long getTotalFileBytes() {
+        return totalFileBytes;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
index 6779a602cbf..886d6d76fa8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
@@ -30,16 +30,21 @@ import org.apache.doris.datasource.ExternalDatabase;
 import org.apache.doris.datasource.jdbc.client.JdbcClient;
 import org.apache.doris.datasource.jdbc.client.JdbcClientConfig;
 import org.apache.doris.datasource.operations.ExternalMetadataOps;
+import org.apache.doris.fs.remote.RemoteFileSystem;
+import org.apache.doris.fs.remote.dfs.DFSFileSystem;
+import org.apache.doris.thrift.THivePartitionUpdate;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Function;
 
 public class HiveMetadataOps implements ExternalMetadataOps {
     private static final Logger LOG = LogManager.getLogger(HiveMetadataOps.class);
@@ -48,6 +53,7 @@ public class HiveMetadataOps implements ExternalMetadataOps {
     private HiveConf hiveConf;
     private HMSExternalCatalog catalog;
     private HMSCachedClient client;
+    private final RemoteFileSystem fs;
 
     public HiveMetadataOps(HiveConf hiveConf, JdbcClientConfig jdbcClientConfig, HMSExternalCatalog catalog) {
         this.catalog = catalog;
@@ -55,6 +61,7 @@ public class HiveMetadataOps implements ExternalMetadataOps {
         this.jdbcClientConfig = jdbcClientConfig;
         this.client = createCachedClient(hiveConf,
                 Math.max(MIN_CLIENT_POOL_SIZE, Config.max_external_cache_loader_thread_pool_size), jdbcClientConfig);
+        this.fs = new DFSFileSystem(catalog.getProperties());
     }
 
     public HMSCachedClient getClient() {
@@ -176,4 +183,35 @@ public class HiveMetadataOps implements ExternalMetadataOps {
     public List<String> listDatabaseNames() {
         return client.getAllDatabases();
     }
+
+    public void commit(String dbName,
+                       String tableName,
+                       List<THivePartitionUpdate> hivePUs) {
+        Table table = client.getTable(dbName, tableName);
+        HMSCommitter hmsCommitter = new HMSCommitter(this, fs, table);
+        hmsCommitter.commit(hivePUs);
+    }
+
+    public void updateTableStatistics(
+            String dbName,
+            String tableName,
+            Function<HivePartitionStatistics, HivePartitionStatistics> update) {
+        client.updateTableStatistics(dbName, tableName, update);
+    }
+
+    void updatePartitionStatistics(
+            String dbName,
+            String tableName,
+            String partitionName,
+            Function<HivePartitionStatistics, HivePartitionStatistics> update) {
+        client.updatePartitionStatistics(dbName, tableName, partitionName, update);
+    }
+
+    public void addPartitions(String dbName, String tableName, List<HivePartitionWithStatistics> partitions) {
+        client.addPartitions(dbName, tableName, partitions);
+    }
+
+    public void dropPartition(String dbName, String tableName, List<String> partitionValues, boolean deleteData) {
+        client.dropPartition(dbName, tableName, partitionValues, deleteData);
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartition.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartition.java
index 0663edb48df..a9d97b40628 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartition.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartition.java
@@ -21,6 +21,7 @@ import org.apache.doris.catalog.Column;
 
 import com.google.common.base.Preconditions;
 import lombok.Data;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
 
 import java.util.List;
 import java.util.Map;
@@ -37,7 +38,11 @@ public class HivePartition {
     private List<String> partitionValues;
     private boolean isDummyPartition;
     private Map<String, String> parameters;
+    private String outputFormat;
+    private String serde;
+    private List<FieldSchema> columns;
 
+    // Use this constructor when reading the data under a partition.
     public HivePartition(String dbName, String tblName, boolean isDummyPartition,
             String inputFormat, String path, List<String> partitionValues, Map<String, String> parameters) {
         this.dbName = dbName;
@@ -52,6 +57,17 @@ public class HivePartition {
         this.parameters = parameters;
     }
 
+    // Use this constructor when updating the HMS with a partition, because updating
+    // the HMS requires some additional information, such as outputFormat, serde and columns.
+    public HivePartition(String dbName, String tblName, boolean isDummyPartition,
+                         String inputFormat, String path, List<String> partitionValues, Map<String, String> parameters,
+                         String outputFormat, String serde, List<FieldSchema> columns) {
+        this(dbName, tblName, isDummyPartition, inputFormat, path, partitionValues, parameters);
+        this.outputFormat = outputFormat;
+        this.serde = serde;
+        this.columns = columns;
+    }
+
     // return partition name like: nation=cn/city=beijing
     public String getPartitionName(List<Column> partColumns) {
         Preconditions.checkState(partColumns.size() == partitionValues.size());
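
The second HivePartition constructor above carries the extra storage metadata an HMS add_partition call needs. A sketch of how HMSCommitter.prepareCreateNewPartition builds such a partition from the table's StorageDescriptor (the database, table and path values here are made up):

    StorageDescriptor sd = table.getSd();
    HivePartition partition = new HivePartition(
            "test_db", "test_table", false,
            sd.getInputFormat(),
            "hdfs://ns1/warehouse/test_db.db/test_table/nation=cn",
            HiveUtil.toPartitionValues("nation=cn"),   // ["cn"]
            Maps.newHashMap(),
            sd.getOutputFormat(),
            sd.getSerdeInfo().getSerializationLib(),
            client.getSchema("test_db", "test_table")); // client: the catalog's HMSCachedClient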
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionStatistics.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionStatistics.java
new file mode 100644
index 00000000000..49b14504750
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionStatistics.java
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/Statistics.java
+// and modified by Doris
+
+package org.apache.doris.datasource.hive;
+
+import com.google.common.collect.ImmutableMap;
+
+import java.util.Map;
+
+public class HivePartitionStatistics {
+    private static final HivePartitionStatistics EMPTY =
+            new HivePartitionStatistics(HiveCommonStatistics.EMPTY, ImmutableMap.of());
+
+    private final HiveCommonStatistics commonStatistics;
+    private final Map<String, HiveColumnStatistics> columnStatisticsMap;
+
+    public HivePartitionStatistics(
+            HiveCommonStatistics commonStatistics,
+            Map<String, HiveColumnStatistics> columnStatisticsMap) {
+        this.commonStatistics = commonStatistics;
+        this.columnStatisticsMap = columnStatisticsMap;
+    }
+
+    public HiveCommonStatistics getCommonStatistics() {
+        return commonStatistics;
+    }
+
+    public Map<String, HiveColumnStatistics> getColumnStatisticsMap() {
+        return columnStatisticsMap;
+    }
+
+    public static HivePartitionStatistics fromCommonStatistics(long rowCount, long fileCount, long totalFileBytes) {
+        return new HivePartitionStatistics(
+                new HiveCommonStatistics(rowCount, fileCount, totalFileBytes),
+                ImmutableMap.of()
+        );
+    }
+
+    // only used to update the parameters of partition or table.
+    public static HivePartitionStatistics merge(HivePartitionStatistics current, HivePartitionStatistics update) {
+        if (current.getCommonStatistics().getRowCount() <= 0) {
+            return update;
+        } else if (update.getCommonStatistics().getRowCount() <= 0) {
+            return current;
+        }
+
+        return new HivePartitionStatistics(
+            reduce(current.getCommonStatistics(), update.getCommonStatistics(), ReduceOperator.ADD),
+            // TODO merge columnStatisticsMap
+            current.getColumnStatisticsMap());
+    }
+
+    public static HivePartitionStatistics reduce(
+            HivePartitionStatistics first,
+            HivePartitionStatistics second,
+            ReduceOperator operator) {
+        HiveCommonStatistics left = first.getCommonStatistics();
+        HiveCommonStatistics right = second.getCommonStatistics();
+        return HivePartitionStatistics.fromCommonStatistics(
+            reduce(left.getRowCount(), right.getRowCount(), operator),
+            reduce(left.getFileCount(), right.getFileCount(), operator),
+            reduce(left.getTotalFileBytes(), right.getTotalFileBytes(), operator));
+    }
+
+    public static HiveCommonStatistics reduce(
+            HiveCommonStatistics current,
+            HiveCommonStatistics update,
+            ReduceOperator operator) {
+        return new HiveCommonStatistics(
+            reduce(current.getRowCount(), update.getRowCount(), operator),
+            reduce(current.getFileCount(), update.getFileCount(), operator),
+            reduce(current.getTotalFileBytes(), update.getTotalFileBytes(), operator));
+    }
+
+    public static long reduce(long current, long update, ReduceOperator operator) {
+        if (current >= 0 && update >= 0) {
+            switch (operator) {
+                case ADD:
+                    return current + update;
+                case SUBTRACT:
+                    return current - update;
+                case MAX:
+                    return Math.max(current, update);
+                case MIN:
+                    return Math.min(current, update);
+                default:
+                    throw new IllegalArgumentException("Unexpected operator: " + operator);
+            }
+        }
+
+        return 0;
+    }
+
+    public enum ReduceOperator {
+        ADD,
+        SUBTRACT,
+        MIN,
+        MAX,
+    }
+}
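
The helpers above only track the three common counters (row count, file count, total bytes); merging column statistics is still a TODO. A quick illustration of the arithmetic, assuming both sides carry positive row counts (otherwise merge() simply keeps the non-empty side):

    HivePartitionStatistics current = HivePartitionStatistics.fromCommonStatistics(100, 2, 1024);
    HivePartitionStatistics update = HivePartitionStatistics.fromCommonStatistics(50, 1, 512);
    // commit path: ADD -> 150 rows, 3 files, 1536 bytes
    HivePartitionStatistics merged = HivePartitionStatistics.merge(current, update);
    // rollback path: SUBTRACT restores the previous counters
    HivePartitionStatistics restored = HivePartitionStatistics.reduce(
            merged, update, HivePartitionStatistics.ReduceOperator.SUBTRACT);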
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionWithStatistics.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionWithStatistics.java
new file mode 100644
index 00000000000..b7c28b68ff0
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionWithStatistics.java
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.hive;
+
+public class HivePartitionWithStatistics {
+    private String name;
+    private HivePartition partition;
+    private HivePartitionStatistics statistics;
+
+    public HivePartitionWithStatistics(String name, HivePartition partition, HivePartitionStatistics statistics) {
+        this.name = name;
+        this.partition = partition;
+        this.statistics = statistics;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public HivePartition getPartition() {
+        return partition;
+    }
+
+    public HivePartitionStatistics getStatistics() {
+        return statistics;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java
index 2fdd4f21f39..eb107464bfc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java
@@ -22,6 +22,10 @@ import org.apache.doris.fs.remote.BrokerFileSystem;
 import org.apache.doris.fs.remote.RemoteFileSystem;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat;
 import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
 import org.apache.hadoop.mapred.InputFormat;
@@ -32,6 +36,9 @@ import org.apache.hadoop.util.ReflectionUtils;
 import java.io.UnsupportedEncodingException;
 import java.net.URLDecoder;
 import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * Hive util for create or query hive table.
@@ -44,14 +51,14 @@ public final class HiveUtil {
     /**
      * get input format class from inputFormatName.
      *
-     * @param jobConf jobConf used when getInputFormatClass
+     * @param jobConf         jobConf used when getInputFormatClass
      * @param inputFormatName inputFormat class name
-     * @param symlinkTarget use target inputFormat class when inputFormat is SymlinkTextInputFormat
+     * @param symlinkTarget   use target inputFormat class when inputFormat is SymlinkTextInputFormat
      * @return a class of inputFormat.
      * @throws UserException when class not found.
      */
     public static InputFormat<?, ?> getInputFormat(JobConf jobConf,
-            String inputFormatName, boolean symlinkTarget) throws UserException {
+                                                   String inputFormatName, boolean symlinkTarget) throws UserException {
         try {
             Class<? extends InputFormat<?, ?>> inputFormatClass = getInputFormatClass(jobConf, inputFormatName);
             if (symlinkTarget && (inputFormatClass == SymlinkTextInputFormat.class)) {
@@ -99,4 +106,55 @@ public final class HiveUtil {
             throw new RuntimeException(e);
         }
     }
+
+    // "c1=a/c2=b/c3=c" ---> List("a","b","c")
+    public static List<String> toPartitionValues(String partitionName) {
+        ImmutableList.Builder<String> resultBuilder = ImmutableList.builder();
+        int start = 0;
+        while (true) {
+            while (start < partitionName.length() && partitionName.charAt(start) != '=') {
+                start++;
+            }
+            start++;
+            int end = start;
+            while (end < partitionName.length() && partitionName.charAt(end) != '/') {
+                end++;
+            }
+            if (start > partitionName.length()) {
+                break;
+            }
+            resultBuilder.add(FileUtils.unescapePathName(partitionName.substring(start, end)));
+            start = end + 1;
+        }
+        return resultBuilder.build();
+    }
+
+    // List("c1=a/c2=b/c3=c", "c1=a/c2=b/c3=d")
+    //           |
+    //           |
+    //           v
+    // Map(
+    //      key:"c1=a/c2=b/c3=c", value:Partition(values=List(a,b,c))
+    //      key:"c1=a/c2=b/c3=d", value:Partition(values=List(a,b,d))
+    //    )
+    public static Map<String, Partition> convertToNamePartitionMap(
+            List<String> partitionNames,
+            List<Partition> partitions) {
+
+        Map<String, List<String>> partitionNameToPartitionValues =
+                partitionNames
+                    .stream()
+                    .collect(Collectors.toMap(partitionName -> partitionName, HiveUtil::toPartitionValues));
+
+        Map<List<String>, Partition> partitionValuesToPartition =
+                partitions.stream()
+                    .collect(Collectors.toMap(Partition::getValues, partition -> partition));
+
+        ImmutableMap.Builder<String, Partition> resultBuilder = ImmutableMap.builder();
+        for (Map.Entry<String, List<String>> entry : partitionNameToPartitionValues.entrySet()) {
+            Partition partition = partitionValuesToPartition.get(entry.getValue());
+            resultBuilder.put(entry.getKey(), partition);
+        }
+        return resultBuilder.build();
+    }
 }
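
The two helpers added to HiveUtil convert between Hive partition names and partition value lists; HMSCommitter.prepareInsertExistPartition relies on them to match metastore partitions back to their names. A small usage sketch (Partition here is the Hive metastore thrift class, so setValues is its generated setter):

    List<String> values = HiveUtil.toPartitionValues("nation=cn/city=beijing");
    // values == ["cn", "beijing"]

    Partition p = new Partition();
    p.setValues(Lists.newArrayList("cn", "beijing"));
    Map<String, Partition> byName = HiveUtil.convertToNamePartitionMap(
            Lists.newArrayList("nation=cn/city=beijing"), Lists.newArrayList(p));
    // byName.get("nation=cn/city=beijing") == p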
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PostgreSQLJdbcHMSCachedClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PostgreSQLJdbcHMSCachedClient.java
index e587debdb35..c18fa30189c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PostgreSQLJdbcHMSCachedClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PostgreSQLJdbcHMSCachedClient.java
@@ -53,6 +53,7 @@ import java.sql.ResultSet;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 public class PostgreSQLJdbcHMSCachedClient extends JdbcHMSCachedClient {
@@ -519,6 +520,31 @@ public class PostgreSQLJdbcHMSCachedClient extends JdbcHMSCachedClient {
         throw new NotImplementedException("PostgreSQL createTable not implemented");
     }
 
+    @Override
+    public void updateTableStatistics(String dbName,
+                                      String tableName,
+                                      Function<HivePartitionStatistics, HivePartitionStatistics> update) {
+        throw new HMSClientException("Not supported in PostgreSQLJdbcHMSCachedClient.");
+    }
+
+    @Override
+    public void updatePartitionStatistics(String dbName,
+                                          String tableName,
+                                          String partitionName,
+                                          Function<HivePartitionStatistics, HivePartitionStatistics> update) {
+        throw new HMSClientException("Not supported in PostgreSQLJdbcHMSCachedClient.");
+    }
+
+    @Override
+    public void addPartitions(String dbName, String tableName, List<HivePartitionWithStatistics> partitions) {
+        throw new HMSClientException("Not supported in PostgreSQLJdbcHMSCachedClient.");
+    }
+
+    @Override
+    public void dropPartition(String dbName, String tableName, List<String> partitionValues, boolean deleteData) {
+        throw new HMSClientException("Not supported in PostgreSQLJdbcHMSCachedClient.");
+    }
+
     public void dropTable(String dbName, String tblName) {
         throw new NotImplementedException("PostgreSQL dropTable not implemented");
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java
index c8207906624..cb5328395ce 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java
@@ -28,8 +28,12 @@ import org.apache.doris.datasource.property.constants.HMSProperties;
 import com.aliyun.datalake.metastore.hive2.ProxyMetaStoreClient;
 import com.amazonaws.glue.catalog.metastore.AWSCatalogMetastoreClient;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
@@ -73,6 +77,8 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Queue;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 /**
  * This class uses the thrift protocol to directly access the HiveMetaStore service
@@ -647,4 +653,135 @@ public class ThriftHMSCachedClient implements HMSCachedClient {
     private <T> T ugiDoAs(PrivilegedExceptionAction<T> action) {
         return HiveMetaStoreClientHelper.ugiDoAs(hiveConf, action);
     }
+
+    @Override
+    public void updateTableStatistics(
+            String dbName,
+            String tableName,
+            Function<HivePartitionStatistics, HivePartitionStatistics> update) {
+        try (ThriftHMSClient client = getClient()) {
+            Table originTable = getTable(dbName, tableName);
+            Map<String, String> originParams = originTable.getParameters();
+            HivePartitionStatistics updatedStats = update.apply(toHivePartitionStatistics(originParams));
+
+            Table newTable = originTable.deepCopy();
+            Map<String, String> newParams =
+                    updateStatisticsParameters(originParams, updatedStats.getCommonStatistics());
+            newParams.put("transient_lastDdlTime", String.valueOf(System.currentTimeMillis() / 1000));
+            newTable.setParameters(newParams);
+            client.client.alter_table(dbName, tableName, newTable);
+        } catch (Exception e) {
+            throw new RuntimeException("failed to update table statistics for " + dbName + "." + tableName, e);
+        }
+    }
+
+    @Override
+    public void updatePartitionStatistics(
+            String dbName,
+            String tableName,
+            String partitionName,
+            Function<HivePartitionStatistics, HivePartitionStatistics> update) {
+        try (ThriftHMSClient client = getClient()) {
+            List<Partition> partitions = client.client.getPartitionsByNames(
+                    dbName, tableName, ImmutableList.of(partitionName));
+            if (partitions.size() != 1) {
+                throw new RuntimeException("Metastore returned " + partitions.size()
+                        + " partitions for name: " + partitionName);
+            }
+
+            Partition originPartition = partitions.get(0);
+            Map<String, String> originParams = originPartition.getParameters();
+            HivePartitionStatistics updatedStats = update.apply(toHivePartitionStatistics(originParams));
+
+            Partition modifiedPartition = originPartition.deepCopy();
+            Map<String, String> newParams =
+                    updateStatisticsParameters(originParams, updatedStats.getCommonStatistics());
+            newParams.put("transient_lastDdlTime", String.valueOf(System.currentTimeMillis() / 1000));
+            modifiedPartition.setParameters(newParams);
+            client.client.alter_partition(dbName, tableName, modifiedPartition);
+        } catch (Exception e) {
+            throw new RuntimeException("failed to update partition statistics for " + dbName + "." + tableName, e);
+        }
+    }
+
+    @Override
+    public void addPartitions(String dbName, String tableName, List<HivePartitionWithStatistics> partitions) {
+        try (ThriftHMSClient client = getClient()) {
+            List<Partition> hivePartitions = partitions.stream()
+                    .map(ThriftHMSCachedClient::toMetastoreApiPartition)
+                    .collect(Collectors.toList());
+            client.client.add_partitions(hivePartitions);
+        } catch (Exception e) {
+            throw new RuntimeException("failed to add partitions for " + dbName + "." + tableName, e);
+        }
+    }
+
+    @Override
+    public void dropPartition(String dbName, String tableName, List<String> partitionValues, boolean deleteData) {
+        try (ThriftHMSClient client = getClient()) {
+            client.client.dropPartition(dbName, tableName, partitionValues, deleteData);
+        } catch (Exception e) {
+            throw new RuntimeException("failed to drop partition for " + dbName + "." + tableName, e);
+        }
+    }
+
+    private static HivePartitionStatistics toHivePartitionStatistics(Map<String, String> params) {
+        long rowCount = Long.parseLong(params.getOrDefault(StatsSetupConst.ROW_COUNT, "-1"));
+        long totalSize = Long.parseLong(params.getOrDefault(StatsSetupConst.TOTAL_SIZE, "-1"));
+        long numFiles = Long.parseLong(params.getOrDefault(StatsSetupConst.NUM_FILES, "-1"));
+        return HivePartitionStatistics.fromCommonStatistics(rowCount, numFiles, totalSize);
+    }
+
+    private static Map<String, String> updateStatisticsParameters(
+            Map<String, String> parameters,
+            HiveCommonStatistics statistics) {
+        HashMap<String, String> result = new HashMap<>(parameters);
+
+        result.put(StatsSetupConst.NUM_FILES, String.valueOf(statistics.getFileCount()));
+        result.put(StatsSetupConst.ROW_COUNT, String.valueOf(statistics.getRowCount()));
+        result.put(StatsSetupConst.TOTAL_SIZE, String.valueOf(statistics.getTotalFileBytes()));
+
+        // CDH 5.16 metastore ignores stats unless STATS_GENERATED_VIA_STATS_TASK is set
+        // https://github.com/cloudera/hive/blob/cdh5.16.2-release/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java#L227-L231
+        if (!parameters.containsKey("STATS_GENERATED_VIA_STATS_TASK")) {
+            result.put("STATS_GENERATED_VIA_STATS_TASK", "workaround for potential lack of HIVE-12730");
+        }
+
+        return result;
+    }
+
+    public static Partition toMetastoreApiPartition(HivePartitionWithStatistics partitionWithStatistics) {
+        Partition partition =
+                toMetastoreApiPartition(partitionWithStatistics.getPartition());
+        partition.setParameters(updateStatisticsParameters(
+                partition.getParameters(), partitionWithStatistics.getStatistics().getCommonStatistics()));
+        return partition;
+    }
+
+    public static Partition toMetastoreApiPartition(HivePartition hivePartition) {
+        Partition result = new Partition();
+        result.setDbName(hivePartition.getDbName());
+        result.setTableName(hivePartition.getTblName());
+        result.setValues(hivePartition.getPartitionValues());
+        result.setSd(makeStorageDescriptorFromHivePartition(hivePartition));
+        result.setParameters(hivePartition.getParameters());
+        return result;
+    }
+
+    private static StorageDescriptor makeStorageDescriptorFromHivePartition(HivePartition partition) {
+        SerDeInfo serdeInfo = new SerDeInfo();
+        serdeInfo.setName(partition.getTblName());
+        serdeInfo.setSerializationLib(partition.getSerde());
+
+        StorageDescriptor sd = new StorageDescriptor();
+        sd.setLocation(Strings.emptyToNull(partition.getPath()));
+        sd.setCols(partition.getColumns());
+        sd.setSerdeInfo(serdeInfo);
+        sd.setInputFormat(partition.getInputFormat());
+        sd.setOutputFormat(partition.getOutputFormat());
+        sd.setParameters(ImmutableMap.of());
+
+        return sd;
+    }
+
 }
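
For orientation, here is a minimal caller-side sketch (not part of the patch) of the Function-based hook added above: the callback receives the statistics currently stored in the metastore parameters and returns the merged values to write back. It assumes an already-initialized HMSCachedClient named client and made-up counts; only signatures visible in this patch are used.

    // Sketch only: fold 1000 newly written rows in 3 files (4096 bytes) into the existing table stats.
    client.updateTableStatistics("test_db", "test_tb_with_partition", existing -> {
        HiveCommonStatistics current = existing.getCommonStatistics();
        return HivePartitionStatistics.fromCommonStatistics(
                current.getRowCount() + 1000,
                current.getFileCount() + 3,
                current.getTotalFileBytes() + 4096);
    });
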
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java
index 10e2b2b04ac..798f93a61c0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java
@@ -23,6 +23,9 @@ import org.apache.doris.fs.remote.RemoteFile;
 
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * File system interface.
@@ -46,6 +49,24 @@ public interface FileSystem {
 
     Status rename(String origFilePath, String destFilePath);
 
+    default void asyncRename(Executor executor,
+                             List<CompletableFuture<?>> renameFileFutures,
+                             AtomicBoolean cancelled,
+                             String origFilePath,
+                             String destFilePath,
+                             List<String> fileNames) {
+        throw new UnsupportedOperationException("Async rename is not supported on the current file system.");
+    }
+
+    default void asyncRenameDir(Executor executor,
+                                List<CompletableFuture<?>> renameFileFutures,
+                                AtomicBoolean cancelled,
+                                String origFilePath,
+                                String destFilePath,
+                                Runnable runWhenPathNotExist) {
+        throw new UnsupportedOperationException("Async rename dir is not supported on the current file system.");
+    }
+
     Status delete(String remotePath);
 
     Status makeDir(String remotePath);
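
As a rough usage sketch (not part of this patch), a committer can drive the new default methods as shown below; the file system instance, executor, paths, and file names are all hypothetical, and only the interface above plus standard JDK classes are assumed.

    // Sketch only: schedule per-file renames, wait for all of them,
    // and flip `cancelled` so still-queued tasks exit early on failure.
    FileSystem fs = ...; // any implementation that overrides asyncRename (assumption)
    ExecutorService executor = Executors.newFixedThreadPool(4);
    List<CompletableFuture<?>> futures = new ArrayList<>();
    AtomicBoolean cancelled = new AtomicBoolean(false);
    fs.asyncRename(executor, futures, cancelled,
            "hdfs://ns/tmp/staging/query_x", "hdfs://ns/warehouse/db.db/tbl/c3=a",
            Arrays.asList("f1", "f2", "f3"));
    try {
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
    } catch (Exception e) {
        cancelled.set(true);
        throw new RuntimeException(e);
    } finally {
        executor.shutdown();
    }
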
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
index e27e27ddbff..e8c645f3c9b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
@@ -53,6 +53,9 @@ import java.nio.file.Paths;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 public class DFSFileSystem extends RemoteFileSystem {
 
@@ -192,7 +195,7 @@ public class DFSFileSystem extends RemoteFileSystem {
             try {
                 currentStreamOffset = fsDataInputStream.getPos();
             } catch (IOException e) {
-                LOG.error("errors while get file pos from output stream", e);
+                LOG.warn("errors while get file pos from output stream", e);
                 throw new IOException("errors while get file pos from output 
stream", e);
             }
             if (currentStreamOffset != readOffset) {
@@ -230,7 +233,7 @@ public class DFSFileSystem extends RemoteFileSystem {
                 }
                 return ByteBuffer.wrap(buf, 0, readLength);
             } catch (IOException e) {
-                LOG.error("errors while read data from stream", e);
+                LOG.warn("errors while read data from stream", e);
                 throw new IOException("errors while read data from stream " + 
e.getMessage());
             }
         }
@@ -261,7 +264,7 @@ public class DFSFileSystem extends RemoteFileSystem {
             }
             return Status.OK;
         } catch (Exception e) {
-            LOG.error("errors while check path exist " + remotePath, e);
+            LOG.warn("errors while check path exist " + remotePath, e);
             return new Status(Status.ErrCode.COMMON_ERROR,
                     "failed to check remote path exist: " + remotePath + ". 
msg: " + e.getMessage());
         }
@@ -281,7 +284,7 @@ public class DFSFileSystem extends RemoteFileSystem {
         try {
             fsDataOutputStream.writeBytes(content);
         } catch (IOException e) {
-            LOG.error("errors while write data to output stream", e);
+            LOG.warn("errors while write data to output stream", e);
             status = new Status(Status.ErrCode.COMMON_ERROR, "write exception: 
" + e.getMessage());
         } finally {
             Status closeStatus = 
operations.closeWriter(OpParams.of(fsDataOutputStream));
@@ -324,7 +327,7 @@ public class DFSFileSystem extends RemoteFileSystem {
                 try {
                     fsDataOutputStream.write(readBuf, 0, bytesRead);
                 } catch (IOException e) {
-                    LOG.error("errors while write data to output stream", e);
+                    LOG.warn("errors while write data to output stream", e);
+                    lastErrMsg = String.format(
+                            "failed to write hdfs. current write offset: %d, write length: %d, "
+                                    + "file length: %d, file: %s, msg: errors while write data to output stream",
@@ -377,7 +380,7 @@ public class DFSFileSystem extends RemoteFileSystem {
         } catch (UserException e) {
             return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage());
         } catch (IOException e) {
-            LOG.error("errors while rename path from " + srcPath + " to " + 
destPath);
+            LOG.warn("errors while rename path from " + srcPath + " to " + 
destPath);
             return new Status(Status.ErrCode.COMMON_ERROR,
                     "failed to rename remote " + srcPath + " to " + destPath + 
", msg: " + e.getMessage());
         }
@@ -385,6 +388,64 @@ public class DFSFileSystem extends RemoteFileSystem {
         return Status.OK;
     }
 
+    @Override
+    public void asyncRename(
+            Executor executor,
+            List<CompletableFuture<?>> renameFileFutures,
+            AtomicBoolean cancelled,
+            String origFilePath,
+            String destFilePath,
+            List<String> fileNames) {
+
+        for (String fileName : fileNames) {
+            Path source = new Path(origFilePath, fileName);
+            Path target = new Path(destFilePath, fileName);
+            renameFileFutures.add(CompletableFuture.runAsync(() -> {
+                if (cancelled.get()) {
+                    return;
+                }
+                Status status = rename(source.toString(), target.toString());
+                if (!status.ok()) {
+                    throw new RuntimeException(status.getErrMsg());
+                }
+            }, executor));
+        }
+    }
+
+    @Override
+    public void asyncRenameDir(Executor executor,
+                        List<CompletableFuture<?>> renameFileFutures,
+                        AtomicBoolean cancelled,
+                        String origFilePath,
+                        String destFilePath,
+                        Runnable runWhenPathNotExist) {
+        renameFileFutures.add(CompletableFuture.runAsync(() -> {
+            if (cancelled.get()) {
+                return;
+            }
+
+            Status status = exists(destFilePath);
+            if (status.ok()) {
+                throw new RuntimeException("Destination directory already 
exists: " + destFilePath);
+            }
+
+            String targetParent = new Path(destFilePath).getParent().toString();
+            status = exists(targetParent);
+            if (Status.ErrCode.NOT_FOUND.equals(status.getErrCode())) {
+                makeDir(targetParent);
+            } else if (!status.ok()) {
+                throw new RuntimeException(status.getErrMsg());
+            }
+
+            runWhenPathNotExist.run();
+
+            status = rename(origFilePath, destFilePath);
+            if (!status.ok()) {
+                throw new RuntimeException(status.getErrMsg());
+            }
+        }, executor));
+    }
+
     @Override
     public Status delete(String remotePath) {
         try {
@@ -395,7 +456,7 @@ public class DFSFileSystem extends RemoteFileSystem {
         } catch (UserException e) {
             return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage());
         } catch (IOException e) {
-            LOG.error("errors while delete path " + remotePath);
+            LOG.warn("errors while delete path " + remotePath);
             return new Status(Status.ErrCode.COMMON_ERROR,
                     "failed to delete remote path: " + remotePath + ", msg: " 
+ e.getMessage());
         }
@@ -433,7 +494,7 @@ public class DFSFileSystem extends RemoteFileSystem {
             LOG.info("file not found: " + e.getMessage());
             return new Status(Status.ErrCode.NOT_FOUND, "file not found: " + 
e.getMessage());
         } catch (Exception e) {
-            LOG.error("errors while get file status ", e);
+            LOG.warn("errors while get file status ", e);
             return new Status(Status.ErrCode.COMMON_ERROR, "errors while get 
file status " + e.getMessage());
         }
         LOG.info("finish list path {}", remotePath);
@@ -442,6 +503,16 @@ public class DFSFileSystem extends RemoteFileSystem {
 
     @Override
     public Status makeDir(String remotePath) {
-        return new Status(Status.ErrCode.COMMON_ERROR, "mkdir is not 
implemented.");
+        try {
+            FileSystem fileSystem = nativeFileSystem(remotePath);
+            if (!fileSystem.mkdirs(new Path(remotePath))) {
+                LOG.warn("failed to make dir for " + remotePath);
+                return new Status(Status.ErrCode.COMMON_ERROR, "failed to make 
dir for " + remotePath);
+            }
+        } catch (Exception e) {
+            LOG.warn("failed to make dir for " + remotePath);
+            return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage());
+        }
+        return Status.OK;
     }
 }
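
The directory-level variant is driven the same way; a rough sketch (not from the patch, reusing the fs and executor from the previous sketch, with hypothetical paths and a made-up rollback list) shows the Runnable hook, which runs only after the existence checks pass and before the actual rename.

    // Sketch only: record the target directory for rollback just before it is renamed into place.
    List<CompletableFuture<?>> dirFutures = new ArrayList<>();
    AtomicBoolean cancelled = new AtomicBoolean(false);
    List<String> renamedDirs = new ArrayList<>();   // hypothetical rollback bookkeeping
    fs.asyncRenameDir(executor, dirFutures, cancelled,
            "hdfs://ns/tmp/staging/query_x/c3=b", "hdfs://ns/warehouse/db.db/tbl/c3=b",
            () -> renamedDirs.add("hdfs://ns/warehouse/db.db/tbl/c3=b"));
    CompletableFuture.allOf(dirFutures.toArray(new CompletableFuture[0])).join();
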
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java
new file mode 100644
index 00000000000..5f1abf12e63
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java
@@ -0,0 +1,247 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.hive;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.thrift.THiveLocationParams;
+import org.apache.doris.thrift.THivePartitionUpdate;
+import org.apache.doris.thrift.TUpdateMode;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+@Ignore
+public class HmsCommitTest {
+
+    private static HMSExternalCatalog hmsCatalog;
+    private static HiveMetadataOps hmsOps;
+    private static HMSCachedClient hmsClient;
+    private static final String dbName = "test_db";
+    private static final String tbWithPartition = "test_tb_with_partition";
+    private static final String tbWithoutPartition = "test_tb_without_partition";
+    private static Path warehousePath;
+    static String dbLocation;
+    private String inputFormat = "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat";
+    private String outputFormat = "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat";
+    private String serde = "org.apache.hadoop.hive.ql.io.orc.OrcSerde";
+
+    @BeforeClass
+    public static void beforeClass() throws Throwable {
+        warehousePath = Files.createTempDirectory("test_warehouse_");
+        dbLocation = "file://" + warehousePath.toAbsolutePath() + "/";
+        createTestHiveCatalog();
+        createTestHiveDatabase();
+    }
+
+    @AfterClass
+    public static void afterClass() {
+        hmsClient.dropTable(dbName, tbWithPartition);
+        hmsClient.dropTable(dbName, tbWithoutPartition);
+        hmsClient.dropDatabase(dbName);
+    }
+
+    public static void createTestHiveCatalog() {
+        Map<String, String> props = new HashMap<>();
+        props.put("type", "hms");
+        props.put("hive.metastore.uris", "thrift://127.0.0.1:9083");
+        props.put("hadoop.username", "hadoop");
+        hmsCatalog = new HMSExternalCatalog(1, "hive_catalog", null, props, 
"comment");
+        hmsCatalog.setInitialized();
+        hmsCatalog.initLocalObjectsImpl();
+        hmsOps = (HiveMetadataOps) hmsCatalog.getMetadataOps();
+        hmsClient = hmsOps.getClient();
+    }
+
+    public static void createTestHiveDatabase() {
+        // create database
+        HiveDatabaseMetadata dbMetadata = new HiveDatabaseMetadata();
+        dbMetadata.setDbName(dbName);
+        dbMetadata.setLocationUri(dbLocation);
+        hmsClient.createDatabase(dbMetadata);
+    }
+
+    @Before
+    public void before() {
+        // create table
+        List<Column> columns = new ArrayList<>();
+        columns.add(new Column("c1", PrimitiveType.INT, true));
+        columns.add(new Column("c2", PrimitiveType.STRING, true));
+        List<FieldSchema> partitionKeys = new ArrayList<>();
+        partitionKeys.add(new FieldSchema("c3", "string", "comment"));
+        HiveTableMetadata tableMetadata = new HiveTableMetadata(
+                dbName, tbWithPartition, columns, partitionKeys,
+                new HashMap<>(), inputFormat, outputFormat, serde);
+        hmsClient.createTable(tableMetadata, true);
+        HiveTableMetadata tableMetadata2 = new HiveTableMetadata(
+                dbName, tbWithoutPartition, columns, new ArrayList<>(),
+                new HashMap<>(), inputFormat, outputFormat, serde);
+        hmsClient.createTable(tableMetadata2, true);
+    }
+
+    @After
+    public void after() {
+        hmsClient.dropTable(dbName, tbWithoutPartition);
+        hmsClient.dropTable(dbName, tbWithPartition);
+    }
+
+    @Test
+    public void testNewPartitionForUnPartitionedTable() {
+        List<THivePartitionUpdate> pus = new ArrayList<>();
+        pus.add(createRandomNew("a"));
+        try {
+            hmsOps.commit(dbName, tbWithoutPartition, pus);
+        } catch (Exception e) {
+            Assert.assertEquals("Not support mode:[NEW] in unPartitioned 
table", e.getMessage());
+        }
+    }
+
+    @Test
+    public void testAppendPartitionForUnPartitionedTable() {
+        List<THivePartitionUpdate> pus = new ArrayList<>();
+        pus.add(createRandomAppend(""));
+        pus.add(createRandomAppend(""));
+        pus.add(createRandomAppend(""));
+        hmsOps.commit(dbName, tbWithoutPartition, pus);
+        Table table = hmsClient.getTable(dbName, tbWithoutPartition);
+        Assert.assertEquals(3, Long.parseLong(table.getParameters().get("numRows")));
+
+        List<THivePartitionUpdate> pus2 = new ArrayList<>();
+        pus2.add(createRandomAppend(""));
+        pus2.add(createRandomAppend(""));
+        pus2.add(createRandomAppend(""));
+        hmsOps.commit(dbName, tbWithoutPartition, pus2);
+        table = hmsClient.getTable(dbName, tbWithoutPartition);
+        Assert.assertEquals(6, Long.parseLong(table.getParameters().get("numRows")));
+    }
+
+    @Test
+    public void testOverwritePartitionForUnPartitionedTable() {
+        // TODO
+    }
+
+    @Test
+    public void testNewPartitionForPartitionedTable() {
+        List<THivePartitionUpdate> pus = new ArrayList<>();
+        pus.add(createRandomNew("a"));
+        pus.add(createRandomNew("a"));
+        pus.add(createRandomNew("a"));
+        pus.add(createRandomNew("b"));
+        pus.add(createRandomNew("b"));
+        pus.add(createRandomNew("c"));
+        hmsOps.commit(dbName, tbWithPartition, pus);
+
+        Partition pa = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("a"));
+        Assert.assertEquals(3, Long.parseLong(pa.getParameters().get("numRows")));
+        Partition pb = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("b"));
+        Assert.assertEquals(2, Long.parseLong(pb.getParameters().get("numRows")));
+        Partition pc = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("c"));
+        Assert.assertEquals(1, Long.parseLong(pc.getParameters().get("numRows")));
+    }
+
+    @Test
+    public void testAppendPartitionForPartitionedTable() {
+        testNewPartitionForPartitionedTable();
+
+        List<THivePartitionUpdate> pus = new ArrayList<>();
+        pus.add(createRandomAppend("a"));
+        pus.add(createRandomAppend("a"));
+        pus.add(createRandomAppend("a"));
+        pus.add(createRandomAppend("b"));
+        pus.add(createRandomAppend("b"));
+        pus.add(createRandomAppend("c"));
+        hmsOps.commit(dbName, tbWithPartition, pus);
+
+        Partition pa = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("a"));
+        Assert.assertEquals(6, Long.parseLong(pa.getParameters().get("numRows")));
+        Partition pb = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("b"));
+        Assert.assertEquals(4, Long.parseLong(pb.getParameters().get("numRows")));
+        Partition pc = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("c"));
+        Assert.assertEquals(2, Long.parseLong(pc.getParameters().get("numRows")));
+    }
+
+    @Test
+    public void testNewManyPartitionForPartitionedTable() {
+        List<THivePartitionUpdate> pus = new ArrayList<>();
+        int nums = 150;
+        for (int i = 0; i < nums; i++) {
+            pus.add(createRandomNew("" + i));
+        }
+
+        hmsOps.commit(dbName, tbWithPartition, pus);
+        for (int i = 0; i < nums; i++) {
+            Partition p = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("" + i));
+            Assert.assertEquals(1, Long.parseLong(p.getParameters().get("numRows")));
+        }
+
+        try {
+            hmsOps.commit(dbName, tbWithPartition, pus);
+        } catch (Exception e) {
+            Assert.assertTrue(e.getMessage().contains("failed to add 
partitions"));
+        }
+    }
+
+    public THivePartitionUpdate genOnePartitionUpdate(String partitionValue, TUpdateMode mode) {
+
+        String uuid = UUID.randomUUID().toString();
+        THiveLocationParams location = new THiveLocationParams();
+        String targetPath = dbLocation + uuid;
+        location.setTargetPath(targetPath);
+        location.setWritePath(targetPath);
+
+        THivePartitionUpdate pu = new THivePartitionUpdate();
+        pu.setName(partitionValue);
+        pu.setUpdateMode(mode);
+        pu.setRowCount(1);
+        pu.setFileSize(1);
+        pu.setLocation(location);
+        pu.setFileNames(new ArrayList<String>() {
+            {
+                add(targetPath + "/f1");
+                add(targetPath + "/f2");
+                add(targetPath + "/f3");
+            }
+        });
+        return pu;
+    }
+
+    public THivePartitionUpdate createRandomNew(String partition) {
+        return genOnePartitionUpdate("c3=" + partition, TUpdateMode.NEW);
+    }
+
+    public THivePartitionUpdate createRandomAppend(String partition) {
+        return genOnePartitionUpdate("c3=" + partition, TUpdateMode.APPEND);
+    }
+}
diff --git a/fe/pom.xml b/fe/pom.xml
index d96d4697880..d21c05c7ccb 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -349,6 +349,7 @@ under the License.
         <arrow.vector.classifier>shade-format-flatbuffers</arrow.vector.classifier>
         <flatbuffers.version>1.12.0</flatbuffers.version>
         <jacoco.version>0.8.10</jacoco.version>
+        <airlift.version>202</airlift.version>
     </properties>
     <profiles>
         <profile>
@@ -1618,6 +1619,11 @@ under the License.
                 <version>${immutables.version}</version>
                 <scope>provided</scope>
             </dependency>
+            <dependency>
+                <groupId>io.airlift</groupId>
+                <artifactId>concurrent</artifactId>
+                <version>${airlift.version}</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
     <dependencies>

