This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 15766536326 [fix](outfile) handle delete_existing_files before 
parallel export (#61223)
15766536326 is described below

commit 15766536326d9b47828a0894f6e2df8ca1dd612f
Author: Socrates <[email protected]>
AuthorDate: Mon Mar 23 10:40:16 2026 +0800

    [fix](outfile) handle delete_existing_files before parallel export (#61223)
    
    ### What problem does this PR solve?
    
    Issue Number: N/A
    
    Related PR: https://github.com/apache/doris/pull/38400
    
    Problem Summary:
    When `select ... into outfile` uses `delete_existing_files=true`,
    parallel outfile writers can race on directory cleanup and delete files
    uploaded by other writers. This PR follows the same FE-side cleanup
    pattern used by export in #38400: remote outfile cleanup is executed
    once in FE before query execution, and the delete flag is cleared before
    sink options are sent to BE.
    
    This PR also aligns local outfile behavior with export: `file:///` does
    not support `delete_existing_files=true`, so FE rejects that combination
    during analysis instead of relying on BE-side cleanup.
    
    To reduce duplicated logic, the FE-side parent-directory cleanup used by
    export/outfile/TVF is refactored into shared `BrokerUtil` helpers.
---
 be/src/exec/operator/result_sink_operator.h        |   4 +
 be/src/exec/sink/writer/vfile_result_writer.cpp    |   4 +-
 .../org/apache/doris/analysis/OutFileClause.java   |   9 +-
 .../org/apache/doris/common/util/BrokerUtil.java   |  12 ++
 .../main/java/org/apache/doris/load/ExportMgr.java |   5 +-
 .../commands/insert/InsertIntoTVFCommand.java      |  11 +-
 .../org/apache/doris/planner/ResultFileSink.java   |   5 +
 .../java/org/apache/doris/qe/StmtExecutor.java     |  40 ++++++-
 .../java/org/apache/doris/qe/StmtExecutorTest.java |  25 ++++
 gensrc/thrift/DataSinks.thrift                     |   4 +-
 .../suites/export_p0/test_outfile.groovy           |  14 +++
 ...t_outfile_parallel_delete_existing_files.groovy | 131 +++++++++++++++++++++
 12 files changed, 244 insertions(+), 20 deletions(-)

diff --git a/be/src/exec/operator/result_sink_operator.h 
b/be/src/exec/operator/result_sink_operator.h
index 752e6a367c9..7fdbca0b8dc 100644
--- a/be/src/exec/operator/result_sink_operator.h
+++ b/be/src/exec/operator/result_sink_operator.h
@@ -57,6 +57,9 @@ struct ResultFileOptions {
     // TODO: we should merge 
parquet_commpression_type/orc_compression_type/compression_type
     TFileCompressType::type compression_type = TFileCompressType::PLAIN;
 
+    // Deprecated compatibility flag. New FE handles outfile 
delete_existing_files in FE
+    // and clears this field before sending the result sink to BE. Keep 
reading it here
+    // only for compatibility with older FE during rolling upgrade.
     bool delete_existing_files = false;
     std::string file_suffix;
     //Bring BOM when exporting to CSV format
@@ -70,6 +73,7 @@ struct ResultFileOptions {
         line_delimiter = t_opt.__isset.line_delimiter ? t_opt.line_delimiter : 
"\n";
         max_file_size_bytes =
                 t_opt.__isset.max_file_size_bytes ? t_opt.max_file_size_bytes 
: max_file_size_bytes;
+        // Deprecated compatibility path. New FE should already have cleared 
this flag.
         delete_existing_files =
                 t_opt.__isset.delete_existing_files ? 
t_opt.delete_existing_files : false;
         file_suffix = t_opt.file_suffix;
diff --git a/be/src/exec/sink/writer/vfile_result_writer.cpp 
b/be/src/exec/sink/writer/vfile_result_writer.cpp
index d7e4149c551..c198eaa7e21 100644
--- a/be/src/exec/sink/writer/vfile_result_writer.cpp
+++ b/be/src/exec/sink/writer/vfile_result_writer.cpp
@@ -96,7 +96,9 @@ Status VFileResultWriter::open(RuntimeState* state, 
RuntimeProfile* profile) {
         _file_opts->orc_writer_version < 1) {
         return Status::InternalError("orc writer version is less than 1.");
     }
-    // Delete existing files
+    // Deprecated compatibility path. New FE already deletes the target 
directory in FE
+    // and clears delete_existing_files before BE execution. Keep this branch 
only for
+    // requests from older FE versions during rolling upgrade.
     if (_file_opts->delete_existing_files) {
         RETURN_IF_ERROR(_delete_dir());
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
index d92c7311a3d..0ac88301dce 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
@@ -184,6 +184,10 @@ public class OutFileClause {
         return maxFileSizeBytes;
     }
 
+    public boolean shouldDeleteExistingFiles() {
+        return deleteExistingFiles;
+    }
+
     public BrokerDesc getBrokerDesc() {
         return brokerDesc;
     }
@@ -451,7 +455,6 @@ public class OutFileClause {
                 + ", should use " + expectType + ", but the definition type is 
" + orcType);
     }
 
-
     private void analyzeForParquetFormat(List<Expr> resultExprs, List<String> 
colLabels) throws AnalysisException {
         if (this.parquetSchemas.isEmpty()) {
             genParquetColumnName(resultExprs, colLabels);
@@ -537,6 +540,9 @@ public class OutFileClause {
                         + " To enable this feature, you need to add 
`enable_delete_existing_files=true`"
                         + " in fe.conf");
             }
+            if (deleteExistingFiles && isLocalOutput) {
+                throw new AnalysisException("Local file system does not 
support delete existing files");
+            }
             copiedProps.remove(PROP_DELETE_EXISTING_FILES);
         }
 
@@ -767,4 +773,3 @@ public class OutFileClause {
         return sinkOptions;
     }
 }
-
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
index 452cc3296e8..6e4f44add93 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
@@ -119,6 +119,18 @@ public class BrokerUtil {
         }
     }
 
+    public static void deleteParentDirectoryWithFileSystem(String path, 
BrokerDesc brokerDesc) throws UserException {
+        deleteDirectoryWithFileSystem(extractParentDirectory(path), 
brokerDesc);
+    }
+
+    public static String extractParentDirectory(String path) {
+        int lastSlash = path.lastIndexOf('/');
+        if (lastSlash >= 0) {
+            return path.substring(0, lastSlash + 1);
+        }
+        return path;
+    }
+
     public static String printBroker(String brokerName, TNetworkAddress 
address) {
         return brokerName + "[" + address.toString() + "]";
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
index 0a7d223799b..d08db981b7f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
@@ -114,9 +114,7 @@ public class ExportMgr {
         try {
             // delete existing files
             if (Boolean.parseBoolean(job.getDeleteExistingFiles())) {
-                String fullPath = job.getExportPath();
-                BrokerUtil.deleteDirectoryWithFileSystem(fullPath.substring(0, 
fullPath.lastIndexOf('/') + 1),
-                        job.getBrokerDesc());
+                
BrokerUtil.deleteParentDirectoryWithFileSystem(job.getExportPath(), 
job.getBrokerDesc());
             }
             // ATTN: Must add task after edit log, otherwise the job may 
finish before adding job.
             for (int i = 0; i < job.getCopiedTaskExecutors().size(); i++) {
@@ -554,4 +552,3 @@ public class ExportMgr {
         return size;
     }
 }
-
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTVFCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTVFCommand.java
index 4847cd028fa..fae7747b5d4 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTVFCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTVFCommand.java
@@ -23,6 +23,7 @@ import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.Status;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.BrokerUtil;
 import org.apache.doris.datasource.property.storage.StorageProperties;
 import org.apache.doris.fs.FileSystemFactory;
 import org.apache.doris.fs.remote.RemoteFileSystem;
@@ -176,7 +177,7 @@ public class InsertIntoTVFCommand extends Command 
implements ForwardWithSync, Ex
             throws Exception {
         String filePath = props.get("file_path");
         // Extract parent directory from prefix path: 
s3://bucket/path/to/prefix_ -> s3://bucket/path/to/
-        String parentDir = extractParentDirectory(filePath);
+        String parentDir = BrokerUtil.extractParentDirectory(filePath);
         LOG.info("TVF sink: deleting existing files in directory: {}", 
parentDir);
 
         // Copy props for building StorageProperties (exclude write-specific 
params)
@@ -198,12 +199,4 @@ public class InsertIntoTVFCommand extends Command 
implements ForwardWithSync, Ex
                     + parentDir + ": " + deleteStatus.getErrMsg());
         }
     }
-
-    private static String extractParentDirectory(String prefixPath) {
-        int lastSlash = prefixPath.lastIndexOf('/');
-        if (lastSlash >= 0) {
-            return prefixPath.substring(0, lastSlash + 1);
-        }
-        return prefixPath;
-    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java
index 0ba8311097a..520e34e27c2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java
@@ -88,6 +88,11 @@ public class ResultFileSink extends DataSink {
         fileSinkOptions.setBrokerAddresses(Lists.newArrayList(new 
TNetworkAddress(ip, port)));
     }
 
+    public void setDeleteExistingFiles(boolean deleteExistingFiles) {
+        Preconditions.checkNotNull(fileSinkOptions);
+        fileSinkOptions.setDeleteExistingFiles(deleteExistingFiles);
+    }
+
     public void resetByDataStreamSink(DataStreamSink dataStreamSink) {
         exchNodeId = dataStreamSink.getExchNodeId();
         outputPartition = dataStreamSink.getOutputPartition();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index e3cc5ea106c..b980edbcdec 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -57,6 +57,7 @@ import org.apache.doris.common.profile.Profile;
 import org.apache.doris.common.profile.ProfileManager.ProfileType;
 import org.apache.doris.common.profile.SummaryProfile;
 import org.apache.doris.common.profile.SummaryProfile.SummaryBuilder;
+import org.apache.doris.common.util.BrokerUtil;
 import org.apache.doris.common.util.DebugPointUtil;
 import org.apache.doris.common.util.DebugPointUtil.DebugPoint;
 import org.apache.doris.common.util.DebugUtil;
@@ -109,8 +110,10 @@ import 
org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalSqlCache;
 import org.apache.doris.planner.GroupCommitScanNode;
 import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.planner.PlanFragment;
 import org.apache.doris.planner.PlanNode;
 import org.apache.doris.planner.Planner;
+import org.apache.doris.planner.ResultFileSink;
 import org.apache.doris.planner.ScanNode;
 import org.apache.doris.proto.Data;
 import org.apache.doris.proto.InternalService;
@@ -1369,8 +1372,16 @@ public class StmtExecutor {
         }
 
         coordBase.setIsProfileSafeStmt(this.isProfileSafeStmt());
+        OutFileClause outFileClause = null;
+        if (isOutfileQuery) {
+            outFileClause = queryStmt.getOutFileClause();
+            Preconditions.checkState(outFileClause != null, "OUTFILE query 
must have OutFileClause");
+        }
 
         try {
+            if (outFileClause != null) {
+                deleteExistingOutfileFilesInFe(outFileClause);
+            }
             coordBase.exec();
             
profile.getSummaryProfile().setQueryScheduleFinishTime(TimeUtils.getStartTimeMs());
             updateProfile(false);
@@ -1404,8 +1415,8 @@ public class StmtExecutor {
                             sendFields(queryStmt.getColLabels(), 
queryStmt.getFieldInfos(),
                                     getReturnTypes(queryStmt));
                         } else {
-                            if 
(!Strings.isNullOrEmpty(queryStmt.getOutFileClause().getSuccessFileName())) {
-                                
outfileWriteSuccess(queryStmt.getOutFileClause());
+                            if 
(!Strings.isNullOrEmpty(outFileClause.getSuccessFileName())) {
+                                outfileWriteSuccess(outFileClause);
                             }
                             sendFields(OutFileClause.RESULT_COL_NAMES, 
OutFileClause.RESULT_COL_TYPES);
                         }
@@ -1547,6 +1558,31 @@ public class StmtExecutor {
         }
     }
 
+    private void deleteExistingOutfileFilesInFe(OutFileClause outFileClause) 
throws UserException {
+        // Handle directory cleanup once in FE so parallel outfile writers 
never race on deletion.
+        if (!outFileClause.shouldDeleteExistingFiles()) {
+            return;
+        }
+        Preconditions.checkState(outFileClause.getBrokerDesc() != null,
+                "delete_existing_files requires a remote outfile sink");
+        Preconditions.checkState(outFileClause.getBrokerDesc().storageType() 
!= StorageType.LOCAL,
+                "delete_existing_files is not supported for local outfile 
sinks");
+        
BrokerUtil.deleteParentDirectoryWithFileSystem(outFileClause.getFilePath(), 
outFileClause.getBrokerDesc());
+        clearDeleteExistingFilesInPlan();
+    }
+
+    private void clearDeleteExistingFilesInPlan() {
+        ResultFileSink resultFileSink = null;
+        for (PlanFragment fragment : planner.getFragments()) {
+            if (fragment.getSink() instanceof ResultFileSink) {
+                Preconditions.checkState(resultFileSink == null, "OUTFILE 
query should have only one ResultFileSink");
+                resultFileSink = (ResultFileSink) fragment.getSink();
+            }
+        }
+        Preconditions.checkState(resultFileSink != null, "OUTFILE query must 
have ResultFileSink");
+        resultFileSink.setDeleteExistingFiles(false);
+    }
+
     public static void syncLoadForTablets(List<List<Backend>> backendsList, 
List<Long> allTabletIds) {
         backendsList.forEach(backends -> backends.forEach(backend -> {
             if (backend.isAlive()) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java
index 05af66313b1..3e3f4ba9d49 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java
@@ -24,6 +24,9 @@ import org.apache.doris.common.Config;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.mysql.MysqlChannel;
 import org.apache.doris.mysql.MysqlSerializer;
+import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.planner.Planner;
+import org.apache.doris.planner.ResultFileSink;
 import org.apache.doris.qe.CommonResultSet.CommonResultSetMetaData;
 import org.apache.doris.qe.ConnectContext.ConnectType;
 import org.apache.doris.utframe.TestWithFeService;
@@ -35,6 +38,8 @@ import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 
 import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -276,4 +281,24 @@ public class StmtExecutorTest extends TestWithFeService {
         StmtExecutor executor = new StmtExecutor(mockCtx, stmt, false);
         executor.sendBinaryResultRow(resultSet);
     }
+
+    @Test
+    public void testClearDeleteExistingFilesInPlan() throws Exception {
+        Planner planner = Mockito.mock(Planner.class);
+        PlanFragment fragment = Mockito.mock(PlanFragment.class);
+        ResultFileSink resultFileSink = Mockito.mock(ResultFileSink.class);
+        Mockito.when(fragment.getSink()).thenReturn(resultFileSink);
+        
Mockito.when(planner.getFragments()).thenReturn(Lists.newArrayList(fragment));
+
+        StmtExecutor executor = new StmtExecutor(connectContext, "");
+        Field plannerField = StmtExecutor.class.getDeclaredField("planner");
+        plannerField.setAccessible(true);
+        plannerField.set(executor, planner);
+
+        Method clearMethod = 
StmtExecutor.class.getDeclaredMethod("clearDeleteExistingFilesInPlan");
+        clearMethod.setAccessible(true);
+        clearMethod.invoke(executor);
+
+        Mockito.verify(resultFileSink).setDeleteExistingFiles(false);
+    }
 }
diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift
index 02b064a4aa1..9cb9c0925b6 100644
--- a/gensrc/thrift/DataSinks.thrift
+++ b/gensrc/thrift/DataSinks.thrift
@@ -132,7 +132,7 @@ struct TResultFileSinkOptions {
     14: optional TParquetVersion parquet_version
     15: optional string orc_schema
 
-    16: optional bool delete_existing_files;
+    16: optional bool delete_existing_files; // deprecated: FE now handles 
outfile cleanup and clears this flag before BE execution; kept for 
compatibility with older FE
     17: optional string file_suffix;
     18: optional bool with_bom;
 
@@ -480,7 +480,7 @@ struct TTVFTableSink {
     7: optional string column_separator
     8: optional string line_delimiter
     9: optional i64 max_file_size_bytes
-    10: optional bool delete_existing_files
+    10: optional bool delete_existing_files // deprecated: FE handles TVF 
cleanup before execution and always sends false
     11: optional map<string, string> hadoop_config
     12: optional PlanNodes.TFileCompressType compression_type
     13: optional i64 backend_id              // local TVF: specify BE
diff --git a/regression-test/suites/export_p0/test_outfile.groovy 
b/regression-test/suites/export_p0/test_outfile.groovy
index f33500f5883..35c5e0b681b 100644
--- a/regression-test/suites/export_p0/test_outfile.groovy
+++ b/regression-test/suites/export_p0/test_outfile.groovy
@@ -44,11 +44,15 @@ suite("test_outfile") {
     assertEquals(response.msg, "success")
     def configJson = response.data.rows
     boolean enableOutfileToLocal = false
+    boolean enableDeleteExistingFiles = false
     for (Object conf: configJson) {
         assert conf instanceof Map
         if (((Map<String, String>) conf).get("Name").toLowerCase() == 
"enable_outfile_to_local") {
             enableOutfileToLocal = ((Map<String, String>) 
conf).get("Value").toLowerCase() == "true"
         }
+        if (((Map<String, String>) conf).get("Name").toLowerCase() == 
"enable_delete_existing_files") {
+            enableDeleteExistingFiles = ((Map<String, String>) 
conf).get("Value").toLowerCase() == "true"
+        }
     }
     if (!enableOutfileToLocal) {
         logger.warn("Please set enable_outfile_to_local to true to run 
test_outfile")
@@ -233,4 +237,14 @@ suite("test_outfile") {
             path.delete();
         }
     }
+
+    if (enableDeleteExistingFiles) {
+        test {
+            sql """
+                SELECT 1 INTO OUTFILE 
"file://${outFile}/test_outfile_delete_existing_files_${uuid}/"
+                PROPERTIES("delete_existing_files" = "true");
+            """
+            exception "Local file system does not support delete existing 
files"
+        }
+    }
 }
diff --git 
a/regression-test/suites/export_p0/test_outfile_parallel_delete_existing_files.groovy
 
b/regression-test/suites/export_p0/test_outfile_parallel_delete_existing_files.groovy
new file mode 100644
index 00000000000..655146598a9
--- /dev/null
+++ 
b/regression-test/suites/export_p0/test_outfile_parallel_delete_existing_files.groovy
@@ -0,0 +1,131 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite("test_outfile_parallel_delete_existing_files", "p0") {
+    StringBuilder strBuilder = new StringBuilder()
+    strBuilder.append("curl --location-trusted -u " + context.config.jdbcUser 
+ ":" + context.config.jdbcPassword)
+    if 
((context.config.otherConfigs.get("enableTLS")?.toString()?.equalsIgnoreCase("true"))
 ?: false) {
+        strBuilder.append(" https://" + context.config.feHttpAddress + 
"/rest/v1/config/fe")
+        strBuilder.append(" --cert " + 
context.config.otherConfigs.get("trustCert")
+                + " --cacert " + context.config.otherConfigs.get("trustCACert")
+                + " --key " + context.config.otherConfigs.get("trustCAKey"))
+    } else {
+        strBuilder.append(" http://" + context.config.feHttpAddress + 
"/rest/v1/config/fe")
+    }
+
+    def process = strBuilder.toString().execute()
+    def code = process.waitFor()
+    def err = IOGroovyMethods.getText(new BufferedReader(new 
InputStreamReader(process.getErrorStream())))
+    def out = process.getText()
+    logger.info("Request FE Config: code=" + code + ", out=" + out + ", err=" 
+ err)
+    assertEquals(code, 0)
+    def response = parseJson(out.trim())
+    assertEquals(response.code, 0)
+    assertEquals(response.msg, "success")
+    boolean enableDeleteExistingFiles = false
+    for (Object conf : response.data.rows) {
+        assert conf instanceof Map
+        if (((Map<String, String>) conf).get("Name").toLowerCase() == 
"enable_delete_existing_files") {
+            enableDeleteExistingFiles = ((Map<String, String>) 
conf).get("Value").toLowerCase() == "true"
+        }
+    }
+    if (!enableDeleteExistingFiles) {
+        logger.warn("Please set enable_delete_existing_files to true to run 
test_outfile_parallel_delete_existing_files")
+        return
+    }
+
+    String ak = getS3AK()
+    String sk = getS3SK()
+    String s3Endpoint = getS3Endpoint()
+    String region = getS3Region()
+    String bucket = context.config.otherConfigs.get("s3BucketName")
+    String provider = getS3Provider()
+    String tableName = "test_outfile_parallel_delete_existing_files"
+    String uuid = UUID.randomUUID().toString()
+    String outFilePath = 
"${bucket}/outfile/parallel_delete_existing_files/${uuid}/exp_"
+
+    def exportToS3 = { String filterSql, boolean deleteExistingFiles ->
+        String deleteProperty = deleteExistingFiles ? 
"\"delete_existing_files\" = \"true\"," : ""
+        sql """
+            SELECT * FROM ${tableName} ${filterSql}
+            INTO OUTFILE "s3://${outFilePath}"
+            FORMAT AS csv
+            PROPERTIES (
+                ${deleteProperty}
+                "column_separator" = ",",
+                "s3.endpoint" = "${s3Endpoint}",
+                "s3.region" = "${region}",
+                "s3.secret_key" = "${sk}",
+                "s3.access_key" = "${ak}",
+                "provider" = "${provider}"
+            );
+        """
+    }
+
+    try {
+        sql """ set enable_parallel_outfile = true """
+        sql """ set parallel_pipeline_task_num = 8 """
+
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql """
+            CREATE TABLE ${tableName} (
+                id INT NOT NULL,
+                name STRING NOT NULL,
+                score INT NOT NULL
+            )
+            DUPLICATE KEY(id)
+            DISTRIBUTED BY HASH(id) BUCKETS 16
+            PROPERTIES("replication_num" = "1");
+        """
+
+        sql """
+            INSERT INTO ${tableName}
+            SELECT
+                number AS id,
+                concat('name_', cast(number AS string)) AS name,
+                cast(number % 97 AS int) AS score
+            FROM numbers("number" = "20000");
+        """
+
+        def expected = sql """ SELECT count(*), sum(id), sum(score) FROM 
${tableName}; """
+
+        exportToS3("WHERE id < 5000", false)
+        exportToS3("", true)
+
+        def actual = sql """
+            SELECT count(*), sum(id), sum(score) FROM S3(
+                "uri" = "s3://${outFilePath}*",
+                "s3.endpoint" = "${s3Endpoint}",
+                "s3.region" = "${region}",
+                "s3.secret_key" = "${sk}",
+                "s3.access_key" = "${ak}",
+                "provider" = "${provider}",
+                "format" = "csv",
+                "column_separator" = ",",
+                "csv_schema" = "id:int;name:string;score:int"
+            );
+        """
+
+        assertEquals(expected[0][0], actual[0][0])
+        assertEquals(expected[0][1], actual[0][1])
+        assertEquals(expected[0][2], actual[0][2])
+    } finally {
+        try_sql(""" set enable_parallel_outfile = false """)
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to