This is an automated email from the ASF dual-hosted git repository.

zykkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 95aa4d8631 [Feature](Export) Supports concurrent export of table data (#21911)
95aa4d8631 is described below

commit 95aa4d863178a8f54335b5de9b6a3599342f25b1
Author: Tiewei Fang <[email protected]>
AuthorDate: Fri Aug 4 18:50:17 2023 +0800

    [Feature](Export) Supports concurrent export of table data (#21911)
---
 .../Manipulation/EXPORT.md                         |  25 +-
 .../Manipulation/EXPORT.md                         |  26 +-
 .../main/java/org/apache/doris/common/Config.java  |   9 +
 .../java/org/apache/doris/analysis/ExportStmt.java |  85 ++++---
 .../org/apache/doris/analysis/OutFileClause.java   |   2 +-
 .../java/org/apache/doris/analysis/TableRef.java   |   4 +
 .../main/java/org/apache/doris/load/ExportJob.java | 264 ++++++++++++---------
 .../java/org/apache/doris/persist/EditLog.java     |   3 +-
 .../org/apache/doris/task/ExportExportingTask.java | 172 +++++++++++---
 9 files changed, 392 insertions(+), 198 deletions(-)

diff --git 
a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/EXPORT.md
 
b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/EXPORT.md
index 10b6198ec8..8cf76f11ed 100644
--- 
a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/EXPORT.md
+++ 
b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/EXPORT.md
@@ -46,7 +46,11 @@ WITH BROKER/S3/HDFS
 [broker_properties];
 ```
 
-illustrate:
+**principle**
+
+Under the hood, the `Export` statement executes `select...outfile..` statements. Based on the value of the `parallelism` parameter, the `Export` task is decomposed into multiple `select...outfile..` statements that run concurrently, and each `select...outfile..` statement exports a subset of the table's tablets.
+
+**illustrate:**
 
 - `table_name`
 
@@ -76,6 +80,7 @@ illustrate:
   - `timeout`: The timeout period of the export job, the default is 2 hours, 
the unit is seconds.
   - `columns`: Specifies certain columns of the export job table
   - `format`: Specifies the file format, support: parquet, orc, csv, 
csv_with_names, csv_with_names_and_types.The default is csv format.
+  - `parallelism`: The concurrency of the `export` job; the default is `1`. The export job is split into `parallelism` `select..outfile..` statements that execute concurrently. (If `parallelism` is greater than the number of tablets in the table, the system automatically lowers it to the number of tablets, so that each `select..outfile..` statement handles exactly one tablet.)
   - `delete_existing_files`: default `false`. If it is specified as true, you 
will first delete all files specified in the directory specified by the 
file_path, and then export the data to the directory.For example: "file_path" = 
"/user/tmp", then delete all files and directory under "/user/"; "file_path" = 
"/user/tmp/", then delete all files and directory under "/user/tmp/"
 
   > Note that to use the `delete_existing_files` parameter, you also need to 
add the configuration `enable_delete_existing_files = true` to the fe.conf file 
and restart the FE. Only then will the `delete_existing_files` parameter take 
effect. Setting `delete_existing_files = true` is a dangerous operation and it 
is recommended to only use it in a testing environment.
@@ -218,7 +223,17 @@ PROPERTIES (
 
 When the exported file size is larger than 5MB, the data will be split into 
multiple files, with each file containing a maximum of 5MB.
 
-7. set delete_existing_files
+7. set parallelism
+```sql
+EXPORT TABLE test TO "file:///home/user/tmp/"
+PROPERTIES (
+  "format" = "parquet",
+  "max_file_size" = "5MB",
+  "parallelism" = "5"
+);
+```
+
+8. set delete_existing_files
 
 ```sql
 EXPORT TABLE test TO "file:///home/user/tmp"
@@ -348,8 +363,10 @@ WITH BROKER "broker_name"
   #### Precautions
 
   - Exporting a large amount of data at one time is not recommended. The 
maximum recommended export data volume for an Export job is several tens of GB. 
An overly large export results in more junk files and higher retry costs. If 
the amount of table data is too large, it is recommended to export by partition.
-  - If the Export job fails, the `__doris_export_tmp_xxx` temporary directory 
generated in the remote storage and the generated files will not be deleted, 
and the user needs to delete it manually.
-  - If the Export job runs successfully, the `__doris_export_tmp_xxx` 
directory generated in the remote storage may be preserved or cleared according 
to the file system semantics of the remote storage. For example, in S3 object 
storage, after the last file in a directory is removed by the rename operation, 
the directory will also be deleted. If the directory is not cleared, the user 
can clear it manually.
+  - If the Export job fails, the generated files will not be deleted, and the user needs to delete them manually.
   - The Export job only exports the data of the Base table, not the data of 
the materialized view.
   - The export job scans data and occupies IO resources, which may affect the 
query latency of the system.
   - The maximum number of export jobs running simultaneously in a cluster is 
5. Only jobs submitted after that will be queued.
+  - Currently, the `Export Job` only performs a simple check on whether the tablet versions are consistent, so it is recommended not to import data while the `Export Job` is running.
+  - The maximum combined parallelism of all `Export jobs` in a cluster is `50`. You can change this value by adding the parameter `maximum_parallelism_of_export_job` to fe.conf and restarting the FE.
+  - The maximum number of partitions that an `Export job` allows is 2000. You can change this limit by adding the parameter `maximum_number_of_export_partitions` to fe.conf and restarting the FE.
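
As a rough sketch of the decomposition described in the new "principle" section (the table name, tablet IDs and output path below are invented, and the FE builds these statements internally rather than accepting them from users), an export with `"parallelism" = "2"` over a table with four tablets splits roughly like this:

```sql
-- Hypothetical decomposition of:
--   EXPORT TABLE test TO "file:///home/user/tmp/"
--   PROPERTIES ("format" = "parquet", "parallelism" = "2");
-- Each statement scans a disjoint subset of the table's tablets.
SELECT * FROM test TABLET(10001, 10002)
INTO OUTFILE "file:///home/user/tmp/"
FORMAT AS PARQUET;

SELECT * FROM test TABLET(10003, 10004)
INTO OUTFILE "file:///home/user/tmp/"
FORMAT AS PARQUET;
```
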
diff --git 
a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/EXPORT.md
 
b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/EXPORT.md
index 187233f378..c08d703f22 100644
--- 
a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/EXPORT.md
+++ 
b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/EXPORT.md
@@ -46,7 +46,10 @@ WITH BROKER/S3/HDFS
 [broker_properties];
 ```
 
-说明:
+**原理**
Export语句底层实际执行的是`select...outfile..`语句,Export任务会根据`parallelism`参数的值来分解为多个`select...outfile..`语句并发地去执行,每一个`select...outfile..`负责导出部分tablets数据。
+
+**说明**:
 
 - `table_name`
 
@@ -77,6 +80,7 @@ WITH BROKER/S3/HDFS
   - `timeout`:导出作业的超时时间,默认为2小时,单位是秒。
   - `format`:导出作业的文件格式,支持:parquet, orc, csv, 
csv_with_names、csv_with_names_and_types。 默认为csv格式。
   - `max_file_size`:导出作业单个文件大小限制,如果结果超过这个值,将切割成多个文件。
+  - 
`parallelism`:导出作业的并发度,默认为`1`,导出作业会分割为`parallelism`个数的`select..outfile..`语句去并发执行。(如果parallelism个数大于表的tablets个数,系统将自动把parallelism设置为tablets个数大小,即每一个`select..outfile..`语句负责一个tablets)
   - `delete_existing_files`: 
默认为false,若指定为true,则会先删除`export_path`所指定目录下的所有文件,然后导出数据到该目录下。例如:"export_path" = 
"/user/tmp", 则会删除"/user/"下所有文件及目录;"file_path" = "/user/tmp/", 
则会删除"/user/tmp/"下所有文件及目录。
 
   > 
注意:要使用delete_existing_files参数,还需要在fe.conf中添加配置`enable_delete_existing_files = 
true`并重启fe,此时delete_existing_files才会生效。delete_existing_files = true 
是一个危险的操作,建议只在测试环境中使用。
@@ -214,7 +218,17 @@ PROPERTIES (
 ```
 当导出文件大于5MB时,将切割数据为多个文件,每个文件最大为5MB。
 
-7. 设置delete_existing_files属性
+7. 设置parallelism属性
+```sql
+EXPORT TABLE test TO "file:///home/user/tmp/"
+PROPERTIES (
+  "format" = "parquet",
+  "max_file_size" = "5MB",
+  "parallelism" = "5"
+);
+```
+
+8. 设置delete_existing_files属性
 ```sql
 EXPORT TABLE test TO "file:///home/user/tmp"
 PROPERTIES (
@@ -337,13 +351,15 @@ WITH BROKER "broker_name"
 
 通常一个 Export 作业的查询计划只有 `扫描-导出` 两部分,不涉及需要太多内存的计算逻辑。所以通常 2GB 的默认内存限制可以满足需求。
 
-但在某些场景下,比如一个查询计划,在同一个 BE 上需要扫描的 Tablet 过多,或者 Tablet 
的数据版本过多时,可能会导致内存不足。此时需要通过这个 `exec_mem_limit` 参数设置更大的内存,比如 4GB、8GB 等。
+但在某些场景下,比如一个查询计划,在同一个 BE 上需要扫描的 Tablet 过多,或者 Tablet 
的数据版本过多时,可能会导致内存不足。此时需要通过参数 `exec_mem_limit` 设置更大的内存,比如 4GB、8GB 等。
 
 #### 注意事项
 
 - 不建议一次性导出大量数据。一个 Export 作业建议的导出数据量最大在几十 
GB。过大的导出会导致更多的垃圾文件和更高的重试成本。如果表数据量过大,建议按照分区导出。
-- 如果 Export 作业运行失败,在远端存储中产生的 `__doris_export_tmp_xxx` 
临时目录,以及已经生成的文件不会被删除,需要用户手动删除。
-- 如果 Export 作业运行成功,在远端存储中产生的 `__doris_export_tmp_xxx` 
目录,根据远端存储的文件系统语义,可能会保留,也可能会被清除。比如在S3对象存储中,通过 rename 
操作将一个目录中的最后一个文件移走后,该目录也会被删除。如果该目录没有被清除,用户可以手动清除。
+- 如果 Export 作业运行失败,已经生成的文件不会被删除,需要用户手动删除。
 - Export 作业只会导出 Base 表的数据,不会导出物化视图的数据。
 - Export 作业会扫描数据,占用 IO 资源,可能会影响系统的查询延迟。
 - 一个集群内同时运行的 Export 作业最大个数为 5。之后提交的作业将会排队。
+- 目前在export时只是简单检查tablets版本是否一致,建议在执行export过程中不要对该表进行导入数据操作。
+- 一个集群内所有Export 
Job的parallelism加起来最多是50。可以在fe.conf中添加参数`maximum_parallelism_of_export_job`并重启FE来修改该设置。
+- 一个Export 
Job允许导出的分区数量最大为2000,可以在fe.conf中添加参数`maximum_number_of_export_partitions`并重启FE来修改该设置。
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 5721928b98..4d560cf519 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2059,4 +2059,13 @@ public class Config extends ConfigBase {
     @ConfField
     public static int cpu_resource_limit_per_analyze_task = 1;
 
+    @ConfField(mutable = true, description = {
+            "Export任务允许的最大分区数量",
+            "The maximum number of partitions allowed by Export job"})
+    public static int maximum_number_of_export_partitions = 2000;
+
+    @ConfField(mutable = true, description = {
+            "Export任务允许的最大并行数",
+            "The maximum parallelism allowed by Export job"})
+    public static int maximum_parallelism_of_export_job = 50;
 }
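
Both new options are ordinary fe.conf entries; a minimal sketch of raising them (the values below are arbitrary illustrations, not recommendations):

```
# fe.conf -- illustrative values only
maximum_number_of_export_partitions = 3000
maximum_parallelism_of_export_job = 100
```
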
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
index b04d1f9a48..7c88098c22 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
@@ -23,7 +23,6 @@ import org.apache.doris.catalog.FsBroker;
 import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.common.AnalysisException;
-import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
@@ -41,6 +40,7 @@ import org.apache.doris.qe.VariableMgr;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -58,15 +58,30 @@ import java.util.UUID;
 //          BY BROKER 'broker_name' [( $broker_attrs)]
 public class ExportStmt extends StatementBase {
     private static final Logger LOG = LogManager.getLogger(ExportStmt.class);
-
-    public static final String TABLET_NUMBER_PER_TASK_PROP = 
"tablet_num_per_task";
+    public static final String PARALLELISM = "parallelism";
     public static final String LABEL = "label";
 
     private static final String DEFAULT_COLUMN_SEPARATOR = "\t";
     private static final String DEFAULT_LINE_DELIMITER = "\n";
     private static final String DEFAULT_COLUMNS = "";
+    private static final String DEFAULT_PARALLELISM = "1";
+
+    private static final ImmutableSet<String> PROPERTIES_SET = new 
ImmutableSet.Builder<String>()
+            .add(LABEL)
+            .add(PARALLELISM)
+            .add(LoadStmt.EXEC_MEM_LIMIT)
+            .add(LoadStmt.TIMEOUT_PROPERTY)
+            .add(LoadStmt.KEY_IN_PARAM_COLUMNS)
+            .add(LoadStmt.TIMEOUT_PROPERTY)
+            .add(OutFileClause.PROP_MAX_FILE_SIZE)
+            .add(OutFileClause.PROP_DELETE_EXISTING_FILES)
+            .add(PropertyAnalyzer.PROPERTIES_COLUMN_SEPARATOR)
+            .add(PropertyAnalyzer.PROPERTIES_LINE_DELIMITER)
+            .add("format")
+            .build();
+
     private TableName tblName;
-    private List<String> partitions;
+    private List<String> partitionStringNames;
     private Expr whereExpr;
     private String path;
     private BrokerDesc brokerDesc;
@@ -81,6 +96,8 @@ public class ExportStmt extends StatementBase {
 
     private String label;
 
+    private Integer parallelism;
+
     private String maxFileSize;
     private String deleteExistingFiles;
     private SessionVariable sessionVariables;
@@ -117,7 +134,7 @@ public class ExportStmt extends StatementBase {
     }
 
     public List<String> getPartitions() {
-        return partitions;
+        return partitionStringNames;
     }
 
     public Expr getWhereExpr() {
@@ -189,7 +206,7 @@ public class ExportStmt extends StatementBase {
             if (partitionNames.isTemp()) {
                 throw new AnalysisException("Do not support exporting 
temporary partitions");
             }
-            partitions = partitionNames.getPartitionNames();
+            partitionStringNames = partitionNames.getPartitionNames();
         }
 
         // check auth
@@ -234,7 +251,7 @@ public class ExportStmt extends StatementBase {
         Table table = db.getTableOrAnalysisException(tblName.getTbl());
         table.readLock();
         try {
-            if (partitions == null) {
+            if (partitionStringNames == null) {
                 return;
             }
             if (!table.isPartitioned()) {
@@ -256,7 +273,7 @@ public class ExportStmt extends StatementBase {
                             + tblType.toString() + " type, do not support 
EXPORT.");
             }
 
-            for (String partitionName : partitions) {
+            for (String partitionName : partitionStringNames) {
                 Partition partition = table.getPartition(partitionName);
                 if (partition == null) {
                     throw new AnalysisException("Partition [" + partitionName 
+ "] does not exist");
@@ -306,41 +323,31 @@ public class ExportStmt extends StatementBase {
     }
 
     private void checkProperties(Map<String, String> properties) throws 
UserException {
+        for (String key : properties.keySet()) {
+            if (!PROPERTIES_SET.contains(key.toLowerCase())) {
+                throw new DdlException("Invalid property key: '" + key + "'");
+            }
+        }
+
+        // convert key to lowercase
+        Map<String, String> tmpMap = Maps.newHashMap();
+        for (String key : properties.keySet()) {
+            tmpMap.put(key.toLowerCase(), properties.get(key));
+        }
+        properties = tmpMap;
+
         this.columnSeparator = 
Separator.convertSeparator(PropertyAnalyzer.analyzeColumnSeparator(
                 properties, ExportStmt.DEFAULT_COLUMN_SEPARATOR));
         this.lineDelimiter = 
Separator.convertSeparator(PropertyAnalyzer.analyzeLineDelimiter(
                 properties, ExportStmt.DEFAULT_LINE_DELIMITER));
         this.columns = properties.getOrDefault(LoadStmt.KEY_IN_PARAM_COLUMNS, 
DEFAULT_COLUMNS);
 
-        // timeout
-        if (properties.containsKey(LoadStmt.TIMEOUT_PROPERTY)) {
-            try {
-                Integer.parseInt(properties.get(LoadStmt.TIMEOUT_PROPERTY));
-            } catch (NumberFormatException e) {
-                throw new DdlException("Invalid timeout value: " + 
e.getMessage());
-            }
-        } else {
-            properties.put(LoadStmt.TIMEOUT_PROPERTY, 
String.valueOf(Config.export_task_default_timeout_second));
-        }
-
         // format
-        if (properties.containsKey(LoadStmt.KEY_IN_PARAM_FORMAT_TYPE)) {
-            this.format = 
properties.get(LoadStmt.KEY_IN_PARAM_FORMAT_TYPE).toLowerCase();
-        } else {
-            this.format = "csv";
-        }
+        this.format = 
properties.getOrDefault(LoadStmt.KEY_IN_PARAM_FORMAT_TYPE, "csv").toLowerCase();
 
-        // tablet num per task
-        if (properties.containsKey(TABLET_NUMBER_PER_TASK_PROP)) {
-            try {
-                Long.parseLong(properties.get(TABLET_NUMBER_PER_TASK_PROP));
-            } catch (NumberFormatException e) {
-                throw new DdlException("Invalid tablet num per task value: " + 
e.getMessage());
-            }
-        } else {
-            // use session variables
-            properties.put(TABLET_NUMBER_PER_TASK_PROP, 
String.valueOf(Config.export_tablet_num_per_task));
-        }
+        // parallelism
+        String parallelismString = properties.getOrDefault(PARALLELISM, 
DEFAULT_PARALLELISM);
+        parallelism = Integer.parseInt(parallelismString);
 
         // max_file_size
         this.maxFileSize = 
properties.getOrDefault(OutFileClause.PROP_MAX_FILE_SIZE, "");
@@ -365,9 +372,9 @@ public class ExportStmt extends StatementBase {
         } else {
             sb.append(tblName.toSql());
         }
-        if (partitions != null && !partitions.isEmpty()) {
+        if (partitionStringNames != null && !partitionStringNames.isEmpty()) {
             sb.append(" PARTITION (");
-            Joiner.on(", ").appendTo(sb, partitions);
+            Joiner.on(", ").appendTo(sb, partitionStringNames);
             sb.append(")");
         }
         sb.append("\n");
@@ -408,4 +415,8 @@ public class ExportStmt extends StatementBase {
     public String getDeleteExistingFiles() {
         return deleteExistingFiles;
     }
+
+    public Integer getParallelNum() {
+        return parallelism;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
index c3a98b7b7f..85c40d79bc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
@@ -267,7 +267,7 @@ public class OutFileClause {
     }
 
     private void genOrcSchema(List<Expr> resultExprs, List<String> colLabels) 
throws AnalysisException {
-        Preconditions.checkState(this.parquetSchemas.isEmpty());
+        Preconditions.checkState(this.orcSchemas.isEmpty());
         for (int i = 0; i < resultExprs.size(); ++i) {
             Expr expr = resultExprs.get(i);
             String type = "";
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java
index 752211d435..f332d269b3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java
@@ -317,6 +317,10 @@ public class TableRef implements ParseNode, Writable {
         return sampleTabletIds;
     }
 
+    public ArrayList<String> getCommonHints() {
+        return commonHints;
+    }
+
     public TableSample getTableSample() {
         return tableSample;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
index 607cbfa944..1527ba1ce4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
@@ -24,7 +24,6 @@ import org.apache.doris.analysis.FromClause;
 import org.apache.doris.analysis.LimitElement;
 import org.apache.doris.analysis.LoadStmt;
 import org.apache.doris.analysis.OutFileClause;
-import org.apache.doris.analysis.QueryStmt;
 import org.apache.doris.analysis.SelectList;
 import org.apache.doris.analysis.SelectListItem;
 import org.apache.doris.analysis.SelectStmt;
@@ -38,11 +37,14 @@ import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.FeMetaVersion;
 import org.apache.doris.common.Pair;
-import org.apache.doris.common.Status;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
@@ -53,18 +55,10 @@ import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.OriginStatement;
 import org.apache.doris.qe.SessionVariable;
-import org.apache.doris.system.Backend;
-import org.apache.doris.task.AgentClient;
+import org.apache.doris.qe.StmtExecutor;
 import org.apache.doris.task.ExportExportingTask;
-import org.apache.doris.thrift.TAgentResult;
 import org.apache.doris.thrift.TNetworkAddress;
-import org.apache.doris.thrift.TPaloScanRange;
-import org.apache.doris.thrift.TScanRange;
-import org.apache.doris.thrift.TScanRangeLocation;
 import org.apache.doris.thrift.TScanRangeLocations;
-import org.apache.doris.thrift.TSnapshotRequest;
-import org.apache.doris.thrift.TStatusCode;
-import org.apache.doris.thrift.TypesConstants;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Splitter;
@@ -81,6 +75,9 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -119,8 +116,8 @@ public class ExportJob implements Writable {
     private String columnSeparator;
     @SerializedName("lineDelimiter")
     private String lineDelimiter;
-    @SerializedName("partitions")
-    private List<String> partitions;
+    @SerializedName(value = "partitionNames", alternate = {"partitions"})
+    private List<String> partitionNames;
     @SerializedName("tableName")
     private TableName tableName;
     @SerializedName("state")
@@ -145,14 +142,18 @@ public class ExportJob implements Writable {
     private String maxFileSize;
     @SerializedName("deleteExistingFiles")
     private String deleteExistingFiles;
-    // progress has two functions at EXPORTING stage:
-    // 1. when progress < 100, it indicates exporting
-    // 2. set progress = 100 ONLY when exporting progress is completely done
-    private int progress;
+    @SerializedName("startTimeMs")
     private long startTimeMs;
+    @SerializedName("finishTimeMs")
     private long finishTimeMs;
+    @SerializedName("failMsg")
     private ExportFailMsg failMsg;
+    @SerializedName("outfileInfo")
     private String outfileInfo;
+    // progress has two functions at EXPORTING stage:
+    // 1. when progress < 100, it indicates exporting
+    // 2. set progress = 100 ONLY when exporting progress is completely done
+    private int progress;
 
     private TableRef tableRef;
 
@@ -160,9 +161,19 @@ public class ExportJob implements Writable {
 
     private String sql = "";
 
+    private Integer parallelNum;
+
+    public Map<String, Long> getPartitionToVersion() {
+        return partitionToVersion;
+    }
+
+    private Map<String, Long> partitionToVersion = Maps.newHashMap();
+
     // The selectStmt is sql 'select ... into outfile ...'
     @Getter
-    private List<QueryStmt> selectStmtList = Lists.newArrayList();
+    private List<SelectStmt> selectStmtList = Lists.newArrayList();
+
+    private List<StmtExecutor> stmtExecutorList;
 
     private List<String> exportColumns = Lists.newArrayList();
 
@@ -216,6 +227,7 @@ public class ExportJob implements Writable {
         String path = stmt.getPath();
         Preconditions.checkArgument(!Strings.isNullOrEmpty(path));
         this.whereExpr = stmt.getWhereExpr();
+        this.parallelNum = stmt.getParallelNum();
         this.exportPath = path;
         this.sessionVariables = stmt.getSessionVariables();
         this.timeoutSecond = sessionVariables.getQueryTimeoutS();
@@ -225,7 +237,7 @@ public class ExportJob implements Writable {
         this.format = stmt.getFormat();
         this.maxFileSize = stmt.getMaxFileSize();
         this.deleteExistingFiles = stmt.getDeleteExistingFiles();
-        this.partitions = stmt.getPartitions();
+        this.partitionNames = stmt.getPartitions();
 
         this.exportTable = 
db.getTableOrDdlException(stmt.getTblName().getTbl());
         this.columns = stmt.getColumns();
@@ -242,7 +254,7 @@ public class ExportJob implements Writable {
             if (selectStmtList.isEmpty()) {
                 // This scenario is used for 'EXPORT TABLE tbl INTO PATH'
                 // we need generate Select Statement
-                generateQueryStmt();
+                generateQueryStmt(stmt);
             }
         } finally {
             exportTable.readUnlock();
@@ -251,7 +263,7 @@ public class ExportJob implements Writable {
         this.origStmt = stmt.getOrigStmt();
     }
 
-    private void generateQueryStmt() {
+    private void generateQueryStmt(ExportStmt stmt) throws UserException {
         SelectList list = new SelectList();
         if (exportColumns.isEmpty()) {
             list.addItem(SelectListItem.createStarItem(this.tableName));
@@ -266,17 +278,100 @@ public class ExportJob implements Writable {
             }
         }
 
-        List<TableRef> tableRefList = Lists.newArrayList();
-        tableRefList.add(this.tableRef);
-        FromClause fromClause = new FromClause(tableRefList);
+        ArrayList<ArrayList<TableRef>> tableRefListPerQuery = 
splitTablets(stmt);
+        LOG.info("Export task is split into {} outfile statements.", 
tableRefListPerQuery.size());
 
-        SelectStmt selectStmt = new SelectStmt(list, fromClause, 
this.whereExpr, null,
-                null, null, LimitElement.NO_LIMIT);
-        // generate outfile clause
-        OutFileClause outfile = new OutFileClause(this.exportPath, 
this.format, convertOutfileProperties());
-        selectStmt.setOutFileClause(outfile);
-        selectStmt.setOrigStmt(new OriginStatement(selectStmt.toSql(), 0));
-        selectStmtList.add(selectStmt);
+        if (LOG.isDebugEnabled()) {
+            for (int i = 0; i < tableRefListPerQuery.size(); i++) {
+                LOG.debug("Outfile clause {} is responsible for tablets: {}", i,
+                        
tableRefListPerQuery.get(i).get(0).getSampleTabletIds());
+            }
+        }
+
+        for (ArrayList<TableRef> tableRefList : tableRefListPerQuery) {
+            FromClause fromClause = new FromClause(tableRefList);
+            // generate outfile clause
+            OutFileClause outfile = new OutFileClause(this.exportPath, 
this.format, convertOutfileProperties());
+            SelectStmt selectStmt = new SelectStmt(list, fromClause, 
this.whereExpr, null,
+                    null, null, LimitElement.NO_LIMIT);
+            selectStmt.setOutFileClause(outfile);
+            selectStmt.setOrigStmt(new OriginStatement(selectStmt.toSql(), 0));
+            selectStmtList.add(selectStmt);
+        }
+        stmtExecutorList = Arrays.asList(new 
StmtExecutor[selectStmtList.size()]);
+        if (LOG.isDebugEnabled()) {
+            for (int i = 0; i < selectStmtList.size(); i++) {
+                LOG.debug("Outfile clause {} is: {}", i, 
selectStmtList.get(i).toSql());
+            }
+        }
+    }
+
+    private ArrayList<ArrayList<TableRef>> splitTablets(ExportStmt stmt) 
throws UserException {
+        // get tablets
+        Database db = 
Env.getCurrentEnv().getInternalCatalog().getDbOrAnalysisException(stmt.getTblName().getDb());
+        OlapTable table = 
db.getOlapTableOrAnalysisException(stmt.getTblName().getTbl());
+        List<Long> tabletIdList = Lists.newArrayList();
+        table.readLock();
+        try {
+            Collection<Partition> partitions = new ArrayList<Partition>();
+            // get partitions
+            // user specifies partitions, already checked in ExportStmt
+            if (this.partitionNames != null) {
+                if (partitionNames.size() > 
Config.maximum_number_of_export_partitions) {
+                    throw new UserException("The number of partitions in this export job is larger than the maximum"
+                            + " number of partitions allowed by an export job");
+                }
+                for (String partName : this.partitionNames) {
+                    partitions.add(table.getPartition(partName));
+                }
+            } else {
+                if (table.getPartitions().size() > 
Config.maximum_number_of_export_partitions) {
+                    throw new UserException("The number of partitions in this export job is larger than the maximum"
+                            + " number of partitions allowed by an export job");
+                }
+                partitions = table.getPartitions();
+            }
+
+            // get tablets
+            for (Partition partition : partitions) {
+                partitionToVersion.put(partition.getName(), 
partition.getVisibleVersion());
+                for (MaterializedIndex index : 
partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
+                    tabletIdList.addAll(index.getTabletIdsInOrder());
+                }
+            }
+        } finally {
+            table.readUnlock();
+        }
+
+        Integer tabletsAllNum = tabletIdList.size();
+        Integer tabletsNumPerQuery = tabletsAllNum / this.parallelNum;
+        Integer tabletsNumPerQueryRemainder = tabletsAllNum - 
tabletsNumPerQuery * this.parallelNum;
+
+        Integer start = 0;
+
+        ArrayList<ArrayList<TableRef>> tableRefListPerQuery = 
Lists.newArrayList();
+
+        int outfileNum = this.parallelNum;
+        if (tabletsAllNum < this.parallelNum) {
+            outfileNum = tabletsAllNum;
+            LOG.warn("Export Job [{}]: The number of tablets ({}) is smaller 
than parallelism ({}), "
+                        + "set parallelism to tablets num.", id, 
tabletsAllNum, this.parallelNum);
+        }
+        for (int i = 0; i < outfileNum; ++i) {
+            Integer tabletsNum = tabletsNumPerQuery;
+            if (tabletsNumPerQueryRemainder > 0) {
+                tabletsNum = tabletsNum + 1;
+                --tabletsNumPerQueryRemainder;
+            }
+            ArrayList<Long> tablets = new 
ArrayList<>(tabletIdList.subList(start, start + tabletsNum));
+            start += tabletsNum;
+            TableRef tblRef = new TableRef(this.tableRef.getName(), 
this.tableRef.getAlias(), null, tablets,
+                    this.tableRef.getTableSample(), 
this.tableRef.getCommonHints());
+            ArrayList<TableRef> tableRefList = Lists.newArrayList();
+            tableRefList.add(tblRef);
+            tableRefListPerQuery.add(tableRefList);
+        }
+        return tableRefListPerQuery;
     }
 
     private Map<String, String> convertOutfileProperties() {
@@ -378,7 +473,7 @@ public class ExportJob implements Writable {
     }
 
     public List<String> getPartitions() {
-        return partitions;
+        return partitionNames;
     }
 
     public int getProgress() {
@@ -434,6 +529,14 @@ public class ExportJob implements Writable {
         this.doExportingThread = isExportingThread;
     }
 
+    public synchronized void setStmtExecutor(int idx, StmtExecutor executor) {
+        this.stmtExecutorList.set(idx, executor);
+    }
+
+    public synchronized StmtExecutor getStmtExecutor(int idx) {
+        return this.stmtExecutorList.get(idx);
+    }
+
     public List<TScanRangeLocations> getTabletLocations() {
         return tabletLocations;
     }
@@ -470,6 +573,14 @@ public class ExportJob implements Writable {
         if (msg != null) {
             failMsg = new ExportFailMsg(type, msg);
         }
+
+        // maybe user cancel this job
+        if (task != null && state == JobState.EXPORTING && stmtExecutorList != 
null) {
+            for (int idx = 0; idx < stmtExecutorList.size(); ++idx) {
+                stmtExecutorList.get(idx).cancel();
+            }
+        }
+
         if (updateState(ExportJob.JobState.CANCELLED, false)) {
             // release snapshot
             // Status releaseSnapshotStatus = releaseSnapshotPaths();
@@ -500,7 +611,6 @@ public class ExportJob implements Writable {
         if (isFinalState() || (isReplay && newState == JobState.EXPORTING)) {
             return false;
         }
-        ExportJob.JobState oldState = state;
         state = newState;
         switch (newState) {
             case PENDING:
@@ -514,16 +624,16 @@ public class ExportJob implements Writable {
                 }
                 break;
             case FINISHED:
+                if (!isReplay) {
+                    finishTimeMs = System.currentTimeMillis();
+                }
+                progress = 100;
+                break;
             case CANCELLED:
                 // if isReplay == true, finishTimeMs will be read from log
                 if (!isReplay) {
                     finishTimeMs = System.currentTimeMillis();
-                    // maybe user cancel this job
-                    if (task != null && oldState == JobState.EXPORTING && 
task.getStmtExecutor() != null) {
-                        task.getStmtExecutor().cancel();
-                    }
                 }
-                progress = 100;
                 break;
             default:
                 Preconditions.checkState(false, "wrong job state: " + 
newState.name());
@@ -540,78 +650,6 @@ public class ExportJob implements Writable {
         return this.state == ExportJob.JobState.CANCELLED || this.state == 
ExportJob.JobState.FINISHED;
     }
 
-    private Status makeSnapshots() {
-        List<TScanRangeLocations> tabletLocations = getTabletLocations();
-        if (tabletLocations == null) {
-            return Status.OK;
-        }
-        for (TScanRangeLocations tablet : tabletLocations) {
-            TScanRange scanRange = tablet.getScanRange();
-            if (!scanRange.isSetPaloScanRange()) {
-                continue;
-            }
-            TPaloScanRange paloScanRange = scanRange.getPaloScanRange();
-            List<TScanRangeLocation> locations = tablet.getLocations();
-            for (TScanRangeLocation location : locations) {
-                TNetworkAddress address = location.getServer();
-                String host = address.getHostname();
-                int port = address.getPort();
-                Backend backend = 
Env.getCurrentSystemInfo().getBackendWithBePort(host, port);
-                if (backend == null) {
-                    return Status.CANCELLED;
-                }
-                long backendId = backend.getId();
-                if 
(!Env.getCurrentSystemInfo().checkBackendQueryAvailable(backendId)) {
-                    return Status.CANCELLED;
-                }
-                TSnapshotRequest snapshotRequest = new TSnapshotRequest();
-                snapshotRequest.setTabletId(paloScanRange.getTabletId());
-                
snapshotRequest.setSchemaHash(Integer.parseInt(paloScanRange.getSchemaHash()));
-                
snapshotRequest.setVersion(Long.parseLong(paloScanRange.getVersion()));
-                snapshotRequest.setTimeout(getTimeoutSecond());
-                
snapshotRequest.setPreferredSnapshotVersion(TypesConstants.TPREFER_SNAPSHOT_REQ_VERSION);
-
-                AgentClient client = new AgentClient(host, port);
-                TAgentResult result = client.makeSnapshot(snapshotRequest);
-                if (result == null || result.getStatus().getStatusCode() != 
TStatusCode.OK) {
-                    String err = "snapshot for tablet " + 
paloScanRange.getTabletId() + " failed on backend "
-                            + address.toString() + ". reason: "
-                            + (result == null ? "unknown" : 
result.getStatus().error_msgs);
-                    LOG.warn("{}, export job: {}", err, id);
-                    return new Status(TStatusCode.CANCELLED, err);
-                }
-                addSnapshotPath(Pair.of(address, result.getSnapshotPath()));
-            }
-        }
-        return Status.OK;
-    }
-
-    public Status releaseSnapshotPaths() {
-        List<Pair<TNetworkAddress, String>> snapshotPaths = getSnapshotPaths();
-        LOG.debug("snapshotPaths:{}", snapshotPaths);
-        for (Pair<TNetworkAddress, String> snapshotPath : snapshotPaths) {
-            TNetworkAddress address = snapshotPath.first;
-            String host = address.getHostname();
-            int port = address.getPort();
-            Backend backend = 
Env.getCurrentSystemInfo().getBackendWithBePort(host, port);
-            if (backend == null) {
-                continue;
-            }
-            long backendId = backend.getId();
-            if 
(!Env.getCurrentSystemInfo().checkBackendQueryAvailable(backendId)) {
-                continue;
-            }
-
-            AgentClient client = new AgentClient(host, port);
-            TAgentResult result = client.releaseSnapshot(snapshotPath.second);
-            if (result == null || result.getStatus().getStatusCode() != 
TStatusCode.OK) {
-                continue;
-            }
-        }
-        snapshotPaths.clear();
-        return Status.OK;
-    }
-
     public boolean isExpired(long curTime) {
         return (curTime - createTimeMs) / 1000 > 
Config.history_job_keep_max_second
                 && (state == ExportJob.JobState.CANCELLED || state == 
ExportJob.JobState.FINISHED);
@@ -633,7 +671,7 @@ public class ExportJob implements Writable {
                 + ", tableId=" + tableId
                 + ", state=" + state
                 + ", path=" + exportPath
-                + ", partitions=(" + StringUtils.join(partitions, ",") + ")"
+                + ", partitions=(" + StringUtils.join(partitionNames, ",") + 
")"
                 + ", progress=" + progress
                 + ", createTimeMs=" + TimeUtils.longToTimeString(createTimeMs)
                 + ", exportStartTimeMs=" + 
TimeUtils.longToTimeString(startTimeMs)
@@ -691,11 +729,11 @@ public class ExportJob implements Writable {
         }
         boolean hasPartition = in.readBoolean();
         if (hasPartition) {
-            partitions = Lists.newArrayList();
+            partitionNames = Lists.newArrayList();
             int partitionSize = in.readInt();
             for (int i = 0; i < partitionSize; ++i) {
                 String partitionName = Text.readString(in);
-                partitions.add(partitionName);
+                partitionNames.add(partitionName);
             }
         }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index d5289e0c32..68d2153556 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -364,11 +364,12 @@ public class EditLog {
                     exportMgr.replayCreateExportJob(job);
                     break;
                 }
-                case OperationType.OP_EXPORT_UPDATE_STATE:
+                case OperationType.OP_EXPORT_UPDATE_STATE: {
                     ExportJob.StateTransfer op = (ExportJob.StateTransfer) 
journal.getData();
                     ExportMgr exportMgr = env.getExportMgr();
                     exportMgr.replayUpdateJobState(op);
                     break;
+                }
                 case OperationType.OP_FINISH_DELETE: {
                     DeleteInfo info = (DeleteInfo) journal.getData();
                     DeleteHandler deleteHandler = env.getDeleteHandler();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java
index 9f96bd689b..48f3ce609e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java
@@ -18,12 +18,20 @@
 package org.apache.doris.task;
 
 import org.apache.doris.analysis.OutFileClause;
-import org.apache.doris.analysis.QueryStmt;
+import org.apache.doris.analysis.SelectStmt;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.TabletMeta;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.ThreadPoolManager;
 import org.apache.doris.load.ExportFailMsg;
+import org.apache.doris.load.ExportFailMsg.CancelType;
 import org.apache.doris.load.ExportJob;
 import org.apache.doris.load.ExportJob.JobState;
+import org.apache.doris.load.ExportJob.OutfileInfo;
 import org.apache.doris.qe.AutoCloseConnectContext;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.QueryState.MysqlStateType;
@@ -38,20 +46,49 @@ import org.apache.logging.log4j.Logger;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
 
 public class ExportExportingTask extends MasterTask {
     private static final Logger LOG = 
LogManager.getLogger(ExportExportingTask.class);
 
     protected final ExportJob job;
-    private StmtExecutor stmtExecutor;
+
+    ThreadPoolExecutor exportExecPool = 
ThreadPoolManager.newDaemonCacheThreadPool(
+            Config.maximum_parallelism_of_export_job, "exporting-pool-", 
false);
 
     public ExportExportingTask(ExportJob job) {
         this.job = job;
         this.signature = job.getId();
     }
 
-    public StmtExecutor getStmtExecutor() {
-        return stmtExecutor;
+    private class ExportResult {
+        private boolean isFailed;
+
+        private ExportFailMsg failMsg;
+
+        private ExportJob.OutfileInfo outfileInfo;
+
+        public ExportResult(boolean isFailed, ExportFailMsg failMsg, 
ExportJob.OutfileInfo outfileInfo) {
+            this.isFailed = isFailed;
+            this.failMsg = failMsg;
+            this.outfileInfo = outfileInfo;
+        }
+
+
+        public boolean isFailed() {
+            return isFailed;
+        }
+
+        public ExportFailMsg getFailMsg() {
+            return failMsg;
+        }
+
+        public OutfileInfo getOutfileInfo() {
+            return outfileInfo;
+        }
     }
 
     @Override
@@ -73,49 +110,110 @@ public class ExportExportingTask extends MasterTask {
             job.setDoExportingThread(Thread.currentThread());
         }
 
-        List<QueryStmt> selectStmtList = job.getSelectStmtList();
-        boolean isFailed = false;
-        ExportFailMsg errorMsg = null;
+        List<SelectStmt> selectStmtList = job.getSelectStmtList();
         int completeTaskNum = 0;
         List<ExportJob.OutfileInfo> outfileInfoList = Lists.newArrayList();
+
+        int parallelNum = selectStmtList.size();
+        CompletionService<ExportResult> completionService = new 
ExecutorCompletionService<>(exportExecPool);
+
         // begin exporting
-        for (int i = 0; i < selectStmtList.size(); ++i) {
-            // maybe user cancelled this job
-            if (job.getState() != JobState.EXPORTING) {
-                isFailed = true;
-                break;
-            }
-            try (AutoCloseConnectContext r = buildConnectContext()) {
-                this.stmtExecutor = new StmtExecutor(r.connectContext, 
selectStmtList.get(i));
-                this.stmtExecutor.execute();
-                if (r.connectContext.getState().getStateType() == 
MysqlStateType.ERR) {
-                    errorMsg = new 
ExportFailMsg(ExportFailMsg.CancelType.RUN_FAIL,
-                            r.connectContext.getState().getErrorMessage());
+        for (int i = 0; i < parallelNum; ++i) {
+            final int idx = i;
+            completionService.submit(() -> {
+                // maybe user cancelled this job
+                if (job.getState() != JobState.EXPORTING) {
+                    return new ExportResult(true, null, null);
+                }
+                try {
+                    Database db = 
Env.getCurrentEnv().getInternalCatalog().getDbOrAnalysisException(
+                            job.getTableName().getDb());
+                    OlapTable table = 
db.getOlapTableOrAnalysisException(job.getTableName().getTbl());
+                    table.readLock();
+                    try {
+                        SelectStmt selectStmt = selectStmtList.get(idx);
+                        List<Long> tabletIds = 
selectStmt.getTableRefs().get(0).getSampleTabletIds();
+                        for (Long tabletId : tabletIds) {
+                            TabletMeta tabletMeta = 
Env.getCurrentEnv().getTabletInvertedIndex().getTabletMeta(
+                                    tabletId);
+                            Partition partition = 
table.getPartition(tabletMeta.getPartitionId());
+                            long nowVersion = partition.getVisibleVersion();
+                            long oldVersion = 
job.getPartitionToVersion().get(partition.getName());
+                            if (nowVersion != oldVersion) {
+                                LOG.warn("Tablet {} has changed version, old 
version = {}, now version = {}",
+                                        tabletId, oldVersion, nowVersion);
+                                return new ExportResult(true, new 
ExportFailMsg(
+                                        ExportFailMsg.CancelType.RUN_FAIL,
+                                        "Tablet {" + tabletId + "} has 
changed"), null);
+                            }
+                        }
+                    } finally {
+                        table.readUnlock();
+                    }
+                } catch (AnalysisException e) {
+                    return new ExportResult(true,
+                            new 
ExportFailMsg(ExportFailMsg.CancelType.RUN_FAIL, e.getMessage()), null);
+                }
+                try (AutoCloseConnectContext r = buildConnectContext()) {
+                    StmtExecutor stmtExecutor = new 
StmtExecutor(r.connectContext, selectStmtList.get(idx));
+                    job.setStmtExecutor(idx, stmtExecutor);
+                    stmtExecutor.execute();
+                    if (r.connectContext.getState().getStateType() == 
MysqlStateType.ERR) {
+                        return new ExportResult(true, new 
ExportFailMsg(ExportFailMsg.CancelType.RUN_FAIL,
+                                
r.connectContext.getState().getErrorMessage()), null);
+                    }
+                    ExportJob.OutfileInfo outfileInfo = 
getOutFileInfo(r.connectContext.getResultAttachedInfo());
+                    return new ExportResult(false, null, outfileInfo);
+                } catch (Exception e) {
+                    return new ExportResult(true, new 
ExportFailMsg(ExportFailMsg.CancelType.RUN_FAIL,
+                            e.getMessage()),
+                            null);
+                } finally {
+                    job.getStmtExecutor(idx).addProfileToSpan();
+                }
+            });
+        }
+
+        Boolean isFailed = false;
+        ExportFailMsg failMsg = new ExportFailMsg();
+        try {
+            for (int i = 0; i < parallelNum; ++i) {
+                Future<ExportResult> future = completionService.take();
+                ExportResult result = future.get();
+                if (!result.isFailed) {
+                    outfileInfoList.add(result.getOutfileInfo());
+                    ++completeTaskNum;
+                    int progress = completeTaskNum * 100 / 
selectStmtList.size();
+                    if (progress >= 100) {
+                        progress = 99;
+                    }
+                    job.setProgress(progress);
+                    LOG.info("Export Job {} finished {} outfile exports and its progress is {}%", job.getId(),
+                            completeTaskNum, progress);
+                } else {
                     isFailed = true;
+                    failMsg.setCancelType(result.failMsg.getCancelType());
+                    failMsg.setMsg(result.failMsg.getMsg());
+                    LOG.warn("Exporting task failed because: {}", 
result.failMsg.getMsg());
                     break;
                 }
-                ExportJob.OutfileInfo outfileInfo = 
getOutFileInfo(r.connectContext.getResultAttachedInfo());
-                outfileInfoList.add(outfileInfo);
-                ++completeTaskNum;
-            } catch (Exception e) {
-                errorMsg = new 
ExportFailMsg(ExportFailMsg.CancelType.RUN_FAIL, e.getMessage());
-                isFailed = true;
-                break;
-            } finally {
-                this.stmtExecutor.addProfileToSpan();
             }
+        } catch (Exception e) {
+            isFailed = true;
+            failMsg.setCancelType(CancelType.RUN_FAIL);
+            failMsg.setMsg(e.getMessage());
+        } finally {
+            // cancel all executor
+            if (isFailed) {
+                for (int idx = 0; idx < parallelNum; ++idx) {
+                    job.getStmtExecutor(idx).cancel();
+                }
+            }
+            exportExecPool.shutdownNow();
         }
 
-        int progress = completeTaskNum * 100 / selectStmtList.size();
-        if (progress >= 100) {
-            progress = 99;
-        }
-        job.setProgress(progress);
-        LOG.info("Exporting task progress is {}%, export job: {}", progress, 
job.getId());
-
         if (isFailed) {
-            job.cancel(errorMsg.getCancelType(), errorMsg.getMsg());
-            LOG.warn("Exporting task failed because Exception: {}", 
errorMsg.getMsg());
+            job.cancel(failMsg.getCancelType(), failMsg.getMsg());
             return;
         }
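
The reworked `run()` above follows a fan-out / fail-fast pattern over an `ExecutorCompletionService`; here is a minimal standalone sketch of that pattern (not Doris code; the class name and the simulated failure are invented):

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Minimal sketch of the fan-out / fail-fast pattern used by the exporting task (not Doris code). */
public class FanOutSketch {
    public static void main(String[] args) throws Exception {
        int parallelism = 4;
        ExecutorService pool = Executors.newFixedThreadPool(parallelism);
        CompletionService<Boolean> completion = new ExecutorCompletionService<>(pool);

        // Submit one callable per "outfile statement"; the return value stands in for isFailed.
        for (int i = 0; i < parallelism; i++) {
            final int idx = i;
            completion.submit(() -> {
                Thread.sleep(100L * idx); // stand-in for running one select...outfile.. statement
                return idx == 2;          // pretend the third statement fails
            });
        }

        boolean isFailed = false;
        try {
            // Consume results in completion order and stop at the first failure.
            for (int i = 0; i < parallelism; i++) {
                if (completion.take().get()) {
                    isFailed = true;
                    break;
                }
            }
        } finally {
            // ExportExportingTask cancels the remaining executors here; the sketch just stops the pool.
            pool.shutdownNow();
        }
        System.out.println(isFailed ? "export job cancelled" : "export job finished");
    }
}
```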
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
