This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 966766f3b75 [enhancement](broker-load)
fix-move-memtable-session-var-for-s3 (#28894)
966766f3b75 is described below
commit 966766f3b752dfcd66a20351830a8b75d1f7e1b2
Author: Siyang Tang <[email protected]>
AuthorDate: Fri Dec 22 23:25:06 2023 +0800
[enhancement](broker-load) fix-move-memtable-session-var-for-s3 (#28894)
---
.../main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java | 9 ++++++---
.../main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java | 6 +++++-
fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java | 4 ++++
3 files changed, 15 insertions(+), 4 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index fd416b8e449..da89db859d5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -83,6 +83,8 @@ public class BrokerLoadJob extends BulkLoadJob {
// If set to true, the profile of load job with be pushed to ProfileManager
private boolean enableProfile = false;
+ private boolean enableMemTableOnSinkNode = false;
+
// for log replay and unit test
public BrokerLoadJob() {
super(EtlJobType.BROKER);
@@ -93,8 +95,9 @@ public class BrokerLoadJob extends BulkLoadJob {
throws MetaNotFoundException {
super(EtlJobType.BROKER, dbId, label, originStmt, userInfo);
this.brokerDesc = brokerDesc;
- if (ConnectContext.get() != null &&
ConnectContext.get().getSessionVariable().enableProfile()) {
- enableProfile = true;
+ if (ConnectContext.get() != null) {
+ enableProfile =
ConnectContext.get().getSessionVariable().enableProfile();
+ enableMemTableOnSinkNode =
ConnectContext.get().getSessionVariable().enableMemtableOnSinkNode;
}
}
@@ -212,7 +215,7 @@ public class BrokerLoadJob extends BulkLoadJob {
isStrictMode(), isPartialUpdate(), transactionId,
this, getTimeZone(), getTimeout(),
getLoadParallelism(), getSendBatchParallelism(),
getMaxFilterRatio() <= 0, enableProfile ? jobProfile :
null, isSingleTabletLoadPerSink(),
- useNewLoadScanNode(), getPriority());
+ useNewLoadScanNode(), getPriority(),
enableMemTableOnSinkNode);
UUID uuid = UUID.randomUUID();
TUniqueId loadId = new
TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
index d98ba2ba4a3..fa2eadcfa65 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
@@ -72,6 +72,8 @@ public class LoadLoadingTask extends LoadTask {
private final boolean singleTabletLoadPerSink;
private final boolean useNewLoadScanNode;
+ private final boolean enableMemTableOnSinkNode;
+
private LoadingTaskPlanner planner;
private Profile jobProfile;
@@ -83,7 +85,7 @@ public class LoadLoadingTask extends LoadTask {
long txnId, LoadTaskCallback callback, String timezone,
long timeoutS, int loadParallelism, int sendBatchParallelism,
boolean loadZeroTolerance, Profile jobProfile, boolean
singleTabletLoadPerSink,
- boolean useNewLoadScanNode, Priority priority) {
+ boolean useNewLoadScanNode, Priority priority, boolean
enableMemTableOnSinkNode) {
super(callback, TaskType.LOADING, priority);
this.db = db;
this.table = table;
@@ -104,6 +106,7 @@ public class LoadLoadingTask extends LoadTask {
this.jobProfile = jobProfile;
this.singleTabletLoadPerSink = singleTabletLoadPerSink;
this.useNewLoadScanNode = useNewLoadScanNode;
+ this.enableMemTableOnSinkNode = enableMemTableOnSinkNode;
}
public void init(TUniqueId loadId, List<List<TBrokerFileStatus>>
fileStatusList,
@@ -152,6 +155,7 @@ public class LoadLoadingTask extends LoadTask {
*/
curCoordinator.setLoadMemLimit(execMemLimit);
curCoordinator.setTimeout((int) (getLeftTimeMs() / 1000));
+ curCoordinator.setMemTableOnSinkNode(enableMemTableOnSinkNode);
try {
QeProcessorImpl.INSTANCE.registerQuery(loadId, curCoordinator);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 9564461d756..ed7f73e1735 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -2676,6 +2676,10 @@ public class Coordinator implements CoordInterface {
}
}
+ public void setMemTableOnSinkNode(boolean enableMemTableOnSinkNode) {
+
this.queryOptions.setEnableMemtableOnSinkNode(enableMemTableOnSinkNode);
+ }
+
// map from a BE host address to the per-node assigned scan ranges;
// records scan range assignment for a single fragment
static class FragmentScanRangeAssignment
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]