This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 5367f7ed8b1 [performance](broker-load) increase default broker load batch size (#36477)
5367f7ed8b1 is described below

commit 5367f7ed8b15f91e63ee5c2ff98de82ec6450742
Author: Kaijie Chen <[email protected]>
AuthorDate: Thu Jul 11 17:18:41 2024 +0800

    [performance](broker-load) increase default broker load batch size (#36477)
---
 .../org/apache/doris/cloud/load/CloudBrokerLoadJob.java    |  6 +++---
 .../org/apache/doris/cloud/load/CloudLoadLoadingTask.java  |  5 +++--
 .../java/org/apache/doris/load/loadv2/BrokerLoadJob.java   | 14 ++++++++------
 .../java/org/apache/doris/load/loadv2/LoadLoadingTask.java |  5 ++++-
 .../src/main/java/org/apache/doris/qe/Coordinator.java     |  4 ++++
 .../src/main/java/org/apache/doris/qe/SessionVariable.java |  5 +++++
 6 files changed, 27 insertions(+), 12 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudBrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudBrokerLoadJob.java
index 04560df0fd8..0c8e2082716 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudBrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudBrokerLoadJob.java
@@ -137,15 +137,15 @@ public class CloudBrokerLoadJob extends BrokerLoadJob {
 
     @Override
     protected LoadLoadingTask createTask(Database db, OlapTable table, List<BrokerFileGroup> brokerFileGroups,
-            boolean isEnableMemtableOnSinkNode, FileGroupAggKey aggKey, BrokerPendingTaskAttachment attachment)
-            throws UserException {
+            boolean isEnableMemtableOnSinkNode, int batchSize, FileGroupAggKey aggKey,
+            BrokerPendingTaskAttachment attachment) throws UserException {
         cloudClusterId = sessionVariables.get(CLOUD_CLUSTER_ID);
         LoadLoadingTask task = new CloudLoadLoadingTask(db, table, brokerDesc,
                 brokerFileGroups, getDeadlineMs(), getExecMemLimit(),
                 isStrictMode(), isPartialUpdate(), transactionId, this, getTimeZone(), getTimeout(),
                 getLoadParallelism(), getSendBatchParallelism(),
                 getMaxFilterRatio() <= 0, enableProfile ? jobProfile : null, isSingleTabletLoadPerSink(),
-                useNewLoadScanNode(), getPriority(), isEnableMemtableOnSinkNode, cloudClusterId);
+                useNewLoadScanNode(), getPriority(), isEnableMemtableOnSinkNode, batchSize, cloudClusterId);
         UUID uuid = UUID.randomUUID();
         TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudLoadLoadingTask.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudLoadLoadingTask.java
index 912de173ecc..2316a072fa8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudLoadLoadingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudLoadLoadingTask.java
@@ -47,10 +47,11 @@ public class CloudLoadLoadingTask extends LoadLoadingTask {
             long txnId, LoadTaskCallback callback, String timezone,
             long timeoutS, int loadParallelism, int sendBatchParallelism,
             boolean loadZeroTolerance, Profile jobProfile, boolean singleTabletLoadPerSink,
-            boolean useNewLoadScanNode, Priority priority, boolean enableMemTableOnSinkNode, String clusterId) {
+            boolean useNewLoadScanNode, Priority priority, boolean enableMemTableOnSinkNode, int batchSize,
+            String clusterId) {
         super(db, table, brokerDesc, fileGroups, jobDeadlineMs, execMemLimit, strictMode, isPartialUpdate,
                 txnId, callback, timezone, timeoutS, loadParallelism, sendBatchParallelism, loadZeroTolerance,
-                jobProfile, singleTabletLoadPerSink, useNewLoadScanNode, priority, enableMemTableOnSinkNode);
+                jobProfile, singleTabletLoadPerSink, useNewLoadScanNode, priority, enableMemTableOnSinkNode, batchSize);
         this.cloudClusterId = clusterId;
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index 21686e52383..8b030f04aa2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -87,6 +87,7 @@ public class BrokerLoadJob extends BulkLoadJob {
     protected boolean enableProfile = false;
 
     private boolean enableMemTableOnSinkNode = false;
+    private int batchSize = 0;
 
     // for log replay and unit test
     public BrokerLoadJob() {
@@ -105,6 +106,7 @@ public class BrokerLoadJob extends BulkLoadJob {
         if (ConnectContext.get() != null) {
             enableProfile = ConnectContext.get().getSessionVariable().enableProfile();
             enableMemTableOnSinkNode = ConnectContext.get().getSessionVariable().enableMemtableOnSinkNode;
+            batchSize = ConnectContext.get().getSessionVariable().brokerLoadBatchSize;
         }
     }
 
@@ -216,14 +218,14 @@ public class BrokerLoadJob extends BulkLoadJob {
     }
 
     protected LoadLoadingTask createTask(Database db, OlapTable table, List<BrokerFileGroup> brokerFileGroups,
-            boolean isEnableMemtableOnSinkNode, FileGroupAggKey aggKey, BrokerPendingTaskAttachment attachment)
-            throws UserException {
+            boolean isEnableMemtableOnSinkNode, int batchSize, FileGroupAggKey aggKey,
+            BrokerPendingTaskAttachment attachment) throws UserException {
         LoadLoadingTask task = new LoadLoadingTask(db, table, brokerDesc,
                 brokerFileGroups, getDeadlineMs(), getExecMemLimit(),
                 isStrictMode(), isPartialUpdate(), transactionId, this, getTimeZone(), getTimeout(),
                 getLoadParallelism(), getSendBatchParallelism(),
                 getMaxFilterRatio() <= 0, enableProfile ? jobProfile : null, isSingleTabletLoadPerSink(),
-                useNewLoadScanNode(), getPriority(), isEnableMemtableOnSinkNode);
+                useNewLoadScanNode(), getPriority(), isEnableMemtableOnSinkNode, batchSize);
 
         UUID uuid = UUID.randomUUID();
         TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
@@ -254,11 +256,11 @@ public class BrokerLoadJob extends BulkLoadJob {
                 List<BrokerFileGroup> brokerFileGroups = entry.getValue();
                 long tableId = aggKey.getTableId();
                 OlapTable table = (OlapTable) db.getTableNullable(tableId);
-                boolean isEnableMemtableOnSinkNode = ((OlapTable) table).getTableProperty().getUseSchemaLightChange()
-                        ? this.enableMemTableOnSinkNode : false;
+                boolean isEnableMemtableOnSinkNode =
+                        table.getTableProperty().getUseSchemaLightChange() && this.enableMemTableOnSinkNode;
                 // Generate loading task and init the plan of task
                 LoadLoadingTask task = createTask(db, table, brokerFileGroups,
-                        isEnableMemtableOnSinkNode, aggKey, attachment);
+                        isEnableMemtableOnSinkNode, batchSize, aggKey, attachment);
                 idToTasks.put(task.getSignature(), task);
                 // idToTasks contains previous LoadPendingTasks, so idToTasks is just used to save all tasks.
                 // use newLoadingTasks to save new created loading tasks and submit them later.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
index 1475b54c136..df66dfdd232 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
@@ -76,6 +76,7 @@ public class LoadLoadingTask extends LoadTask {
     private final boolean useNewLoadScanNode;
 
     private final boolean enableMemTableOnSinkNode;
+    private final int batchSize;
 
     private LoadingTaskPlanner planner;
 
@@ -90,7 +91,7 @@ public class LoadLoadingTask extends LoadTask {
             long txnId, LoadTaskCallback callback, String timezone,
             long timeoutS, int loadParallelism, int sendBatchParallelism,
             boolean loadZeroTolerance, Profile jobProfile, boolean singleTabletLoadPerSink,
-            boolean useNewLoadScanNode, Priority priority, boolean enableMemTableOnSinkNode) {
+            boolean useNewLoadScanNode, Priority priority, boolean enableMemTableOnSinkNode, int batchSize) {
         super(callback, TaskType.LOADING, priority);
         this.db = db;
         this.table = table;
@@ -112,6 +113,7 @@ public class LoadLoadingTask extends LoadTask {
         this.singleTabletLoadPerSink = singleTabletLoadPerSink;
         this.useNewLoadScanNode = useNewLoadScanNode;
         this.enableMemTableOnSinkNode = enableMemTableOnSinkNode;
+        this.batchSize = batchSize;
     }
 
     public void init(TUniqueId loadId, List<List<TBrokerFileStatus>> fileStatusList,
@@ -163,6 +165,7 @@ public class LoadLoadingTask extends LoadTask {
          */
         curCoordinator.setLoadMemLimit(execMemLimit);
         curCoordinator.setMemTableOnSinkNode(enableMemTableOnSinkNode);
+        curCoordinator.setBatchSize(batchSize);
 
         long leftTimeMs = getLeftTimeMs();
         if (leftTimeMs <= 0) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index e845503fd73..8459cb3b68e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -2474,6 +2474,10 @@ public class Coordinator implements CoordInterface {
         this.queryOptions.setEnableMemtableOnSinkNode(enableMemTableOnSinkNode);
     }
 
+    public void setBatchSize(int batchSize) {
+        this.queryOptions.setBatchSize(batchSize);
+    }
+
     // map from a BE host address to the per-node assigned scan ranges;
     // records scan range assignment for a single fragment
     static class FragmentScanRangeAssignment
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index eb12d86a6bd..dfce3dfabae 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -128,6 +128,7 @@ public class SessionVariable implements Serializable, Writable {
     // mem limit can't smaller than bufferpool's default page size
     public static final int MIN_EXEC_MEM_LIMIT = 2097152;
     public static final String BATCH_SIZE = "batch_size";
+    public static final String BROKER_LOAD_BATCH_SIZE = "broker_load_batch_size";
     public static final String DISABLE_STREAMING_PREAGGREGATIONS = "disable_streaming_preaggregations";
     public static final String ENABLE_DISTINCT_STREAMING_AGGREGATION = "enable_distinct_streaming_aggregation";
     public static final String DISABLE_COLOCATE_PLAN = "disable_colocate_plan";
@@ -848,6 +849,10 @@ public class SessionVariable implements Serializable, Writable {
     @VariableMgr.VarAttr(name = BATCH_SIZE, fuzzy = true, checker = "checkBatchSize")
     public int batchSize = 4064;
 
+    // 16352 + 16 + 16 = 16384
+    @VariableMgr.VarAttr(name = BROKER_LOAD_BATCH_SIZE, fuzzy = true, checker = "checkBatchSize")
+    public int brokerLoadBatchSize = 16352;
+
     @VariableMgr.VarAttr(name = DISABLE_STREAMING_PREAGGREGATIONS, fuzzy = true)
     public boolean disableStreamPreaggregations = false;
     public boolean disableStreamPreaggregations = false;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to