This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit e11db3f050ca8f633dd359c9a810365dcbb238d4
Author: wuwenchi <wuwenchi...@hotmail.com>
AuthorDate: Thu Apr 4 08:10:49 2024 +0800

    [feature](hive) support ExternalTransaction for writing external tables (#32726)
    
    
    Issue #31442
    
    Add `TransactionManager` and `Transaction`.
    
    ```
    public interface Transaction {
        void commit() throws UserException;
        void rollback();
    }
    public interface TransactionManager {
        long begin();
        void commit(long id) throws UserException;
        void rollback(long id);
        Transaction getTransaction(long id);
    }
    ```
    `TransactionManager` is used to manage all external transactions. The
    application layer should drive the entire transaction through this
    `TransactionManager`, for example:
    ```
    long id = transactionManager.begin();
    transactionManager.commit(id);
    transactionManager.rollback(id);
    ```
    
    `Transaction` is an interface. It can be implemented for each concrete
    source, such as the `HMSTransaction` added in this PR, or an Iceberg
    transaction that may be implemented in the future.
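    
    A minimal usage sketch of the intended flow (hedged: the wrapper class and
    the way the manager is obtained are illustrative assumptions, not the exact
    API of this PR; the methods called on `TransactionManager` and
    `HMSTransaction` are the ones added here):
    ```
    import org.apache.doris.common.UserException;
    import org.apache.doris.datasource.hive.HMSTransaction;
    import org.apache.doris.thrift.THivePartitionUpdate;
    import org.apache.doris.transaction.TransactionManager;
    
    import java.util.List;
    
    public final class HiveInsertSketch {
        // Hypothetical helper; txnMgr would be the HiveTransactionManager
        // created by HMSExternalCatalog below.
        static void insert(TransactionManager txnMgr, String db, String tbl,
                           List<THivePartitionUpdate> updates) throws UserException {
            long id = txnMgr.begin();
            try {
                HMSTransaction txn = (HMSTransaction) txnMgr.getTransaction(id);
                txn.updateHivePartitionUpdates(updates); // results reported by the writers
                txn.finishInsertTable(db, tbl);          // stage table/partition actions
                txnMgr.commit(id);                       // runs HMSTransaction.commit()
            } catch (UserException e) {
                txnMgr.rollback(id);                     // undo staged filesystem/HMS work
                throw e;
            }
        }
    }
    ```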
---
 .../apache/doris/datasource/ExternalCatalog.java   |    2 +
 .../apache/doris/datasource/hive/HMSCommitter.java |  754 -----------
 .../doris/datasource/hive/HMSExternalCatalog.java  |    6 +-
 .../doris/datasource/hive/HMSTransaction.java      | 1322 ++++++++++++++++++++
 .../doris/datasource/hive/HiveMetadataOps.java     |   24 +-
 .../datasource/hive/HivePartitionStatistics.java   |    2 +-
 .../hive/HivePartitionWithStatistics.java          |    6 +-
 .../plans/commands/insert/HiveInsertExecutor.java  |   34 +-
 .../main/java/org/apache/doris/qe/Coordinator.java |   30 +-
 .../doris/transaction/HiveTransactionManager.java  |   79 ++
 .../Transaction.java}                              |   25 +-
 .../TransactionManager.java}                       |   40 +-
 .../TransactionManagerFactory.java}                |   25 +-
 .../doris/datasource/hive/HmsCommitTest.java       |   72 +-
 14 files changed, 1535 insertions(+), 886 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index 737705bd8b5..a3525321edf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -50,6 +50,7 @@ import org.apache.doris.persist.gson.GsonPostProcessable;
 import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.MasterCatalogExecutor;
+import org.apache.doris.transaction.TransactionManager;
 
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
@@ -111,6 +112,7 @@ public abstract class ExternalCatalog
     private boolean objectCreated = false;
     protected boolean invalidCacheInInit = true;
     protected ExternalMetadataOps metadataOps;
+    protected TransactionManager transactionManager;
 
     private ExternalSchemaCache schemaCache;
     private String comment;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCommitter.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCommitter.java
deleted file mode 100644
index af26f36d6b9..00000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCommitter.java
+++ /dev/null
@@ -1,754 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-// This file is copied from
-// https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/SemiTransactionalHiveMetastore.java
-// and modified by Doris
-
-package org.apache.doris.datasource.hive;
-
-import org.apache.doris.backup.Status;
-import org.apache.doris.common.Pair;
-import org.apache.doris.fs.remote.RemoteFile;
-import org.apache.doris.fs.remote.RemoteFileSystem;
-import org.apache.doris.thrift.THivePartitionUpdate;
-import org.apache.doris.thrift.TUpdateMode;
-
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Verify;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import io.airlift.concurrent.MoreFutures;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Queue;
-import java.util.StringJoiner;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
-
-public class HMSCommitter {
-    private static final Logger LOG = LogManager.getLogger(HMSCommitter.class);
-    private final HiveMetadataOps hiveOps;
-    private final RemoteFileSystem fs;
-    private final Table table;
-
-    // update statistics for unPartitioned table or existed partition
-    private final List<UpdateStatisticsTask> updateStatisticsTasks = new 
ArrayList<>();
-    Executor updateStatisticsExecutor = Executors.newFixedThreadPool(16);
-
-    // add new partition
-    private final AddPartitionsTask addPartitionsTask = new 
AddPartitionsTask();
-    private static final int PARTITION_COMMIT_BATCH_SIZE = 20;
-
-    // for file system rename operation
-    // whether to cancel the file system tasks
-    private final AtomicBoolean fileSystemTaskCancelled = new 
AtomicBoolean(false);
-    // file system tasks that are executed asynchronously, including 
rename_file, rename_dir
-    private final List<CompletableFuture<?>> asyncFileSystemTaskFutures = new 
ArrayList<>();
-    // when aborted, we need to delete all files under this path, even the 
current directory
-    private final Queue<DirectoryCleanUpTask> directoryCleanUpTasksForAbort = 
new ConcurrentLinkedQueue<>();
-    // when aborted, we need restore directory
-    private final List<RenameDirectoryTask> renameDirectoryTasksForAbort = new 
ArrayList<>();
-    // when finished, we need clear some directories
-    private final List<String> clearDirsForFinish = new ArrayList<>();
-    Executor fileSystemExecutor = Executors.newFixedThreadPool(16);
-
-    public HMSCommitter(HiveMetadataOps hiveOps, RemoteFileSystem fs, Table 
table) {
-        this.hiveOps = hiveOps;
-        this.fs = fs;
-        this.table = table;
-    }
-
-    public void commit(List<THivePartitionUpdate> hivePUs) {
-        try {
-            prepare(mergePartitions(hivePUs));
-            doCommit();
-        } catch (Throwable t) {
-            LOG.warn("Failed to commit for {}.{}, abort it.", 
table.getDbName(), table.getTableName());
-            try {
-                cancelUnStartedAsyncFileSystemTask();
-                undoUpdateStatisticsTasks();
-                undoAddPartitionsTask();
-                waitForAsyncFileSystemTaskSuppressThrowable();
-                runDirectoryClearUpTasksForAbort();
-                runRenameDirTasksForAbort();
-            } catch (Throwable e) {
-                t.addSuppressed(new Exception("Failed to roll back after 
commit failure", e));
-            }
-            throw t;
-        } finally {
-            runClearPathsForFinish();
-        }
-    }
-
-    public void prepare(List<THivePartitionUpdate> hivePUs) {
-
-        List<Pair<THivePartitionUpdate, HivePartitionStatistics>> 
insertExistsPartitions = new ArrayList<>();
-
-        for (THivePartitionUpdate pu : hivePUs) {
-            TUpdateMode updateMode = pu.getUpdateMode();
-            HivePartitionStatistics hivePartitionStatistics = 
HivePartitionStatistics.fromCommonStatistics(
-                    pu.getRowCount(),
-                    pu.getFileNamesSize(),
-                    pu.getFileSize());
-            if (table.getPartitionKeysSize() == 0) {
-                Preconditions.checkArgument(hivePUs.size() == 1,
-                        "When updating a non-partitioned table, multiple 
partitions should not be written");
-                switch (updateMode) {
-                    case APPEND:
-                        prepareAppendTable(pu, hivePartitionStatistics);
-                        break;
-                    case OVERWRITE:
-                        prepareOverwriteTable(pu, hivePartitionStatistics);
-                        break;
-                    default:
-                        throw new RuntimeException("Not support mode:[" + 
updateMode + "] in unPartitioned table");
-                }
-            } else {
-                switch (updateMode) {
-                    case NEW:
-                        prepareCreateNewPartition(pu, hivePartitionStatistics);
-                        break;
-                    case APPEND:
-                        insertExistsPartitions.add(Pair.of(pu, 
hivePartitionStatistics));
-                        break;
-                    case OVERWRITE:
-                        prepareOverwritePartition(pu, hivePartitionStatistics);
-                        break;
-                    default:
-                        throw new RuntimeException("Not support mode:[" + 
updateMode + "] in unPartitioned table");
-                }
-            }
-        }
-
-        if (!insertExistsPartitions.isEmpty()) {
-            prepareInsertExistPartition(insertExistsPartitions);
-        }
-    }
-
-    public List<THivePartitionUpdate> 
mergePartitions(List<THivePartitionUpdate> hivePUs) {
-        Map<String, THivePartitionUpdate> mm = new HashMap<>();
-        for (THivePartitionUpdate pu : hivePUs) {
-            if (mm.containsKey(pu.getName())) {
-                THivePartitionUpdate old = mm.get(pu.getName());
-                old.setFileSize(old.getFileSize() + pu.getFileSize());
-                old.setRowCount(old.getRowCount() + pu.getRowCount());
-                old.getFileNames().addAll(pu.getFileNames());
-            } else {
-                mm.put(pu.getName(), pu);
-            }
-        }
-        return new ArrayList<>(mm.values());
-    }
-
-    public void doCommit() {
-        waitForAsyncFileSystemTasks();
-        doAddPartitionsTask();
-        doUpdateStatisticsTasks();
-    }
-
-    public void rollback() {
-
-    }
-
-    public void cancelUnStartedAsyncFileSystemTask() {
-        fileSystemTaskCancelled.set(true);
-    }
-
-    private void undoUpdateStatisticsTasks() {
-        ImmutableList.Builder<CompletableFuture<?>> undoUpdateFutures = 
ImmutableList.builder();
-        for (UpdateStatisticsTask task : updateStatisticsTasks) {
-            undoUpdateFutures.add(CompletableFuture.runAsync(() -> {
-                try {
-                    task.undo(hiveOps);
-                } catch (Throwable throwable) {
-                    LOG.warn("Failed to rollback: {}", task.getDescription(), 
throwable);
-                }
-            }, updateStatisticsExecutor));
-        }
-
-        for (CompletableFuture<?> undoUpdateFuture : 
undoUpdateFutures.build()) {
-            MoreFutures.getFutureValue(undoUpdateFuture);
-        }
-    }
-
-    private void undoAddPartitionsTask() {
-        if (addPartitionsTask.isEmpty()) {
-            return;
-        }
-
-        HivePartition firstPartition = 
addPartitionsTask.getPartitions().get(0).getPartition();
-        String dbName = firstPartition.getDbName();
-        String tableName = firstPartition.getTblName();
-        List<List<String>> rollbackFailedPartitions = 
addPartitionsTask.rollback(hiveOps);
-        if (!rollbackFailedPartitions.isEmpty()) {
-            LOG.warn("Failed to rollback: add_partition for partition values 
{}.{}.{}",
-                    dbName, tableName, rollbackFailedPartitions);
-        }
-    }
-
-    private void waitForAsyncFileSystemTaskSuppressThrowable() {
-        for (CompletableFuture<?> future : asyncFileSystemTaskFutures) {
-            try {
-                future.get();
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-            } catch (Throwable t) {
-                // ignore
-            }
-        }
-    }
-
-    public void prepareAppendTable(THivePartitionUpdate pu, 
HivePartitionStatistics ps) {
-        String targetPath = pu.getLocation().getTargetPath();
-        String writePath = pu.getLocation().getWritePath();
-        if (!targetPath.equals(writePath)) {
-            fs.asyncRename(
-                    fileSystemExecutor,
-                    asyncFileSystemTaskFutures,
-                    fileSystemTaskCancelled,
-                    writePath,
-                    targetPath,
-                    pu.getFileNames());
-        }
-        directoryCleanUpTasksForAbort.add(new DirectoryCleanUpTask(targetPath, 
false));
-        updateStatisticsTasks.add(
-            new UpdateStatisticsTask(
-                table.getDbName(),
-                table.getTableName(),
-                Optional.empty(),
-                ps,
-                true
-            ));
-    }
-
-    public void prepareOverwriteTable(THivePartitionUpdate pu, 
HivePartitionStatistics ps) {
-        String targetPath = pu.getLocation().getTargetPath();
-        String writePath = pu.getLocation().getWritePath();
-        if (!targetPath.equals(writePath)) {
-            Path path = new Path(targetPath);
-            String oldTablePath = new Path(path.getParent(), "_temp_" + 
path.getName()).toString();
-            Status status = fs.renameDir(
-                    targetPath,
-                    oldTablePath,
-                    () -> renameDirectoryTasksForAbort.add(new 
RenameDirectoryTask(oldTablePath, targetPath)));
-            if (!status.ok()) {
-                throw new RuntimeException(
-                    "Error to rename dir from " + targetPath + " to " + 
oldTablePath + status.getErrMsg());
-            }
-            clearDirsForFinish.add(oldTablePath);
-
-            status =  fs.renameDir(
-                writePath,
-                targetPath,
-                () -> directoryCleanUpTasksForAbort.add(new 
DirectoryCleanUpTask(targetPath, true)));
-            if (!status.ok()) {
-                throw new RuntimeException(
-                    "Error to rename dir from " + writePath + " to " + 
targetPath + ":" + status.getErrMsg());
-            }
-        }
-        updateStatisticsTasks.add(
-            new UpdateStatisticsTask(
-                table.getDbName(),
-                table.getTableName(),
-                Optional.empty(),
-                ps,
-                false
-            ));
-    }
-
-    public void prepareCreateNewPartition(THivePartitionUpdate pu, 
HivePartitionStatistics ps) {
-
-        String targetPath = pu.getLocation().getTargetPath();
-        String writePath = pu.getLocation().getWritePath();
-
-        if (!targetPath.equals(writePath)) {
-            fs.asyncRenameDir(
-                    fileSystemExecutor,
-                    asyncFileSystemTaskFutures,
-                    fileSystemTaskCancelled,
-                    writePath,
-                    targetPath,
-                    () -> directoryCleanUpTasksForAbort.add(new 
DirectoryCleanUpTask(targetPath, true)));
-        }
-
-        StorageDescriptor sd = table.getSd();
-
-        HivePartition hivePartition = new HivePartition(
-                table.getDbName(),
-                table.getTableName(),
-                false,
-                sd.getInputFormat(),
-                pu.getLocation().getTargetPath(),
-                HiveUtil.toPartitionValues(pu.getName()),
-                Maps.newHashMap(),
-                sd.getOutputFormat(),
-                sd.getSerdeInfo().getSerializationLib(),
-                hiveOps.getClient().getSchema(table.getDbName(), 
table.getTableName())
-        );
-        HivePartitionWithStatistics partitionWithStats =
-                new HivePartitionWithStatistics(pu.getName(), hivePartition, 
ps);
-        addPartitionsTask.addPartition(partitionWithStats);
-    }
-
-    public void prepareInsertExistPartition(List<Pair<THivePartitionUpdate, 
HivePartitionStatistics>> partitions) {
-        for (List<Pair<THivePartitionUpdate, HivePartitionStatistics>> 
partitionBatch :
-                    Iterables.partition(partitions, 100)) {
-            List<String> partitionNames = partitionBatch.stream()
-                    .map(pair -> pair.first.getName())
-                    .collect(Collectors.toList());
-
-            Map<String, Partition> partitionsByNamesMap = 
HiveUtil.convertToNamePartitionMap(
-                    partitionNames,
-                    hiveOps.getClient().getPartitions(table.getDbName(), 
table.getTableName(), partitionNames));
-
-            for (int i = 0; i < partitionsByNamesMap.size(); i++) {
-                String partitionName = partitionNames.get(i);
-                if (partitionsByNamesMap.get(partitionName) == null) {
-                    // Prevent this partition from being deleted by other 
engines
-                    throw new RuntimeException("Not found partition: " + 
partitionName);
-                }
-
-                THivePartitionUpdate pu = partitionBatch.get(i).first;
-                HivePartitionStatistics updateStats = 
partitionBatch.get(i).second;
-
-                String writePath = pu.getLocation().getWritePath();
-                String targetPath = pu.getLocation().getTargetPath();
-                directoryCleanUpTasksForAbort.add(new 
DirectoryCleanUpTask(targetPath, false));
-
-                if (!targetPath.equals(writePath)) {
-                    fs.asyncRename(
-                            fileSystemExecutor,
-                            asyncFileSystemTaskFutures,
-                            fileSystemTaskCancelled,
-                            writePath,
-                            targetPath,
-                            pu.getFileNames());
-                }
-
-                updateStatisticsTasks.add(
-                    new UpdateStatisticsTask(
-                            table.getDbName(),
-                            table.getTableName(),
-                            Optional.of(pu.getName()),
-                            updateStats,
-                            true));
-            }
-        }
-    }
-
-
-    public void prepareOverwritePartition(THivePartitionUpdate pu, 
HivePartitionStatistics ps) {
-        String targetPath = pu.getLocation().getTargetPath();
-        String writePath = pu.getLocation().getWritePath();
-        if (!targetPath.equals(writePath)) {
-            Path path = new Path(targetPath);
-            String oldPartitionPath = new Path(path.getParent(), "_temp_" + 
path.getName()).toString();
-            Status status = fs.renameDir(
-                    targetPath,
-                    oldPartitionPath,
-                    () -> renameDirectoryTasksForAbort.add(new 
RenameDirectoryTask(oldPartitionPath, targetPath)));
-            if (!status.ok()) {
-                throw new RuntimeException(
-                    "Error to rename dir from " + targetPath + " to " + 
oldPartitionPath + ":" + status.getErrMsg());
-            }
-            clearDirsForFinish.add(oldPartitionPath);
-
-            status = fs.renameDir(
-                    writePath,
-                    targetPath,
-                    () -> directoryCleanUpTasksForAbort.add(new 
DirectoryCleanUpTask(targetPath, true)));
-            if (!status.ok()) {
-                throw new RuntimeException(
-                    "Error to rename dir from " + writePath + " to " + 
targetPath + ":" + status.getErrMsg());
-            }
-        }
-        updateStatisticsTasks.add(
-            new UpdateStatisticsTask(
-                table.getDbName(),
-                table.getTableName(),
-                Optional.of(pu.getName()),
-                ps,
-                false
-            ));
-    }
-
-
-    private void waitForAsyncFileSystemTasks() {
-        for (CompletableFuture<?> future : asyncFileSystemTaskFutures) {
-            MoreFutures.getFutureValue(future, RuntimeException.class);
-        }
-    }
-
-    private void doAddPartitionsTask() {
-        if (!addPartitionsTask.isEmpty()) {
-            addPartitionsTask.run(hiveOps);
-        }
-    }
-
-    private void doUpdateStatisticsTasks() {
-        ImmutableList.Builder<CompletableFuture<?>> updateStatsFutures = 
ImmutableList.builder();
-        List<String> failedTaskDescriptions = new ArrayList<>();
-        List<Throwable> suppressedExceptions = new ArrayList<>();
-        for (UpdateStatisticsTask task : updateStatisticsTasks) {
-            updateStatsFutures.add(CompletableFuture.runAsync(() -> {
-                try {
-                    task.run(hiveOps);
-                } catch (Throwable t) {
-                    synchronized (suppressedExceptions) {
-                        addSuppressedExceptions(suppressedExceptions, t, 
failedTaskDescriptions, task.getDescription());
-                    }
-                }
-            }, updateStatisticsExecutor));
-        }
-
-        for (CompletableFuture<?> executeUpdateFuture : 
updateStatsFutures.build()) {
-            MoreFutures.getFutureValue(executeUpdateFuture);
-        }
-        if (!suppressedExceptions.isEmpty()) {
-            StringBuilder message = new StringBuilder();
-            message.append("Failed to execute some updating statistics tasks: 
");
-            Joiner.on("; ").appendTo(message, failedTaskDescriptions);
-            RuntimeException exception = new 
RuntimeException(message.toString());
-            suppressedExceptions.forEach(exception::addSuppressed);
-            throw exception;
-        }
-    }
-
-    private static void addSuppressedExceptions(
-            List<Throwable> suppressedExceptions,
-            Throwable t,
-            List<String> descriptions,
-            String description) {
-        descriptions.add(description);
-        // A limit is needed to avoid having a huge exception object. 5 was 
chosen arbitrarily.
-        if (suppressedExceptions.size() < 5) {
-            suppressedExceptions.add(t);
-        }
-    }
-
-    private static class AddPartition {
-
-    }
-
-    private static class UpdateStatisticsTask {
-        private final String dbName;
-        private final String tableName;
-        private final Optional<String> partitionName;
-        private final HivePartitionStatistics updatePartitionStat;
-        private final boolean merge;
-
-        private boolean done;
-
-        public UpdateStatisticsTask(String dbName, String tableName, 
Optional<String> partitionName,
-                                    HivePartitionStatistics statistics, 
boolean merge) {
-            this.dbName = Objects.requireNonNull(dbName, "dbName is null");
-            this.tableName = Objects.requireNonNull(tableName, "tableName is 
null");
-            this.partitionName = Objects.requireNonNull(partitionName, 
"partitionName is null");
-            this.updatePartitionStat = Objects.requireNonNull(statistics, 
"statistics is null");
-            this.merge = merge;
-        }
-
-        public void run(HiveMetadataOps hiveOps) {
-            if (partitionName.isPresent()) {
-                hiveOps.updatePartitionStatistics(dbName, tableName, 
partitionName.get(), this::updateStatistics);
-            } else {
-                hiveOps.updateTableStatistics(dbName, tableName, 
this::updateStatistics);
-            }
-            done = true;
-        }
-
-        public void undo(HiveMetadataOps hmsOps) {
-            if (!done) {
-                return;
-            }
-            if (partitionName.isPresent()) {
-                hmsOps.updatePartitionStatistics(dbName, tableName, 
partitionName.get(), this::resetStatistics);
-            } else {
-                hmsOps.updateTableStatistics(dbName, tableName, 
this::resetStatistics);
-            }
-        }
-
-        public String getDescription() {
-            if (partitionName.isPresent()) {
-                return "alter partition parameters " + tableName + " " + 
partitionName.get();
-            } else {
-                return "alter table parameters " +  tableName;
-            }
-        }
-
-        private HivePartitionStatistics 
updateStatistics(HivePartitionStatistics currentStats) {
-            return merge ? HivePartitionStatistics.merge(currentStats, 
updatePartitionStat) : updatePartitionStat;
-        }
-
-        private HivePartitionStatistics 
resetStatistics(HivePartitionStatistics currentStatistics) {
-            return HivePartitionStatistics
-                    .reduce(currentStatistics, updatePartitionStat, 
HivePartitionStatistics.ReduceOperator.SUBTRACT);
-        }
-    }
-
-    public static class AddPartitionsTask {
-        private final List<HivePartitionWithStatistics> partitions = new 
ArrayList<>();
-        private final List<List<String>> createdPartitionValues = new 
ArrayList<>();
-
-        public boolean isEmpty() {
-            return partitions.isEmpty();
-        }
-
-        public List<HivePartitionWithStatistics> getPartitions() {
-            return partitions;
-        }
-
-        public void addPartition(HivePartitionWithStatistics partition) {
-            partitions.add(partition);
-        }
-
-        public void run(HiveMetadataOps hiveOps) {
-            HivePartition firstPartition = partitions.get(0).getPartition();
-            String dbName = firstPartition.getDbName();
-            String tableName = firstPartition.getTblName();
-            List<List<HivePartitionWithStatistics>> batchedPartitions =
-                    Lists.partition(partitions, PARTITION_COMMIT_BATCH_SIZE);
-            for (List<HivePartitionWithStatistics> batch : batchedPartitions) {
-                try {
-                    hiveOps.addPartitions(dbName, tableName, batch);
-                    for (HivePartitionWithStatistics partition : batch) {
-                        createdPartitionValues.add(partition.getPartition().getPartitionValues());
-                    }
-                } catch (Throwable t) {
-                    LOG.warn("Failed to add partition", t);
-                    throw t;
-                }
-            }
-            partitions.clear();
-        }
-
-        public List<List<String>> rollback(HiveMetadataOps hiveOps) {
-            HivePartition firstPartition = partitions.get(0).getPartition();
-            String dbName = firstPartition.getDbName();
-            String tableName = firstPartition.getTblName();
-            List<List<String>> rollbackFailedPartitions = new ArrayList<>();
-            for (List<String> createdPartitionValue : createdPartitionValues) {
-                try {
-                    hiveOps.dropPartition(dbName, tableName, 
createdPartitionValue, false);
-                } catch (Throwable t) {
-                    LOG.warn("Failed to drop partition on {}.{}.{} when 
rollback",
-                            dbName, tableName, rollbackFailedPartitions);
-                    rollbackFailedPartitions.add(createdPartitionValue);
-                }
-            }
-            return rollbackFailedPartitions;
-        }
-    }
-
-    private static class DirectoryCleanUpTask {
-        private final Path path;
-        private final boolean deleteEmptyDir;
-
-        public DirectoryCleanUpTask(String path, boolean deleteEmptyDir) {
-            this.path = new Path(path);
-            this.deleteEmptyDir = deleteEmptyDir;
-        }
-
-        public Path getPath() {
-            return path;
-        }
-
-        public boolean isDeleteEmptyDir() {
-            return deleteEmptyDir;
-        }
-
-        @Override
-        public String toString() {
-            return new StringJoiner(", ", 
DirectoryCleanUpTask.class.getSimpleName() + "[", "]")
-                .add("path=" + path)
-                .add("deleteEmptyDir=" + deleteEmptyDir)
-                .toString();
-        }
-    }
-
-    public static class DeleteRecursivelyResult {
-        private final boolean dirNoLongerExists;
-        private final List<String> notDeletedEligibleItems;
-
-        public DeleteRecursivelyResult(boolean dirNoLongerExists, List<String> 
notDeletedEligibleItems) {
-            this.dirNoLongerExists = dirNoLongerExists;
-            this.notDeletedEligibleItems = notDeletedEligibleItems;
-        }
-
-        public boolean dirNotExists() {
-            return dirNoLongerExists;
-        }
-
-        public List<String> getNotDeletedEligibleItems() {
-            return notDeletedEligibleItems;
-        }
-    }
-
-    private void runDirectoryClearUpTasksForAbort() {
-        for (DirectoryCleanUpTask cleanUpTask : directoryCleanUpTasksForAbort) 
{
-            recursiveDeleteItems(cleanUpTask.getPath(), 
cleanUpTask.isDeleteEmptyDir());
-        }
-    }
-
-    private static class RenameDirectoryTask {
-        private final String renameFrom;
-        private final String renameTo;
-
-        public RenameDirectoryTask(String renameFrom, String renameTo) {
-            this.renameFrom = renameFrom;
-            this.renameTo = renameTo;
-        }
-
-        public String getRenameFrom() {
-            return renameFrom;
-        }
-
-        public String getRenameTo() {
-            return renameTo;
-        }
-
-        @Override
-        public String toString() {
-            return new StringJoiner(", ", 
RenameDirectoryTask.class.getSimpleName() + "[", "]")
-                .add("renameFrom:" + renameFrom)
-                .add("renameTo:" + renameTo)
-                .toString();
-        }
-    }
-
-    private void runRenameDirTasksForAbort() {
-        Status status;
-        for (RenameDirectoryTask task : renameDirectoryTasksForAbort) {
-            status = fs.exists(task.getRenameFrom());
-            if (status.ok()) {
-                status = fs.renameDir(task.getRenameFrom(), 
task.getRenameTo(), () -> {});
-                if (!status.ok()) {
-                    LOG.warn("Failed to abort rename dir from {} to {}:{}",
-                            task.getRenameFrom(), task.getRenameTo(), 
status.getErrMsg());
-                }
-            }
-        }
-    }
-
-    private void runClearPathsForFinish() {
-        Status status;
-        for (String path : clearDirsForFinish) {
-            status = fs.delete(path);
-            if (!status.ok()) {
-                LOG.warn("Failed to recursively delete path {}:{}", path, 
status.getErrCode());
-            }
-        }
-    }
-
-
-    private void recursiveDeleteItems(Path directory, boolean deleteEmptyDir) {
-        DeleteRecursivelyResult deleteResult = recursiveDeleteFiles(directory, 
deleteEmptyDir);
-
-        if (!deleteResult.getNotDeletedEligibleItems().isEmpty()) {
-            LOG.warn("Failed to delete directory {}. Some eligible items can't 
be deleted: {}.",
-                    directory.toString(), 
deleteResult.getNotDeletedEligibleItems());
-        } else if (deleteEmptyDir && !deleteResult.dirNotExists()) {
-            LOG.warn("Failed to delete directory {} due to dir isn't empty", 
directory.toString());
-        }
-    }
-
-    public DeleteRecursivelyResult recursiveDeleteFiles(Path directory, 
boolean deleteEmptyDir) {
-        try {
-            if (!fs.exists(directory.getName()).ok()) {
-                return new DeleteRecursivelyResult(true, ImmutableList.of());
-            }
-        } catch (Exception e) {
-            ImmutableList.Builder<String> notDeletedEligibleItems = 
ImmutableList.builder();
-            notDeletedEligibleItems.add(directory.toString() + "/*");
-            return new DeleteRecursivelyResult(false, 
notDeletedEligibleItems.build());
-        }
-
-        return doRecursiveDeleteFiles(directory, deleteEmptyDir);
-    }
-
-    private DeleteRecursivelyResult doRecursiveDeleteFiles(Path directory, 
boolean deleteEmptyDir) {
-        List<RemoteFile> remoteFiles = new ArrayList<>();
-
-        Status status = fs.list(directory.getName(), remoteFiles);
-        if (!status.ok()) {
-            ImmutableList.Builder<String> notDeletedEligibleItems = 
ImmutableList.builder();
-            notDeletedEligibleItems.add(directory + "/*");
-            return new DeleteRecursivelyResult(false, 
notDeletedEligibleItems.build());
-        }
-
-        boolean isEmptyDir = true;
-        List<String> notDeletedEligibleItems = new ArrayList<>();
-        for (RemoteFile file : remoteFiles) {
-            if (file.isFile()) {
-                Path filePath = file.getPath();
-                isEmptyDir = false;
-                // TODO Check if this file was created by this query
-                if (!deleteIfExists(filePath)) {
-                    notDeletedEligibleItems.add(filePath.toString());
-                }
-            } else if (file.isDirectory()) {
-                DeleteRecursivelyResult subResult = 
doRecursiveDeleteFiles(file.getPath(), deleteEmptyDir);
-                if (!subResult.dirNotExists()) {
-                    isEmptyDir = false;
-                }
-                if (!subResult.getNotDeletedEligibleItems().isEmpty()) {
-                    notDeletedEligibleItems.addAll(subResult.getNotDeletedEligibleItems());
-                }
-            } else {
-                isEmptyDir = false;
-                notDeletedEligibleItems.add(file.getPath().toString());
-            }
-        }
-
-        if (isEmptyDir && deleteEmptyDir) {
-            Verify.verify(notDeletedEligibleItems.isEmpty());
-            if (!deleteIfExists(directory)) {
-                return new DeleteRecursivelyResult(false, 
ImmutableList.of(directory + "/"));
-            }
-            // all items of the location have been deleted.
-            return new DeleteRecursivelyResult(true, ImmutableList.of());
-        }
-
-        return new DeleteRecursivelyResult(false, notDeletedEligibleItems);
-    }
-
-    public boolean deleteIfExists(Path path) {
-        Status status = fs.delete(path.getName());
-        if (status.ok()) {
-            return true;
-        }
-        return !fs.exists(path.getName()).ok();
-    }
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
index 0f2a7bb2acb..4474e546500 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
@@ -34,6 +34,7 @@ import org.apache.doris.datasource.jdbc.client.JdbcClientConfig;
 import org.apache.doris.datasource.operations.ExternalMetadataOperations;
 import org.apache.doris.datasource.property.PropertyConverter;
 import org.apache.doris.datasource.property.constants.HMSProperties;
+import org.apache.doris.transaction.TransactionManagerFactory;
 
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
@@ -145,7 +146,10 @@ public class HMSExternalCatalog extends ExternalCatalog {
                     AuthenticationConfig.HADOOP_KERBEROS_PRINCIPAL,
                     AuthenticationConfig.HADOOP_KERBEROS_KEYTAB));
         }
-        metadataOps = ExternalMetadataOperations.newHiveMetadataOps(hiveConf, jdbcClientConfig, this);
+        HiveMetadataOps hiveOps = ExternalMetadataOperations.newHiveMetadataOps(hiveConf, jdbcClientConfig, this);
+        transactionManager = TransactionManagerFactory.createHiveTransactionManager(hiveOps);
+        transactionManager.setEditLog(Env.getCurrentEnv().getEditLog());
+        metadataOps = hiveOps;
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
new file mode 100644
index 00000000000..c3e8d00c5d1
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
@@ -0,0 +1,1322 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/SemiTransactionalHiveMetastore.java
+// https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java
+// and modified by Doris
+
+package org.apache.doris.datasource.hive;
+
+import org.apache.doris.backup.Status;
+import org.apache.doris.common.Pair;
+import org.apache.doris.fs.remote.RemoteFile;
+import org.apache.doris.fs.remote.RemoteFileSystem;
+import org.apache.doris.thrift.THivePartitionUpdate;
+import org.apache.doris.thrift.TUpdateMode;
+import org.apache.doris.transaction.Transaction;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import io.airlift.concurrent.MoreFutures;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.StringJoiner;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+public class HMSTransaction implements Transaction {
+    private static final Logger LOG = LogManager.getLogger(HMSTransaction.class);
+    private final HiveMetadataOps hiveOps;
+    private final RemoteFileSystem fs;
+    private String dbName;
+    private String tbName;
+
+    private final Map<DatabaseTableName, Action<TableAndMore>> tableActions = new HashMap<>();
+    private final Map<DatabaseTableName, Map<List<String>, Action<PartitionAndMore>>>
+            partitionActions = new HashMap<>();
+
+    private HmsCommitter hmsCommitter;
+    private List<THivePartitionUpdate> hivePartitionUpdates = Lists.newArrayList();
+
+    public HMSTransaction(HiveMetadataOps hiveOps) {
+        this.hiveOps = hiveOps;
+        this.fs = hiveOps.getFs();
+    }
+
+    @Override
+    public void commit() {
+        doCommit();
+    }
+
+    public String getDbName() {
+        return dbName;
+    }
+
+    public String getTbName() {
+        return tbName;
+    }
+
+    public List<THivePartitionUpdate> mergePartitions(List<THivePartitionUpdate> hivePUs) {
+        Map<String, THivePartitionUpdate> mm = new HashMap<>();
+        for (THivePartitionUpdate pu : hivePUs) {
+            if (mm.containsKey(pu.getName())) {
+                THivePartitionUpdate old = mm.get(pu.getName());
+                old.setFileSize(old.getFileSize() + pu.getFileSize());
+                old.setRowCount(old.getRowCount() + pu.getRowCount());
+                old.getFileNames().addAll(pu.getFileNames());
+            } else {
+                mm.put(pu.getName(), pu);
+            }
+        }
+        return new ArrayList<>(mm.values());
+    }
+
+    @Override
+    public void rollback() {
+        if (hmsCommitter != null) {
+            hmsCommitter.rollback();
+        }
+    }
+
+    public void finishInsertTable(String dbName, String tbName) {
+        this.tbName = tbName;
+        this.dbName = dbName;
+        List<THivePartitionUpdate> mergedPUs = mergePartitions(hivePartitionUpdates);
+        Table table = getTable(dbName, tbName);
+        List<Pair<THivePartitionUpdate, HivePartitionStatistics>> insertExistsPartitions = new ArrayList<>();
+        for (THivePartitionUpdate pu : mergedPUs) {
+            TUpdateMode updateMode = pu.getUpdateMode();
+            HivePartitionStatistics hivePartitionStatistics = HivePartitionStatistics.fromCommonStatistics(
+                    pu.getRowCount(),
+                    pu.getFileNamesSize(),
+                    pu.getFileSize());
+            String writePath = pu.getLocation().getWritePath();
+            if (table.getPartitionKeysSize() == 0) {
+                Preconditions.checkArgument(mergedPUs.size() == 1,
+                        "When updating a non-partitioned table, multiple partitions should not be written");
+                switch (updateMode) {
+                    case APPEND:
+                        finishChangingExistingTable(
+                                ActionType.INSERT_EXISTING,
+                                dbName,
+                                tbName,
+                                writePath,
+                                pu.getFileNames(),
+                                hivePartitionStatistics);
+                        break;
+                    case OVERWRITE:
+                        dropTable(dbName, tbName);
+                        createTable(table, writePath, pu.getFileNames(), hivePartitionStatistics);
+                        break;
+                    default:
+                        throw new RuntimeException("Not support mode:[" + updateMode + "] in unPartitioned table");
+                }
+            } else {
+                switch (updateMode) {
+                    case APPEND:
+                        // insert into existing partition
+                        insertExistsPartitions.add(Pair.of(pu, hivePartitionStatistics));
+                        break;
+                    case NEW:
+                    case OVERWRITE:
+                        StorageDescriptor sd = table.getSd();
+                        HivePartition hivePartition = new HivePartition(
+                                dbName,
+                                tbName,
+                                false,
+                                sd.getInputFormat(),
+                                pu.getLocation().getTargetPath(),
+                                HiveUtil.toPartitionValues(pu.getName()),
+                                Maps.newHashMap(),
+                                sd.getOutputFormat(),
+                                sd.getSerdeInfo().getSerializationLib(),
+                                hiveOps.getClient().getSchema(dbName, tbName)
+                        );
+                        if (updateMode == TUpdateMode.OVERWRITE) {
+                            dropPartition(dbName, tbName, hivePartition.getPartitionValues(), true);
+                        }
+                        addPartition(
+                                dbName, tbName, hivePartition, writePath,
+                                pu.getName(), pu.getFileNames(), hivePartitionStatistics);
+                        break;
+                    default:
+                        throw new RuntimeException("Not support mode:[" + updateMode + "] in partitioned table");
+                }
+            }
+        }
+
+        if (!insertExistsPartitions.isEmpty()) {
+            convertToInsertExistingPartitionAction(insertExistsPartitions);
+        }
+    }
+
+    public void doCommit() {
+        hmsCommitter = new HmsCommitter();
+
+        try {
+            for (Map.Entry<DatabaseTableName, Action<TableAndMore>> entry : tableActions.entrySet()) {
+                Action<TableAndMore> action = entry.getValue();
+                switch (action.getType()) {
+                    case INSERT_EXISTING:
+                        hmsCommitter.prepareInsertExistingTable(action.getData());
+                        break;
+                    case ALTER:
+                        hmsCommitter.prepareAlterTable(action.getData());
+                        break;
+                    default:
+                        throw new UnsupportedOperationException("Unsupported table action type: " + action.getType());
+                }
+            }
+
+            for (Map.Entry<DatabaseTableName, Map<List<String>, Action<PartitionAndMore>>> tableEntry
+                    : partitionActions.entrySet()) {
+                for (Map.Entry<List<String>, Action<PartitionAndMore>> partitionEntry :
+                        tableEntry.getValue().entrySet()) {
+                    Action<PartitionAndMore> action = partitionEntry.getValue();
+                    switch (action.getType()) {
+                        case INSERT_EXISTING:
+                            hmsCommitter.prepareInsertExistPartition(action.getData());
+                            break;
+                        case ADD:
+                            hmsCommitter.prepareAddPartition(action.getData());
+                            break;
+                        case ALTER:
+                            hmsCommitter.prepareAlterPartition(action.getData());
+                            break;
+                        default:
+                            throw new UnsupportedOperationException(
+                                "Unsupported partition action type: " + action.getType());
+                    }
+                }
+            }
+
+            hmsCommitter.waitForAsyncFileSystemTasks();
+            hmsCommitter.doAddPartitionsTask();
+            hmsCommitter.doUpdateStatisticsTasks();
+        } catch (Throwable t) {
+            LOG.warn("Failed to commit for {}.{}, abort it.", dbName, tbName);
+            hmsCommitter.cancelUnStartedAsyncFileSystemTask();
+            hmsCommitter.undoUpdateStatisticsTasks();
+            hmsCommitter.undoAddPartitionsTask();
+            hmsCommitter.waitForAsyncFileSystemTaskSuppressThrowable();
+            hmsCommitter.runDirectoryClearUpTasksForAbort();
+            hmsCommitter.runRenameDirTasksForAbort();
+            throw t;
+        } finally {
+            hmsCommitter.runClearPathsForFinish();
+        }
+    }
+
+    public void updateHivePartitionUpdates(List<THivePartitionUpdate> pus) {
+        synchronized (this) {
+            hivePartitionUpdates.addAll(pus);
+        }
+    }
+
+    // for test
+    public void setHivePartitionUpdates(List<THivePartitionUpdate> hivePartitionUpdates) {
+        this.hivePartitionUpdates = hivePartitionUpdates;
+    }
+
+    public long getUpdateCnt() {
+        return hivePartitionUpdates.stream().mapToLong(THivePartitionUpdate::getRowCount).sum();
+    }
+
+    private void convertToInsertExistingPartitionAction(
+            List<Pair<THivePartitionUpdate, HivePartitionStatistics>> 
partitions) {
+        DatabaseTableName databaseTableName = new DatabaseTableName(dbName, 
tbName);
+        Map<List<String>, Action<PartitionAndMore>> partitionActionsForTable =
+                partitionActions.computeIfAbsent(databaseTableName, k -> new 
HashMap<>());
+
+        for (List<Pair<THivePartitionUpdate, HivePartitionStatistics>> 
partitionBatch :
+                Iterables.partition(partitions, 100)) {
+
+            List<String> partitionNames = partitionBatch
+                    .stream()
+                    .map(pair -> pair.first.getName())
+                    .collect(Collectors.toList());
+
+            // check in partitionAction
+            Action<PartitionAndMore> oldPartitionAction = 
partitionActionsForTable.get(partitionNames);
+            if (oldPartitionAction != null) {
+                switch (oldPartitionAction.getType()) {
+                    case DROP:
+                    case DROP_PRESERVE_DATA:
+                        throw new RuntimeException(
+                                "Not found partition from partition actions"
+                                        + "for " + databaseTableName + ", 
partitions: " + partitionNames);
+                    case ADD:
+                    case ALTER:
+                    case INSERT_EXISTING:
+                    case MERGE:
+                        throw new UnsupportedOperationException(
+                                "Inserting into a partition that were added, 
altered,"
+                                        + "or inserted into in the same 
transaction is not supported");
+                    default:
+                        throw new IllegalStateException("Unknown action type: 
" + oldPartitionAction.getType());
+                }
+            }
+
+            Map<String, Partition> partitionsByNamesMap = 
HiveUtil.convertToNamePartitionMap(
+                    partitionNames,
+                    hiveOps.getClient().getPartitions(dbName, tbName, 
partitionNames));
+
+            for (int i = 0; i < partitionsByNamesMap.size(); i++) {
+                String partitionName = partitionNames.get(i);
+                // check from hms
+                Partition partition = partitionsByNamesMap.get(partitionName);
+                if (partition == null) {
+                    // Prevent this partition from being deleted by other 
engines
+                    throw new RuntimeException(
+                            "Not found partition from hms for " + 
databaseTableName
+                                    + ", partitions: " + partitionNames);
+                }
+                THivePartitionUpdate pu = partitionBatch.get(i).first;
+                HivePartitionStatistics updateStats = 
partitionBatch.get(i).second;
+
+                StorageDescriptor sd = partition.getSd();
+                List<String> partitionValues = 
HiveUtil.toPartitionValues(pu.getName());
+
+                HivePartition hivePartition = new HivePartition(
+                        dbName,
+                        tbName,
+                        false,
+                        sd.getInputFormat(),
+                        partition.getSd().getLocation(),
+                        partitionValues,
+                        partition.getParameters(),
+                        sd.getOutputFormat(),
+                        sd.getSerdeInfo().getSerializationLib(),
+                        hiveOps.getClient().getSchema(dbName, tbName)
+                );
+
+                partitionActionsForTable.put(
+                        partitionValues,
+                        new Action<>(
+                                ActionType.INSERT_EXISTING,
+                                new PartitionAndMore(
+                                    hivePartition,
+                                    pu.getLocation().getWritePath(),
+                                    pu.getName(),
+                                    pu.getFileNames(),
+                                    updateStats
+                                ))
+                );
+            }
+        }
+    }
+
+    private static void addSuppressedExceptions(
+            List<Throwable> suppressedExceptions,
+            Throwable t,
+            List<String> descriptions,
+            String description) {
+        descriptions.add(description);
+        // A limit is needed to avoid having a huge exception object. 5 was 
chosen arbitrarily.
+        if (suppressedExceptions.size() < 5) {
+            suppressedExceptions.add(t);
+        }
+    }
+
+    private static class UpdateStatisticsTask {
+        private final String dbName;
+        private final String tableName;
+        private final Optional<String> partitionName;
+        private final HivePartitionStatistics updatePartitionStat;
+        private final boolean merge;
+
+        private boolean done;
+
+        public UpdateStatisticsTask(String dbName, String tableName, 
Optional<String> partitionName,
+                                    HivePartitionStatistics statistics, 
boolean merge) {
+            this.dbName = Objects.requireNonNull(dbName, "dbName is null");
+            this.tableName = Objects.requireNonNull(tableName, "tableName is 
null");
+            this.partitionName = Objects.requireNonNull(partitionName, 
"partitionName is null");
+            this.updatePartitionStat = Objects.requireNonNull(statistics, 
"statistics is null");
+            this.merge = merge;
+        }
+
+        public void run(HiveMetadataOps hiveOps) {
+            if (partitionName.isPresent()) {
+                hiveOps.updatePartitionStatistics(dbName, tableName, 
partitionName.get(), this::updateStatistics);
+            } else {
+                hiveOps.updateTableStatistics(dbName, tableName, 
this::updateStatistics);
+            }
+            done = true;
+        }
+
+        public void undo(HiveMetadataOps hmsOps) {
+            if (!done) {
+                return;
+            }
+            if (partitionName.isPresent()) {
+                hmsOps.updatePartitionStatistics(dbName, tableName, 
partitionName.get(), this::resetStatistics);
+            } else {
+                hmsOps.updateTableStatistics(dbName, tableName, 
this::resetStatistics);
+            }
+        }
+
+        public String getDescription() {
+            if (partitionName.isPresent()) {
+                return "alter partition parameters " + tableName + " " + 
partitionName.get();
+            } else {
+                return "alter table parameters " +  tableName;
+            }
+        }
+
+        private HivePartitionStatistics 
updateStatistics(HivePartitionStatistics currentStats) {
+            return merge ? HivePartitionStatistics.merge(currentStats, 
updatePartitionStat) : updatePartitionStat;
+        }
+
+        private HivePartitionStatistics 
resetStatistics(HivePartitionStatistics currentStatistics) {
+            return HivePartitionStatistics
+                    .reduce(currentStatistics, updatePartitionStat, 
HivePartitionStatistics.ReduceOperator.SUBTRACT);
+        }
+    }
+
+    public static class AddPartitionsTask {
+        private final List<HivePartitionWithStatistics> partitions = new 
ArrayList<>();
+        private final List<List<String>> createdPartitionValues = new 
ArrayList<>();
+
+        public boolean isEmpty() {
+            return partitions.isEmpty();
+        }
+
+        public List<HivePartitionWithStatistics> getPartitions() {
+            return partitions;
+        }
+
+        public void addPartition(HivePartitionWithStatistics partition) {
+            partitions.add(partition);
+        }
+
+        public void run(HiveMetadataOps hiveOps) {
+            HivePartition firstPartition = partitions.get(0).getPartition();
+            String dbName = firstPartition.getDbName();
+            String tableName = firstPartition.getTblName();
+            List<List<HivePartitionWithStatistics>> batchedPartitions = 
Lists.partition(partitions, 20);
+            for (List<HivePartitionWithStatistics> batch : batchedPartitions) {
+                try {
+                    hiveOps.addPartitions(dbName, tableName, batch);
+                    for (HivePartitionWithStatistics partition : batch) {
+                        
createdPartitionValues.add(partition.getPartition().getPartitionValues());
+                    }
+                } catch (Throwable t) {
+                    LOG.warn("Failed to add partition", t);
+                    throw t;
+                }
+            }
+            partitions.clear();
+        }
+
+        public List<List<String>> rollback(HiveMetadataOps hiveOps) {
+            HivePartition firstPartition = partitions.get(0).getPartition();
+            String dbName = firstPartition.getDbName();
+            String tableName = firstPartition.getTblName();
+            List<List<String>> rollbackFailedPartitions = new ArrayList<>();
+            for (List<String> createdPartitionValue : createdPartitionValues) {
+                try {
+                    hiveOps.dropPartition(dbName, tableName, 
createdPartitionValue, false);
+                } catch (Throwable t) {
+                    LOG.warn("Failed to drop partition on {}.{}.{} when 
rollback",
+                            dbName, tableName, rollbackFailedPartitions);
+                    rollbackFailedPartitions.add(createdPartitionValue);
+                }
+            }
+            return rollbackFailedPartitions;
+        }
+    }
+
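+    // A directory to clean up when the commit is aborted; deleteEmptyDir
+    // controls whether the directory itself is removed once it is empty.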
+    private static class DirectoryCleanUpTask {
+        private final Path path;
+        private final boolean deleteEmptyDir;
+
+        public DirectoryCleanUpTask(String path, boolean deleteEmptyDir) {
+            this.path = new Path(path);
+            this.deleteEmptyDir = deleteEmptyDir;
+        }
+
+        public Path getPath() {
+            return path;
+        }
+
+        public boolean isDeleteEmptyDir() {
+            return deleteEmptyDir;
+        }
+
+        @Override
+        public String toString() {
+            return new StringJoiner(", ", 
DirectoryCleanUpTask.class.getSimpleName() + "[", "]")
+                .add("path=" + path)
+                .add("deleteEmptyDir=" + deleteEmptyDir)
+                .toString();
+        }
+    }
+
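+    // Result of a recursive delete: whether the directory is gone, plus any
+    // eligible items that could not be deleted.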
+    private static class DeleteRecursivelyResult {
+        private final boolean dirNoLongerExists;
+        private final List<String> notDeletedEligibleItems;
+
+        public DeleteRecursivelyResult(boolean dirNoLongerExists, List<String> 
notDeletedEligibleItems) {
+            this.dirNoLongerExists = dirNoLongerExists;
+            this.notDeletedEligibleItems = notDeletedEligibleItems;
+        }
+
+        public boolean dirNotExists() {
+            return dirNoLongerExists;
+        }
+
+        public List<String> getNotDeletedEligibleItems() {
+            return notDeletedEligibleItems;
+        }
+    }
+
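+    // Remembers a directory rename so it can be reversed if the commit aborts.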
+    private static class RenameDirectoryTask {
+        private final String renameFrom;
+        private final String renameTo;
+
+        public RenameDirectoryTask(String renameFrom, String renameTo) {
+            this.renameFrom = renameFrom;
+            this.renameTo = renameTo;
+        }
+
+        public String getRenameFrom() {
+            return renameFrom;
+        }
+
+        public String getRenameTo() {
+            return renameTo;
+        }
+
+        @Override
+        public String toString() {
+            return new StringJoiner(", ", 
RenameDirectoryTask.class.getSimpleName() + "[", "]")
+                .add("renameFrom:" + renameFrom)
+                .add("renameTo:" + renameTo)
+                .toString();
+        }
+    }
+
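+    // Best-effort recursive delete: failures are logged but never thrown.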
+    private void recursiveDeleteItems(Path directory, boolean deleteEmptyDir) {
+        DeleteRecursivelyResult deleteResult = recursiveDeleteFiles(directory, 
deleteEmptyDir);
+
+        if (!deleteResult.getNotDeletedEligibleItems().isEmpty()) {
+            LOG.warn("Failed to delete directory {}. Some eligible items can't 
be deleted: {}.",
+                    directory.toString(), 
deleteResult.getNotDeletedEligibleItems());
+        } else if (deleteEmptyDir && !deleteResult.dirNotExists()) {
+            LOG.warn("Failed to delete directory {} due to dir isn't empty", 
directory.toString());
+        }
+    }
+
+    private DeleteRecursivelyResult recursiveDeleteFiles(Path directory, 
boolean deleteEmptyDir) {
+        try {
+            if (!fs.exists(directory.getName()).ok()) {
+                return new DeleteRecursivelyResult(true, ImmutableList.of());
+            }
+        } catch (Exception e) {
+            ImmutableList.Builder<String> notDeletedEligibleItems = 
ImmutableList.builder();
+            notDeletedEligibleItems.add(directory.toString() + "/*");
+            return new DeleteRecursivelyResult(false, 
notDeletedEligibleItems.build());
+        }
+
+        return doRecursiveDeleteFiles(directory, deleteEmptyDir);
+    }
+
+    private DeleteRecursivelyResult doRecursiveDeleteFiles(Path directory, 
boolean deleteEmptyDir) {
+        List<RemoteFile> remoteFiles = new ArrayList<>();
+
+        Status status = fs.list(directory.getName(), remoteFiles);
+        if (!status.ok()) {
+            ImmutableList.Builder<String> notDeletedEligibleItems = 
ImmutableList.builder();
+            notDeletedEligibleItems.add(directory + "/*");
+            return new DeleteRecursivelyResult(false, 
notDeletedEligibleItems.build());
+        }
+
+        boolean isEmptyDir = true;
+        List<String> notDeletedEligibleItems = new ArrayList<>();
+        for (RemoteFile file : remoteFiles) {
+            if (file.isFile()) {
+                Path filePath = file.getPath();
+                isEmptyDir = false;
+                // TODO Check if this file was created by this query
+                if (!deleteIfExists(filePath)) {
+                    notDeletedEligibleItems.add(filePath.toString());
+                }
+            } else if (file.isDirectory()) {
+                DeleteRecursivelyResult subResult = 
doRecursiveDeleteFiles(file.getPath(), deleteEmptyDir);
+                if (!subResult.dirNotExists()) {
+                    isEmptyDir = false;
+                }
+                if (!subResult.getNotDeletedEligibleItems().isEmpty()) {
+                    
notDeletedEligibleItems.addAll(subResult.getNotDeletedEligibleItems());
+                }
+            } else {
+                isEmptyDir = false;
+                notDeletedEligibleItems.add(file.getPath().toString());
+            }
+        }
+
+        if (isEmptyDir && deleteEmptyDir) {
+            Verify.verify(notDeletedEligibleItems.isEmpty());
+            if (!deleteIfExists(directory)) {
+                return new DeleteRecursivelyResult(false, 
ImmutableList.of(directory + "/"));
+            }
+            // all items of the location have been deleted.
+            return new DeleteRecursivelyResult(true, ImmutableList.of());
+        }
+
+        return new DeleteRecursivelyResult(false, notDeletedEligibleItems);
+    }
+
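+    // Returns true if the path is gone after this call, either because the
+    // delete succeeded or because the path no longer exists.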
+    public boolean deleteIfExists(Path path) {
+        Status status = fs.delete(path.getName());
+        if (status.ok()) {
+            return true;
+        }
+        return !fs.exists(path.getName()).ok();
+    }
+
+    public static class DatabaseTableName {
+        private final String dbName;
+        private final String tbName;
+
+        public DatabaseTableName(String dbName, String tbName) {
+            this.dbName = dbName;
+            this.tbName = tbName;
+        }
+
+        @Override
+        public boolean equals(Object other) {
+            if (this == other) {
+                return true;
+            }
+
+            if (other == null || getClass() != other.getClass()) {
+                return false;
+            }
+
+            DatabaseTableName that = (DatabaseTableName) other;
+            return Objects.equals(dbName, that.dbName) && 
Objects.equals(tbName, that.tbName);
+        }
+
+        @Override
+        public String toString() {
+            return dbName + "." + tbName;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(dbName, tbName);
+        }
+
+        public String getTbName() {
+            return tbName;
+        }
+
+        public String getDbName() {
+            return dbName;
+        }
+    }
+
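+    // Bundles the target table with the staged write location, the written
+    // file names, and the statistics delta to apply at commit time.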
+    private static class TableAndMore {
+        private final Table table;
+        private final String currentLocation;
+        private final List<String> fileNames;
+        private final HivePartitionStatistics statisticsUpdate;
+
+        public TableAndMore(
+                Table table,
+                String currentLocation,
+                List<String> fileNames,
+                HivePartitionStatistics statisticsUpdate) {
+            this.table = Objects.requireNonNull(table, "table is null");
+            this.currentLocation = Objects.requireNonNull(currentLocation);
+            this.fileNames = Objects.requireNonNull(fileNames);
+            this.statisticsUpdate = Objects.requireNonNull(statisticsUpdate, 
"statisticsUpdate is null");
+        }
+
+        public Table getTable() {
+            return table;
+        }
+
+        public String getCurrentLocation() {
+            return currentLocation;
+        }
+
+        public List<String> getFileNames() {
+            return fileNames;
+        }
+
+        public HivePartitionStatistics getStatisticsUpdate() {
+            return statisticsUpdate;
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(this)
+                .add("table", table)
+                .add("statisticsUpdate", statisticsUpdate)
+                .toString();
+        }
+    }
+
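+    // Like TableAndMore, but for a single partition identified by its name.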
+    private static class PartitionAndMore {
+        private final HivePartition partition;
+        private final String currentLocation;
+        private final String partitionName;
+        private final List<String> fileNames;
+        private final HivePartitionStatistics statisticsUpdate;
+
+        public PartitionAndMore(
+                HivePartition partition,
+                String currentLocation,
+                String partitionName,
+                List<String> fileNames,
+                HivePartitionStatistics statisticsUpdate) {
+            this.partition = Objects.requireNonNull(partition, "partition is 
null");
+            this.currentLocation = Objects.requireNonNull(currentLocation, 
"currentLocation is null");
+            this.partitionName = Objects.requireNonNull(partitionName, "partitionName is null");
+            this.fileNames = Objects.requireNonNull(fileNames, "fileNames is 
null");
+            this.statisticsUpdate = Objects.requireNonNull(statisticsUpdate, 
"statisticsUpdate is null");
+        }
+
+        public HivePartition getPartition() {
+            return partition;
+        }
+
+        public String getCurrentLocation() {
+            return currentLocation;
+        }
+
+        public String getPartitionName() {
+            return partitionName;
+        }
+
+        public List<String> getFileNames() {
+            return fileNames;
+        }
+
+        public HivePartitionStatistics getStatisticsUpdate() {
+            return statisticsUpdate;
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(this)
+                .add("partition", partition)
+                .add("currentLocation", currentLocation)
+                .add("fileNames", fileNames)
+                .toString();
+        }
+    }
+
+    private enum ActionType {
+        // drop a table/partition
+        DROP,
+        // drop a table/partition but will preserve data
+        DROP_PRESERVE_DATA,
+        // add a table/partition
+        ADD,
+        // drop then add a table/partition, like overwrite
+        ALTER,
+        // insert into an existing table/partition
+        INSERT_EXISTING,
+        // merge into an existing table/partition
+        MERGE,
+    }
+
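+    // A staged metadata action. DROP and DROP_PRESERVE_DATA carry no payload;
+    // every other action type requires one.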
+    public static class Action<T> {
+        private final ActionType type;
+        private final T data;
+
+        public Action(ActionType type, T data) {
+            this.type = Objects.requireNonNull(type, "type is null");
+            if (type == ActionType.DROP || type == 
ActionType.DROP_PRESERVE_DATA) {
+                Preconditions.checkArgument(data == null, "data must be null for DROP actions");
+            } else {
+                Objects.requireNonNull(data, "data is null");
+            }
+            this.data = data;
+        }
+
+        public ActionType getType() {
+            return type;
+        }
+
+        public T getData() {
+            Preconditions.checkState(type != ActionType.DROP && type != ActionType.DROP_PRESERVE_DATA);
+            return data;
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(this)
+                .add("type", type)
+                .add("data", data)
+                .toString();
+        }
+    }
+
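+    // Resolves a table, preferring the version staged in this transaction;
+    // a table staged for DROP is reported as missing.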
+    public synchronized Table getTable(String databaseName, String tableName) {
+        Action<TableAndMore> tableAction = tableActions.get(new 
DatabaseTableName(databaseName, tableName));
+        if (tableAction == null) {
+            return hiveOps.getClient().getTable(databaseName, tableName);
+        }
+        switch (tableAction.getType()) {
+            case ADD:
+            case ALTER:
+            case INSERT_EXISTING:
+            case MERGE:
+                return tableAction.getData().getTable();
+            case DROP:
+            case DROP_PRESERVE_DATA:
+                break;
+            default:
+                throw new IllegalStateException("Unknown action type: " + 
tableAction.getType());
+        }
+        throw new RuntimeException("Not Found table: " + databaseName + "." + 
tableName);
+    }
+
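+    // Stages an INSERT/MERGE into an existing table. Only one write action per
+    // unpartitioned table is supported within a single transaction.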
+    public synchronized void finishChangingExistingTable(
+            ActionType actionType,
+            String databaseName,
+            String tableName,
+            String location,
+            List<String> fileNames,
+            HivePartitionStatistics statisticsUpdate) {
+        DatabaseTableName databaseTableName = new 
DatabaseTableName(databaseName, tableName);
+        Action<TableAndMore> oldTableAction = 
tableActions.get(databaseTableName);
+        if (oldTableAction == null) {
+            Table table = 
hiveOps.getClient().getTable(databaseTableName.getDbName(), 
databaseTableName.getTbName());
+            tableActions.put(
+                    databaseTableName,
+                    new Action<>(
+                            actionType,
+                            new TableAndMore(
+                                table,
+                                location,
+                                fileNames,
+                                statisticsUpdate)));
+            return;
+        }
+
+        switch (oldTableAction.getType()) {
+            case DROP:
+                throw new RuntimeException("Not found table: " + 
databaseTableName);
+            case ADD:
+            case ALTER:
+            case INSERT_EXISTING:
+            case MERGE:
+                throw new UnsupportedOperationException(
+                        "Inserting into an unpartitioned table that were 
added, altered,"
+                                + "or inserted into in the same transaction is 
not supported");
+            case DROP_PRESERVE_DATA:
+                break;
+            default:
+                throw new IllegalStateException("Unknown action type: " + 
oldTableAction.getType());
+        }
+    }
+
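+    // Stages a CREATE TABLE; a table dropped earlier in the same transaction
+    // is re-created through an ALTER action instead.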
+    public synchronized void createTable(
+            Table table, String location, List<String> fileNames, HivePartitionStatistics statistics) {
+        // When creating a table, it should never have partition actions. This 
is just a sanity check.
+        checkNoPartitionAction(dbName, tbName);
+        DatabaseTableName databaseTableName = new DatabaseTableName(dbName, 
tbName);
+        Action<TableAndMore> oldTableAction = 
tableActions.get(databaseTableName);
+        TableAndMore tableAndMore = new TableAndMore(table, location, 
fileNames, statistics);
+        if (oldTableAction == null) {
+            tableActions.put(databaseTableName, new Action<>(ActionType.ADD, 
tableAndMore));
+            return;
+        }
+        switch (oldTableAction.getType()) {
+            case DROP:
+                tableActions.put(databaseTableName, new 
Action<>(ActionType.ALTER, tableAndMore));
+                return;
+
+            case ADD:
+            case ALTER:
+            case INSERT_EXISTING:
+            case MERGE:
+                throw new RuntimeException("Table already exists: " + 
databaseTableName);
+            case DROP_PRESERVE_DATA:
+                break;
+            default:
+                throw new IllegalStateException("Unknown action type: " + 
oldTableAction.getType());
+        }
+    }
+
+    public synchronized void dropTable(String databaseName, String tableName) {
+        // Dropping table with partition actions requires cleaning up staging 
data, which is not implemented yet.
+        checkNoPartitionAction(databaseName, tableName);
+        DatabaseTableName databaseTableName = new 
DatabaseTableName(databaseName, tableName);
+        Action<TableAndMore> oldTableAction = 
tableActions.get(databaseTableName);
+        if (oldTableAction == null || oldTableAction.getType() == 
ActionType.ALTER) {
+            tableActions.put(databaseTableName, new Action<>(ActionType.DROP, 
null));
+            return;
+        }
+        switch (oldTableAction.getType()) {
+            case DROP:
+                throw new RuntimeException("Not found table: " + 
databaseTableName);
+            case ADD:
+            case ALTER:
+            case INSERT_EXISTING:
+            case MERGE:
+                throw new RuntimeException("Dropping a table added/modified in 
the same transaction is not supported");
+            case DROP_PRESERVE_DATA:
+                break;
+            default:
+                throw new IllegalStateException("Unknown action type: " + 
oldTableAction.getType());
+        }
+    }
+
+    private void checkNoPartitionAction(String databaseName, String tableName) 
{
+        Map<List<String>, Action<PartitionAndMore>> partitionActionsForTable =
+                partitionActions.get(new DatabaseTableName(databaseName, 
tableName));
+        if (partitionActionsForTable != null && 
!partitionActionsForTable.isEmpty()) {
+            throw new RuntimeException(
+                    "Cannot make schema changes to a table with modified 
partitions in the same transaction");
+        }
+    }
+
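+    // Stages an ADD for a new partition; a partition dropped earlier in the
+    // same transaction is upgraded to an ALTER action.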
+    public synchronized void addPartition(
+            String databaseName,
+            String tableName,
+            HivePartition partition,
+            String currentLocation,
+            String partitionName,
+            List<String> files,
+            HivePartitionStatistics statistics) {
+        Map<List<String>, Action<PartitionAndMore>> partitionActionsForTable =
+                partitionActions.computeIfAbsent(new 
DatabaseTableName(databaseName, tableName), k -> new HashMap<>());
+        Action<PartitionAndMore> oldPartitionAction = 
partitionActionsForTable.get(partition.getPartitionValues());
+        if (oldPartitionAction == null) {
+            partitionActionsForTable.put(
+                    partition.getPartitionValues(),
+                    new Action<>(
+                            ActionType.ADD,
+                            new PartitionAndMore(partition, currentLocation, 
partitionName, files, statistics))
+            );
+            return;
+        }
+        switch (oldPartitionAction.getType()) {
+            case DROP:
+            case DROP_PRESERVE_DATA:
+                partitionActionsForTable.put(
+                        partition.getPartitionValues(),
+                        new Action<>(
+                                ActionType.ALTER,
+                                new PartitionAndMore(partition, 
currentLocation, partitionName, files, statistics))
+                );
+                return;
+            case ADD:
+            case ALTER:
+            case INSERT_EXISTING:
+            case MERGE:
+                throw new RuntimeException(
+                    "Partition already exists for table: "
+                        + databaseName + "." + tableName + ", partition 
values: " + partition.getPartitionValues());
+            default:
+                throw new IllegalStateException("Unknown action type: " + 
oldPartitionAction.getType());
+        }
+    }
+
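+    // Stages a DROP for a partition; when deleteData is false the underlying
+    // files are preserved (DROP_PRESERVE_DATA).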
+    public synchronized void dropPartition(
+            String databaseName,
+            String tableName,
+            List<String> partitionValues,
+            boolean deleteData) {
+        DatabaseTableName databaseTableName = new 
DatabaseTableName(databaseName, tableName);
+        Map<List<String>, Action<PartitionAndMore>> partitionActionsForTable =
+                partitionActions.computeIfAbsent(databaseTableName, k -> new 
HashMap<>());
+        Action<PartitionAndMore> oldPartitionAction = 
partitionActionsForTable.get(partitionValues);
+        if (oldPartitionAction == null) {
+            if (deleteData) {
+                partitionActionsForTable.put(partitionValues, new 
Action<>(ActionType.DROP, null));
+            } else {
+                partitionActionsForTable.put(partitionValues, new 
Action<>(ActionType.DROP_PRESERVE_DATA, null));
+            }
+            return;
+        }
+        switch (oldPartitionAction.getType()) {
+            case DROP:
+            case DROP_PRESERVE_DATA:
+                throw new RuntimeException(
+                        "Not found partition from partition actions for " + 
databaseTableName
+                                + ", partitions: " + partitionValues);
+            case ADD:
+            case ALTER:
+            case INSERT_EXISTING:
+            case MERGE:
+                throw new RuntimeException(
+                        "Dropping a partition added in the same transaction is 
not supported: "
+                                + databaseTableName + ", partition values: " + 
partitionValues);
+            default:
+                throw new IllegalStateException("Unknown action type: " + 
oldPartitionAction.getType());
+        }
+    }
+
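+    // Executes the staged actions: asynchronous file system renames first,
+    // then batch partition registration, then statistics updates. rollback()
+    // undoes the completed work on a best-effort basis.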
+    class HmsCommitter {
+
+        // update statistics for an unpartitioned table or an existing partition
+        private final List<UpdateStatisticsTask> updateStatisticsTasks = new 
ArrayList<>();
+        Executor updateStatisticsExecutor = Executors.newFixedThreadPool(16);
+
+        // add new partition
+        private final AddPartitionsTask addPartitionsTask = new 
AddPartitionsTask();
+
+        // for file system rename operation
+        // whether to cancel the file system tasks
+        private final AtomicBoolean fileSystemTaskCancelled = new 
AtomicBoolean(false);
+        // file system tasks that are executed asynchronously, including 
rename_file, rename_dir
+        private final List<CompletableFuture<?>> asyncFileSystemTaskFutures = 
new ArrayList<>();
+        // when aborted, we need to delete all files under this path, even the 
current directory
+        private final Queue<DirectoryCleanUpTask> 
directoryCleanUpTasksForAbort = new ConcurrentLinkedQueue<>();
+        // when aborted, we need to restore the renamed directories
+        private final List<RenameDirectoryTask> renameDirectoryTasksForAbort = 
new ArrayList<>();
+        // when finished, we need to clear some temporary directories
+        private final List<String> clearDirsForFinish = new ArrayList<>();
+        Executor fileSystemExecutor = Executors.newFixedThreadPool(16);
+
+        public void cancelUnStartedAsyncFileSystemTask() {
+            fileSystemTaskCancelled.set(true);
+        }
+
+        private void undoUpdateStatisticsTasks() {
+            ImmutableList.Builder<CompletableFuture<?>> undoUpdateFutures = 
ImmutableList.builder();
+            for (UpdateStatisticsTask task : updateStatisticsTasks) {
+                undoUpdateFutures.add(CompletableFuture.runAsync(() -> {
+                    try {
+                        task.undo(hiveOps);
+                    } catch (Throwable throwable) {
+                        LOG.warn("Failed to rollback: {}", 
task.getDescription(), throwable);
+                    }
+                }, updateStatisticsExecutor));
+            }
+
+            for (CompletableFuture<?> undoUpdateFuture : 
undoUpdateFutures.build()) {
+                MoreFutures.getFutureValue(undoUpdateFuture);
+            }
+        }
+
+        private void undoAddPartitionsTask() {
+            if (addPartitionsTask.isEmpty()) {
+                return;
+            }
+
+            HivePartition firstPartition = 
addPartitionsTask.getPartitions().get(0).getPartition();
+            String dbName = firstPartition.getDbName();
+            String tableName = firstPartition.getTblName();
+            List<List<String>> rollbackFailedPartitions = 
addPartitionsTask.rollback(hiveOps);
+            if (!rollbackFailedPartitions.isEmpty()) {
+                LOG.warn("Failed to rollback: add_partition for partition 
values {}.{}.{}",
+                        dbName, tableName, rollbackFailedPartitions);
+            }
+        }
+
+        private void waitForAsyncFileSystemTaskSuppressThrowable() {
+            for (CompletableFuture<?> future : asyncFileSystemTaskFutures) {
+                try {
+                    future.get();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                } catch (Throwable t) {
+                    // ignore
+                }
+            }
+        }
+
+        public void prepareInsertExistingTable(TableAndMore tableAndMore) {
+            Table table = tableAndMore.getTable();
+            String targetPath = table.getSd().getLocation();
+            String writePath = tableAndMore.getCurrentLocation();
+            if (!targetPath.equals(writePath)) {
+                fs.asyncRename(
+                        fileSystemExecutor,
+                        asyncFileSystemTaskFutures,
+                        fileSystemTaskCancelled,
+                        writePath,
+                        targetPath,
+                        tableAndMore.getFileNames());
+            }
+            directoryCleanUpTasksForAbort.add(new 
DirectoryCleanUpTask(targetPath, false));
+            updateStatisticsTasks.add(
+                new UpdateStatisticsTask(
+                        dbName,
+                        tbName,
+                        Optional.empty(),
+                        tableAndMore.getStatisticsUpdate(),
+                        true
+                    ));
+        }
+
+        public void prepareAlterTable(TableAndMore tableAndMore) {
+            Table table = tableAndMore.getTable();
+            String targetPath = table.getSd().getLocation();
+            String writePath = tableAndMore.getCurrentLocation();
+            if (!targetPath.equals(writePath)) {
+                Path path = new Path(targetPath);
+                String oldTablePath = new Path(path.getParent(), "_temp_" + 
path.getName()).toString();
+                Status status = fs.renameDir(
+                        targetPath,
+                        oldTablePath,
+                        () -> renameDirectoryTasksForAbort.add(new 
RenameDirectoryTask(oldTablePath, targetPath)));
+                if (!status.ok()) {
+                    throw new RuntimeException(
+                        "Error to rename dir from " + targetPath + " to " + 
oldTablePath + status.getErrMsg());
+                }
+                clearDirsForFinish.add(oldTablePath);
+
+                status = fs.renameDir(
+                        writePath,
+                        targetPath,
+                        () -> directoryCleanUpTasksForAbort.add(
+                                new DirectoryCleanUpTask(targetPath, true)));
+                if (!status.ok()) {
+                    throw new RuntimeException(
+                        "Error to rename dir from " + writePath + " to " + 
targetPath + ":" + status.getErrMsg());
+                }
+            }
+            updateStatisticsTasks.add(
+                new UpdateStatisticsTask(
+                    dbName,
+                    tbName,
+                    Optional.empty(),
+                    tableAndMore.getStatisticsUpdate(),
+                    false
+                ));
+        }
+
+        public void prepareAddPartition(PartitionAndMore partitionAndMore) {
+            HivePartition partition = partitionAndMore.getPartition();
+            String targetPath = partition.getPath();
+            String writePath = partitionAndMore.getCurrentLocation();
+
+            if (!targetPath.equals(writePath)) {
+                fs.asyncRenameDir(
+                        fileSystemExecutor,
+                        asyncFileSystemTaskFutures,
+                        fileSystemTaskCancelled,
+                        writePath,
+                        targetPath,
+                        () -> directoryCleanUpTasksForAbort.add(new 
DirectoryCleanUpTask(targetPath, true)));
+            }
+
+            StorageDescriptor sd = getTable(dbName, tbName).getSd();
+
+            HivePartition hivePartition = new HivePartition(
+                    dbName,
+                    tbName,
+                    false,
+                    sd.getInputFormat(),
+                    targetPath,
+                    partition.getPartitionValues(),
+                    Maps.newHashMap(),
+                    sd.getOutputFormat(),
+                    sd.getSerdeInfo().getSerializationLib(),
+                    hiveOps.getClient().getSchema(dbName, tbName)
+            );
+
+            HivePartitionWithStatistics partitionWithStats =
+                    new HivePartitionWithStatistics(
+                            partitionAndMore.getPartitionName(),
+                            hivePartition,
+                            partitionAndMore.getStatisticsUpdate());
+            addPartitionsTask.addPartition(partitionWithStats);
+        }
+
+        public void prepareInsertExistPartition(PartitionAndMore 
partitionAndMore) {
+            HivePartition partition = partitionAndMore.getPartition();
+            String targetPath = partition.getPath();
+            String writePath = partitionAndMore.getCurrentLocation();
+            directoryCleanUpTasksForAbort.add(new 
DirectoryCleanUpTask(targetPath, false));
+
+            if (!targetPath.equals(writePath)) {
+                fs.asyncRename(
+                        fileSystemExecutor,
+                        asyncFileSystemTaskFutures,
+                        fileSystemTaskCancelled,
+                        writePath,
+                        targetPath,
+                        partitionAndMore.getFileNames());
+            }
+
+            updateStatisticsTasks.add(
+                new UpdateStatisticsTask(
+                    dbName,
+                    tbName,
+                    Optional.of(partitionAndMore.getPartitionName()),
+                    partitionAndMore.getStatisticsUpdate(),
+                    true));
+        }
+
+        private void runDirectoryClearUpTasksForAbort() {
+            for (DirectoryCleanUpTask cleanUpTask : 
directoryCleanUpTasksForAbort) {
+                recursiveDeleteItems(cleanUpTask.getPath(), 
cleanUpTask.isDeleteEmptyDir());
+            }
+        }
+
+        private void runRenameDirTasksForAbort() {
+            Status status;
+            for (RenameDirectoryTask task : renameDirectoryTasksForAbort) {
+                status = fs.exists(task.getRenameFrom());
+                if (status.ok()) {
+                    status = fs.renameDir(task.getRenameFrom(), 
task.getRenameTo(), () -> {});
+                    if (!status.ok()) {
+                        LOG.warn("Failed to abort rename dir from {} to {}:{}",
+                                task.getRenameFrom(), task.getRenameTo(), 
status.getErrMsg());
+                    }
+                }
+            }
+        }
+
+        private void runClearPathsForFinish() {
+            Status status;
+            for (String path : clearDirsForFinish) {
+                status = fs.delete(path);
+                if (!status.ok()) {
+                    LOG.warn("Failed to recursively delete path {}:{}", path, 
status.getErrCode());
+                }
+            }
+        }
+
+        public void prepareAlterPartition(PartitionAndMore partitionAndMore) {
+            HivePartition partition = partitionAndMore.getPartition();
+            String targetPath = partition.getPath();
+            String writePath = partitionAndMore.getCurrentLocation();
+
+            if (!targetPath.equals(writePath)) {
+                Path path = new Path(targetPath);
+                String oldPartitionPath = new Path(path.getParent(), "_temp_" 
+ path.getName()).toString();
+                Status status = fs.renameDir(
+                        targetPath,
+                        oldPartitionPath,
+                        () -> renameDirectoryTasksForAbort.add(new 
RenameDirectoryTask(oldPartitionPath, targetPath)));
+                if (!status.ok()) {
+                    throw new RuntimeException(
+                        "Error to rename dir "
+                                + "from " + targetPath
+                                + " to " + oldPartitionPath + ":" + 
status.getErrMsg());
+                }
+                clearDirsForFinish.add(oldPartitionPath);
+
+                status = fs.renameDir(
+                    writePath,
+                    targetPath,
+                    () -> directoryCleanUpTasksForAbort.add(new 
DirectoryCleanUpTask(targetPath, true)));
+                if (!status.ok()) {
+                    throw new RuntimeException(
+                        "Error to rename dir from " + writePath + " to " + 
targetPath + ":" + status.getErrMsg());
+                }
+            }
+
+            updateStatisticsTasks.add(
+                new UpdateStatisticsTask(
+                    dbName,
+                    tbName,
+                    Optional.of(partitionAndMore.getPartitionName()),
+                    partitionAndMore.getStatisticsUpdate(),
+                    false
+                ));
+        }
+
+        private void waitForAsyncFileSystemTasks() {
+            for (CompletableFuture<?> future : asyncFileSystemTaskFutures) {
+                MoreFutures.getFutureValue(future, RuntimeException.class);
+            }
+        }
+
+        private void doAddPartitionsTask() {
+            if (!addPartitionsTask.isEmpty()) {
+                addPartitionsTask.run(hiveOps);
+            }
+        }
+
+        private void doUpdateStatisticsTasks() {
+            ImmutableList.Builder<CompletableFuture<?>> updateStatsFutures = 
ImmutableList.builder();
+            List<String> failedTaskDescriptions = new ArrayList<>();
+            List<Throwable> suppressedExceptions = new ArrayList<>();
+            for (UpdateStatisticsTask task : updateStatisticsTasks) {
+                updateStatsFutures.add(CompletableFuture.runAsync(() -> {
+                    try {
+                        task.run(hiveOps);
+                    } catch (Throwable t) {
+                        synchronized (suppressedExceptions) {
+                            addSuppressedExceptions(
+                                    suppressedExceptions, t, 
failedTaskDescriptions, task.getDescription());
+                        }
+                    }
+                }, updateStatisticsExecutor));
+            }
+
+            for (CompletableFuture<?> executeUpdateFuture : 
updateStatsFutures.build()) {
+                MoreFutures.getFutureValue(executeUpdateFuture);
+            }
+            if (!suppressedExceptions.isEmpty()) {
+                StringBuilder message = new StringBuilder();
+                message.append("Failed to execute some updating statistics 
tasks: ");
+                Joiner.on("; ").appendTo(message, failedTaskDescriptions);
+                RuntimeException exception = new 
RuntimeException(message.toString());
+                suppressedExceptions.forEach(exception::addSuppressed);
+                throw exception;
+            }
+        }
+
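+        // Commit ordering matters: renames must finish before partitions are
+        // registered with the metastore, and statistics are updated last.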
+        public void doCommit() {
+            waitForAsyncFileSystemTasks();
+            doAddPartitionsTask();
+            doUpdateStatisticsTasks();
+        }
+
+        public void rollback() {
+            cancelUnStartedAsyncFileSystemTask();
+            undoUpdateStatisticsTasks();
+            undoAddPartitionsTask();
+            waitForAsyncFileSystemTaskSuppressThrowable();
+            runDirectoryClearUpTasksForAbort();
+            runRenameDirTasksForAbort();
+        }
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
index beeff694ae4..f3556d13a57 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
@@ -36,13 +36,11 @@ import 
org.apache.doris.datasource.jdbc.client.JdbcClientConfig;
 import org.apache.doris.datasource.operations.ExternalMetadataOps;
 import org.apache.doris.fs.remote.RemoteFileSystem;
 import org.apache.doris.fs.remote.dfs.DFSFileSystem;
-import org.apache.doris.thrift.THivePartitionUpdate;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -73,6 +71,7 @@ public class HiveMetadataOps implements ExternalMetadataOps {
     public HiveMetadataOps(HMSExternalCatalog catalog, HMSCachedClient client) 
{
         this.catalog = catalog;
         this.client = client;
+        // TODO Currently only supports DFSFileSystem, more types will be 
supported in the future
         this.fs = new DFSFileSystem(catalog.getProperties());
     }
 
@@ -80,6 +79,10 @@ public class HiveMetadataOps implements ExternalMetadataOps {
         return client;
     }
 
+    public RemoteFileSystem getFs() {
+        return fs;
+    }
+
     public static HMSCachedClient createCachedClient(HiveConf hiveConf, int 
thriftClientPoolSize,
                                                      JdbcClientConfig 
jdbcClientConfig) {
         if (hiveConf != null) {
@@ -253,23 +256,6 @@ public class HiveMetadataOps implements 
ExternalMetadataOps {
         return client.getAllDatabases();
     }
 
-    public void commit(String dbName,
-                       String tableName,
-                       List<THivePartitionUpdate> hivePUs) {
-        Table table = client.getTable(dbName, tableName);
-        HMSCommitter hmsCommitter = new HMSCommitter(this, fs, table);
-        hmsCommitter.commit(hivePUs);
-        try {
-            Env.getCurrentEnv().getCatalogMgr().refreshExternalTable(
-                    dbName,
-                    tableName,
-                    catalog.getName(),
-                    true);
-        } catch (DdlException e) {
-            LOG.warn("Failed to refresh table {}.{} : {}", dbName, tableName, 
e.getMessage());
-        }
-    }
-
     public void updateTableStatistics(
             String dbName,
             String tableName,
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionStatistics.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionStatistics.java
index 49b14504750..df13e6737b5 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionStatistics.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionStatistics.java
@@ -25,7 +25,7 @@ import com.google.common.collect.ImmutableMap;
 import java.util.Map;
 
 public class HivePartitionStatistics {
-    private static final HivePartitionStatistics EMPTY =
+    public static final HivePartitionStatistics EMPTY =
             new HivePartitionStatistics(HiveCommonStatistics.EMPTY, 
ImmutableMap.of());
 
     private final HiveCommonStatistics commonStatistics;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionWithStatistics.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionWithStatistics.java
index b7c28b68ff0..e72374aa5f2 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionWithStatistics.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionWithStatistics.java
@@ -18,9 +18,9 @@
 package org.apache.doris.datasource.hive;
 
 public class HivePartitionWithStatistics {
-    private String name;
-    private HivePartition partition;
-    private HivePartitionStatistics statistics;
+    private final String name;
+    private final HivePartition partition;
+    private final HivePartitionStatistics statistics;
 
     public HivePartitionWithStatistics(String name, HivePartition partition, 
HivePartitionStatistics statistics) {
         this.name = name;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java
index 76976165526..66dfe763e46 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java
@@ -17,13 +17,12 @@
 
 package org.apache.doris.nereids.trees.plans.commands.insert;
 
+import org.apache.doris.catalog.Env;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.DebugUtil;
-import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.hive.HMSExternalTable;
-import org.apache.doris.datasource.hive.HiveMetadataOps;
-import org.apache.doris.datasource.operations.ExternalMetadataOps;
+import org.apache.doris.datasource.hive.HMSTransaction;
 import org.apache.doris.nereids.NereidsPlanner;
 import org.apache.doris.nereids.exceptions.AnalysisException;
 import org.apache.doris.nereids.trees.plans.Plan;
@@ -35,14 +34,13 @@ import org.apache.doris.planner.PlanFragment;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.QueryState;
 import org.apache.doris.qe.StmtExecutor;
-import org.apache.doris.thrift.THivePartitionUpdate;
+import org.apache.doris.transaction.TransactionManager;
 import org.apache.doris.transaction.TransactionStatus;
 
 import com.google.common.base.Strings;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import java.util.List;
 import java.util.Optional;
 
 /**
@@ -53,6 +51,8 @@ public class HiveInsertExecutor extends 
AbstractInsertExecutor {
     private static final long INVALID_TXN_ID = -1L;
     private long txnId = INVALID_TXN_ID;
     private TransactionStatus txnStatus = TransactionStatus.ABORTED;
+    private final TransactionManager transactionManager;
+    private final String catalogName;
 
     /**
      * constructor
@@ -61,6 +61,8 @@ public class HiveInsertExecutor extends 
AbstractInsertExecutor {
                               String labelName, NereidsPlanner planner,
                               Optional<InsertCommandContext> insertCtx) {
         super(ctx, table, labelName, planner, insertCtx);
+        catalogName = table.getCatalog().getName();
+        transactionManager = table.getCatalog().getTransactionManager();
     }
 
     public long getTxnId() {
@@ -69,7 +71,9 @@ public class HiveInsertExecutor extends 
AbstractInsertExecutor {
 
     @Override
     public void beginTransaction() {
-        // TODO: use hive txn rather than internal txn
+        txnId = transactionManager.begin();
+        HMSTransaction transaction = (HMSTransaction) 
transactionManager.getTransaction(txnId);
+        
coordinator.setHivePartitionUpdateFunc(transaction::updateHivePartitionUpdates);
     }
 
     @Override
@@ -93,13 +97,18 @@ public class HiveInsertExecutor extends 
AbstractInsertExecutor {
         if (ctx.getState().getStateType() == QueryState.MysqlStateType.ERR) {
             LOG.warn("errors when abort txn. {}", ctx.getQueryIdentifier());
         } else {
-            // TODO use transaction
-            List<THivePartitionUpdate> ups = 
coordinator.getHivePartitionUpdates();
-            loadedRows = 
ups.stream().mapToLong(THivePartitionUpdate::getRowCount).sum();
-            ExternalCatalog catalog = ((HMSExternalTable) table).getCatalog();
-            ExternalMetadataOps metadataOps = catalog.getMetadataOps();
-            ((HiveMetadataOps) metadataOps).commit(((HMSExternalTable) 
table).getDbName(), table.getName(), ups);
+            HMSTransaction transaction = (HMSTransaction) 
transactionManager.getTransaction(txnId);
+            loadedRows = transaction.getUpdateCnt();
+            String dbName = ((HMSExternalTable) table).getDbName();
+            String tbName = table.getName();
+            transaction.finishInsertTable(dbName, tbName);
+            transactionManager.commit(txnId);
             txnStatus = TransactionStatus.COMMITTED;
+            Env.getCurrentEnv().getCatalogMgr().refreshExternalTable(
+                    dbName,
+                    tbName,
+                    catalogName,
+                    true);
         }
     }
 
@@ -117,6 +126,7 @@ public class HiveInsertExecutor extends 
AbstractInsertExecutor {
             }
         }
         ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, t.getMessage());
+        transactionManager.rollback(txnId);
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 9ac96c27c87..b00854a84ea 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -152,6 +152,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
 public class Coordinator implements CoordInterface {
@@ -235,8 +236,8 @@ public class Coordinator implements CoordInterface {
     private final List<TTabletCommitInfo> commitInfos = Lists.newArrayList();
     private final List<TErrorTabletInfo> errorTabletInfos = 
Lists.newArrayList();
 
-    // TODO moved to ExternalTransactionManager
-    private final List<THivePartitionUpdate> hivePartitionUpdates = 
Lists.newArrayList();
+    // Collects all hivePartitionUpdates reported by BE nodes
+    Consumer<List<THivePartitionUpdate>> hivePartitionUpdateFunc;
 
     // Input parameter
     private long jobId = -1; // job which this task belongs to
@@ -503,10 +504,6 @@ public class Coordinator implements CoordInterface {
         return errorTabletInfos;
     }
 
-    public List<THivePartitionUpdate> getHivePartitionUpdates() {
-        return hivePartitionUpdates;
-    }
-
     public Map<String, Integer> getBeToInstancesNum() {
         Map<String, Integer> result = Maps.newTreeMap();
         if (enablePipelineEngine) {
@@ -2456,13 +2453,8 @@ public class Coordinator implements CoordInterface {
         // TODO: more ranges?
     }
 
-    private void updateHivePartitionUpdates(List<THivePartitionUpdate> 
hivePartitionUpdates) {
-        lock.lock();
-        try {
-            this.hivePartitionUpdates.addAll(hivePartitionUpdates);
-        } finally {
-            lock.unlock();
-        }
+    public void 
setHivePartitionUpdateFunc(Consumer<List<THivePartitionUpdate>> 
hivePartitionUpdateFunc) {
+        this.hivePartitionUpdateFunc = hivePartitionUpdateFunc;
     }
 
     // update job progress from BE
@@ -2512,8 +2504,8 @@ public class Coordinator implements CoordInterface {
             if (params.isSetErrorTabletInfos()) {
                 updateErrorTabletInfos(params.getErrorTabletInfos());
             }
-            if (params.isSetHivePartitionUpdates()) {
-                updateHivePartitionUpdates(params.getHivePartitionUpdates());
+            if (params.isSetHivePartitionUpdates() && hivePartitionUpdateFunc 
!= null) {
+                
hivePartitionUpdateFunc.accept(params.getHivePartitionUpdates());
             }
 
             Preconditions.checkArgument(params.isSetDetailedReport());
@@ -2577,8 +2569,8 @@ public class Coordinator implements CoordInterface {
                 if (params.isSetErrorTabletInfos()) {
                     updateErrorTabletInfos(params.getErrorTabletInfos());
                 }
-                if (params.isSetHivePartitionUpdates()) {
-                    
updateHivePartitionUpdates(params.getHivePartitionUpdates());
+                if (params.isSetHivePartitionUpdates() && 
hivePartitionUpdateFunc != null) {
+                    
hivePartitionUpdateFunc.accept(params.getHivePartitionUpdates());
                 }
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Query {} instance {} is marked done",
@@ -2649,8 +2641,8 @@ public class Coordinator implements CoordInterface {
                 if (params.isSetErrorTabletInfos()) {
                     updateErrorTabletInfos(params.getErrorTabletInfos());
                 }
-                if (params.isSetHivePartitionUpdates()) {
-                    
updateHivePartitionUpdates(params.getHivePartitionUpdates());
+                if (params.isSetHivePartitionUpdates() && 
hivePartitionUpdateFunc != null) {
+                    
hivePartitionUpdateFunc.accept(params.getHivePartitionUpdates());
                 }
                 
instancesDoneLatch.markedCountDown(params.getFragmentInstanceId(), -1L);
             }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/HiveTransactionManager.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/HiveTransactionManager.java
new file mode 100644
index 00000000000..07304fb23ab
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/HiveTransactionManager.java
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.transaction;
+
+import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.hive.HMSTransaction;
+import org.apache.doris.datasource.hive.HiveMetadataOps;
+import org.apache.doris.persist.EditLog;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
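+// Keeps all in-flight Hive transactions in memory, keyed by locally generated
+// transaction ids; commit() and rollback() remove the transaction when done.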
+public class HiveTransactionManager implements TransactionManager {
+
+    private final Map<Long, HMSTransaction> transactions = new 
ConcurrentHashMap<>();
+    private final TransactionIdGenerator idGenerator = new 
TransactionIdGenerator();
+    private final HiveMetadataOps ops;
+
+    public HiveTransactionManager(HiveMetadataOps ops) {
+        this.ops = ops;
+    }
+
+    public Long getNextTransactionId() {
+        return idGenerator.getNextTransactionId();
+    }
+
+    @Override
+    public void setEditLog(EditLog editLog) {
+        this.idGenerator.setEditLog(editLog);
+    }
+
+    @Override
+    public long begin() {
+        long id = idGenerator.getNextTransactionId();
+        HMSTransaction hiveTransaction = new HMSTransaction(ops);
+        transactions.put(id, hiveTransaction);
+        return id;
+    }
+
+    @Override
+    public void commit(long id) throws UserException {
+        getTransactionWithException(id).commit();
+        transactions.remove(id);
+    }
+
+    @Override
+    public void rollback(long id) {
+        getTransactionWithException(id).rollback();
+        transactions.remove(id);
+    }
+
+    @Override
+    public HMSTransaction getTransaction(long id) {
+        return getTransactionWithException(id);
+    }
+
+    public HMSTransaction getTransactionWithException(long id) {
+        HMSTransaction hiveTransaction = transactions.get(id);
+        if (hiveTransaction == null) {
+            throw new RuntimeException("Can't find transaction for " + id);
+        }
+        return hiveTransaction;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionWithStatistics.java
 b/fe/fe-core/src/main/java/org/apache/doris/transaction/Transaction.java
similarity index 56%
copy from 
fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionWithStatistics.java
copy to fe/fe-core/src/main/java/org/apache/doris/transaction/Transaction.java
index b7c28b68ff0..b319fb78983 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionWithStatistics.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/Transaction.java
@@ -15,28 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.datasource.hive;
+package org.apache.doris.transaction;
 
-public class HivePartitionWithStatistics {
-    private String name;
-    private HivePartition partition;
-    private HivePartitionStatistics statistics;
+import org.apache.doris.common.UserException;
 
-    public HivePartitionWithStatistics(String name, HivePartition partition, 
HivePartitionStatistics statistics) {
-        this.name = name;
-        this.partition = partition;
-        this.statistics = statistics;
-    }
+public interface Transaction {
 
-    public String getName() {
-        return name;
-    }
+    void commit() throws UserException;
 
-    public HivePartition getPartition() {
-        return partition;
-    }
-
-    public HivePartitionStatistics getStatistics() {
-        return statistics;
-    }
+    void rollback();
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionWithStatistics.java
 b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManager.java
similarity index 56%
copy from 
fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionWithStatistics.java
copy to 
fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManager.java
index b7c28b68ff0..daacdecf152 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionWithStatistics.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManager.java
@@ -15,28 +15,20 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.datasource.hive;
-
-public class HivePartitionWithStatistics {
-    private String name;
-    private HivePartition partition;
-    private HivePartitionStatistics statistics;
-
-    public HivePartitionWithStatistics(String name, HivePartition partition, 
HivePartitionStatistics statistics) {
-        this.name = name;
-        this.partition = partition;
-        this.statistics = statistics;
-    }
-
-    public String getName() {
-        return name;
-    }
-
-    public HivePartition getPartition() {
-        return partition;
-    }
-
-    public HivePartitionStatistics getStatistics() {
-        return statistics;
-    }
+package org.apache.doris.transaction;
+
+import org.apache.doris.common.UserException;
+import org.apache.doris.persist.EditLog;
+
+public interface TransactionManager {
+
+    void setEditLog(EditLog editLog);
+
+    long begin();
+
+    void commit(long id) throws UserException;
+
+    void rollback(long id);
+
+    Transaction getTransaction(long id);
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionWithStatistics.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManagerFactory.java
similarity index 56%
copy from fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionWithStatistics.java
copy to fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManagerFactory.java
index b7c28b68ff0..334258a3f12 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionWithStatistics.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManagerFactory.java
@@ -15,28 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.datasource.hive;
+package org.apache.doris.transaction;
 
-public class HivePartitionWithStatistics {
-    private String name;
-    private HivePartition partition;
-    private HivePartitionStatistics statistics;
+import org.apache.doris.datasource.hive.HiveMetadataOps;
 
-    public HivePartitionWithStatistics(String name, HivePartition partition, HivePartitionStatistics statistics) {
-        this.name = name;
-        this.partition = partition;
-        this.statistics = statistics;
-    }
-
-    public String getName() {
-        return name;
-    }
-
-    public HivePartition getPartition() {
-        return partition;
-    }
+public class TransactionManagerFactory {
 
-    public HivePartitionStatistics getStatistics() {
-        return statistics;
+    public static TransactionManager createHiveTransactionManager(HiveMetadataOps ops) {
+        return new HiveTransactionManager(ops);
     }
 }
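
A short usage sketch of the factory, assuming a `HiveMetadataOps` instance named `hiveOps` is already available; the wrapper class is invented for this example.

```
import org.apache.doris.datasource.hive.HiveMetadataOps;
import org.apache.doris.transaction.TransactionManager;
import org.apache.doris.transaction.TransactionManagerFactory;

// Sketch: wiring up the Hive-backed manager; 'hiveOps' is assumed to exist.
public final class FactorySketch {
    static TransactionManager build(HiveMetadataOps hiveOps) {
        return TransactionManagerFactory.createHiveTransactionManager(hiveOps);
    }
}
```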
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java
index 3098d65e952..fc939625ea9 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java
@@ -17,13 +17,17 @@
 
 package org.apache.doris.datasource.hive;
 
+import org.apache.doris.backup.Status;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.fs.remote.dfs.DFSFileSystem;
 import org.apache.doris.thrift.THiveLocationParams;
 import org.apache.doris.thrift.THivePartitionUpdate;
 import org.apache.doris.thrift.TUpdateMode;
 
 import com.google.common.collect.Lists;
+import mockit.Mock;
+import mockit.MockUp;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.junit.After;
@@ -41,6 +45,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 @Ignore
 public class HmsCommitTest {
@@ -61,6 +68,7 @@ public class HmsCommitTest {
         dbLocation = "file://" + warehousePath.toAbsolutePath() + "/";
         createTestHiveCatalog();
         createTestHiveDatabase();
+        mockFs();
     }
 
     @AfterClass
@@ -90,22 +98,55 @@ public class HmsCommitTest {
         hmsClient.createDatabase(dbMetadata);
     }
 
+    public static void mockFs() {
+
+        new MockUp<DFSFileSystem>(DFSFileSystem.class) {
+            @Mock
+            public void asyncRenameDir(Executor executor,
+                                       List<CompletableFuture<?>> renameFileFutures,
+                                       AtomicBoolean cancelled,
+                                       String origFilePath,
+                                       String destFilePath,
+                                       Runnable runWhenPathNotExist) {
+            }
+
+            @Mock
+            public void asyncRename(Executor executor,
+                                    List<CompletableFuture<?>> renameFileFutures,
+                                    AtomicBoolean cancelled,
+                                    String origFilePath,
+                                    String destFilePath,
+                                    List<String> fileNames) {
+            }
+
+            @Mock
+            public Status renameDir(String origFilePath,
+                                    String destFilePath,
+                                    Runnable runWhenPathNotExist) {
+                return Status.OK;
+            }
+        };
+    }
+
     @Before
     public void before() {
         // create table
         List<Column> columns = new ArrayList<>();
         columns.add(new Column("c1", PrimitiveType.INT, true));
         columns.add(new Column("c2", PrimitiveType.STRING, true));
+        columns.add(new Column("c3", PrimitiveType.STRING, false));
         List<String> partitionKeys = new ArrayList<>();
         partitionKeys.add("c3");
         HiveTableMetadata tableMetadata = new HiveTableMetadata(
                 dbName, tbWithPartition, columns, partitionKeys,
                 new HashMap<>(), fileFormat);
         hmsClient.createTable(tableMetadata, true);
+
         HiveTableMetadata tableMetadata2 = new HiveTableMetadata(
                 dbName, tbWithoutPartition, columns, new ArrayList<>(),
                 new HashMap<>(), fileFormat);
         hmsClient.createTable(tableMetadata2, true);
+
     }
 
     @After
@@ -118,11 +159,7 @@ public class HmsCommitTest {
     public void testNewPartitionForUnPartitionedTable() {
         List<THivePartitionUpdate> pus = new ArrayList<>();
         pus.add(createRandomNew("a"));
-        try {
-            hmsOps.commit(dbName, tbWithoutPartition, pus);
-        } catch (Exception e) {
-            Assert.assertEquals("Not support mode:[NEW] in unPartitioned 
table", e.getMessage());
-        }
+        Assert.assertThrows(Exception.class, () -> commit(dbName, tbWithoutPartition, pus));
     }
 
     @Test
@@ -131,7 +168,7 @@ public class HmsCommitTest {
         pus.add(createRandomAppend(""));
         pus.add(createRandomAppend(""));
         pus.add(createRandomAppend(""));
-        hmsOps.commit(dbName, tbWithoutPartition, pus);
+        commit(dbName, tbWithoutPartition, pus);
         Table table = hmsClient.getTable(dbName, tbWithoutPartition);
         assertNumRows(3, table);
 
@@ -139,7 +176,7 @@ public class HmsCommitTest {
         pus2.add(createRandomAppend(""));
         pus2.add(createRandomAppend(""));
         pus2.add(createRandomAppend(""));
-        hmsOps.commit(dbName, tbWithoutPartition, pus2);
+        commit(dbName, tbWithoutPartition, pus2);
         table = hmsClient.getTable(dbName, tbWithoutPartition);
         assertNumRows(6, table);
     }
@@ -151,7 +188,7 @@ public class HmsCommitTest {
         pus.add(createRandomOverwrite(""));
         pus.add(createRandomOverwrite(""));
         pus.add(createRandomOverwrite(""));
-        hmsOps.commit(dbName, tbWithoutPartition, pus);
+        commit(dbName, tbWithoutPartition, pus);
         Table table = hmsClient.getTable(dbName, tbWithoutPartition);
         assertNumRows(3, table);
     }
@@ -165,7 +202,7 @@ public class HmsCommitTest {
         pus.add(createRandomNew("b"));
         pus.add(createRandomNew("b"));
         pus.add(createRandomNew("c"));
-        hmsOps.commit(dbName, tbWithPartition, pus);
+        commit(dbName, tbWithPartition, pus);
 
         Partition pa = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("a"));
         assertNumRows(3, pa);
@@ -186,7 +223,7 @@ public class HmsCommitTest {
         pus.add(createRandomAppend("b"));
         pus.add(createRandomAppend("b"));
         pus.add(createRandomAppend("c"));
-        hmsOps.commit(dbName, tbWithPartition, pus);
+        commit(dbName, tbWithPartition, pus);
 
         Partition pa = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("a"));
         assertNumRows(6, pa);
@@ -203,7 +240,7 @@ public class HmsCommitTest {
         pus.add(createRandomOverwrite("a"));
         pus.add(createRandomOverwrite("b"));
         pus.add(createRandomOverwrite("c"));
-        hmsOps.commit(dbName, tbWithPartition, pus);
+        commit(dbName, tbWithPartition, pus);
 
         Partition pa = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("a"));
         assertNumRows(1, pa);
@@ -221,14 +258,14 @@ public class HmsCommitTest {
             pus.add(createRandomNew("" + i));
         }
 
-        hmsOps.commit(dbName, tbWithPartition, pus);
+        commit(dbName, tbWithPartition, pus);
         for (int i = 0; i < nums; i++) {
             Partition p = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("" + i));
             assertNumRows(1, p);
         }
 
         try {
-            hmsOps.commit(dbName, tbWithPartition, pus);
+            commit(dbName, tbWithPartition, pus);
         } catch (Exception e) {
             Assert.assertTrue(e.getMessage().contains("failed to add 
partitions"));
         }
@@ -277,4 +314,13 @@ public class HmsCommitTest {
     public THivePartitionUpdate createRandomOverwrite(String partition) {
         return genOnePartitionUpdate("c3=" + partition, TUpdateMode.OVERWRITE);
     }
+
+    public void commit(String dbName,
+                       String tableName,
+                       List<THivePartitionUpdate> hivePUs) {
+        HMSTransaction hmsTransaction = new HMSTransaction(hmsOps);
+        hmsTransaction.setHivePartitionUpdates(hivePUs);
+        hmsTransaction.finishInsertTable(dbName, tableName);
+        hmsTransaction.commit();
+    }
 }

