This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new bf2c9a545a8 [improve](export) Support partition data consistency
(#31290)
bf2c9a545a8 is described below
commit bf2c9a545a81f146834b84972e4e4e6a95a71870
Author: walter <[email protected]>
AuthorDate: Thu Feb 29 22:08:29 2024 +0800
[improve](export) Support partition data consistency (#31290)
---
.../java/org/apache/doris/analysis/ExportStmt.java | 15 +
.../main/java/org/apache/doris/load/ExportJob.java | 87 ++++--
.../main/java/org/apache/doris/load/ExportMgr.java | 1 +
.../org/apache/doris/load/ExportTaskExecutor.java | 5 +-
.../trees/plans/commands/ExportCommand.java | 13 +
.../export_p0/test_export_data_consistency.out | 305 +++++++++++++++++++++
.../export_p0/test_export_data_consistency.groovy | 210 ++++++++++++++
7 files changed, 611 insertions(+), 25 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
index 123bfa727fb..1a7d9dc93b5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
@@ -63,6 +63,7 @@ import java.util.stream.Collectors;
public class ExportStmt extends StatementBase {
public static final String PARALLELISM = "parallelism";
public static final String LABEL = "label";
+ public static final String DATA_CONSISTENCY = "data_consistency";
private static final String DEFAULT_COLUMN_SEPARATOR = "\t";
private static final String DEFAULT_LINE_DELIMITER = "\n";
@@ -72,6 +73,7 @@ public class ExportStmt extends StatementBase {
private static final ImmutableSet<String> PROPERTIES_SET = new
ImmutableSet.Builder<String>()
.add(LABEL)
.add(PARALLELISM)
+ .add(DATA_CONSISTENCY)
.add(LoadStmt.KEY_IN_PARAM_COLUMNS)
.add(OutFileClause.PROP_MAX_FILE_SIZE)
.add(OutFileClause.PROP_DELETE_EXISTING_FILES)
@@ -104,6 +106,7 @@ public class ExportStmt extends StatementBase {
private String maxFileSize;
private String deleteExistingFiles;
private String withBom;
+ private String dataConsistency;
private SessionVariable sessionVariables;
private String qualifiedUser;
@@ -230,6 +233,7 @@ public class ExportStmt extends StatementBase {
exportJob.setMaxFileSize(this.maxFileSize);
exportJob.setDeleteExistingFiles(this.deleteExistingFiles);
exportJob.setWithBom(this.withBom);
+ exportJob.setDataConsistency(this.dataConsistency);
if (columns != null) {
Splitter split = Splitter.on(',').trimResults().omitEmptyStrings();
@@ -359,6 +363,17 @@ public class ExportStmt extends StatementBase {
// with bom
this.withBom = properties.getOrDefault(OutFileClause.PROP_WITH_BOM,
"false");
+
+ // data consistency
+ String dataConsistencyStr = properties.get(DATA_CONSISTENCY);
+ if (dataConsistencyStr != null) {
+ if
(!dataConsistencyStr.equalsIgnoreCase(ExportJob.CONSISTENT_PARTITION)) {
+ throw new UserException("The value of data_consistency is
invalid, only `partition` is allowed");
+ }
+ this.dataConsistency = ExportJob.CONSISTENT_PARTITION;
+ } else {
+ this.dataConsistency = ExportJob.CONSISTENT_ALL;
+ }
}
private void checkColumns() throws DdlException {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
index 9fb827f6d6a..fe28151a546 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
@@ -99,6 +99,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
@Data
public class ExportJob implements Writable {
@@ -108,6 +109,9 @@ public class ExportJob implements Writable {
private static final int MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT =
Config.maximum_tablets_of_outfile_in_export;
+ public static final String CONSISTENT_ALL = "all";
+ public static final String CONSISTENT_PARTITION = "partition";
+
@SerializedName("id")
private long id;
@SerializedName("label")
@@ -168,6 +172,8 @@ public class ExportJob implements Writable {
private Integer tabletsNum;
@SerializedName("withBom")
private String withBom;
+ @SerializedName("dataConsistency")
+ private String dataConsistency;
private TableRef tableRef;
@@ -222,6 +228,7 @@ public class ExportJob implements Writable {
this.lineDelimiter = "\n";
this.columns = "";
this.withBom = "false";
+ this.dataConsistency = "all";
}
public ExportJob(long jobId) {
@@ -229,6 +236,10 @@ public class ExportJob implements Writable {
this.id = jobId;
}
+ public boolean isPartitionConsistency() {
+ return dataConsistency != null &&
dataConsistency.equals(CONSISTENT_PARTITION);
+ }
+
public void generateOutfileStatement() throws UserException {
exportTable.readLock();
try {
@@ -302,16 +313,12 @@ public class ExportJob implements Writable {
}
// get all tablets
- List<List<Long>> tabletsListPerParallel = splitTablets();
+ List<List<List<Long>>> tabletsListPerParallel = splitTablets();
// Each Outfile clause is responsible for
MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT tablets
- for (List<Long> tabletsList : tabletsListPerParallel) {
+ for (List<List<Long>> tabletsList : tabletsListPerParallel) {
List<StatementBase> logicalPlanAdapters = Lists.newArrayList();
- for (int i = 0; i < tabletsList.size(); i +=
MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT) {
- int end = i + MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT <
tabletsList.size()
- ? i + MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT :
tabletsList.size();
- List<Long> tabletIds = new ArrayList<>(tabletsList.subList(i,
end));
-
+ for (List<Long> tabletIds : tabletsList) {
// generate LogicalPlan
LogicalPlan plan = generateOneLogicalPlan(qualifiedTableName,
tabletIds,
this.partitionNames, selectLists);
@@ -471,15 +478,12 @@ public class ExportJob implements Writable {
}
private List<List<TableRef>> getTableRefListPerParallel() throws
UserException {
- List<List<Long>> tabletsListPerParallel = splitTablets();
+ List<List<List<Long>>> tabletsListPerParallel = splitTablets();
List<List<TableRef>> tableRefListPerParallel = Lists.newArrayList();
- for (List<Long> tabletsList : tabletsListPerParallel) {
+ for (List<List<Long>> tabletsList : tabletsListPerParallel) {
List<TableRef> tableRefList = Lists.newArrayList();
- for (int i = 0; i < tabletsList.size(); i +=
MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT) {
- int end = i + MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT <
tabletsList.size()
- ? i + MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT :
tabletsList.size();
- List<Long> tablets = new ArrayList<>(tabletsList.subList(i,
end));
+ for (List<Long> tablets : tabletsList) {
// Since export does not support the alias, here we pass the
null value.
// we can not use this.tableRef.getAlias(),
// because the constructor of `Tableref` will convert
this.tableRef.getAlias()
@@ -494,11 +498,13 @@ public class ExportJob implements Writable {
return tableRefListPerParallel;
}
- private List<List<Long>> splitTablets() throws UserException {
+ private List<List<List<Long>>> splitTablets() throws UserException {
// get tablets
Database db =
Env.getCurrentEnv().getInternalCatalog().getDbOrAnalysisException(this.tableName.getDb());
OlapTable table =
db.getOlapTableOrAnalysisException(this.tableName.getTbl());
- List<Long> tabletIdList = Lists.newArrayList();
+
+ Integer tabletsAllNum = 0;
+ List<List<Long>> tabletIdList = Lists.newArrayList();
table.readLock();
try {
final Collection<Partition> partitions = new
ArrayList<Partition>();
@@ -516,26 +522,56 @@ public class ExportJob implements Writable {
// get tablets
for (Partition partition : partitions) {
- partitionToVersion.put(partition.getName(),
partition.getVisibleVersion());
+ // Partition data consistency does not need to verify the partition
version.
+ if (!isPartitionConsistency()) {
+ partitionToVersion.put(partition.getName(),
partition.getVisibleVersion());
+ }
for (MaterializedIndex index :
partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
- tabletIdList.addAll(index.getTabletIdsInOrder());
+ List<Long> tablets = index.getTabletIdsInOrder();
+ tabletsAllNum += tablets.size();
+ tabletIdList.add(tablets);
}
}
} finally {
table.readUnlock();
}
+ if (isPartitionConsistency()) {
+ // Assign the tablets of whole partitions to each parallel task.
+ int totalPartitions = tabletIdList.size();
+ int numPerParallel = totalPartitions / this.parallelism;
+ int numPerQueryRemainder = totalPartitions - numPerParallel *
this.parallelism;
+ int realParallelism = this.parallelism;
+ if (totalPartitions < this.parallelism) {
+ realParallelism = totalPartitions;
+ LOG.warn("Export Job [{}]: The number of partitions ({}) is
smaller than parallelism ({}), "
+ + "set parallelism to partition num.", id,
totalPartitions, this.parallelism);
+ }
+ int start = 0;
+ List<List<List<Long>>> tabletsListPerParallel = new ArrayList<>();
+ for (int i = 0; i < realParallelism; ++i) {
+ int partitionNum = numPerParallel;
+ if (numPerQueryRemainder > 0) {
+ partitionNum += 1;
+ --numPerQueryRemainder;
+ }
+ List<List<Long>> tablets = new
ArrayList<>(tabletIdList.subList(start, start + partitionNum));
+ start += partitionNum;
+ tabletsListPerParallel.add(tablets);
+ }
+ return tabletsListPerParallel;
+ }
+
/**
* Assign tablets to per parallel, for example:
* If the number of all tablets is 10, and the real parallelism is 4,
* then, the number of tablets of per parallel should be: 3 3 2 2.
*/
- Integer tabletsAllNum = tabletIdList.size();
tabletsNum = tabletsAllNum;
Integer tabletsNumPerParallel = tabletsAllNum / this.parallelism;
Integer tabletsNumPerQueryRemainder = tabletsAllNum -
tabletsNumPerParallel * this.parallelism;
- List<List<Long>> tabletsListPerParallel = Lists.newArrayList();
+ List<List<List<Long>>> tabletsListPerParallel = Lists.newArrayList();
Integer realParallelism = this.parallelism;
if (tabletsAllNum < this.parallelism) {
realParallelism = tabletsAllNum;
@@ -543,15 +579,22 @@ public class ExportJob implements Writable {
+ "set parallelism to tablets num.", id,
tabletsAllNum, this.parallelism);
}
Integer start = 0;
- for (int i = 0; i < realParallelism; ++i) {
+ List<Long> flatTabletIdList =
tabletIdList.stream().flatMap(List::stream).collect(Collectors.toList());
+ for (int j = 0; j < realParallelism; ++j) {
Integer tabletsNum = tabletsNumPerParallel;
if (tabletsNumPerQueryRemainder > 0) {
tabletsNum = tabletsNum + 1;
--tabletsNumPerQueryRemainder;
}
- ArrayList<Long> tablets = new
ArrayList<>(tabletIdList.subList(start, start + tabletsNum));
- start += tabletsNum;
+ List<Long> tabletsList = new
ArrayList<>(flatTabletIdList.subList(start, start + tabletsNum));
+ List<List<Long>> tablets = new ArrayList<>();
+ for (int i = 0; i < tabletsList.size(); i +=
MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT) {
+ int end = i + MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT <
tabletsList.size()
+ ? i + MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT :
tabletsList.size();
+ tablets.add(new ArrayList<>(tabletsList.subList(i, end)));
+ }
+ start += tabletsNum;
tabletsListPerParallel.add(tablets);
}
return tabletsListPerParallel;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
index efbfd33966f..f72c0b44a63 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
@@ -355,6 +355,7 @@ public class ExportMgr {
infoMap.put("tablet_num", job.getTabletsNum());
infoMap.put("max_file_size", job.getMaxFileSize());
infoMap.put("delete_existing_files", job.getDeleteExistingFiles());
+ infoMap.put("data_consistency", job.getDataConsistency());
jobInfo.add(new Gson().toJson(infoMap));
// path
jobInfo.add(job.getExportPath());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java
index 48f9b2ca1a0..f4ee84298b2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java
@@ -89,8 +89,8 @@ public class ExportTaskExecutor implements
TransientTaskExecutor {
if (isCanceled.get()) {
throw new JobException("Export executor has been canceled,
task id: {}", taskId);
}
- // check the version of tablets
- if (exportJob.getExportTable().getType() == TableType.OLAP) {
+ // check the version of tablets; skip if the consistency is at the
partition level.
+ if (exportJob.getExportTable().getType() == TableType.OLAP &&
!exportJob.isPartitionConsistency()) {
try {
Database db =
Env.getCurrentEnv().getInternalCatalog().getDbOrAnalysisException(
exportJob.getTableName().getDb());
@@ -136,7 +136,6 @@ public class ExportTaskExecutor implements
TransientTaskExecutor {
}
try (AutoCloseConnectContext r = buildConnectContext()) {
-
StatementBase statementBase = selectStmtLists.get(idx);
OriginStatement originStatement = new OriginStatement(
StringUtils.isEmpty(statementBase.getOrigStmt().originStmt)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java
index 713ff6b2760..e5484336491 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java
@@ -73,6 +73,7 @@ import java.util.stream.Collectors;
public class ExportCommand extends Command implements ForwardWithSync {
public static final String PARALLELISM = "parallelism";
public static final String LABEL = "label";
+ public static final String DATA_CONSISTENCY = "data_consistency";
private static final String DEFAULT_COLUMN_SEPARATOR = "\t";
private static final String DEFAULT_LINE_DELIMITER = "\n";
private static final String DEFAULT_PARALLELISM = "1";
@@ -81,6 +82,7 @@ public class ExportCommand extends Command implements
ForwardWithSync {
private static final ImmutableSet<String> PROPERTIES_SET = new
ImmutableSet.Builder<String>()
.add(LABEL)
.add(PARALLELISM)
+ .add(DATA_CONSISTENCY)
.add(LoadStmt.KEY_IN_PARAM_COLUMNS)
.add(OutFileClause.PROP_MAX_FILE_SIZE)
.add(OutFileClause.PROP_DELETE_EXISTING_FILES)
@@ -310,6 +312,17 @@ public class ExportCommand extends Command implements
ForwardWithSync {
exportJob.setQualifiedUser(ctx.getQualifiedUser());
exportJob.setUserIdentity(ctx.getCurrentUserIdentity());
+ // set data consistency
+ String dataConsistencyStr = fileProperties.get(DATA_CONSISTENCY);
+ if (dataConsistencyStr != null) {
+ if
(!dataConsistencyStr.equalsIgnoreCase(ExportJob.CONSISTENT_PARTITION)) {
+ throw new AnalysisException("The value of data_consistency is
invalid, only partition is allowed!");
+ }
+ exportJob.setDataConsistency(ExportJob.CONSISTENT_PARTITION);
+ } else {
+ exportJob.setDataConsistency(ExportJob.CONSISTENT_ALL);
+ }
+
// Must copy session variable, because session variable may be changed
during export job running.
SessionVariable clonedSessionVariable =
VariableMgr.cloneSessionVariable(Optional.ofNullable(
ConnectContext.get().getSessionVariable()).orElse(VariableMgr.getDefaultSessionVariable()));
diff --git a/regression-test/data/export_p0/test_export_data_consistency.out
b/regression-test/data/export_p0/test_export_data_consistency.out
new file mode 100644
index 00000000000..97135d2bb98
--- /dev/null
+++ b/regression-test/data/export_p0/test_export_data_consistency.out
@@ -0,0 +1,305 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !select_export --
+1 ftw-1 19
+2 ftw-2 20
+3 ftw-3 21
+4 ftw-4 22
+5 ftw-5 23
+6 ftw-6 24
+7 ftw-7 25
+8 ftw-8 26
+9 ftw-9 27
+10 ftw-10 28
+11 ftw-11 29
+12 ftw-12 30
+13 ftw-13 31
+14 ftw-14 32
+15 ftw-15 33
+16 ftw-16 34
+17 ftw-17 35
+18 ftw-18 36
+19 ftw-19 37
+20 ftw-20 38
+21 ftw-21 39
+22 ftw-22 40
+23 ftw-23 41
+24 ftw-24 42
+25 ftw-25 43
+26 ftw-26 44
+27 ftw-27 45
+28 ftw-28 46
+29 ftw-29 47
+30 ftw-30 48
+31 ftw-31 49
+32 ftw-32 50
+33 ftw-33 51
+34 ftw-34 52
+35 ftw-35 53
+36 ftw-36 54
+37 ftw-37 55
+38 ftw-38 56
+39 ftw-39 57
+40 ftw-40 58
+41 ftw-41 59
+42 ftw-42 60
+43 ftw-43 61
+44 ftw-44 62
+45 ftw-45 63
+46 ftw-46 64
+47 ftw-47 65
+48 ftw-48 66
+49 ftw-49 67
+50 ftw-50 68
+51 ftw-51 69
+52 ftw-52 70
+53 ftw-53 71
+54 ftw-54 72
+55 ftw-55 73
+56 ftw-56 74
+57 ftw-57 75
+58 ftw-58 76
+59 ftw-59 77
+60 ftw-60 78
+61 ftw-61 79
+62 ftw-62 80
+63 ftw-63 81
+64 ftw-64 82
+65 ftw-65 83
+66 ftw-66 84
+67 ftw-67 85
+68 ftw-68 86
+69 ftw-69 87
+70 ftw-70 88
+71 ftw-71 89
+72 ftw-72 90
+73 ftw-73 91
+74 ftw-74 92
+75 ftw-75 93
+76 ftw-76 94
+77 ftw-77 95
+78 ftw-78 96
+79 ftw-79 97
+80 ftw-80 98
+81 ftw-81 99
+82 ftw-82 100
+83 ftw-83 101
+84 ftw-84 102
+85 ftw-85 103
+86 ftw-86 104
+87 ftw-87 105
+88 ftw-88 106
+89 ftw-89 107
+90 ftw-90 108
+91 ftw-91 109
+92 ftw-92 110
+93 ftw-93 111
+94 ftw-94 112
+95 ftw-95 113
+96 ftw-96 114
+97 ftw-97 115
+98 ftw-98 116
+99 ftw-99 117
+100 ftw-100 118
+101 ftw-101 119
+102 ftw-102 120
+103 ftw-103 121
+104 ftw-104 122
+105 ftw-105 123
+106 ftw-106 124
+107 ftw-107 125
+108 ftw-108 126
+109 ftw-109 127
+110 ftw-110 128
+111 ftw-111 129
+112 ftw-112 130
+113 ftw-113 131
+114 ftw-114 132
+115 ftw-115 133
+116 ftw-116 134
+117 ftw-117 135
+118 ftw-118 136
+119 ftw-119 137
+120 ftw-120 138
+121 ftw-121 139
+122 ftw-122 140
+123 ftw-123 141
+124 ftw-124 142
+125 ftw-125 143
+126 ftw-126 144
+127 ftw-127 145
+128 ftw-128 146
+129 ftw-129 147
+130 ftw-130 148
+131 ftw-131 149
+132 ftw-132 150
+133 ftw-133 151
+134 ftw-134 152
+135 ftw-135 153
+136 ftw-136 154
+137 ftw-137 155
+138 ftw-138 156
+139 ftw-139 157
+140 ftw-140 158
+141 ftw-141 159
+142 ftw-142 160
+143 ftw-143 161
+144 ftw-144 162
+145 ftw-145 163
+146 ftw-146 164
+147 ftw-147 165
+148 ftw-148 166
+149 ftw-149 167
+150 \N \N
+
+-- !select_load1 --
+1 ftw-1 19
+2 ftw-2 20
+3 ftw-3 21
+4 ftw-4 22
+5 ftw-5 23
+6 ftw-6 24
+7 ftw-7 25
+8 ftw-8 26
+9 ftw-9 27
+10 ftw-10 28
+11 ftw-11 29
+12 ftw-12 30
+13 ftw-13 31
+14 ftw-14 32
+15 ftw-15 33
+16 ftw-16 34
+17 ftw-17 35
+18 ftw-18 36
+19 ftw-19 37
+20 ftw-20 38
+21 ftw-21 39
+22 ftw-22 40
+23 ftw-23 41
+24 ftw-24 42
+25 ftw-25 43
+26 ftw-26 44
+27 ftw-27 45
+28 ftw-28 46
+29 ftw-29 47
+30 ftw-30 48
+31 ftw-31 49
+32 ftw-32 50
+33 ftw-33 51
+34 ftw-34 52
+35 ftw-35 53
+36 ftw-36 54
+37 ftw-37 55
+38 ftw-38 56
+39 ftw-39 57
+40 ftw-40 58
+41 ftw-41 59
+42 ftw-42 60
+43 ftw-43 61
+44 ftw-44 62
+45 ftw-45 63
+46 ftw-46 64
+47 ftw-47 65
+48 ftw-48 66
+49 ftw-49 67
+50 ftw-50 68
+51 ftw-51 69
+52 ftw-52 70
+53 ftw-53 71
+54 ftw-54 72
+55 ftw-55 73
+56 ftw-56 74
+57 ftw-57 75
+58 ftw-58 76
+59 ftw-59 77
+60 ftw-60 78
+61 ftw-61 79
+62 ftw-62 80
+63 ftw-63 81
+64 ftw-64 82
+65 ftw-65 83
+66 ftw-66 84
+67 ftw-67 85
+68 ftw-68 86
+69 ftw-69 87
+70 ftw-70 88
+71 ftw-71 89
+72 ftw-72 90
+73 ftw-73 91
+74 ftw-74 92
+75 ftw-75 93
+76 ftw-76 94
+77 ftw-77 95
+78 ftw-78 96
+79 ftw-79 97
+80 ftw-80 98
+81 ftw-81 99
+82 ftw-82 100
+83 ftw-83 101
+84 ftw-84 102
+85 ftw-85 103
+86 ftw-86 104
+87 ftw-87 105
+88 ftw-88 106
+89 ftw-89 107
+90 ftw-90 108
+91 ftw-91 109
+92 ftw-92 110
+93 ftw-93 111
+94 ftw-94 112
+95 ftw-95 113
+96 ftw-96 114
+97 ftw-97 115
+98 ftw-98 116
+99 ftw-99 117
+100 ftw-100 118
+101 ftw-101 119
+102 ftw-102 120
+103 ftw-103 121
+104 ftw-104 122
+105 ftw-105 123
+106 ftw-106 124
+107 ftw-107 125
+108 ftw-108 126
+109 ftw-109 127
+110 ftw-110 128
+111 ftw-111 129
+112 ftw-112 130
+113 ftw-113 131
+114 ftw-114 132
+115 ftw-115 133
+116 ftw-116 134
+117 ftw-117 135
+118 ftw-118 136
+119 ftw-119 137
+120 ftw-120 138
+121 ftw-121 139
+122 ftw-122 140
+123 ftw-123 141
+124 ftw-124 142
+125 ftw-125 143
+126 ftw-126 144
+127 ftw-127 145
+128 ftw-128 146
+129 ftw-129 147
+130 ftw-130 148
+131 ftw-131 149
+132 ftw-132 150
+133 ftw-133 151
+134 ftw-134 152
+135 ftw-135 153
+136 ftw-136 154
+137 ftw-137 155
+138 ftw-138 156
+139 ftw-139 157
+140 ftw-140 158
+141 ftw-141 159
+142 ftw-142 160
+143 ftw-143 161
+144 ftw-144 162
+145 ftw-145 163
+146 ftw-146 164
+147 ftw-147 165
+148 ftw-148 166
+149 ftw-149 167
+150 \N \N
+
diff --git
a/regression-test/suites/export_p0/test_export_data_consistency.groovy
b/regression-test/suites/export_p0/test_export_data_consistency.groovy
new file mode 100644
index 00000000000..87d2eb4a7b7
--- /dev/null
+++ b/regression-test/suites/export_p0/test_export_data_consistency.groovy
@@ -0,0 +1,210 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+import java.nio.charset.StandardCharsets
+import java.nio.file.Files
+import java.nio.file.Paths
+
+suite("test_export_data_consistency", "p0") {
+ // open nereids
+ sql """ set enable_nereids_planner=true """
+ sql """ set enable_fallback_to_original_planner=false """
+
+ def db = "regression_test_export_p0"
+
+ // check whether the FE config 'enable_outfile_to_local' is true
+ StringBuilder strBuilder = new StringBuilder()
+ strBuilder.append("curl --location-trusted -u " + context.config.jdbcUser
+ ":" + context.config.jdbcPassword)
+ strBuilder.append(" http://" + context.config.feHttpAddress +
"/rest/v1/config/fe")
+
+ String command = strBuilder.toString()
+ def process = command.toString().execute()
+ def code = process.waitFor()
+ def err = IOGroovyMethods.getText(new BufferedReader(new
InputStreamReader(process.getErrorStream())));
+ def out = process.getText()
+ logger.info("Request FE Config: code=" + code + ", out=" + out + ", err="
+ err)
+ assertEquals(code, 0)
+ def response = parseJson(out.trim())
+ assertEquals(response.code, 0)
+ assertEquals(response.msg, "success")
+ def configJson = response.data.rows
+ boolean enableOutfileToLocal = false
+ for (Object conf: configJson) {
+ assert conf instanceof Map
+ if (((Map<String, String>) conf).get("Name").toLowerCase() ==
"enable_outfile_to_local") {
+ enableOutfileToLocal = ((Map<String, String>)
conf).get("Value").toLowerCase() == "true"
+ }
+ }
+ if (!enableOutfileToLocal) {
+ logger.warn("Please set enable_outfile_to_local to true to run
test_outfile")
+ return
+ }
+
+ def table_export_name = "test_export_data_consistency"
+ def table_load_name = "test_load_data_consistency"
+ def outfile_path_prefix = """/tmp/test_export_data_consistency"""
+
+ // create table and insert
+ sql """ DROP TABLE IF EXISTS ${table_export_name} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${table_export_name} (
+ `id` int(11) NULL,
+ `name` string NULL,
+ `age` int(11) NULL
+ )
+ PARTITION BY RANGE(id)
+ (
+ PARTITION less_than_20 VALUES LESS THAN ("20"),
+ PARTITION between_20_70 VALUES [("20"),("70")),
+ PARTITION more_than_70 VALUES LESS THAN ("151")
+ )
+ DISTRIBUTED BY HASH(id) BUCKETS 3
+ PROPERTIES("replication_num" = "1");
+ """
+ StringBuilder sb = new StringBuilder()
+ int i = 1
+ for (; i < 150; i ++) {
+ sb.append("""
+ (${i}, 'ftw-${i}', ${i + 18}),
+ """)
+ }
+ sb.append("""
+ (${i}, NULL, NULL)
+ """)
+ sql """ INSERT INTO ${table_export_name} VALUES
+ ${sb.toString()}
+ """
+ def insert_res = sql "show last insert;"
+ logger.info("insert result: " + insert_res.toString())
+ qt_select_export """ SELECT * FROM ${table_export_name} t ORDER BY id; """
+
+
+ def check_path_exists = { dir_path ->
+ File path = new File(dir_path)
+ if (!path.exists()) {
+ assert path.mkdirs()
+ } else {
+ throw new IllegalStateException("""${dir_path} already exists! """)
+ }
+ }
+
+ def check_file_amounts = { dir_path, amount ->
+ File path = new File(dir_path)
+ File[] files = path.listFiles()
+ assert files.length == amount
+ }
+
+ def delete_files = { dir_path ->
+ File path = new File(dir_path)
+ if (path.exists()) {
+ for (File f: path.listFiles()) {
+ f.delete();
+ }
+ path.delete();
+ }
+ }
+
+ def waiting_export = { the_db, export_label ->
+ while (true) {
+ def res = sql """ show export from ${the_db} where label =
"${export_label}" """
+ logger.info("export state: " + res[0][2])
+ if (res[0][2] == "FINISHED") {
+ break;
+ } else if (res[0][2] == "CANCELLED") {
+ throw new IllegalStateException("""export failed:
${res[0][10]}""")
+ } else {
+ sleep(5000)
+ }
+ }
+ }
+
+ // 1. basic test
+ def uuid = UUID.randomUUID().toString()
+ def outFilePath = """${outfile_path_prefix}_${uuid}"""
+ def label = "label_${uuid}"
+ try {
+ // check export path
+ check_path_exists.call("${outFilePath}")
+
+ // exec export
+ sql """
+ EXPORT TABLE ${table_export_name} TO "file://${outFilePath}/"
+ PROPERTIES(
+ "label" = "${label}",
+ "format" = "csv",
+ "column_separator" = ",",
+ "data_consistency" = "partition"
+ );
+ """
+ // do insert in parallel
+ sql """INSERT INTO ${table_export_name} VALUES
+ (10, 'test', 11),
+ (20, 'test', 21),
+ (40, 'test', 51),
+ (80, 'test', 51)
+ """
+
+ // wait export
+ waiting_export.call(db, label)
+
+ // check file amounts
+ check_file_amounts.call("${outFilePath}", 3)
+
+ // check data correctness
+ sql """ DROP TABLE IF EXISTS ${table_load_name} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${table_load_name} (
+ `id` int(11) NULL,
+ `name` string NULL,
+ `age` int(11) NULL
+ )
+ DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1");
+ """
+
+ File[] files = new File("${outFilePath}").listFiles()
+ for (exportLoadFile in files) {
+ String file_path = exportLoadFile.getAbsolutePath()
+ streamLoad {
+ table "${table_load_name}"
+
+ set 'column_separator', ','
+ set 'columns', 'id, name, age'
+ set 'strict_mode', 'true'
+
+ file "${file_path}"
+ time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(0, json.NumberFilteredRows)
+ }
+ }
+ }
+
+ qt_select_load1 """ SELECT * FROM ${table_load_name} t ORDER BY id; """
+ } finally {
+ try_sql("DROP TABLE IF EXISTS ${table_load_name}")
+ delete_files.call("${outFilePath}")
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]