This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new e3170d63199 [bugfix](hive) Fix incorrect data after INSERT OVERWRITE into a
Hive table (backport for 2.1) (#43049) (#43127)
e3170d63199 is described below
commit e3170d63199d1b6a015ce058f2bd050140dd733d
Author: wuwenchi <[email protected]>
AuthorDate: Sun Nov 3 23:38:47 2024 +0800
[bugfix](hive) Fix incorrect data after INSERT OVERWRITE into a Hive table
(backport for 2.1) (#43049) (#43127)
bp: #43049
---
.../doris/datasource/hive/HMSTransaction.java | 30 ++++++++++++++++------
.../doris/fs/remote/RemoteFSPhantomManager.java | 9 +++++++
.../org/apache/doris/fs/remote/S3FileSystem.java | 15 +++++++++--
.../apache/doris/fs/remote/dfs/DFSFileSystem.java | 6 ++---
.../org/apache/doris/planner/HiveTableSink.java | 2 +-
5 files changed, 48 insertions(+), 14 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
index 3044b214ab6..8041904723a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
@@ -100,7 +100,7 @@ public class HMSTransaction implements Transaction {
private final Executor fileSystemExecutor;
private HmsCommitter hmsCommitter;
private List<THivePartitionUpdate> hivePartitionUpdates =
Lists.newArrayList();
- private String declaredIntentionsToWrite;
+ private Optional<String> stagingDirectory;
private boolean isMockedPartitionUpdate = false;
private static class UncompletedMpuPendingUpload {
@@ -177,10 +177,14 @@ public class HMSTransaction implements Transaction {
}
public void beginInsertTable(HiveInsertCommandContext ctx) {
- declaredIntentionsToWrite = ctx.getWritePath();
queryId = ctx.getQueryId();
isOverwrite = ctx.isOverwrite();
fileType = ctx.getFileType();
+ if (fileType == TFileType.FILE_S3) {
+ stagingDirectory = Optional.empty();
+ } else {
+ stagingDirectory = Optional.of(ctx.getWritePath());
+ }
}
public void finishInsertTable(SimpleTableInfo tableInfo) {
@@ -200,10 +204,12 @@ public class HMSTransaction implements Transaction {
}
});
} else {
- fs.makeDir(declaredIntentionsToWrite);
- setLocation(new THiveLocationParams() {{
- setWritePath(declaredIntentionsToWrite);
- }
+ stagingDirectory.ifPresent((v) -> {
+ fs.makeDir(v);
+ setLocation(new THiveLocationParams() {{
+ setWritePath(v);
+ }
+ });
});
}
}
@@ -636,15 +642,23 @@ public class HMSTransaction implements Transaction {
if (!deleteResult.getNotDeletedEligibleItems().isEmpty()) {
LOG.warn("Failed to delete directory {}. Some eligible items can't
be deleted: {}.",
directory.toString(),
deleteResult.getNotDeletedEligibleItems());
+ throw new RuntimeException(
+ "Failed to delete directory for files: " +
deleteResult.getNotDeletedEligibleItems());
} else if (deleteEmptyDir && !deleteResult.dirNotExists()) {
LOG.warn("Failed to delete directory {} due to dir isn't empty",
directory.toString());
+ throw new RuntimeException("Failed to delete directory for empty
dir: " + directory.toString());
}
}
private DeleteRecursivelyResult recursiveDeleteFiles(Path directory,
boolean deleteEmptyDir, boolean reverse) {
try {
- if (!fs.directoryExists(directory.toString()).ok()) {
+ Status status = fs.directoryExists(directory.toString());
+ if (status.getErrCode().equals(Status.ErrCode.NOT_FOUND)) {
return new DeleteRecursivelyResult(true, ImmutableList.of());
+ } else if (!status.ok()) {
+ ImmutableList.Builder<String> notDeletedEligibleItems =
ImmutableList.builder();
+ notDeletedEligibleItems.add(directory.toString() + "/*");
+ return new DeleteRecursivelyResult(false,
notDeletedEligibleItems.build());
}
} catch (Exception e) {
ImmutableList.Builder<String> notDeletedEligibleItems =
ImmutableList.builder();
@@ -1440,7 +1454,7 @@ public class HMSTransaction implements Transaction {
}
private void pruneAndDeleteStagingDirectories() {
- recursiveDeleteItems(new Path(declaredIntentionsToWrite), true,
false);
+ stagingDirectory.ifPresent((v) -> recursiveDeleteItems(new
Path(v), true, false));
}
private void abortMultiUploads() {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFSPhantomManager.java
b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFSPhantomManager.java
index 282361c4cb6..c0e48a13466 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFSPhantomManager.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFSPhantomManager.java
@@ -19,6 +19,7 @@ package org.apache.doris.fs.remote;
import org.apache.doris.common.CustomThreadFactory;
+import com.google.common.collect.Sets;
import org.apache.hadoop.fs.FileSystem;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -27,6 +28,7 @@ import java.io.IOException;
import java.lang.ref.PhantomReference;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -63,6 +65,8 @@ public class RemoteFSPhantomManager {
private static final ConcurrentHashMap<PhantomReference<RemoteFileSystem>,
FileSystem> referenceMap
= new ConcurrentHashMap<>();
+ private static final Set<FileSystem> fsSet = Sets.newConcurrentHashSet();
+
// Flag indicating whether the cleanup thread has been started
private static final AtomicBoolean isStarted = new AtomicBoolean(false);
@@ -77,9 +81,13 @@ public class RemoteFSPhantomManager {
start();
isStarted.set(true);
}
+ if (fsSet.contains(remoteFileSystem.dfsFileSystem)) {
+ throw new RuntimeException("FileSystem already exists: " +
remoteFileSystem.dfsFileSystem.getUri());
+ }
RemoteFileSystemPhantomReference phantomReference = new
RemoteFileSystemPhantomReference(remoteFileSystem,
referenceQueue);
referenceMap.put(phantomReference, remoteFileSystem.dfsFileSystem);
+ fsSet.add(remoteFileSystem.dfsFileSystem);
}
/**
@@ -102,6 +110,7 @@ public class RemoteFSPhantomManager {
if (fs != null) {
try {
fs.close();
+ fsSet.remove(fs);
LOG.info("Closed file system: {}",
fs.getUri());
} catch (IOException e) {
LOG.warn("Failed to close file system", e);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
index 87ba086baec..f8805bd0d4f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
@@ -20,6 +20,8 @@ package org.apache.doris.fs.remote;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.backup.Status;
import org.apache.doris.common.UserException;
+import org.apache.doris.common.security.authentication.AuthenticationConfig;
+import org.apache.doris.common.security.authentication.HadoopAuthenticator;
import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.fs.obj.S3ObjStorage;
import org.apache.doris.fs.remote.dfs.DFSFileSystem;
@@ -34,6 +36,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.FileNotFoundException;
+import java.io.IOException;
import java.util.List;
import java.util.Map;
@@ -74,12 +77,20 @@ public class S3FileSystem extends ObjFileSystem {
PropertyConverter.convertToHadoopFSProperties(properties).entrySet().stream()
.filter(entry -> entry.getKey() != null &&
entry.getValue() != null)
.forEach(entry -> conf.set(entry.getKey(),
entry.getValue()));
+ AuthenticationConfig authConfig =
AuthenticationConfig.getKerberosConfig(conf);
+ HadoopAuthenticator authenticator =
HadoopAuthenticator.getHadoopAuthenticator(authConfig);
try {
- dfsFileSystem = FileSystem.get(new
Path(remotePath).toUri(), conf);
+ dfsFileSystem = authenticator.doAs(() -> {
+ try {
+ return FileSystem.get(new
Path(remotePath).toUri(), conf);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ RemoteFSPhantomManager.registerPhantomReference(this);
} catch (Exception e) {
throw new UserException("Failed to get S3 FileSystem
for " + e.getMessage(), e);
}
- RemoteFSPhantomManager.registerPhantomReference(this);
}
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
index 7034641a9fc..2146472aec7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
@@ -99,11 +99,11 @@ public class DFSFileSystem extends RemoteFileSystem {
throw new RuntimeException(e);
}
});
+ operations = new HDFSFileOperations(dfsFileSystem);
+ RemoteFSPhantomManager.registerPhantomReference(this);
} catch (Exception e) {
- throw new UserException(e);
+ throw new UserException("Failed to get dfs FileSystem
for " + e.getMessage(), e);
}
- operations = new HDFSFileOperations(dfsFileSystem);
- RemoteFSPhantomManager.registerPhantomReference(this);
}
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
index 97671581b29..93774d49e37 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
@@ -133,7 +133,7 @@ public class HiveTableSink extends
BaseExternalTableDataSink {
if (insertCtx.isPresent()) {
HiveInsertCommandContext context = (HiveInsertCommandContext)
insertCtx.get();
tSink.setOverwrite(context.isOverwrite());
- context.setWritePath(storageLocation);
+ context.setWritePath(location);
context.setFileType(fileType);
}
} else {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]