This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new bf2c9a545a8 [improve](export) Support partition data consistency 
(#31290)
bf2c9a545a8 is described below

commit bf2c9a545a81f146834b84972e4e4e6a95a71870
Author: walter <[email protected]>
AuthorDate: Thu Feb 29 22:08:29 2024 +0800

    [improve](export) Support partition data consistency (#31290)
---
 .../java/org/apache/doris/analysis/ExportStmt.java |  15 +
 .../main/java/org/apache/doris/load/ExportJob.java |  87 ++++--
 .../main/java/org/apache/doris/load/ExportMgr.java |   1 +
 .../org/apache/doris/load/ExportTaskExecutor.java  |   5 +-
 .../trees/plans/commands/ExportCommand.java        |  13 +
 .../export_p0/test_export_data_consistency.out     | 305 +++++++++++++++++++++
 .../export_p0/test_export_data_consistency.groovy  | 210 ++++++++++++++
 7 files changed, 611 insertions(+), 25 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
index 123bfa727fb..1a7d9dc93b5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
@@ -63,6 +63,7 @@ import java.util.stream.Collectors;
 public class ExportStmt extends StatementBase {
     public static final String PARALLELISM = "parallelism";
     public static final String LABEL = "label";
+    public static final String DATA_CONSISTENCY = "data_consistency";
 
     private static final String DEFAULT_COLUMN_SEPARATOR = "\t";
     private static final String DEFAULT_LINE_DELIMITER = "\n";
@@ -72,6 +73,7 @@ public class ExportStmt extends StatementBase {
     private static final ImmutableSet<String> PROPERTIES_SET = new 
ImmutableSet.Builder<String>()
             .add(LABEL)
             .add(PARALLELISM)
+            .add(DATA_CONSISTENCY)
             .add(LoadStmt.KEY_IN_PARAM_COLUMNS)
             .add(OutFileClause.PROP_MAX_FILE_SIZE)
             .add(OutFileClause.PROP_DELETE_EXISTING_FILES)
@@ -104,6 +106,7 @@ public class ExportStmt extends StatementBase {
     private String maxFileSize;
     private String deleteExistingFiles;
     private String withBom;
+    private String dataConsistency;
     private SessionVariable sessionVariables;
 
     private String qualifiedUser;
@@ -230,6 +233,7 @@ public class ExportStmt extends StatementBase {
         exportJob.setMaxFileSize(this.maxFileSize);
         exportJob.setDeleteExistingFiles(this.deleteExistingFiles);
         exportJob.setWithBom(this.withBom);
+        exportJob.setDataConsistency(this.dataConsistency);
 
         if (columns != null) {
             Splitter split = Splitter.on(',').trimResults().omitEmptyStrings();
@@ -359,6 +363,17 @@ public class ExportStmt extends StatementBase {
 
         // with bom
         this.withBom = properties.getOrDefault(OutFileClause.PROP_WITH_BOM, 
"false");
+
+        // data consistency
+        String dataConsistencyStr = properties.get(DATA_CONSISTENCY);
+        if (dataConsistencyStr != null) {
+            if 
(!dataConsistencyStr.equalsIgnoreCase(ExportJob.CONSISTENT_PARTITION)) {
+                throw new UserException("The value of data_consistency is 
invalid, only `partition` is allowed");
+            }
+            this.dataConsistency = ExportJob.CONSISTENT_PARTITION;
+        } else {
+            this.dataConsistency = ExportJob.CONSISTENT_ALL;
+        }
     }
 
     private void checkColumns() throws DdlException {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
index 9fb827f6d6a..fe28151a546 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
@@ -99,6 +99,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
 
 @Data
 public class ExportJob implements Writable {
@@ -108,6 +109,9 @@ public class ExportJob implements Writable {
 
     private static final int MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT = 
Config.maximum_tablets_of_outfile_in_export;
 
+    public static final String CONSISTENT_ALL = "all";
+    public static final String CONSISTENT_PARTITION = "partition";
+
     @SerializedName("id")
     private long id;
     @SerializedName("label")
@@ -168,6 +172,8 @@ public class ExportJob implements Writable {
     private Integer tabletsNum;
     @SerializedName("withBom")
     private String withBom;
+    @SerializedName("dataConsistency")
+    private String dataConsistency;
 
     private TableRef tableRef;
 
@@ -222,6 +228,7 @@ public class ExportJob implements Writable {
         this.lineDelimiter = "\n";
         this.columns = "";
         this.withBom = "false";
+        this.dataConsistency = "all";
     }
 
     public ExportJob(long jobId) {
@@ -229,6 +236,10 @@ public class ExportJob implements Writable {
         this.id = jobId;
     }
 
+    public boolean isPartitionConsistency() {
+        return dataConsistency != null && 
dataConsistency.equals(CONSISTENT_PARTITION);
+    }
+
     public void generateOutfileStatement() throws UserException {
         exportTable.readLock();
         try {
@@ -302,16 +313,12 @@ public class ExportJob implements Writable {
         }
 
         // get all tablets
-        List<List<Long>> tabletsListPerParallel = splitTablets();
+        List<List<List<Long>>> tabletsListPerParallel = splitTablets();
 
         // Each Outfile clause responsible for 
MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT tablets
-        for (List<Long> tabletsList : tabletsListPerParallel) {
+        for (List<List<Long>> tabletsList : tabletsListPerParallel) {
             List<StatementBase> logicalPlanAdapters = Lists.newArrayList();
-            for (int i = 0; i < tabletsList.size(); i += 
MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT) {
-                int end = i + MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT < 
tabletsList.size()
-                        ? i + MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT : 
tabletsList.size();
-                List<Long> tabletIds = new ArrayList<>(tabletsList.subList(i, 
end));
-
+            for (List<Long> tabletIds : tabletsList) {
                 // generate LogicalPlan
                 LogicalPlan plan = generateOneLogicalPlan(qualifiedTableName, 
tabletIds,
                         this.partitionNames, selectLists);
@@ -471,15 +478,12 @@ public class ExportJob implements Writable {
     }
 
     private List<List<TableRef>> getTableRefListPerParallel() throws 
UserException {
-        List<List<Long>> tabletsListPerParallel = splitTablets();
+        List<List<List<Long>>> tabletsListPerParallel = splitTablets();
 
         List<List<TableRef>> tableRefListPerParallel = Lists.newArrayList();
-        for (List<Long> tabletsList : tabletsListPerParallel) {
+        for (List<List<Long>> tabletsList : tabletsListPerParallel) {
             List<TableRef> tableRefList = Lists.newArrayList();
-            for (int i = 0; i < tabletsList.size(); i += 
MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT) {
-                int end = i + MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT < 
tabletsList.size()
-                        ? i + MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT : 
tabletsList.size();
-                List<Long> tablets = new ArrayList<>(tabletsList.subList(i, 
end));
+            for (List<Long> tablets : tabletsList) {
                 // Since export does not support the alias, here we pass the 
null value.
                 // we can not use this.tableRef.getAlias(),
                 // because the constructor of `Tableref` will convert 
this.tableRef.getAlias()
@@ -494,11 +498,13 @@ public class ExportJob implements Writable {
         return tableRefListPerParallel;
     }
 
-    private List<List<Long>> splitTablets() throws UserException {
+    private List<List<List<Long>>> splitTablets() throws UserException {
         // get tablets
         Database db = 
Env.getCurrentEnv().getInternalCatalog().getDbOrAnalysisException(this.tableName.getDb());
         OlapTable table = 
db.getOlapTableOrAnalysisException(this.tableName.getTbl());
-        List<Long> tabletIdList = Lists.newArrayList();
+
+        Integer tabletsAllNum = 0;
+        List<List<Long>> tabletIdList = Lists.newArrayList();
         table.readLock();
         try {
             final Collection<Partition> partitions = new 
ArrayList<Partition>();
@@ -516,26 +522,56 @@ public class ExportJob implements Writable {
 
             // get tablets
             for (Partition partition : partitions) {
-                partitionToVersion.put(partition.getName(), 
partition.getVisibleVersion());
+                // Partition-level data consistency does not need to verify the 
partition version.
+                if (!isPartitionConsistency()) {
+                    partitionToVersion.put(partition.getName(), 
partition.getVisibleVersion());
+                }
                 for (MaterializedIndex index : 
partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
-                    tabletIdList.addAll(index.getTabletIdsInOrder());
+                    List<Long> tablets = index.getTabletIdsInOrder();
+                    tabletsAllNum += tablets.size();
+                    tabletIdList.add(tablets);
                 }
             }
         } finally {
             table.readUnlock();
         }
 
+        if (isPartitionConsistency()) {
+            // Assign tablets of a partition to per parallel.
+            int totalPartitions = tabletIdList.size();
+            int numPerParallel = totalPartitions / this.parallelism;
+            int numPerQueryRemainder = totalPartitions - numPerParallel * 
this.parallelism;
+            int realParallelism = this.parallelism;
+            if (totalPartitions < this.parallelism) {
+                realParallelism = totalPartitions;
+                LOG.warn("Export Job [{}]: The number of partitions ({}) is 
smaller than parallelism ({}), "
+                            + "set parallelism to partition num.", id, 
totalPartitions, this.parallelism);
+            }
+            int start = 0;
+            List<List<List<Long>>> tabletsListPerParallel = new ArrayList<>();
+            for (int i = 0; i < realParallelism; ++i) {
+                int partitionNum = numPerParallel;
+                if (numPerQueryRemainder > 0) {
+                    partitionNum += 1;
+                    --numPerQueryRemainder;
+                }
+                List<List<Long>> tablets = new 
ArrayList<>(tabletIdList.subList(start, start + partitionNum));
+                start += partitionNum;
+                tabletsListPerParallel.add(tablets);
+            }
+            return tabletsListPerParallel;
+        }
+
         /**
          * Assign tablets to per parallel, for example:
          * If the number of all tablets is 10, and the real parallelism is 4,
          * then, the number of tablets of per parallel should be: 3 3 2 2.
          */
-        Integer tabletsAllNum = tabletIdList.size();
         tabletsNum = tabletsAllNum;
         Integer tabletsNumPerParallel = tabletsAllNum / this.parallelism;
         Integer tabletsNumPerQueryRemainder = tabletsAllNum - 
tabletsNumPerParallel * this.parallelism;
 
-        List<List<Long>> tabletsListPerParallel = Lists.newArrayList();
+        List<List<List<Long>>> tabletsListPerParallel = Lists.newArrayList();
         Integer realParallelism = this.parallelism;
         if (tabletsAllNum < this.parallelism) {
             realParallelism = tabletsAllNum;
@@ -543,15 +579,22 @@ public class ExportJob implements Writable {
                         + "set parallelism to tablets num.", id, 
tabletsAllNum, this.parallelism);
         }
         Integer start = 0;
-        for (int i = 0; i < realParallelism; ++i) {
+        List<Long> flatTabletIdList = 
tabletIdList.stream().flatMap(List::stream).collect(Collectors.toList());
+        for (int j = 0; j < realParallelism; ++j) {
             Integer tabletsNum = tabletsNumPerParallel;
             if (tabletsNumPerQueryRemainder > 0) {
                 tabletsNum = tabletsNum + 1;
                 --tabletsNumPerQueryRemainder;
             }
-            ArrayList<Long> tablets = new 
ArrayList<>(tabletIdList.subList(start, start + tabletsNum));
-            start += tabletsNum;
+            List<Long> tabletsList = new 
ArrayList<>(flatTabletIdList.subList(start, start + tabletsNum));
+            List<List<Long>> tablets = new ArrayList<>();
+            for (int i = 0; i < tabletsList.size(); i += 
MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT) {
+                int end = i + MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT < 
tabletsList.size()
+                        ? i + MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT : 
tabletsList.size();
+                tablets.add(new ArrayList<>(tabletsList.subList(i, end)));
+            }
 
+            start += tabletsNum;
             tabletsListPerParallel.add(tablets);
         }
         return tabletsListPerParallel;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
index efbfd33966f..f72c0b44a63 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
@@ -355,6 +355,7 @@ public class ExportMgr {
         infoMap.put("tablet_num", job.getTabletsNum());
         infoMap.put("max_file_size", job.getMaxFileSize());
         infoMap.put("delete_existing_files", job.getDeleteExistingFiles());
+        infoMap.put("data_consistency", job.getDataConsistency());
         jobInfo.add(new Gson().toJson(infoMap));
         // path
         jobInfo.add(job.getExportPath());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java
index 48f9b2ca1a0..f4ee84298b2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java
@@ -89,8 +89,8 @@ public class ExportTaskExecutor implements 
TransientTaskExecutor {
             if (isCanceled.get()) {
                 throw new JobException("Export executor has been canceled, 
task id: {}", taskId);
             }
-            // check the version of tablets
-            if (exportJob.getExportTable().getType() == TableType.OLAP) {
+            // check the version of tablets; skip if the consistency is at 
partition level.
+            if (exportJob.getExportTable().getType() == TableType.OLAP && 
!exportJob.isPartitionConsistency()) {
                 try {
                     Database db = 
Env.getCurrentEnv().getInternalCatalog().getDbOrAnalysisException(
                             exportJob.getTableName().getDb());
@@ -136,7 +136,6 @@ public class ExportTaskExecutor implements 
TransientTaskExecutor {
             }
 
             try (AutoCloseConnectContext r = buildConnectContext()) {
-
                 StatementBase statementBase = selectStmtLists.get(idx);
                 OriginStatement originStatement = new OriginStatement(
                         
StringUtils.isEmpty(statementBase.getOrigStmt().originStmt)
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java
index 713ff6b2760..e5484336491 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java
@@ -73,6 +73,7 @@ import java.util.stream.Collectors;
 public class ExportCommand extends Command implements ForwardWithSync {
     public static final String PARALLELISM = "parallelism";
     public static final String LABEL = "label";
+    public static final String DATA_CONSISTENCY = "data_consistency";
     private static final String DEFAULT_COLUMN_SEPARATOR = "\t";
     private static final String DEFAULT_LINE_DELIMITER = "\n";
     private static final String DEFAULT_PARALLELISM = "1";
@@ -81,6 +82,7 @@ public class ExportCommand extends Command implements 
ForwardWithSync {
     private static final ImmutableSet<String> PROPERTIES_SET = new 
ImmutableSet.Builder<String>()
             .add(LABEL)
             .add(PARALLELISM)
+            .add(DATA_CONSISTENCY)
             .add(LoadStmt.KEY_IN_PARAM_COLUMNS)
             .add(OutFileClause.PROP_MAX_FILE_SIZE)
             .add(OutFileClause.PROP_DELETE_EXISTING_FILES)
@@ -310,6 +312,17 @@ public class ExportCommand extends Command implements 
ForwardWithSync {
         exportJob.setQualifiedUser(ctx.getQualifiedUser());
         exportJob.setUserIdentity(ctx.getCurrentUserIdentity());
 
+        // set data consistency
+        String dataConsistencyStr = fileProperties.get(DATA_CONSISTENCY);
+        if (dataConsistencyStr != null) {
+            if 
(!dataConsistencyStr.equalsIgnoreCase(ExportJob.CONSISTENT_PARTITION)) {
+                throw new AnalysisException("The value of data_consistency is 
invalid, only partition is allowed!");
+            }
+            exportJob.setDataConsistency(ExportJob.CONSISTENT_PARTITION);
+        } else {
+            exportJob.setDataConsistency(ExportJob.CONSISTENT_ALL);
+        }
+
         // Must copy session variable, because session variable may be changed 
during export job running.
         SessionVariable clonedSessionVariable = 
VariableMgr.cloneSessionVariable(Optional.ofNullable(
                 
ConnectContext.get().getSessionVariable()).orElse(VariableMgr.getDefaultSessionVariable()));
diff --git a/regression-test/data/export_p0/test_export_data_consistency.out 
b/regression-test/data/export_p0/test_export_data_consistency.out
new file mode 100644
index 00000000000..97135d2bb98
--- /dev/null
+++ b/regression-test/data/export_p0/test_export_data_consistency.out
@@ -0,0 +1,305 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select_export --
+1      ftw-1   19
+2      ftw-2   20
+3      ftw-3   21
+4      ftw-4   22
+5      ftw-5   23
+6      ftw-6   24
+7      ftw-7   25
+8      ftw-8   26
+9      ftw-9   27
+10     ftw-10  28
+11     ftw-11  29
+12     ftw-12  30
+13     ftw-13  31
+14     ftw-14  32
+15     ftw-15  33
+16     ftw-16  34
+17     ftw-17  35
+18     ftw-18  36
+19     ftw-19  37
+20     ftw-20  38
+21     ftw-21  39
+22     ftw-22  40
+23     ftw-23  41
+24     ftw-24  42
+25     ftw-25  43
+26     ftw-26  44
+27     ftw-27  45
+28     ftw-28  46
+29     ftw-29  47
+30     ftw-30  48
+31     ftw-31  49
+32     ftw-32  50
+33     ftw-33  51
+34     ftw-34  52
+35     ftw-35  53
+36     ftw-36  54
+37     ftw-37  55
+38     ftw-38  56
+39     ftw-39  57
+40     ftw-40  58
+41     ftw-41  59
+42     ftw-42  60
+43     ftw-43  61
+44     ftw-44  62
+45     ftw-45  63
+46     ftw-46  64
+47     ftw-47  65
+48     ftw-48  66
+49     ftw-49  67
+50     ftw-50  68
+51     ftw-51  69
+52     ftw-52  70
+53     ftw-53  71
+54     ftw-54  72
+55     ftw-55  73
+56     ftw-56  74
+57     ftw-57  75
+58     ftw-58  76
+59     ftw-59  77
+60     ftw-60  78
+61     ftw-61  79
+62     ftw-62  80
+63     ftw-63  81
+64     ftw-64  82
+65     ftw-65  83
+66     ftw-66  84
+67     ftw-67  85
+68     ftw-68  86
+69     ftw-69  87
+70     ftw-70  88
+71     ftw-71  89
+72     ftw-72  90
+73     ftw-73  91
+74     ftw-74  92
+75     ftw-75  93
+76     ftw-76  94
+77     ftw-77  95
+78     ftw-78  96
+79     ftw-79  97
+80     ftw-80  98
+81     ftw-81  99
+82     ftw-82  100
+83     ftw-83  101
+84     ftw-84  102
+85     ftw-85  103
+86     ftw-86  104
+87     ftw-87  105
+88     ftw-88  106
+89     ftw-89  107
+90     ftw-90  108
+91     ftw-91  109
+92     ftw-92  110
+93     ftw-93  111
+94     ftw-94  112
+95     ftw-95  113
+96     ftw-96  114
+97     ftw-97  115
+98     ftw-98  116
+99     ftw-99  117
+100    ftw-100 118
+101    ftw-101 119
+102    ftw-102 120
+103    ftw-103 121
+104    ftw-104 122
+105    ftw-105 123
+106    ftw-106 124
+107    ftw-107 125
+108    ftw-108 126
+109    ftw-109 127
+110    ftw-110 128
+111    ftw-111 129
+112    ftw-112 130
+113    ftw-113 131
+114    ftw-114 132
+115    ftw-115 133
+116    ftw-116 134
+117    ftw-117 135
+118    ftw-118 136
+119    ftw-119 137
+120    ftw-120 138
+121    ftw-121 139
+122    ftw-122 140
+123    ftw-123 141
+124    ftw-124 142
+125    ftw-125 143
+126    ftw-126 144
+127    ftw-127 145
+128    ftw-128 146
+129    ftw-129 147
+130    ftw-130 148
+131    ftw-131 149
+132    ftw-132 150
+133    ftw-133 151
+134    ftw-134 152
+135    ftw-135 153
+136    ftw-136 154
+137    ftw-137 155
+138    ftw-138 156
+139    ftw-139 157
+140    ftw-140 158
+141    ftw-141 159
+142    ftw-142 160
+143    ftw-143 161
+144    ftw-144 162
+145    ftw-145 163
+146    ftw-146 164
+147    ftw-147 165
+148    ftw-148 166
+149    ftw-149 167
+150    \N      \N
+
+-- !select_load1 --
+1      ftw-1   19
+2      ftw-2   20
+3      ftw-3   21
+4      ftw-4   22
+5      ftw-5   23
+6      ftw-6   24
+7      ftw-7   25
+8      ftw-8   26
+9      ftw-9   27
+10     ftw-10  28
+11     ftw-11  29
+12     ftw-12  30
+13     ftw-13  31
+14     ftw-14  32
+15     ftw-15  33
+16     ftw-16  34
+17     ftw-17  35
+18     ftw-18  36
+19     ftw-19  37
+20     ftw-20  38
+21     ftw-21  39
+22     ftw-22  40
+23     ftw-23  41
+24     ftw-24  42
+25     ftw-25  43
+26     ftw-26  44
+27     ftw-27  45
+28     ftw-28  46
+29     ftw-29  47
+30     ftw-30  48
+31     ftw-31  49
+32     ftw-32  50
+33     ftw-33  51
+34     ftw-34  52
+35     ftw-35  53
+36     ftw-36  54
+37     ftw-37  55
+38     ftw-38  56
+39     ftw-39  57
+40     ftw-40  58
+41     ftw-41  59
+42     ftw-42  60
+43     ftw-43  61
+44     ftw-44  62
+45     ftw-45  63
+46     ftw-46  64
+47     ftw-47  65
+48     ftw-48  66
+49     ftw-49  67
+50     ftw-50  68
+51     ftw-51  69
+52     ftw-52  70
+53     ftw-53  71
+54     ftw-54  72
+55     ftw-55  73
+56     ftw-56  74
+57     ftw-57  75
+58     ftw-58  76
+59     ftw-59  77
+60     ftw-60  78
+61     ftw-61  79
+62     ftw-62  80
+63     ftw-63  81
+64     ftw-64  82
+65     ftw-65  83
+66     ftw-66  84
+67     ftw-67  85
+68     ftw-68  86
+69     ftw-69  87
+70     ftw-70  88
+71     ftw-71  89
+72     ftw-72  90
+73     ftw-73  91
+74     ftw-74  92
+75     ftw-75  93
+76     ftw-76  94
+77     ftw-77  95
+78     ftw-78  96
+79     ftw-79  97
+80     ftw-80  98
+81     ftw-81  99
+82     ftw-82  100
+83     ftw-83  101
+84     ftw-84  102
+85     ftw-85  103
+86     ftw-86  104
+87     ftw-87  105
+88     ftw-88  106
+89     ftw-89  107
+90     ftw-90  108
+91     ftw-91  109
+92     ftw-92  110
+93     ftw-93  111
+94     ftw-94  112
+95     ftw-95  113
+96     ftw-96  114
+97     ftw-97  115
+98     ftw-98  116
+99     ftw-99  117
+100    ftw-100 118
+101    ftw-101 119
+102    ftw-102 120
+103    ftw-103 121
+104    ftw-104 122
+105    ftw-105 123
+106    ftw-106 124
+107    ftw-107 125
+108    ftw-108 126
+109    ftw-109 127
+110    ftw-110 128
+111    ftw-111 129
+112    ftw-112 130
+113    ftw-113 131
+114    ftw-114 132
+115    ftw-115 133
+116    ftw-116 134
+117    ftw-117 135
+118    ftw-118 136
+119    ftw-119 137
+120    ftw-120 138
+121    ftw-121 139
+122    ftw-122 140
+123    ftw-123 141
+124    ftw-124 142
+125    ftw-125 143
+126    ftw-126 144
+127    ftw-127 145
+128    ftw-128 146
+129    ftw-129 147
+130    ftw-130 148
+131    ftw-131 149
+132    ftw-132 150
+133    ftw-133 151
+134    ftw-134 152
+135    ftw-135 153
+136    ftw-136 154
+137    ftw-137 155
+138    ftw-138 156
+139    ftw-139 157
+140    ftw-140 158
+141    ftw-141 159
+142    ftw-142 160
+143    ftw-143 161
+144    ftw-144 162
+145    ftw-145 163
+146    ftw-146 164
+147    ftw-147 165
+148    ftw-148 166
+149    ftw-149 167
+150    \N      \N
+
diff --git 
a/regression-test/suites/export_p0/test_export_data_consistency.groovy 
b/regression-test/suites/export_p0/test_export_data_consistency.groovy
new file mode 100644
index 00000000000..87d2eb4a7b7
--- /dev/null
+++ b/regression-test/suites/export_p0/test_export_data_consistency.groovy
@@ -0,0 +1,210 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+import java.nio.charset.StandardCharsets
+import java.nio.file.Files
+import java.nio.file.Paths
+
+suite("test_export_data_consistency", "p0") {
+    // open nereids
+    sql """ set enable_nereids_planner=true """
+    sql """ set enable_fallback_to_original_planner=false """
+
+    def db = "regression_test_export_p0"
+
+    // check whether the FE config 'enable_outfile_to_local' is true
+    StringBuilder strBuilder = new StringBuilder()
+    strBuilder.append("curl --location-trusted -u " + context.config.jdbcUser 
+ ":" + context.config.jdbcPassword)
strBuilder.append(" http://" + context.config.feHttpAddress + 
"/rest/v1/config/fe")
+
+    String command = strBuilder.toString()
+    def process = command.toString().execute()
+    def code = process.waitFor()
+    def err = IOGroovyMethods.getText(new BufferedReader(new 
InputStreamReader(process.getErrorStream())));
+    def out = process.getText()
+    logger.info("Request FE Config: code=" + code + ", out=" + out + ", err=" 
+ err)
+    assertEquals(code, 0)
+    def response = parseJson(out.trim())
+    assertEquals(response.code, 0)
+    assertEquals(response.msg, "success")
+    def configJson = response.data.rows
+    boolean enableOutfileToLocal = false
+    for (Object conf: configJson) {
+        assert conf instanceof Map
+        if (((Map<String, String>) conf).get("Name").toLowerCase() == 
"enable_outfile_to_local") {
+            enableOutfileToLocal = ((Map<String, String>) 
conf).get("Value").toLowerCase() == "true"
+        }
+    }
+    if (!enableOutfileToLocal) {
+        logger.warn("Please set enable_outfile_to_local to true to run 
test_outfile")
+        return
+    }
+
+    def table_export_name = "test_export_data_consistency"
+    def table_load_name = "test_load_data_consistency"
+    def outfile_path_prefix = """/tmp/test_export_data_consistency"""
+
+    // create table and insert
+    sql """ DROP TABLE IF EXISTS ${table_export_name} """
+    sql """
+    CREATE TABLE IF NOT EXISTS ${table_export_name} (
+        `id` int(11) NULL,
+        `name` string NULL,
+        `age` int(11) NULL
+        )
+        PARTITION BY RANGE(id)
+        (
+            PARTITION less_than_20 VALUES LESS THAN ("20"),
+            PARTITION between_20_70 VALUES [("20"),("70")),
+            PARTITION more_than_70 VALUES LESS THAN ("151")
+        )
+        DISTRIBUTED BY HASH(id) BUCKETS 3
+        PROPERTIES("replication_num" = "1");
+    """
+    StringBuilder sb = new StringBuilder()
+    int i = 1
+    for (; i < 150; i ++) {
+        sb.append("""
+            (${i}, 'ftw-${i}', ${i + 18}),
+        """)
+    }
+    sb.append("""
+            (${i}, NULL, NULL)
+        """)
+    sql """ INSERT INTO ${table_export_name} VALUES
+            ${sb.toString()}
+        """
+    def insert_res = sql "show last insert;"
+    logger.info("insert result: " + insert_res.toString())
+    qt_select_export """ SELECT * FROM ${table_export_name} t ORDER BY id; """
+
+
+    def check_path_exists = { dir_path ->
+        File path = new File(dir_path)
+        if (!path.exists()) {
+            assert path.mkdirs()
+        } else {
+            throw new IllegalStateException("""${dir_path} already exists! """)
+        }
+    }
+
+    def check_file_amounts = { dir_path, amount ->
+        File path = new File(dir_path)
+        File[] files = path.listFiles()
+        assert files.length == amount
+    }
+
+    def delete_files = { dir_path ->
+        File path = new File(dir_path)
+        if (path.exists()) {
+            for (File f: path.listFiles()) {
+                f.delete();
+            }
+            path.delete();
+        }
+    }
+
+    def waiting_export = { the_db, export_label ->
+        while (true) {
+            def res = sql """ show export from ${the_db} where label = 
"${export_label}" """
+            logger.info("export state: " + res[0][2])
+            if (res[0][2] == "FINISHED") {
+                break;
+            } else if (res[0][2] == "CANCELLED") {
+                throw new IllegalStateException("""export failed: 
${res[0][10]}""")
+            } else {
+                sleep(5000)
+            }
+        }
+    }
+
+    // 1. basic test
+    def uuid = UUID.randomUUID().toString()
+    def outFilePath = """${outfile_path_prefix}_${uuid}"""
+    def label = "label_${uuid}"
+    try {
+        // check export path
+        check_path_exists.call("${outFilePath}")
+
+        // exec export
+        sql """
+            EXPORT TABLE ${table_export_name} TO "file://${outFilePath}/"
+            PROPERTIES(
+                "label" = "${label}",
+                "format" = "csv",
+                "column_separator" = ",",
+                "data_consistency" = "partition"
+            );
+        """
+        // do insert in parallel
+        sql """INSERT INTO ${table_export_name} VALUES
+            (10, 'test', 11),
+            (20, 'test', 21),
+            (40, 'test', 51),
+            (80, 'test', 51)
+            """
+
+        // wait export
+        waiting_export.call(db, label)
+
+        // check file amounts
+        check_file_amounts.call("${outFilePath}", 3)
+
+        // check data correctness
+        sql """ DROP TABLE IF EXISTS ${table_load_name} """
+        sql """
+        CREATE TABLE IF NOT EXISTS ${table_load_name} (
+            `id` int(11) NULL,
+            `name` string NULL,
+            `age` int(11) NULL
+            )
+            DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1");
+        """
+
+        File[] files = new File("${outFilePath}").listFiles()
+        for (exportLoadFile in files) {
+            String file_path = exportLoadFile.getAbsolutePath()
+            streamLoad {
+                table "${table_load_name}"
+
+                set 'column_separator', ','
+                set 'columns', 'id, name, age'
+                set 'strict_mode', 'true'
+
+                file "${file_path}"
+                time 10000 // limit inflight 10s
+
+                check { result, exception, startTime, endTime ->
+                    if (exception != null) {
+                        throw exception
+                    }
+                    log.info("Stream load result: ${result}".toString())
+                    def json = parseJson(result)
+                    assertEquals("success", json.Status.toLowerCase())
+                    assertEquals(0, json.NumberFilteredRows)
+                }
+            }
+        }
+
+        qt_select_load1 """ SELECT * FROM ${table_load_name} t ORDER BY id; """
+    } finally {
+        try_sql("DROP TABLE IF EXISTS ${table_load_name}")
+        delete_files.call("${outFilePath}")
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to