This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 966766f3b75 [enhancement](broker-load) fix-move-memtable-session-var-for-s3 (#28894)
966766f3b75 is described below

commit 966766f3b752dfcd66a20351830a8b75d1f7e1b2
Author: Siyang Tang <[email protected]>
AuthorDate: Fri Dec 22 23:25:06 2023 +0800

    [enhancement](broker-load) fix-move-memtable-session-var-for-s3 (#28894)
---
 .../main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java    | 9 ++++++---
 .../main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java  | 6 +++++-
 fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java    | 4 ++++
 3 files changed, 15 insertions(+), 4 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index fd416b8e449..da89db859d5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -83,6 +83,8 @@ public class BrokerLoadJob extends BulkLoadJob {
     // If set to true, the profile of load job with be pushed to ProfileManager
     private boolean enableProfile = false;
 
+    private boolean enableMemTableOnSinkNode = false;
+
     // for log replay and unit test
     public BrokerLoadJob() {
         super(EtlJobType.BROKER);
@@ -93,8 +95,9 @@ public class BrokerLoadJob extends BulkLoadJob {
             throws MetaNotFoundException {
         super(EtlJobType.BROKER, dbId, label, originStmt, userInfo);
         this.brokerDesc = brokerDesc;
-        if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().enableProfile()) {
-            enableProfile = true;
+        if (ConnectContext.get() != null) {
+            enableProfile = ConnectContext.get().getSessionVariable().enableProfile();
+            enableMemTableOnSinkNode = ConnectContext.get().getSessionVariable().enableMemtableOnSinkNode;
         }
     }
 
@@ -212,7 +215,7 @@ public class BrokerLoadJob extends BulkLoadJob {
                         isStrictMode(), isPartialUpdate(), transactionId, this, getTimeZone(), getTimeout(),
                         getLoadParallelism(), getSendBatchParallelism(),
                         getMaxFilterRatio() <= 0, enableProfile ? jobProfile : null, isSingleTabletLoadPerSink(),
-                        useNewLoadScanNode(), getPriority());
+                        useNewLoadScanNode(), getPriority(), enableMemTableOnSinkNode);
 
                 UUID uuid = UUID.randomUUID();
                 TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
index d98ba2ba4a3..fa2eadcfa65 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
@@ -72,6 +72,8 @@ public class LoadLoadingTask extends LoadTask {
     private final boolean singleTabletLoadPerSink;
     private final boolean useNewLoadScanNode;
 
+    private final boolean enableMemTableOnSinkNode;
+
     private LoadingTaskPlanner planner;
 
     private Profile jobProfile;
@@ -83,7 +85,7 @@ public class LoadLoadingTask extends LoadTask {
             long txnId, LoadTaskCallback callback, String timezone,
             long timeoutS, int loadParallelism, int sendBatchParallelism,
             boolean loadZeroTolerance, Profile jobProfile, boolean singleTabletLoadPerSink,
-            boolean useNewLoadScanNode, Priority priority) {
+            boolean useNewLoadScanNode, Priority priority, boolean enableMemTableOnSinkNode) {
         super(callback, TaskType.LOADING, priority);
         this.db = db;
         this.table = table;
@@ -104,6 +106,7 @@ public class LoadLoadingTask extends LoadTask {
         this.jobProfile = jobProfile;
         this.singleTabletLoadPerSink = singleTabletLoadPerSink;
         this.useNewLoadScanNode = useNewLoadScanNode;
+        this.enableMemTableOnSinkNode = enableMemTableOnSinkNode;
     }
 
     public void init(TUniqueId loadId, List<List<TBrokerFileStatus>> fileStatusList,
@@ -152,6 +155,7 @@ public class LoadLoadingTask extends LoadTask {
          */
         curCoordinator.setLoadMemLimit(execMemLimit);
         curCoordinator.setTimeout((int) (getLeftTimeMs() / 1000));
+        curCoordinator.setMemTableOnSinkNode(enableMemTableOnSinkNode);
 
         try {
             QeProcessorImpl.INSTANCE.registerQuery(loadId, curCoordinator);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 9564461d756..ed7f73e1735 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -2676,6 +2676,10 @@ public class Coordinator implements CoordInterface {
         }
     }
 
+    public void setMemTableOnSinkNode(boolean enableMemTableOnSinkNode) {
+        this.queryOptions.setEnableMemtableOnSinkNode(enableMemTableOnSinkNode);
+    }
+
     // map from a BE host address to the per-node assigned scan ranges;
     // records scan range assignment for a single fragment
     static class FragmentScanRangeAssignment


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to