This is an automated email from the ASF dual-hosted git repository.

lingmiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 6cbbc36  [Export] Expand function of export stmt (#5445)
6cbbc36 is described below

commit 6cbbc36ea1df6842f838fea0ee37d1f879c3e885
Author: EmmyMiao87 <[email protected]>
AuthorDate: Thu Mar 11 20:43:32 2021 +0800

    [Export] Expand function of export stmt (#5445)
    
    1. Support a where clause in the export stmt so that only selected rows are exported.
    
    The syntax is as follows:
    
    Export table [table name]
        where [expr]
    To xxx
    xxxx
    
    It filters the table rows: only rows that meet the where condition are exported.
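
A minimal sketch (not Doris code) of the intended semantics: the patch pushes the where expression down to each scan node as a list of conjuncts, and a row is exported only if it satisfies every conjunct. Names here are illustrative.

```python
def filter_for_export(rows, conjuncts):
    """Keep only the rows that satisfy every conjunct (row -> bool predicate)."""
    return [row for row in rows if all(c(row) for c in conjuncts)]

# With WHERE k1 = 1, only matching rows survive; with no where clause
# (an empty conjunct list) every row is exported.
rows = [{"k1": 1}, {"k1": 2}, {"k1": 1}]
assert filter_for_export(rows, [lambda r: r["k1"] == 1]) == [{"k1": 1}, {"k1": 1}]
assert filter_for_export(rows, []) == rows
```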
    
    2. Support UTF-8 separators
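
A sketch of the escape convention this enables (an assumption for illustration, not the actual Separator.convertSeparator implementation referenced by the patch): the user writes the four characters `\x07` in the property value, and it is decoded to the single invisible byte 0x07 before being used as the separator.

```python
import codecs

def convert_separator(raw: str) -> str:
    # Decode backslash escapes like \x07, \t, \n written literally in the
    # property string into the actual characters they denote.
    return codecs.decode(raw, "unicode_escape")

assert convert_separator(r"\x07") == "\x07"  # BEL control character
assert convert_separator(r"\t") == "\t"
```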
    
    3. Support export to local
    
    The syntax is as follows:
    
    Export table [table name]
    To (file:///xxx/xx/xx)
    
    If the user exports rows to local storage, the broker properties are not required.
    The user only needs to create a local folder to store the data, and fill in the folder path starting with file://.
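
A sketch of the local-path rule (an assumption mirroring the behavior added to ExportStmt.checkPath for StorageType.LOCAL, not the actual Java code): the path must start with the file:// scheme, which is stripped while keeping the leading slash of the absolute path.

```python
LOCAL_FILE_PREFIX = "file:///"

def check_local_path(path: str) -> str:
    """Validate a local export path and strip the file:// scheme."""
    if not path.startswith(LOCAL_FILE_PREFIX):
        raise ValueError("Invalid export path. please use a valid '%s' path."
                         % LOCAL_FILE_PREFIX)
    # Drop "file://" but keep the final "/" so an absolute path remains.
    return path[len(LOCAL_FILE_PREFIX) - 1:]

assert check_local_path("file:///home/data/a") == "/home/data/a"
```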
    
    Change-Id: Ib7e7ece5accb3e359a67310b0bf006d42cd3f6f5
---
 docs/en/administrator-guide/export-manual.md       |   5 +-
 .../sql-statements/Data Manipulation/EXPORT.md     |  93 +++++++++-------
 docs/zh-CN/administrator-guide/export-manual.md    |   5 +-
 .../sql-statements/Data Manipulation/EXPORT.md     |  31 ++++--
 fe/fe-core/src/main/cup/sql_parser.cup             |   3 +-
 .../java/org/apache/doris/analysis/BrokerDesc.java |   2 +-
 .../java/org/apache/doris/analysis/ExportStmt.java |  34 ++++--
 .../java/org/apache/doris/analysis/LoadStmt.java   |   5 +-
 .../org/apache/doris/analysis/OutFileClause.java   |   1 +
 .../org/apache/doris/analysis/StorageBackend.java  |   2 +-
 .../org/apache/doris/common/FeMetaVersion.java     |   4 +-
 .../main/java/org/apache/doris/load/ExportJob.java | 124 ++++++++++++++++++++-
 .../main/java/org/apache/doris/load/ExportMgr.java |   8 +-
 13 files changed, 240 insertions(+), 77 deletions(-)

diff --git a/docs/en/administrator-guide/export-manual.md 
b/docs/en/administrator-guide/export-manual.md
index d4bac90..79c2680 100644
--- a/docs/en/administrator-guide/export-manual.md
+++ b/docs/en/administrator-guide/export-manual.md
@@ -102,6 +102,7 @@ Export's detailed commands can be passed through `HELP 
EXPORT;` Examples are as
 ```
 EXPORT TABLE db1.tbl1 
 PARTITION (p1,p2)
+[WHERE [expr]]
 TO "bos://bj-test-cmy/export/" 
 PROPERTIES
 (
@@ -116,8 +117,8 @@ WITH BROKER "hdfs"
 );
 ```
 
-* `column_separator`: Column separator. The default is `\t`.
-* `line_delimiter`: Line separator. The default is `\n`.
+* `column_separator`: Column separator. The default is `\t`. Supports invisible characters, such as '\x07'.
+* `line_delimiter`: Line separator. The default is `\n`. Supports invisible characters, such as '\x07'.
 * `exec_mem_limit`: Represents the memory usage limitation of a query plan on 
a single BE in an Export job. Default 2GB. Unit bytes.
 * `timeout`: Job timeout. Default 2 hours. Unit seconds.
 * `tablet_num_per_task`: The maximum number of fragments allocated per query 
plan. The default is 5.
diff --git a/docs/en/sql-reference/sql-statements/Data Manipulation/EXPORT.md 
b/docs/en/sql-reference/sql-statements/Data Manipulation/EXPORT.md
index f29a6bf..6da41ec 100644
--- a/docs/en/sql-reference/sql-statements/Data Manipulation/EXPORT.md  
+++ b/docs/en/sql-reference/sql-statements/Data Manipulation/EXPORT.md  
@@ -25,59 +25,72 @@ under the License.
 -->
 
 # EXPORT
-Description
+## Description
 
-This statement is used to export data from a specified table to a specified 
location.
-This function is implemented by broker process. For different purpose storage 
systems, different brokers need to be deployed. Deployed brokers can be viewed 
through SHOW BROKER.
-This is an asynchronous operation, which returns if the task is submitted 
successfully. After execution, you can use the SHOW EXPORT command to view 
progress.
+    This statement is used to export data from a specified table to a specified location.
+    This function is implemented by the broker process. For storage systems with different purposes, different brokers need to be deployed. Deployed brokers can be viewed through SHOW BROKER.
+    This is an asynchronous operation, which returns as soon as the task is submitted successfully. After submission, you can use the SHOW EXPORT command to view progress.
 
-Grammar:
-EXPORT TABLE table_name
-[PARTITION (p1 [,p2]]
-TO export_path
-[opt_properties]
-broker;
+    Grammar:
+        EXPORT TABLE table_name
+        [PARTITION (p1 [,p2]]
+        [WHERE [expr]]
+        TO export_path
+        [opt_properties]
+        [broker];
 
-1. table_name
-The table names to be exported currently support the export of tables with 
engine as OLAP and mysql.
+    1. table_name
+       The name of the table to export. Currently, only tables whose engine is OLAP or mysql can be exported.
 
-2. partition
-You can export only certain specified partitions of the specified table
+    2. partition
+       You can export only certain specified partitions of the specified table.
 
-3. export_path
-The exported path needs to be a directory. At present, it can't be exported to 
local, so it needs to be exported to broker.
+    3. expr
+       Export rows that meet the where condition. Optional; if omitted, all rows are exported by default.
 
-4. opt_properties
-Used to specify some special parameters.
-Grammar:
-[PROPERTIES ("key"="value", ...)]
+    4. export_path
+       The export path must be a directory. Export to local storage, HDFS, Baidu BOS, and other storage systems supporting the S3 protocol is supported.
 
-The following parameters can be specified:
-Column_separator: Specifies the exported column separator, defaulting to t.
-Line_delimiter: Specifies the exported line separator, defaulting to\n.
-Exc_mem_limit: Exports the upper limit of memory usage for a single BE node, 
defaulting to 2GB in bytes.
-Timeout: The time-out for importing jobs is 1 day by default, in seconds.
-Tablet_num_per_task: The maximum number of tablets that each subtask can 
allocate.
+    5. opt_properties
+       Used to specify some special parameters.
+       Grammar:
+       [PROPERTIES ("key"="value", ...)]
 
-Five. debris
-Broker used to specify export usage
-Grammar:
-WITH BROKER broker_name ("key"="value"[,...])
-Here you need to specify the specific broker name and the required broker 
attributes
+        The following parameters can be specified:
+          column_separator: Specifies the exported column separator, defaulting to \t. Supports invisible characters, such as '\x07'.
+          line_delimiter: Specifies the exported line separator, defaulting to \n. Supports invisible characters, such as '\x07'.
+          exec_mem_limit: The upper limit of memory usage for a single BE node during export, defaulting to 2GB, in bytes.
+          timeout: The timeout of the export job, 1 day by default, in seconds.
+          tablet_num_per_task: The maximum number of tablets that each subtask can allocate.
 
-For brokers corresponding to different storage systems, the input parameters 
are different. Specific parameters can be referred to: `help broker load', 
broker required properties.
+    6. broker
+       Specifies the broker to be used for the export.
+          Grammar:
+          WITH BROKER broker_name ("key"="value"[,...])
+          Here you need to specify the specific broker name and the required broker properties.
 
-'35;'35; example
+        For brokers corresponding to different storage systems, the input parameters are different. For the specific parameters, refer to the broker properties in `help broker load`.
+        When exporting to local, you do not need to fill in this part.
 
-1. Export all data from the testTbl table to HDFS
-EXPORT TABLE testTbl TO "hdfs://hdfs_host:port/a/b/c" WITH BROKER 
"broker_name" ("username"="xxx", "password"="yyy");
+## example
 
-2. Export partitions P1 and P2 from the testTbl table to HDFS
+    1. Export all data from the testTbl table to HDFS
+       EXPORT TABLE testTbl TO "hdfs://hdfs_host:port/a/b/c" WITH BROKER 
"broker_name" ("username"="xxx", "password"="yyy");
 
-EXPORT TABLE testTbl PARTITION (p1,p2) TO "hdfs://hdfs_host:port/a/b/c" WITH 
BROKER "broker_name" ("username"="xxx", "password"="yyy");
-3. Export all data in the testTbl table to hdfs, using "," as column separator
+    2. Export partitions P1 and P2 from the testTbl table to HDFS
+       EXPORT TABLE testTbl PARTITION (p1,p2) TO "hdfs://hdfs_host:port/a/b/c" 
WITH BROKER "broker_name" ("username"="xxx", "password"="yyy");
 
-EXPORT TABLE testTbl TO "hdfs://hdfs_host:port/a/b/c" PROPERTIES 
("column_separator"=",") WITH BROKER "broker_name" ("username"="xxx", 
"password"="yyy");
+    3. Export all data in the testTbl table to hdfs, using "," as column 
separator
+       EXPORT TABLE testTbl TO "hdfs://hdfs_host:port/a/b/c" PROPERTIES 
("column_separator"=",") WITH BROKER "broker_name" ("username"="xxx", 
"password"="yyy");
+
+    4. Export the rows meeting the condition k1 = 1 in the testTbl table to HDFS.
+       EXPORT TABLE testTbl WHERE k1=1 TO "hdfs://hdfs_host:port/a/b/c" WITH BROKER "broker_name" ("username"="xxx", "password"="yyy");
+
+    5. Export all data in the testTbl table to local storage.
+       EXPORT TABLE testTbl TO "file:///home/data/a";
+
+    6. Export all data in the testTbl table to hdfs, using the invisible character "\x07" as the column and row separator.
+       EXPORT TABLE testTbl TO "hdfs://hdfs_host:port/a/b/c" PROPERTIES ("column_separator"="\\x07", "line_delimiter" = "\\x07") WITH BROKER "broker_name" ("username"="xxx", "password"="yyy");
 
 ## keyword
-EXPORT
+    EXPORT
diff --git a/docs/zh-CN/administrator-guide/export-manual.md 
b/docs/zh-CN/administrator-guide/export-manual.md
index 9725c59..64306ce 100644
--- a/docs/zh-CN/administrator-guide/export-manual.md
+++ b/docs/zh-CN/administrator-guide/export-manual.md
@@ -106,6 +106,7 @@ Export 的详细命令可以通过 `HELP EXPORT;` 。举例如下:
 ```
 EXPORT TABLE db1.tbl1 
 PARTITION (p1,p2)
+[WHERE [expr]]
 TO "hdfs://host/path/to/export/" 
 PROPERTIES
 (
@@ -120,8 +121,8 @@ WITH BROKER "hdfs"
 );
 ```
 
-* `column_separator`:列分隔符。默认为 `\t`。
-* `line_delimiter`:行分隔符。默认为 `\n`。
+* `column_separator`:列分隔符。默认为 `\t`。支持不可见字符,比如 '\x07'。
+* `line_delimiter`:行分隔符。默认为 `\n`。支持不可见字符,比如 '\x07'。
 * `exec_mem_limit`: 表示 Export 作业中,一个查询计划在单个 BE 上的内存使用限制。默认 2GB。单位字节。
 * `timeout`:作业超时时间。默认 2小时。单位秒。
 * `tablet_num_per_task`:每个查询计划分配的最大分片数。默认为 5。
diff --git a/docs/zh-CN/sql-reference/sql-statements/Data 
Manipulation/EXPORT.md b/docs/zh-CN/sql-reference/sql-statements/Data 
Manipulation/EXPORT.md
index 44814a4..f41be07 100644
--- a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/EXPORT.md       
+++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/EXPORT.md       
@@ -34,9 +34,10 @@ under the License.
     语法:
         EXPORT TABLE table_name
         [PARTITION (p1[,p2])]
+        [WHERE [expr]]
         TO export_path
         [opt_properties]
-        broker;
+        [broker];
 
     1. table_name
       当前要导出的表的表名,目前支持engine为olap和mysql的表的导出。
@@ -44,28 +45,32 @@ under the License.
     2. partition
       可以只导出指定表的某些指定分区
 
-    3. export_path
-      导出的路径,需为目录。目前不能导出到本地,需要导出到broker。
+    3. expr
+      导出满足 where 条件的行,选填。不填则默认导出所有行。
 
-    4. opt_properties
+    4. export_path
+      导出的路径,需为目录。支持导出到本地,hdfs,百度bos,s3协议的其他存储系统。
+
+    5. opt_properties
       用于指定一些特殊参数。
           语法:
           [PROPERTIES ("key"="value", ...)]
         
           可以指定如下参数:
-            column_separator: 指定导出的列分隔符,默认为\t。
-            line_delimiter: 指定导出的行分隔符,默认为\n。
+            column_separator: 指定导出的列分隔符,默认为\t。支持不可见字符,比如 '\x07'。
+            line_delimiter: 指定导出的行分隔符,默认为\n。支持不可见字符,比如 '\x07'。
             exec_mem_limit: 导出在单个 BE 节点的内存使用上限,默认为 2GB,单位为字节。
             timeout:导入作业的超时时间,默认为1天,单位是秒。
             tablet_num_per_task:每个子任务能分配的最大 Tablet 数量。
 
-    5. broker
+    6. broker
       用于指定导出使用的broker
           语法:
           WITH BROKER broker_name ("key"="value"[,...])
           这里需要指定具体的broker name, 以及所需的broker属性
 
       对于不同存储系统对应的 broker,这里需要输入的参数不同。具体参数可以参阅:`help broker load` 中 broker 所需属性。
+      导出到本地时,不需要填写这部分。
 
 ## example
 
@@ -73,11 +78,19 @@ under the License.
         EXPORT TABLE testTbl TO "hdfs://hdfs_host:port/a/b/c" WITH BROKER 
"broker_name" ("username"="xxx", "password"="yyy");
 
     2. 将 testTbl 表中的分区p1,p2导出到 hdfs 上
-
         EXPORT TABLE testTbl PARTITION (p1,p2) TO 
"hdfs://hdfs_host:port/a/b/c" WITH BROKER "broker_name" ("username"="xxx", 
"password"="yyy");
+    
     3. 将 testTbl 表中的所有数据导出到 hdfs 上,以","作为列分隔符
-
         EXPORT TABLE testTbl TO "hdfs://hdfs_host:port/a/b/c" PROPERTIES 
("column_separator"=",") WITH BROKER "broker_name" ("username"="xxx", 
"password"="yyy");
+    
+    4. 将 testTbl 表中 k1 = 1 的行导出到 hdfs 上。
+        EXPORT TABLE testTbl WHERE k1=1 TO "hdfs://hdfs_host:port/a/b/c" WITH BROKER "broker_name" ("username"="xxx", "password"="yyy");
+
+    5. 将 testTbl 表中的所有数据导出到本地。
+        EXPORT TABLE testTbl TO "file:///home/data/a";
+
+    6. 将 testTbl 表中的所有数据导出到 hdfs 上,以不可见字符 "\x07" 作为列或者行分隔符。
+        EXPORT TABLE testTbl TO "hdfs://hdfs_host:port/a/b/c" PROPERTIES 
("column_separator"="\\x07", "line_delimiter" = "\\x07") WITH BROKER 
"broker_name" ("username"="xxx", "password"="yyy")
 
 ## keyword
     EXPORT
diff --git a/fe/fe-core/src/main/cup/sql_parser.cup 
b/fe/fe-core/src/main/cup/sql_parser.cup
index c611932..615b945 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -1297,11 +1297,12 @@ help_stmt ::=
 // Export statement
 export_stmt ::=
     KW_EXPORT KW_TABLE base_table_ref:tblRef
+    where_clause:whereExpr
     KW_TO STRING_LITERAL:path
     opt_properties:properties
     opt_broker:broker
     {:
-        RESULT = new ExportStmt(tblRef, path, properties, broker);
+        RESULT = new ExportStmt(tblRef, whereExpr, path, properties, broker);
     :}
     ;
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/BrokerDesc.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/BrokerDesc.java
index 22526c4..f474be9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/BrokerDesc.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/BrokerDesc.java
@@ -90,7 +90,7 @@ public class BrokerDesc extends StorageDesc implements 
Writable {
     }
 
     public TFileType getFileType() {
-        if (isMultiLoadBroker()) {
+        if (storageType == StorageBackend.StorageType.LOCAL) {
             return TFileType.FILE_LOCAL;
         }
         if (storageType == StorageBackend.StorageType.BROKER) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
index a3ff5a8..160aaf6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
@@ -63,17 +63,19 @@ public class ExportStmt extends StatementBase {
 
     private TableName tblName;
     private List<String> partitions;
-    private final String path;
-    private final BrokerDesc brokerDesc;
+    private Expr whereExpr;
+    private String path;
+    private BrokerDesc brokerDesc;
     private Map<String, String> properties = Maps.newHashMap();
     private String columnSeparator;
     private String lineDelimiter;
 
     private TableRef tableRef;
 
-    public ExportStmt(TableRef tableRef, String path,
+    public ExportStmt(TableRef tableRef, Expr whereExpr, String path,
                       Map<String, String> properties, BrokerDesc brokerDesc) {
         this.tableRef = tableRef;
+        this.whereExpr = whereExpr;
         this.path = path.trim();
         if (properties != null) {
             this.properties = properties;
@@ -91,6 +93,10 @@ public class ExportStmt extends StatementBase {
         return partitions;
     }
 
+    public Expr getWhereExpr() {
+        return whereExpr;
+    }
+
     public String getPath() {
         return path;
     }
@@ -144,7 +150,7 @@ public class ExportStmt extends StatementBase {
             
ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, 
"EXPORT",
                                                 
ConnectContext.get().getQualifiedUser(),
                                                 
ConnectContext.get().getRemoteIP(),
-                                                tblName.getTbl());
+                    tblName.getTbl());
         }
 
         // check table && partitions whether exist
@@ -152,11 +158,13 @@ public class ExportStmt extends StatementBase {
 
         // check broker whether exist
         if (brokerDesc == null) {
-            throw new AnalysisException("broker is not provided");
+            brokerDesc = new BrokerDesc("local", 
StorageBackend.StorageType.LOCAL, null);
         }
 
+        // where expr will be checked in export job
+
         // check path is valid
-        checkPath(path, brokerDesc.getStorageType());
+        path = checkPath(path, brokerDesc.getStorageType());
         if (brokerDesc.getStorageType() == StorageBackend.StorageType.BROKER) {
             if 
(!analyzer.getCatalog().getBrokerMgr().containsBroker(brokerDesc.getName())) {
                 throw new AnalysisException("broker " + brokerDesc.getName() + 
" does not exist");
@@ -217,7 +225,7 @@ public class ExportStmt extends StatementBase {
         }
     }
 
-    public static void checkPath(String path, StorageBackend.StorageType type) 
throws AnalysisException {
+    public static String checkPath(String path, StorageBackend.StorageType 
type) throws AnalysisException {
         if (Strings.isNullOrEmpty(path)) {
             throw new AnalysisException("No dest path specified.");
         }
@@ -240,18 +248,22 @@ public class ExportStmt extends StatementBase {
                 }
             } else if (type == StorageBackend.StorageType.LOCAL) {
                 if (schema != null && !schema.equalsIgnoreCase("file")) {
-                    throw new AnalysisException("Invalid export path. please 
use valid 'file://' path.");
+                    throw new AnalysisException("Invalid export path. please 
use valid '"
+                            + OutFileClause.LOCAL_FILE_PREFIX + "' path.");
                 }
+                path = path.substring(OutFileClause.LOCAL_FILE_PREFIX.length() 
- 1);
             }
         } catch (URISyntaxException e) {
             throw new AnalysisException("Invalid path format. " + 
e.getMessage());
         }
+        return path;
     }
 
     private void checkProperties(Map<String, String> properties) throws 
UserException {
-        this.columnSeparator = PropertyAnalyzer.analyzeColumnSeparator(
-                properties, ExportStmt.DEFAULT_COLUMN_SEPARATOR);
-        this.lineDelimiter = PropertyAnalyzer.analyzeLineDelimiter(properties, 
ExportStmt.DEFAULT_LINE_DELIMITER);
+        this.columnSeparator = 
Separator.convertSeparator(PropertyAnalyzer.analyzeColumnSeparator(
+                properties, ExportStmt.DEFAULT_COLUMN_SEPARATOR));
+        this.lineDelimiter = 
Separator.convertSeparator(PropertyAnalyzer.analyzeLineDelimiter(
+                properties, ExportStmt.DEFAULT_LINE_DELIMITER));
         // exec_mem_limit
         if (properties.containsKey(LoadStmt.EXEC_MEM_LIMIT)) {
             try {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
index 8de1adb..d8dfee5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
@@ -321,9 +321,8 @@ public class LoadStmt extends DdlStmt {
                 for (int i = 0; i < dataDescription.getFilePaths().size(); 
i++) {
                     dataDescription.getFilePaths().set(i,
                         
brokerDesc.convertPathToS3(dataDescription.getFilePaths().get(i)));
-                }
-                for (String path : dataDescription.getFilePaths()) {
-                    ExportStmt.checkPath(path, brokerDesc.getStorageType());
+                    dataDescription.getFilePaths().set(i,
+                        
ExportStmt.checkPath(dataDescription.getFilePaths().get(i), 
brokerDesc.getStorageType()));
                 }
             }
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
index 1f67167..778ec5b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
@@ -39,6 +39,7 @@ import java.util.stream.Collectors;
 public class OutFileClause {
     private static final Logger LOG = 
LogManager.getLogger(OutFileClause.class);
 
+    public static final String LOCAL_FILE_PREFIX = "file:///";
     private static final String BROKER_PROP_PREFIX = "broker.";
     private static final String PROP_BROKER_NAME = "broker.name";
     private static final String PROP_COLUMN_SEPARATOR = "column_separator";
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java
index 921c727..e73a90e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java
@@ -92,7 +92,7 @@ public class StorageBackend extends StorageDesc implements 
ParseNode {
         if (Strings.isNullOrEmpty(location)) {
             throw new AnalysisException("You must specify a location on the 
repository");
         }
-        ExportStmt.checkPath(location, storageType);
+        location = ExportStmt.checkPath(location, storageType);
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java
index ba17874..ad480e0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java
@@ -204,6 +204,8 @@ public final class FeMetaVersion {
     public static final int VERSION_95 = 95;
     // support delete without partition
     public static final int VERSION_96 = 96;
+    // persist orig stmt of export job
+    public static final int VERSION_97 = 97;
     // note: when increment meta version, should assign the latest version to 
VERSION_CURRENT
-    public static final int VERSION_CURRENT = VERSION_96;
+    public static final int VERSION_CURRENT = VERSION_97;
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
index bf96b85..89513cb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
@@ -23,10 +23,14 @@ import org.apache.doris.analysis.BrokerDesc;
 import org.apache.doris.analysis.DescriptorTable;
 import org.apache.doris.analysis.ExportStmt;
 import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.ExprSubstitutionMap;
 import org.apache.doris.analysis.LoadStmt;
+import org.apache.doris.analysis.OutFileClause;
 import org.apache.doris.analysis.PartitionNames;
 import org.apache.doris.analysis.SlotDescriptor;
 import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.SqlParser;
+import org.apache.doris.analysis.SqlScanner;
 import org.apache.doris.analysis.StorageBackend;
 import org.apache.doris.analysis.TableName;
 import org.apache.doris.analysis.TableRef;
@@ -47,6 +51,7 @@ import org.apache.doris.common.Status;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
+import org.apache.doris.common.util.SqlParserUtils;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.planner.DataPartition;
 import org.apache.doris.planner.ExportSink;
@@ -57,10 +62,15 @@ import org.apache.doris.planner.PlanFragment;
 import org.apache.doris.planner.PlanFragmentId;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.planner.ScanNode;
+import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.Coordinator;
+import org.apache.doris.qe.OriginStatement;
+import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.qe.SqlModeHelper;
 import org.apache.doris.system.Backend;
 import org.apache.doris.task.AgentClient;
 import org.apache.doris.thrift.TAgentResult;
+import org.apache.doris.thrift.TFileType;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TScanRangeLocation;
 import org.apache.doris.thrift.TScanRangeLocations;
@@ -80,6 +90,7 @@ import org.apache.logging.log4j.Logger;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.io.StringReader;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Collections;
@@ -107,6 +118,7 @@ public class ExportJob implements Writable {
     private String clusterName;
     private long tableId;
     private BrokerDesc brokerDesc;
+    private Expr whereExpr;
     private String exportPath;
     private String columnSeparator;
     private String lineDelimiter;
@@ -150,6 +162,11 @@ public class ExportJob implements Writable {
     // backend_address => snapshot path
     private List<Pair<TNetworkAddress, String>> snapshotPaths = 
Lists.newArrayList();
 
+    // this is the origin stmt of ExportStmt, we use it to persist where expr 
of Export job,
+    // because we can not serialize the Expressions contained in job.
+    private OriginStatement origStmt;
+    protected Map<String, String> sessionVariables = Maps.newHashMap();
+
     public ExportJob() {
         this.id = -1;
         this.dbId = -1;
@@ -188,6 +205,7 @@ public class ExportJob implements Writable {
 
         String path = stmt.getPath();
         Preconditions.checkArgument(!Strings.isNullOrEmpty(path));
+        this.whereExpr = stmt.getWhereExpr();
         this.exportPath = path;
 
         this.partitions = stmt.getPartitions();
@@ -208,6 +226,13 @@ public class ExportJob implements Writable {
         }
 
         this.sql = stmt.toSql();
+        this.origStmt = stmt.getOrigStmt();
+        if (ConnectContext.get() != null) {
+            SessionVariable var = ConnectContext.get().getSessionVariable();
+            this.sessionVariables.put(SessionVariable.SQL_MODE, 
Long.toString(var.getSqlMode()));
+        } else {
+            this.sessionVariables.put(SessionVariable.SQL_MODE, 
String.valueOf(SqlModeHelper.MODE_DEFAULT));
+        }
     }
 
     private void genExecFragment() throws UserException {
@@ -246,14 +271,19 @@ public class ExportJob implements Writable {
         List<PlanFragment> fragments = Lists.newArrayList();
         List<ScanNode> scanNodes = Lists.newArrayList();
 
-        ScanNode scanNode = genScanNode();
-        tabletLocations = scanNode.getScanRangeLocations(0);
-        if (tabletLocations == null) {
+        // analyze where expr
+        analyzeWhereExpr();
+        // only an OLAP table needs the tablet locations below; other engines use a single scan node
+        if (exportTable.getType() != Table.TableType.OLAP) {
             // not olap scan node
+            ScanNode scanNode = genScanNode();
             PlanFragment fragment = genPlanFragment(exportTable.getType(), 
scanNode);
             scanNodes.add(scanNode);
             fragments.add(fragment);
         } else {
+            // The function of this scan node is only to get the 
tabletlocation.
+            ScanNode tmpOlapScanNode = genScanNode();
+            tabletLocations = tmpOlapScanNode.getScanRangeLocations(0);
             for (TScanRangeLocations tablet : tabletLocations) {
                 List<TScanRangeLocation> locations = tablet.getLocations();
                 Collections.shuffle(locations);
@@ -278,9 +308,47 @@ public class ExportJob implements Writable {
                     size, id, fragments.size());
         }
 
+        // add conjunct
+        if (whereExpr != null) {
+            for (ScanNode scanNode: scanNodes) {
+                scanNode.addConjuncts(whereExpr.getConjuncts());
+            }
+        }
+
         genCoordinators(fragments, scanNodes);
     }
 
+    private void analyzeWhereExpr() throws UserException {
+        if (whereExpr == null) {
+            return;
+        }
+        whereExpr = analyzer.getExprRewriter().rewrite(whereExpr, analyzer);
+
+        // analyze where slot ref
+        Map<String, SlotDescriptor> dstDescMap = 
Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
+        for (SlotDescriptor slotDescriptor : exportTupleDesc.getSlots()) {
+            dstDescMap.put(slotDescriptor.getColumn().getName(), 
slotDescriptor);
+        }
+        List<SlotRef> slots = Lists.newArrayList();
+        whereExpr.collect(SlotRef.class, slots);
+        ExprSubstitutionMap smap = new ExprSubstitutionMap();
+        for (SlotRef slot : slots) {
+            SlotDescriptor slotDesc = dstDescMap.get(slot.getColumnName());
+            if (slotDesc == null) {
+                throw new UserException("unknown column reference in where 
statement, reference="
+                        + slot.getColumnName());
+            }
+            smap.getLhs().add(slot);
+            smap.getRhs().add(new SlotRef(slotDesc));
+        }
+        whereExpr = whereExpr.clone(smap);
+
+        whereExpr.analyze(analyzer);
+        if (!whereExpr.getType().equals(Type.BOOLEAN)) {
+            throw new UserException("where statement is not a valid expression returning bool");
+        }
+    }
+
     private ScanNode genScanNode() throws UserException {
         ScanNode scanNode = null;
         switch (exportTable.getType()) {
@@ -391,6 +459,10 @@ public class ExportJob implements Writable {
         return this.tableId;
     }
 
+    public Expr getWhereExpr() {
+        return whereExpr;
+    }
+
     public JobState getState() {
         return state;
     }
@@ -407,6 +479,16 @@ public class ExportJob implements Writable {
         return exportPath;
     }
 
+    public String getShowExportPath() {
+        if (brokerDesc.getFileType() == TFileType.FILE_LOCAL) {
+            StringBuilder sb = new StringBuilder();
+            sb.append(OutFileClause.LOCAL_FILE_PREFIX.substring(0, 
OutFileClause.LOCAL_FILE_PREFIX.length() - 1));
+            sb.append(exportPath);
+            return sb.toString();
+        }
+        return exportPath;
+    }
+
     public String getColumnSeparator() {
         return this.columnSeparator;
     }
@@ -631,8 +713,14 @@ public class ExportJob implements Writable {
             out.writeBoolean(true);
             brokerDesc.write(out);
         }
-
         tableName.write(out);
+
+        origStmt.write(out);
+        out.writeInt(sessionVariables.size());
+        for (Map.Entry<String, String> entry : sessionVariables.entrySet()) {
+            Text.writeString(out, entry.getKey());
+            Text.writeString(out, entry.getValue());
+        }
     }
 
     public void readFields(DataInput in) throws IOException {
@@ -680,6 +768,34 @@ public class ExportJob implements Writable {
         } else {
             tableName = new TableName("DUMMY", "DUMMY");
         }
+
+        if (Catalog.getCurrentCatalogJournalVersion() < 
FeMetaVersion.VERSION_97) {
+            origStmt = new OriginStatement("", 0);
+            // old version of export does not have sqlmode, set it to default
+            sessionVariables.put(SessionVariable.SQL_MODE, 
String.valueOf(SqlModeHelper.MODE_DEFAULT));
+            return;
+        }
+        origStmt = OriginStatement.read(in);
+        int size = in.readInt();
+        for (int i = 0; i < size; i++) {
+            String key = Text.readString(in);
+            String value = Text.readString(in);
+            sessionVariables.put(key, value);
+        }
+        
+        if (origStmt.originStmt.isEmpty()) {
+            return;
+        }
+        // parse the origin stmt to get where expr
+        SqlParser parser = new SqlParser(new SqlScanner(new 
StringReader(origStmt.originStmt),
+                Long.valueOf(sessionVariables.get(SessionVariable.SQL_MODE))));
+        ExportStmt stmt = null;
+        try {
+            stmt = (ExportStmt) SqlParserUtils.getStmt(parser, origStmt.idx);
+            this.whereExpr = stmt.getWhereExpr();
+        } catch (Exception e) {
+            throw new IOException("error happens when parsing export stmt: " + origStmt, e);
+        }
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
index 533e22f..efb90f3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
@@ -31,7 +31,6 @@ import org.apache.doris.qe.ConnectContext;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.gson.Gson;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -46,6 +45,8 @@ import java.util.Set;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
+import com.google.gson.Gson;
+
 public class ExportMgr {
     private static final Logger LOG = LogManager.getLogger(ExportJob.class);
 
@@ -184,6 +185,9 @@ public class ExportMgr {
                 }
                 infoMap.put("db", job.getTableName().getDb());
                 infoMap.put("tbl", job.getTableName().getTbl());
+                if (job.getWhereExpr() != null) {
+                    infoMap.put("where expr", job.getWhereExpr().toMySql());
+                }
                 infoMap.put("partitions", partitions);
                 infoMap.put("broker", job.getBrokerDesc().getName());
                 infoMap.put("column separator", job.getColumnSeparator());
@@ -193,7 +197,7 @@ public class ExportMgr {
                 infoMap.put("tablet num", job.getTabletLocations() == null ? 
-1 : job.getTabletLocations().size());
                 jobInfo.add(new Gson().toJson(infoMap));
                 // path
-                jobInfo.add(job.getExportPath());
+                jobInfo.add(job.getShowExportPath());
 
                 jobInfo.add(TimeUtils.longToTimeString(job.getCreateTimeMs()));
                 jobInfo.add(TimeUtils.longToTimeString(job.getStartTimeMs()));


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
