This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new d918b5c8137 branch-4.0: [feature](hive_write) add hive_staging_dir 
catalog properties and change default value under the writing target table. 
#60018 (#60217)
d918b5c8137 is described below

commit d918b5c81372ae6e8d0402f1ca8e5d535e2a9cc3
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Jan 27 10:32:43 2026 +0800

    branch-4.0: [feature](hive_write) add hive_staging_dir catalog properties 
and change default value under the writing target table. #60018 (#60217)
    
    Cherry-picked from #60018
    
    Co-authored-by: Qi Chen <[email protected]>
---
 .../org/apache/doris/common/util/LocationPath.java |   1 +
 .../doris/datasource/hive/HMSExternalCatalog.java  |   1 +
 .../doris/datasource/hive/HMSTransaction.java      | 182 ++++++++++++--
 .../org/apache/doris/planner/HiveTableSink.java    |  10 +-
 .../datasource/hive/HMSTransactionPathTest.java    | 262 +++++++++++++++++++++
 .../hive/write/test_hive_staging_dir.out           |  11 +
 .../hive/write/test_hive_staging_dir.groovy        | 134 +++++++++++
 7 files changed, 580 insertions(+), 21 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java
index b41cb950ff8..9bb64d09737 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java
@@ -366,6 +366,7 @@ public class LocationPath {
     }
 
     public static String getTempWritePath(String loc, String prefix) {
+        // If prefix is relative, it is resolved under loc; if absolute, it is 
used as the base path.
         Path tempRoot = new Path(loc, prefix);
         Path tempPath = new Path(tempRoot, 
UUID.randomUUID().toString().replace("-", ""));
         return tempPath.toString();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
index 8b5e93f113c..f8dac381cca 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
@@ -56,6 +56,7 @@ public class HMSExternalCatalog extends ExternalCatalog {
 
     public static final String FILE_META_CACHE_TTL_SECOND = 
"file.meta.cache.ttl-second";
     public static final String PARTITION_CACHE_TTL_SECOND = 
"partition.cache.ttl-second";
+    public static final String HIVE_STAGING_DIR = "hive.staging_dir";
     // broker name for file split and query scan.
     public static final String BIND_BROKER_NAME = "broker.name";
     // Default is false, if set to true, will get table schema from 
"remoteTable" instead of from hive metastore.
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
index 54ee9f46ec5..f7ff09dcdc0 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
@@ -44,6 +44,7 @@ import org.apache.doris.thrift.TS3MPUPendingUpload;
 import org.apache.doris.thrift.TUpdateMode;
 import org.apache.doris.transaction.Transaction;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
@@ -65,6 +66,7 @@ import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
 import software.amazon.awssdk.services.s3.model.CompletedPart;
 
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -1276,27 +1278,41 @@ public class HMSTransaction implements Transaction {
             String targetPath = table.getSd().getLocation();
             String writePath = tableAndMore.getCurrentLocation();
             if (!targetPath.equals(writePath)) {
-                Path path = new Path(targetPath);
-                String oldTablePath = new Path(
-                        path.getParent(), "_temp_" + queryId + "_" + 
path.getName()).toString();
-                Status status = wrapperRenameDirWithProfileSummary(
-                        targetPath,
-                        oldTablePath,
-                        () -> renameDirectoryTasksForAbort.add(new 
RenameDirectoryTask(oldTablePath, targetPath)));
-                if (!status.ok()) {
-                    throw new RuntimeException(
-                            "Error to rename dir from " + targetPath + " to " 
+ oldTablePath + status.getErrMsg());
-                }
-                clearDirsForFinish.add(oldTablePath);
+                if (isSubDirectory(targetPath, writePath)) {
+                    String stagingRoot = getImmediateChildPath(targetPath, 
writePath);
+                    deleteTargetPathContents(targetPath, stagingRoot);
+                    ensureDirectory(targetPath);
+                    wrapperAsyncRenameWithProfileSummary(
+                            fileSystemExecutor,
+                            asyncFileSystemTaskFutures,
+                            fileSystemTaskCancelled,
+                            writePath,
+                            targetPath,
+                            tableAndMore.getFileNames());
+                } else {
+                    Path path = new Path(targetPath);
+                    String oldTablePath = new Path(
+                            path.getParent(), "_temp_" + queryId + "_" + 
path.getName()).toString();
+                    Status status = wrapperRenameDirWithProfileSummary(
+                            targetPath,
+                            oldTablePath,
+                            () -> renameDirectoryTasksForAbort.add(new 
RenameDirectoryTask(oldTablePath, targetPath)));
+                    if (!status.ok()) {
+                        throw new RuntimeException(
+                                "Error to rename dir from " + targetPath + " 
to " + oldTablePath + status.getErrMsg());
+                    }
+                    clearDirsForFinish.add(oldTablePath);
 
-                status = wrapperRenameDirWithProfileSummary(
-                        writePath,
-                        targetPath,
-                        () -> directoryCleanUpTasksForAbort.add(
-                                new DirectoryCleanUpTask(targetPath, true)));
-                if (!status.ok()) {
-                    throw new RuntimeException(
-                            "Error to rename dir from " + writePath + " to " + 
targetPath + ":" + status.getErrMsg());
+                    status = wrapperRenameDirWithProfileSummary(
+                            writePath,
+                            targetPath,
+                            () -> directoryCleanUpTasksForAbort.add(
+                                    new DirectoryCleanUpTask(targetPath, 
true)));
+                    if (!status.ok()) {
+                        throw new RuntimeException(
+                                "Error to rename dir from " + writePath + " to 
" + targetPath
+                                        + ":" + status.getErrMsg());
+                    }
                 }
             } else {
                 if 
(!tableAndMore.hivePartitionUpdate.s3_mpu_pending_uploads.isEmpty()) {
@@ -1620,6 +1636,132 @@ public class HMSTransaction implements Transaction {
         }
     }
 
+    /**
+     * Returns whether {@code child} lies strictly beneath {@code parent} on the same file system.
+     * Scheme and authority are compared case-insensitively and only when both paths carry them;
+     * trailing slashes are ignored and a path is never a sub-directory of itself. A root parent
+     * (e.g. {@code s3://bucket/}) is supported: every non-root path below it matches.
+     */
+    @VisibleForTesting
+    static boolean isSubDirectory(String parent, String child) {
+        if (parent == null || child == null) {
+            return false;
+        }
+        URI parentUri = new Path(parent).toUri();
+        URI childUri = new Path(child).toUri();
+        if (!sameFileSystem(parentUri, childUri)) {
+            return false;
+        }
+        String parentPathValue = normalizePath(parentUri.getPath());
+        String childPathValue = normalizePath(childUri.getPath());
+        if (parentPathValue.isEmpty() || childPathValue.isEmpty()) {
+            return false;
+        }
+        // normalizePath keeps a lone "/" for the root; appending another separator would give "//"
+        // and make every child of a root-level table location fail the prefix test.
+        String parentPrefix = parentPathValue.endsWith("/") ? parentPathValue : parentPathValue + "/";
+        return !parentPathValue.equals(childPathValue)
+                && childPathValue.startsWith(parentPrefix);
+    }
+
+    /**
+     * Returns the first-level child path of {@code parent} that contains {@code child},
+     * or null if {@code child} is not a subdirectory of {@code parent}.
+     * Example: parent=/warehouse/table, child=/warehouse/table/.doris_staging/user/uuid
+     * returns /warehouse/table/.doris_staging.
+     * The result is resolved against {@code parent}, so it keeps the parent's scheme and authority.
+     */
+    @VisibleForTesting
+    static String getImmediateChildPath(String parent, String child) {
+        if (!isSubDirectory(parent, child)) {
+            return null;
+        }
+        Path parentPath = new Path(parent);
+        String parentPathValue = normalizePath(parentPath.toUri().getPath());
+        String childPathValue = normalizePath(new Path(child).toUri().getPath());
+        // Skip the parent prefix plus the separator that follows it. A root parent ("/") already
+        // ends with the separator, so only its own length is skipped in that case.
+        int relativeStart = parentPathValue.endsWith("/")
+                ? parentPathValue.length()
+                : parentPathValue.length() + 1;
+        String relative = childPathValue.substring(relativeStart);
+        int slashIndex = relative.indexOf('/');
+        String firstComponent = slashIndex == -1 ? relative : relative.substring(0, slashIndex);
+        return new Path(parentPath, firstComponent).toString();
+    }
+
+    /**
+     * Decides whether two URIs may point at the same file system. Scheme and authority are each
+     * compared case-insensitively; a component that is absent on either side never causes a mismatch.
+     */
+    private static boolean sameFileSystem(URI left, URI right) {
+        String schemeA = normalizeUriPart(left.getScheme());
+        String schemeB = normalizeUriPart(right.getScheme());
+        boolean schemesClash = !schemeA.isEmpty() && !schemeB.isEmpty() && !schemeA.equalsIgnoreCase(schemeB);
+        if (schemesClash) {
+            return false;
+        }
+        String authorityA = normalizeUriPart(left.getAuthority());
+        String authorityB = normalizeUriPart(right.getAuthority());
+        boolean authoritiesClash = !authorityA.isEmpty() && !authorityB.isEmpty()
+                && !authorityA.equalsIgnoreCase(authorityB);
+        return !authoritiesClash;
+    }
+
+    /** Maps a possibly-null URI component to a non-null string, using "" when it is absent. */
+    private static String normalizeUriPart(String value) {
+        if (value == null) {
+            return "";
+        }
+        return value;
+    }
+
+    /**
+     * Strips trailing '/' characters from a URI path while keeping a lone root "/".
+     * Returns "" for null or empty input.
+     */
+    private static String normalizePath(String path) {
+        if (path == null) {
+            return "";
+        }
+        int length = path.length();
+        for (; length > 1; length--) {
+            if (path.charAt(length - 1) != '/') {
+                break;
+            }
+        }
+        return path.substring(0, length);
+    }
+
+    /**
+     * Compares two path strings: they must be on a compatible file system (per sameFileSystem) and
+     * have the same URI path once trailing slashes are removed. Two nulls are equal; null never
+     * equals a non-null path.
+     */
+    private static boolean pathsEqual(String left, String right) {
+        if (left == null || right == null) {
+            return left == right;
+        }
+        URI a = new Path(left).toUri();
+        URI b = new Path(right).toUri();
+        return sameFileSystem(a, b) && normalizePath(a.getPath()).equals(normalizePath(b.getPath()));
+    }
+
+    /**
+     * Clears {@code targetPath} before staged files are moved into it. Deletion happens in two passes:
+     * 1. Every directory reported by {@code fs.listDirectories} is deleted, except the one equal to
+     *    {@code excludedChildPath} (the staging root, which still holds the files being committed).
+     * 2. Every file reported by a non-recursive {@code fs.listFiles} is deleted.
+     * NOT_FOUND from any listing or delete is tolerated; any other error aborts with a RuntimeException.
+     */
+    @VisibleForTesting
+    void deleteTargetPathContents(String targetPath, String excludedChildPath) {
+        Set<String> childDirs = new HashSet<>();
+        Status listing = fs.listDirectories(targetPath, childDirs);
+        if (failedOtherThanNotFound(listing)) {
+            throw new RuntimeException(
+                    "Failed to list directories under " + targetPath + ":" + listing.getErrMsg());
+        }
+        for (String childDir : childDirs) {
+            boolean keep = excludedChildPath != null && pathsEqual(childDir, excludedChildPath);
+            if (keep) {
+                continue;
+            }
+            Status removal = wrapperDeleteDirWithProfileSummary(childDir);
+            if (failedOtherThanNotFound(removal)) {
+                throw new RuntimeException(
+                        "Failed to delete directory " + childDir + ":" + removal.getErrMsg());
+            }
+        }
+
+        List<RemoteFile> childFiles = new ArrayList<>();
+        listing = fs.listFiles(targetPath, false, childFiles);
+        if (failedOtherThanNotFound(listing)) {
+            throw new RuntimeException(
+                    "Failed to list files under " + targetPath + ":" + listing.getErrMsg());
+        }
+        for (RemoteFile childFile : childFiles) {
+            Status removal = wrapperDeleteWithProfileSummary(childFile.getPath().toString());
+            if (failedOtherThanNotFound(removal)) {
+                throw new RuntimeException(
+                        "Failed to delete file " + childFile.getPath() + ":" + removal.getErrMsg());
+            }
+        }
+    }
+
+    /** Returns true when {@code status} is an error whose code is anything other than NOT_FOUND. */
+    private static boolean failedOtherThanNotFound(Status status) {
+        return !status.ok() && !Status.ErrCode.NOT_FOUND.equals(status.getErrCode());
+    }
+
+    @VisibleForTesting
+    void ensureDirectory(String path) {
+        Status status = fs.makeDir(path);
+        if (!status.ok()) {
+            throw new RuntimeException("Failed to create directory " + path + 
":" + status.getErrMsg());
+        }
+    }
+
     public Status wrapperRenameDirWithProfileSummary(String origFilePath,
             String destFilePath,
             Runnable runWhenPathNotExist) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
index 22ceca331e5..a0693768edc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
@@ -48,6 +48,7 @@ import org.apache.doris.thrift.THiveSerDeProperties;
 import org.apache.doris.thrift.THiveTableSink;
 
 import com.google.common.base.Strings;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 
@@ -168,7 +169,14 @@ public class HiveTableSink extends 
BaseExternalTableDataSink {
 
     private String createTempPath(String location) {
         String user = ConnectContext.get().getCurrentUserIdentity().getUser();
-        return LocationPath.getTempWritePath(location, "/tmp/.doris_staging/" + user);
+        // Base directory for staging files comes from the catalog property hive.staging_dir.
+        // Per LocationPath.getTempWritePath:
+        // - a relative base is resolved under the table location;
+        // - an absolute base is used as-is.
+        // The default ".doris_staging" therefore stages inside the target table directory.
+        String defaultStagingBaseDir = ".doris_staging";
+        String stagingBaseDir = targetTable.getCatalog().getCatalogProperty()
+                .getOrDefault(HMSExternalCatalog.HIVE_STAGING_DIR, defaultStagingBaseDir);
+        // Trim so stray whitespace in the property never ends up in a directory name.
+        // A null, empty, or whitespace-only value is treated the same as an unset property.
+        stagingBaseDir = Strings.nullToEmpty(stagingBaseDir).trim();
+        if (stagingBaseDir.isEmpty()) {
+            stagingBaseDir = defaultStagingBaseDir;
+        }
+        String stagingDir = new Path(stagingBaseDir, user).toString();
+        return LocationPath.getTempWritePath(location, stagingDir);
     }
 
     private void setCompressType(THiveTableSink tSink, TFileFormatType 
formatType) {
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HMSTransactionPathTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HMSTransactionPathTest.java
new file mode 100644
index 00000000000..a1f7d146650
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HMSTransactionPathTest.java
@@ -0,0 +1,262 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.hive;
+
+import org.apache.doris.backup.Status;
+import org.apache.doris.fs.FileSystem;
+import org.apache.doris.fs.FileSystemProvider;
+import org.apache.doris.fs.LocalDfsFileSystem;
+import org.apache.doris.fs.remote.RemoteFile;
+import org.apache.doris.fs.remote.SwitchingFileSystem;
+import org.apache.doris.qe.ConnectContext;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+public class HMSTransactionPathTest {
+    private ConnectContext connectContext;
+    // Local temp directories created by tests; removed in tearDown so test runs do not leak them.
+    private final List<java.nio.file.Path> tempDirs = new ArrayList<>();
+
+    @Before
+    public void setUp() {
+        connectContext = new ConnectContext();
+        connectContext.setThreadLocalInfo();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        ConnectContext.remove();
+        connectContext = null;
+        for (java.nio.file.Path dir : tempDirs) {
+            deleteRecursively(dir);
+        }
+        tempDirs.clear();
+    }
+
+    @Test
+    public void testIsSubDirectory() throws Exception {
+        Assert.assertFalse(HMSTransaction.isSubDirectory(null, "/a"));
+        Assert.assertFalse(HMSTransaction.isSubDirectory("/a", null));
+        Assert.assertFalse(HMSTransaction.isSubDirectory("/a/b", "/a/b"));
+        Assert.assertFalse(HMSTransaction.isSubDirectory("/a/b", "/a/bc"));
+        Assert.assertFalse(HMSTransaction.isSubDirectory(
+                "hdfs://host1:8020/a", "hdfs://host2:8020/a/b"));
+        Assert.assertTrue(HMSTransaction.isSubDirectory(
+                "hdfs://host:8020/a/b/", "hdfs://host:8020/a/b/c/d"));
+        Assert.assertTrue(HMSTransaction.isSubDirectory("a/b", "a/b/c"));
+    }
+
+    @Test
+    public void testGetImmediateChildPath() throws Exception {
+        String parent = "hdfs://host:8020/warehouse/table";
+        String child = "hdfs://host:8020/warehouse/table/.doris_staging/user/uuid";
+        Assert.assertEquals(
+                "hdfs://host:8020/warehouse/table/.doris_staging",
+                HMSTransaction.getImmediateChildPath(parent, child));
+
+        String directChild = "hdfs://host:8020/warehouse/table/part=1";
+        Assert.assertEquals(
+                "hdfs://host:8020/warehouse/table/part=1",
+                HMSTransaction.getImmediateChildPath(parent, directChild));
+
+        String notSubdir = "hdfs://host:8020/warehouse/other";
+        Assert.assertNull(HMSTransaction.getImmediateChildPath(parent, notSubdir));
+    }
+
+    // Ensures NOT_FOUND results from list operations are treated as no-op cleanup.
+    @Test
+    public void testDeleteTargetPathContentsNotFoundAllowed() throws Exception {
+        FakeFileSystem fakeFs = new FakeFileSystem();
+        fakeFs.listDirectoriesStatus = new Status(Status.ErrCode.NOT_FOUND, "missing");
+        fakeFs.listFilesStatus = new Status(Status.ErrCode.NOT_FOUND, "missing");
+
+        HMSTransaction transaction = createTransaction(fakeFs);
+        transaction.deleteTargetPathContents(
+                "/tmp/does_not_exist", "/tmp/does_not_exist/.doris_staging");
+        Assert.assertTrue(fakeFs.deletedDirectories.isEmpty());
+        Assert.assertTrue(fakeFs.deletedFiles.isEmpty());
+    }
+
+    // Verifies listDirectories failures surface as runtime errors.
+    @Test
+    public void testDeleteTargetPathContentsListError() throws Exception {
+        FakeFileSystem fakeFs = new FakeFileSystem();
+        fakeFs.listDirectoriesStatus = new Status(Status.ErrCode.COMMON_ERROR, "list failed");
+
+        HMSTransaction transaction = createTransaction(fakeFs);
+        Assert.assertThrows(RuntimeException.class, () -> transaction.deleteTargetPathContents(
+                "/tmp/target", "/tmp/target/.doris_staging"));
+    }
+
+    @Test
+    public void testEnsureDirectorySuccess() throws Exception {
+        LocalDfsFileSystem localFs = new LocalDfsFileSystem();
+        HMSTransaction transaction = createTransaction(localFs);
+
+        java.nio.file.Path dir = createTempDir("hms_tx_ensure_").resolve("nested");
+        transaction.ensureDirectory(dir.toString());
+
+        Assert.assertTrue(Files.exists(dir));
+    }
+
+    @Test
+    public void testEnsureDirectoryError() throws Exception {
+        FakeFileSystem fakeFs = new FakeFileSystem();
+        fakeFs.makeDirStatus = new Status(Status.ErrCode.COMMON_ERROR, "mkdir failed");
+
+        HMSTransaction transaction = createTransaction(fakeFs);
+        Assert.assertThrows(RuntimeException.class, () -> transaction.ensureDirectory("/tmp/target"));
+    }
+
+    // Verifies the staging-under-target flow:
+    // 1) Detect write path nested under target.
+    // 2) Compute the immediate staging root under target.
+    // 3) Delete target contents while preserving the staging root.
+    // 4) Ensure the target directory exists after cleanup.
+    @Test
+    public void testDeleteTargetPathContentsSkipsExcludedDir() throws Exception {
+        LocalDfsFileSystem localFs = new LocalDfsFileSystem();
+        HMSTransaction transaction = createTransaction(localFs);
+
+        java.nio.file.Path targetDir = createTempDir("hms_tx_path_test_");
+        java.nio.file.Path stagingDir = targetDir.resolve(".doris_staging");
+        java.nio.file.Path writeDir = stagingDir.resolve("user/uuid");
+        java.nio.file.Path stagingFile = stagingDir.resolve("staging.tmp");
+        java.nio.file.Path otherDir = targetDir.resolve("part=1");
+        java.nio.file.Path otherFile = targetDir.resolve("data.txt");
+
+        Files.createDirectories(stagingDir);
+        Files.createDirectories(writeDir);
+        Files.createFile(stagingFile);
+        Files.createDirectories(otherDir);
+        Files.createFile(otherFile);
+
+        String targetPath = targetDir.toString();
+        String writePath = writeDir.toString();
+        Assert.assertTrue(HMSTransaction.isSubDirectory(targetPath, writePath));
+        String stagingRoot = HMSTransaction.getImmediateChildPath(targetPath, writePath);
+        transaction.deleteTargetPathContents(targetPath, stagingRoot);
+        transaction.ensureDirectory(targetPath);
+
+        Assert.assertTrue(Files.exists(stagingDir));
+        Assert.assertTrue(Files.exists(stagingFile));
+        Assert.assertFalse(Files.exists(otherDir));
+        Assert.assertFalse(Files.exists(otherFile));
+    }
+
+    // Creates a local temp directory and registers it for removal in tearDown.
+    private java.nio.file.Path createTempDir(String prefix) throws java.io.IOException {
+        java.nio.file.Path dir = Files.createTempDirectory(prefix);
+        tempDirs.add(dir);
+        return dir;
+    }
+
+    // Deletes root and everything below it; Files.walk lists parents before children, so the
+    // reversed order removes children first.
+    private static void deleteRecursively(java.nio.file.Path root) throws java.io.IOException {
+        if (!Files.exists(root)) {
+            return;
+        }
+        List<java.nio.file.Path> ordered = new ArrayList<>();
+        try (java.util.stream.Stream<java.nio.file.Path> walk = Files.walk(root)) {
+            walk.forEach(ordered::add);
+        }
+        java.util.Collections.reverse(ordered);
+        for (java.nio.file.Path p : ordered) {
+            Files.deleteIfExists(p);
+        }
+    }
+
+    private static HMSTransaction createTransaction(FileSystem delegate) {
+        SwitchingFileSystem switchingFs = new TestSwitchingFileSystem(delegate);
+        FileSystemProvider provider = ctx -> switchingFs;
+        return new HMSTransaction(null, provider, Runnable::run);
+    }
+
+    private static class TestSwitchingFileSystem extends SwitchingFileSystem {
+        private final FileSystem delegate;
+
+        TestSwitchingFileSystem(FileSystem delegate) {
+            super(null, null);
+            this.delegate = delegate;
+        }
+
+        @Override
+        public FileSystem fileSystem(String location) {
+            return delegate;
+        }
+    }
+
+    // Minimal FileSystem stub: listings return the configured status (and no entries), deletes are
+    // recorded, and every other operation succeeds.
+    private static class FakeFileSystem implements FileSystem {
+        private Status listDirectoriesStatus = Status.OK;
+        private Status listFilesStatus = Status.OK;
+        private Status makeDirStatus = Status.OK;
+        private Status deleteStatus = Status.OK;
+        private Status deleteDirStatus = Status.OK;
+
+        private final List<String> deletedDirectories = new ArrayList<>();
+        private final List<String> deletedFiles = new ArrayList<>();
+
+        @Override
+        public Status listDirectories(String remotePath, Set<String> result) {
+            if (!listDirectoriesStatus.ok()) {
+                return listDirectoriesStatus;
+            }
+            return Status.OK;
+        }
+
+        @Override
+        public Status listFiles(String remotePath, boolean recursive, List<RemoteFile> result) {
+            if (!listFilesStatus.ok()) {
+                return listFilesStatus;
+            }
+            return Status.OK;
+        }
+
+        @Override
+        public Status deleteDirectory(String dir) {
+            deletedDirectories.add(dir);
+            return deleteDirStatus;
+        }
+
+        @Override
+        public Status delete(String remotePath) {
+            deletedFiles.add(remotePath);
+            return deleteStatus;
+        }
+
+        @Override
+        public Status makeDir(String remotePath) {
+            return makeDirStatus;
+        }
+
+        @Override
+        public Status exists(String remotePath) {
+            return Status.OK;
+        }
+
+        @Override
+        public Status downloadWithFileSize(String remoteFilePath, String localFilePath, long fileSize) {
+            return Status.OK;
+        }
+
+        @Override
+        public Status upload(String localPath, String remotePath) {
+            return Status.OK;
+        }
+
+        @Override
+        public Status directUpload(String content, String remoteFile) {
+            return Status.OK;
+        }
+
+        @Override
+        public Status rename(String origFilePath, String destFilePath) {
+            return Status.OK;
+        }
+
+        @Override
+        public Status globList(String remotePath, List<RemoteFile> result, boolean fileNameOnly) {
+            return Status.OK;
+        }
+
+        @Override
+        public java.util.Map<String, String> getProperties() {
+            return null;
+        }
+    }
+}
diff --git 
a/regression-test/data/external_table_p0/hive/write/test_hive_staging_dir.out 
b/regression-test/data/external_table_p0/hive/write/test_hive_staging_dir.out
new file mode 100644
index 00000000000..021de9148ae
--- /dev/null
+++ 
b/regression-test/data/external_table_p0/hive/write/test_hive_staging_dir.out
@@ -0,0 +1,11 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !q01 --
+1
+
+-- !q02 --
+1
+2
+
+-- !q03 --
+1
+
diff --git 
a/regression-test/suites/external_table_p0/hive/write/test_hive_staging_dir.groovy
 
b/regression-test/suites/external_table_p0/hive/write/test_hive_staging_dir.groovy
new file mode 100644
index 00000000000..087e05e0c1e
--- /dev/null
+++ 
b/regression-test/suites/external_table_p0/hive/write/test_hive_staging_dir.groovy
@@ -0,0 +1,134 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.util.Hdfs
+import org.apache.hadoop.fs.Path
+
+import java.net.URI
+
+suite("test_hive_staging_dir", "p0,external,hive,external_docker,external_docker_hive") {
+    // Returns the "Location" row of Hive's `describe formatted` output for `db`.`tbl`.
+    def getHiveTableLocation = { String db, String tbl ->
+        def rows = hive_docker """describe formatted `${db}`.`${tbl}`"""
+        for (def row : rows) {
+            if (row == null || row.size() < 2) {
+                continue
+            }
+            String key = row[0] == null ? "" : row[0].toString().trim()
+            if (key.equalsIgnoreCase("Location") || key.equalsIgnoreCase("Location:")) {
+                def value = row[1]
+                if (value != null && !value.toString().trim().isEmpty()) {
+                    return value.toString().trim()
+                }
+            }
+        }
+        throw new RuntimeException("Failed to find location for ${db}.${tbl}")
+    }
+
+    String enabled = context.config.otherConfigs.get("enableHiveTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("disable Hive test.")
+        return
+    }
+
+    for (String hivePrefix : ["hive3"]) {
+        setHivePrefix(hivePrefix)
+        String catalogName = "test_${hivePrefix}_staging_dir"
+        String dbName = "write_test"
+        String tableRel = "staging_dir_rel_${hivePrefix}"
+        String tableAbs = "staging_dir_abs_${hivePrefix}"
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String hmsPort = context.config.otherConfigs.get(hivePrefix + "HmsPort")
+        String hdfsPort = context.config.otherConfigs.get(hivePrefix + "HdfsPort")
+
+        // The staging dir is named after the bare user name. Drop any "@host" part and any
+        // characters outside [A-Za-z0-9_-] from current_user() before predicting the path.
+        String currentUser = (sql "select current_user()")[0][0].toString()
+        if (currentUser.contains("@")) {
+            currentUser = currentUser.substring(0, currentUser.indexOf("@"))
+        }
+        currentUser = currentUser.replaceAll("[^A-Za-z0-9_-]", "")
+        String suffix = System.currentTimeMillis().toString()
+        String stagingRelBase = ".doris_staging_test_${suffix}"
+
+        String hdfsUser = context.config.otherConfigs.get("hdfsUser")
+        if (hdfsUser == null || hdfsUser.isEmpty()) {
+            hdfsUser = "root"
+        }
+
+        // The absolute staging base is unique per run and lives outside any table location, so
+        // it would otherwise accumulate (one per run) on HDFS. These are declared outside try so
+        // finally can still remove it when an insert or assertion fails midway.
+        def fs = null
+        String stagingAbsBase = null
+
+        try {
+            hive_docker """create database if not exists `${dbName}`"""
+            hive_docker """drop table if exists `${dbName}`.`${tableRel}`"""
+            hive_docker """drop table if exists `${dbName}`.`${tableAbs}`"""
+            hive_docker """create table `${dbName}`.`${tableRel}` (k1 int) stored as orc"""
+            hive_docker """create table `${dbName}`.`${tableAbs}` (k1 int) stored as orc"""
+
+            String relLocation = getHiveTableLocation(dbName, tableRel)
+            URI tableUri = new URI(relLocation)
+            String hdfsUri = "hdfs://${externalEnvIp}:${hdfsPort}"
+            if (tableUri.getScheme() != null && tableUri.getAuthority() != null) {
+                hdfsUri = tableUri.getScheme() + "://" + tableUri.getAuthority()
+            }
+            stagingAbsBase = "${hdfsUri}/tmp/doris_staging_abs_${suffix}"
+            String stagingRelWithUser = new Path(stagingRelBase, currentUser).toString()
+            String stagingAbs = new Path(stagingAbsBase, currentUser).toString()
+
+            Hdfs hdfs = new Hdfs(hdfsUri, hdfsUser, context.config.dataPath + "/")
+            fs = hdfs.fs
+            String defaultStagingRel = ".doris_staging/${currentUser}"
+            String defaultBase = new Path(relLocation, defaultStagingRel).toString()
+            String relBase = new Path(relLocation, stagingRelWithUser).toString()
+
+            fs.delete(new Path(defaultBase), true)
+            fs.delete(new Path(relBase), true)
+            fs.delete(new Path(stagingAbs), true)
+
+            sql """drop catalog if exists ${catalogName}"""
+            sql """create catalog if not exists ${catalogName} properties (
+                'type'='hms',
+                'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hmsPort}',
+                'fs.defaultFS' = '${hdfsUri}'
+            );"""
+            sql """refresh catalog ${catalogName}"""
+            sql """use `${catalogName}`.`${dbName}`"""
+            sql """set enable_fallback_to_original_planner=false;"""
+
+            sql """insert into `${tableRel}` values (1)"""
+            order_qt_q01 """ select * from `${tableRel}`"""
+            assertTrue(fs.exists(new Path(defaultBase)),
+                    "default staging dir not created: ${defaultBase}")
+            fs.delete(new Path(defaultBase), true)
+
+            sql """alter catalog ${catalogName} set properties ('hive.staging_dir' = '${stagingRelBase}')"""
+            sql """refresh catalog ${catalogName}"""
+            sql """insert into `${tableRel}` values (2)"""
+            order_qt_q02 """ select * from `${tableRel}`"""
+            assertTrue(fs.exists(new Path(relBase)),
+                    "relative staging dir not created: ${relBase}")
+            fs.delete(new Path(relBase), true)
+
+            sql """alter catalog ${catalogName} set properties ('hive.staging_dir' = '${stagingAbsBase}')"""
+            sql """refresh catalog ${catalogName}"""
+            sql """insert into `${tableAbs}` values (1)"""
+            order_qt_q03 """ select * from `${tableAbs}`"""
+            assertTrue(fs.exists(new Path(stagingAbs)),
+                    "absolute staging dir not created: ${stagingAbs}")
+            fs.delete(new Path(stagingAbs), true)
+        } finally {
+            if (fs != null && stagingAbsBase != null) {
+                try {
+                    fs.delete(new Path(stagingAbsBase), true)
+                } catch (Exception e) {
+                    logger.warn("Failed to clean up staging base ${stagingAbsBase}: ${e.message}")
+                }
+            }
+            try_hive_docker """drop table if exists `${dbName}`.`${tableRel}`"""
+            try_hive_docker """drop table if exists `${dbName}`.`${tableAbs}`"""
+            try_sql """drop catalog if exists ${catalogName}"""
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to