This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c1f28b73d52 [fix](nereids) fix broker load planner don't support multi file group (#56372)
c1f28b73d52 is described below

commit c1f28b73d52e050492892e6868dc5bb1ff56fea7
Author: Xin Liao <[email protected]>
AuthorDate: Sat Sep 27 12:29:22 2025 +0800

    [fix](nereids) fix broker load planner don't support multi file group (#56372)
---
 .../glue/translator/PlanTranslatorContext.java     | 15 +++-
 .../nereids/load/NereidsLoadPlanInfoCollector.java | 60 ++++++++-------
 .../nereids/load/NereidsLoadingTaskPlanner.java    | 63 ++++++++-------
 .../nereids/load/NereidsStreamLoadPlanner.java     | 12 ++-
 .../org/apache/doris/planner/FileLoadScanNode.java | 23 +++---
 .../test_broker_load_multi_filegroup.out           |  7 ++
 .../test_broker_load_multi_filegroup.groovy        | 89 ++++++++++++++++++++++
 7 files changed, 204 insertions(+), 65 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java
index a800603a7c3..89e1da82950 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java
@@ -69,7 +69,7 @@ public class PlanTranslatorContext {
     private final ConnectContext connectContext;
     private final List<PlanFragment> planFragments = Lists.newArrayList();
 
-    private final DescriptorTable descTable = new DescriptorTable();
+    private DescriptorTable descTable;
 
     private final RuntimeFilterTranslator translator;
 
@@ -124,8 +124,20 @@ public class PlanTranslatorContext {
         this.translator = new RuntimeFilterTranslator(ctx.getRuntimeFilterContext());
         this.topnFilterContext = ctx.getTopnFilterContext();
         this.runtimeFilterV2Context = ctx.getRuntimeFilterV2Context();
+        this.descTable = new DescriptorTable();
     }
 
+    public PlanTranslatorContext(CascadesContext ctx, DescriptorTable descTable) {
+        this.connectContext = ctx.getConnectContext();
+        this.translator = new RuntimeFilterTranslator(ctx.getRuntimeFilterContext());
+        this.topnFilterContext = ctx.getTopnFilterContext();
+        this.runtimeFilterV2Context = ctx.getRuntimeFilterV2Context();
+        this.descTable = descTable;
+    }
+
+    /**
+     * Constructor for testing purposes with default values.
+     */
     @VisibleForTesting
     public PlanTranslatorContext() {
         this.connectContext = null;
@@ -133,6 +145,7 @@ public class PlanTranslatorContext {
         this.topnFilterContext = new TopnFilterContext();
         IdGenerator<RuntimeFilterId> runtimeFilterIdGen = RuntimeFilterId.createGenerator();
         this.runtimeFilterV2Context = new RuntimeFilterContextV2(runtimeFilterIdGen);
+        this.descTable = new DescriptorTable();
     }
 
     /**
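
The change above makes the DescriptorTable injectable: instead of every
PlanTranslatorContext owning a fresh table, a caller can hand one shared table
to several translations so that all tuple and slot descriptors land in the
same place. A minimal sketch of the intended call pattern (the loop and the
plansPerFileGroup variable are hypothetical; the constructor and the
initContext call are the ones in this commit):

    DescriptorTable sharedDescTable = new DescriptorTable();
    for (LogicalPlan plan : plansPerFileGroup) { // hypothetical: one plan per file group
        CascadesContext cascadesContext = CascadesContext.initContext(
                new StatementContext(), plan, PhysicalProperties.ANY);
        // every translation appends its descriptors to the same shared table
        PlanTranslatorContext context = new PlanTranslatorContext(cascadesContext, sharedDescTable);
        plan.accept(someVisitor, context); // hypothetical visitor, as in collectLoadPlanInfo below
    }
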
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadPlanInfoCollector.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadPlanInfoCollector.java
index 5cab4bd0a2d..83b5f885fbd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadPlanInfoCollector.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadPlanInfoCollector.java
@@ -96,12 +96,12 @@ public class NereidsLoadPlanInfoCollector extends DefaultPlanVisitor<Void, PlanT
      * store OlapTableSink and required information for FileLoadScanNode
      */
     public static class LoadPlanInfo {
-        private DescriptorTable descriptorTable;
         // the file source tuple's id, the tuple represents original columns reading from file
         private TupleId srcTupleId;
         // the file source tuple's slots
         private List<SlotId> srcSlotIds;
         // FileLoadScanNode's output tuple, represents remapped columns
+        // For multiple file groups in Broker load, this destTuple is shared across all file groups
         private TupleDescriptor destTuple;
         // the map of slots in destTuple and its mapping expression
         private Map<SlotId, Expr> destSlotIdToExprMap;
@@ -119,10 +119,6 @@ public class NereidsLoadPlanInfoCollector extends DefaultPlanVisitor<Void, PlanT
         // OlapTableSink for dest table
         private OlapTableSink olapTableSink;
 
-        public DescriptorTable getDescriptorTable() {
-            return descriptorTable;
-        }
-
         public TupleDescriptor getDestTuple() {
             return destTuple;
         }
@@ -264,13 +260,14 @@ public class NereidsLoadPlanInfoCollector extends DefaultPlanVisitor<Void, PlanT
     /**
      * visit logical plan tree and create a LoadPlanInfo
      */
-    public LoadPlanInfo collectLoadPlanInfo(LogicalPlan logicalPlan) {
+    public LoadPlanInfo collectLoadPlanInfo(LogicalPlan logicalPlan, DescriptorTable descTable,
+            TupleDescriptor scanDescriptor) {
         this.logicalPlan = logicalPlan;
         CascadesContext cascadesContext = CascadesContext.initContext(new StatementContext(),
                 logicalPlan, PhysicalProperties.ANY);
-        PlanTranslatorContext context = new PlanTranslatorContext(cascadesContext);
+        PlanTranslatorContext context = new PlanTranslatorContext(cascadesContext, descTable);
+        loadPlanInfo.destTuple = scanDescriptor;
         logicalPlan.accept(this, context);
-        loadPlanInfo.descriptorTable = context.getDescTable();
         return loadPlanInfo;
     }
 
@@ -344,29 +341,40 @@ public class NereidsLoadPlanInfoCollector extends DefaultPlanVisitor<Void, PlanT
 
         List<Expr> projectList = outputs.stream().map(e -> ExpressionTranslator.translate(e, context))
                 .collect(Collectors.toList());
-        List<Slot> slotList = outputs.stream().map(NamedExpression::toSlot).collect(Collectors.toList());
-
-        // ignore projectList's nullability and set the expr's nullable info same as dest table column
-        // why do this? looks like be works in this way...
-        // and we have to do some extra work in visitLogicalFilter because this ood behavior
-        int size = slotList.size();
-        List<Slot> newSlotList = new ArrayList<>(size);
-        for (int i = 0; i < size; ++i) {
-            SlotReference slot = (SlotReference) slotList.get(i);
-            Column col = destTable.getColumn(slot.getName());
-            if (col != null) {
-                slot = slot.withColumn(col);
-                if (col.isAutoInc()) {
-                    newSlotList.add(slot.withNullable(true));
+
+        // For Broker load with multiple file groups, all file groups share the same destTuple.
+        // Create slots for destTuple only when processing the first file group (when slots are empty).
+        // Subsequent file groups will reuse the slots created by the first file group.
+        if (loadPlanInfo.destTuple.getSlots().isEmpty()) {
+            List<Slot> slotList = outputs.stream().map(NamedExpression::toSlot).collect(Collectors.toList());
+
+            // ignore projectList's nullability and set the expr's nullable info same as
+            // dest table column
+            // why do this? looks like be works in this way...
+            // and we have to do some extra work in visitLogicalFilter because this ood
+            // behavior
+            int size = slotList.size();
+            List<Slot> newSlotList = new ArrayList<>(size);
+            for (int i = 0; i < size; ++i) {
+                SlotReference slot = (SlotReference) slotList.get(i);
+                Column col = destTable.getColumn(slot.getName());
+                if (col != null) {
+                    slot = slot.withColumn(col);
+                    if (col.isAutoInc()) {
+                        newSlotList.add(slot.withNullable(true));
+                    } else {
+                        newSlotList.add(slot.withNullable(col.isAllowNull()));
+                    }
                 } else {
-                    newSlotList.add(slot.withNullable(col.isAllowNull()));
+                    newSlotList.add(slot);
                 }
-            } else {
-                newSlotList.add(slot);
+            }
+
+            for (Slot slot : newSlotList) {
+                context.createSlotDesc(loadPlanInfo.destTuple, (SlotReference) slot, destTable);
             }
         }
 
-        loadPlanInfo.destTuple = generateTupleDesc(newSlotList, destTable, context);
         loadPlanInfo.destSlotIdToExprMap = Maps.newHashMap();
         List<SlotDescriptor> slotDescriptorList = loadPlanInfo.destTuple.getSlots();
         for (int i = 0; i < slotDescriptorList.size(); ++i) {
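
In short, the destination tuple is now created by the caller and populated
lazily: the first file group's plan creates the slot descriptors, and every
later file group reuses them, only rebuilding its own slot-to-expression map.
A condensed sketch of the guard (names as in the hunk above):

    // first file group: the shared destTuple has no slots yet, so create them once
    if (loadPlanInfo.destTuple.getSlots().isEmpty()) {
        for (Slot slot : newSlotList) {
            context.createSlotDesc(loadPlanInfo.destTuple, (SlotReference) slot, destTable);
        }
    }
    // every file group (including the first) then pairs the shared slots with
    // its own translated expressions in destSlotIdToExprMap
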
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadingTaskPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadingTaskPlanner.java
index 39b3c81d548..877f58f5ec3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadingTaskPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadingTaskPlanner.java
@@ -20,6 +20,7 @@ package org.apache.doris.nereids.load;
 import org.apache.doris.analysis.BrokerDesc;
 import org.apache.doris.analysis.DescriptorTable;
 import org.apache.doris.analysis.PartitionNames;
+import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.OlapTable;
@@ -74,7 +75,7 @@ public class NereidsLoadingTaskPlanner {
     private final boolean singleTabletLoadPerSink;
     private final boolean enableMemtableOnSinkNode;
     private UserIdentity userInfo;
-    private DescriptorTable descTable;
+    private final DescriptorTable descTable = new DescriptorTable();
 
     // Output params
     private List<PlanFragment> fragments = Lists.newArrayList();
@@ -144,14 +145,8 @@ public class NereidsLoadingTaskPlanner {
         }
 
         Preconditions.checkState(!fileGroups.isEmpty() && fileGroups.size() == fileStatusesList.size());
-        NereidsFileGroupInfo fileGroupInfo = new NereidsFileGroupInfo(loadJobId, txnId, table, brokerDesc,
-                fileGroups.get(0), fileStatusesList.get(0), filesAdded, strictMode, loadParallelism);
-        NereidsLoadScanProvider loadScanProvider = new NereidsLoadScanProvider(fileGroupInfo,
-                partialUpdateInputColumns);
-        NereidsParamCreateContext context = loadScanProvider.createLoadContext();
+
         PartitionNames partitionNames = getPartitionNames();
-        LogicalPlan streamLoadPlan = NereidsLoadUtils.createLoadPlan(fileGroupInfo, partitionNames, context,
-                isPartialUpdate, partialUpdateNewKeyPolicy);
         long txnTimeout = timeoutS == 0 ? ConnectContext.get().getExecTimeoutS() : timeoutS;
         if (txnTimeout > Integer.MAX_VALUE) {
             txnTimeout = Integer.MAX_VALUE;
@@ -159,34 +154,50 @@ public class NereidsLoadingTaskPlanner {
         NereidsBrokerLoadTask nereidsBrokerLoadTask = new NereidsBrokerLoadTask(txnId, (int) txnTimeout,
                 sendBatchParallelism,
                 strictMode, enableMemtableOnSinkNode, partitionNames);
-        NereidsLoadPlanInfoCollector planInfoCollector = new NereidsLoadPlanInfoCollector(table, nereidsBrokerLoadTask,
-                loadId, dbId, isPartialUpdate ? TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS : TUniqueKeyUpdateMode.UPSERT,
-                partialUpdateNewKeyPolicy, partialUpdateInputColumns, context.exprMap);
-        NereidsLoadPlanInfoCollector.LoadPlanInfo loadPlanInfo = planInfoCollector.collectLoadPlanInfo(streamLoadPlan);
-        descTable = loadPlanInfo.getDescriptorTable();
-        FileLoadScanNode fileScanNode = new FileLoadScanNode(new PlanNodeId(0), loadPlanInfo.getDestTuple());
+
+        TupleDescriptor scanTupleDesc = descTable.createTupleDescriptor();
+        scanTupleDesc.setTable(table);
+        // Collect all file group infos, contexts, and load plan infos
         List<NereidsFileGroupInfo> fileGroupInfos = new ArrayList<>(fileGroups.size());
         List<NereidsParamCreateContext> contexts = new ArrayList<>(fileGroups.size());
-        fileGroupInfos.add(fileGroupInfo);
-        contexts.add(context);
-        for (int i = 1; i < fileGroups.size(); ++i) {
-            fileGroupInfos.add(new NereidsFileGroupInfo(loadJobId, txnId, table, brokerDesc,
-                    fileGroups.get(i), fileStatusesList.get(i), filesAdded, strictMode, loadParallelism));
-            NereidsParamCreateContext paramCreateContext = new NereidsParamCreateContext();
-            paramCreateContext.fileGroup = fileGroups.get(i);
-            contexts.add(paramCreateContext);
+        List<NereidsLoadPlanInfoCollector.LoadPlanInfo> loadPlanInfos =
+                new ArrayList<>(fileGroups.size());
+
+        // Create a separate plan for each file group
+        for (int i = 0; i < fileGroups.size(); ++i) {
+            NereidsFileGroupInfo fileGroupInfo = new NereidsFileGroupInfo(loadJobId, txnId, table, brokerDesc,
+                    fileGroups.get(i), fileStatusesList.get(i), filesAdded, strictMode, loadParallelism);
+            NereidsLoadScanProvider loadScanProvider = new NereidsLoadScanProvider(fileGroupInfo,
+                    partialUpdateInputColumns);
+            NereidsParamCreateContext context = loadScanProvider.createLoadContext();
+            LogicalPlan loadPlan = NereidsLoadUtils.createLoadPlan(fileGroupInfo, partitionNames, context,
+                    isPartialUpdate, partialUpdateNewKeyPolicy);
+
+            NereidsLoadPlanInfoCollector planInfoCollector = new NereidsLoadPlanInfoCollector(table,
+                    nereidsBrokerLoadTask, loadId, dbId,
+                    isPartialUpdate ? TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS : TUniqueKeyUpdateMode.UPSERT,
+                    partialUpdateNewKeyPolicy, partialUpdateInputColumns, context.exprMap);
+            NereidsLoadPlanInfoCollector.LoadPlanInfo loadPlanInfo = planInfoCollector.collectLoadPlanInfo(loadPlan,
+                    descTable, scanTupleDesc);
+
+            fileGroupInfos.add(fileGroupInfo);
+            contexts.add(context);
+            loadPlanInfos.add(loadPlanInfo);
         }
-        fileScanNode.finalizeForNereids(loadId, fileGroupInfos, contexts, loadPlanInfo);
+
+        // Create a single FileLoadScanNode for all file groups
+        FileLoadScanNode fileScanNode = new FileLoadScanNode(new PlanNodeId(0), loadPlanInfos.get(0).getDestTuple());
+        fileScanNode.finalizeForNereids(loadId, fileGroupInfos, contexts, loadPlanInfos);
         scanNodes.add(fileScanNode);
 
-        // 3. Plan fragment
+        // Create plan fragment
         PlanFragment sinkFragment = new PlanFragment(new PlanFragmentId(0), fileScanNode, DataPartition.RANDOM);
         sinkFragment.setParallelExecNum(loadParallelism);
-        sinkFragment.setSink(loadPlanInfo.getOlapTableSink());
+        sinkFragment.setSink(loadPlanInfos.get(0).getOlapTableSink());
 
         fragments.add(sinkFragment);
 
-        // 4. finalize
+        // finalize
         for (PlanFragment fragment : fragments) {
             fragment.finalize(null);
         }
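
Compared with the old code, which built a full load plan only for
fileGroups.get(0) and handed the later groups empty parameter contexts, the
planner now owns the descriptor table and runs one collection pass per file
group. A comment-style outline of the new flow shown above (not new API):

    // descTable = new DescriptorTable()                  // planner-owned field
    // scanTupleDesc = descTable.createTupleDescriptor()  // dest tuple shared by all groups
    // for each file group i:
    //     build fileGroupInfo(i), context(i), loadPlan(i)
    //     loadPlanInfos[i] = collectLoadPlanInfo(loadPlan(i), descTable, scanTupleDesc)
    // one FileLoadScanNode over loadPlanInfos.get(0).getDestTuple()
    // fileScanNode.finalizeForNereids(loadId, fileGroupInfos, contexts, loadPlanInfos)
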
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsStreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsStreamLoadPlanner.java
index 2a017dbf075..a576a8113f4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsStreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsStreamLoadPlanner.java
@@ -18,6 +18,8 @@
 package org.apache.doris.nereids.load;
 
 import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.analysis.DescriptorTable;
+import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.catalog.AggregateType;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Database;
@@ -245,10 +247,14 @@ public class NereidsStreamLoadPlanner {
         NereidsLoadPlanInfoCollector planInfoCollector = new NereidsLoadPlanInfoCollector(destTable, taskInfo, loadId,
                 db.getId(), uniquekeyUpdateMode, partialUpdateNewRowPolicy, partialUpdateInputColumns,
                 context.exprMap);
-        NereidsLoadPlanInfoCollector.LoadPlanInfo loadPlanInfo = planInfoCollector.collectLoadPlanInfo(streamLoadPlan);
+        DescriptorTable descriptorTable = new DescriptorTable();
+        TupleDescriptor scanTupleDesc = descriptorTable.createTupleDescriptor();
+        scanTupleDesc.setTable(destTable);
+        NereidsLoadPlanInfoCollector.LoadPlanInfo loadPlanInfo = planInfoCollector.collectLoadPlanInfo(streamLoadPlan,
+                descriptorTable, scanTupleDesc);
         FileLoadScanNode fileScanNode = new FileLoadScanNode(new PlanNodeId(0), loadPlanInfo.getDestTuple());
         fileScanNode.finalizeForNereids(loadId, Lists.newArrayList(fileGroupInfo), Lists.newArrayList(context),
-                loadPlanInfo);
+                Lists.newArrayList(loadPlanInfo));
         scanNode = fileScanNode;
 
         // for stream load, we only need one fragment, ScanNode -> DataSink.
@@ -262,7 +268,7 @@ public class NereidsStreamLoadPlanner {
         params.setProtocolVersion(PaloInternalServiceVersion.V1);
         params.setFragment(fragment.toThrift());
 
-        params.setDescTbl(loadPlanInfo.getDescriptorTable().toThrift());
+        params.setDescTbl(descriptorTable.toThrift());
         params.setCoord(new TNetworkAddress(FrontendOptions.getLocalHostAddress(), Config.rpc_port));
         params.setCurrentConnectFe(new TNetworkAddress(FrontendOptions.getLocalHostAddress(), Config.rpc_port));
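
Note the ownership flip in this file: the stream load planner now creates the
DescriptorTable and the scan tuple itself, passes both into
collectLoadPlanInfo, and serializes the table directly via
params.setDescTbl(descriptorTable.toThrift()); LoadPlanInfo no longer carries
a descriptor table of its own. Stream load still plans exactly one file
group, so it simply wraps its single LoadPlanInfo in a singleton list for the
new list-based finalizeForNereids signature.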
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java
index 391d3a10e1a..c2e873c6613 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java
@@ -64,18 +64,13 @@ public class FileLoadScanNode extends FileScanNode {
     }
 
     public void finalizeForNereids(TUniqueId loadId, List<NereidsFileGroupInfo> fileGroupInfos,
-            List<NereidsParamCreateContext> contexts, NereidsLoadPlanInfoCollector.LoadPlanInfo loadPlanInfo)
+            List<NereidsParamCreateContext> contexts, List<NereidsLoadPlanInfoCollector.LoadPlanInfo> loadPlanInfos)
             throws UserException {
         Preconditions.checkState(contexts.size() == fileGroupInfos.size(),
                 contexts.size() + " vs. " + fileGroupInfos.size());
-        List<Expr> preFilterList = loadPlanInfo.getPreFilterExprList();
-        if (preFilterList != null) {
-            addPreFilterConjuncts(preFilterList);
-        }
-        List<Expr> postFilterList = loadPlanInfo.getPostFilterExprList();
-        if (postFilterList != null) {
-            addConjuncts(postFilterList);
-        }
+        Preconditions.checkState(loadPlanInfos.size() == fileGroupInfos.size(),
+                loadPlanInfos.size() + " vs. " + fileGroupInfos.size());
+
         // ATTN: for load scan node, do not use backend policy in ExternalScanNode.
         // Because backend policy in ExternalScanNode may only contain compute backend.
         // But for load job, we should select backends from all backends, both compute and mix.
@@ -88,6 +83,16 @@ public class FileLoadScanNode extends FileScanNode {
         for (int i = 0; i < contexts.size(); ++i) {
             NereidsParamCreateContext context = contexts.get(i);
             NereidsFileGroupInfo fileGroupInfo = fileGroupInfos.get(i);
+            NereidsLoadPlanInfoCollector.LoadPlanInfo loadPlanInfo = loadPlanInfos.get(i);
+            // Add filters for each file group's load plan info
+            List<Expr> preFilterList = loadPlanInfo.getPreFilterExprList();
+            if (preFilterList != null) {
+                addPreFilterConjuncts(preFilterList);
+            }
+            List<Expr> postFilterList = loadPlanInfo.getPostFilterExprList();
+            if (postFilterList != null) {
+                addConjuncts(postFilterList);
+            }
             context.params = loadPlanInfo.toFileScanRangeParams(loadId, fileGroupInfo);
             createScanRangeLocations(context, fileGroupInfo, localBackendPolicy);
             this.selectedSplitNum += fileGroupInfo.getFileStatuses().size();
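
With the list of LoadPlanInfo objects, each iteration of the loop above now
registers the pre- and post-filters of its own file group, so filters
belonging to groups after the first are no longer dropped. All of them
accumulate on the single scan node, roughly (a condensed restatement of the
loop, not new API):

    // for i in 0..contexts.size()-1:
    //     addPreFilterConjuncts(loadPlanInfos.get(i).getPreFilterExprList())  // pre filters
    //     addConjuncts(loadPlanInfos.get(i).getPostFilterExprList())          // post filters
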
diff --git a/regression-test/data/load_p0/broker_load/test_broker_load_multi_filegroup.out b/regression-test/data/load_p0/broker_load/test_broker_load_multi_filegroup.out
new file mode 100644
index 00000000000..e408e27718c
--- /dev/null
+++ b/regression-test/data/load_p0/broker_load/test_broker_load_multi_filegroup.out
@@ -0,0 +1,7 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !pr22666_1 --
+99510
+
+-- !pr22666_2 --
+100490
+
diff --git a/regression-test/suites/load_p0/broker_load/test_broker_load_multi_filegroup.groovy b/regression-test/suites/load_p0/broker_load/test_broker_load_multi_filegroup.groovy
new file mode 100644
index 00000000000..545f7d2ce81
--- /dev/null
+++ b/regression-test/suites/load_p0/broker_load/test_broker_load_multi_filegroup.groovy
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_broker_load_multi_filegroup", "p0") {
+    def s3BucketName = getS3BucketName()
+    def s3Endpoint = getS3Endpoint()
+    def s3Region = getS3Region()
+
+    // pr 22666
+    def tbl_22666 = "part_22666"
+    sql """drop table if exists ${tbl_22666} force"""
+    sql """
+        CREATE TABLE ${tbl_22666} (
+            p_partkey          int NULL,
+            p_name        VARCHAR(55) NULL,
+            p_mfgr        VARCHAR(25) NULL,
+            p_brand       VARCHAR(50) NULL,
+            p_type        VARCHAR(25) NULL,
+            p_size        int NULL,
+            p_container   VARCHAR(10) NULL,
+            p_retailprice decimal(15, 2) NULL,
+            p_comment     VARCHAR(23) NULL
+        )ENGINE=OLAP
+        DUPLICATE KEY(`p_partkey`)
+        DISTRIBUTED BY HASH(`p_partkey`) BUCKETS 3
+        PROPERTIES (
+            "replication_num" = "1"
+        );
+    """
+
+    def label_22666 = "part_" + UUID.randomUUID().toString().replace("-", "0")
+    sql """
+        LOAD LABEL ${label_22666} (
+            DATA INFILE("s3://${s3BucketName}/regression/load/data/part0.parquet")
+            INTO TABLE ${tbl_22666}
+            FORMAT AS "PARQUET"
+            (p_partkey, p_name, p_mfgr),
+            DATA INFILE("s3://${s3BucketName}/regression/load/data/part1.parquet")
+            INTO TABLE ${tbl_22666}
+            FORMAT AS "PARQUET"
+            (p_partkey, p_brand, p_type)
+        )
+        WITH S3 (
+            "AWS_ACCESS_KEY" = "${getS3AK()}",
+            "AWS_SECRET_KEY" = "${getS3SK()}",
+            "AWS_ENDPOINT" = "${s3Endpoint}",
+            "AWS_REGION" = "${s3Region}",
+            "provider" = "${getS3Provider()}"
+        );
+    """
+
+    def max_try_milli_secs = 600000
+    while (max_try_milli_secs > 0) {
+        def String[][] result = sql """ show load where label="$label_22666" order by createtime desc limit 1; """
+        logger.info("Load status: " + result[0])
+        if (result[0][2].equals("FINISHED")) {
+            logger.info("Load FINISHED " + label_22666)
+            break;
+        }
+        if (result[0][2].equals("CANCELLED")) {
+            assertTrue(false, "load failed: $result")
+            break;
+        }
+        Thread.sleep(1000)
+        max_try_milli_secs -= 1000
+        if(max_try_milli_secs <= 0) {
+            assertTrue(1 == 2, "load Timeout: $label_22666")
+        }
+    }
+
+    order_qt_pr22666_1 """ select count(*) from ${tbl_22666} where p_brand is not null limit 10;"""
+    order_qt_pr22666_2 """ select count(*) from ${tbl_22666} where p_name is not null limit 10;"""
+
+}
+
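
For reference on what the regression test checks: the two file groups load
disjoint column subsets from part0.parquet (p_partkey, p_name, p_mfgr) and
part1.parquet (p_partkey, p_brand, p_type) into the same table, so p_name is
non-NULL only for rows coming from the first file and p_brand only for rows
coming from the second. The expected counts in the .out file (99510 rows with
p_brand set, 100490 with p_name set) therefore presumably reflect the row
counts of part1.parquet and part0.parquet respectively, and only come out
right when both file groups are actually planned.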


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
