This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 0bb6ae3a1e1 branch-4.1: [fix](outfile) handle delete_existing_files 
before parallel export #61223 (#61726)
0bb6ae3a1e1 is described below

commit 0bb6ae3a1e113e8b6e2f062b5144ea3279ee34e3
Author: Socrates <[email protected]>
AuthorDate: Thu Mar 26 09:17:04 2026 +0800

    branch-4.1: [fix](outfile) handle delete_existing_files before parallel 
export #61223 (#61726)
    
    Cherry-pick #61223 to branch-4.1
    
    ### What problem does this PR solve?
    
    - Related PR: #61223
    
    Handle `delete_existing_files=true` for remote outfile once in FE before
    parallel export, clear the delete flag before sink options are sent to
    BE, and reject `file:///` with `delete_existing_files=true` to align
    outfile behavior with export.
    
    ### Cherry-pick commit
    
    - `15766536326` - [fix](outfile) handle delete_existing_files before
    parallel export (#61223)
---
 be/src/exec/operator/result_sink_operator.h        |   4 +
 be/src/exec/sink/writer/vfile_result_writer.cpp    |   4 +-
 .../org/apache/doris/analysis/OutFileClause.java   |  10 +-
 .../org/apache/doris/common/util/BrokerUtil.java   |  12 ++
 .../main/java/org/apache/doris/load/ExportMgr.java |   5 +-
 .../commands/insert/InsertIntoTVFCommand.java      |  11 +-
 .../org/apache/doris/planner/ResultFileSink.java   |   5 +
 .../java/org/apache/doris/qe/StmtExecutor.java     |  40 ++++++-
 .../java/org/apache/doris/qe/StmtExecutorTest.java |  25 ++++
 gensrc/thrift/DataSinks.thrift                     |   4 +-
 .../suites/export_p0/test_outfile.groovy           |  14 +++
 ...t_outfile_parallel_delete_existing_files.groovy | 131 +++++++++++++++++++++
 12 files changed, 244 insertions(+), 21 deletions(-)

diff --git a/be/src/exec/operator/result_sink_operator.h 
b/be/src/exec/operator/result_sink_operator.h
index 752e6a367c9..7fdbca0b8dc 100644
--- a/be/src/exec/operator/result_sink_operator.h
+++ b/be/src/exec/operator/result_sink_operator.h
@@ -57,6 +57,9 @@ struct ResultFileOptions {
     // TODO: we should merge 
parquet_commpression_type/orc_compression_type/compression_type
     TFileCompressType::type compression_type = TFileCompressType::PLAIN;
 
+    // Deprecated compatibility flag. New FE handles outfile 
delete_existing_files in FE
+    // and clears this field before sending the result sink to BE. Keep 
reading it here
+    // only for compatibility with older FE during rolling upgrade.
     bool delete_existing_files = false;
     std::string file_suffix;
     //Bring BOM when exporting to CSV format
@@ -70,6 +73,7 @@ struct ResultFileOptions {
         line_delimiter = t_opt.__isset.line_delimiter ? t_opt.line_delimiter : 
"\n";
         max_file_size_bytes =
                 t_opt.__isset.max_file_size_bytes ? t_opt.max_file_size_bytes 
: max_file_size_bytes;
+        // Deprecated compatibility path. New FE should already have cleared 
this flag.
         delete_existing_files =
                 t_opt.__isset.delete_existing_files ? 
t_opt.delete_existing_files : false;
         file_suffix = t_opt.file_suffix;
diff --git a/be/src/exec/sink/writer/vfile_result_writer.cpp 
b/be/src/exec/sink/writer/vfile_result_writer.cpp
index d7e4149c551..c198eaa7e21 100644
--- a/be/src/exec/sink/writer/vfile_result_writer.cpp
+++ b/be/src/exec/sink/writer/vfile_result_writer.cpp
@@ -96,7 +96,9 @@ Status VFileResultWriter::open(RuntimeState* state, 
RuntimeProfile* profile) {
         _file_opts->orc_writer_version < 1) {
         return Status::InternalError("orc writer version is less than 1.");
     }
-    // Delete existing files
+    // Deprecated compatibility path. New FE already deletes the target 
directory in FE
+    // and clears delete_existing_files before BE execution. Keep this branch 
only for
+    // requests from older FE versions during rolling upgrade.
     if (_file_opts->delete_existing_files) {
         RETURN_IF_ERROR(_delete_dir());
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
index cc7ecf52c4e..c0fc647fd43 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
@@ -187,6 +187,10 @@ public class OutFileClause {
         return maxFileSizeBytes;
     }
 
+    public boolean shouldDeleteExistingFiles() {
+        return deleteExistingFiles;
+    }
+
     public BrokerDesc getBrokerDesc() {
         return brokerDesc;
     }
@@ -454,7 +458,6 @@ public class OutFileClause {
                 + ", should use " + expectType + ", but the definition type is 
" + orcType);
     }
 
-
     private void analyzeForParquetFormat(List<Expr> resultExprs, List<String> 
colLabels) throws AnalysisException {
         if (this.parquetSchemas.isEmpty()) {
             genParquetColumnName(resultExprs, colLabels);
@@ -540,6 +543,9 @@ public class OutFileClause {
                         + " To enable this feature, you need to add 
`enable_delete_existing_files=true`"
                         + " in fe.conf");
             }
+            if (deleteExistingFiles && isLocalOutput) {
+                throw new AnalysisException("Local file system does not 
support delete existing files");
+            }
             copiedProps.remove(PROP_DELETE_EXISTING_FILES);
         }
 
@@ -769,5 +775,3 @@ public class OutFileClause {
         return sinkOptions;
     }
 }
-
-
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
index e5b7c1a7f45..e44d1d9ea4f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
@@ -119,6 +119,18 @@ public class BrokerUtil {
         }
     }
 
+    public static void deleteParentDirectoryWithFileSystem(String path, 
BrokerDesc brokerDesc) throws UserException {
+        deleteDirectoryWithFileSystem(extractParentDirectory(path), 
brokerDesc);
+    }
+
+    public static String extractParentDirectory(String path) {
+        int lastSlash = path.lastIndexOf('/');
+        if (lastSlash >= 0) {
+            return path.substring(0, lastSlash + 1);
+        }
+        return path;
+    }
+
     public static String printBroker(String brokerName, TNetworkAddress 
address) {
         return brokerName + "[" + address.toString() + "]";
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
index 8de5a1c95c6..c2b1be2a230 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
@@ -114,9 +114,7 @@ public class ExportMgr {
         try {
             // delete existing files
             if (Boolean.parseBoolean(job.getDeleteExistingFiles())) {
-                String fullPath = job.getExportPath();
-                BrokerUtil.deleteDirectoryWithFileSystem(fullPath.substring(0, 
fullPath.lastIndexOf('/') + 1),
-                        job.getBrokerDesc());
+                
BrokerUtil.deleteParentDirectoryWithFileSystem(job.getExportPath(), 
job.getBrokerDesc());
             }
             // ATTN: Must add task after edit log, otherwise the job may 
finish before adding job.
             for (int i = 0; i < job.getCopiedTaskExecutors().size(); i++) {
@@ -554,4 +552,3 @@ public class ExportMgr {
         return size;
     }
 }
-
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTVFCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTVFCommand.java
index 4847cd028fa..fae7747b5d4 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTVFCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTVFCommand.java
@@ -23,6 +23,7 @@ import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.Status;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.BrokerUtil;
 import org.apache.doris.datasource.property.storage.StorageProperties;
 import org.apache.doris.fs.FileSystemFactory;
 import org.apache.doris.fs.remote.RemoteFileSystem;
@@ -176,7 +177,7 @@ public class InsertIntoTVFCommand extends Command 
implements ForwardWithSync, Ex
             throws Exception {
         String filePath = props.get("file_path");
         // Extract parent directory from prefix path: 
s3://bucket/path/to/prefix_ -> s3://bucket/path/to/
-        String parentDir = extractParentDirectory(filePath);
+        String parentDir = BrokerUtil.extractParentDirectory(filePath);
         LOG.info("TVF sink: deleting existing files in directory: {}", 
parentDir);
 
         // Copy props for building StorageProperties (exclude write-specific 
params)
@@ -198,12 +199,4 @@ public class InsertIntoTVFCommand extends Command 
implements ForwardWithSync, Ex
                     + parentDir + ": " + deleteStatus.getErrMsg());
         }
     }
-
-    private static String extractParentDirectory(String prefixPath) {
-        int lastSlash = prefixPath.lastIndexOf('/');
-        if (lastSlash >= 0) {
-            return prefixPath.substring(0, lastSlash + 1);
-        }
-        return prefixPath;
-    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java
index 0ba8311097a..520e34e27c2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java
@@ -88,6 +88,11 @@ public class ResultFileSink extends DataSink {
         fileSinkOptions.setBrokerAddresses(Lists.newArrayList(new 
TNetworkAddress(ip, port)));
     }
 
+    public void setDeleteExistingFiles(boolean deleteExistingFiles) {
+        Preconditions.checkNotNull(fileSinkOptions);
+        fileSinkOptions.setDeleteExistingFiles(deleteExistingFiles);
+    }
+
     public void resetByDataStreamSink(DataStreamSink dataStreamSink) {
         exchNodeId = dataStreamSink.getExchNodeId();
         outputPartition = dataStreamSink.getOutputPartition();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 1a7d4df0c49..b39d5b0d62b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -54,6 +54,7 @@ import org.apache.doris.common.profile.Profile;
 import org.apache.doris.common.profile.ProfileManager.ProfileType;
 import org.apache.doris.common.profile.SummaryProfile;
 import org.apache.doris.common.profile.SummaryProfile.SummaryBuilder;
+import org.apache.doris.common.util.BrokerUtil;
 import org.apache.doris.common.util.DebugPointUtil;
 import org.apache.doris.common.util.DebugPointUtil.DebugPoint;
 import org.apache.doris.common.util.DebugUtil;
@@ -105,8 +106,10 @@ import 
org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalSqlCache;
 import org.apache.doris.planner.GroupCommitScanNode;
 import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.planner.PlanFragment;
 import org.apache.doris.planner.PlanNode;
 import org.apache.doris.planner.Planner;
+import org.apache.doris.planner.ResultFileSink;
 import org.apache.doris.planner.ScanNode;
 import org.apache.doris.proto.Data;
 import org.apache.doris.proto.InternalService;
@@ -1298,8 +1301,16 @@ public class StmtExecutor {
         }
 
         coordBase.setIsProfileSafeStmt(this.isProfileSafeStmt());
+        OutFileClause outFileClause = null;
+        if (isOutfileQuery) {
+            outFileClause = queryStmt.getOutFileClause();
+            Preconditions.checkState(outFileClause != null, "OUTFILE query 
must have OutFileClause");
+        }
 
         try {
+            if (outFileClause != null) {
+                deleteExistingOutfileFilesInFe(outFileClause);
+            }
             coordBase.exec();
             
profile.getSummaryProfile().setQueryScheduleFinishTime(TimeUtils.getStartTimeMs());
             updateProfile(false);
@@ -1339,8 +1350,8 @@ public class StmtExecutor {
                             sendFields(queryStmt.getColLabels(), 
queryStmt.getFieldInfos(),
                                     getReturnTypes(queryStmt));
                         } else {
-                            if 
(!Strings.isNullOrEmpty(queryStmt.getOutFileClause().getSuccessFileName())) {
-                                
outfileWriteSuccess(queryStmt.getOutFileClause());
+                            if 
(!Strings.isNullOrEmpty(outFileClause.getSuccessFileName())) {
+                                outfileWriteSuccess(outFileClause);
                             }
                             sendFields(OutFileClause.RESULT_COL_NAMES, 
OutFileClause.RESULT_COL_TYPES);
                         }
@@ -1482,6 +1493,31 @@ public class StmtExecutor {
         }
     }
 
+    private void deleteExistingOutfileFilesInFe(OutFileClause outFileClause) 
throws UserException {
+        // Handle directory cleanup once in FE so parallel outfile writers 
never race on deletion.
+        if (!outFileClause.shouldDeleteExistingFiles()) {
+            return;
+        }
+        Preconditions.checkState(outFileClause.getBrokerDesc() != null,
+                "delete_existing_files requires a remote outfile sink");
+        Preconditions.checkState(outFileClause.getBrokerDesc().storageType() 
!= StorageType.LOCAL,
+                "delete_existing_files is not supported for local outfile 
sinks");
+        
BrokerUtil.deleteParentDirectoryWithFileSystem(outFileClause.getFilePath(), 
outFileClause.getBrokerDesc());
+        clearDeleteExistingFilesInPlan();
+    }
+
+    private void clearDeleteExistingFilesInPlan() {
+        ResultFileSink resultFileSink = null;
+        for (PlanFragment fragment : planner.getFragments()) {
+            if (fragment.getSink() instanceof ResultFileSink) {
+                Preconditions.checkState(resultFileSink == null, "OUTFILE 
query should have only one ResultFileSink");
+                resultFileSink = (ResultFileSink) fragment.getSink();
+            }
+        }
+        Preconditions.checkState(resultFileSink != null, "OUTFILE query must 
have ResultFileSink");
+        resultFileSink.setDeleteExistingFiles(false);
+    }
+
     public static void syncLoadForTablets(List<List<Backend>> backendsList, 
List<Long> allTabletIds) {
         backendsList.forEach(backends -> backends.forEach(backend -> {
             if (backend.isAlive()) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java
index 05af66313b1..3e3f4ba9d49 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java
@@ -24,6 +24,9 @@ import org.apache.doris.common.Config;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.mysql.MysqlChannel;
 import org.apache.doris.mysql.MysqlSerializer;
+import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.planner.Planner;
+import org.apache.doris.planner.ResultFileSink;
 import org.apache.doris.qe.CommonResultSet.CommonResultSetMetaData;
 import org.apache.doris.qe.ConnectContext.ConnectType;
 import org.apache.doris.utframe.TestWithFeService;
@@ -35,6 +38,8 @@ import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 
 import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -276,4 +281,24 @@ public class StmtExecutorTest extends TestWithFeService {
         StmtExecutor executor = new StmtExecutor(mockCtx, stmt, false);
         executor.sendBinaryResultRow(resultSet);
     }
+
+    @Test
+    public void testClearDeleteExistingFilesInPlan() throws Exception {
+        Planner planner = Mockito.mock(Planner.class);
+        PlanFragment fragment = Mockito.mock(PlanFragment.class);
+        ResultFileSink resultFileSink = Mockito.mock(ResultFileSink.class);
+        Mockito.when(fragment.getSink()).thenReturn(resultFileSink);
+        
Mockito.when(planner.getFragments()).thenReturn(Lists.newArrayList(fragment));
+
+        StmtExecutor executor = new StmtExecutor(connectContext, "");
+        Field plannerField = StmtExecutor.class.getDeclaredField("planner");
+        plannerField.setAccessible(true);
+        plannerField.set(executor, planner);
+
+        Method clearMethod = 
StmtExecutor.class.getDeclaredMethod("clearDeleteExistingFilesInPlan");
+        clearMethod.setAccessible(true);
+        clearMethod.invoke(executor);
+
+        Mockito.verify(resultFileSink).setDeleteExistingFiles(false);
+    }
 }
diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift
index 8add59d47af..842765273e0 100644
--- a/gensrc/thrift/DataSinks.thrift
+++ b/gensrc/thrift/DataSinks.thrift
@@ -132,7 +132,7 @@ struct TResultFileSinkOptions {
     14: optional TParquetVersion parquet_version
     15: optional string orc_schema
 
-    16: optional bool delete_existing_files;
+    16: optional bool delete_existing_files; // deprecated: FE now handles 
outfile cleanup and clears this flag before BE execution; kept for 
compatibility with older FE
     17: optional string file_suffix;
     18: optional bool with_bom;
 
@@ -480,7 +480,7 @@ struct TTVFTableSink {
     7: optional string column_separator
     8: optional string line_delimiter
     9: optional i64 max_file_size_bytes
-    10: optional bool delete_existing_files
+    10: optional bool delete_existing_files // deprecated: FE handles TVF 
cleanup before execution and always sends false
     11: optional map<string, string> hadoop_config
     12: optional PlanNodes.TFileCompressType compression_type
     13: optional i64 backend_id              // local TVF: specify BE
diff --git a/regression-test/suites/export_p0/test_outfile.groovy 
b/regression-test/suites/export_p0/test_outfile.groovy
index f33500f5883..35c5e0b681b 100644
--- a/regression-test/suites/export_p0/test_outfile.groovy
+++ b/regression-test/suites/export_p0/test_outfile.groovy
@@ -44,11 +44,15 @@ suite("test_outfile") {
     assertEquals(response.msg, "success")
     def configJson = response.data.rows
     boolean enableOutfileToLocal = false
+    boolean enableDeleteExistingFiles = false
     for (Object conf: configJson) {
         assert conf instanceof Map
         if (((Map<String, String>) conf).get("Name").toLowerCase() == 
"enable_outfile_to_local") {
             enableOutfileToLocal = ((Map<String, String>) 
conf).get("Value").toLowerCase() == "true"
         }
+        if (((Map<String, String>) conf).get("Name").toLowerCase() == 
"enable_delete_existing_files") {
+            enableDeleteExistingFiles = ((Map<String, String>) 
conf).get("Value").toLowerCase() == "true"
+        }
     }
     if (!enableOutfileToLocal) {
         logger.warn("Please set enable_outfile_to_local to true to run 
test_outfile")
@@ -233,4 +237,14 @@ suite("test_outfile") {
             path.delete();
         }
     }
+
+    if (enableDeleteExistingFiles) {
+        test {
+            sql """
+                SELECT 1 INTO OUTFILE 
"file://${outFile}/test_outfile_delete_existing_files_${uuid}/"
+                PROPERTIES("delete_existing_files" = "true");
+            """
+            exception "Local file system does not support delete existing 
files"
+        }
+    }
 }
diff --git 
a/regression-test/suites/export_p0/test_outfile_parallel_delete_existing_files.groovy
 
b/regression-test/suites/export_p0/test_outfile_parallel_delete_existing_files.groovy
new file mode 100644
index 00000000000..655146598a9
--- /dev/null
+++ 
b/regression-test/suites/export_p0/test_outfile_parallel_delete_existing_files.groovy
@@ -0,0 +1,131 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite("test_outfile_parallel_delete_existing_files", "p0") {
+    StringBuilder strBuilder = new StringBuilder()
+    strBuilder.append("curl --location-trusted -u " + context.config.jdbcUser 
+ ":" + context.config.jdbcPassword)
+    if 
((context.config.otherConfigs.get("enableTLS")?.toString()?.equalsIgnoreCase("true"))
 ?: false) {
+        strBuilder.append(" https://" + context.config.feHttpAddress + 
"/rest/v1/config/fe")
+        strBuilder.append(" --cert " + 
context.config.otherConfigs.get("trustCert")
+                + " --cacert " + context.config.otherConfigs.get("trustCACert")
+                + " --key " + context.config.otherConfigs.get("trustCAKey"))
+    } else {
+        strBuilder.append(" http://" + context.config.feHttpAddress + 
"/rest/v1/config/fe")
+    }
+
+    def process = strBuilder.toString().execute()
+    def code = process.waitFor()
+    def err = IOGroovyMethods.getText(new BufferedReader(new 
InputStreamReader(process.getErrorStream())))
+    def out = process.getText()
+    logger.info("Request FE Config: code=" + code + ", out=" + out + ", err=" 
+ err)
+    assertEquals(code, 0)
+    def response = parseJson(out.trim())
+    assertEquals(response.code, 0)
+    assertEquals(response.msg, "success")
+    boolean enableDeleteExistingFiles = false
+    for (Object conf : response.data.rows) {
+        assert conf instanceof Map
+        if (((Map<String, String>) conf).get("Name").toLowerCase() == 
"enable_delete_existing_files") {
+            enableDeleteExistingFiles = ((Map<String, String>) 
conf).get("Value").toLowerCase() == "true"
+        }
+    }
+    if (!enableDeleteExistingFiles) {
+        logger.warn("Please set enable_delete_existing_files to true to run 
test_outfile_parallel_delete_existing_files")
+        return
+    }
+
+    String ak = getS3AK()
+    String sk = getS3SK()
+    String s3Endpoint = getS3Endpoint()
+    String region = getS3Region()
+    String bucket = context.config.otherConfigs.get("s3BucketName")
+    String provider = getS3Provider()
+    String tableName = "test_outfile_parallel_delete_existing_files"
+    String uuid = UUID.randomUUID().toString()
+    String outFilePath = 
"${bucket}/outfile/parallel_delete_existing_files/${uuid}/exp_"
+
+    def exportToS3 = { String filterSql, boolean deleteExistingFiles ->
+        String deleteProperty = deleteExistingFiles ? 
"\"delete_existing_files\" = \"true\"," : ""
+        sql """
+            SELECT * FROM ${tableName} ${filterSql}
+            INTO OUTFILE "s3://${outFilePath}"
+            FORMAT AS csv
+            PROPERTIES (
+                ${deleteProperty}
+                "column_separator" = ",",
+                "s3.endpoint" = "${s3Endpoint}",
+                "s3.region" = "${region}",
+                "s3.secret_key" = "${sk}",
+                "s3.access_key" = "${ak}",
+                "provider" = "${provider}"
+            );
+        """
+    }
+
+    try {
+        sql """ set enable_parallel_outfile = true """
+        sql """ set parallel_pipeline_task_num = 8 """
+
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql """
+            CREATE TABLE ${tableName} (
+                id INT NOT NULL,
+                name STRING NOT NULL,
+                score INT NOT NULL
+            )
+            DUPLICATE KEY(id)
+            DISTRIBUTED BY HASH(id) BUCKETS 16
+            PROPERTIES("replication_num" = "1");
+        """
+
+        sql """
+            INSERT INTO ${tableName}
+            SELECT
+                number AS id,
+                concat('name_', cast(number AS string)) AS name,
+                cast(number % 97 AS int) AS score
+            FROM numbers("number" = "20000");
+        """
+
+        def expected = sql """ SELECT count(*), sum(id), sum(score) FROM 
${tableName}; """
+
+        exportToS3("WHERE id < 5000", false)
+        exportToS3("", true)
+
+        def actual = sql """
+            SELECT count(*), sum(id), sum(score) FROM S3(
+                "uri" = "s3://${outFilePath}*",
+                "s3.endpoint" = "${s3Endpoint}",
+                "s3.region" = "${region}",
+                "s3.secret_key" = "${sk}",
+                "s3.access_key" = "${ak}",
+                "provider" = "${provider}",
+                "format" = "csv",
+                "column_separator" = ",",
+                "csv_schema" = "id:int;name:string;score:int"
+            );
+        """
+
+        assertEquals(expected[0][0], actual[0][0])
+        assertEquals(expected[0][1], actual[0][1])
+        assertEquals(expected[0][2], actual[0][2])
+    } finally {
+        try_sql(""" set enable_parallel_outfile = false """)
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to