This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new d918b5c8137 branch-4.0: [feature](hive_write) add hive_staging_dir catalog property and change the default staging directory to be under the target table. #60018 (#60217)
d918b5c8137 is described below
commit d918b5c81372ae6e8d0402f1ca8e5d535e2a9cc3
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Jan 27 10:32:43 2026 +0800
branch-4.0: [feature](hive_write) add hive_staging_dir catalog property and change the default staging directory to be under the target table. #60018 (#60217)
Cherry-picked from #60018
Co-authored-by: Qi Chen <[email protected]>
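For context, a minimal usage sketch of the new property (catalog name and endpoints below are placeholders; the regression test in this patch is the authoritative setup): by default the staging directory is .doris_staging/<user> under the target table's location, and hive.staging_dir accepts either a relative path (resolved under the table location) or an absolute URI.

    -- create a Hive catalog and override the staging dir with a relative path
    CREATE CATALOG hive_demo PROPERTIES (
        'type' = 'hms',
        'hive.metastore.uris' = 'thrift://127.0.0.1:9083',
        'hive.staging_dir' = '.my_staging'
    );
    -- or switch an existing catalog to an absolute staging location
    ALTER CATALOG hive_demo SET PROPERTIES ('hive.staging_dir' = 'hdfs://127.0.0.1:8020/tmp/doris_staging');

INSERT writes stage their files under that directory, and HMSTransaction moves them into the table location on commit.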
---
.../org/apache/doris/common/util/LocationPath.java | 1 +
.../doris/datasource/hive/HMSExternalCatalog.java | 1 +
.../doris/datasource/hive/HMSTransaction.java | 182 ++++++++++++--
.../org/apache/doris/planner/HiveTableSink.java | 10 +-
.../datasource/hive/HMSTransactionPathTest.java | 262 +++++++++++++++++++++
.../hive/write/test_hive_staging_dir.out | 11 +
.../hive/write/test_hive_staging_dir.groovy | 134 +++++++++++
7 files changed, 580 insertions(+), 21 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java
index b41cb950ff8..9bb64d09737 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java
@@ -366,6 +366,7 @@ public class LocationPath {
}
public static String getTempWritePath(String loc, String prefix) {
+ // If prefix is relative, it is resolved under loc; if absolute, it is used as the base path.
Path tempRoot = new Path(loc, prefix);
Path tempPath = new Path(tempRoot, UUID.randomUUID().toString().replace("-", ""));
return tempPath.toString();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
index 8b5e93f113c..f8dac381cca 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
@@ -56,6 +56,7 @@ public class HMSExternalCatalog extends ExternalCatalog {
public static final String FILE_META_CACHE_TTL_SECOND = "file.meta.cache.ttl-second";
public static final String PARTITION_CACHE_TTL_SECOND = "partition.cache.ttl-second";
+ public static final String HIVE_STAGING_DIR = "hive.staging_dir";
// broker name for file split and query scan.
public static final String BIND_BROKER_NAME = "broker.name";
// Default is false, if set to true, will get table schema from "remoteTable" instead of from hive metastore.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
index 54ee9f46ec5..f7ff09dcdc0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
@@ -44,6 +44,7 @@ import org.apache.doris.thrift.TS3MPUPendingUpload;
import org.apache.doris.thrift.TUpdateMode;
import org.apache.doris.transaction.Transaction;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
@@ -65,6 +66,7 @@ import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompletedPart;
+import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -1276,27 +1278,41 @@ public class HMSTransaction implements Transaction {
String targetPath = table.getSd().getLocation();
String writePath = tableAndMore.getCurrentLocation();
if (!targetPath.equals(writePath)) {
- Path path = new Path(targetPath);
- String oldTablePath = new Path(
- path.getParent(), "_temp_" + queryId + "_" + path.getName()).toString();
- Status status = wrapperRenameDirWithProfileSummary(
- targetPath,
- oldTablePath,
- () -> renameDirectoryTasksForAbort.add(new RenameDirectoryTask(oldTablePath, targetPath)));
- if (!status.ok()) {
- throw new RuntimeException(
- "Error to rename dir from " + targetPath + " to "
+ oldTablePath + status.getErrMsg());
- }
- clearDirsForFinish.add(oldTablePath);
+ if (isSubDirectory(targetPath, writePath)) {
+ String stagingRoot = getImmediateChildPath(targetPath, writePath);
+ deleteTargetPathContents(targetPath, stagingRoot);
+ ensureDirectory(targetPath);
+ wrapperAsyncRenameWithProfileSummary(
+ fileSystemExecutor,
+ asyncFileSystemTaskFutures,
+ fileSystemTaskCancelled,
+ writePath,
+ targetPath,
+ tableAndMore.getFileNames());
+ } else {
+ Path path = new Path(targetPath);
+ String oldTablePath = new Path(
+ path.getParent(), "_temp_" + queryId + "_" + path.getName()).toString();
+ Status status = wrapperRenameDirWithProfileSummary(
+ targetPath,
+ oldTablePath,
+ () -> renameDirectoryTasksForAbort.add(new RenameDirectoryTask(oldTablePath, targetPath)));
+ if (!status.ok()) {
+ throw new RuntimeException(
+ "Error to rename dir from " + targetPath + "
to " + oldTablePath + status.getErrMsg());
+ }
+ clearDirsForFinish.add(oldTablePath);
- status = wrapperRenameDirWithProfileSummary(
- writePath,
- targetPath,
- () -> directoryCleanUpTasksForAbort.add(
- new DirectoryCleanUpTask(targetPath, true)));
- if (!status.ok()) {
- throw new RuntimeException(
- "Error to rename dir from " + writePath + " to " +
targetPath + ":" + status.getErrMsg());
+ status = wrapperRenameDirWithProfileSummary(
+ writePath,
+ targetPath,
+ () -> directoryCleanUpTasksForAbort.add(
+ new DirectoryCleanUpTask(targetPath, true)));
+ if (!status.ok()) {
+ throw new RuntimeException(
+ "Error to rename dir from " + writePath + " to
" + targetPath
+ + ":" + status.getErrMsg());
+ }
}
} else {
if (!tableAndMore.hivePartitionUpdate.s3_mpu_pending_uploads.isEmpty()) {
@@ -1620,6 +1636,132 @@ public class HMSTransaction implements Transaction {
}
}
+ @VisibleForTesting
+ static boolean isSubDirectory(String parent, String child) {
+ if (parent == null || child == null) {
+ return false;
+ }
+ Path parentPath = new Path(parent);
+ Path childPath = new Path(child);
+ URI parentUri = parentPath.toUri();
+ URI childUri = childPath.toUri();
+ if (!sameFileSystem(parentUri, childUri)) {
+ return false;
+ }
+ String parentPathValue = normalizePath(parentUri.getPath());
+ String childPathValue = normalizePath(childUri.getPath());
+ if (parentPathValue.isEmpty() || childPathValue.isEmpty()) {
+ return false;
+ }
+ return !parentPathValue.equals(childPathValue)
+ && childPathValue.startsWith(parentPathValue + "/");
+ }
+
+ /**
+ * Returns the first-level child path of {@code parent} that contains {@code child},
+ * or null if {@code child} is not a subdirectory of {@code parent}.
+ * Example: parent=/warehouse/table, child=/warehouse/table/.doris_staging/user/uuid
+ * returns /warehouse/table/.doris_staging.
+ */
+ @VisibleForTesting
+ static String getImmediateChildPath(String parent, String child) {
+ if (!isSubDirectory(parent, child)) {
+ return null;
+ }
+ Path parentPath = new Path(parent);
+ URI parentUri = parentPath.toUri();
+ URI childUri = new Path(child).toUri();
+ String parentPathValue = normalizePath(parentUri.getPath());
+ String childPathValue = normalizePath(childUri.getPath());
+ String relative = childPathValue.substring(parentPathValue.length() + 1);
+ int slashIndex = relative.indexOf("/");
+ String firstComponent = slashIndex == -1 ? relative : relative.substring(0, slashIndex);
+ return new Path(parentPath, firstComponent).toString();
+ }
+
+ private static boolean sameFileSystem(URI left, URI right) {
+ String leftScheme = normalizeUriPart(left.getScheme());
+ String rightScheme = normalizeUriPart(right.getScheme());
+ if (!leftScheme.isEmpty() && !rightScheme.isEmpty()
+ && !leftScheme.equalsIgnoreCase(rightScheme)) {
+ return false;
+ }
+ String leftAuthority = normalizeUriPart(left.getAuthority());
+ String rightAuthority = normalizeUriPart(right.getAuthority());
+ if (!leftAuthority.isEmpty() && !rightAuthority.isEmpty()
+ && !leftAuthority.equalsIgnoreCase(rightAuthority)) {
+ return false;
+ }
+ return true;
+ }
+
+ private static String normalizeUriPart(String value) {
+ return value == null ? "" : value;
+ }
+
+ private static String normalizePath(String path) {
+ if (path == null || path.isEmpty()) {
+ return "";
+ }
+ int end = path.length();
+ while (end > 1 && path.charAt(end - 1) == '/') {
+ end--;
+ }
+ return path.substring(0, end);
+ }
+
+ private static boolean pathsEqual(String left, String right) {
+ if (left == null || right == null) {
+ return left == null && right == null;
+ }
+ URI leftUri = new Path(left).toUri();
+ URI rightUri = new Path(right).toUri();
+ if (!sameFileSystem(leftUri, rightUri)) {
+ return false;
+ }
+ return normalizePath(leftUri.getPath()).equals(normalizePath(rightUri.getPath()));
+ }
+
+ @VisibleForTesting
+ void deleteTargetPathContents(String targetPath, String excludedChildPath) {
+ Set<String> dirs = new HashSet<>();
+ Status status = fs.listDirectories(targetPath, dirs);
+ if (!status.ok() && !Status.ErrCode.NOT_FOUND.equals(status.getErrCode())) {
+ throw new RuntimeException(
+ "Failed to list directories under " + targetPath + ":" + status.getErrMsg());
+ }
+ for (String dir : dirs) {
+ if (excludedChildPath != null && pathsEqual(dir, excludedChildPath)) {
+ continue;
+ }
+ Status deleteStatus = wrapperDeleteDirWithProfileSummary(dir);
+ if (!deleteStatus.ok() && !Status.ErrCode.NOT_FOUND.equals(deleteStatus.getErrCode())) {
+ throw new RuntimeException("Failed to delete directory " + dir + ":" + deleteStatus.getErrMsg());
+ }
+ }
+
+ List<RemoteFile> files = new ArrayList<>();
+ status = fs.listFiles(targetPath, false, files);
+ if (!status.ok() && !Status.ErrCode.NOT_FOUND.equals(status.getErrCode())) {
+ throw new RuntimeException(
+ "Failed to list files under " + targetPath + ":" + status.getErrMsg());
+ }
+ for (RemoteFile file : files) {
+ Status deleteStatus = wrapperDeleteWithProfileSummary(file.getPath().toString());
+ if (!deleteStatus.ok() && !Status.ErrCode.NOT_FOUND.equals(deleteStatus.getErrCode())) {
+ throw new RuntimeException("Failed to delete file " + file.getPath() + ":" + deleteStatus.getErrMsg());
+ }
+ }
+ }
+
+ @VisibleForTesting
+ void ensureDirectory(String path) {
+ Status status = fs.makeDir(path);
+ if (!status.ok()) {
+ throw new RuntimeException("Failed to create directory " + path +
":" + status.getErrMsg());
+ }
+ }
+
public Status wrapperRenameDirWithProfileSummary(String origFilePath, String destFilePath,
Runnable runWhenPathNotExist) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
index 22ceca331e5..a0693768edc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
@@ -48,6 +48,7 @@ import org.apache.doris.thrift.THiveSerDeProperties;
import org.apache.doris.thrift.THiveTableSink;
import com.google.common.base.Strings;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
@@ -168,7 +169,14 @@ public class HiveTableSink extends BaseExternalTableDataSink {
private String createTempPath(String location) {
String user = ConnectContext.get().getCurrentUserIdentity().getUser();
- return LocationPath.getTempWritePath(location, "/tmp/.doris_staging/" + user);
+ String defaultStagingBaseDir = ".doris_staging";
+ String stagingBaseDir = targetTable.getCatalog().getCatalogProperty()
+ .getOrDefault(HMSExternalCatalog.HIVE_STAGING_DIR, defaultStagingBaseDir);
+ if (Strings.isNullOrEmpty(stagingBaseDir)) {
+ stagingBaseDir = defaultStagingBaseDir;
+ }
+ String stagingDir = new Path(stagingBaseDir, user).toString();
+ return LocationPath.getTempWritePath(location, stagingDir);
}
private void setCompressType(THiveTableSink tSink, TFileFormatType formatType) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HMSTransactionPathTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HMSTransactionPathTest.java
new file mode 100644
index 00000000000..a1f7d146650
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HMSTransactionPathTest.java
@@ -0,0 +1,262 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.hive;
+
+import org.apache.doris.backup.Status;
+import org.apache.doris.fs.FileSystem;
+import org.apache.doris.fs.FileSystemProvider;
+import org.apache.doris.fs.LocalDfsFileSystem;
+import org.apache.doris.fs.remote.RemoteFile;
+import org.apache.doris.fs.remote.SwitchingFileSystem;
+import org.apache.doris.qe.ConnectContext;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+public class HMSTransactionPathTest {
+ private ConnectContext connectContext;
+
+ @Before
+ public void setUp() {
+ connectContext = new ConnectContext();
+ connectContext.setThreadLocalInfo();
+ }
+
+ @After
+ public void tearDown() {
+ ConnectContext.remove();
+ connectContext = null;
+ }
+
+ @Test
+ public void testIsSubDirectory() throws Exception {
+ Assert.assertFalse(HMSTransaction.isSubDirectory(null, "/a"));
+ Assert.assertFalse(HMSTransaction.isSubDirectory("/a", null));
+ Assert.assertFalse(HMSTransaction.isSubDirectory("/a/b", "/a/b"));
+ Assert.assertFalse(HMSTransaction.isSubDirectory("/a/b", "/a/bc"));
+ Assert.assertFalse(HMSTransaction.isSubDirectory(
+ "hdfs://host1:8020/a", "hdfs://host2:8020/a/b"));
+ Assert.assertTrue(HMSTransaction.isSubDirectory(
+ "hdfs://host:8020/a/b/", "hdfs://host:8020/a/b/c/d"));
+ Assert.assertTrue(HMSTransaction.isSubDirectory("a/b", "a/b/c"));
+ }
+
+ @Test
+ public void testGetImmediateChildPath() throws Exception {
+ String parent = "hdfs://host:8020/warehouse/table";
+ String child = "hdfs://host:8020/warehouse/table/.doris_staging/user/uuid";
+ Assert.assertEquals(
+ "hdfs://host:8020/warehouse/table/.doris_staging",
+ HMSTransaction.getImmediateChildPath(parent, child));
+
+ String directChild = "hdfs://host:8020/warehouse/table/part=1";
+ Assert.assertEquals(
+ "hdfs://host:8020/warehouse/table/part=1",
+ HMSTransaction.getImmediateChildPath(parent, directChild));
+
+ String notSubdir = "hdfs://host:8020/warehouse/other";
+ Assert.assertNull(HMSTransaction.getImmediateChildPath(parent, notSubdir));
+ }
+
+ // Ensures NOT_FOUND results from list operations are treated as no-op cleanup.
+ @Test
+ public void testDeleteTargetPathContentsNotFoundAllowed() throws Exception {
+ FakeFileSystem fakeFs = new FakeFileSystem();
+ fakeFs.listDirectoriesStatus = new Status(Status.ErrCode.NOT_FOUND, "missing");
+ fakeFs.listFilesStatus = new Status(Status.ErrCode.NOT_FOUND, "missing");
+
+ HMSTransaction transaction = createTransaction(fakeFs);
+ transaction.deleteTargetPathContents(
+ "/tmp/does_not_exist", "/tmp/does_not_exist/.doris_staging");
+ Assert.assertTrue(fakeFs.deletedDirectories.isEmpty());
+ Assert.assertTrue(fakeFs.deletedFiles.isEmpty());
+ }
+
+ // Verifies listDirectories failures surface as runtime errors.
+ @Test
+ public void testDeleteTargetPathContentsListError() throws Exception {
+ FakeFileSystem fakeFs = new FakeFileSystem();
+ fakeFs.listDirectoriesStatus = new Status(Status.ErrCode.COMMON_ERROR, "list failed");
+
+ HMSTransaction transaction = createTransaction(fakeFs);
+ Assert.assertThrows(RuntimeException.class, () -> transaction.deleteTargetPathContents(
+ "/tmp/target", "/tmp/target/.doris_staging"));
+ }
+
+ @Test
+ public void testEnsureDirectorySuccess() throws Exception {
+ LocalDfsFileSystem localFs = new LocalDfsFileSystem();
+ HMSTransaction transaction = createTransaction(localFs);
+
+ java.nio.file.Path dir = Files.createTempDirectory("hms_tx_ensure_").resolve("nested");
+ transaction.ensureDirectory(dir.toString());
+
+ Assert.assertTrue(Files.exists(dir));
+ }
+
+ @Test
+ public void testEnsureDirectoryError() throws Exception {
+ FakeFileSystem fakeFs = new FakeFileSystem();
+ fakeFs.makeDirStatus = new Status(Status.ErrCode.COMMON_ERROR, "mkdir failed");
+
+ HMSTransaction transaction = createTransaction(fakeFs);
+ Assert.assertThrows(RuntimeException.class, () -> transaction.ensureDirectory("/tmp/target"));
+ }
+
+ // Verifies the staging-under-target flow:
+ // 1) Detect write path nested under target.
+ // 2) Compute the immediate staging root under target.
+ // 3) Delete target contents while preserving the staging root.
+ // 4) Ensure the target directory exists after cleanup.
+ @Test
+ public void testDeleteTargetPathContentsSkipsExcludedDir() throws Exception {
+ LocalDfsFileSystem localFs = new LocalDfsFileSystem();
+ HMSTransaction transaction = createTransaction(localFs);
+
+ java.nio.file.Path targetDir = Files.createTempDirectory("hms_tx_path_test_");
+ java.nio.file.Path stagingDir = targetDir.resolve(".doris_staging");
+ java.nio.file.Path writeDir = stagingDir.resolve("user/uuid");
+ java.nio.file.Path stagingFile = stagingDir.resolve("staging.tmp");
+ java.nio.file.Path otherDir = targetDir.resolve("part=1");
+ java.nio.file.Path otherFile = targetDir.resolve("data.txt");
+
+ Files.createDirectories(stagingDir);
+ Files.createDirectories(writeDir);
+ Files.createFile(stagingFile);
+ Files.createDirectories(otherDir);
+ Files.createFile(otherFile);
+
+ String targetPath = targetDir.toString();
+ String writePath = writeDir.toString();
+ Assert.assertTrue(HMSTransaction.isSubDirectory(targetPath, writePath));
+ String stagingRoot = HMSTransaction.getImmediateChildPath(targetPath, writePath);
+ transaction.deleteTargetPathContents(targetPath, stagingRoot);
+ transaction.ensureDirectory(targetPath);
+
+ Assert.assertTrue(Files.exists(stagingDir));
+ Assert.assertTrue(Files.exists(stagingFile));
+ Assert.assertFalse(Files.exists(otherDir));
+ Assert.assertFalse(Files.exists(otherFile));
+ }
+
+ private static HMSTransaction createTransaction(FileSystem delegate) {
+ SwitchingFileSystem switchingFs = new TestSwitchingFileSystem(delegate);
+ FileSystemProvider provider = ctx -> switchingFs;
+ return new HMSTransaction(null, provider, Runnable::run);
+ }
+
+ private static class TestSwitchingFileSystem extends SwitchingFileSystem {
+ private final FileSystem delegate;
+
+ TestSwitchingFileSystem(FileSystem delegate) {
+ super(null, null);
+ this.delegate = delegate;
+ }
+
+ @Override
+ public FileSystem fileSystem(String location) {
+ return delegate;
+ }
+ }
+
+ private static class FakeFileSystem implements FileSystem {
+ private Status listDirectoriesStatus = Status.OK;
+ private Status listFilesStatus = Status.OK;
+ private Status makeDirStatus = Status.OK;
+ private Status deleteStatus = Status.OK;
+ private Status deleteDirStatus = Status.OK;
+
+ private final List<String> deletedDirectories = new ArrayList<>();
+ private final List<String> deletedFiles = new ArrayList<>();
+
+ @Override
+ public Status listDirectories(String remotePath, Set<String> result) {
+ if (!listDirectoriesStatus.ok()) {
+ return listDirectoriesStatus;
+ }
+ return Status.OK;
+ }
+
+ @Override
+ public Status listFiles(String remotePath, boolean recursive, List<RemoteFile> result) {
+ if (!listFilesStatus.ok()) {
+ return listFilesStatus;
+ }
+ return Status.OK;
+ }
+
+ @Override
+ public Status deleteDirectory(String dir) {
+ deletedDirectories.add(dir);
+ return deleteDirStatus;
+ }
+
+ @Override
+ public Status delete(String remotePath) {
+ deletedFiles.add(remotePath);
+ return deleteStatus;
+ }
+
+ @Override
+ public Status makeDir(String remotePath) {
+ return makeDirStatus;
+ }
+
+ @Override
+ public Status exists(String remotePath) {
+ return Status.OK;
+ }
+
+ @Override
+ public Status downloadWithFileSize(String remoteFilePath, String localFilePath, long fileSize) {
+ return Status.OK;
+ }
+
+ @Override
+ public Status upload(String localPath, String remotePath) {
+ return Status.OK;
+ }
+
+ @Override
+ public Status directUpload(String content, String remoteFile) {
+ return Status.OK;
+ }
+
+ @Override
+ public Status rename(String origFilePath, String destFilePath) {
+ return Status.OK;
+ }
+
+ @Override
+ public Status globList(String remotePath, List<RemoteFile> result, boolean fileNameOnly) {
+ return Status.OK;
+ }
+
+ @Override
+ public java.util.Map<String, String> getProperties() {
+ return null;
+ }
+ }
+}
diff --git a/regression-test/data/external_table_p0/hive/write/test_hive_staging_dir.out b/regression-test/data/external_table_p0/hive/write/test_hive_staging_dir.out
new file mode 100644
index 00000000000..021de9148ae
--- /dev/null
+++ b/regression-test/data/external_table_p0/hive/write/test_hive_staging_dir.out
@@ -0,0 +1,11 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !q01 --
+1
+
+-- !q02 --
+1
+2
+
+-- !q03 --
+1
+
diff --git a/regression-test/suites/external_table_p0/hive/write/test_hive_staging_dir.groovy b/regression-test/suites/external_table_p0/hive/write/test_hive_staging_dir.groovy
new file mode 100644
index 00000000000..087e05e0c1e
--- /dev/null
+++ b/regression-test/suites/external_table_p0/hive/write/test_hive_staging_dir.groovy
@@ -0,0 +1,134 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.util.Hdfs
+import org.apache.hadoop.fs.Path
+
+import java.net.URI
+
+suite("test_hive_staging_dir",
"p0,external,hive,external_docker,external_docker_hive") {
+ def getHiveTableLocation = { String db, String tbl ->
+ def rows = hive_docker """describe formatted `${db}`.`${tbl}`"""
+ for (def row : rows) {
+ if (row == null || row.size() < 2) {
+ continue
+ }
+ String key = row[0] == null ? "" : row[0].toString().trim()
+ if (key.equalsIgnoreCase("Location") || key.equalsIgnoreCase("Location:")) {
+ def value = row[1]
+ if (value != null && !value.toString().trim().isEmpty()) {
+ return value.toString().trim()
+ }
+ }
+ }
+ throw new RuntimeException("Failed to find location for ${db}.${tbl}")
+ }
+
+ String enabled = context.config.otherConfigs.get("enableHiveTest")
+ if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+ logger.info("disable Hive test.")
+ return
+ }
+
+ for (String hivePrefix : ["hive3"]) {
+ setHivePrefix(hivePrefix)
+ String catalogName = "test_${hivePrefix}_staging_dir"
+ String dbName = "write_test"
+ String tableRel = "staging_dir_rel_${hivePrefix}"
+ String tableAbs = "staging_dir_abs_${hivePrefix}"
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String hmsPort = context.config.otherConfigs.get(hivePrefix + "HmsPort")
+ String hdfsPort = context.config.otherConfigs.get(hivePrefix + "HdfsPort")
+
+ String currentUser = (sql "select current_user()")[0][0].toString()
+ if (currentUser.contains("@")) {
+ currentUser = currentUser.substring(0, currentUser.indexOf("@"))
+ }
+ currentUser = currentUser.replaceAll("[^A-Za-z0-9_-]", "")
+ String suffix = System.currentTimeMillis().toString()
+ String stagingRelBase = ".doris_staging_test_${suffix}"
+
+ String hdfsUser = context.config.otherConfigs.get("hdfsUser")
+ if (hdfsUser == null || hdfsUser.isEmpty()) {
+ hdfsUser = "root"
+ }
+
+ try {
+ hive_docker """create database if not exists `${dbName}`"""
+ hive_docker """drop table if exists `${dbName}`.`${tableRel}`"""
+ hive_docker """drop table if exists `${dbName}`.`${tableAbs}`"""
+ hive_docker """create table `${dbName}`.`${tableRel}` (k1 int)
stored as orc"""
+ hive_docker """create table `${dbName}`.`${tableAbs}` (k1 int)
stored as orc"""
+
+ String relLocation = getHiveTableLocation(dbName, tableRel)
+ URI tableUri = new URI(relLocation)
+ String hdfsUri = "hdfs://${externalEnvIp}:${hdfsPort}"
+ if (tableUri.getScheme() != null && tableUri.getAuthority() != null) {
+ hdfsUri = tableUri.getScheme() + "://" + tableUri.getAuthority()
+ }
+ String stagingAbsBase = "${hdfsUri}/tmp/doris_staging_abs_${suffix}"
+ String stagingRelWithUser = new Path(stagingRelBase, currentUser).toString()
+ String stagingAbs = new Path(stagingAbsBase, currentUser).toString()
+
+ Hdfs hdfs = new Hdfs(hdfsUri, hdfsUser, context.config.dataPath + "/")
+ def fs = hdfs.fs
+ String defaultStagingRel = ".doris_staging/${currentUser}"
+ String defaultBase = new Path(relLocation, defaultStagingRel).toString()
+ String relBase = new Path(relLocation, stagingRelWithUser).toString()
+
+ fs.delete(new Path(defaultBase), true)
+ fs.delete(new Path(relBase), true)
+ fs.delete(new Path(stagingAbs), true)
+
+ sql """drop catalog if exists ${catalogName}"""
+ sql """create catalog if not exists ${catalogName} properties (
+ 'type'='hms',
+ 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hmsPort}',
+ 'fs.defaultFS' = '${hdfsUri}'
+ );"""
+ sql """refresh catalog ${catalogName}"""
+ sql """use `${catalogName}`.`${dbName}`"""
+ sql """set enable_fallback_to_original_planner=false;"""
+
+ sql """insert into `${tableRel}` values (1)"""
+ order_qt_q01 """ select * from `${tableRel}`"""
+ assertTrue(fs.exists(new Path(defaultBase)),
+ "default staging dir not created: ${defaultBase}")
+ fs.delete(new Path(defaultBase), true)
+
+ sql """alter catalog ${catalogName} set properties
('hive.staging_dir' = '${stagingRelBase}')"""
+ sql """refresh catalog ${catalogName}"""
+ sql """insert into `${tableRel}` values (2)"""
+ order_qt_q02 """ select * from `${tableRel}`"""
+ assertTrue(fs.exists(new Path(relBase)),
+ "relative staging dir not created: ${relBase}")
+ fs.delete(new Path(relBase), true)
+
+ sql """alter catalog ${catalogName} set properties
('hive.staging_dir' = '${stagingAbsBase}')"""
+ sql """refresh catalog ${catalogName}"""
+ sql """insert into `${tableAbs}` values (1)"""
+ order_qt_q03 """ select * from `${tableAbs}`"""
+ assertTrue(fs.exists(new Path(stagingAbs)),
+ "absolute staging dir not created: ${stagingAbs}")
+ fs.delete(new Path(stagingAbs), true)
+ } finally {
+ try_hive_docker """drop table if exists `${dbName}`.`${tableRel}`"""
+ try_hive_docker """drop table if exists `${dbName}`.`${tableAbs}`"""
+ try_sql """drop catalog if exists ${catalogName}"""
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]