This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new c23b6744812 [fix](broker load) pass loadToSingleTablet to olapTableSink (#26680) (#26869)
c23b6744812 is described below
commit c23b674481276b0e1b137deac752bdd727c6e7b9
Author: qiye <[email protected]>
AuthorDate: Mon Nov 13 17:17:59 2023 +0800
[fix](broker load) pass loadToSingleTablet to olapTableSink (#26680) (#26869)
---
.../main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java | 2 +-
.../main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java | 7 +++++--
2 files changed, 6 insertions(+), 3 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
index 9a2fdb647f4..3bdbe52209b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
@@ -111,7 +111,7 @@ public class LoadLoadingTask extends LoadTask {
this.loadId = loadId;
planner = new LoadingTaskPlanner(callback.getCallbackId(), txnId,
db.getId(), table, brokerDesc, fileGroups,
strictMode, isPartialUpdate, timezone, this.timeoutS,
this.loadParallelism, this.sendBatchParallelism,
- this.useNewLoadScanNode, userInfo);
+ this.useNewLoadScanNode, userInfo, singleTabletLoadPerSink);
planner.plan(loadId, fileStatusList, fileNum);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
index a11ea7634f2..304dd2478c7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
@@ -72,6 +72,7 @@ public class LoadingTaskPlanner {
private final int loadParallelism;
private final int sendBatchParallelism;
private final boolean useNewLoadScanNode;
+ private final boolean singleTabletLoadPerSink;
private UserIdentity userInfo;
// Something useful
// ConnectContext here is just a dummy object to avoid some NPE problem,
like ctx.getDatabase()
@@ -87,7 +88,8 @@ public class LoadingTaskPlanner {
public LoadingTaskPlanner(Long loadJobId, long txnId, long dbId, OlapTable
table,
BrokerDesc brokerDesc, List<BrokerFileGroup> brokerFileGroups,
boolean strictMode, boolean isPartialUpdate, String timezone, long
timeoutS, int loadParallelism,
- int sendBatchParallelism, boolean useNewLoadScanNode, UserIdentity
userInfo) {
+ int sendBatchParallelism, boolean useNewLoadScanNode, UserIdentity
userInfo,
+ boolean singleTabletLoadPerSink) {
this.loadJobId = loadJobId;
this.txnId = txnId;
this.dbId = dbId;
@@ -101,6 +103,7 @@ public class LoadingTaskPlanner {
this.loadParallelism = loadParallelism;
this.sendBatchParallelism = sendBatchParallelism;
this.useNewLoadScanNode = useNewLoadScanNode;
+ this.singleTabletLoadPerSink = singleTabletLoadPerSink;
this.userInfo = userInfo;
if (Env.getCurrentEnv().getAccessManager()
.checkDbPriv(userInfo,
Env.getCurrentInternalCatalog().getDbNullable(dbId).getFullName(),
@@ -211,7 +214,7 @@ public class LoadingTaskPlanner {
List<Long> partitionIds = getAllPartitionIds();
OlapTableSink olapTableSink = new OlapTableSink(table, destTupleDesc,
partitionIds,
Config.enable_single_replica_load);
- olapTableSink.init(loadId, txnId, dbId, timeoutS,
sendBatchParallelism, false, strictMode);
+ olapTableSink.init(loadId, txnId, dbId, timeoutS,
sendBatchParallelism, singleTabletLoadPerSink, strictMode);
olapTableSink.setPartialUpdateInputColumns(isPartialUpdate,
partialUpdateInputColumns);
olapTableSink.complete(analyzer);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org