This is an automated email from the ASF dual-hosted git repository.
lingmiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 6cbbc36 [Export] Expand function of export stmt (#5445)
6cbbc36 is described below
commit 6cbbc36ea1df6842f838fea0ee37d1f879c3e885
Author: EmmyMiao87 <[email protected]>
AuthorDate: Thu Mar 11 20:43:32 2021 +0800
[Export] Expand function of export stmt (#5445)
1. Support a WHERE clause in the export statement, so that only selected rows are exported.
The syntax is as follows:
Export table [table name]
where [expr]
To xxx
xxxx
The table rows are filtered:
only rows that meet the WHERE condition are exported.
2. Support UTF-8 separators
3. Support exporting to a local path
The syntax is as follows:
Export table [table name]
To "file:///xxx/xx/xx"
When exporting rows to a local path, the broker properties are not required.
The user only needs to create a local folder to store the data and fill in the path
of that folder, starting with file://
Change-Id: Ib7e7ece5accb3e359a67310b0bf006d42cd3f6f5
---
docs/en/administrator-guide/export-manual.md | 5 +-
.../sql-statements/Data Manipulation/EXPORT.md | 93 +++++++++-------
docs/zh-CN/administrator-guide/export-manual.md | 5 +-
.../sql-statements/Data Manipulation/EXPORT.md | 31 ++++--
fe/fe-core/src/main/cup/sql_parser.cup | 3 +-
.../java/org/apache/doris/analysis/BrokerDesc.java | 2 +-
.../java/org/apache/doris/analysis/ExportStmt.java | 34 ++++--
.../java/org/apache/doris/analysis/LoadStmt.java | 5 +-
.../org/apache/doris/analysis/OutFileClause.java | 1 +
.../org/apache/doris/analysis/StorageBackend.java | 2 +-
.../org/apache/doris/common/FeMetaVersion.java | 4 +-
.../main/java/org/apache/doris/load/ExportJob.java | 124 ++++++++++++++++++++-
.../main/java/org/apache/doris/load/ExportMgr.java | 8 +-
13 files changed, 240 insertions(+), 77 deletions(-)
diff --git a/docs/en/administrator-guide/export-manual.md
b/docs/en/administrator-guide/export-manual.md
index d4bac90..79c2680 100644
--- a/docs/en/administrator-guide/export-manual.md
+++ b/docs/en/administrator-guide/export-manual.md
@@ -102,6 +102,7 @@ Export's detailed commands can be passed through `HELP
EXPORT;` Examples are as
```
EXPORT TABLE db1.tbl1
PARTITION (p1,p2)
+[WHERE [expr]]
TO "bos://bj-test-cmy/export/"
PROPERTIES
(
@@ -116,8 +117,8 @@ WITH BROKER "hdfs"
);
```
-* `column_separator`: Column separator. The default is `\t`.
-* `line_delimiter`: Line separator. The default is `\n`.
+* `column_separator`: Column separator. The default is `\t`. Supports
invisible characters, such as `'\x07'`.
+* `line_delimiter`: Line separator. The default is `\n`. Supports invisible
characters, such as `'\x07'`.
* `exec_mem_limit`: Represents the memory usage limitation of a query plan on
a single BE in an Export job. Default 2GB. Unit bytes.
* `timeout`: Job timeout. Default 2 hours. Unit seconds.
* `tablet_num_per_task`: The maximum number of fragments allocated per query
plan. The default is 5.
diff --git a/docs/en/sql-reference/sql-statements/Data Manipulation/EXPORT.md
b/docs/en/sql-reference/sql-statements/Data Manipulation/EXPORT.md
index f29a6bf..6da41ec 100644
--- a/docs/en/sql-reference/sql-statements/Data Manipulation/EXPORT.md
+++ b/docs/en/sql-reference/sql-statements/Data Manipulation/EXPORT.md
@@ -25,59 +25,72 @@ under the License.
-->
# EXPORT
-Description
+## Description
-This statement is used to export data from a specified table to a specified
location.
-This function is implemented by broker process. For different purpose storage
systems, different brokers need to be deployed. Deployed brokers can be viewed
through SHOW BROKER.
-This is an asynchronous operation, which returns if the task is submitted
successfully. After execution, you can use the SHOW EXPORT command to view
progress.
+ This statement is used to export data from a specified table to a
specified location.
+ This function is implemented by the broker process. For different target
storage systems, different brokers need to be deployed. Deployed brokers can be
viewed through SHOW BROKER.
+ This is an asynchronous operation, which returns immediately once the task
is submitted successfully. After execution, you can use the SHOW EXPORT command
to view progress.
-Grammar:
-EXPORT TABLE table_name
-[PARTITION (p1 [,p2]]
-TO export_path
-[opt_properties]
-broker;
+ Grammar:
+ EXPORT TABLE table_name
+ [PARTITION (p1 [,p2])]
+ [WHERE [expr]]
+ TO export_path
+ [opt_properties]
+ [broker];
-1. table_name
-The table names to be exported currently support the export of tables with
engine as OLAP and mysql.
+ 1. table_name
+ The name of the table to export. Currently, tables whose engine is
OLAP or mysql are supported.
-2. partition
-You can export only certain specified partitions of the specified table
+ 2. partition
+ You can export only certain specified partitions of the specified table
-3. export_path
-The exported path needs to be a directory. At present, it can't be exported to
local, so it needs to be exported to broker.
+ 3. expr
+ Optional. Only rows that meet the WHERE condition are exported. If
omitted, all rows are exported by default.
-4. opt_properties
-Used to specify some special parameters.
-Grammar:
-[PROPERTIES ("key"="value", ...)]
+ 4. export_path
+ The export path must be a directory. Exporting to a local path, HDFS,
Baidu BOS, and other S3-protocol storage systems is supported.
-The following parameters can be specified:
-Column_separator: Specifies the exported column separator, defaulting to t.
-Line_delimiter: Specifies the exported line separator, defaulting to\n.
-Exc_mem_limit: Exports the upper limit of memory usage for a single BE node,
defaulting to 2GB in bytes.
-Timeout: The time-out for importing jobs is 1 day by default, in seconds.
-Tablet_num_per_task: The maximum number of tablets that each subtask can
allocate.
+ 5. opt_properties
+ Used to specify some special parameters.
+ Grammar:
+ [PROPERTIES ("key"="value", ...)]
-Five. debris
-Broker used to specify export usage
-Grammar:
-WITH BROKER broker_name ("key"="value"[,...])
-Here you need to specify the specific broker name and the required broker
attributes
+ The following parameters can be specified:
+ column_separator: Specifies the exported column separator,
defaulting to `\t`. Supports invisible characters, such as `'\x07'`.
+ line_delimiter: Specifies the exported line separator, defaulting
to `\n`. Supports invisible characters, such as `'\x07'`.
+ exec_mem_limit: The upper limit of memory usage of an export on a
single BE node, defaulting to 2GB, in bytes.
+ timeout: The timeout of the export job, 1 day by default, in
seconds.
+ tablet_num_per_task: The maximum number of tablets that each subtask
can allocate.
-For brokers corresponding to different storage systems, the input parameters
are different. Specific parameters can be referred to: `help broker load',
broker required properties.
+ 6. broker
+ Used to specify the broker that the export uses
+ Grammar:
+ WITH BROKER broker_name ("key"="value"[,...])
+ Here you need to specify the broker name and the required broker
attributes
-'35;'35; example
+ For the brokers of different storage systems, the input parameters
differ. For the specific parameters, refer to the broker-required properties
in `help broker load`.
+ When exporting to a local path, this part is not required.
-1. Export all data from the testTbl table to HDFS
-EXPORT TABLE testTbl TO "hdfs://hdfs_host:port/a/b/c" WITH BROKER
"broker_name" ("username"="xxx", "password"="yyy");
+## example
-2. Export partitions P1 and P2 from the testTbl table to HDFS
+ 1. Export all data from the testTbl table to HDFS
+ EXPORT TABLE testTbl TO "hdfs://hdfs_host:port/a/b/c" WITH BROKER
"broker_name" ("username"="xxx", "password"="yyy");
-EXPORT TABLE testTbl PARTITION (p1,p2) TO "hdfs://hdfs_host:port/a/b/c" WITH
BROKER "broker_name" ("username"="xxx", "password"="yyy");
-3. Export all data in the testTbl table to hdfs, using "," as column separator
+ 2. Export partitions P1 and P2 from the testTbl table to HDFS
+ EXPORT TABLE testTbl PARTITION (p1,p2) TO "hdfs://hdfs_host:port/a/b/c"
WITH BROKER "broker_name" ("username"="xxx", "password"="yyy");
-EXPORT TABLE testTbl TO "hdfs://hdfs_host:port/a/b/c" PROPERTIES
("column_separator"=",") WITH BROKER "broker_name" ("username"="xxx",
"password"="yyy");
+ 3. Export all data in the testTbl table to hdfs, using "," as column
separator
+ EXPORT TABLE testTbl TO "hdfs://hdfs_host:port/a/b/c" PROPERTIES
("column_separator"=",") WITH BROKER "broker_name" ("username"="xxx",
"password"="yyy");
+
+ 4. Export the rows that meet the condition k1 = 1 in the testTbl table
to HDFS.
+ EXPORT TABLE testTbl WHERE k1=1 TO "hdfs://hdfs_host:port/a/b/c" WITH
BROKER "broker_name" ("username"="xxx", "password"="yyy");
+
+ 5. Export all data in the testTbl table to a local path.
+ EXPORT TABLE testTbl TO "file:///home/data/a";
+
+ 6. Export all data in the testTbl table to hdfs, using the invisible
character "\x07" as the column and row separator.
+ EXPORT TABLE testTbl TO "hdfs://hdfs_host:port/a/b/c" PROPERTIES
("column_separator"="\\x07", "line_delimiter" = "\\x07") WITH BROKER
"broker_name" ("username"="xxx", "password"="yyy")
## keyword
-EXPORT
+ EXPORT
diff --git a/docs/zh-CN/administrator-guide/export-manual.md
b/docs/zh-CN/administrator-guide/export-manual.md
index 9725c59..64306ce 100644
--- a/docs/zh-CN/administrator-guide/export-manual.md
+++ b/docs/zh-CN/administrator-guide/export-manual.md
@@ -106,6 +106,7 @@ Export 的详细命令可以通过 `HELP EXPORT;` 。举例如下:
```
EXPORT TABLE db1.tbl1
PARTITION (p1,p2)
+[WHERE [expr]]
TO "hdfs://host/path/to/export/"
PROPERTIES
(
@@ -120,8 +121,8 @@ WITH BROKER "hdfs"
);
```
-* `column_separator`:列分隔符。默认为 `\t`。
-* `line_delimiter`:行分隔符。默认为 `\n`。
+* `column_separator`:列分隔符。默认为 `\t`。支持不可见字符,比如 '\x07'。
+* `line_delimiter`:行分隔符。默认为 `\n`。支持不可见字符,比如 '\x07'。
* `exec_mem_limit`: 表示 Export 作业中,一个查询计划在单个 BE 上的内存使用限制。默认 2GB。单位字节。
* `timeout`:作业超时时间。默认 2小时。单位秒。
* `tablet_num_per_task`:每个查询计划分配的最大分片数。默认为 5。
diff --git a/docs/zh-CN/sql-reference/sql-statements/Data
Manipulation/EXPORT.md b/docs/zh-CN/sql-reference/sql-statements/Data
Manipulation/EXPORT.md
index 44814a4..f41be07 100644
--- a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/EXPORT.md
+++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/EXPORT.md
@@ -34,9 +34,10 @@ under the License.
语法:
EXPORT TABLE table_name
[PARTITION (p1[,p2])]
+ [WHERE [expr]]
TO export_path
[opt_properties]
- broker;
+ [broker];
1. table_name
当前要导出的表的表名,目前支持engine为olap和mysql的表的导出。
@@ -44,28 +45,32 @@ under the License.
2. partition
可以只导出指定表的某些指定分区
- 3. export_path
- 导出的路径,需为目录。目前不能导出到本地,需要导出到broker。
+ 3. expr
+ 导出满足 where 条件的行,选填。不填则默认导出所有行。
- 4. opt_properties
+ 4. export_path
+ 导出的路径,需为目录。支持导出到本地,hdfs,百度bos,s3协议的其他存储系统。
+
+ 5. opt_properties
用于指定一些特殊参数。
语法:
[PROPERTIES ("key"="value", ...)]
可以指定如下参数:
- column_separator: 指定导出的列分隔符,默认为\t。
- line_delimiter: 指定导出的行分隔符,默认为\n。
+ column_separator: 指定导出的列分隔符,默认为\t。支持不可见字符,比如 '\x07'。
+ line_delimiter: 指定导出的行分隔符,默认为\n。支持不可见字符,比如 '\x07'。
exec_mem_limit: 导出在单个 BE 节点的内存使用上限,默认为 2GB,单位为字节。
timeout:导入作业的超时时间,默认为1天,单位是秒。
tablet_num_per_task:每个子任务能分配的最大 Tablet 数量。
- 5. broker
+ 6. broker
用于指定导出使用的broker
语法:
WITH BROKER broker_name ("key"="value"[,...])
这里需要指定具体的broker name, 以及所需的broker属性
对于不同存储系统对应的 broker,这里需要输入的参数不同。具体参数可以参阅:`help broker load` 中 broker 所需属性。
+ 导出到本地时,不需要填写这部分。
## example
@@ -73,11 +78,19 @@ under the License.
EXPORT TABLE testTbl TO "hdfs://hdfs_host:port/a/b/c" WITH BROKER
"broker_name" ("username"="xxx", "password"="yyy");
2. 将 testTbl 表中的分区p1,p2导出到 hdfs 上
-
EXPORT TABLE testTbl PARTITION (p1,p2) TO
"hdfs://hdfs_host:port/a/b/c" WITH BROKER "broker_name" ("username"="xxx",
"password"="yyy");
+
3. 将 testTbl 表中的所有数据导出到 hdfs 上,以","作为列分隔符
-
EXPORT TABLE testTbl TO "hdfs://hdfs_host:port/a/b/c" PROPERTIES
("column_separator"=",") WITH BROKER "broker_name" ("username"="xxx",
"password"="yyy");
+
+ 4. 将 testTbl 表中 k1 = 1 的行导出到 hdfs 上。
+ EXPORT TABLE testTbl WHERE k1=1 TO "hdfs://hdfs_host:port/a/b/c" WITH
BROKER "broker_name" ("username"="xxx", "password"="yyy");
+
+ 5. 将 testTbl 表中的所有数据导出到本地。
+ EXPORT TABLE testTbl TO "file:///home/data/a";
+
+ 6. 将 testTbl 表中的所有数据导出到 hdfs 上,以不可见字符 "\x07" 作为列或者行分隔符。
+ EXPORT TABLE testTbl TO "hdfs://hdfs_host:port/a/b/c" PROPERTIES
("column_separator"="\\x07", "line_delimiter" = "\\x07") WITH BROKER
"broker_name" ("username"="xxx", "password"="yyy")
## keyword
EXPORT
diff --git a/fe/fe-core/src/main/cup/sql_parser.cup
b/fe/fe-core/src/main/cup/sql_parser.cup
index c611932..615b945 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -1297,11 +1297,12 @@ help_stmt ::=
// Export statement
export_stmt ::=
KW_EXPORT KW_TABLE base_table_ref:tblRef
+ where_clause:whereExpr
KW_TO STRING_LITERAL:path
opt_properties:properties
opt_broker:broker
{:
- RESULT = new ExportStmt(tblRef, path, properties, broker);
+ RESULT = new ExportStmt(tblRef, whereExpr, path, properties, broker);
:}
;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/BrokerDesc.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/BrokerDesc.java
index 22526c4..f474be9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/BrokerDesc.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/BrokerDesc.java
@@ -90,7 +90,7 @@ public class BrokerDesc extends StorageDesc implements
Writable {
}
public TFileType getFileType() {
- if (isMultiLoadBroker()) {
+ if (storageType == StorageBackend.StorageType.LOCAL) {
return TFileType.FILE_LOCAL;
}
if (storageType == StorageBackend.StorageType.BROKER) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
index a3ff5a8..160aaf6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
@@ -63,17 +63,19 @@ public class ExportStmt extends StatementBase {
private TableName tblName;
private List<String> partitions;
- private final String path;
- private final BrokerDesc brokerDesc;
+ private Expr whereExpr;
+ private String path;
+ private BrokerDesc brokerDesc;
private Map<String, String> properties = Maps.newHashMap();
private String columnSeparator;
private String lineDelimiter;
private TableRef tableRef;
- public ExportStmt(TableRef tableRef, String path,
+ public ExportStmt(TableRef tableRef, Expr whereExpr, String path,
Map<String, String> properties, BrokerDesc brokerDesc) {
this.tableRef = tableRef;
+ this.whereExpr = whereExpr;
this.path = path.trim();
if (properties != null) {
this.properties = properties;
@@ -91,6 +93,10 @@ public class ExportStmt extends StatementBase {
return partitions;
}
+ public Expr getWhereExpr() {
+ return whereExpr;
+ }
+
public String getPath() {
return path;
}
@@ -144,7 +150,7 @@ public class ExportStmt extends StatementBase {
ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR,
"EXPORT",
ConnectContext.get().getQualifiedUser(),
ConnectContext.get().getRemoteIP(),
- tblName.getTbl());
+ tblName.getTbl());
}
// check table && partitions whether exist
@@ -152,11 +158,13 @@ public class ExportStmt extends StatementBase {
// check broker whether exist
if (brokerDesc == null) {
- throw new AnalysisException("broker is not provided");
+ brokerDesc = new BrokerDesc("local",
StorageBackend.StorageType.LOCAL, null);
}
+ // where expr will be checked in export job
+
// check path is valid
- checkPath(path, brokerDesc.getStorageType());
+ path = checkPath(path, brokerDesc.getStorageType());
if (brokerDesc.getStorageType() == StorageBackend.StorageType.BROKER) {
if
(!analyzer.getCatalog().getBrokerMgr().containsBroker(brokerDesc.getName())) {
throw new AnalysisException("broker " + brokerDesc.getName() +
" does not exist");
@@ -217,7 +225,7 @@ public class ExportStmt extends StatementBase {
}
}
- public static void checkPath(String path, StorageBackend.StorageType type)
throws AnalysisException {
+ public static String checkPath(String path, StorageBackend.StorageType
type) throws AnalysisException {
if (Strings.isNullOrEmpty(path)) {
throw new AnalysisException("No dest path specified.");
}
@@ -240,18 +248,22 @@ public class ExportStmt extends StatementBase {
}
} else if (type == StorageBackend.StorageType.LOCAL) {
if (schema != null && !schema.equalsIgnoreCase("file")) {
- throw new AnalysisException("Invalid export path. please
use valid 'file://' path.");
+ throw new AnalysisException("Invalid export path. please
use valid '"
+ + OutFileClause.LOCAL_FILE_PREFIX + "' path.");
}
+ path = path.substring(OutFileClause.LOCAL_FILE_PREFIX.length()
- 1);
}
} catch (URISyntaxException e) {
throw new AnalysisException("Invalid path format. " +
e.getMessage());
}
+ return path;
}
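The local-path handling above (strip the `file://` scheme when storing the path, re-attach it for display) can be sketched in isolation. This is a simplified, hypothetical stand-in for `checkPath()` and `ExportJob.getShowExportPath()`, not the real API; only `LOCAL_FILE_PREFIX` mirrors an actual constant (`OutFileClause.LOCAL_FILE_PREFIX`):

```java
class LocalPathSketch {
    // Mirrors OutFileClause.LOCAL_FILE_PREFIX added in this commit.
    static final String LOCAL_FILE_PREFIX = "file:///";

    // checkPath()-style handling: reject non-file schemes, then strip "file://"
    // so the stored export path is a plain absolute filesystem path.
    static String toStoredPath(String path) {
        if (!path.startsWith(LOCAL_FILE_PREFIX)) {
            throw new IllegalArgumentException(
                    "Invalid export path. please use valid '" + LOCAL_FILE_PREFIX + "' path.");
        }
        // length() - 1 keeps the third '/', i.e. the leading '/' of the path
        return path.substring(LOCAL_FILE_PREFIX.length() - 1);
    }

    // getShowExportPath()-style handling: put "file://" back for SHOW EXPORT output.
    static String toShowPath(String storedPath) {
        return LOCAL_FILE_PREFIX.substring(0, LOCAL_FILE_PREFIX.length() - 1) + storedPath;
    }
}
```

Stripping `LOCAL_FILE_PREFIX.length() - 1` characters keeps the leading `/`, so the stored path stays absolute.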
private void checkProperties(Map<String, String> properties) throws
UserException {
- this.columnSeparator = PropertyAnalyzer.analyzeColumnSeparator(
- properties, ExportStmt.DEFAULT_COLUMN_SEPARATOR);
- this.lineDelimiter = PropertyAnalyzer.analyzeLineDelimiter(properties,
ExportStmt.DEFAULT_LINE_DELIMITER);
+ this.columnSeparator =
Separator.convertSeparator(PropertyAnalyzer.analyzeColumnSeparator(
+ properties, ExportStmt.DEFAULT_COLUMN_SEPARATOR));
+ this.lineDelimiter =
Separator.convertSeparator(PropertyAnalyzer.analyzeLineDelimiter(
+ properties, ExportStmt.DEFAULT_LINE_DELIMITER));
// exec_mem_limit
if (properties.containsKey(LoadStmt.EXEC_MEM_LIMIT)) {
try {
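The new `checkProperties()` body routes both separators through `Separator.convertSeparator()`, whose implementation is outside this diff. A hypothetical sketch of the kind of conversion it must perform, turning escape sequences such as `\x07` or `\t` into the actual control characters sent to BE, could look like:

```java
class SeparatorSketch {
    // Convert user-written escapes ("\x07", "\t", "\n") into real characters.
    // Hypothetical: the actual Separator.convertSeparator() may differ.
    static String convert(String sep) {
        StringBuilder sb = new StringBuilder();
        int i = 0;
        while (i < sep.length()) {
            if (sep.startsWith("\\x", i) && i + 4 <= sep.length()) {
                // two hex digits follow "\x"
                sb.append((char) Integer.parseInt(sep.substring(i + 2, i + 4), 16));
                i += 4;
            } else if (sep.startsWith("\\t", i)) {
                sb.append('\t');
                i += 2;
            } else if (sep.startsWith("\\n", i)) {
                sb.append('\n');
                i += 2;
            } else {
                sb.append(sep.charAt(i));
                i++;
            }
        }
        return sb.toString();
    }
}
```

This is what lets `("column_separator"="\x07")` in the docs' example 6 become the single invisible byte 0x07 in the exported file.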
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
index 8de1adb..d8dfee5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
@@ -321,9 +321,8 @@ public class LoadStmt extends DdlStmt {
for (int i = 0; i < dataDescription.getFilePaths().size();
i++) {
dataDescription.getFilePaths().set(i,
brokerDesc.convertPathToS3(dataDescription.getFilePaths().get(i)));
- }
- for (String path : dataDescription.getFilePaths()) {
- ExportStmt.checkPath(path, brokerDesc.getStorageType());
+ dataDescription.getFilePaths().set(i,
+
ExportStmt.checkPath(dataDescription.getFilePaths().get(i),
brokerDesc.getStorageType()));
}
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
index 1f67167..778ec5b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
@@ -39,6 +39,7 @@ import java.util.stream.Collectors;
public class OutFileClause {
private static final Logger LOG =
LogManager.getLogger(OutFileClause.class);
+ public static final String LOCAL_FILE_PREFIX = "file:///";
private static final String BROKER_PROP_PREFIX = "broker.";
private static final String PROP_BROKER_NAME = "broker.name";
private static final String PROP_COLUMN_SEPARATOR = "column_separator";
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java
index 921c727..e73a90e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java
@@ -92,7 +92,7 @@ public class StorageBackend extends StorageDesc implements
ParseNode {
if (Strings.isNullOrEmpty(location)) {
throw new AnalysisException("You must specify a location on the
repository");
}
- ExportStmt.checkPath(location, storageType);
+ location = ExportStmt.checkPath(location, storageType);
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java
b/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java
index ba17874..ad480e0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java
@@ -204,6 +204,8 @@ public final class FeMetaVersion {
public static final int VERSION_95 = 95;
// support delete without partition
public static final int VERSION_96 = 96;
+ // persist orig stmt of export job
+ public static final int VERSION_97 = 97;
// note: when increment meta version, should assign the latest version to
VERSION_CURRENT
- public static final int VERSION_CURRENT = VERSION_96;
+ public static final int VERSION_CURRENT = VERSION_97;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
index bf96b85..89513cb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
@@ -23,10 +23,14 @@ import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.DescriptorTable;
import org.apache.doris.analysis.ExportStmt;
import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.ExprSubstitutionMap;
import org.apache.doris.analysis.LoadStmt;
+import org.apache.doris.analysis.OutFileClause;
import org.apache.doris.analysis.PartitionNames;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.SqlParser;
+import org.apache.doris.analysis.SqlScanner;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.analysis.TableName;
import org.apache.doris.analysis.TableRef;
@@ -47,6 +51,7 @@ import org.apache.doris.common.Status;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
+import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.planner.DataPartition;
import org.apache.doris.planner.ExportSink;
@@ -57,10 +62,15 @@ import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.PlanFragmentId;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.ScanNode;
+import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.Coordinator;
+import org.apache.doris.qe.OriginStatement;
+import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.qe.SqlModeHelper;
import org.apache.doris.system.Backend;
import org.apache.doris.task.AgentClient;
import org.apache.doris.thrift.TAgentResult;
+import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TScanRangeLocation;
import org.apache.doris.thrift.TScanRangeLocations;
@@ -80,6 +90,7 @@ import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.io.StringReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
@@ -107,6 +118,7 @@ public class ExportJob implements Writable {
private String clusterName;
private long tableId;
private BrokerDesc brokerDesc;
+ private Expr whereExpr;
private String exportPath;
private String columnSeparator;
private String lineDelimiter;
@@ -150,6 +162,11 @@ public class ExportJob implements Writable {
// backend_address => snapshot path
private List<Pair<TNetworkAddress, String>> snapshotPaths =
Lists.newArrayList();
+ // this is the origin stmt of ExportStmt, we use it to persist where expr
of Export job,
+ // because we can not serialize the Expressions contained in job.
+ private OriginStatement origStmt;
+ protected Map<String, String> sessionVariables = Maps.newHashMap();
+
public ExportJob() {
this.id = -1;
this.dbId = -1;
@@ -188,6 +205,7 @@ public class ExportJob implements Writable {
String path = stmt.getPath();
Preconditions.checkArgument(!Strings.isNullOrEmpty(path));
+ this.whereExpr = stmt.getWhereExpr();
this.exportPath = path;
this.partitions = stmt.getPartitions();
@@ -208,6 +226,13 @@ public class ExportJob implements Writable {
}
this.sql = stmt.toSql();
+ this.origStmt = stmt.getOrigStmt();
+ if (ConnectContext.get() != null) {
+ SessionVariable var = ConnectContext.get().getSessionVariable();
+ this.sessionVariables.put(SessionVariable.SQL_MODE,
Long.toString(var.getSqlMode()));
+ } else {
+ this.sessionVariables.put(SessionVariable.SQL_MODE,
String.valueOf(SqlModeHelper.MODE_DEFAULT));
+ }
}
private void genExecFragment() throws UserException {
@@ -246,14 +271,19 @@ public class ExportJob implements Writable {
List<PlanFragment> fragments = Lists.newArrayList();
List<ScanNode> scanNodes = Lists.newArrayList();
- ScanNode scanNode = genScanNode();
- tabletLocations = scanNode.getScanRangeLocations(0);
- if (tabletLocations == null) {
+ // analyze where expr
+ analyzeWhereExpr();
+ // Build scan nodes: non-OLAP tables use a single scan node;
+ // OLAP tables are split by tablet below.
+ if (exportTable.getType() != Table.TableType.OLAP) {
// not olap scan node
+ ScanNode scanNode = genScanNode();
PlanFragment fragment = genPlanFragment(exportTable.getType(),
scanNode);
scanNodes.add(scanNode);
fragments.add(fragment);
} else {
+ // The function of this scan node is only to get the
tabletlocation.
+ ScanNode tmpOlapScanNode = genScanNode();
+ tabletLocations = tmpOlapScanNode.getScanRangeLocations(0);
for (TScanRangeLocations tablet : tabletLocations) {
List<TScanRangeLocation> locations = tablet.getLocations();
Collections.shuffle(locations);
@@ -278,9 +308,47 @@ public class ExportJob implements Writable {
size, id, fragments.size());
}
+ // add conjunct
+ if (whereExpr != null) {
+ for (ScanNode scanNode: scanNodes) {
+ scanNode.addConjuncts(whereExpr.getConjuncts());
+ }
+ }
+
genCoordinators(fragments, scanNodes);
}
+ private void analyzeWhereExpr() throws UserException {
+ if (whereExpr == null) {
+ return;
+ }
+ whereExpr = analyzer.getExprRewriter().rewrite(whereExpr, analyzer);
+
+ // analyze where slot ref
+ Map<String, SlotDescriptor> dstDescMap =
Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
+ for (SlotDescriptor slotDescriptor : exportTupleDesc.getSlots()) {
+ dstDescMap.put(slotDescriptor.getColumn().getName(),
slotDescriptor);
+ }
+ List<SlotRef> slots = Lists.newArrayList();
+ whereExpr.collect(SlotRef.class, slots);
+ ExprSubstitutionMap smap = new ExprSubstitutionMap();
+ for (SlotRef slot : slots) {
+ SlotDescriptor slotDesc = dstDescMap.get(slot.getColumnName());
+ if (slotDesc == null) {
+ throw new UserException("unknown column reference in where
statement, reference="
+ + slot.getColumnName());
+ }
+ smap.getLhs().add(slot);
+ smap.getRhs().add(new SlotRef(slotDesc));
+ }
+ whereExpr = whereExpr.clone(smap);
+
+ whereExpr.analyze(analyzer);
+ if (!whereExpr.getType().equals(Type.BOOLEAN)) {
+ throw new UserException("where clause must be a valid
expression that returns BOOLEAN");
+ }
+ }
+
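The column-binding step of `analyzeWhereExpr()` can be illustrated standalone: slot references from the WHERE clause are matched case-insensitively against the export tuple's columns, and unknown references are rejected. `resolveColumn()` here is an illustrative helper, not a Doris API:

```java
import java.util.Map;
import java.util.TreeMap;

class WhereBindSketch {
    // Resolve a WHERE-clause column reference against the exported columns,
    // ignoring case, as analyzeWhereExpr() does with its CASE_INSENSITIVE_ORDER map.
    static String resolveColumn(String ref, String[] exportColumns) {
        Map<String, String> byName = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
        for (String c : exportColumns) {
            byName.put(c, c);
        }
        String col = byName.get(ref);
        if (col == null) {
            // mirrors: "unknown column reference in where statement, reference=..."
            throw new IllegalArgumentException(
                    "unknown column reference in where statement, reference=" + ref);
        }
        return col;
    }
}
```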
private ScanNode genScanNode() throws UserException {
ScanNode scanNode = null;
switch (exportTable.getType()) {
@@ -391,6 +459,10 @@ public class ExportJob implements Writable {
return this.tableId;
}
+ public Expr getWhereExpr() {
+ return whereExpr;
+ }
+
public JobState getState() {
return state;
}
@@ -407,6 +479,16 @@ public class ExportJob implements Writable {
return exportPath;
}
+ public String getShowExportPath() {
+ if (brokerDesc.getFileType() == TFileType.FILE_LOCAL) {
+ StringBuilder sb = new StringBuilder();
+ sb.append(OutFileClause.LOCAL_FILE_PREFIX.substring(0,
OutFileClause.LOCAL_FILE_PREFIX.length() - 1));
+ sb.append(exportPath);
+ return sb.toString();
+ }
+ return exportPath;
+ }
+
public String getColumnSeparator() {
return this.columnSeparator;
}
@@ -631,8 +713,14 @@ public class ExportJob implements Writable {
out.writeBoolean(true);
brokerDesc.write(out);
}
-
tableName.write(out);
+
+ origStmt.write(out);
+ out.writeInt(sessionVariables.size());
+ for (Map.Entry<String, String> entry : sessionVariables.entrySet()) {
+ Text.writeString(out, entry.getKey());
+ Text.writeString(out, entry.getValue());
+ }
}
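The persistence added to `write()` is a size-prefixed list of key/value pairs appended after the original statement. A self-contained round-trip sketch of that layout, with `writeUTF`/`readUTF` standing in for Doris's `Text.writeString`/`Text.readString`:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

class SessionVarsSketch {
    // Serialize as: int count, then (key, value) pairs -- the layout write() uses.
    static byte[] write(Map<String, String> vars) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(vars.size());
            for (Map.Entry<String, String> e : vars.entrySet()) {
                out.writeUTF(e.getKey());
                out.writeUTF(e.getValue());
            }
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Read back exactly what write() produced, as readFields() does after VERSION_97.
    static Map<String, String> read(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            Map<String, String> vars = new HashMap<>();
            int size = in.readInt();
            for (int i = 0; i < size; i++) {
                String key = in.readUTF();
                String value = in.readUTF();
                vars.put(key, value);
            }
            return vars;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Persisting the sql_mode alongside the origin statement is what lets `readFields()` re-parse the WHERE expression with the same parser settings after an FE restart.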
public void readFields(DataInput in) throws IOException {
@@ -680,6 +768,34 @@ public class ExportJob implements Writable {
} else {
tableName = new TableName("DUMMY", "DUMMY");
}
+
+ if (Catalog.getCurrentCatalogJournalVersion() <
FeMetaVersion.VERSION_97) {
+ origStmt = new OriginStatement("", 0);
+ // old version of export does not have sqlmode, set it to default
+ sessionVariables.put(SessionVariable.SQL_MODE,
String.valueOf(SqlModeHelper.MODE_DEFAULT));
+ return;
+ }
+ origStmt = OriginStatement.read(in);
+ int size = in.readInt();
+ for (int i = 0; i < size; i++) {
+ String key = Text.readString(in);
+ String value = Text.readString(in);
+ sessionVariables.put(key, value);
+ }
+
+ if (origStmt.originStmt.isEmpty()) {
+ return;
+ }
+ // parse the origin stmt to get where expr
+ SqlParser parser = new SqlParser(new SqlScanner(new
StringReader(origStmt.originStmt),
+ Long.valueOf(sessionVariables.get(SessionVariable.SQL_MODE))));
+ ExportStmt stmt = null;
+ try {
+ stmt = (ExportStmt) SqlParserUtils.getStmt(parser, origStmt.idx);
+ this.whereExpr = stmt.getWhereExpr();
+ } catch (Exception e) {
+ throw new IOException("error happens when parsing export
stmt: " + origStmt, e);
+ }
}
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
index 533e22f..efb90f3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
@@ -31,7 +31,6 @@ import org.apache.doris.qe.ConnectContext;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import com.google.gson.Gson;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -46,6 +45,8 @@ import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
+import com.google.gson.Gson;
+
public class ExportMgr {
private static final Logger LOG = LogManager.getLogger(ExportJob.class);
@@ -184,6 +185,9 @@ public class ExportMgr {
}
infoMap.put("db", job.getTableName().getDb());
infoMap.put("tbl", job.getTableName().getTbl());
+ if (job.getWhereExpr() != null) {
+ infoMap.put("where expr", job.getWhereExpr().toMySql());
+ }
infoMap.put("partitions", partitions);
infoMap.put("broker", job.getBrokerDesc().getName());
infoMap.put("column separator", job.getColumnSeparator());
@@ -193,7 +197,7 @@ public class ExportMgr {
infoMap.put("tablet num", job.getTabletLocations() == null ?
-1 : job.getTabletLocations().size());
jobInfo.add(new Gson().toJson(infoMap));
// path
- jobInfo.add(job.getExportPath());
+ jobInfo.add(job.getShowExportPath());
jobInfo.add(TimeUtils.longToTimeString(job.getCreateTimeMs()));
jobInfo.add(TimeUtils.longToTimeString(job.getStartTimeMs()));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]