This is an automated email from the ASF dual-hosted git repository.
liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 5367f7ed8b1 [performance](broker-load) increase default broker load batch size (#36477)
5367f7ed8b1 is described below
commit 5367f7ed8b15f91e63ee5c2ff98de82ec6450742
Author: Kaijie Chen <[email protected]>
AuthorDate: Thu Jul 11 17:18:41 2024 +0800
[performance](broker-load) increase default broker load batch size (#36477)
---
.../org/apache/doris/cloud/load/CloudBrokerLoadJob.java | 6 +++---
.../org/apache/doris/cloud/load/CloudLoadLoadingTask.java | 5 +++--
.../java/org/apache/doris/load/loadv2/BrokerLoadJob.java | 14 ++++++++------
.../java/org/apache/doris/load/loadv2/LoadLoadingTask.java | 5 ++++-
.../src/main/java/org/apache/doris/qe/Coordinator.java | 4 ++++
.../src/main/java/org/apache/doris/qe/SessionVariable.java | 5 +++++
6 files changed, 27 insertions(+), 12 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudBrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudBrokerLoadJob.java
index 04560df0fd8..0c8e2082716 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudBrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudBrokerLoadJob.java
@@ -137,15 +137,15 @@ public class CloudBrokerLoadJob extends BrokerLoadJob {
@Override
protected LoadLoadingTask createTask(Database db, OlapTable table,
List<BrokerFileGroup> brokerFileGroups,
- boolean isEnableMemtableOnSinkNode, FileGroupAggKey aggKey,
BrokerPendingTaskAttachment attachment)
- throws UserException {
+ boolean isEnableMemtableOnSinkNode, int batchSize, FileGroupAggKey
aggKey,
+ BrokerPendingTaskAttachment attachment) throws UserException {
cloudClusterId = sessionVariables.get(CLOUD_CLUSTER_ID);
LoadLoadingTask task = new CloudLoadLoadingTask(db, table, brokerDesc,
brokerFileGroups, getDeadlineMs(), getExecMemLimit(),
isStrictMode(), isPartialUpdate(), transactionId, this,
getTimeZone(), getTimeout(),
getLoadParallelism(), getSendBatchParallelism(),
getMaxFilterRatio() <= 0, enableProfile ? jobProfile : null,
isSingleTabletLoadPerSink(),
- useNewLoadScanNode(), getPriority(),
isEnableMemtableOnSinkNode, cloudClusterId);
+ useNewLoadScanNode(), getPriority(),
isEnableMemtableOnSinkNode, batchSize, cloudClusterId);
UUID uuid = UUID.randomUUID();
TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(),
uuid.getLeastSignificantBits());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudLoadLoadingTask.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudLoadLoadingTask.java
index 912de173ecc..2316a072fa8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudLoadLoadingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudLoadLoadingTask.java
@@ -47,10 +47,11 @@ public class CloudLoadLoadingTask extends LoadLoadingTask {
long txnId, LoadTaskCallback callback, String timezone,
long timeoutS, int loadParallelism, int sendBatchParallelism,
boolean loadZeroTolerance, Profile jobProfile, boolean
singleTabletLoadPerSink,
- boolean useNewLoadScanNode, Priority priority, boolean
enableMemTableOnSinkNode, String clusterId) {
+ boolean useNewLoadScanNode, Priority priority, boolean
enableMemTableOnSinkNode, int batchSize,
+ String clusterId) {
super(db, table, brokerDesc, fileGroups, jobDeadlineMs, execMemLimit,
strictMode, isPartialUpdate,
txnId, callback, timezone, timeoutS, loadParallelism,
sendBatchParallelism, loadZeroTolerance,
- jobProfile, singleTabletLoadPerSink, useNewLoadScanNode,
priority, enableMemTableOnSinkNode);
+ jobProfile, singleTabletLoadPerSink, useNewLoadScanNode,
priority, enableMemTableOnSinkNode, batchSize);
this.cloudClusterId = clusterId;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index 21686e52383..8b030f04aa2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -87,6 +87,7 @@ public class BrokerLoadJob extends BulkLoadJob {
protected boolean enableProfile = false;
private boolean enableMemTableOnSinkNode = false;
+ private int batchSize = 0;
// for log replay and unit test
public BrokerLoadJob() {
@@ -105,6 +106,7 @@ public class BrokerLoadJob extends BulkLoadJob {
if (ConnectContext.get() != null) {
enableProfile =
ConnectContext.get().getSessionVariable().enableProfile();
enableMemTableOnSinkNode =
ConnectContext.get().getSessionVariable().enableMemtableOnSinkNode;
+ batchSize =
ConnectContext.get().getSessionVariable().brokerLoadBatchSize;
}
}
@@ -216,14 +218,14 @@ public class BrokerLoadJob extends BulkLoadJob {
}
protected LoadLoadingTask createTask(Database db, OlapTable table,
List<BrokerFileGroup> brokerFileGroups,
- boolean isEnableMemtableOnSinkNode, FileGroupAggKey aggKey,
BrokerPendingTaskAttachment attachment)
- throws UserException {
+ boolean isEnableMemtableOnSinkNode, int batchSize, FileGroupAggKey
aggKey,
+ BrokerPendingTaskAttachment attachment) throws UserException {
LoadLoadingTask task = new LoadLoadingTask(db, table, brokerDesc,
brokerFileGroups, getDeadlineMs(), getExecMemLimit(),
isStrictMode(), isPartialUpdate(), transactionId, this,
getTimeZone(), getTimeout(),
getLoadParallelism(), getSendBatchParallelism(),
getMaxFilterRatio() <= 0, enableProfile ? jobProfile : null,
isSingleTabletLoadPerSink(),
- useNewLoadScanNode(), getPriority(),
isEnableMemtableOnSinkNode);
+ useNewLoadScanNode(), getPriority(),
isEnableMemtableOnSinkNode, batchSize);
UUID uuid = UUID.randomUUID();
TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(),
uuid.getLeastSignificantBits());
@@ -254,11 +256,11 @@ public class BrokerLoadJob extends BulkLoadJob {
List<BrokerFileGroup> brokerFileGroups = entry.getValue();
long tableId = aggKey.getTableId();
OlapTable table = (OlapTable) db.getTableNullable(tableId);
- boolean isEnableMemtableOnSinkNode = ((OlapTable)
table).getTableProperty().getUseSchemaLightChange()
- ? this.enableMemTableOnSinkNode : false;
+ boolean isEnableMemtableOnSinkNode =
+ table.getTableProperty().getUseSchemaLightChange() &&
this.enableMemTableOnSinkNode;
// Generate loading task and init the plan of task
LoadLoadingTask task = createTask(db, table, brokerFileGroups,
- isEnableMemtableOnSinkNode, aggKey, attachment);
+ isEnableMemtableOnSinkNode, batchSize, aggKey,
attachment);
idToTasks.put(task.getSignature(), task);
// idToTasks contains previous LoadPendingTasks, so idToTasks
is just used to save all tasks.
// use newLoadingTasks to save new created loading tasks and
submit them later.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
index 1475b54c136..df66dfdd232 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
@@ -76,6 +76,7 @@ public class LoadLoadingTask extends LoadTask {
private final boolean useNewLoadScanNode;
private final boolean enableMemTableOnSinkNode;
+ private final int batchSize;
private LoadingTaskPlanner planner;
@@ -90,7 +91,7 @@ public class LoadLoadingTask extends LoadTask {
long txnId, LoadTaskCallback callback, String timezone,
long timeoutS, int loadParallelism, int sendBatchParallelism,
boolean loadZeroTolerance, Profile jobProfile, boolean
singleTabletLoadPerSink,
- boolean useNewLoadScanNode, Priority priority, boolean
enableMemTableOnSinkNode) {
+ boolean useNewLoadScanNode, Priority priority, boolean
enableMemTableOnSinkNode, int batchSize) {
super(callback, TaskType.LOADING, priority);
this.db = db;
this.table = table;
@@ -112,6 +113,7 @@ public class LoadLoadingTask extends LoadTask {
this.singleTabletLoadPerSink = singleTabletLoadPerSink;
this.useNewLoadScanNode = useNewLoadScanNode;
this.enableMemTableOnSinkNode = enableMemTableOnSinkNode;
+ this.batchSize = batchSize;
}
public void init(TUniqueId loadId, List<List<TBrokerFileStatus>>
fileStatusList,
@@ -163,6 +165,7 @@ public class LoadLoadingTask extends LoadTask {
*/
curCoordinator.setLoadMemLimit(execMemLimit);
curCoordinator.setMemTableOnSinkNode(enableMemTableOnSinkNode);
+ curCoordinator.setBatchSize(batchSize);
long leftTimeMs = getLeftTimeMs();
if (leftTimeMs <= 0) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index e845503fd73..8459cb3b68e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -2474,6 +2474,10 @@ public class Coordinator implements CoordInterface {
this.queryOptions.setEnableMemtableOnSinkNode(enableMemTableOnSinkNode);
}
+ public void setBatchSize(int batchSize) {
+ this.queryOptions.setBatchSize(batchSize);
+ }
+
// map from a BE host address to the per-node assigned scan ranges;
// records scan range assignment for a single fragment
static class FragmentScanRangeAssignment
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index eb12d86a6bd..dfce3dfabae 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -128,6 +128,7 @@ public class SessionVariable implements Serializable, Writable {
// mem limit can't smaller than bufferpool's default page size
public static final int MIN_EXEC_MEM_LIMIT = 2097152;
public static final String BATCH_SIZE = "batch_size";
+ public static final String BROKER_LOAD_BATCH_SIZE =
"broker_load_batch_size";
public static final String DISABLE_STREAMING_PREAGGREGATIONS =
"disable_streaming_preaggregations";
public static final String ENABLE_DISTINCT_STREAMING_AGGREGATION =
"enable_distinct_streaming_aggregation";
public static final String DISABLE_COLOCATE_PLAN = "disable_colocate_plan";
@@ -848,6 +849,10 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = BATCH_SIZE, fuzzy = true, checker =
"checkBatchSize")
public int batchSize = 4064;
+ // 16352 + 16 + 16 = 16384
+ @VariableMgr.VarAttr(name = BROKER_LOAD_BATCH_SIZE, fuzzy = true, checker
= "checkBatchSize")
+ public int brokerLoadBatchSize = 16352;
+
@VariableMgr.VarAttr(name = DISABLE_STREAMING_PREAGGREGATIONS, fuzzy =
true)
public boolean disableStreamPreaggregations = false;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]