This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 27cf5a667f0 [enhancement](export) filter empty partition before export
table to remote storage (#35389) (#35542)
27cf5a667f0 is described below
commit 27cf5a667f0b6a49bb9f6a50750bd285e68e19f5
Author: caiconghui <[email protected]>
AuthorDate: Tue May 28 18:11:12 2024 +0800
[enhancement](export) filter empty partition before export table to remote
storage (#35389) (#35542)
## Proposed changes
Linked PR: #35389
<!--Describe your changes.-->
## Further comments
If this is a relatively large or complex change, kick off the discussion
at [[email protected]](mailto:[email protected]) by explaining why
you chose the solution you did and what alternatives you considered,
etc...
---
.../main/java/org/apache/doris/load/ExportJob.java | 32 ++-
.../export_p0/test_export_empty_table.groovy | 269 +--------------------
2 files changed, 28 insertions(+), 273 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
index d0ccf23ae0a..b072ed2f2fd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
@@ -407,6 +407,13 @@ public class ExportJob implements Writable {
ExportTaskExecutor executor = new ExportTaskExecutor(selectStmts,
this);
jobExecutorList.add(executor);
}
+
+ // add empty task to make export job could be finished finally if
jobExecutorList is empty
+ // which means that export table without data
+ if (jobExecutorList.isEmpty()) {
+ ExportTaskExecutor executor = new
ExportTaskExecutor(Lists.newArrayList(), this);
+ jobExecutorList.add(executor);
+ }
}
/**
@@ -511,15 +518,23 @@ public class ExportJob implements Writable {
// get partitions
// user specifies partitions, already checked in ExportCommand
if (!this.partitionNames.isEmpty()) {
- this.partitionNames.forEach(partitionName ->
partitions.add(table.getPartition(partitionName)));
+ this.partitionNames.forEach(partitionName -> {
+ Partition partition = table.getPartition(partitionName);
+ if (partition.hasData()) {
+ partitions.add(partition);
+ }
+ });
} else {
- if (table.getPartitions().size() >
Config.maximum_number_of_export_partitions) {
- throw new UserException("The partitions number of this
export job is larger than the maximum number"
- + " of partitions allowed by a export job");
- }
- partitions.addAll(table.getPartitions());
+ table.getPartitions().forEach(partition -> {
+ if (partition.hasData()) {
+ partitions.add(partition);
+ }
+ });
+ }
+ if (partitions.size() >
Config.maximum_number_of_export_partitions) {
+ throw new UserException("The partitions number of this export
job is larger than the maximum number"
+ + " of partitions allowed by a export job");
}
-
// get tablets
for (Partition partition : partitions) {
// Partition data consistency is not need to verify partition
version.
@@ -589,8 +604,7 @@ public class ExportJob implements Writable {
List<Long> tabletsList = new
ArrayList<>(flatTabletIdList.subList(start, start + tabletsNum));
List<List<Long>> tablets = new ArrayList<>();
for (int i = 0; i < tabletsList.size(); i +=
MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT) {
- int end = i + MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT <
tabletsList.size()
- ? i + MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT :
tabletsList.size();
+ int end = Math.min(i + MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT,
tabletsList.size());
tablets.add(new ArrayList<>(tabletsList.subList(i, end)));
}
diff --git a/regression-test/suites/export_p0/test_export_empty_table.groovy
b/regression-test/suites/export_p0/test_export_empty_table.groovy
index f70ff97b38b..584c65d73bc 100644
--- a/regression-test/suites/export_p0/test_export_empty_table.groovy
+++ b/regression-test/suites/export_p0/test_export_empty_table.groovy
@@ -143,62 +143,8 @@ suite("test_export_empty_table", "p0") {
waiting_export.call(label)
// check file amounts
- check_file_amounts.call("${outFilePath}", 1)
-
- // check data correctness
- sql """ DROP TABLE IF EXISTS ${table_load_name} """
- sql """
- CREATE TABLE IF NOT EXISTS ${table_load_name} (
- `user_id` INT NOT NULL COMMENT "用户id",
- `date` DATE NOT NULL COMMENT "数据灌入日期时间",
- `datetime` DATETIME NOT NULL COMMENT "数据灌入日期时间",
- `city` VARCHAR(20) COMMENT "用户所在城市",
- `age` SMALLINT COMMENT "用户年龄",
- `sex` TINYINT COMMENT "用户性别",
- `bool_col` boolean COMMENT "",
- `int_col` int COMMENT "",
- `bigint_col` bigint COMMENT "",
- `float_col` float COMMENT "",
- `double_col` double COMMENT "",
- `char_col` CHAR(10) COMMENT "",
- `decimal_col` decimal COMMENT "",
- `ipv4_col` ipv4 COMMENT "",
- `ipv6_col` ipv6 COMMENT ""
- )
- DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1");
- """
-
- File[] files = new File("${outFilePath}").listFiles()
- String file_path = files[0].getAbsolutePath()
- streamLoad {
- table "${table_load_name}"
-
- set 'columns', 'user_id, date, datetime, city, age, sex, bool_col,
int_col, bigint_col, float_col, double_col, char_col, decimal_col'
- set 'strict_mode', 'true'
- set 'format', 'csv'
- set 'column_separator', ','
-
- file "${file_path}"
- time 10000 // limit inflight 10s
-
- check { result, exception, startTime, endTime ->
- if (exception != null) {
- throw exception
- }
- log.info("Stream load result: ${result}".toString())
- def json = parseJson(result)
- assertEquals("success", json.Status.toLowerCase())
- assertEquals(0, json.NumberTotalRows)
- assertEquals(0, json.NumberFilteredRows)
- }
- }
-
- sql """ sync; """
-
- qt_select_load1 """ SELECT * FROM ${table_load_name} t ORDER BY
user_id; """
-
+ check_file_amounts.call("${outFilePath}", 0)
} finally {
- try_sql("DROP TABLE IF EXISTS ${table_load_name}")
delete_files.call("${outFilePath}")
}
@@ -222,59 +168,8 @@ suite("test_export_empty_table", "p0") {
waiting_export.call(label)
// check file amounts
- check_file_amounts.call("${outFilePath}", 1)
-
- // check data correctness
- sql """ DROP TABLE IF EXISTS ${table_load_name} """
- sql """
- CREATE TABLE IF NOT EXISTS ${table_load_name} (
- `user_id` INT NOT NULL COMMENT "用户id",
- `date` DATE NOT NULL COMMENT "数据灌入日期时间",
- `datetime` DATETIME NOT NULL COMMENT "数据灌入日期时间",
- `city` VARCHAR(20) COMMENT "用户所在城市",
- `age` SMALLINT COMMENT "用户年龄",
- `sex` TINYINT COMMENT "用户性别",
- `bool_col` boolean COMMENT "",
- `int_col` int COMMENT "",
- `bigint_col` bigint COMMENT "",
- `float_col` float COMMENT "",
- `double_col` double COMMENT "",
- `char_col` CHAR(10) COMMENT "",
- `decimal_col` decimal COMMENT "",
- `ipv4_col` ipv4 COMMENT "",
- `ipv6_col` ipv6 COMMENT ""
- )
- DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1");
- """
-
- File[] files = new File("${outFilePath}").listFiles()
- String file_path = files[0].getAbsolutePath()
- streamLoad {
- table "${table_load_name}"
-
- set 'columns', 'user_id, date, datetime, city, age, sex, bool_col,
int_col, bigint_col, float_col, double_col, char_col, decimal_col'
- set 'strict_mode', 'true'
- set 'format', 'parquet'
-
- file "${file_path}"
- time 10000 // limit inflight 10s
-
- check { result, exception, startTime, endTime ->
- if (exception != null) {
- throw exception
- }
- log.info("Stream load result: ${result}".toString())
- def json = parseJson(result)
- assertEquals("success", json.Status.toLowerCase())
- assertEquals(0, json.NumberTotalRows)
- assertEquals(0, json.NumberFilteredRows)
- }
- }
-
- qt_select_load2 """ SELECT * FROM ${table_load_name} t ORDER BY
user_id; """
-
+ check_file_amounts.call("${outFilePath}", 0)
} finally {
- try_sql("DROP TABLE IF EXISTS ${table_load_name}")
delete_files.call("${outFilePath}")
}
@@ -297,59 +192,8 @@ suite("test_export_empty_table", "p0") {
waiting_export.call(label)
// check file amounts
- check_file_amounts.call("${outFilePath}", 1)
-
- // check data correctness
- sql """ DROP TABLE IF EXISTS ${table_load_name} """
- sql """
- CREATE TABLE IF NOT EXISTS ${table_load_name} (
- `user_id` INT NOT NULL COMMENT "用户id",
- `date` DATE NOT NULL COMMENT "数据灌入日期时间",
- `datetime` DATETIME NOT NULL COMMENT "数据灌入日期时间",
- `city` VARCHAR(20) COMMENT "用户所在城市",
- `age` SMALLINT COMMENT "用户年龄",
- `sex` TINYINT COMMENT "用户性别",
- `bool_col` boolean COMMENT "",
- `int_col` int COMMENT "",
- `bigint_col` bigint COMMENT "",
- `float_col` float COMMENT "",
- `double_col` double COMMENT "",
- `char_col` CHAR(10) COMMENT "",
- `decimal_col` decimal COMMENT "",
- `ipv4_col` ipv4 COMMENT "",
- `ipv6_col` ipv6 COMMENT ""
- )
- DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1");
- """
-
- File[] files = new File("${outFilePath}").listFiles()
- String file_path = files[0].getAbsolutePath()
- streamLoad {
- table "${table_load_name}"
-
- set 'columns', 'user_id, date, datetime, city, age, sex, bool_col,
int_col, bigint_col, float_col, double_col, char_col, decimal_col'
- set 'strict_mode', 'true'
- set 'format', 'orc'
-
- file "${file_path}"
- time 10000 // limit inflight 10s
-
- check { result, exception, startTime, endTime ->
- if (exception != null) {
- throw exception
- }
- log.info("Stream load result: ${result}".toString())
- def json = parseJson(result)
- assertEquals("success", json.Status.toLowerCase())
- assertEquals(0, json.NumberTotalRows)
- assertEquals(0, json.NumberFilteredRows)
- }
- }
-
- qt_select_load3 """ SELECT * FROM ${table_load_name} t ORDER BY
user_id; """
-
+ check_file_amounts.call("${outFilePath}", 0)
} finally {
- try_sql("DROP TABLE IF EXISTS ${table_load_name}")
delete_files.call("${outFilePath}")
}
@@ -373,58 +217,7 @@ suite("test_export_empty_table", "p0") {
waiting_export.call(label)
// check file amounts
- check_file_amounts.call("${outFilePath}", 1)
-
- // check data correctness
- sql """ DROP TABLE IF EXISTS ${table_load_name} """
- sql """
- CREATE TABLE IF NOT EXISTS ${table_load_name} (
- `user_id` INT NOT NULL COMMENT "用户id",
- `date` DATE NOT NULL COMMENT "数据灌入日期时间",
- `datetime` DATETIME NOT NULL COMMENT "数据灌入日期时间",
- `city` VARCHAR(20) COMMENT "用户所在城市",
- `age` SMALLINT COMMENT "用户年龄",
- `sex` TINYINT COMMENT "用户性别",
- `bool_col` boolean COMMENT "",
- `int_col` int COMMENT "",
- `bigint_col` bigint COMMENT "",
- `float_col` float COMMENT "",
- `double_col` double COMMENT "",
- `char_col` CHAR(10) COMMENT "",
- `decimal_col` decimal COMMENT "",
- `ipv4_col` ipv4 COMMENT "",
- `ipv6_col` ipv6 COMMENT ""
- )
- DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1");
- """
-
- File[] files = new File("${outFilePath}").listFiles()
- String file_path = files[0].getAbsolutePath()
- streamLoad {
- table "${table_load_name}"
-
- set 'columns', 'user_id, date, datetime, city, age, sex, bool_col,
int_col, bigint_col, float_col, double_col, char_col, decimal_col'
- set 'strict_mode', 'true'
- set 'format', 'csv_with_names'
- set 'column_separator', ','
-
- file "${file_path}"
- time 10000 // limit inflight 10s
-
- check { result, exception, startTime, endTime ->
- if (exception != null) {
- throw exception
- }
- log.info("Stream load result: ${result}".toString())
- def json = parseJson(result)
- assertEquals("success", json.Status.toLowerCase())
- assertEquals(0, json.NumberTotalRows)
- assertEquals(0, json.NumberFilteredRows)
- }
- }
-
- qt_select_load4 """ SELECT * FROM ${table_load_name} t ORDER BY
user_id; """
-
+ check_file_amounts.call("${outFilePath}", 0)
} finally {
try_sql("DROP TABLE IF EXISTS ${table_load_name}")
delete_files.call("${outFilePath}")
@@ -451,60 +244,8 @@ suite("test_export_empty_table", "p0") {
waiting_export.call(label)
// check file amounts
- check_file_amounts.call("${outFilePath}", 1)
-
- // check data correctness
- sql """ DROP TABLE IF EXISTS ${table_load_name} """
- sql """
- CREATE TABLE IF NOT EXISTS ${table_load_name} (
- `user_id` INT NOT NULL COMMENT "用户id",
- `date` DATE NOT NULL COMMENT "数据灌入日期时间",
- `datetime` DATETIME NOT NULL COMMENT "数据灌入日期时间",
- `city` VARCHAR(20) COMMENT "用户所在城市",
- `age` SMALLINT COMMENT "用户年龄",
- `sex` TINYINT COMMENT "用户性别",
- `bool_col` boolean COMMENT "",
- `int_col` int COMMENT "",
- `bigint_col` bigint COMMENT "",
- `float_col` float COMMENT "",
- `double_col` double COMMENT "",
- `char_col` CHAR(10) COMMENT "",
- `decimal_col` decimal COMMENT "",
- `ipv4_col` ipv4 COMMENT "",
- `ipv6_col` ipv6 COMMENT ""
- )
- DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1");
- """
-
- File[] files = new File("${outFilePath}").listFiles()
- String file_path = files[0].getAbsolutePath()
- streamLoad {
- table "${table_load_name}"
-
- set 'columns', 'user_id, date, datetime, city, age, sex, bool_col,
int_col, bigint_col, float_col, double_col, char_col, decimal_col'
- set 'strict_mode', 'true'
- set 'format', 'csv_with_names_and_types'
- set 'column_separator', ','
-
- file "${file_path}"
- time 10000 // limit inflight 10s
-
- check { result, exception, startTime, endTime ->
- if (exception != null) {
- throw exception
- }
- log.info("Stream load result: ${result}".toString())
- def json = parseJson(result)
- assertEquals("success", json.Status.toLowerCase())
- assertEquals(0, json.NumberTotalRows)
- assertEquals(0, json.NumberFilteredRows)
- }
- }
-
- qt_select_load5 """ SELECT * FROM ${table_load_name} t ORDER BY
user_id; """
-
+ check_file_amounts.call("${outFilePath}", 0)
} finally {
- try_sql("DROP TABLE IF EXISTS ${table_load_name}")
delete_files.call("${outFilePath}")
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]