This is an automated email from the ASF dual-hosted git repository.
liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c1f28b73d52 [fix](nereids) fix broker load planner don't support multi file group (#56372)
c1f28b73d52 is described below
commit c1f28b73d52e050492892e6868dc5bb1ff56fea7
Author: Xin Liao <[email protected]>
AuthorDate: Sat Sep 27 12:29:22 2025 +0800
[fix](nereids) fix broker load planner don't support multi file group (#56372)
---
.../glue/translator/PlanTranslatorContext.java | 15 +++-
.../nereids/load/NereidsLoadPlanInfoCollector.java | 60 ++++++++-------
.../nereids/load/NereidsLoadingTaskPlanner.java | 63 ++++++++-------
.../nereids/load/NereidsStreamLoadPlanner.java | 12 ++-
.../org/apache/doris/planner/FileLoadScanNode.java | 23 +++---
.../test_broker_load_multi_filegroup.out | 7 ++
.../test_broker_load_multi_filegroup.groovy | 89 ++++++++++++++++++++++
7 files changed, 204 insertions(+), 65 deletions(-)
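
For readers skimming the patch, here is a simplified sketch of the new planning flow (illustrative only, condensed from the diff below; it reuses the class and method names introduced there and omits the per-group construction of fileGroupInfo, context, loadPlan and planInfoCollector):

    // One DescriptorTable and one scan TupleDescriptor are created up front and shared
    // by every file group, instead of being taken from the first file group's plan.
    DescriptorTable descTable = new DescriptorTable();
    TupleDescriptor scanTupleDesc = descTable.createTupleDescriptor();
    scanTupleDesc.setTable(table);

    List<NereidsLoadPlanInfoCollector.LoadPlanInfo> loadPlanInfos = new ArrayList<>(fileGroups.size());
    for (int i = 0; i < fileGroups.size(); ++i) {
        // each file group builds its own logical plan and collector, but all of them
        // write their slots into the shared descTable / scanTupleDesc
        NereidsLoadPlanInfoCollector.LoadPlanInfo loadPlanInfo =
                planInfoCollector.collectLoadPlanInfo(loadPlan, descTable, scanTupleDesc);
        loadPlanInfos.add(loadPlanInfo);
    }

    // a single FileLoadScanNode then serves all file groups
    FileLoadScanNode fileScanNode = new FileLoadScanNode(new PlanNodeId(0), loadPlanInfos.get(0).getDestTuple());
    fileScanNode.finalizeForNereids(loadId, fileGroupInfos, contexts, loadPlanInfos);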
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java
index a800603a7c3..89e1da82950 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java
@@ -69,7 +69,7 @@ public class PlanTranslatorContext {
private final ConnectContext connectContext;
private final List<PlanFragment> planFragments = Lists.newArrayList();
- private final DescriptorTable descTable = new DescriptorTable();
+ private DescriptorTable descTable;
private final RuntimeFilterTranslator translator;
@@ -124,8 +124,20 @@ public class PlanTranslatorContext {
this.translator = new RuntimeFilterTranslator(ctx.getRuntimeFilterContext());
this.topnFilterContext = ctx.getTopnFilterContext();
this.runtimeFilterV2Context = ctx.getRuntimeFilterV2Context();
+ this.descTable = new DescriptorTable();
}
+ public PlanTranslatorContext(CascadesContext ctx, DescriptorTable descTable) {
+ this.connectContext = ctx.getConnectContext();
+ this.translator = new RuntimeFilterTranslator(ctx.getRuntimeFilterContext());
+ this.topnFilterContext = ctx.getTopnFilterContext();
+ this.runtimeFilterV2Context = ctx.getRuntimeFilterV2Context();
+ this.descTable = descTable;
+ }
+
+ /**
+ * Constructor for testing purposes with default values.
+ */
@VisibleForTesting
public PlanTranslatorContext() {
this.connectContext = null;
@@ -133,6 +145,7 @@ public class PlanTranslatorContext {
this.topnFilterContext = new TopnFilterContext();
IdGenerator<RuntimeFilterId> runtimeFilterIdGen = RuntimeFilterId.createGenerator();
this.runtimeFilterV2Context = new RuntimeFilterContextV2(runtimeFilterIdGen);
+ this.descTable = new DescriptorTable();
}
/**
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadPlanInfoCollector.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadPlanInfoCollector.java
index 5cab4bd0a2d..83b5f885fbd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadPlanInfoCollector.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadPlanInfoCollector.java
@@ -96,12 +96,12 @@ public class NereidsLoadPlanInfoCollector extends DefaultPlanVisitor<Void, PlanT
* store OlapTableSink and required information for FileLoadScanNode
*/
public static class LoadPlanInfo {
- private DescriptorTable descriptorTable;
// the file source tuple's id, the tuple represents original columns reading from file
private TupleId srcTupleId;
// the file source tuple's slots
private List<SlotId> srcSlotIds;
// FileLoadScanNode's output tuple, represents remapped columns
+ // For multiple file groups in Broker load, this destTuple is shared across all file groups
private TupleDescriptor destTuple;
// the map of slots in destTuple and its mapping expression
private Map<SlotId, Expr> destSlotIdToExprMap;
@@ -119,10 +119,6 @@ public class NereidsLoadPlanInfoCollector extends DefaultPlanVisitor<Void, PlanT
// OlapTableSink for dest table
private OlapTableSink olapTableSink;
- public DescriptorTable getDescriptorTable() {
- return descriptorTable;
- }
-
public TupleDescriptor getDestTuple() {
return destTuple;
}
@@ -264,13 +260,14 @@ public class NereidsLoadPlanInfoCollector extends DefaultPlanVisitor<Void, PlanT
/**
* visit logical plan tree and create a LoadPlanInfo
*/
- public LoadPlanInfo collectLoadPlanInfo(LogicalPlan logicalPlan) {
+ public LoadPlanInfo collectLoadPlanInfo(LogicalPlan logicalPlan, DescriptorTable descTable,
+ TupleDescriptor scanDescriptor) {
this.logicalPlan = logicalPlan;
CascadesContext cascadesContext = CascadesContext.initContext(new StatementContext(),
logicalPlan, PhysicalProperties.ANY);
- PlanTranslatorContext context = new PlanTranslatorContext(cascadesContext);
+ PlanTranslatorContext context = new PlanTranslatorContext(cascadesContext, descTable);
+ loadPlanInfo.destTuple = scanDescriptor;
logicalPlan.accept(this, context);
- loadPlanInfo.descriptorTable = context.getDescTable();
return loadPlanInfo;
}
@@ -344,29 +341,40 @@ public class NereidsLoadPlanInfoCollector extends DefaultPlanVisitor<Void, PlanT
List<Expr> projectList = outputs.stream().map(e -> ExpressionTranslator.translate(e, context))
.collect(Collectors.toList());
- List<Slot> slotList = outputs.stream().map(NamedExpression::toSlot).collect(Collectors.toList());
-
- // ignore projectList's nullability and set the expr's nullable info same as dest table column
- // why do this? looks like be works in this way...
- // and we have to do some extra work in visitLogicalFilter because this ood behavior
- int size = slotList.size();
- List<Slot> newSlotList = new ArrayList<>(size);
- for (int i = 0; i < size; ++i) {
- SlotReference slot = (SlotReference) slotList.get(i);
- Column col = destTable.getColumn(slot.getName());
- if (col != null) {
- slot = slot.withColumn(col);
- if (col.isAutoInc()) {
- newSlotList.add(slot.withNullable(true));
+
+ // For Broker load with multiple file groups, all file groups share the same destTuple.
+ // Create slots for destTuple only when processing the first file group (when slots are empty).
+ // Subsequent file groups will reuse the slots created by the first file group.
+ if (loadPlanInfo.destTuple.getSlots().isEmpty()) {
+ List<Slot> slotList = outputs.stream().map(NamedExpression::toSlot).collect(Collectors.toList());
+
+ // ignore projectList's nullability and set the expr's nullable info same as
+ // dest table column
+ // why do this? looks like be works in this way...
+ // and we have to do some extra work in visitLogicalFilter because this ood
+ // behavior
+ int size = slotList.size();
+ List<Slot> newSlotList = new ArrayList<>(size);
+ for (int i = 0; i < size; ++i) {
+ SlotReference slot = (SlotReference) slotList.get(i);
+ Column col = destTable.getColumn(slot.getName());
+ if (col != null) {
+ slot = slot.withColumn(col);
+ if (col.isAutoInc()) {
+ newSlotList.add(slot.withNullable(true));
+ } else {
+ newSlotList.add(slot.withNullable(col.isAllowNull()));
+ }
} else {
- newSlotList.add(slot.withNullable(col.isAllowNull()));
+ newSlotList.add(slot);
}
- } else {
- newSlotList.add(slot);
+ }
+
+ for (Slot slot : newSlotList) {
+ context.createSlotDesc(loadPlanInfo.destTuple, (SlotReference) slot, destTable);
}
}
- loadPlanInfo.destTuple = generateTupleDesc(newSlotList, destTable, context);
loadPlanInfo.destSlotIdToExprMap = Maps.newHashMap();
List<SlotDescriptor> slotDescriptorList = loadPlanInfo.destTuple.getSlots();
for (int i = 0; i < slotDescriptorList.size(); ++i) {
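
Condensed, the slot handling above works as follows (illustrative sketch using the names from the diff; only the first file group that visits the shared destTuple creates slot descriptors, later groups map their own expressions onto the slots that already exist):

    // Slot descriptors for the shared destTuple are created only once.
    if (loadPlanInfo.destTuple.getSlots().isEmpty()) {
        for (Slot slot : newSlotList) {
            context.createSlotDesc(loadPlanInfo.destTuple, (SlotReference) slot, destTable);
        }
    }
    // Every file group, first or not, then fills its own destSlotIdToExprMap against the
    // shared slot descriptors, so column remapping still happens per file group.
    loadPlanInfo.destSlotIdToExprMap = Maps.newHashMap();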
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadingTaskPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadingTaskPlanner.java
index 39b3c81d548..877f58f5ec3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadingTaskPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadingTaskPlanner.java
@@ -20,6 +20,7 @@ package org.apache.doris.nereids.load;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.DescriptorTable;
import org.apache.doris.analysis.PartitionNames;
+import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.OlapTable;
@@ -74,7 +75,7 @@ public class NereidsLoadingTaskPlanner {
private final boolean singleTabletLoadPerSink;
private final boolean enableMemtableOnSinkNode;
private UserIdentity userInfo;
- private DescriptorTable descTable;
+ private final DescriptorTable descTable = new DescriptorTable();
// Output params
private List<PlanFragment> fragments = Lists.newArrayList();
@@ -144,14 +145,8 @@ public class NereidsLoadingTaskPlanner {
}
Preconditions.checkState(!fileGroups.isEmpty() && fileGroups.size() == fileStatusesList.size());
- NereidsFileGroupInfo fileGroupInfo = new NereidsFileGroupInfo(loadJobId, txnId, table, brokerDesc,
- fileGroups.get(0), fileStatusesList.get(0), filesAdded, strictMode, loadParallelism);
- NereidsLoadScanProvider loadScanProvider = new NereidsLoadScanProvider(fileGroupInfo,
- partialUpdateInputColumns);
- NereidsParamCreateContext context = loadScanProvider.createLoadContext();
+
PartitionNames partitionNames = getPartitionNames();
- LogicalPlan streamLoadPlan = NereidsLoadUtils.createLoadPlan(fileGroupInfo, partitionNames, context,
- isPartialUpdate, partialUpdateNewKeyPolicy);
long txnTimeout = timeoutS == 0 ? ConnectContext.get().getExecTimeoutS() : timeoutS;
if (txnTimeout > Integer.MAX_VALUE) {
txnTimeout = Integer.MAX_VALUE;
@@ -159,34 +154,50 @@ public class NereidsLoadingTaskPlanner {
NereidsBrokerLoadTask nereidsBrokerLoadTask = new NereidsBrokerLoadTask(txnId, (int) txnTimeout, sendBatchParallelism,
strictMode, enableMemtableOnSinkNode, partitionNames);
- NereidsLoadPlanInfoCollector planInfoCollector = new NereidsLoadPlanInfoCollector(table, nereidsBrokerLoadTask,
- loadId, dbId, isPartialUpdate ? TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS : TUniqueKeyUpdateMode.UPSERT,
- partialUpdateNewKeyPolicy, partialUpdateInputColumns, context.exprMap);
- NereidsLoadPlanInfoCollector.LoadPlanInfo loadPlanInfo = planInfoCollector.collectLoadPlanInfo(streamLoadPlan);
- descTable = loadPlanInfo.getDescriptorTable();
- FileLoadScanNode fileScanNode = new FileLoadScanNode(new PlanNodeId(0), loadPlanInfo.getDestTuple());
+
+ TupleDescriptor scanTupleDesc = descTable.createTupleDescriptor();
+ scanTupleDesc.setTable(table);
+ // Collect all file group infos, contexts, and load plan infos
List<NereidsFileGroupInfo> fileGroupInfos = new ArrayList<>(fileGroups.size());
List<NereidsParamCreateContext> contexts = new ArrayList<>(fileGroups.size());
- fileGroupInfos.add(fileGroupInfo);
- contexts.add(context);
- for (int i = 1; i < fileGroups.size(); ++i) {
- fileGroupInfos.add(new NereidsFileGroupInfo(loadJobId, txnId, table, brokerDesc,
- fileGroups.get(i), fileStatusesList.get(i), filesAdded, strictMode, loadParallelism));
- NereidsParamCreateContext paramCreateContext = new NereidsParamCreateContext();
- paramCreateContext.fileGroup = fileGroups.get(i);
- contexts.add(paramCreateContext);
+ List<NereidsLoadPlanInfoCollector.LoadPlanInfo> loadPlanInfos =
+ new ArrayList<>(fileGroups.size());
+
+ // Create a separate plan for each file group
+ for (int i = 0; i < fileGroups.size(); ++i) {
+ NereidsFileGroupInfo fileGroupInfo = new NereidsFileGroupInfo(loadJobId, txnId, table, brokerDesc,
+ fileGroups.get(i), fileStatusesList.get(i), filesAdded, strictMode, loadParallelism);
+ NereidsLoadScanProvider loadScanProvider = new NereidsLoadScanProvider(fileGroupInfo,
+ partialUpdateInputColumns);
+ NereidsParamCreateContext context = loadScanProvider.createLoadContext();
+ LogicalPlan loadPlan = NereidsLoadUtils.createLoadPlan(fileGroupInfo, partitionNames, context,
+ isPartialUpdate, partialUpdateNewKeyPolicy);
+
+ NereidsLoadPlanInfoCollector planInfoCollector = new NereidsLoadPlanInfoCollector(table,
+ nereidsBrokerLoadTask, loadId, dbId,
+ isPartialUpdate ? TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS : TUniqueKeyUpdateMode.UPSERT,
+ partialUpdateNewKeyPolicy, partialUpdateInputColumns, context.exprMap);
+ NereidsLoadPlanInfoCollector.LoadPlanInfo loadPlanInfo = planInfoCollector.collectLoadPlanInfo(loadPlan,
+ descTable, scanTupleDesc);
+
+ fileGroupInfos.add(fileGroupInfo);
+ contexts.add(context);
+ loadPlanInfos.add(loadPlanInfo);
}
- fileScanNode.finalizeForNereids(loadId, fileGroupInfos, contexts, loadPlanInfo);
+
+ // Create a single FileLoadScanNode for all file groups
+ FileLoadScanNode fileScanNode = new FileLoadScanNode(new PlanNodeId(0), loadPlanInfos.get(0).getDestTuple());
+ fileScanNode.finalizeForNereids(loadId, fileGroupInfos, contexts, loadPlanInfos);
scanNodes.add(fileScanNode);
- // 3. Plan fragment
+ // Create plan fragment
PlanFragment sinkFragment = new PlanFragment(new PlanFragmentId(0), fileScanNode, DataPartition.RANDOM);
sinkFragment.setParallelExecNum(loadParallelism);
- sinkFragment.setSink(loadPlanInfo.getOlapTableSink());
+ sinkFragment.setSink(loadPlanInfos.get(0).getOlapTableSink());
fragments.add(sinkFragment);
- // 4. finalize
+ // finalize
for (PlanFragment fragment : fragments) {
fragment.finalize(null);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsStreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsStreamLoadPlanner.java
index 2a017dbf075..a576a8113f4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsStreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsStreamLoadPlanner.java
@@ -18,6 +18,8 @@
package org.apache.doris.nereids.load;
import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.analysis.DescriptorTable;
+import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.AggregateType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
@@ -245,10 +247,14 @@ public class NereidsStreamLoadPlanner {
NereidsLoadPlanInfoCollector planInfoCollector = new NereidsLoadPlanInfoCollector(destTable, taskInfo, loadId,
db.getId(), uniquekeyUpdateMode, partialUpdateNewRowPolicy, partialUpdateInputColumns,
context.exprMap);
- NereidsLoadPlanInfoCollector.LoadPlanInfo loadPlanInfo = planInfoCollector.collectLoadPlanInfo(streamLoadPlan);
+ DescriptorTable descriptorTable = new DescriptorTable();
+ TupleDescriptor scanTupleDesc = descriptorTable.createTupleDescriptor();
+ scanTupleDesc.setTable(destTable);
+ NereidsLoadPlanInfoCollector.LoadPlanInfo loadPlanInfo = planInfoCollector.collectLoadPlanInfo(streamLoadPlan,
+ descriptorTable, scanTupleDesc);
FileLoadScanNode fileScanNode = new FileLoadScanNode(new PlanNodeId(0), loadPlanInfo.getDestTuple());
fileScanNode.finalizeForNereids(loadId, Lists.newArrayList(fileGroupInfo), Lists.newArrayList(context),
- loadPlanInfo);
+ Lists.newArrayList(loadPlanInfo));
scanNode = fileScanNode;
// for stream load, we only need one fragment, ScanNode -> DataSink.
@@ -262,7 +268,7 @@ public class NereidsStreamLoadPlanner {
params.setProtocolVersion(PaloInternalServiceVersion.V1);
params.setFragment(fragment.toThrift());
- params.setDescTbl(loadPlanInfo.getDescriptorTable().toThrift());
+ params.setDescTbl(descriptorTable.toThrift());
params.setCoord(new TNetworkAddress(FrontendOptions.getLocalHostAddress(), Config.rpc_port));
params.setCurrentConnectFe(new TNetworkAddress(FrontendOptions.getLocalHostAddress(), Config.rpc_port));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java
index 391d3a10e1a..c2e873c6613 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java
@@ -64,18 +64,13 @@ public class FileLoadScanNode extends FileScanNode {
}
public void finalizeForNereids(TUniqueId loadId, List<NereidsFileGroupInfo> fileGroupInfos,
- List<NereidsParamCreateContext> contexts, NereidsLoadPlanInfoCollector.LoadPlanInfo loadPlanInfo)
+ List<NereidsParamCreateContext> contexts, List<NereidsLoadPlanInfoCollector.LoadPlanInfo> loadPlanInfos)
throws UserException {
Preconditions.checkState(contexts.size() == fileGroupInfos.size(),
contexts.size() + " vs. " + fileGroupInfos.size());
- List<Expr> preFilterList = loadPlanInfo.getPreFilterExprList();
- if (preFilterList != null) {
- addPreFilterConjuncts(preFilterList);
- }
- List<Expr> postFilterList = loadPlanInfo.getPostFilterExprList();
- if (postFilterList != null) {
- addConjuncts(postFilterList);
- }
+ Preconditions.checkState(loadPlanInfos.size() == fileGroupInfos.size(),
+ loadPlanInfos.size() + " vs. " + fileGroupInfos.size());
+
// ATTN: for load scan node, do not use backend policy in ExternalScanNode.
// Because backend policy in ExternalScanNode may only contain compute backend.
// But for load job, we should select backends from all backends, both compute and mix.
@@ -88,6 +83,16 @@ public class FileLoadScanNode extends FileScanNode {
for (int i = 0; i < contexts.size(); ++i) {
NereidsParamCreateContext context = contexts.get(i);
NereidsFileGroupInfo fileGroupInfo = fileGroupInfos.get(i);
+ NereidsLoadPlanInfoCollector.LoadPlanInfo loadPlanInfo = loadPlanInfos.get(i);
+ // Add filters for each file group's load plan info
+ List<Expr> preFilterList = loadPlanInfo.getPreFilterExprList();
+ if (preFilterList != null) {
+ addPreFilterConjuncts(preFilterList);
+ }
+ List<Expr> postFilterList = loadPlanInfo.getPostFilterExprList();
+ if (postFilterList != null) {
+ addConjuncts(postFilterList);
+ }
context.params = loadPlanInfo.toFileScanRangeParams(loadId, fileGroupInfo);
createScanRangeLocations(context, fileGroupInfo, localBackendPolicy);
this.selectedSplitNum += fileGroupInfo.getFileStatuses().size();
diff --git a/regression-test/data/load_p0/broker_load/test_broker_load_multi_filegroup.out b/regression-test/data/load_p0/broker_load/test_broker_load_multi_filegroup.out
new file mode 100644
index 00000000000..e408e27718c
--- /dev/null
+++ b/regression-test/data/load_p0/broker_load/test_broker_load_multi_filegroup.out
@@ -0,0 +1,7 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !pr22666_1 --
+99510
+
+-- !pr22666_2 --
+100490
+
diff --git a/regression-test/suites/load_p0/broker_load/test_broker_load_multi_filegroup.groovy b/regression-test/suites/load_p0/broker_load/test_broker_load_multi_filegroup.groovy
new file mode 100644
index 00000000000..545f7d2ce81
--- /dev/null
+++ b/regression-test/suites/load_p0/broker_load/test_broker_load_multi_filegroup.groovy
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_broker_load_multi_filegroup", "p0") {
+ def s3BucketName = getS3BucketName()
+ def s3Endpoint = getS3Endpoint()
+ def s3Region = getS3Region()
+
+ // pr 22666
+ def tbl_22666 = "part_22666"
+ sql """drop table if exists ${tbl_22666} force"""
+ sql """
+ CREATE TABLE ${tbl_22666} (
+ p_partkey int NULL,
+ p_name VARCHAR(55) NULL,
+ p_mfgr VARCHAR(25) NULL,
+ p_brand VARCHAR(50) NULL,
+ p_type VARCHAR(25) NULL,
+ p_size int NULL,
+ p_container VARCHAR(10) NULL,
+ p_retailprice decimal(15, 2) NULL,
+ p_comment VARCHAR(23) NULL
+ )ENGINE=OLAP
+ DUPLICATE KEY(`p_partkey`)
+ DISTRIBUTED BY HASH(`p_partkey`) BUCKETS 3
+ PROPERTIES (
+ "replication_num" = "1"
+ );
+ """
+
+ def label_22666 = "part_" + UUID.randomUUID().toString().replace("-", "0")
+ sql """
+ LOAD LABEL ${label_22666} (
+ DATA INFILE("s3://${s3BucketName}/regression/load/data/part0.parquet")
+ INTO TABLE ${tbl_22666}
+ FORMAT AS "PARQUET"
+ (p_partkey, p_name, p_mfgr),
+ DATA INFILE("s3://${s3BucketName}/regression/load/data/part1.parquet")
+ INTO TABLE ${tbl_22666}
+ FORMAT AS "PARQUET"
+ (p_partkey, p_brand, p_type)
+ )
+ WITH S3 (
+ "AWS_ACCESS_KEY" = "${getS3AK()}",
+ "AWS_SECRET_KEY" = "${getS3SK()}",
+ "AWS_ENDPOINT" = "${s3Endpoint}",
+ "AWS_REGION" = "${s3Region}",
+ "provider" = "${getS3Provider()}"
+ );
+ """
+
+ def max_try_milli_secs = 600000
+ while (max_try_milli_secs > 0) {
+ def String[][] result = sql """ show load where label="$label_22666" order by createtime desc limit 1; """
+ logger.info("Load status: " + result[0])
+ if (result[0][2].equals("FINISHED")) {
+ logger.info("Load FINISHED " + label_22666)
+ break;
+ }
+ if (result[0][2].equals("CANCELLED")) {
+ assertTrue(false, "load failed: $result")
+ break;
+ }
+ Thread.sleep(1000)
+ max_try_milli_secs -= 1000
+ if(max_try_milli_secs <= 0) {
+ assertTrue(1 == 2, "load Timeout: $label_22666")
+ }
+ }
+
+ order_qt_pr22666_1 """ select count(*) from ${tbl_22666} where p_brand is not null limit 10;"""
+ order_qt_pr22666_2 """ select count(*) from ${tbl_22666} where p_name is not null limit 10;"""
+
+}
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]