This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new e97cdc615f8 branch-3.1 [fix](Export) change the export logical and 
more info in show export(#48022 #48144 #48488 ) (#51937)
e97cdc615f8 is described below

commit e97cdc615f861056e39d807a2c980ee20d28aead
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Thu Jun 19 18:17:47 2025 +0800

    branch-3.1 [fix](Export) change the export logical and more info in show 
export(#48022 #48144 #48488 ) (#51937)
    
    bp #48022 #48144 #48488
    
    ---------
    
    Co-authored-by: Tiewei Fang <[email protected]>
---
 be/src/vec/sink/writer/vfile_result_writer.cpp     |  78 ++++++++---
 be/src/vec/sink/writer/vfile_result_writer.h       |   2 +
 .../main/java/org/apache/doris/common/Config.java  |   5 -
 .../org/apache/doris/analysis/OutFileClause.java   |   6 +
 .../main/java/org/apache/doris/load/ExportJob.java | 142 ++++++++-------------
 .../org/apache/doris/load/ExportTaskExecutor.java  |  51 ++++----
 .../java/org/apache/doris/load/OutfileInfo.java    |   6 +
 .../org/apache/doris/planner/ResultFileSink.java   |  45 +++----
 .../java/org/apache/doris/qe/ConnectContext.java   |   8 +-
 .../java/org/apache/doris/qe/StmtExecutor.java     |   2 +-
 .../analysis/ExportToOutfileLogicalPlanTest.java   | 127 +++++++-----------
 .../data/export_p0/outfile/test_outfile_result.out | Bin 0 -> 225 bytes
 .../export_p0/outfile/test_outfile_result.groovy   |  89 +++++++++++++
 13 files changed, 299 insertions(+), 262 deletions(-)

diff --git a/be/src/vec/sink/writer/vfile_result_writer.cpp 
b/be/src/vec/sink/writer/vfile_result_writer.cpp
index 5161cf2928d..bb6a54c4693 100644
--- a/be/src/vec/sink/writer/vfile_result_writer.cpp
+++ b/be/src/vec/sink/writer/vfile_result_writer.cpp
@@ -17,6 +17,7 @@
 
 #include "vfile_result_writer.h"
 
+#include <fmt/format.h>
 #include <gen_cpp/Data_types.h>
 #include <gen_cpp/Metrics_types.h>
 #include <gen_cpp/PaloInternalService_types.h>
@@ -39,6 +40,7 @@
 #include "io/hdfs_builder.h"
 #include "pipeline/exec/result_sink_operator.h"
 #include "runtime/buffer_control_block.h"
+#include "runtime/decimalv2_value.h"
 #include "runtime/define_primitive_type.h"
 #include "runtime/descriptors.h"
 #include "runtime/large_int_value.h"
@@ -60,6 +62,8 @@
 
 namespace doris::vectorized {
 
+static double nons_to_second = 1000000000.00; // 1e9: divisor/scale used to convert timer values to seconds (presumably nanoseconds -> seconds; name likely means "nanos")
+
 VFileResultWriter::VFileResultWriter(const TDataSink& t_sink, const 
VExprContextSPtrs& output_exprs,
                                      std::shared_ptr<pipeline::Dependency> dep,
                                      std::shared_ptr<pipeline::Dependency> 
fin_dep)
@@ -271,19 +275,32 @@ Status VFileResultWriter::_send_result() {
     _is_result_sent = true;
 
     // The final stat result include:
-    // FileNumber, TotalRows, FileSize and URL
-    // The type of these field should be consistent with types defined
-    // in OutFileClause.java of FE.
+    // | FileNumber      | Int     |
+    // | TotalRows       | Bigint  |
+    // | FileSize        | Bigint  |
+    // | URL             | Varchar |
+    // | WriteTimeSec    | Varchar |
+    // | WriteSpeedKB    | Varchar |
+    // The type of these field should be consistent with types defined in 
OutFileClause.java of FE.
     MysqlRowBuffer<> row_buffer;
-    row_buffer.push_int(_file_idx);                         // file number
-    row_buffer.push_bigint(_written_rows_counter->value()); // total rows
-    row_buffer.push_bigint(_written_data_bytes->value());   // file size
+    row_buffer.push_int(_file_idx);                         // FileNumber
+    row_buffer.push_bigint(_written_rows_counter->value()); // TotalRows
+    row_buffer.push_bigint(_written_data_bytes->value());   // FileSize
     std::string file_url;
     _get_file_url(&file_url);
     std::stringstream ss;
     ss << file_url << "*";
     file_url = ss.str();
-    row_buffer.push_string(file_url.c_str(), file_url.length()); // url
+    row_buffer.push_string(file_url.c_str(), file_url.length()); // URL
+    double write_time = _file_write_timer->value() / nons_to_second;
+    std::string formatted_write_time = fmt::format("{:.3f}", write_time);
+    row_buffer.push_string(formatted_write_time.c_str(),
+                           formatted_write_time.length()); // WriteTimeSec
+
+    double write_speed = _get_write_speed(_written_data_bytes->value(), 
_file_write_timer->value());
+    std::string formatted_write_speed = fmt::format("{:.2f}", write_speed);
+    row_buffer.push_string(formatted_write_speed.c_str(),
+                           formatted_write_speed.length()); // WriteSpeedKB
 
     std::unique_ptr<TFetchDataResult> result = 
std::make_unique<TFetchDataResult>();
     result->result_batch.rows.resize(1);
@@ -295,6 +312,8 @@ Status VFileResultWriter::_send_result() {
             std::make_pair("TotalRows", 
std::to_string(_written_rows_counter->value())));
     attach_infos.insert(std::make_pair("FileSize", 
std::to_string(_written_data_bytes->value())));
     attach_infos.insert(std::make_pair("URL", file_url));
+    attach_infos.insert(std::make_pair("WriteTimeSec", formatted_write_time));
+    attach_infos.insert(std::make_pair("WriteSpeedKB", formatted_write_speed));
 
     result->result_batch.__set_attached_infos(attach_infos);
     RETURN_NOT_OK_STATUS_WITH_WARN(_sinker->add_batch(_state, result),
@@ -309,20 +328,29 @@ Status VFileResultWriter::_fill_result_block() {
     _is_result_sent = true;
 
 #ifndef INSERT_TO_COLUMN
-#define INSERT_TO_COLUMN                                                       
     \
-    if (i == 0) {                                                              
     \
-        column->insert_data(reinterpret_cast<const char*>(&_file_idx), 0);     
     \
-    } else if (i == 1) {                                                       
     \
-        int64_t written_rows = _written_rows_counter->value();                 
     \
-        column->insert_data(reinterpret_cast<const char*>(&written_rows), 0);  
     \
-    } else if (i == 2) {                                                       
     \
-        int64_t written_data_bytes = _written_data_bytes->value();             
     \
-        column->insert_data(reinterpret_cast<const 
char*>(&written_data_bytes), 0); \
-    } else if (i == 3) {                                                       
     \
-        std::string file_url;                                                  
     \
-        static_cast<void>(_get_file_url(&file_url));                           
     \
-        column->insert_data(file_url.c_str(), file_url.size());                
     \
-    }                                                                          
     \
+#define INSERT_TO_COLUMN                                                       
             \
+    if (i == 0) {                                                              
             \
+        column->insert_data(reinterpret_cast<const char*>(&_file_idx), 0);     
             \
+    } else if (i == 1) {                                                       
             \
+        int64_t written_rows = _written_rows_counter->value();                 
             \
+        column->insert_data(reinterpret_cast<const char*>(&written_rows), 0);  
             \
+    } else if (i == 2) {                                                       
             \
+        int64_t written_data_bytes = _written_data_bytes->value();             
             \
+        column->insert_data(reinterpret_cast<const 
char*>(&written_data_bytes), 0);         \
+    } else if (i == 3) {                                                       
             \
+        std::string file_url;                                                  
             \
+        static_cast<void>(_get_file_url(&file_url));                           
             \
+        column->insert_data(file_url.c_str(), file_url.size());                
             \
+    } else if (i == 4) {                                                       
             \
+        double write_time = _file_write_timer->value() / nons_to_second;       
             \
+        std::string formatted_write_time = fmt::format("{:.3f}", write_time);  
             \
+        column->insert_data(formatted_write_time.c_str(), 
formatted_write_time.size());     \
+    } else if (i == 5) {                                                       
             \
+        double write_speed =                                                   
             \
+                _get_write_speed(_written_data_bytes->value(), 
_file_write_timer->value()); \
+        std::string formatted_write_speed = fmt::format("{:.2f}", 
write_speed);             \
+        column->insert_data(formatted_write_speed.c_str(), 
formatted_write_speed.size());   \
+    }                                                                          
             \
     _output_block->replace_by_position(i, std::move(column));
 #endif
 
@@ -387,6 +415,14 @@ Status VFileResultWriter::_delete_dir() {
     }
 }
 
+double VFileResultWriter::_get_write_speed(int64_t write_bytes, int64_t 
write_time) { // Computes write throughput from a byte count and an elapsed-time value.
+    if (write_time <= 0) { // guard against division by zero; a non-positive time reports a speed of 0
+        return 0;
+    }
+    // KB / s: write_bytes scaled by nons_to_second (1e9) over write_time, then / 1024 — assumes write_time is in nanoseconds (TODO confirm against the timer's unit)
+    return ((write_bytes * nons_to_second) / (write_time)) / 1024;
+}
+
 Status VFileResultWriter::close(Status exec_status) {
     Status st = exec_status;
     if (st.ok()) {
diff --git a/be/src/vec/sink/writer/vfile_result_writer.h 
b/be/src/vec/sink/writer/vfile_result_writer.h
index bf0a5d3e9e2..8b611d7ceef 100644
--- a/be/src/vec/sink/writer/vfile_result_writer.h
+++ b/be/src/vec/sink/writer/vfile_result_writer.h
@@ -21,6 +21,7 @@
 #include <stddef.h>
 #include <stdint.h>
 
+#include <cstdint>
 #include <iosfwd>
 #include <memory>
 #include <string>
@@ -99,6 +100,7 @@ private:
     Status _fill_result_block();
     // delete the dir of file_path
     Status _delete_dir();
+    double _get_write_speed(int64_t write_bytes, int64_t write_time); // write speed in KB/s; returns 0 when write_time <= 0
 
     RuntimeState* _state; // not owned, set when init
     const pipeline::ResultFileOptions* _file_opts = nullptr;
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 6925a2d59c1..e6eaf35b3ab 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2571,11 +2571,6 @@ public class Config extends ConfigBase {
             "The maximum parallelism allowed by Export job"})
     public static int maximum_parallelism_of_export_job = 50;
 
-    @ConfField(mutable = true, description = {
-            "ExportExecutorTask任务中一个OutFile语句允许的最大tablets数量",
-            "The maximum number of tablets allowed by an OutfileStatement in 
an ExportExecutorTask"})
-    public static int maximum_tablets_of_outfile_in_export = 10;
-
     @ConfField(mutable = true, description = {
             "是否用 mysql 的 bigint 类型来返回 Doris 的 largeint 类型",
             "Whether to use mysql's bigint type to return Doris's largeint 
type"})
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
index d4a7b25ed5a..adf601da1ea 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
@@ -79,17 +79,23 @@ public class OutFileClause {
     public static final String TOTAL_ROWS = "TotalRows";
     public static final String FILE_SIZE = "FileSize";
     public static final String URL = "URL";
+    public static final String WRITE_TIME_SEC = "WriteTimeSec";
+    public static final String WRITE_SPEED_KB = "WriteSpeedKB";
 
     static {
         RESULT_COL_NAMES.add(FILE_NUMBER);
         RESULT_COL_NAMES.add(TOTAL_ROWS);
         RESULT_COL_NAMES.add(FILE_SIZE);
         RESULT_COL_NAMES.add(URL);
+        RESULT_COL_NAMES.add(WRITE_TIME_SEC);
+        RESULT_COL_NAMES.add(WRITE_SPEED_KB);
 
         RESULT_COL_TYPES.add(ScalarType.createType(PrimitiveType.INT));
         RESULT_COL_TYPES.add(ScalarType.createType(PrimitiveType.BIGINT));
         RESULT_COL_TYPES.add(ScalarType.createType(PrimitiveType.BIGINT));
         RESULT_COL_TYPES.add(ScalarType.createType(PrimitiveType.VARCHAR));
+        RESULT_COL_TYPES.add(ScalarType.createType(PrimitiveType.VARCHAR));
+        RESULT_COL_TYPES.add(ScalarType.createType(PrimitiveType.VARCHAR));
 
         PARQUET_REPETITION_TYPE_MAP.put("required", 
TParquetRepetitionType.REQUIRED);
         PARQUET_REPETITION_TYPE_MAP.put("repeated", 
TParquetRepetitionType.REPEATED);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
index ecde36f7fd5..b9e1ca2ead3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
@@ -106,8 +106,6 @@ public class ExportJob implements Writable {
 
     private static final String BROKER_PROPERTY_PREFIXES = "broker.";
 
-    private static final int MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT = 
Config.maximum_tablets_of_outfile_in_export;
-
     public static final String CONSISTENT_NONE = "none";
     public static final String CONSISTENT_PARTITION = "partition";
 
@@ -189,10 +187,9 @@ public class ExportJob implements Writable {
     /**
      * Each parallel has an associated Outfile list
      * which are organized into a two-dimensional list.
-     * Therefore, we can access the selectStmtListPerParallel
      * to get the outfile logical plans list responsible for each parallel 
task.
      */
-    private List<List<StatementBase>> selectStmtListPerParallel = 
Lists.newArrayList();
+    private List<Optional<StatementBase>> selectStmtPerParallel = 
Lists.newArrayList();
 
     private List<String> exportColumns = Lists.newArrayList();
 
@@ -255,8 +252,6 @@ public class ExportJob implements Writable {
      * according to the 'parallelism' set by the user.
      * The tablets which will be exported by this ExportJob are divided into 
'parallelism' copies,
      * and each ExportTaskExecutor is responsible for a list of tablets.
-     * The tablets responsible for an ExportTaskExecutor will be assigned to 
multiple OutfileStmt
-     * according to the 'TABLETS_NUM_PER_OUTFILE_IN_EXPORT'.
      *
      * @throws UserException
      */
@@ -282,14 +277,10 @@ public class ExportJob implements Writable {
 
             // debug LOG output
             if (LOG.isDebugEnabled()) {
-                for (int i = 0; i < selectStmtListPerParallel.size(); ++i) {
+                for (int i = 0; i < selectStmtPerParallel.size(); ++i) {
                     if (LOG.isDebugEnabled()) {
                         LOG.debug("ExportTaskExecutor {} is responsible for 
outfile:", i);
-                    }
-                    for (StatementBase outfile : 
selectStmtListPerParallel.get(i)) {
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("outfile sql: [{}]", outfile.toSql());
-                        }
+                        LOG.debug("outfile sql: [{}]", 
selectStmtPerParallel.get(i).get().toSql());
                     }
                 }
             }
@@ -312,21 +303,16 @@ public class ExportJob implements Writable {
         }
 
         // get all tablets
-        List<List<List<Long>>> tabletsListPerParallel = splitTablets();
+        List<List<Long>> tabletsListPerParallel = splitTablets();
 
         // Each Outfile clause responsible for 
MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT tablets
-        for (List<List<Long>> tabletsList : tabletsListPerParallel) {
-            List<StatementBase> logicalPlanAdapters = Lists.newArrayList();
-            for (List<Long> tabletIds : tabletsList) {
-                // generate LogicalPlan
-                LogicalPlan plan = generateOneLogicalPlan(qualifiedTableName, 
tabletIds,
-                        this.partitionNames, selectLists);
-                // generate  LogicalPlanAdapter
-                StatementBase statementBase = generateLogicalPlanAdapter(plan);
-
-                logicalPlanAdapters.add(statementBase);
-            }
-            selectStmtListPerParallel.add(logicalPlanAdapters);
+        for (List<Long> tabletsList : tabletsListPerParallel) {
+            // generate LogicalPlan
+            LogicalPlan plan = generateOneLogicalPlan(qualifiedTableName, 
tabletsList,
+                    this.partitionNames, selectLists);
+            // generate  LogicalPlanAdapter
+            StatementBase statementBase = generateLogicalPlanAdapter(plan);
+            selectStmtPerParallel.add(Optional.of(statementBase));
         }
     }
 
@@ -352,16 +338,13 @@ public class ExportJob implements Writable {
             });
         }
 
-        List<StatementBase> logicalPlanAdapters = Lists.newArrayList();
-
         // generate LogicalPlan
         LogicalPlan plan = generateOneLogicalPlan(qualifiedTableName, 
ImmutableList.of(),
                 ImmutableList.of(), selectLists);
         // generate  LogicalPlanAdapter
         StatementBase statementBase = generateLogicalPlanAdapter(plan);
 
-        logicalPlanAdapters.add(statementBase);
-        selectStmtListPerParallel.add(logicalPlanAdapters);
+        selectStmtPerParallel.add(Optional.of(statementBase));
     }
 
     private LogicalPlan generateOneLogicalPlan(List<String> 
qualifiedTableName, List<Long> tabletIds,
@@ -401,15 +384,15 @@ public class ExportJob implements Writable {
 
     private void generateExportJobExecutor() {
         jobExecutorList = Lists.newArrayList();
-        for (List<StatementBase> selectStmts : selectStmtListPerParallel) {
-            ExportTaskExecutor executor = new ExportTaskExecutor(selectStmts, 
this);
+        for (Optional<StatementBase> selectStmt : selectStmtPerParallel) {
+            ExportTaskExecutor executor = new ExportTaskExecutor(selectStmt, 
this);
             jobExecutorList.add(executor);
         }
 
         // add empty task to make export job could be finished finally if 
jobExecutorList is empty
         // which means that export table without data
         if (jobExecutorList.isEmpty()) {
-            ExportTaskExecutor executor = new 
ExportTaskExecutor(Lists.newArrayList(), this);
+            ExportTaskExecutor executor = new 
ExportTaskExecutor(Optional.empty(), this);
             jobExecutorList.add(executor);
         }
     }
@@ -433,77 +416,59 @@ public class ExportJob implements Writable {
             }
         }
 
-        List<List<TableRef>> tableRefListPerParallel = 
getTableRefListPerParallel();
-        LOG.info("Export Job [{}] is split into {} Export Task Executor.", id, 
tableRefListPerParallel.size());
+        List<TableRef> tableRefPerParallel = getTableRefListPerParallel();
+        LOG.info("Export Job [{}] is split into {} Export Task Executor.", id, 
tableRefPerParallel.size());
 
         // debug LOG output
         if (LOG.isDebugEnabled()) {
-            for (int i = 0; i < tableRefListPerParallel.size(); i++) {
+            for (int i = 0; i < tableRefPerParallel.size(); i++) {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("ExportTaskExecutor {} is responsible for 
tablets:", i);
-                }
-                for (TableRef tableRef : tableRefListPerParallel.get(i)) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Tablet id: [{}]", 
tableRef.getSampleTabletIds());
-                    }
+                    LOG.debug("Tablet id: [{}]", 
tableRefPerParallel.get(i).getSampleTabletIds());
                 }
             }
         }
 
         // generate 'select..outfile..' statement
-        for (List<TableRef> tableRefList : tableRefListPerParallel) {
-            List<StatementBase> selectStmtLists = Lists.newArrayList();
-            for (TableRef tableRef : tableRefList) {
-                List<TableRef> tmpTableRefList = Lists.newArrayList(tableRef);
-                FromClause fromClause = new FromClause(tmpTableRefList);
-                // generate outfile clause
-                OutFileClause outfile = new OutFileClause(this.exportPath, 
this.format, convertOutfileProperties());
-                SelectStmt selectStmt = new SelectStmt(list, fromClause, 
this.whereExpr, null,
-                        null, null, LimitElement.NO_LIMIT);
-                selectStmt.setOutFileClause(outfile);
-                selectStmt.setOrigStmt(new OriginStatement(selectStmt.toSql(), 
0));
-                selectStmtLists.add(selectStmt);
-            }
-            selectStmtListPerParallel.add(selectStmtLists);
+        for (TableRef tableReferences : tableRefPerParallel) {
+            FromClause fromClause = new 
FromClause(Lists.newArrayList(tableReferences));
+            // generate outfile clause
+            OutFileClause outfile = new OutFileClause(this.exportPath, 
this.format, convertOutfileProperties());
+            SelectStmt selectStmt = new SelectStmt(list, fromClause, 
this.whereExpr, null,
+                    null, null, LimitElement.NO_LIMIT);
+            selectStmt.setOutFileClause(outfile);
+            selectStmt.setOrigStmt(new OriginStatement(selectStmt.toSql(), 0));
+            selectStmtPerParallel.add(Optional.of(selectStmt));
         }
 
         // debug LOG output
         if (LOG.isDebugEnabled()) {
-            for (int i = 0; i < selectStmtListPerParallel.size(); ++i) {
+            for (int i = 0; i < selectStmtPerParallel.size(); ++i) {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("ExportTaskExecutor {} is responsible for 
outfile:", i);
-                }
-                for (StatementBase outfile : selectStmtListPerParallel.get(i)) 
{
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("outfile sql: [{}]", outfile.toSql());
-                    }
+                    LOG.debug("outfile sql: [{}]", 
selectStmtPerParallel.get(i).get().toSql());
                 }
             }
         }
     }
 
-    private List<List<TableRef>> getTableRefListPerParallel() throws 
UserException {
-        List<List<List<Long>>> tabletsListPerParallel = splitTablets();
-
-        List<List<TableRef>> tableRefListPerParallel = Lists.newArrayList();
-        for (List<List<Long>> tabletsList : tabletsListPerParallel) {
-            List<TableRef> tableRefList = Lists.newArrayList();
-            for (List<Long> tablets : tabletsList) {
-                // Since export does not support the alias, here we pass the 
null value.
-                // we can not use this.tableRef.getAlias(),
-                // because the constructor of `Tableref` will convert 
this.tableRef.getAlias()
-                // into lower case when lower_case_table_names = 1
-                TableRef tblRef = new TableRef(this.tableRef.getName(), null,
-                        this.tableRef.getPartitionNames(), (ArrayList) tablets,
-                        this.tableRef.getTableSample(), 
this.tableRef.getCommonHints());
-                tableRefList.add(tblRef);
-            }
-            tableRefListPerParallel.add(tableRefList);
+    private List<TableRef> getTableRefListPerParallel() throws UserException {
+        List<List<Long>> tabletsListPerParallel = splitTablets();
+        List<TableRef> tableRefPerParallel = Lists.newArrayList();
+        for (List<Long> tabletsList : tabletsListPerParallel) {
+            // Since export does not support the alias, here we pass the null 
value.
+            // we can not use this.tableRef.getAlias(),
+            // because the constructor of `Tableref` will convert 
this.tableRef.getAlias()
+            // into lower case when lower_case_table_names = 1
+            TableRef tblRef = new TableRef(this.tableRef.getName(), null,
+                    this.tableRef.getPartitionNames(), (ArrayList) tabletsList,
+                    this.tableRef.getTableSample(), 
this.tableRef.getCommonHints());
+            tableRefPerParallel.add(tblRef);
         }
-        return tableRefListPerParallel;
+        return tableRefPerParallel;
     }
 
-    private List<List<List<Long>>> splitTablets() throws UserException {
+    private List<List<Long>> splitTablets() throws UserException {
         // get tablets
         Database db = 
Env.getCurrentEnv().getInternalCatalog().getDbOrAnalysisException(this.tableName.getDb());
         OlapTable table = 
db.getOlapTableOrAnalysisException(this.tableName.getTbl());
@@ -560,7 +525,7 @@ public class ExportJob implements Writable {
                             + "set parallelism to partition num.", id, 
totalPartitions, this.parallelism);
             }
             int start = 0;
-            List<List<List<Long>>> tabletsListPerParallel = new ArrayList<>();
+            List<List<Long>> tabletsListPerParallel = new ArrayList<>();
             for (int i = 0; i < realParallelism; ++i) {
                 int partitionNum = numPerParallel;
                 if (numPerQueryRemainder > 0) {
@@ -568,8 +533,9 @@ public class ExportJob implements Writable {
                     --numPerQueryRemainder;
                 }
                 List<List<Long>> tablets = new 
ArrayList<>(tabletIdList.subList(start, start + partitionNum));
+                List<Long> flatTablets = 
tablets.stream().flatMap(List::stream).collect(Collectors.toList());
                 start += partitionNum;
-                tabletsListPerParallel.add(tablets);
+                tabletsListPerParallel.add(flatTablets);
             }
             return tabletsListPerParallel;
         }
@@ -583,7 +549,7 @@ public class ExportJob implements Writable {
         Integer tabletsNumPerParallel = tabletsAllNum / this.parallelism;
         Integer tabletsNumPerQueryRemainder = tabletsAllNum - 
tabletsNumPerParallel * this.parallelism;
 
-        List<List<List<Long>>> tabletsListPerParallel = Lists.newArrayList();
+        List<List<Long>> tabletsListPerParallel = Lists.newArrayList();
         Integer realParallelism = this.parallelism;
         if (tabletsAllNum < this.parallelism) {
             realParallelism = tabletsAllNum;
@@ -599,14 +565,8 @@ public class ExportJob implements Writable {
                 --tabletsNumPerQueryRemainder;
             }
             List<Long> tabletsList = new 
ArrayList<>(flatTabletIdList.subList(start, start + tabletsNum));
-            List<List<Long>> tablets = new ArrayList<>();
-            for (int i = 0; i < tabletsList.size(); i += 
MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT) {
-                int end = Math.min(i + MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT, 
tabletsList.size());
-                tablets.add(new ArrayList<>(tabletsList.subList(i, end)));
-            }
-
             start += tabletsNum;
-            tabletsListPerParallel.add(tablets);
+            tabletsListPerParallel.add(tabletsList);
         }
         return tabletsListPerParallel;
     }
@@ -702,7 +662,7 @@ public class ExportJob implements Writable {
         finishTimeMs = System.currentTimeMillis();
         failMsg = new ExportFailMsg(type, msg);
         jobExecutorList.clear();
-        selectStmtListPerParallel.clear();
+        selectStmtPerParallel.clear();
         allOutfileInfo.clear();
         partitionToVersion.clear();
         if (FeConstants.runningUnitTest) {
@@ -755,7 +715,7 @@ public class ExportJob implements Writable {
         outfileInfo = GsonUtils.GSON.toJson(allOutfileInfo);
         // Clear the jobExecutorList to release memory.
         jobExecutorList.clear();
-        selectStmtListPerParallel.clear();
+        selectStmtPerParallel.clear();
         allOutfileInfo.clear();
         partitionToVersion.clear();
         Env.getCurrentEnv().getEditLog().logExportUpdateState(this, 
ExportJobState.FINISHED);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java
index ae84068b20d..3d321cfc9fa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java
@@ -55,7 +55,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 public class ExportTaskExecutor implements TransientTaskExecutor {
     private static final Logger LOG = 
LogManager.getLogger(ExportTaskExecutor.class);
 
-    List<StatementBase> selectStmtLists;
+    Optional<StatementBase> selectStmt;
 
     ExportJob exportJob;
 
@@ -67,9 +67,9 @@ public class ExportTaskExecutor implements 
TransientTaskExecutor {
 
     private AtomicBoolean isFinished;
 
-    ExportTaskExecutor(List<StatementBase> selectStmtLists, ExportJob 
exportJob) {
+    ExportTaskExecutor(Optional<StatementBase> selectStmt, ExportJob 
exportJob) {
         this.taskId = UUID.randomUUID().getMostSignificantBits();
-        this.selectStmtLists = selectStmtLists;
+        this.selectStmt = selectStmt;
         this.exportJob = exportJob;
         this.isCanceled = new AtomicBoolean(false);
         this.isFinished = new AtomicBoolean(false);
@@ -90,16 +90,14 @@ public class ExportTaskExecutor implements 
TransientTaskExecutor {
         LOG.debug("[Export Task] taskId: {} updating state to EXPORTING", 
taskId);
         exportJob.updateExportJobState(ExportJobState.EXPORTING, taskId, null, 
null, null);
         List<OutfileInfo> outfileInfoList = Lists.newArrayList();
-        for (int idx = 0; idx < selectStmtLists.size(); ++idx) {
-            LOG.debug("[Export Task] taskId: {} processing statement {}/{}",
-                    taskId, idx + 1, selectStmtLists.size());
+        if (selectStmt.isPresent()) {
             if (isCanceled.get()) {
-                LOG.debug("[Export Task] taskId: {} canceled during execution 
at statement {}", taskId, idx + 1);
+                LOG.debug("[Export Task] taskId: {} canceled during 
execution", taskId);
                 throw new JobException("Export executor has been canceled, 
task id: {}", taskId);
             }
             // check the version of tablets, skip if the consistency is in 
partition level.
             if (exportJob.getExportTable().isManagedTable() && 
!exportJob.isPartitionConsistency()) {
-                LOG.debug("[Export Task] taskId: {} checking tablet versions 
for statement {}", taskId, idx + 1);
+                LOG.debug("[Export Task] taskId: {} checking tablet versions", 
taskId);
                 try {
                     Database db = 
Env.getCurrentEnv().getInternalCatalog().getDbOrAnalysisException(
                             exportJob.getTableName().getDb());
@@ -110,7 +108,7 @@ public class ExportTaskExecutor implements 
TransientTaskExecutor {
                     LOG.debug("[Export Lock] taskId: {}, table: {} acquired 
readLock", taskId, table.getName());
                     try {
                         List<Long> tabletIds;
-                        LogicalPlanAdapter logicalPlanAdapter = 
(LogicalPlanAdapter) selectStmtLists.get(idx);
+                        LogicalPlanAdapter logicalPlanAdapter = 
(LogicalPlanAdapter) selectStmt.get();
                         Optional<UnboundRelation> unboundRelation = 
findUnboundRelation(
                                 logicalPlanAdapter.getLogicalPlan());
                         tabletIds = unboundRelation.get().getTabletIds();
@@ -151,8 +149,8 @@ public class ExportTaskExecutor implements 
TransientTaskExecutor {
             }
 
             try (AutoCloseConnectContext r = buildConnectContext()) {
-                LOG.debug("[Export Task] taskId: {} executing statement {}", 
taskId, idx + 1);
-                stmtExecutor = new StmtExecutor(r.connectContext, 
selectStmtLists.get(idx));
+                LOG.debug("[Export Task] taskId: {} executing", taskId);
+                stmtExecutor = new StmtExecutor(r.connectContext, 
selectStmt.get());
                 stmtExecutor.execute();
                 if (r.connectContext.getState().getStateType() == 
MysqlStateType.ERR) {
                     LOG.debug("[Export Task] taskId: {} failed with MySQL 
error: {}", taskId,
@@ -161,16 +159,11 @@ public class ExportTaskExecutor implements 
TransientTaskExecutor {
                             ExportFailMsg.CancelType.RUN_FAIL, 
r.connectContext.getState().getErrorMessage());
                     return;
                 }
-                LOG.debug("[Export Task] taskId: {} statement {} executed 
successfully", taskId, idx + 1);
-                OutfileInfo outfileInfo = 
getOutFileInfo(r.connectContext.getResultAttachedInfo());
-                LOG.debug("[Export Task] taskId: {} got outfile info for 
statement {}:"
-                                + "fileNumber={}, totalRows={}, fileSize={}",
-                        taskId, idx + 1, outfileInfo.getFileNumber(),
-                        outfileInfo.getTotalRows(), outfileInfo.getFileSize());
-                outfileInfoList.add(outfileInfo);
+                LOG.debug("[Export Task] taskId: {} executed successfully", 
taskId);
+                outfileInfoList = 
getOutFileInfo(r.connectContext.getResultAttachedInfo());
             } catch (Exception e) {
-                LOG.debug("[Export Task] taskId: {} failed with exception 
during statement {}: {}",
-                        taskId, idx + 1, e.getMessage(), e);
+                LOG.debug("[Export Task] taskId: {} failed with exception: {}",
+                        taskId, e.getMessage(), e);
                 exportJob.updateExportJobState(ExportJobState.CANCELLED, 
taskId, null,
                         ExportFailMsg.CancelType.RUN_FAIL, e.getMessage());
                 throw new JobException(e);
@@ -214,12 +207,18 @@ public class ExportTaskExecutor implements 
TransientTaskExecutor {
         return new AutoCloseConnectContext(connectContext);
     }
 
-    private OutfileInfo getOutFileInfo(Map<String, String> resultAttachedInfo) 
{
-        OutfileInfo outfileInfo = new OutfileInfo();
-        
outfileInfo.setFileNumber(resultAttachedInfo.get(OutFileClause.FILE_NUMBER));
-        
outfileInfo.setTotalRows(resultAttachedInfo.get(OutFileClause.TOTAL_ROWS));
-        
outfileInfo.setFileSize(resultAttachedInfo.get(OutFileClause.FILE_SIZE));
-        outfileInfo.setUrl(resultAttachedInfo.get(OutFileClause.URL));
+    private List<OutfileInfo> getOutFileInfo(List<Map<String, String>> 
resultAttachedInfo) {
+        List<OutfileInfo> outfileInfo = Lists.newArrayList();
+        for (Map<String, String> row : resultAttachedInfo) {
+            OutfileInfo outfileInfoOneRow = new OutfileInfo();
+            
outfileInfoOneRow.setFileNumber(row.get(OutFileClause.FILE_NUMBER));
+            outfileInfoOneRow.setTotalRows(row.get(OutFileClause.TOTAL_ROWS));
+            outfileInfoOneRow.setFileSize(row.get(OutFileClause.FILE_SIZE));
+            outfileInfoOneRow.setUrl(row.get(OutFileClause.URL));
+            
outfileInfoOneRow.setWriteTime(row.get(OutFileClause.WRITE_TIME_SEC));
+            
outfileInfoOneRow.setWriteSpeed(row.get(OutFileClause.WRITE_SPEED_KB));
+            outfileInfo.add(outfileInfoOneRow);
+        }
         return outfileInfo;
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/OutfileInfo.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/OutfileInfo.java
index b9befd9d326..262bf98adef 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/OutfileInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/OutfileInfo.java
@@ -34,4 +34,10 @@ public class OutfileInfo {
 
     @SerializedName("url")
     private String url;
+
+    @SerializedName("writeTime")
+    private String writeTime;
+
+    @SerializedName("writeSpeed")
+    private String writeSpeed;
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java
index 631339c3732..3dbd5bc2115 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java
@@ -24,8 +24,6 @@ import org.apache.doris.analysis.StorageBackend;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.analysis.TupleId;
 import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.PrimitiveType;
-import org.apache.doris.catalog.ScalarType;
 import org.apache.doris.common.util.FileFormatConstants;
 import org.apache.doris.thrift.TDataSink;
 import org.apache.doris.thrift.TDataSinkType;
@@ -145,39 +143,26 @@ public class ResultFileSink extends DataSink {
 
     /**
      * Construct a tuple for file status, the tuple schema as following:
-     * | FileNumber | Int     |
-     * | TotalRows  | Bigint  |
-     * | FileSize   | Bigint  |
-     * | URL        | Varchar |
+     * | FileNumber    | Int     |
+     * | TotalRows     | Bigint  |
+     * | FileSize      | Bigint  |
+     * | URL           | Varchar |
+     * | WriteTimeSec  | Varchar |
+     * | WriteSpeedKB  | Varchar |
      */
     public static TupleDescriptor constructFileStatusTupleDesc(DescriptorTable 
descriptorTable) {
         TupleDescriptor resultFileStatusTupleDesc =
                 descriptorTable.createTupleDescriptor("result_file_status");
         resultFileStatusTupleDesc.setIsMaterialized(true);
-        SlotDescriptor fileNumber = 
descriptorTable.addSlotDescriptor(resultFileStatusTupleDesc);
-        fileNumber.setLabel("FileNumber");
-        fileNumber.setType(ScalarType.createType(PrimitiveType.INT));
-        fileNumber.setColumn(new Column("FileNumber", 
ScalarType.createType(PrimitiveType.INT)));
-        fileNumber.setIsMaterialized(true);
-        fileNumber.setIsNullable(false);
-        SlotDescriptor totalRows = 
descriptorTable.addSlotDescriptor(resultFileStatusTupleDesc);
-        totalRows.setLabel("TotalRows");
-        totalRows.setType(ScalarType.createType(PrimitiveType.BIGINT));
-        totalRows.setColumn(new Column("TotalRows", 
ScalarType.createType(PrimitiveType.BIGINT)));
-        totalRows.setIsMaterialized(true);
-        totalRows.setIsNullable(false);
-        SlotDescriptor fileSize = 
descriptorTable.addSlotDescriptor(resultFileStatusTupleDesc);
-        fileSize.setLabel("FileSize");
-        fileSize.setType(ScalarType.createType(PrimitiveType.BIGINT));
-        fileSize.setColumn(new Column("FileSize", 
ScalarType.createType(PrimitiveType.BIGINT)));
-        fileSize.setIsMaterialized(true);
-        fileSize.setIsNullable(false);
-        SlotDescriptor url = 
descriptorTable.addSlotDescriptor(resultFileStatusTupleDesc);
-        url.setLabel("URL");
-        url.setType(ScalarType.createType(PrimitiveType.VARCHAR));
-        url.setColumn(new Column("URL", 
ScalarType.createType(PrimitiveType.VARCHAR)));
-        url.setIsMaterialized(true);
-        url.setIsNullable(false);
+        for (int i = 0; i < OutFileClause.RESULT_COL_NAMES.size(); ++i) {
+            SlotDescriptor slotDescriptor = 
descriptorTable.addSlotDescriptor(resultFileStatusTupleDesc);
+            slotDescriptor.setLabel(OutFileClause.RESULT_COL_NAMES.get(i));
+            slotDescriptor.setType(OutFileClause.RESULT_COL_TYPES.get(i));
+            slotDescriptor.setColumn(new 
Column(OutFileClause.RESULT_COL_NAMES.get(i),
+                    OutFileClause.RESULT_COL_TYPES.get(i)));
+            slotDescriptor.setIsMaterialized(true);
+            slotDescriptor.setIsNullable(false);
+        }
         resultFileStatusTupleDesc.computeStatAndMemLayout();
         return resultFileStatusTupleDesc;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index 54de7ed1339..c5782466e88 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -229,7 +229,7 @@ public class ConnectContext {
 
     private StatsErrorEstimator statsErrorEstimator;
 
-    private Map<String, String> resultAttachedInfo = Maps.newHashMap();
+    private List<Map<String, String>> resultAttachedInfo = 
Lists.newArrayList();
 
     private String workloadGroupName = "";
     private boolean isGroupCommit;
@@ -1098,11 +1098,11 @@ public class ConnectContext {
         return env.getAuth().getExecMemLimit(getQualifiedUser());
     }
 
-    public void setResultAttachedInfo(Map<String, String> resultAttachedInfo) {
-        this.resultAttachedInfo = resultAttachedInfo;
+    public void addResultAttachedInfo(Map<String, String> resultAttachedInfo) {
+        this.resultAttachedInfo.add(resultAttachedInfo);
     }
 
-    public Map<String, String> getResultAttachedInfo() {
+    public List<Map<String, String>> getResultAttachedInfo() {
         return resultAttachedInfo;
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 75553377563..139abb74597 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -2003,7 +2003,7 @@ public class StmtExecutor {
                     }
                     profile.getSummaryProfile().freshWriteResultConsumeTime();
                     
context.updateReturnRows(batch.getBatch().getRows().size());
-                    
context.setResultAttachedInfo(batch.getBatch().getAttachedInfos());
+                    
context.addResultAttachedInfo(batch.getBatch().getAttachedInfos());
                 }
                 if (batch.isEos()) {
                     break;
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/analysis/ExportToOutfileLogicalPlanTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/analysis/ExportToOutfileLogicalPlanTest.java
index 096868276fd..7605b772c57 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/analysis/ExportToOutfileLogicalPlanTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/analysis/ExportToOutfileLogicalPlanTest.java
@@ -39,6 +39,7 @@ import java.lang.reflect.Method;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 /**
  * The `Export` sql finally generates the `Outfile` sql.
@@ -93,31 +94,18 @@ public class ExportToOutfileLogicalPlanTest extends 
TestWithFeService {
                 + ");";
 
         List<Long> currentTablets1 = Arrays.asList(10010L, 10012L, 10014L, 
10016L, 10018L, 10020L, 10022L, 10024L,
-                10026L, 10028L);
-        List<Long> currentTablets2 = Arrays.asList(10030L, 10032L, 10034L, 
10036L, 10038L, 10040L, 10042L, 10044L,
-                10046L, 10048L);
-        List<Long> currentTablets3 = Arrays.asList(10050L, 10052L, 10054L, 
10056L, 10058L, 10060L, 10062L, 10064L,
-                10066L, 10068L);
-        List<Long> currentTablets4 = Arrays.asList(10070L, 10072L, 10074L, 
10076L, 10078L, 10080L, 10082L, 10084L,
+                10026L, 10028L, 10030L, 10032L, 10034L, 10036L, 10038L, 
10040L, 10042L, 10044L,
+                10046L, 10048L, 10050L, 10052L, 10054L, 10056L, 10058L, 
10060L, 10062L, 10064L,
+                10066L, 10068L, 10070L, 10072L, 10074L, 10076L, 10078L, 
10080L, 10082L, 10084L,
                 10086L, 10088L);
         // generate outfile
-        List<List<StatementBase>> outfileSqlPerParallel = 
getOutfileSqlPerParallel(exportSql);
+        List<Optional<StatementBase>> outfileSqlPerParallel = 
getOutfileSqlPerParallel(exportSql);
 
         // check
         Assert.assertEquals(1, outfileSqlPerParallel.size());
-        Assert.assertEquals(4, outfileSqlPerParallel.get(0).size());
 
-        LogicalPlan plan1 = ((LogicalPlanAdapter) 
outfileSqlPerParallel.get(0).get(0)).getLogicalPlan();
+        LogicalPlan plan1 = ((LogicalPlanAdapter) 
outfileSqlPerParallel.get(0).get()).getLogicalPlan();
         checkPartitionsAndTablets(getUnboundRelation(plan1, false), 
Lists.newArrayList(), currentTablets1);
-
-        LogicalPlan plan2 = ((LogicalPlanAdapter) 
outfileSqlPerParallel.get(0).get(1)).getLogicalPlan();
-        checkPartitionsAndTablets(getUnboundRelation(plan2, false), 
Lists.newArrayList(), currentTablets2);
-
-        LogicalPlan plan3 = ((LogicalPlanAdapter) 
outfileSqlPerParallel.get(0).get(2)).getLogicalPlan();
-        checkPartitionsAndTablets(getUnboundRelation(plan3, false), 
Lists.newArrayList(), currentTablets3);
-
-        LogicalPlan plan4 = ((LogicalPlanAdapter) 
outfileSqlPerParallel.get(0).get(3)).getLogicalPlan();
-        checkPartitionsAndTablets(getUnboundRelation(plan4, false), 
Lists.newArrayList(), currentTablets4);
     }
 
     /**
@@ -153,25 +141,21 @@ public class ExportToOutfileLogicalPlanTest extends 
TestWithFeService {
                 10086L, 10088L);
 
         // generate outfile
-        List<List<StatementBase>> outfileSqlPerParallel = 
getOutfileSqlPerParallel(exportSql);
+        List<Optional<StatementBase>> outfileSqlPerParallel = 
getOutfileSqlPerParallel(exportSql);
 
         // check
         Assert.assertEquals(4, outfileSqlPerParallel.size());
-        Assert.assertEquals(1, outfileSqlPerParallel.get(0).size());
-        Assert.assertEquals(1, outfileSqlPerParallel.get(1).size());
-        Assert.assertEquals(1, outfileSqlPerParallel.get(2).size());
-        Assert.assertEquals(1, outfileSqlPerParallel.get(3).size());
 
-        LogicalPlan plan1 = ((LogicalPlanAdapter) 
outfileSqlPerParallel.get(0).get(0)).getLogicalPlan();
+        LogicalPlan plan1 = ((LogicalPlanAdapter) 
outfileSqlPerParallel.get(0).get()).getLogicalPlan();
         checkPartitionsAndTablets(getUnboundRelation(plan1, false), 
Lists.newArrayList(), currentTablets1);
 
-        LogicalPlan plan2 = ((LogicalPlanAdapter) 
outfileSqlPerParallel.get(1).get(0)).getLogicalPlan();
+        LogicalPlan plan2 = ((LogicalPlanAdapter) 
outfileSqlPerParallel.get(1).get()).getLogicalPlan();
         checkPartitionsAndTablets(getUnboundRelation(plan2, false), 
Lists.newArrayList(), currentTablets2);
 
-        LogicalPlan plan3 = ((LogicalPlanAdapter) 
outfileSqlPerParallel.get(2).get(0)).getLogicalPlan();
+        LogicalPlan plan3 = ((LogicalPlanAdapter) 
outfileSqlPerParallel.get(2).get()).getLogicalPlan();
         checkPartitionsAndTablets(getUnboundRelation(plan3, false), 
Lists.newArrayList(), currentTablets3);
 
-        LogicalPlan plan4 = ((LogicalPlanAdapter) 
outfileSqlPerParallel.get(3).get(0)).getLogicalPlan();
+        LogicalPlan plan4 = ((LogicalPlanAdapter) 
outfileSqlPerParallel.get(3).get()).getLogicalPlan();
         checkPartitionsAndTablets(getUnboundRelation(plan4, false), 
Lists.newArrayList(), currentTablets4);
     }
 
@@ -199,41 +183,27 @@ public class ExportToOutfileLogicalPlanTest extends 
TestWithFeService {
         // This export sql should generate 4 array, and there should be 1 
outfile sql in per array.
         // The only difference between them is the TABLET(). They are:
         List<Long> currentTablets1 = Arrays.asList(10010L, 10012L, 10014L, 
10016L, 10018L, 10020L, 10022L, 10024L,
-                10026L, 10028L);
-        List<Long> currentTablets12 = Arrays.asList(10030L, 10032L, 10034L, 
10036L);
+                10026L, 10028L, 10030L, 10032L, 10034L, 10036L);
         List<Long> currentTablets2 = Arrays.asList(10038L, 10040L, 10042L, 
10044L, 10046L, 10048L, 10050L, 10052L,
-                10054L, 10056L);
-        List<Long> currentTablets22 = Arrays.asList(10058L, 10060L, 10062L);
+                10054L, 10056L, 10058L, 10060L, 10062L);
         List<Long> currentTablets3 = Arrays.asList(10064L, 10066L, 10068L, 
10070L, 10072L, 10074L, 10076L, 10078L,
-                10080L, 10082L);
-        List<Long> currentTablets32 = Arrays.asList(10084L, 10086L, 10088L);
+                10080L, 10082L, 10084L, 10086L, 10088L);
 
         // generate outfile
-        List<List<StatementBase>> outfileSqlPerParallel = 
getOutfileSqlPerParallel(exportSql);
+        List<Optional<StatementBase>> outfileSqlPerParallel = 
getOutfileSqlPerParallel(exportSql);
 
         // check
         Assert.assertEquals(3, outfileSqlPerParallel.size());
-        Assert.assertEquals(2, outfileSqlPerParallel.get(0).size());
-        Assert.assertEquals(2, outfileSqlPerParallel.get(1).size());
-        Assert.assertEquals(2, outfileSqlPerParallel.get(2).size());
 
-        LogicalPlan plan1 = ((LogicalPlanAdapter) 
outfileSqlPerParallel.get(0).get(0)).getLogicalPlan();
+        LogicalPlan plan1 = ((LogicalPlanAdapter) 
outfileSqlPerParallel.get(0).get()).getLogicalPlan();
         checkPartitionsAndTablets(getUnboundRelation(plan1, false), 
Lists.newArrayList(), currentTablets1);
 
-        LogicalPlan plan12 = ((LogicalPlanAdapter) 
outfileSqlPerParallel.get(0).get(1)).getLogicalPlan();
-        checkPartitionsAndTablets(getUnboundRelation(plan12, false), 
Lists.newArrayList(), currentTablets12);
-
-        LogicalPlan plan2 = ((LogicalPlanAdapter) 
outfileSqlPerParallel.get(1).get(0)).getLogicalPlan();
+        LogicalPlan plan2 = ((LogicalPlanAdapter) 
outfileSqlPerParallel.get(1).get()).getLogicalPlan();
         checkPartitionsAndTablets(getUnboundRelation(plan2, false), 
Lists.newArrayList(), currentTablets2);
 
-        LogicalPlan plan22 = ((LogicalPlanAdapter) 
outfileSqlPerParallel.get(1).get(1)).getLogicalPlan();
-        checkPartitionsAndTablets(getUnboundRelation(plan22, false), 
Lists.newArrayList(), currentTablets22);
 
-        LogicalPlan plan3 = ((LogicalPlanAdapter) 
outfileSqlPerParallel.get(2).get(0)).getLogicalPlan();
+        LogicalPlan plan3 = ((LogicalPlanAdapter) 
outfileSqlPerParallel.get(2).get()).getLogicalPlan();
         checkPartitionsAndTablets(getUnboundRelation(plan3, false), 
Lists.newArrayList(), currentTablets3);
-
-        LogicalPlan plan32 = ((LogicalPlanAdapter) 
outfileSqlPerParallel.get(2).get(1)).getLogicalPlan();
-        checkPartitionsAndTablets(getUnboundRelation(plan32, false), 
Lists.newArrayList(), currentTablets32);
     }
 
     /**
@@ -267,26 +237,22 @@ public class ExportToOutfileLogicalPlanTest extends 
TestWithFeService {
         List<String> currentPartitions = Arrays.asList("p1");
 
         // generate outfile
-        List<List<StatementBase>> outfileSqlPerParallel = 
getOutfileSqlPerParallel(exportSql);
+        List<Optional<StatementBase>> outfileSqlPerParallel = 
getOutfileSqlPerParallel(exportSql);
 
         // check
         Assert.assertEquals(4, outfileSqlPerParallel.size());
-        Assert.assertEquals(1, outfileSqlPerParallel.get(0).size());
-        Assert.assertEquals(1, outfileSqlPerParallel.get(1).size());
-        Assert.assertEquals(1, outfileSqlPerParallel.get(2).size());
-        Assert.assertEquals(1, outfileSqlPerParallel.get(3).size());
 
 
-        LogicalPlan plan1 = ((LogicalPlanAdapter) 
outfileSqlPerParallel.get(0).get(0)).getLogicalPlan();
+        LogicalPlan plan1 = ((LogicalPlanAdapter) 
outfileSqlPerParallel.get(0).get()).getLogicalPlan();
         checkPartitionsAndTablets(getUnboundRelation(plan1, false), 
currentPartitions, currentTablets1);
 
-        LogicalPlan plan2 = ((LogicalPlanAdapter) 
outfileSqlPerParallel.get(1).get(0)).getLogicalPlan();
+        LogicalPlan plan2 = ((LogicalPlanAdapter) 
outfileSqlPerParallel.get(1).get()).getLogicalPlan();
         checkPartitionsAndTablets(getUnboundRelation(plan2, false), 
currentPartitions, currentTablets2);
 
-        LogicalPlan plan3 = ((LogicalPlanAdapter) 
outfileSqlPerParallel.get(2).get(0)).getLogicalPlan();
+        LogicalPlan plan3 = ((LogicalPlanAdapter) 
outfileSqlPerParallel.get(2).get()).getLogicalPlan();
         checkPartitionsAndTablets(getUnboundRelation(plan3, false), 
currentPartitions, currentTablets3);
 
-        LogicalPlan plan4 = ((LogicalPlanAdapter) 
outfileSqlPerParallel.get(3).get(0)).getLogicalPlan();
+        LogicalPlan plan4 = ((LogicalPlanAdapter) 
outfileSqlPerParallel.get(3).get()).getLogicalPlan();
         checkPartitionsAndTablets(getUnboundRelation(plan4, false), 
currentPartitions, currentTablets4);
     }
 
@@ -320,25 +286,21 @@ public class ExportToOutfileLogicalPlanTest extends 
TestWithFeService {
         List<String> currentPartitions = Arrays.asList("p1", "p4");
 
         // generate outfile
-        List<List<StatementBase>> outfileSqlPerParallel = 
getOutfileSqlPerParallel(exportSql);
+        List<Optional<StatementBase>> outfileSqlPerParallel = 
getOutfileSqlPerParallel(exportSql);
 
         // check
         Assert.assertEquals(4, outfileSqlPerParallel.size());
-        Assert.assertEquals(1, outfileSqlPerParallel.get(0).size());
-        Assert.assertEquals(1, outfileSqlPerParallel.get(1).size());
-        Assert.assertEquals(1, outfileSqlPerParallel.get(2).size());
-        Assert.assertEquals(1, outfileSqlPerParallel.get(3).size());
 
-        LogicalPlan plan1 = ((LogicalPlanAdapter) 
outfileSqlPerParallel.get(0).get(0)).getLogicalPlan();
+        LogicalPlan plan1 = ((LogicalPlanAdapter) 
outfileSqlPerParallel.get(0).get()).getLogicalPlan();
         checkPartitionsAndTablets(getUnboundRelation(plan1, false), 
currentPartitions, currentTablets1);
 
-        LogicalPlan plan2 = ((LogicalPlanAdapter) 
outfileSqlPerParallel.get(1).get(0)).getLogicalPlan();
+        LogicalPlan plan2 = ((LogicalPlanAdapter) 
outfileSqlPerParallel.get(1).get()).getLogicalPlan();
         checkPartitionsAndTablets(getUnboundRelation(plan2, false), 
currentPartitions, currentTablets2);
 
-        LogicalPlan plan3 = ((LogicalPlanAdapter) 
outfileSqlPerParallel.get(2).get(0)).getLogicalPlan();
+        LogicalPlan plan3 = ((LogicalPlanAdapter) 
outfileSqlPerParallel.get(2).get()).getLogicalPlan();
         checkPartitionsAndTablets(getUnboundRelation(plan3, false), 
currentPartitions, currentTablets3);
 
-        LogicalPlan plan4 = ((LogicalPlanAdapter) 
outfileSqlPerParallel.get(3).get(0)).getLogicalPlan();
+        LogicalPlan plan4 = ((LogicalPlanAdapter) 
outfileSqlPerParallel.get(3).get()).getLogicalPlan();
         checkPartitionsAndTablets(getUnboundRelation(plan4, false), 
currentPartitions, currentTablets4);
     }
 
@@ -380,42 +342,39 @@ public class ExportToOutfileLogicalPlanTest extends 
TestWithFeService {
         List<String> currentPartitions = Arrays.asList("p1");
 
         // generate outfile
-        List<List<StatementBase>> outfileSqlPerParallel = 
getOutfileSqlPerParallel(exportSql);
+        List<Optional<StatementBase>> outfileSqlPerParallel = 
getOutfileSqlPerParallel(exportSql);
 
         // check
         Assert.assertEquals(10, outfileSqlPerParallel.size());
-        for (int i = 0; i < 10; ++i) {
-            Assert.assertEquals(1, outfileSqlPerParallel.get(i).size());
-        }
 
-        LogicalPlan plan1 = ((LogicalPlanAdapter) 
outfileSqlPerParallel.get(0).get(0)).getLogicalPlan();
+        LogicalPlan plan1 = ((LogicalPlanAdapter) 
outfileSqlPerParallel.get(0).get()).getLogicalPlan();
         checkPartitionsAndTablets(getUnboundRelation(plan1, false), 
currentPartitions, currentTablets1);
 
-        LogicalPlan plan2 = ((LogicalPlanAdapter) 
outfileSqlPerParallel.get(1).get(0)).getLogicalPlan();
+        LogicalPlan plan2 = ((LogicalPlanAdapter) 
outfileSqlPerParallel.get(1).get()).getLogicalPlan();
         checkPartitionsAndTablets(getUnboundRelation(plan2, false), 
currentPartitions, currentTablets2);
 
-        LogicalPlan plan3 = ((LogicalPlanAdapter) 
outfileSqlPerParallel.get(2).get(0)).getLogicalPlan();
+        LogicalPlan plan3 = ((LogicalPlanAdapter) 
outfileSqlPerParallel.get(2).get()).getLogicalPlan();
         checkPartitionsAndTablets(getUnboundRelation(plan3, false), 
currentPartitions, currentTablets3);
 
-        LogicalPlan plan4 = ((LogicalPlanAdapter) 
outfileSqlPerParallel.get(3).get(0)).getLogicalPlan();
+        LogicalPlan plan4 = ((LogicalPlanAdapter) 
outfileSqlPerParallel.get(3).get()).getLogicalPlan();
         checkPartitionsAndTablets(getUnboundRelation(plan4, false), 
currentPartitions, currentTablets4);
 
-        LogicalPlan plan5 = ((LogicalPlanAdapter) 
outfileSqlPerParallel.get(4).get(0)).getLogicalPlan();
+        LogicalPlan plan5 = ((LogicalPlanAdapter) 
outfileSqlPerParallel.get(4).get()).getLogicalPlan();
         checkPartitionsAndTablets(getUnboundRelation(plan5, false), 
currentPartitions, currentTablets5);
 
-        LogicalPlan plan6 = ((LogicalPlanAdapter) 
outfileSqlPerParallel.get(5).get(0)).getLogicalPlan();
+        LogicalPlan plan6 = ((LogicalPlanAdapter) 
outfileSqlPerParallel.get(5).get()).getLogicalPlan();
         checkPartitionsAndTablets(getUnboundRelation(plan6, false), 
currentPartitions, currentTablets6);
 
-        LogicalPlan plan7 = ((LogicalPlanAdapter) 
outfileSqlPerParallel.get(6).get(0)).getLogicalPlan();
+        LogicalPlan plan7 = ((LogicalPlanAdapter) 
outfileSqlPerParallel.get(6).get()).getLogicalPlan();
         checkPartitionsAndTablets(getUnboundRelation(plan7, false), 
currentPartitions, currentTablets7);
 
-        LogicalPlan plan8 = ((LogicalPlanAdapter) 
outfileSqlPerParallel.get(7).get(0)).getLogicalPlan();
+        LogicalPlan plan8 = ((LogicalPlanAdapter) 
outfileSqlPerParallel.get(7).get()).getLogicalPlan();
         checkPartitionsAndTablets(getUnboundRelation(plan8, false), 
currentPartitions, currentTablets8);
 
-        LogicalPlan plan9 = ((LogicalPlanAdapter) 
outfileSqlPerParallel.get(8).get(0)).getLogicalPlan();
+        LogicalPlan plan9 = ((LogicalPlanAdapter) 
outfileSqlPerParallel.get(8).get()).getLogicalPlan();
         checkPartitionsAndTablets(getUnboundRelation(plan9, false), 
currentPartitions, currentTablets9);
 
-        LogicalPlan plan10 = ((LogicalPlanAdapter) 
outfileSqlPerParallel.get(9).get(0)).getLogicalPlan();
+        LogicalPlan plan10 = ((LogicalPlanAdapter) 
outfileSqlPerParallel.get(9).get()).getLogicalPlan();
         checkPartitionsAndTablets(getUnboundRelation(plan10, false), 
currentPartitions, currentTablets10);
     }
 
@@ -425,9 +384,9 @@ public class ExportToOutfileLogicalPlanTest extends 
TestWithFeService {
     }
 
     // need open EnableNereidsPlanner
-    private List<List<StatementBase>> getOutfileSqlPerParallel(String 
exportSql) throws UserException {
+    private List<Optional<StatementBase>> getOutfileSqlPerParallel(String 
exportSql) throws UserException {
         ExportCommand exportCommand = (ExportCommand) parseSql(exportSql);
-        List<List<StatementBase>> selectStmtListPerParallel = 
Lists.newArrayList();
+        List<Optional<StatementBase>> selectStmtPerParallel = 
Lists.newArrayList();
         try {
             Method checkAllParameters = 
exportCommand.getClass().getDeclaredMethod("checkAllParameters",
                     ConnectContext.class, TableName.class, Map.class);
@@ -445,7 +404,7 @@ public class ExportToOutfileLogicalPlanTest extends 
TestWithFeService {
 
             ExportJob job = (ExportJob) generateExportJob.invoke(
                     exportCommand, connectContext, 
exportCommand.getFileProperties(), tblName);
-            selectStmtListPerParallel = job.getSelectStmtListPerParallel();
+            selectStmtPerParallel = job.getSelectStmtPerParallel();
         } catch (NoSuchMethodException e) {
             throw new UserException(e);
         } catch (InvocationTargetException e) {
@@ -453,7 +412,7 @@ public class ExportToOutfileLogicalPlanTest extends 
TestWithFeService {
         } catch (IllegalAccessException e) {
             throw new UserException(e);
         }
-        return selectStmtListPerParallel;
+        return selectStmtPerParallel;
     }
 
     private void checkPartitionsAndTablets(UnboundRelation relation, 
List<String> currentPartitionNames,
diff --git a/regression-test/data/export_p0/outfile/test_outfile_result.out 
b/regression-test/data/export_p0/outfile/test_outfile_result.out
new file mode 100644
index 00000000000..cda7d5ce1e6
Binary files /dev/null and 
b/regression-test/data/export_p0/outfile/test_outfile_result.out differ
diff --git 
a/regression-test/suites/export_p0/outfile/test_outfile_result.groovy 
b/regression-test/suites/export_p0/outfile/test_outfile_result.groovy
new file mode 100644
index 00000000000..a4cef1c6ada
--- /dev/null
+++ b/regression-test/suites/export_p0/outfile/test_outfile_result.groovy
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_outfile_result", "p0") {
+    def export_table_name = "test_outfile_result"
+
+    sql """ DROP TABLE IF EXISTS ${export_table_name} """
+    sql """
+    CREATE TABLE IF NOT EXISTS ${export_table_name} (
+        `user_id` LARGEINT NOT NULL COMMENT "用户id",
+        `Name` STRING COMMENT "用户年龄",
+        `Age` int(11) NULL
+        )
+        DISTRIBUTED BY HASH(user_id) BUCKETS 3
+        PROPERTIES("replication_num" = "1");
+    """
+    StringBuilder sb = new StringBuilder()
+    int i = 1
+    for (; i < 10; i ++) {
+        sb.append("""
+            (${i}, 'ftw-${i}', ${i + 18}),
+        """)
+    }
+    sb.append("""
+            (${i}, NULL, NULL)
+        """)
+    sql """ INSERT INTO ${export_table_name} VALUES
+            ${sb.toString()}
+        """
+    qt_select_export """ SELECT * FROM ${export_table_name} t ORDER BY 
user_id; """
+
+    def isNumber = {String str ->
+        logger.info("str = " + str)
+        if (str == null || str.trim().isEmpty()) {
+            throw exception("result is null or empty")
+        }
+        try {
+            double num = Double.parseDouble(str);
+            if (num < 0) {
+                throw exception("result can not be less than 0")
+            }
+        } catch (NumberFormatException e) {
+            throw exception("NumberFormatException: " + e.getMessage())
+        }
+        return true
+    }
+
+
+    // 1. test s3: run SELECT ... INTO OUTFILE against S3 and check the shape of the returned result row
+    try {
+        String ak = getS3AK()
+        String sk = getS3SK()
+        String s3_endpoint = getS3Endpoint()
+        String region = getS3Region()
+        String bucket = context.config.otherConfigs.get("s3BucketName");
+
+        // export to an s3:// URI; endpoint and region are supplied through PROPERTIES
+        def outFilePath = "${bucket}/outfile_different_s3/exp_"
+        List<List<Object>> outfile_res = sql """ SELECT * FROM 
${export_table_name} t ORDER BY user_id
+                                                INTO OUTFILE 
"s3://${outFilePath}"
+                                                FORMAT AS parquet
+                                                PROPERTIES (
+                                                    "s3.endpoint" = 
"${s3_endpoint}",
+                                                    "s3.region" = "${region}",
+                                                    "s3.secret_key"="${sk}",
+                                                    "s3.access_key" = "${ak}"
+                                                );
+                                            """
+
+        assertEquals(6, outfile_res[0].size())
+        assertEquals(true, isNumber(outfile_res[0][4]))
+        assertEquals(true, isNumber(outfile_res[0][5]))
+    } finally {
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to