This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new c23b6744812 [fix](broker load) pass loadToSingleTablet to olapTableSink (#26680) (#26869)
c23b6744812 is described below

commit c23b674481276b0e1b137deac752bdd727c6e7b9
Author: qiye <[email protected]>
AuthorDate: Mon Nov 13 17:17:59 2023 +0800

    [fix](broker load) pass loadToSingleTablet to olapTableSink (#26680) (#26869)
---
 .../main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java    | 2 +-
 .../main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java | 7 +++++--
 2 files changed, 6 insertions(+), 3 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
index 9a2fdb647f4..3bdbe52209b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
@@ -111,7 +111,7 @@ public class LoadLoadingTask extends LoadTask {
         this.loadId = loadId;
         planner = new LoadingTaskPlanner(callback.getCallbackId(), txnId, 
db.getId(), table, brokerDesc, fileGroups,
                 strictMode, isPartialUpdate, timezone, this.timeoutS, 
this.loadParallelism, this.sendBatchParallelism,
-                this.useNewLoadScanNode, userInfo);
+                this.useNewLoadScanNode, userInfo, singleTabletLoadPerSink);
         planner.plan(loadId, fileStatusList, fileNum);
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
index a11ea7634f2..304dd2478c7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
@@ -72,6 +72,7 @@ public class LoadingTaskPlanner {
     private final int loadParallelism;
     private final int sendBatchParallelism;
     private final boolean useNewLoadScanNode;
+    private final boolean singleTabletLoadPerSink;
     private UserIdentity userInfo;
     // Something useful
     // ConnectContext here is just a dummy object to avoid some NPE problem, 
like ctx.getDatabase()
@@ -87,7 +88,8 @@ public class LoadingTaskPlanner {
     public LoadingTaskPlanner(Long loadJobId, long txnId, long dbId, OlapTable 
table,
             BrokerDesc brokerDesc, List<BrokerFileGroup> brokerFileGroups,
             boolean strictMode, boolean isPartialUpdate, String timezone, long 
timeoutS, int loadParallelism,
-            int sendBatchParallelism, boolean useNewLoadScanNode, UserIdentity 
userInfo) {
+            int sendBatchParallelism, boolean useNewLoadScanNode, UserIdentity 
userInfo,
+            boolean singleTabletLoadPerSink) {
         this.loadJobId = loadJobId;
         this.txnId = txnId;
         this.dbId = dbId;
@@ -101,6 +103,7 @@ public class LoadingTaskPlanner {
         this.loadParallelism = loadParallelism;
         this.sendBatchParallelism = sendBatchParallelism;
         this.useNewLoadScanNode = useNewLoadScanNode;
+        this.singleTabletLoadPerSink = singleTabletLoadPerSink;
         this.userInfo = userInfo;
         if (Env.getCurrentEnv().getAccessManager()
                 .checkDbPriv(userInfo, 
Env.getCurrentInternalCatalog().getDbNullable(dbId).getFullName(),
@@ -211,7 +214,7 @@ public class LoadingTaskPlanner {
         List<Long> partitionIds = getAllPartitionIds();
         OlapTableSink olapTableSink = new OlapTableSink(table, destTupleDesc, 
partitionIds,
                 Config.enable_single_replica_load);
-        olapTableSink.init(loadId, txnId, dbId, timeoutS, 
sendBatchParallelism, false, strictMode);
+        olapTableSink.init(loadId, txnId, dbId, timeoutS, 
sendBatchParallelism, singleTabletLoadPerSink, strictMode);
         olapTableSink.setPartialUpdateInputColumns(isPartialUpdate, 
partialUpdateInputColumns);
         olapTableSink.complete(analyzer);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to