This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 23f9ee042fe [feature](hive)support ExternalTransaction for writing
external table (#32726)
23f9ee042fe is described below
commit 23f9ee042fe1f8f754865219b1b6d0d11a7aa1de
Author: wuwenchi <[email protected]>
AuthorDate: Thu Apr 4 08:10:49 2024 +0800
[feature](hive)support ExternalTransaction for writing external table
(#32726)
Issue #31442
Add `TransactionManager` and `Transaction`.
```
public interface Transaction {
void commit() throws UserException;
void rollback();
}
public interface TransactionManager {
long begin();
void commit(long id) throws UserException;
void rollback(long id);
Transaction getTransaction(long id);
}
```
`TransactionManager` is used to manage all external transactions:
The application layer should manage the entire transaction through this
`TransactionManager`, like:
```
long id = transactionManager.begin();
transactionManager.commit(id);
// or, on failure:
transactionManager.rollback(id);
```
`Transaction` is an interface that can be implemented for each specific
backend, such as the currently implemented `HMSTransaction`, an Iceberg
transaction that may be added in the future, etc.
---
.../apache/doris/datasource/ExternalCatalog.java | 2 +
.../apache/doris/datasource/hive/HMSCommitter.java | 754 -----------
.../doris/datasource/hive/HMSExternalCatalog.java | 6 +-
.../doris/datasource/hive/HMSTransaction.java | 1322 ++++++++++++++++++++
.../doris/datasource/hive/HiveMetadataOps.java | 24 +-
.../datasource/hive/HivePartitionStatistics.java | 2 +-
.../hive/HivePartitionWithStatistics.java | 6 +-
.../plans/commands/insert/HiveInsertExecutor.java | 34 +-
.../main/java/org/apache/doris/qe/Coordinator.java | 30 +-
.../doris/transaction/HiveTransactionManager.java | 79 ++
.../Transaction.java} | 25 +-
.../TransactionManager.java} | 40 +-
.../TransactionManagerFactory.java} | 25 +-
.../doris/datasource/hive/HmsCommitTest.java | 72 +-
14 files changed, 1535 insertions(+), 886 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index baa6eefbad1..f3ae62884ba 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -53,6 +53,7 @@ import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.MasterCatalogExecutor;
+import org.apache.doris.transaction.TransactionManager;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
@@ -114,6 +115,7 @@ public abstract class ExternalCatalog
private boolean objectCreated = false;
protected boolean invalidCacheInInit = true;
protected ExternalMetadataOps metadataOps;
+ protected TransactionManager transactionManager;
private ExternalSchemaCache schemaCache;
private String comment;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCommitter.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCommitter.java
deleted file mode 100644
index af26f36d6b9..00000000000
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCommitter.java
+++ /dev/null
@@ -1,754 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-// This file is copied from
-//
https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/SemiTransactionalHiveMetastore.java
-// and modified by Doris
-
-package org.apache.doris.datasource.hive;
-
-import org.apache.doris.backup.Status;
-import org.apache.doris.common.Pair;
-import org.apache.doris.fs.remote.RemoteFile;
-import org.apache.doris.fs.remote.RemoteFileSystem;
-import org.apache.doris.thrift.THivePartitionUpdate;
-import org.apache.doris.thrift.TUpdateMode;
-
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Verify;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import io.airlift.concurrent.MoreFutures;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Queue;
-import java.util.StringJoiner;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
-
-public class HMSCommitter {
- private static final Logger LOG = LogManager.getLogger(HMSCommitter.class);
- private final HiveMetadataOps hiveOps;
- private final RemoteFileSystem fs;
- private final Table table;
-
- // update statistics for unPartitioned table or existed partition
- private final List<UpdateStatisticsTask> updateStatisticsTasks = new
ArrayList<>();
- Executor updateStatisticsExecutor = Executors.newFixedThreadPool(16);
-
- // add new partition
- private final AddPartitionsTask addPartitionsTask = new
AddPartitionsTask();
- private static final int PARTITION_COMMIT_BATCH_SIZE = 20;
-
- // for file system rename operation
- // whether to cancel the file system tasks
- private final AtomicBoolean fileSystemTaskCancelled = new
AtomicBoolean(false);
- // file system tasks that are executed asynchronously, including
rename_file, rename_dir
- private final List<CompletableFuture<?>> asyncFileSystemTaskFutures = new
ArrayList<>();
- // when aborted, we need to delete all files under this path, even the
current directory
- private final Queue<DirectoryCleanUpTask> directoryCleanUpTasksForAbort =
new ConcurrentLinkedQueue<>();
- // when aborted, we need restore directory
- private final List<RenameDirectoryTask> renameDirectoryTasksForAbort = new
ArrayList<>();
- // when finished, we need clear some directories
- private final List<String> clearDirsForFinish = new ArrayList<>();
- Executor fileSystemExecutor = Executors.newFixedThreadPool(16);
-
- public HMSCommitter(HiveMetadataOps hiveOps, RemoteFileSystem fs, Table
table) {
- this.hiveOps = hiveOps;
- this.fs = fs;
- this.table = table;
- }
-
- public void commit(List<THivePartitionUpdate> hivePUs) {
- try {
- prepare(mergePartitions(hivePUs));
- doCommit();
- } catch (Throwable t) {
- LOG.warn("Failed to commit for {}.{}, abort it.",
table.getDbName(), table.getTableName());
- try {
- cancelUnStartedAsyncFileSystemTask();
- undoUpdateStatisticsTasks();
- undoAddPartitionsTask();
- waitForAsyncFileSystemTaskSuppressThrowable();
- runDirectoryClearUpTasksForAbort();
- runRenameDirTasksForAbort();
- } catch (Throwable e) {
- t.addSuppressed(new Exception("Failed to roll back after
commit failure", e));
- }
- throw t;
- } finally {
- runClearPathsForFinish();
- }
- }
-
- public void prepare(List<THivePartitionUpdate> hivePUs) {
-
- List<Pair<THivePartitionUpdate, HivePartitionStatistics>>
insertExistsPartitions = new ArrayList<>();
-
- for (THivePartitionUpdate pu : hivePUs) {
- TUpdateMode updateMode = pu.getUpdateMode();
- HivePartitionStatistics hivePartitionStatistics =
HivePartitionStatistics.fromCommonStatistics(
- pu.getRowCount(),
- pu.getFileNamesSize(),
- pu.getFileSize());
- if (table.getPartitionKeysSize() == 0) {
- Preconditions.checkArgument(hivePUs.size() == 1,
- "When updating a non-partitioned table, multiple
partitions should not be written");
- switch (updateMode) {
- case APPEND:
- prepareAppendTable(pu, hivePartitionStatistics);
- break;
- case OVERWRITE:
- prepareOverwriteTable(pu, hivePartitionStatistics);
- break;
- default:
- throw new RuntimeException("Not support mode:[" +
updateMode + "] in unPartitioned table");
- }
- } else {
- switch (updateMode) {
- case NEW:
- prepareCreateNewPartition(pu, hivePartitionStatistics);
- break;
- case APPEND:
- insertExistsPartitions.add(Pair.of(pu,
hivePartitionStatistics));
- break;
- case OVERWRITE:
- prepareOverwritePartition(pu, hivePartitionStatistics);
- break;
- default:
- throw new RuntimeException("Not support mode:[" +
updateMode + "] in unPartitioned table");
- }
- }
- }
-
- if (!insertExistsPartitions.isEmpty()) {
- prepareInsertExistPartition(insertExistsPartitions);
- }
- }
-
- public List<THivePartitionUpdate>
mergePartitions(List<THivePartitionUpdate> hivePUs) {
- Map<String, THivePartitionUpdate> mm = new HashMap<>();
- for (THivePartitionUpdate pu : hivePUs) {
- if (mm.containsKey(pu.getName())) {
- THivePartitionUpdate old = mm.get(pu.getName());
- old.setFileSize(old.getFileSize() + pu.getFileSize());
- old.setRowCount(old.getRowCount() + pu.getRowCount());
- old.getFileNames().addAll(pu.getFileNames());
- } else {
- mm.put(pu.getName(), pu);
- }
- }
- return new ArrayList<>(mm.values());
- }
-
- public void doCommit() {
- waitForAsyncFileSystemTasks();
- doAddPartitionsTask();
- doUpdateStatisticsTasks();
- }
-
- public void rollback() {
-
- }
-
- public void cancelUnStartedAsyncFileSystemTask() {
- fileSystemTaskCancelled.set(true);
- }
-
- private void undoUpdateStatisticsTasks() {
- ImmutableList.Builder<CompletableFuture<?>> undoUpdateFutures =
ImmutableList.builder();
- for (UpdateStatisticsTask task : updateStatisticsTasks) {
- undoUpdateFutures.add(CompletableFuture.runAsync(() -> {
- try {
- task.undo(hiveOps);
- } catch (Throwable throwable) {
- LOG.warn("Failed to rollback: {}", task.getDescription(),
throwable);
- }
- }, updateStatisticsExecutor));
- }
-
- for (CompletableFuture<?> undoUpdateFuture :
undoUpdateFutures.build()) {
- MoreFutures.getFutureValue(undoUpdateFuture);
- }
- }
-
- private void undoAddPartitionsTask() {
- if (addPartitionsTask.isEmpty()) {
- return;
- }
-
- HivePartition firstPartition =
addPartitionsTask.getPartitions().get(0).getPartition();
- String dbName = firstPartition.getDbName();
- String tableName = firstPartition.getTblName();
- List<List<String>> rollbackFailedPartitions =
addPartitionsTask.rollback(hiveOps);
- if (!rollbackFailedPartitions.isEmpty()) {
- LOG.warn("Failed to rollback: add_partition for partition values
{}.{}.{}",
- dbName, tableName, rollbackFailedPartitions);
- }
- }
-
- private void waitForAsyncFileSystemTaskSuppressThrowable() {
- for (CompletableFuture<?> future : asyncFileSystemTaskFutures) {
- try {
- future.get();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- } catch (Throwable t) {
- // ignore
- }
- }
- }
-
- public void prepareAppendTable(THivePartitionUpdate pu,
HivePartitionStatistics ps) {
- String targetPath = pu.getLocation().getTargetPath();
- String writePath = pu.getLocation().getWritePath();
- if (!targetPath.equals(writePath)) {
- fs.asyncRename(
- fileSystemExecutor,
- asyncFileSystemTaskFutures,
- fileSystemTaskCancelled,
- writePath,
- targetPath,
- pu.getFileNames());
- }
- directoryCleanUpTasksForAbort.add(new DirectoryCleanUpTask(targetPath,
false));
- updateStatisticsTasks.add(
- new UpdateStatisticsTask(
- table.getDbName(),
- table.getTableName(),
- Optional.empty(),
- ps,
- true
- ));
- }
-
- public void prepareOverwriteTable(THivePartitionUpdate pu,
HivePartitionStatistics ps) {
- String targetPath = pu.getLocation().getTargetPath();
- String writePath = pu.getLocation().getWritePath();
- if (!targetPath.equals(writePath)) {
- Path path = new Path(targetPath);
- String oldTablePath = new Path(path.getParent(), "_temp_" +
path.getName()).toString();
- Status status = fs.renameDir(
- targetPath,
- oldTablePath,
- () -> renameDirectoryTasksForAbort.add(new
RenameDirectoryTask(oldTablePath, targetPath)));
- if (!status.ok()) {
- throw new RuntimeException(
- "Error to rename dir from " + targetPath + " to " +
oldTablePath + status.getErrMsg());
- }
- clearDirsForFinish.add(oldTablePath);
-
- status = fs.renameDir(
- writePath,
- targetPath,
- () -> directoryCleanUpTasksForAbort.add(new
DirectoryCleanUpTask(targetPath, true)));
- if (!status.ok()) {
- throw new RuntimeException(
- "Error to rename dir from " + writePath + " to " +
targetPath + ":" + status.getErrMsg());
- }
- }
- updateStatisticsTasks.add(
- new UpdateStatisticsTask(
- table.getDbName(),
- table.getTableName(),
- Optional.empty(),
- ps,
- false
- ));
- }
-
- public void prepareCreateNewPartition(THivePartitionUpdate pu,
HivePartitionStatistics ps) {
-
- String targetPath = pu.getLocation().getTargetPath();
- String writePath = pu.getLocation().getWritePath();
-
- if (!targetPath.equals(writePath)) {
- fs.asyncRenameDir(
- fileSystemExecutor,
- asyncFileSystemTaskFutures,
- fileSystemTaskCancelled,
- writePath,
- targetPath,
- () -> directoryCleanUpTasksForAbort.add(new
DirectoryCleanUpTask(targetPath, true)));
- }
-
- StorageDescriptor sd = table.getSd();
-
- HivePartition hivePartition = new HivePartition(
- table.getDbName(),
- table.getTableName(),
- false,
- sd.getInputFormat(),
- pu.getLocation().getTargetPath(),
- HiveUtil.toPartitionValues(pu.getName()),
- Maps.newHashMap(),
- sd.getOutputFormat(),
- sd.getSerdeInfo().getSerializationLib(),
- hiveOps.getClient().getSchema(table.getDbName(),
table.getTableName())
- );
- HivePartitionWithStatistics partitionWithStats =
- new HivePartitionWithStatistics(pu.getName(), hivePartition,
ps);
- addPartitionsTask.addPartition(partitionWithStats);
- }
-
- public void prepareInsertExistPartition(List<Pair<THivePartitionUpdate,
HivePartitionStatistics>> partitions) {
- for (List<Pair<THivePartitionUpdate, HivePartitionStatistics>>
partitionBatch :
- Iterables.partition(partitions, 100)) {
- List<String> partitionNames = partitionBatch.stream()
- .map(pair -> pair.first.getName())
- .collect(Collectors.toList());
-
- Map<String, Partition> partitionsByNamesMap =
HiveUtil.convertToNamePartitionMap(
- partitionNames,
- hiveOps.getClient().getPartitions(table.getDbName(),
table.getTableName(), partitionNames));
-
- for (int i = 0; i < partitionsByNamesMap.size(); i++) {
- String partitionName = partitionNames.get(i);
- if (partitionsByNamesMap.get(partitionName) == null) {
- // Prevent this partition from being deleted by other
engines
- throw new RuntimeException("Not found partition: " +
partitionName);
- }
-
- THivePartitionUpdate pu = partitionBatch.get(i).first;
- HivePartitionStatistics updateStats =
partitionBatch.get(i).second;
-
- String writePath = pu.getLocation().getWritePath();
- String targetPath = pu.getLocation().getTargetPath();
- directoryCleanUpTasksForAbort.add(new
DirectoryCleanUpTask(targetPath, false));
-
- if (!targetPath.equals(writePath)) {
- fs.asyncRename(
- fileSystemExecutor,
- asyncFileSystemTaskFutures,
- fileSystemTaskCancelled,
- writePath,
- targetPath,
- pu.getFileNames());
- }
-
- updateStatisticsTasks.add(
- new UpdateStatisticsTask(
- table.getDbName(),
- table.getTableName(),
- Optional.of(pu.getName()),
- updateStats,
- true));
- }
- }
- }
-
-
- public void prepareOverwritePartition(THivePartitionUpdate pu,
HivePartitionStatistics ps) {
- String targetPath = pu.getLocation().getTargetPath();
- String writePath = pu.getLocation().getWritePath();
- if (!targetPath.equals(writePath)) {
- Path path = new Path(targetPath);
- String oldPartitionPath = new Path(path.getParent(), "_temp_" +
path.getName()).toString();
- Status status = fs.renameDir(
- targetPath,
- oldPartitionPath,
- () -> renameDirectoryTasksForAbort.add(new
RenameDirectoryTask(oldPartitionPath, targetPath)));
- if (!status.ok()) {
- throw new RuntimeException(
- "Error to rename dir from " + targetPath + " to " +
oldPartitionPath + ":" + status.getErrMsg());
- }
- clearDirsForFinish.add(oldPartitionPath);
-
- status = fs.renameDir(
- writePath,
- targetPath,
- () -> directoryCleanUpTasksForAbort.add(new
DirectoryCleanUpTask(targetPath, true)));
- if (!status.ok()) {
- throw new RuntimeException(
- "Error to rename dir from " + writePath + " to " +
targetPath + ":" + status.getErrMsg());
- }
- }
- updateStatisticsTasks.add(
- new UpdateStatisticsTask(
- table.getDbName(),
- table.getTableName(),
- Optional.of(pu.getName()),
- ps,
- false
- ));
- }
-
-
- private void waitForAsyncFileSystemTasks() {
- for (CompletableFuture<?> future : asyncFileSystemTaskFutures) {
- MoreFutures.getFutureValue(future, RuntimeException.class);
- }
- }
-
- private void doAddPartitionsTask() {
- if (!addPartitionsTask.isEmpty()) {
- addPartitionsTask.run(hiveOps);
- }
- }
-
- private void doUpdateStatisticsTasks() {
- ImmutableList.Builder<CompletableFuture<?>> updateStatsFutures =
ImmutableList.builder();
- List<String> failedTaskDescriptions = new ArrayList<>();
- List<Throwable> suppressedExceptions = new ArrayList<>();
- for (UpdateStatisticsTask task : updateStatisticsTasks) {
- updateStatsFutures.add(CompletableFuture.runAsync(() -> {
- try {
- task.run(hiveOps);
- } catch (Throwable t) {
- synchronized (suppressedExceptions) {
- addSuppressedExceptions(suppressedExceptions, t,
failedTaskDescriptions, task.getDescription());
- }
- }
- }, updateStatisticsExecutor));
- }
-
- for (CompletableFuture<?> executeUpdateFuture :
updateStatsFutures.build()) {
- MoreFutures.getFutureValue(executeUpdateFuture);
- }
- if (!suppressedExceptions.isEmpty()) {
- StringBuilder message = new StringBuilder();
- message.append("Failed to execute some updating statistics tasks:
");
- Joiner.on("; ").appendTo(message, failedTaskDescriptions);
- RuntimeException exception = new
RuntimeException(message.toString());
- suppressedExceptions.forEach(exception::addSuppressed);
- throw exception;
- }
- }
-
- private static void addSuppressedExceptions(
- List<Throwable> suppressedExceptions,
- Throwable t,
- List<String> descriptions,
- String description) {
- descriptions.add(description);
- // A limit is needed to avoid having a huge exception object. 5 was
chosen arbitrarily.
- if (suppressedExceptions.size() < 5) {
- suppressedExceptions.add(t);
- }
- }
-
- private static class AddPartition {
-
- }
-
- private static class UpdateStatisticsTask {
- private final String dbName;
- private final String tableName;
- private final Optional<String> partitionName;
- private final HivePartitionStatistics updatePartitionStat;
- private final boolean merge;
-
- private boolean done;
-
- public UpdateStatisticsTask(String dbName, String tableName,
Optional<String> partitionName,
- HivePartitionStatistics statistics,
boolean merge) {
- this.dbName = Objects.requireNonNull(dbName, "dbName is null");
- this.tableName = Objects.requireNonNull(tableName, "tableName is
null");
- this.partitionName = Objects.requireNonNull(partitionName,
"partitionName is null");
- this.updatePartitionStat = Objects.requireNonNull(statistics,
"statistics is null");
- this.merge = merge;
- }
-
- public void run(HiveMetadataOps hiveOps) {
- if (partitionName.isPresent()) {
- hiveOps.updatePartitionStatistics(dbName, tableName,
partitionName.get(), this::updateStatistics);
- } else {
- hiveOps.updateTableStatistics(dbName, tableName,
this::updateStatistics);
- }
- done = true;
- }
-
- public void undo(HiveMetadataOps hmsOps) {
- if (!done) {
- return;
- }
- if (partitionName.isPresent()) {
- hmsOps.updatePartitionStatistics(dbName, tableName,
partitionName.get(), this::resetStatistics);
- } else {
- hmsOps.updateTableStatistics(dbName, tableName,
this::resetStatistics);
- }
- }
-
- public String getDescription() {
- if (partitionName.isPresent()) {
- return "alter partition parameters " + tableName + " " +
partitionName.get();
- } else {
- return "alter table parameters " + tableName;
- }
- }
-
- private HivePartitionStatistics
updateStatistics(HivePartitionStatistics currentStats) {
- return merge ? HivePartitionStatistics.merge(currentStats,
updatePartitionStat) : updatePartitionStat;
- }
-
- private HivePartitionStatistics
resetStatistics(HivePartitionStatistics currentStatistics) {
- return HivePartitionStatistics
- .reduce(currentStatistics, updatePartitionStat,
HivePartitionStatistics.ReduceOperator.SUBTRACT);
- }
- }
-
- public static class AddPartitionsTask {
- private final List<HivePartitionWithStatistics> partitions = new
ArrayList<>();
- private final List<List<String>> createdPartitionValues = new
ArrayList<>();
-
- public boolean isEmpty() {
- return partitions.isEmpty();
- }
-
- public List<HivePartitionWithStatistics> getPartitions() {
- return partitions;
- }
-
- public void addPartition(HivePartitionWithStatistics partition) {
- partitions.add(partition);
- }
-
- public void run(HiveMetadataOps hiveOps) {
- HivePartition firstPartition = partitions.get(0).getPartition();
- String dbName = firstPartition.getDbName();
- String tableName = firstPartition.getTblName();
- List<List<HivePartitionWithStatistics>> batchedPartitions =
- Lists.partition(partitions, PARTITION_COMMIT_BATCH_SIZE);
- for (List<HivePartitionWithStatistics> batch : batchedPartitions) {
- try {
- hiveOps.addPartitions(dbName, tableName, batch);
- for (HivePartitionWithStatistics partition : batch) {
-
createdPartitionValues.add(partition.getPartition().getPartitionValues());
- }
- } catch (Throwable t) {
- LOG.warn("Failed to add partition", t);
- throw t;
- }
- }
- partitions.clear();
- }
-
- public List<List<String>> rollback(HiveMetadataOps hiveOps) {
- HivePartition firstPartition = partitions.get(0).getPartition();
- String dbName = firstPartition.getDbName();
- String tableName = firstPartition.getTblName();
- List<List<String>> rollbackFailedPartitions = new ArrayList<>();
- for (List<String> createdPartitionValue : createdPartitionValues) {
- try {
- hiveOps.dropPartition(dbName, tableName,
createdPartitionValue, false);
- } catch (Throwable t) {
- LOG.warn("Failed to drop partition on {}.{}.{} when
rollback",
- dbName, tableName, rollbackFailedPartitions);
- rollbackFailedPartitions.add(createdPartitionValue);
- }
- }
- return rollbackFailedPartitions;
- }
- }
-
- private static class DirectoryCleanUpTask {
- private final Path path;
- private final boolean deleteEmptyDir;
-
- public DirectoryCleanUpTask(String path, boolean deleteEmptyDir) {
- this.path = new Path(path);
- this.deleteEmptyDir = deleteEmptyDir;
- }
-
- public Path getPath() {
- return path;
- }
-
- public boolean isDeleteEmptyDir() {
- return deleteEmptyDir;
- }
-
- @Override
- public String toString() {
- return new StringJoiner(", ",
DirectoryCleanUpTask.class.getSimpleName() + "[", "]")
- .add("path=" + path)
- .add("deleteEmptyDir=" + deleteEmptyDir)
- .toString();
- }
- }
-
- public static class DeleteRecursivelyResult {
- private final boolean dirNoLongerExists;
- private final List<String> notDeletedEligibleItems;
-
- public DeleteRecursivelyResult(boolean dirNoLongerExists, List<String>
notDeletedEligibleItems) {
- this.dirNoLongerExists = dirNoLongerExists;
- this.notDeletedEligibleItems = notDeletedEligibleItems;
- }
-
- public boolean dirNotExists() {
- return dirNoLongerExists;
- }
-
- public List<String> getNotDeletedEligibleItems() {
- return notDeletedEligibleItems;
- }
- }
-
- private void runDirectoryClearUpTasksForAbort() {
- for (DirectoryCleanUpTask cleanUpTask : directoryCleanUpTasksForAbort)
{
- recursiveDeleteItems(cleanUpTask.getPath(),
cleanUpTask.isDeleteEmptyDir());
- }
- }
-
- private static class RenameDirectoryTask {
- private final String renameFrom;
- private final String renameTo;
-
- public RenameDirectoryTask(String renameFrom, String renameTo) {
- this.renameFrom = renameFrom;
- this.renameTo = renameTo;
- }
-
- public String getRenameFrom() {
- return renameFrom;
- }
-
- public String getRenameTo() {
- return renameTo;
- }
-
- @Override
- public String toString() {
- return new StringJoiner(", ",
RenameDirectoryTask.class.getSimpleName() + "[", "]")
- .add("renameFrom:" + renameFrom)
- .add("renameTo:" + renameTo)
- .toString();
- }
- }
-
- private void runRenameDirTasksForAbort() {
- Status status;
- for (RenameDirectoryTask task : renameDirectoryTasksForAbort) {
- status = fs.exists(task.getRenameFrom());
- if (status.ok()) {
- status = fs.renameDir(task.getRenameFrom(),
task.getRenameTo(), () -> {});
- if (!status.ok()) {
- LOG.warn("Failed to abort rename dir from {} to {}:{}",
- task.getRenameFrom(), task.getRenameTo(),
status.getErrMsg());
- }
- }
- }
- }
-
- private void runClearPathsForFinish() {
- Status status;
- for (String path : clearDirsForFinish) {
- status = fs.delete(path);
- if (!status.ok()) {
- LOG.warn("Failed to recursively delete path {}:{}", path,
status.getErrCode());
- }
- }
- }
-
-
- private void recursiveDeleteItems(Path directory, boolean deleteEmptyDir) {
- DeleteRecursivelyResult deleteResult = recursiveDeleteFiles(directory,
deleteEmptyDir);
-
- if (!deleteResult.getNotDeletedEligibleItems().isEmpty()) {
- LOG.warn("Failed to delete directory {}. Some eligible items can't
be deleted: {}.",
- directory.toString(),
deleteResult.getNotDeletedEligibleItems());
- } else if (deleteEmptyDir && !deleteResult.dirNotExists()) {
- LOG.warn("Failed to delete directory {} due to dir isn't empty",
directory.toString());
- }
- }
-
- public DeleteRecursivelyResult recursiveDeleteFiles(Path directory,
boolean deleteEmptyDir) {
- try {
- if (!fs.exists(directory.getName()).ok()) {
- return new DeleteRecursivelyResult(true, ImmutableList.of());
- }
- } catch (Exception e) {
- ImmutableList.Builder<String> notDeletedEligibleItems =
ImmutableList.builder();
- notDeletedEligibleItems.add(directory.toString() + "/*");
- return new DeleteRecursivelyResult(false,
notDeletedEligibleItems.build());
- }
-
- return doRecursiveDeleteFiles(directory, deleteEmptyDir);
- }
-
- private DeleteRecursivelyResult doRecursiveDeleteFiles(Path directory,
boolean deleteEmptyDir) {
- List<RemoteFile> remoteFiles = new ArrayList<>();
-
- Status status = fs.list(directory.getName(), remoteFiles);
- if (!status.ok()) {
- ImmutableList.Builder<String> notDeletedEligibleItems =
ImmutableList.builder();
- notDeletedEligibleItems.add(directory + "/*");
- return new DeleteRecursivelyResult(false,
notDeletedEligibleItems.build());
- }
-
- boolean isEmptyDir = true;
- List<String> notDeletedEligibleItems = new ArrayList<>();
- for (RemoteFile file : remoteFiles) {
- if (file.isFile()) {
- Path filePath = file.getPath();
- isEmptyDir = false;
- // TODO Check if this file was created by this query
- if (!deleteIfExists(filePath)) {
- notDeletedEligibleItems.add(filePath.toString());
- }
- } else if (file.isDirectory()) {
- DeleteRecursivelyResult subResult =
doRecursiveDeleteFiles(file.getPath(), deleteEmptyDir);
- if (!subResult.dirNotExists()) {
- isEmptyDir = false;
- }
- if (!subResult.getNotDeletedEligibleItems().isEmpty()) {
-
notDeletedEligibleItems.addAll(subResult.getNotDeletedEligibleItems());
- }
- } else {
- isEmptyDir = false;
- notDeletedEligibleItems.add(file.getPath().toString());
- }
- }
-
- if (isEmptyDir && deleteEmptyDir) {
- Verify.verify(notDeletedEligibleItems.isEmpty());
- if (!deleteIfExists(directory)) {
- return new DeleteRecursivelyResult(false,
ImmutableList.of(directory + "/"));
- }
- // all items of the location have been deleted.
- return new DeleteRecursivelyResult(true, ImmutableList.of());
- }
-
- return new DeleteRecursivelyResult(false, notDeletedEligibleItems);
- }
-
- public boolean deleteIfExists(Path path) {
- Status status = fs.delete(path.getName());
- if (status.ok()) {
- return true;
- }
- return !fs.exists(path.getName()).ok();
- }
-}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
index 0f2a7bb2acb..4474e546500 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
@@ -34,6 +34,7 @@ import
org.apache.doris.datasource.jdbc.client.JdbcClientConfig;
import org.apache.doris.datasource.operations.ExternalMetadataOperations;
import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.datasource.property.constants.HMSProperties;
+import org.apache.doris.transaction.TransactionManagerFactory;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
@@ -145,7 +146,10 @@ public class HMSExternalCatalog extends ExternalCatalog {
AuthenticationConfig.HADOOP_KERBEROS_PRINCIPAL,
AuthenticationConfig.HADOOP_KERBEROS_KEYTAB));
}
- metadataOps = ExternalMetadataOperations.newHiveMetadataOps(hiveConf,
jdbcClientConfig, this);
+ HiveMetadataOps hiveOps =
ExternalMetadataOperations.newHiveMetadataOps(hiveConf, jdbcClientConfig, this);
+ transactionManager =
TransactionManagerFactory.createHiveTransactionManager(hiveOps);
+ transactionManager.setEditLog(Env.getCurrentEnv().getEditLog());
+ metadataOps = hiveOps;
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
new file mode 100644
index 00000000000..c3e8d00c5d1
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
@@ -0,0 +1,1322 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+//
https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/SemiTransactionalHiveMetastore.java
+//
https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java
+// and modified by Doris
+
+package org.apache.doris.datasource.hive;
+
+import org.apache.doris.backup.Status;
+import org.apache.doris.common.Pair;
+import org.apache.doris.fs.remote.RemoteFile;
+import org.apache.doris.fs.remote.RemoteFileSystem;
+import org.apache.doris.thrift.THivePartitionUpdate;
+import org.apache.doris.thrift.TUpdateMode;
+import org.apache.doris.transaction.Transaction;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import io.airlift.concurrent.MoreFutures;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.StringJoiner;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+public class HMSTransaction implements Transaction {
+ private static final Logger LOG =
LogManager.getLogger(HMSTransaction.class);
+ private final HiveMetadataOps hiveOps;
+ private final RemoteFileSystem fs;
+ private String dbName;
+ private String tbName;
+
+ private final Map<DatabaseTableName, Action<TableAndMore>> tableActions =
new HashMap<>();
+ private final Map<DatabaseTableName, Map<List<String>,
Action<PartitionAndMore>>>
+ partitionActions = new HashMap<>();
+
+ private HmsCommitter hmsCommitter;
+ private List<THivePartitionUpdate> hivePartitionUpdates =
Lists.newArrayList();
+
    /**
     * Creates a transaction bound to the given Hive metadata operations.
     * The remote file system is taken from {@code hiveOps} and is used for all
     * staging-directory probing and cleanup done by this transaction.
     */
    public HMSTransaction(HiveMetadataOps hiveOps) {
        this.hiveOps = hiveOps;
        this.fs = hiveOps.getFs();
    }
+
    /**
     * Commits all buffered table/partition actions to HMS by delegating to
     * {@link #doCommit()}; rethrows any failure after doCommit's own abort handling.
     */
    @Override
    public void commit() {
        doCommit();
    }
+
    // Database of the table being written; set by finishInsertTable().
    public String getDbName() {
        return dbName;
    }

    // Name of the table being written; set by finishInsertTable().
    public String getTbName() {
        return tbName;
    }
+
+ public List<THivePartitionUpdate>
mergePartitions(List<THivePartitionUpdate> hivePUs) {
+ Map<String, THivePartitionUpdate> mm = new HashMap<>();
+ for (THivePartitionUpdate pu : hivePUs) {
+ if (mm.containsKey(pu.getName())) {
+ THivePartitionUpdate old = mm.get(pu.getName());
+ old.setFileSize(old.getFileSize() + pu.getFileSize());
+ old.setRowCount(old.getRowCount() + pu.getRowCount());
+ old.getFileNames().addAll(pu.getFileNames());
+ } else {
+ mm.put(pu.getName(), pu);
+ }
+ }
+ return new ArrayList<>(mm.values());
+ }
+
    /**
     * Rolls back a failed commit. Only meaningful once {@link #doCommit()} has
     * created the committer; before that nothing has been applied, so this is a no-op.
     */
    @Override
    public void rollback() {
        if (hmsCommitter != null) {
            hmsCommitter.rollback();
        }
    }
+
    /**
     * Translates the BE-reported partition updates into buffered metastore actions
     * for the given table. For a non-partitioned table exactly one merged update is
     * allowed (APPEND -> insert-existing, OVERWRITE -> drop + recreate). For a
     * partitioned table, APPEND updates are batched and converted afterwards, while
     * NEW/OVERWRITE updates buffer an ADD (preceded by a DROP for OVERWRITE).
     * Nothing is applied to HMS here; doCommit() performs the actual changes.
     */
    public void finishInsertTable(String dbName, String tbName) {
        this.tbName = tbName;
        this.dbName = dbName;
        List<THivePartitionUpdate> mergedPUs = mergePartitions(hivePartitionUpdates);
        Table table = getTable(dbName, tbName);
        List<Pair<THivePartitionUpdate, HivePartitionStatistics>> insertExistsPartitions = new ArrayList<>();
        for (THivePartitionUpdate pu : mergedPUs) {
            TUpdateMode updateMode = pu.getUpdateMode();
            HivePartitionStatistics hivePartitionStatistics = HivePartitionStatistics.fromCommonStatistics(
                    pu.getRowCount(),
                    pu.getFileNamesSize(),
                    pu.getFileSize());
            String writePath = pu.getLocation().getWritePath();
            if (table.getPartitionKeysSize() == 0) {
                // Non-partitioned table: the merged updates must collapse to one entry.
                Preconditions.checkArgument(mergedPUs.size() == 1,
                        "When updating a non-partitioned table, multiple partitions should not be written");
                switch (updateMode) {
                    case APPEND:
                        finishChangingExistingTable(
                                ActionType.INSERT_EXISTING,
                                dbName,
                                tbName,
                                writePath,
                                pu.getFileNames(),
                                hivePartitionStatistics);
                        break;
                    case OVERWRITE:
                        // Overwrite = buffered drop followed by recreate with the new files.
                        dropTable(dbName, tbName);
                        createTable(table, writePath, pu.getFileNames(), hivePartitionStatistics);
                        break;
                    default:
                        throw new RuntimeException("Not support mode:[" + updateMode + "] in unPartitioned table");
                }
            } else {
                switch (updateMode) {
                    case APPEND:
                        // insert into existing partition
                        insertExistsPartitions.add(Pair.of(pu, hivePartitionStatistics));
                        break;
                    case NEW:
                    case OVERWRITE:
                        StorageDescriptor sd = table.getSd();
                        // New partition inherits the table's storage descriptor and schema.
                        HivePartition hivePartition = new HivePartition(
                                dbName,
                                tbName,
                                false,
                                sd.getInputFormat(),
                                pu.getLocation().getTargetPath(),
                                HiveUtil.toPartitionValues(pu.getName()),
                                Maps.newHashMap(),
                                sd.getOutputFormat(),
                                sd.getSerdeInfo().getSerializationLib(),
                                hiveOps.getClient().getSchema(dbName, tbName)
                        );
                        if (updateMode == TUpdateMode.OVERWRITE) {
                            // Drop first so addPartition() buffers an ALTER on top of the DROP.
                            dropPartition(dbName, tbName, hivePartition.getPartitionValues(), true);
                        }
                        addPartition(
                                dbName, tbName, hivePartition, writePath,
                                pu.getName(), pu.getFileNames(), hivePartitionStatistics);
                        break;
                    default:
                        throw new RuntimeException("Not support mode:[" + updateMode + "] in partitioned table");
                }
            }
        }

        if (!insertExistsPartitions.isEmpty()) {
            convertToInsertExistingPartitionAction(insertExistsPartitions);
        }
    }
+
    /**
     * Applies all buffered table and partition actions through an HmsCommitter:
     * first the prepare phase (stages file-system work and statistics tasks), then
     * the ordered execution phase (async FS tasks, partition adds, stats updates).
     * On any failure the committer undoes what it can in reverse order and the
     * original throwable is rethrown; the finally-block cleanup runs in all cases.
     */
    public void doCommit() {
        hmsCommitter = new HmsCommitter();

        try {
            // Prepare phase: table-level actions.
            for (Map.Entry<DatabaseTableName, Action<TableAndMore>> entry : tableActions.entrySet()) {
                Action<TableAndMore> action = entry.getValue();
                switch (action.getType()) {
                    case INSERT_EXISTING:
                        hmsCommitter.prepareInsertExistingTable(action.getData());
                        break;
                    case ALTER:
                        hmsCommitter.prepareAlterTable(action.getData());
                        break;
                    default:
                        throw new UnsupportedOperationException("Unsupported table action type: " + action.getType());
                }
            }

            // Prepare phase: partition-level actions, per table then per partition.
            for (Map.Entry<DatabaseTableName, Map<List<String>, Action<PartitionAndMore>>> tableEntry
                    : partitionActions.entrySet()) {
                for (Map.Entry<List<String>, Action<PartitionAndMore>> partitionEntry :
                        tableEntry.getValue().entrySet()) {
                    Action<PartitionAndMore> action = partitionEntry.getValue();
                    switch (action.getType()) {
                        case INSERT_EXISTING:
                            hmsCommitter.prepareInsertExistPartition(action.getData());
                            break;
                        case ADD:
                            hmsCommitter.prepareAddPartition(action.getData());
                            break;
                        case ALTER:
                            hmsCommitter.prepareAlterPartition(action.getData());
                            break;
                        default:
                            throw new UnsupportedOperationException(
                                    "Unsupported partition action type: " + action.getType());
                    }
                }
            }

            // Execution phase: the order matters — files must be in place before
            // partitions are registered, and stats are updated last.
            hmsCommitter.waitForAsyncFileSystemTasks();
            hmsCommitter.doAddPartitionsTask();
            hmsCommitter.doUpdateStatisticsTasks();
        } catch (Throwable t) {
            LOG.warn("Failed to commit for {}.{}, abort it.", dbName, tbName);
            // Undo in reverse order of the execution phase, then restore directories.
            hmsCommitter.cancelUnStartedAsyncFileSystemTask();
            hmsCommitter.undoUpdateStatisticsTasks();
            hmsCommitter.undoAddPartitionsTask();
            hmsCommitter.waitForAsyncFileSystemTaskSuppressThrowable();
            hmsCommitter.runDirectoryClearUpTasksForAbort();
            hmsCommitter.runRenameDirTasksForAbort();
            throw t;
        } finally {
            hmsCommitter.runClearPathsForFinish();
        }
    }
+
+ public void updateHivePartitionUpdates(List<THivePartitionUpdate> pus) {
+ synchronized (this) {
+ hivePartitionUpdates.addAll(pus);
+ }
+ }
+
    // Visible for testing: replaces the accumulated partition updates wholesale.
    public void setHivePartitionUpdates(List<THivePartitionUpdate> hivePartitionUpdates) {
        this.hivePartitionUpdates = hivePartitionUpdates;
    }
+
+ public long getUpdateCnt() {
+ return
hivePartitionUpdates.stream().mapToLong(THivePartitionUpdate::getRowCount).sum();
+ }
+
+ private void convertToInsertExistingPartitionAction(
+ List<Pair<THivePartitionUpdate, HivePartitionStatistics>>
partitions) {
+ DatabaseTableName databaseTableName = new DatabaseTableName(dbName,
tbName);
+ Map<List<String>, Action<PartitionAndMore>> partitionActionsForTable =
+ partitionActions.computeIfAbsent(databaseTableName, k -> new
HashMap<>());
+
+ for (List<Pair<THivePartitionUpdate, HivePartitionStatistics>>
partitionBatch :
+ Iterables.partition(partitions, 100)) {
+
+ List<String> partitionNames = partitionBatch
+ .stream()
+ .map(pair -> pair.first.getName())
+ .collect(Collectors.toList());
+
+ // check in partitionAction
+ Action<PartitionAndMore> oldPartitionAction =
partitionActionsForTable.get(partitionNames);
+ if (oldPartitionAction != null) {
+ switch (oldPartitionAction.getType()) {
+ case DROP:
+ case DROP_PRESERVE_DATA:
+ throw new RuntimeException(
+ "Not found partition from partition actions"
+ + "for " + databaseTableName + ",
partitions: " + partitionNames);
+ case ADD:
+ case ALTER:
+ case INSERT_EXISTING:
+ case MERGE:
+ throw new UnsupportedOperationException(
+ "Inserting into a partition that were added,
altered,"
+ + "or inserted into in the same
transaction is not supported");
+ default:
+ throw new IllegalStateException("Unknown action type:
" + oldPartitionAction.getType());
+ }
+ }
+
+ Map<String, Partition> partitionsByNamesMap =
HiveUtil.convertToNamePartitionMap(
+ partitionNames,
+ hiveOps.getClient().getPartitions(dbName, tbName,
partitionNames));
+
+ for (int i = 0; i < partitionsByNamesMap.size(); i++) {
+ String partitionName = partitionNames.get(i);
+ // check from hms
+ Partition partition = partitionsByNamesMap.get(partitionName);
+ if (partition == null) {
+ // Prevent this partition from being deleted by other
engines
+ throw new RuntimeException(
+ "Not found partition from hms for " +
databaseTableName
+ + ", partitions: " + partitionNames);
+ }
+ THivePartitionUpdate pu = partitionBatch.get(i).first;
+ HivePartitionStatistics updateStats =
partitionBatch.get(i).second;
+
+ StorageDescriptor sd = partition.getSd();
+ List<String> partitionValues =
HiveUtil.toPartitionValues(pu.getName());
+
+ HivePartition hivePartition = new HivePartition(
+ dbName,
+ tbName,
+ false,
+ sd.getInputFormat(),
+ partition.getSd().getLocation(),
+ partitionValues,
+ partition.getParameters(),
+ sd.getOutputFormat(),
+ sd.getSerdeInfo().getSerializationLib(),
+ hiveOps.getClient().getSchema(dbName, tbName)
+ );
+
+ partitionActionsForTable.put(
+ partitionValues,
+ new Action<>(
+ ActionType.INSERT_EXISTING,
+ new PartitionAndMore(
+ hivePartition,
+ pu.getLocation().getWritePath(),
+ pu.getName(),
+ pu.getFileNames(),
+ updateStats
+ ))
+ );
+ }
+ }
+ }
+
+ private static void addSuppressedExceptions(
+ List<Throwable> suppressedExceptions,
+ Throwable t,
+ List<String> descriptions,
+ String description) {
+ descriptions.add(description);
+ // A limit is needed to avoid having a huge exception object. 5 was
chosen arbitrarily.
+ if (suppressedExceptions.size() < 5) {
+ suppressedExceptions.add(t);
+ }
+ }
+
+ private static class UpdateStatisticsTask {
+ private final String dbName;
+ private final String tableName;
+ private final Optional<String> partitionName;
+ private final HivePartitionStatistics updatePartitionStat;
+ private final boolean merge;
+
+ private boolean done;
+
+ public UpdateStatisticsTask(String dbName, String tableName,
Optional<String> partitionName,
+ HivePartitionStatistics statistics,
boolean merge) {
+ this.dbName = Objects.requireNonNull(dbName, "dbName is null");
+ this.tableName = Objects.requireNonNull(tableName, "tableName is
null");
+ this.partitionName = Objects.requireNonNull(partitionName,
"partitionName is null");
+ this.updatePartitionStat = Objects.requireNonNull(statistics,
"statistics is null");
+ this.merge = merge;
+ }
+
+ public void run(HiveMetadataOps hiveOps) {
+ if (partitionName.isPresent()) {
+ hiveOps.updatePartitionStatistics(dbName, tableName,
partitionName.get(), this::updateStatistics);
+ } else {
+ hiveOps.updateTableStatistics(dbName, tableName,
this::updateStatistics);
+ }
+ done = true;
+ }
+
+ public void undo(HiveMetadataOps hmsOps) {
+ if (!done) {
+ return;
+ }
+ if (partitionName.isPresent()) {
+ hmsOps.updatePartitionStatistics(dbName, tableName,
partitionName.get(), this::resetStatistics);
+ } else {
+ hmsOps.updateTableStatistics(dbName, tableName,
this::resetStatistics);
+ }
+ }
+
+ public String getDescription() {
+ if (partitionName.isPresent()) {
+ return "alter partition parameters " + tableName + " " +
partitionName.get();
+ } else {
+ return "alter table parameters " + tableName;
+ }
+ }
+
+ private HivePartitionStatistics
updateStatistics(HivePartitionStatistics currentStats) {
+ return merge ? HivePartitionStatistics.merge(currentStats,
updatePartitionStat) : updatePartitionStat;
+ }
+
+ private HivePartitionStatistics
resetStatistics(HivePartitionStatistics currentStatistics) {
+ return HivePartitionStatistics
+ .reduce(currentStatistics, updatePartitionStat,
HivePartitionStatistics.ReduceOperator.SUBTRACT);
+ }
+ }
+
+ public static class AddPartitionsTask {
+ private final List<HivePartitionWithStatistics> partitions = new
ArrayList<>();
+ private final List<List<String>> createdPartitionValues = new
ArrayList<>();
+
+ public boolean isEmpty() {
+ return partitions.isEmpty();
+ }
+
+ public List<HivePartitionWithStatistics> getPartitions() {
+ return partitions;
+ }
+
+ public void addPartition(HivePartitionWithStatistics partition) {
+ partitions.add(partition);
+ }
+
+ public void run(HiveMetadataOps hiveOps) {
+ HivePartition firstPartition = partitions.get(0).getPartition();
+ String dbName = firstPartition.getDbName();
+ String tableName = firstPartition.getTblName();
+ List<List<HivePartitionWithStatistics>> batchedPartitions =
Lists.partition(partitions, 20);
+ for (List<HivePartitionWithStatistics> batch : batchedPartitions) {
+ try {
+ hiveOps.addPartitions(dbName, tableName, batch);
+ for (HivePartitionWithStatistics partition : batch) {
+
createdPartitionValues.add(partition.getPartition().getPartitionValues());
+ }
+ } catch (Throwable t) {
+ LOG.warn("Failed to add partition", t);
+ throw t;
+ }
+ }
+ partitions.clear();
+ }
+
+ public List<List<String>> rollback(HiveMetadataOps hiveOps) {
+ HivePartition firstPartition = partitions.get(0).getPartition();
+ String dbName = firstPartition.getDbName();
+ String tableName = firstPartition.getTblName();
+ List<List<String>> rollbackFailedPartitions = new ArrayList<>();
+ for (List<String> createdPartitionValue : createdPartitionValues) {
+ try {
+ hiveOps.dropPartition(dbName, tableName,
createdPartitionValue, false);
+ } catch (Throwable t) {
+ LOG.warn("Failed to drop partition on {}.{}.{} when
rollback",
+ dbName, tableName, rollbackFailedPartitions);
+ rollbackFailedPartitions.add(createdPartitionValue);
+ }
+ }
+ return rollbackFailedPartitions;
+ }
+ }
+
+ private static class DirectoryCleanUpTask {
+ private final Path path;
+ private final boolean deleteEmptyDir;
+
+ public DirectoryCleanUpTask(String path, boolean deleteEmptyDir) {
+ this.path = new Path(path);
+ this.deleteEmptyDir = deleteEmptyDir;
+ }
+
+ public Path getPath() {
+ return path;
+ }
+
+ public boolean isDeleteEmptyDir() {
+ return deleteEmptyDir;
+ }
+
+ @Override
+ public String toString() {
+ return new StringJoiner(", ",
DirectoryCleanUpTask.class.getSimpleName() + "[", "]")
+ .add("path=" + path)
+ .add("deleteEmptyDir=" + deleteEmptyDir)
+ .toString();
+ }
+ }
+
+ private static class DeleteRecursivelyResult {
+ private final boolean dirNoLongerExists;
+ private final List<String> notDeletedEligibleItems;
+
+ public DeleteRecursivelyResult(boolean dirNoLongerExists, List<String>
notDeletedEligibleItems) {
+ this.dirNoLongerExists = dirNoLongerExists;
+ this.notDeletedEligibleItems = notDeletedEligibleItems;
+ }
+
+ public boolean dirNotExists() {
+ return dirNoLongerExists;
+ }
+
+ public List<String> getNotDeletedEligibleItems() {
+ return notDeletedEligibleItems;
+ }
+ }
+
+ private static class RenameDirectoryTask {
+ private final String renameFrom;
+ private final String renameTo;
+
+ public RenameDirectoryTask(String renameFrom, String renameTo) {
+ this.renameFrom = renameFrom;
+ this.renameTo = renameTo;
+ }
+
+ public String getRenameFrom() {
+ return renameFrom;
+ }
+
+ public String getRenameTo() {
+ return renameTo;
+ }
+
+ @Override
+ public String toString() {
+ return new StringJoiner(", ",
RenameDirectoryTask.class.getSimpleName() + "[", "]")
+ .add("renameFrom:" + renameFrom)
+ .add("renameTo:" + renameTo)
+ .toString();
+ }
+ }
+
+
+
+ private void recursiveDeleteItems(Path directory, boolean deleteEmptyDir) {
+ DeleteRecursivelyResult deleteResult = recursiveDeleteFiles(directory,
deleteEmptyDir);
+
+ if (!deleteResult.getNotDeletedEligibleItems().isEmpty()) {
+ LOG.warn("Failed to delete directory {}. Some eligible items can't
be deleted: {}.",
+ directory.toString(),
deleteResult.getNotDeletedEligibleItems());
+ } else if (deleteEmptyDir && !deleteResult.dirNotExists()) {
+ LOG.warn("Failed to delete directory {} due to dir isn't empty",
directory.toString());
+ }
+ }
+
    /**
     * Recursively deletes files under {@code directory}; when {@code deleteEmptyDir}
     * is set, emptied directories (including {@code directory} itself) are removed
     * too. Never throws — failures are reported through the returned result.
     */
    private DeleteRecursivelyResult recursiveDeleteFiles(Path directory, boolean deleteEmptyDir) {
        try {
            // NOTE(review): exists() is given directory.getName(), which for a Hadoop
            // Path is only the final path component — confirm RemoteFileSystem really
            // expects that rather than the full path string (toString()).
            if (!fs.exists(directory.getName()).ok()) {
                // Already gone: nothing to delete.
                return new DeleteRecursivelyResult(true, ImmutableList.of());
            }
        } catch (Exception e) {
            // Could not even probe the directory; report everything under it as not deleted.
            ImmutableList.Builder<String> notDeletedEligibleItems = ImmutableList.builder();
            notDeletedEligibleItems.add(directory.toString() + "/*");
            return new DeleteRecursivelyResult(false, notDeletedEligibleItems.build());
        }

        return doRecursiveDeleteFiles(directory, deleteEmptyDir);
    }
+
    /**
     * Worker for {@link #recursiveDeleteFiles}: lists {@code directory}, deletes
     * files, recurses into subdirectories, and finally removes the directory itself
     * when it ended up empty and {@code deleteEmptyDir} is set. Returns which items
     * could not be deleted and whether the directory is gone.
     */
    private DeleteRecursivelyResult doRecursiveDeleteFiles(Path directory, boolean deleteEmptyDir) {
        List<RemoteFile> remoteFiles = new ArrayList<>();

        // NOTE(review): list() is given directory.getName() — only the final path
        // component of a Hadoop Path; confirm this matches the RemoteFileSystem API.
        Status status = fs.list(directory.getName(), remoteFiles);
        if (!status.ok()) {
            // Listing failed: report the whole subtree as not deleted.
            ImmutableList.Builder<String> notDeletedEligibleItems = ImmutableList.builder();
            notDeletedEligibleItems.add(directory + "/*");
            return new DeleteRecursivelyResult(false, notDeletedEligibleItems.build());
        }

        boolean isEmptyDir = true;
        List<String> notDeletedEligibleItems = new ArrayList<>();
        for (RemoteFile file : remoteFiles) {
            if (file.isFile()) {
                Path filePath = file.getPath();
                isEmptyDir = false;
                // TODO Check if this file was created by this query
                if (!deleteIfExists(filePath)) {
                    notDeletedEligibleItems.add(filePath.toString());
                }
            } else if (file.isDirectory()) {
                // Recurse; the subdirectory only counts as removed if it reports so.
                DeleteRecursivelyResult subResult = doRecursiveDeleteFiles(file.getPath(), deleteEmptyDir);
                if (!subResult.dirNotExists()) {
                    isEmptyDir = false;
                }
                if (!subResult.getNotDeletedEligibleItems().isEmpty()) {
                    notDeletedEligibleItems.addAll(subResult.getNotDeletedEligibleItems());
                }
            } else {
                // Neither file nor directory (e.g. symlink): leave it and report it.
                isEmptyDir = false;
                notDeletedEligibleItems.add(file.getPath().toString());
            }
        }

        if (isEmptyDir && deleteEmptyDir) {
            Verify.verify(notDeletedEligibleItems.isEmpty());
            if (!deleteIfExists(directory)) {
                return new DeleteRecursivelyResult(false, ImmutableList.of(directory + "/"));
            }
            // all items of the location have been deleted.
            return new DeleteRecursivelyResult(true, ImmutableList.of());
        }

        return new DeleteRecursivelyResult(false, notDeletedEligibleItems);
    }
+
    /**
     * Best-effort delete: returns true if the path was deleted or no longer exists.
     * NOTE(review): both calls pass path.getName(), which for a Hadoop Path is only
     * the final path component — confirm RemoteFileSystem expects a bare name here
     * rather than the full path (path.toString()).
     */
    public boolean deleteIfExists(Path path) {
        Status status = fs.delete(path.getName());
        if (status.ok()) {
            return true;
        }
        // Delete failed; treat "already gone" as success.
        return !fs.exists(path.getName()).ok();
    }
+
+ public static class DatabaseTableName {
+ private final String dbName;
+ private final String tbName;
+
+ public DatabaseTableName(String dbName, String tbName) {
+ this.dbName = dbName;
+ this.tbName = tbName;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+
+ if (other == null || getClass() != other.getClass()) {
+ return false;
+ }
+
+ DatabaseTableName that = (DatabaseTableName) other;
+ return Objects.equals(dbName, that.dbName) &&
Objects.equals(tbName, that.tbName);
+ }
+
+ @Override
+ public String toString() {
+ return dbName + "." + tbName;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(dbName, tbName);
+ }
+
+ public String getTbName() {
+ return tbName;
+ }
+
+ public String getDbName() {
+ return dbName;
+ }
+ }
+
+ private static class TableAndMore {
+ private final Table table;
+ private final String currentLocation;
+ private final List<String> fileNames;
+ private final HivePartitionStatistics statisticsUpdate;
+
+ public TableAndMore(
+ Table table,
+ String currentLocation,
+ List<String> fileNames,
+ HivePartitionStatistics statisticsUpdate) {
+ this.table = Objects.requireNonNull(table, "table is null");
+ this.currentLocation = Objects.requireNonNull(currentLocation);
+ this.fileNames = Objects.requireNonNull(fileNames);
+ this.statisticsUpdate = Objects.requireNonNull(statisticsUpdate,
"statisticsUpdate is null");
+ }
+
+ public Table getTable() {
+ return table;
+ }
+
+ public String getCurrentLocation() {
+ return currentLocation;
+ }
+
+ public List<String> getFileNames() {
+ return fileNames;
+ }
+
+ public HivePartitionStatistics getStatisticsUpdate() {
+ return statisticsUpdate;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("table", table)
+ .add("statisticsUpdate", statisticsUpdate)
+ .toString();
+ }
+ }
+
+ private static class PartitionAndMore {
+ private final HivePartition partition;
+ private final String currentLocation;
+ private final String partitionName;
+ private final List<String> fileNames;
+ private final HivePartitionStatistics statisticsUpdate;
+
+ public PartitionAndMore(
+ HivePartition partition,
+ String currentLocation,
+ String partitionName,
+ List<String> fileNames,
+ HivePartitionStatistics statisticsUpdate) {
+ this.partition = Objects.requireNonNull(partition, "partition is
null");
+ this.currentLocation = Objects.requireNonNull(currentLocation,
"currentLocation is null");
+ this.partitionName = Objects.requireNonNull(partitionName,
"partition is null");
+ this.fileNames = Objects.requireNonNull(fileNames, "fileNames is
null");
+ this.statisticsUpdate = Objects.requireNonNull(statisticsUpdate,
"statisticsUpdate is null");
+ }
+
+ public HivePartition getPartition() {
+ return partition;
+ }
+
+ public String getCurrentLocation() {
+ return currentLocation;
+ }
+
+ public String getPartitionName() {
+ return partitionName;
+ }
+
+ public List<String> getFileNames() {
+ return fileNames;
+ }
+
+ public HivePartitionStatistics getStatisticsUpdate() {
+ return statisticsUpdate;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("partition", partition)
+ .add("currentLocation", currentLocation)
+ .add("fileNames", fileNames)
+ .toString();
+ }
+ }
+
    /** Kinds of buffered metastore actions, applicable to a table or a partition. */
    private enum ActionType {
        // drop a table/partition
        DROP,
        // drop a table/partition but will preserve data
        DROP_PRESERVE_DATA,
        // add a table/partition
        ADD,
        // drop then add a table/partition, like overwrite
        ALTER,
        // insert into an existing table/partition
        INSERT_EXISTING,
        // merge into an existing table/partition
        MERGE,
    }
+
+ public static class Action<T> {
+ private final ActionType type;
+ private final T data;
+
+ public Action(ActionType type, T data) {
+ this.type = Objects.requireNonNull(type, "type is null");
+ if (type == ActionType.DROP || type ==
ActionType.DROP_PRESERVE_DATA) {
+ Preconditions.checkArgument(data == null, "data is not null");
+ } else {
+ Objects.requireNonNull(data, "data is null");
+ }
+ this.data = data;
+ }
+
+ public ActionType getType() {
+ return type;
+ }
+
+ public T getData() {
+ Preconditions.checkState(type != ActionType.DROP);
+ return data;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("type", type)
+ .add("data", data)
+ .toString();
+ }
+ }
+
+ public synchronized Table getTable(String databaseName, String tableName) {
+ Action<TableAndMore> tableAction = tableActions.get(new
DatabaseTableName(databaseName, tableName));
+ if (tableAction == null) {
+ return hiveOps.getClient().getTable(databaseName, tableName);
+ }
+ switch (tableAction.getType()) {
+ case ADD:
+ case ALTER:
+ case INSERT_EXISTING:
+ case MERGE:
+ return tableAction.getData().getTable();
+ case DROP:
+ case DROP_PRESERVE_DATA:
+ break;
+ default:
+ throw new IllegalStateException("Unknown action type: " +
tableAction.getType());
+ }
+ throw new RuntimeException("Not Found table: " + databaseName + "." +
tableName);
+ }
+
    /**
     * Buffers a change (e.g. INSERT_EXISTING) to an already-existing unpartitioned
     * table, reading the current table from HMS on first touch.
     *
     * @throws RuntimeException if the table was dropped in this transaction
     * @throws UnsupportedOperationException if the table already has a conflicting
     *         buffered action
     */
    public synchronized void finishChangingExistingTable(
            ActionType actionType,
            String databaseName,
            String tableName,
            String location,
            List<String> fileNames,
            HivePartitionStatistics statisticsUpdate) {
        DatabaseTableName databaseTableName = new DatabaseTableName(databaseName, tableName);
        Action<TableAndMore> oldTableAction = tableActions.get(databaseTableName);
        if (oldTableAction == null) {
            // First action on this table: snapshot it from HMS and buffer the change.
            Table table = hiveOps.getClient().getTable(databaseTableName.getDbName(), databaseTableName.getTbName());
            tableActions.put(
                    databaseTableName,
                    new Action<>(
                            actionType,
                            new TableAndMore(
                                    table,
                                    location,
                                    fileNames,
                                    statisticsUpdate)));
            return;
        }

        switch (oldTableAction.getType()) {
            case DROP:
                throw new RuntimeException("Not found table: " + databaseTableName);
            case ADD:
            case ALTER:
            case INSERT_EXISTING:
            case MERGE:
                throw new UnsupportedOperationException(
                        "Inserting into an unpartitioned table that were added, altered,"
                                + "or inserted into in the same transaction is not supported");
            case DROP_PRESERVE_DATA:
                // NOTE(review): falls through without buffering anything — confirm a
                // silent no-op is intended for DROP_PRESERVE_DATA.
                break;
            default:
                throw new IllegalStateException("Unknown action type: " + oldTableAction.getType());
        }
    }
+
    /**
     * Buffers creation of a table using the current transaction's dbName/tbName.
     * A create on top of a buffered DROP is coalesced into an ALTER (the overwrite
     * path of finishInsertTable).
     *
     * @throws RuntimeException if the table already has a buffered add/modify action
     */
    public synchronized void createTable(
            Table table, String location, List<String> fileNames, HivePartitionStatistics statistics) {
        // When creating a table, it should never have partition actions. This is just a sanity check.
        checkNoPartitionAction(dbName, tbName);
        DatabaseTableName databaseTableName = new DatabaseTableName(dbName, tbName);
        Action<TableAndMore> oldTableAction = tableActions.get(databaseTableName);
        TableAndMore tableAndMore = new TableAndMore(table, location, fileNames, statistics);
        if (oldTableAction == null) {
            tableActions.put(databaseTableName, new Action<>(ActionType.ADD, tableAndMore));
            return;
        }
        switch (oldTableAction.getType()) {
            case DROP:
                // drop + create in the same transaction = overwrite.
                tableActions.put(databaseTableName, new Action<>(ActionType.ALTER, tableAndMore));
                return;

            case ADD:
            case ALTER:
            case INSERT_EXISTING:
            case MERGE:
                throw new RuntimeException("Table already exists: " + databaseTableName);
            case DROP_PRESERVE_DATA:
                // NOTE(review): falls through without buffering the create — confirm a
                // silent no-op is intended for DROP_PRESERVE_DATA.
                break;
            default:
                throw new IllegalStateException("Unknown action type: " + oldTableAction.getType());
        }
    }
+
+
    /**
     * Buffers a DROP action for the given table. A drop on top of a buffered ALTER
     * replaces it with a plain DROP.
     *
     * @throws RuntimeException if the table was already dropped, or was
     *         added/modified in this transaction
     */
    public synchronized void dropTable(String databaseName, String tableName) {
        // Dropping table with partition actions requires cleaning up staging data, which is not implemented yet.
        checkNoPartitionAction(databaseName, tableName);
        DatabaseTableName databaseTableName = new DatabaseTableName(databaseName, tableName);
        Action<TableAndMore> oldTableAction = tableActions.get(databaseTableName);
        if (oldTableAction == null || oldTableAction.getType() == ActionType.ALTER) {
            tableActions.put(databaseTableName, new Action<>(ActionType.DROP, null));
            return;
        }
        switch (oldTableAction.getType()) {
            case DROP:
                throw new RuntimeException("Not found table: " + databaseTableName);
            case ADD:
            case ALTER:
            case INSERT_EXISTING:
            case MERGE:
                throw new RuntimeException("Dropping a table added/modified in the same transaction is not supported");
            case DROP_PRESERVE_DATA:
                // NOTE(review): silently ignored — confirm a no-op is intended here.
                break;
            default:
                throw new IllegalStateException("Unknown action type: " + oldTableAction.getType());
        }
    }
+
+
+ private void checkNoPartitionAction(String databaseName, String tableName)
{
+ Map<List<String>, Action<PartitionAndMore>> partitionActionsForTable =
+ partitionActions.get(new DatabaseTableName(databaseName,
tableName));
+ if (partitionActionsForTable != null &&
!partitionActionsForTable.isEmpty()) {
+ throw new RuntimeException(
+ "Cannot make schema changes to a table with modified
partitions in the same transaction");
+ }
+ }
+
+ public synchronized void addPartition(
+ String databaseName,
+ String tableName,
+ HivePartition partition,
+ String currentLocation,
+ String partitionName,
+ List<String> files,
+ HivePartitionStatistics statistics) {
+ Map<List<String>, Action<PartitionAndMore>> partitionActionsForTable =
+ partitionActions.computeIfAbsent(new
DatabaseTableName(databaseName, tableName), k -> new HashMap<>());
+ Action<PartitionAndMore> oldPartitionAction =
partitionActionsForTable.get(partition.getPartitionValues());
+ if (oldPartitionAction == null) {
+ partitionActionsForTable.put(
+ partition.getPartitionValues(),
+ new Action<>(
+ ActionType.ADD,
+ new PartitionAndMore(partition, currentLocation,
partitionName, files, statistics))
+ );
+ return;
+ }
+ switch (oldPartitionAction.getType()) {
+ case DROP:
+ case DROP_PRESERVE_DATA:
+ partitionActionsForTable.put(
+ partition.getPartitionValues(),
+ new Action<>(
+ ActionType.ALTER,
+ new PartitionAndMore(partition,
currentLocation, partitionName, files, statistics))
+ );
+ return;
+ case ADD:
+ case ALTER:
+ case INSERT_EXISTING:
+ case MERGE:
+ throw new RuntimeException(
+ "Partition already exists for table: "
+ + databaseName + "." + tableName + ", partition
values: " + partition.getPartitionValues());
+ default:
+ throw new IllegalStateException("Unknown action type: " +
oldPartitionAction.getType());
+ }
+ }
+
+ public synchronized void dropPartition(
+ String databaseName,
+ String tableName,
+ List<String> partitionValues,
+ boolean deleteData) {
+ DatabaseTableName databaseTableName = new
DatabaseTableName(databaseName, tableName);
+ Map<List<String>, Action<PartitionAndMore>> partitionActionsForTable =
+ partitionActions.computeIfAbsent(databaseTableName, k -> new
HashMap<>());
+ Action<PartitionAndMore> oldPartitionAction =
partitionActionsForTable.get(partitionValues);
+ if (oldPartitionAction == null) {
+ if (deleteData) {
+ partitionActionsForTable.put(partitionValues, new
Action<>(ActionType.DROP, null));
+ } else {
+ partitionActionsForTable.put(partitionValues, new
Action<>(ActionType.DROP_PRESERVE_DATA, null));
+ }
+ return;
+ }
+ switch (oldPartitionAction.getType()) {
+ case DROP:
+ case DROP_PRESERVE_DATA:
+ throw new RuntimeException(
+ "Not found partition from partition actions for " +
databaseTableName
+ + ", partitions: " + partitionValues);
+ case ADD:
+ case ALTER:
+ case INSERT_EXISTING:
+ case MERGE:
+ throw new RuntimeException(
+ "Dropping a partition added in the same transaction is
not supported: "
+ + databaseTableName + ", partition values: " +
partitionValues);
+ default:
+ throw new IllegalStateException("Unknown action type: " +
oldPartitionAction.getType());
+ }
+ }
+
+ class HmsCommitter {
+
        // Statistics updates for the unpartitioned table or for partitions that
        // already exist; run after partitions are added, undone first on abort.
        private final List<UpdateStatisticsTask> updateStatisticsTasks = new ArrayList<>();
        // NOTE(review): this executor (and fileSystemExecutor below) is created per
        // HmsCommitter and never shut down in the code visible here — confirm the
        // lifecycle, otherwise every commit leaks 16 threads.
        Executor updateStatisticsExecutor = Executors.newFixedThreadPool(16);

        // add new partition
        private final AddPartitionsTask addPartitionsTask = new AddPartitionsTask();

        // for file system rename operation
        // whether to cancel the file system tasks
        private final AtomicBoolean fileSystemTaskCancelled = new AtomicBoolean(false);
        // file system tasks that are executed asynchronously, including rename_file, rename_dir
        private final List<CompletableFuture<?>> asyncFileSystemTaskFutures = new ArrayList<>();
        // when aborted, we need to delete all files under this path, even the current directory
        private final Queue<DirectoryCleanUpTask> directoryCleanUpTasksForAbort = new ConcurrentLinkedQueue<>();
        // when aborted, we need restore directory
        private final List<RenameDirectoryTask> renameDirectoryTasksForAbort = new ArrayList<>();
        // when finished, we need clear some directories
        private final List<String> clearDirsForFinish = new ArrayList<>();
        Executor fileSystemExecutor = Executors.newFixedThreadPool(16);
+
+ public void cancelUnStartedAsyncFileSystemTask() {
+ fileSystemTaskCancelled.set(true);
+ }
+
+ private void undoUpdateStatisticsTasks() {
+ ImmutableList.Builder<CompletableFuture<?>> undoUpdateFutures =
ImmutableList.builder();
+ for (UpdateStatisticsTask task : updateStatisticsTasks) {
+ undoUpdateFutures.add(CompletableFuture.runAsync(() -> {
+ try {
+ task.undo(hiveOps);
+ } catch (Throwable throwable) {
+ LOG.warn("Failed to rollback: {}",
task.getDescription(), throwable);
+ }
+ }, updateStatisticsExecutor));
+ }
+
+ for (CompletableFuture<?> undoUpdateFuture :
undoUpdateFutures.build()) {
+ MoreFutures.getFutureValue(undoUpdateFuture);
+ }
+ }
+
+ private void undoAddPartitionsTask() {
+ if (addPartitionsTask.isEmpty()) {
+ return;
+ }
+
+ HivePartition firstPartition =
addPartitionsTask.getPartitions().get(0).getPartition();
+ String dbName = firstPartition.getDbName();
+ String tableName = firstPartition.getTblName();
+ List<List<String>> rollbackFailedPartitions =
addPartitionsTask.rollback(hiveOps);
+ if (!rollbackFailedPartitions.isEmpty()) {
+ LOG.warn("Failed to rollback: add_partition for partition
values {}.{}.{}",
+ dbName, tableName, rollbackFailedPartitions);
+ }
+ }
+
+ private void waitForAsyncFileSystemTaskSuppressThrowable() {
+ for (CompletableFuture<?> future : asyncFileSystemTaskFutures) {
+ try {
+ future.get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } catch (Throwable t) {
+ // ignore
+ }
+ }
+ }
+
+ public void prepareInsertExistingTable(TableAndMore tableAndMore) {
+ Table table = tableAndMore.getTable();
+ String targetPath = table.getSd().getLocation();
+ String writePath = tableAndMore.getCurrentLocation();
+ if (!targetPath.equals(writePath)) {
+ fs.asyncRename(
+ fileSystemExecutor,
+ asyncFileSystemTaskFutures,
+ fileSystemTaskCancelled,
+ writePath,
+ targetPath,
+ tableAndMore.getFileNames());
+ }
+ directoryCleanUpTasksForAbort.add(new
DirectoryCleanUpTask(targetPath, false));
+ updateStatisticsTasks.add(
+ new UpdateStatisticsTask(
+ dbName,
+ tbName,
+ Optional.empty(),
+ tableAndMore.getStatisticsUpdate(),
+ true
+ ));
+ }
+
+ public void prepareAlterTable(TableAndMore tableAndMore) {
+ Table table = tableAndMore.getTable();
+ String targetPath = table.getSd().getLocation();
+ String writePath = tableAndMore.getCurrentLocation();
+ if (!targetPath.equals(writePath)) {
+ Path path = new Path(targetPath);
+ String oldTablePath = new Path(path.getParent(), "_temp_" +
path.getName()).toString();
+ Status status = fs.renameDir(
+ targetPath,
+ oldTablePath,
+ () -> renameDirectoryTasksForAbort.add(new
RenameDirectoryTask(oldTablePath, targetPath)));
+ if (!status.ok()) {
+ throw new RuntimeException(
+ "Error to rename dir from " + targetPath + " to " +
oldTablePath + status.getErrMsg());
+ }
+ clearDirsForFinish.add(oldTablePath);
+
+ status = fs.renameDir(
+ writePath,
+ targetPath,
+ () -> directoryCleanUpTasksForAbort.add(
+ new DirectoryCleanUpTask(targetPath, true)));
+ if (!status.ok()) {
+ throw new RuntimeException(
+ "Error to rename dir from " + writePath + " to " +
targetPath + ":" + status.getErrMsg());
+ }
+ }
+ updateStatisticsTasks.add(
+ new UpdateStatisticsTask(
+ dbName,
+ tbName,
+ Optional.empty(),
+ tableAndMore.getStatisticsUpdate(),
+ false
+ ));
+ }
+
+ public void prepareAddPartition(PartitionAndMore partitionAndMore) {
+
+ HivePartition partition = partitionAndMore.getPartition();
+ String targetPath = partition.getPath();
+ String writePath = partitionAndMore.getCurrentLocation();
+
+ if (!targetPath.equals(writePath)) {
+ fs.asyncRenameDir(
+ fileSystemExecutor,
+ asyncFileSystemTaskFutures,
+ fileSystemTaskCancelled,
+ writePath,
+ targetPath,
+ () -> directoryCleanUpTasksForAbort.add(new
DirectoryCleanUpTask(targetPath, true)));
+ }
+
+ StorageDescriptor sd = getTable(dbName, tbName).getSd();
+
+ HivePartition hivePartition = new HivePartition(
+ dbName,
+ tbName,
+ false,
+ sd.getInputFormat(),
+ targetPath,
+ partition.getPartitionValues(),
+ Maps.newHashMap(),
+ sd.getOutputFormat(),
+ sd.getSerdeInfo().getSerializationLib(),
+ hiveOps.getClient().getSchema(dbName, tbName)
+ );
+
+ HivePartitionWithStatistics partitionWithStats =
+ new HivePartitionWithStatistics(
+ partitionAndMore.getPartitionName(),
+ hivePartition,
+ partitionAndMore.getStatisticsUpdate());
+ addPartitionsTask.addPartition(partitionWithStats);
+ }
+
+ public void prepareInsertExistPartition(PartitionAndMore
partitionAndMore) {
+
+ HivePartition partition = partitionAndMore.getPartition();
+ String targetPath = partition.getPath();
+ String writePath = partitionAndMore.getCurrentLocation();
+ directoryCleanUpTasksForAbort.add(new
DirectoryCleanUpTask(targetPath, false));
+
+ if (!targetPath.equals(writePath)) {
+ fs.asyncRename(
+ fileSystemExecutor,
+ asyncFileSystemTaskFutures,
+ fileSystemTaskCancelled,
+ writePath,
+ targetPath,
+ partitionAndMore.getFileNames());
+ }
+
+ updateStatisticsTasks.add(
+ new UpdateStatisticsTask(
+ dbName,
+ tbName,
+ Optional.of(partitionAndMore.getPartitionName()),
+ partitionAndMore.getStatisticsUpdate(),
+ true));
+ }
+
+ private void runDirectoryClearUpTasksForAbort() {
+ for (DirectoryCleanUpTask cleanUpTask :
directoryCleanUpTasksForAbort) {
+ recursiveDeleteItems(cleanUpTask.getPath(),
cleanUpTask.isDeleteEmptyDir());
+ }
+ }
+
+ private void runRenameDirTasksForAbort() {
+ Status status;
+ for (RenameDirectoryTask task : renameDirectoryTasksForAbort) {
+ status = fs.exists(task.getRenameFrom());
+ if (status.ok()) {
+ status = fs.renameDir(task.getRenameFrom(),
task.getRenameTo(), () -> {});
+ if (!status.ok()) {
+ LOG.warn("Failed to abort rename dir from {} to {}:{}",
+ task.getRenameFrom(), task.getRenameTo(),
status.getErrMsg());
+ }
+ }
+ }
+ }
+
+ private void runClearPathsForFinish() {
+ Status status;
+ for (String path : clearDirsForFinish) {
+ status = fs.delete(path);
+ if (!status.ok()) {
+ LOG.warn("Failed to recursively delete path {}:{}", path,
status.getErrCode());
+ }
+ }
+ }
+
+ public void prepareAlterPartition(PartitionAndMore partitionAndMore) {
+ HivePartition partition = partitionAndMore.getPartition();
+ String targetPath = partition.getPath();
+ String writePath = partitionAndMore.getCurrentLocation();
+
+ if (!targetPath.equals(writePath)) {
+ Path path = new Path(targetPath);
+ String oldPartitionPath = new Path(path.getParent(), "_temp_"
+ path.getName()).toString();
+ Status status = fs.renameDir(
+ targetPath,
+ oldPartitionPath,
+ () -> renameDirectoryTasksForAbort.add(new
RenameDirectoryTask(oldPartitionPath, targetPath)));
+ if (!status.ok()) {
+ throw new RuntimeException(
+ "Error to rename dir "
+ + "from " + targetPath
+ + " to " + oldPartitionPath + ":" +
status.getErrMsg());
+ }
+ clearDirsForFinish.add(oldPartitionPath);
+
+ status = fs.renameDir(
+ writePath,
+ targetPath,
+ () -> directoryCleanUpTasksForAbort.add(new
DirectoryCleanUpTask(targetPath, true)));
+ if (!status.ok()) {
+ throw new RuntimeException(
+ "Error to rename dir from " + writePath + " to " +
targetPath + ":" + status.getErrMsg());
+ }
+ }
+
+ updateStatisticsTasks.add(
+ new UpdateStatisticsTask(
+ dbName,
+ tbName,
+ Optional.of(partitionAndMore.getPartitionName()),
+ partitionAndMore.getStatisticsUpdate(),
+ false
+ ));
+ }
+
+
+ private void waitForAsyncFileSystemTasks() {
+ for (CompletableFuture<?> future : asyncFileSystemTaskFutures) {
+ MoreFutures.getFutureValue(future, RuntimeException.class);
+ }
+ }
+
+ private void doAddPartitionsTask() {
+ if (!addPartitionsTask.isEmpty()) {
+ addPartitionsTask.run(hiveOps);
+ }
+ }
+
+ private void doUpdateStatisticsTasks() {
+ ImmutableList.Builder<CompletableFuture<?>> updateStatsFutures =
ImmutableList.builder();
+ List<String> failedTaskDescriptions = new ArrayList<>();
+ List<Throwable> suppressedExceptions = new ArrayList<>();
+ for (UpdateStatisticsTask task : updateStatisticsTasks) {
+ updateStatsFutures.add(CompletableFuture.runAsync(() -> {
+ try {
+ task.run(hiveOps);
+ } catch (Throwable t) {
+ synchronized (suppressedExceptions) {
+ addSuppressedExceptions(
+ suppressedExceptions, t,
failedTaskDescriptions, task.getDescription());
+ }
+ }
+ }, updateStatisticsExecutor));
+ }
+
+ for (CompletableFuture<?> executeUpdateFuture :
updateStatsFutures.build()) {
+ MoreFutures.getFutureValue(executeUpdateFuture);
+ }
+ if (!suppressedExceptions.isEmpty()) {
+ StringBuilder message = new StringBuilder();
+ message.append("Failed to execute some updating statistics
tasks: ");
+ Joiner.on("; ").appendTo(message, failedTaskDescriptions);
+ RuntimeException exception = new
RuntimeException(message.toString());
+ suppressedExceptions.forEach(exception::addSuppressed);
+ throw exception;
+ }
+ }
+
+ public void doCommit() {
+ waitForAsyncFileSystemTasks();
+ doAddPartitionsTask();
+ doUpdateStatisticsTasks();
+ }
+
+ public void rollback() {
+ cancelUnStartedAsyncFileSystemTask();
+ undoUpdateStatisticsTasks();
+ undoAddPartitionsTask();
+ waitForAsyncFileSystemTaskSuppressThrowable();
+ runDirectoryClearUpTasksForAbort();
+ runRenameDirTasksForAbort();
+ }
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
index beeff694ae4..f3556d13a57 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
@@ -36,13 +36,11 @@ import
org.apache.doris.datasource.jdbc.client.JdbcClientConfig;
import org.apache.doris.datasource.operations.ExternalMetadataOps;
import org.apache.doris.fs.remote.RemoteFileSystem;
import org.apache.doris.fs.remote.dfs.DFSFileSystem;
-import org.apache.doris.thrift.THivePartitionUpdate;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -73,6 +71,7 @@ public class HiveMetadataOps implements ExternalMetadataOps {
public HiveMetadataOps(HMSExternalCatalog catalog, HMSCachedClient client)
{
this.catalog = catalog;
this.client = client;
+ // TODO Currently only supports DFSFileSystem, more types will be
supported in the future
this.fs = new DFSFileSystem(catalog.getProperties());
}
@@ -80,6 +79,10 @@ public class HiveMetadataOps implements ExternalMetadataOps {
return client;
}
+ public RemoteFileSystem getFs() {
+ return fs;
+ }
+
public static HMSCachedClient createCachedClient(HiveConf hiveConf, int
thriftClientPoolSize,
JdbcClientConfig
jdbcClientConfig) {
if (hiveConf != null) {
@@ -253,23 +256,6 @@ public class HiveMetadataOps implements
ExternalMetadataOps {
return client.getAllDatabases();
}
- public void commit(String dbName,
- String tableName,
- List<THivePartitionUpdate> hivePUs) {
- Table table = client.getTable(dbName, tableName);
- HMSCommitter hmsCommitter = new HMSCommitter(this, fs, table);
- hmsCommitter.commit(hivePUs);
- try {
- Env.getCurrentEnv().getCatalogMgr().refreshExternalTable(
- dbName,
- tableName,
- catalog.getName(),
- true);
- } catch (DdlException e) {
- LOG.warn("Failed to refresh table {}.{} : {}", dbName, tableName,
e.getMessage());
- }
- }
-
public void updateTableStatistics(
String dbName,
String tableName,
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionStatistics.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionStatistics.java
index 49b14504750..df13e6737b5 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionStatistics.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionStatistics.java
@@ -25,7 +25,7 @@ import com.google.common.collect.ImmutableMap;
import java.util.Map;
public class HivePartitionStatistics {
- private static final HivePartitionStatistics EMPTY =
+ public static final HivePartitionStatistics EMPTY =
new HivePartitionStatistics(HiveCommonStatistics.EMPTY,
ImmutableMap.of());
private final HiveCommonStatistics commonStatistics;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionWithStatistics.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionWithStatistics.java
index b7c28b68ff0..e72374aa5f2 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionWithStatistics.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionWithStatistics.java
@@ -18,9 +18,9 @@
package org.apache.doris.datasource.hive;
public class HivePartitionWithStatistics {
- private String name;
- private HivePartition partition;
- private HivePartitionStatistics statistics;
+ private final String name;
+ private final HivePartition partition;
+ private final HivePartitionStatistics statistics;
public HivePartitionWithStatistics(String name, HivePartition partition,
HivePartitionStatistics statistics) {
this.name = name;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java
index 76976165526..66dfe763e46 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java
@@ -17,13 +17,12 @@
package org.apache.doris.nereids.trees.plans.commands.insert;
+import org.apache.doris.catalog.Env;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
-import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalTable;
-import org.apache.doris.datasource.hive.HiveMetadataOps;
-import org.apache.doris.datasource.operations.ExternalMetadataOps;
+import org.apache.doris.datasource.hive.HMSTransaction;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.trees.plans.Plan;
@@ -35,14 +34,13 @@ import org.apache.doris.planner.PlanFragment;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryState;
import org.apache.doris.qe.StmtExecutor;
-import org.apache.doris.thrift.THivePartitionUpdate;
+import org.apache.doris.transaction.TransactionManager;
import org.apache.doris.transaction.TransactionStatus;
import com.google.common.base.Strings;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import java.util.List;
import java.util.Optional;
/**
@@ -53,6 +51,8 @@ public class HiveInsertExecutor extends
AbstractInsertExecutor {
private static final long INVALID_TXN_ID = -1L;
private long txnId = INVALID_TXN_ID;
private TransactionStatus txnStatus = TransactionStatus.ABORTED;
+ private final TransactionManager transactionManager;
+ private final String catalogName;
/**
* constructor
@@ -61,6 +61,8 @@ public class HiveInsertExecutor extends
AbstractInsertExecutor {
String labelName, NereidsPlanner planner,
Optional<InsertCommandContext> insertCtx) {
super(ctx, table, labelName, planner, insertCtx);
+ catalogName = table.getCatalog().getName();
+ transactionManager = table.getCatalog().getTransactionManager();
}
public long getTxnId() {
@@ -69,7 +71,9 @@ public class HiveInsertExecutor extends
AbstractInsertExecutor {
@Override
public void beginTransaction() {
- // TODO: use hive txn rather than internal txn
+ txnId = transactionManager.begin();
+ HMSTransaction transaction = (HMSTransaction)
transactionManager.getTransaction(txnId);
+
coordinator.setHivePartitionUpdateFunc(transaction::updateHivePartitionUpdates);
}
@Override
@@ -93,13 +97,18 @@ public class HiveInsertExecutor extends
AbstractInsertExecutor {
if (ctx.getState().getStateType() == QueryState.MysqlStateType.ERR) {
LOG.warn("errors when abort txn. {}", ctx.getQueryIdentifier());
} else {
- // TODO use transaction
- List<THivePartitionUpdate> ups =
coordinator.getHivePartitionUpdates();
- loadedRows =
ups.stream().mapToLong(THivePartitionUpdate::getRowCount).sum();
- ExternalCatalog catalog = ((HMSExternalTable) table).getCatalog();
- ExternalMetadataOps metadataOps = catalog.getMetadataOps();
- ((HiveMetadataOps) metadataOps).commit(((HMSExternalTable)
table).getDbName(), table.getName(), ups);
+ HMSTransaction transaction = (HMSTransaction)
transactionManager.getTransaction(txnId);
+ loadedRows = transaction.getUpdateCnt();
+ String dbName = ((HMSExternalTable) table).getDbName();
+ String tbName = table.getName();
+ transaction.finishInsertTable(dbName, tbName);
+ transactionManager.commit(txnId);
txnStatus = TransactionStatus.COMMITTED;
+ Env.getCurrentEnv().getCatalogMgr().refreshExternalTable(
+ dbName,
+ tbName,
+ catalogName,
+ true);
}
}
@@ -117,6 +126,7 @@ public class HiveInsertExecutor extends
AbstractInsertExecutor {
}
}
ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, t.getMessage());
+ transactionManager.rollback(txnId);
}
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 6d7e4785b66..67b84796d3d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -152,6 +152,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
import java.util.stream.Collectors;
public class Coordinator implements CoordInterface {
@@ -235,8 +236,8 @@ public class Coordinator implements CoordInterface {
private final List<TTabletCommitInfo> commitInfos = Lists.newArrayList();
private final List<TErrorTabletInfo> errorTabletInfos =
Lists.newArrayList();
- // TODO moved to ExternalTransactionManager
- private final List<THivePartitionUpdate> hivePartitionUpdates =
Lists.newArrayList();
+ // Collect all hivePartitionUpdates obtained from be
+ Consumer<List<THivePartitionUpdate>> hivePartitionUpdateFunc;
// Input parameter
private long jobId = -1; // job which this task belongs to
@@ -503,10 +504,6 @@ public class Coordinator implements CoordInterface {
return errorTabletInfos;
}
- public List<THivePartitionUpdate> getHivePartitionUpdates() {
- return hivePartitionUpdates;
- }
-
public Map<String, Integer> getBeToInstancesNum() {
Map<String, Integer> result = Maps.newTreeMap();
if (enablePipelineEngine) {
@@ -2442,13 +2439,8 @@ public class Coordinator implements CoordInterface {
// TODO: more ranges?
}
- private void updateHivePartitionUpdates(List<THivePartitionUpdate>
hivePartitionUpdates) {
- lock.lock();
- try {
- this.hivePartitionUpdates.addAll(hivePartitionUpdates);
- } finally {
- lock.unlock();
- }
+ public void
setHivePartitionUpdateFunc(Consumer<List<THivePartitionUpdate>>
hivePartitionUpdateFunc) {
+ this.hivePartitionUpdateFunc = hivePartitionUpdateFunc;
}
// update job progress from BE
@@ -2498,8 +2490,8 @@ public class Coordinator implements CoordInterface {
if (params.isSetErrorTabletInfos()) {
updateErrorTabletInfos(params.getErrorTabletInfos());
}
- if (params.isSetHivePartitionUpdates()) {
- updateHivePartitionUpdates(params.getHivePartitionUpdates());
+ if (params.isSetHivePartitionUpdates() && hivePartitionUpdateFunc
!= null) {
+
hivePartitionUpdateFunc.accept(params.getHivePartitionUpdates());
}
Preconditions.checkArgument(params.isSetDetailedReport());
@@ -2563,8 +2555,8 @@ public class Coordinator implements CoordInterface {
if (params.isSetErrorTabletInfos()) {
updateErrorTabletInfos(params.getErrorTabletInfos());
}
- if (params.isSetHivePartitionUpdates()) {
-
updateHivePartitionUpdates(params.getHivePartitionUpdates());
+ if (params.isSetHivePartitionUpdates() &&
hivePartitionUpdateFunc != null) {
+
hivePartitionUpdateFunc.accept(params.getHivePartitionUpdates());
}
if (LOG.isDebugEnabled()) {
LOG.debug("Query {} instance {} is marked done",
@@ -2635,8 +2627,8 @@ public class Coordinator implements CoordInterface {
if (params.isSetErrorTabletInfos()) {
updateErrorTabletInfos(params.getErrorTabletInfos());
}
- if (params.isSetHivePartitionUpdates()) {
-
updateHivePartitionUpdates(params.getHivePartitionUpdates());
+ if (params.isSetHivePartitionUpdates() &&
hivePartitionUpdateFunc != null) {
+
hivePartitionUpdateFunc.accept(params.getHivePartitionUpdates());
}
instancesDoneLatch.markedCountDown(params.getFragmentInstanceId(), -1L);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/HiveTransactionManager.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/HiveTransactionManager.java
new file mode 100644
index 00000000000..07304fb23ab
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/HiveTransactionManager.java
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.transaction;
+
+import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.hive.HMSTransaction;
+import org.apache.doris.datasource.hive.HiveMetadataOps;
+import org.apache.doris.persist.EditLog;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class HiveTransactionManager implements TransactionManager {
+
+ private final Map<Long, HMSTransaction> transactions = new
ConcurrentHashMap<>();
+ private final TransactionIdGenerator idGenerator = new
TransactionIdGenerator();
+ private final HiveMetadataOps ops;
+
+ public HiveTransactionManager(HiveMetadataOps ops) {
+ this.ops = ops;
+ }
+
+ public Long getNextTransactionId() {
+ return idGenerator.getNextTransactionId();
+ }
+
+ @Override
+ public void setEditLog(EditLog editLog) {
+ this.idGenerator.setEditLog(editLog);
+ }
+
+ @Override
+ public long begin() {
+ long id = idGenerator.getNextTransactionId();
+ HMSTransaction hiveTransaction = new HMSTransaction(ops);
+ transactions.put(id, hiveTransaction);
+ return id;
+ }
+
+ @Override
+ public void commit(long id) throws UserException {
+ getTransactionWithException(id).commit();
+ transactions.remove(id);
+ }
+
+ @Override
+ public void rollback(long id) {
+ getTransactionWithException(id).rollback();
+ transactions.remove(id);
+ }
+
+ @Override
+ public HMSTransaction getTransaction(long id) {
+ return getTransactionWithException(id);
+ }
+
+ public HMSTransaction getTransactionWithException(long id) {
+ HMSTransaction hiveTransaction = transactions.get(id);
+ if (hiveTransaction == null) {
+ throw new RuntimeException("Can't find transaction for " + id);
+ }
+ return hiveTransaction;
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionWithStatistics.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/Transaction.java
similarity index 56%
copy from
fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionWithStatistics.java
copy to fe/fe-core/src/main/java/org/apache/doris/transaction/Transaction.java
index b7c28b68ff0..b319fb78983 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionWithStatistics.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/Transaction.java
@@ -15,28 +15,13 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.datasource.hive;
+package org.apache.doris.transaction;
-public class HivePartitionWithStatistics {
- private String name;
- private HivePartition partition;
- private HivePartitionStatistics statistics;
+import org.apache.doris.common.UserException;
- public HivePartitionWithStatistics(String name, HivePartition partition,
HivePartitionStatistics statistics) {
- this.name = name;
- this.partition = partition;
- this.statistics = statistics;
- }
/**
 * A single external-table write transaction (e.g. one Hive insert).
 *
 * <p>Implementations stage all side effects until {@link #commit()} and must be able to
 * undo any partially applied work via {@link #rollback()}.
 */
public interface Transaction {

    /**
     * Applies all staged changes atomically from the caller's point of view.
     *
     * @throws UserException if the commit fails; the caller is expected to roll back
     */
    void commit() throws UserException;

    /** Undoes any work already performed; must not throw for an empty transaction. */
    void rollback();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionWithStatistics.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManager.java
similarity index 56%
copy from
fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionWithStatistics.java
copy to
fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManager.java
index b7c28b68ff0..daacdecf152 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionWithStatistics.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManager.java
@@ -15,28 +15,20 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.datasource.hive;
-
-public class HivePartitionWithStatistics {
- private String name;
- private HivePartition partition;
- private HivePartitionStatistics statistics;
-
- public HivePartitionWithStatistics(String name, HivePartition partition,
HivePartitionStatistics statistics) {
- this.name = name;
- this.partition = partition;
- this.statistics = statistics;
- }
-
- public String getName() {
- return name;
- }
-
- public HivePartition getPartition() {
- return partition;
- }
-
- public HivePartitionStatistics getStatistics() {
- return statistics;
- }
+package org.apache.doris.transaction;
+
+import org.apache.doris.common.UserException;
+import org.apache.doris.persist.EditLog;
+
/**
 * Manages the lifecycle of external-table {@link Transaction}s by id.
 *
 * <p>Typical flow: {@code long id = begin();} ... stage work on
 * {@code getTransaction(id)} ... then {@code commit(id)} or {@code rollback(id)}.
 */
public interface TransactionManager {

    /** Supplies the edit log used to persist generated transaction ids. */
    void setEditLog(EditLog editLog);

    /**
     * Starts a new transaction.
     *
     * @return the id to use for all subsequent operations on this transaction
     */
    long begin();

    /**
     * Commits the transaction with the given id.
     *
     * @throws UserException if the commit fails
     */
    void commit(long id) throws UserException;

    /** Rolls back the transaction with the given id. */
    void rollback(long id);

    /** Returns the live transaction for the given id. */
    Transaction getTransaction(long id);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionWithStatistics.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManagerFactory.java
similarity index 56%
copy from
fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionWithStatistics.java
copy to
fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManagerFactory.java
index b7c28b68ff0..334258a3f12 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionWithStatistics.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManagerFactory.java
@@ -15,28 +15,13 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.datasource.hive;
+package org.apache.doris.transaction;
-public class HivePartitionWithStatistics {
- private String name;
- private HivePartition partition;
- private HivePartitionStatistics statistics;
+import org.apache.doris.datasource.hive.HiveMetadataOps;
- public HivePartitionWithStatistics(String name, HivePartition partition,
HivePartitionStatistics statistics) {
- this.name = name;
- this.partition = partition;
- this.statistics = statistics;
- }
-
- public String getName() {
- return name;
- }
-
- public HivePartition getPartition() {
- return partition;
- }
+public class TransactionManagerFactory {
- public HivePartitionStatistics getStatistics() {
- return statistics;
+ public static TransactionManager
createHiveTransactionManager(HiveMetadataOps ops) {
+ return new HiveTransactionManager(ops);
}
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java
index 3098d65e952..fc939625ea9 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java
@@ -17,13 +17,17 @@
package org.apache.doris.datasource.hive;
+import org.apache.doris.backup.Status;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.fs.remote.dfs.DFSFileSystem;
import org.apache.doris.thrift.THiveLocationParams;
import org.apache.doris.thrift.THivePartitionUpdate;
import org.apache.doris.thrift.TUpdateMode;
import com.google.common.collect.Lists;
+import mockit.Mock;
+import mockit.MockUp;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.junit.After;
@@ -41,6 +45,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
@Ignore
public class HmsCommitTest {
@@ -61,6 +68,7 @@ public class HmsCommitTest {
dbLocation = "file://" + warehousePath.toAbsolutePath() + "/";
createTestHiveCatalog();
createTestHiveDatabase();
+ mockFs();
}
@AfterClass
@@ -90,22 +98,55 @@ public class HmsCommitTest {
hmsClient.createDatabase(dbMetadata);
}
    /**
     * Installs JMockit fakes on {@link DFSFileSystem} so the commit tests never touch a
     * real (H)DFS: both async rename variants become no-ops and {@code renameDir} always
     * reports success. Must run before any committer code executes file-system work.
     */
    public static void mockFs() {

        new MockUp<DFSFileSystem>(DFSFileSystem.class) {
            // No-op: skip the real async directory rename.
            @Mock
            public void asyncRenameDir(Executor executor,
                                       List<CompletableFuture<?>> renameFileFutures,
                                       AtomicBoolean cancelled,
                                       String origFilePath,
                                       String destFilePath,
                                       Runnable runWhenPathNotExist) {
            }

            // No-op: skip the real async file renames.
            @Mock
            public void asyncRename(Executor executor,
                                    List<CompletableFuture<?>> renameFileFutures,
                                    AtomicBoolean cancelled,
                                    String origFilePath,
                                    String destFilePath,
                                    List<String> fileNames) {
            }

            // Pretend every directory rename succeeds.
            @Mock
            public Status renameDir(String origFilePath,
                                    String destFilePath,
                                    Runnable runWhenPathNotExist) {
                return Status.OK;
            }
        };
    }
+
    /**
     * Per-test setup: creates two HMS tables with columns c1/c2/c3 — one partitioned by
     * c3 ({@code tbWithPartition}) and one unpartitioned ({@code tbWithoutPartition}).
     * Idempotent via createTable(..., true) (ignore-if-exists).
     */
    @Before
    public void before() {
        // create table
        List<Column> columns = new ArrayList<>();
        columns.add(new Column("c1", PrimitiveType.INT, true));
        columns.add(new Column("c2", PrimitiveType.STRING, true));
        // c3 doubles as the partition column of the partitioned table below.
        columns.add(new Column("c3", PrimitiveType.STRING, false));
        List<String> partitionKeys = new ArrayList<>();
        partitionKeys.add("c3");
        HiveTableMetadata tableMetadata = new HiveTableMetadata(
                dbName, tbWithPartition, columns, partitionKeys,
                new HashMap<>(), fileFormat);
        hmsClient.createTable(tableMetadata, true);

        HiveTableMetadata tableMetadata2 = new HiveTableMetadata(
                dbName, tbWithoutPartition, columns, new ArrayList<>(),
                new HashMap<>(), fileFormat);
        hmsClient.createTable(tableMetadata2, true);

    }
@After
@@ -118,11 +159,7 @@ public class HmsCommitTest {
public void testNewPartitionForUnPartitionedTable() {
List<THivePartitionUpdate> pus = new ArrayList<>();
pus.add(createRandomNew("a"));
- try {
- hmsOps.commit(dbName, tbWithoutPartition, pus);
- } catch (Exception e) {
-            Assert.assertEquals("Not support mode:[NEW] in unPartitioned table", e.getMessage());
- }
+        Assert.assertThrows(Exception.class, () -> commit(dbName, tbWithoutPartition, pus));
}
@Test
@@ -131,7 +168,7 @@ public class HmsCommitTest {
pus.add(createRandomAppend(""));
pus.add(createRandomAppend(""));
pus.add(createRandomAppend(""));
- hmsOps.commit(dbName, tbWithoutPartition, pus);
+ commit(dbName, tbWithoutPartition, pus);
Table table = hmsClient.getTable(dbName, tbWithoutPartition);
assertNumRows(3, table);
@@ -139,7 +176,7 @@ public class HmsCommitTest {
pus2.add(createRandomAppend(""));
pus2.add(createRandomAppend(""));
pus2.add(createRandomAppend(""));
- hmsOps.commit(dbName, tbWithoutPartition, pus2);
+ commit(dbName, tbWithoutPartition, pus2);
table = hmsClient.getTable(dbName, tbWithoutPartition);
assertNumRows(6, table);
}
@@ -151,7 +188,7 @@ public class HmsCommitTest {
pus.add(createRandomOverwrite(""));
pus.add(createRandomOverwrite(""));
pus.add(createRandomOverwrite(""));
- hmsOps.commit(dbName, tbWithoutPartition, pus);
+ commit(dbName, tbWithoutPartition, pus);
Table table = hmsClient.getTable(dbName, tbWithoutPartition);
assertNumRows(3, table);
}
@@ -165,7 +202,7 @@ public class HmsCommitTest {
pus.add(createRandomNew("b"));
pus.add(createRandomNew("b"));
pus.add(createRandomNew("c"));
- hmsOps.commit(dbName, tbWithPartition, pus);
+ commit(dbName, tbWithPartition, pus);
Partition pa = hmsClient.getPartition(dbName, tbWithPartition,
Lists.newArrayList("a"));
assertNumRows(3, pa);
@@ -186,7 +223,7 @@ public class HmsCommitTest {
pus.add(createRandomAppend("b"));
pus.add(createRandomAppend("b"));
pus.add(createRandomAppend("c"));
- hmsOps.commit(dbName, tbWithPartition, pus);
+ commit(dbName, tbWithPartition, pus);
Partition pa = hmsClient.getPartition(dbName, tbWithPartition,
Lists.newArrayList("a"));
assertNumRows(6, pa);
@@ -203,7 +240,7 @@ public class HmsCommitTest {
pus.add(createRandomOverwrite("a"));
pus.add(createRandomOverwrite("b"));
pus.add(createRandomOverwrite("c"));
- hmsOps.commit(dbName, tbWithPartition, pus);
+ commit(dbName, tbWithPartition, pus);
Partition pa = hmsClient.getPartition(dbName, tbWithPartition,
Lists.newArrayList("a"));
assertNumRows(1, pa);
@@ -221,14 +258,14 @@ public class HmsCommitTest {
pus.add(createRandomNew("" + i));
}
- hmsOps.commit(dbName, tbWithPartition, pus);
+ commit(dbName, tbWithPartition, pus);
for (int i = 0; i < nums; i++) {
Partition p = hmsClient.getPartition(dbName, tbWithPartition,
Lists.newArrayList("" + i));
assertNumRows(1, p);
}
try {
- hmsOps.commit(dbName, tbWithPartition, pus);
+ commit(dbName, tbWithPartition, pus);
} catch (Exception e) {
            Assert.assertTrue(e.getMessage().contains("failed to add partitions"));
}
@@ -277,4 +314,13 @@ public class HmsCommitTest {
public THivePartitionUpdate createRandomOverwrite(String partition) {
return genOnePartitionUpdate("c3=" + partition, TUpdateMode.OVERWRITE);
}
+
+ public void commit(String dbName,
+ String tableName,
+ List<THivePartitionUpdate> hivePUs) {
+ HMSTransaction hmsTransaction = new HMSTransaction(hmsOps);
+ hmsTransaction.setHivePartitionUpdates(hivePUs);
+ hmsTransaction.finishInsertTable(dbName, tableName);
+ hmsTransaction.commit();
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]