This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 9841ac5f1fa [fix](move-memtable) ignore single replica load when move memtable (#32845)
9841ac5f1fa is described below

commit 9841ac5f1faa67d09355b87866a247441de9c720
Author: Kaijie Chen <[email protected]>
AuthorDate: Wed Apr 17 10:52:11 2024 +0800

    [fix](move-memtable) ignore single replica load when move memtable (#32845)
    
    
    Co-authored-by: Xin Liao <[email protected]>
---
 .../apache/doris/analysis/NativeInsertStmt.java    |  7 ++++--
 .../apache/doris/load/loadv2/LoadLoadingTask.java  |  2 +-
 .../doris/load/loadv2/LoadingTaskPlanner.java      | 10 +++++---
 .../apache/doris/planner/StreamLoadPlanner.java    | 28 ++++++++++++----------
 .../java/org/apache/doris/qe/SessionVariable.java  |  8 +++++++
 5 files changed, 37 insertions(+), 18 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
index 0004822d146..3cfc3341ae0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
@@ -1023,13 +1023,16 @@ public class NativeInsertStmt extends InsertStmt {
         }
         if (targetTable instanceof OlapTable) {
             OlapTableSink sink;
+            final boolean enableSingleReplicaLoad =
+                    analyzer.getContext().getSessionVariable().isEnableMemtableOnSinkNode()
+                    ? false : analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert();
             if (isGroupCommitStreamLoadSql) {
                 sink = new GroupCommitBlockSink((OlapTable) targetTable, olapTuple,
-                        targetPartitionIds, analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert(),
+                        targetPartitionIds, enableSingleReplicaLoad,
                         ConnectContext.get().getSessionVariable().getGroupCommit(), 0);
             } else {
                 sink = new OlapTableSink((OlapTable) targetTable, olapTuple, targetPartitionIds,
-                        analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert());
+                        enableSingleReplicaLoad);
             }
             dataSink = sink;
             sink.setPartialUpdateInputColumns(isPartialUpdate, partialUpdateCols);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
index 28769741874..4bd396fad1b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
@@ -118,7 +118,7 @@ public class LoadLoadingTask extends LoadTask {
         this.loadId = loadId;
         planner = new LoadingTaskPlanner(callback.getCallbackId(), txnId, db.getId(), table, brokerDesc, fileGroups,
                 strictMode, isPartialUpdate, timezone, this.timeoutS, this.loadParallelism, this.sendBatchParallelism,
-                this.useNewLoadScanNode, userInfo, singleTabletLoadPerSink);
+                this.useNewLoadScanNode, userInfo, singleTabletLoadPerSink, enableMemTableOnSinkNode);
         planner.plan(loadId, fileStatusList, fileNum);
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
index 2cb61141b96..f773b9b7430 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
@@ -73,6 +73,7 @@ public class LoadingTaskPlanner {
     private final int sendBatchParallelism;
     private final boolean useNewLoadScanNode;
     private final boolean singleTabletLoadPerSink;
+    private final boolean enableMemtableOnSinkNode;
     private UserIdentity userInfo;
     // Something useful
     // ConnectContext here is just a dummy object to avoid some NPE problem, like ctx.getDatabase()
@@ -89,7 +90,7 @@ public class LoadingTaskPlanner {
             BrokerDesc brokerDesc, List<BrokerFileGroup> brokerFileGroups,
             boolean strictMode, boolean isPartialUpdate, String timezone, long timeoutS, int loadParallelism,
             int sendBatchParallelism, boolean useNewLoadScanNode, UserIdentity userInfo,
-            boolean singleTabletLoadPerSink) {
+            boolean singleTabletLoadPerSink, boolean enableMemtableOnSinkNode) {
         this.loadJobId = loadJobId;
         this.txnId = txnId;
         this.dbId = dbId;
@@ -103,8 +104,9 @@ public class LoadingTaskPlanner {
         this.loadParallelism = loadParallelism;
         this.sendBatchParallelism = sendBatchParallelism;
         this.useNewLoadScanNode = useNewLoadScanNode;
-        this.singleTabletLoadPerSink = singleTabletLoadPerSink;
         this.userInfo = userInfo;
+        this.singleTabletLoadPerSink = singleTabletLoadPerSink;
+        this.enableMemtableOnSinkNode = enableMemtableOnSinkNode;
         if (Env.getCurrentEnv().getAccessManager()
                 .checkDbPriv(userInfo, InternalCatalog.INTERNAL_CATALOG_NAME,
                         Env.getCurrentInternalCatalog().getDbNullable(dbId).getFullName(),
@@ -206,8 +208,10 @@ public class LoadingTaskPlanner {
 
         // 2. Olap table sink
         List<Long> partitionIds = getAllPartitionIds();
+        final boolean enableSingleReplicaLoad = this.enableMemtableOnSinkNode
+                ? false : Config.enable_single_replica_load;
         OlapTableSink olapTableSink = new OlapTableSink(table, destTupleDesc, partitionIds,
-                Config.enable_single_replica_load);
+                enableSingleReplicaLoad);
         long txnTimeout = timeoutS == 0 ? ConnectContext.get().getExecTimeout() : timeoutS;
         olapTableSink.init(loadId, txnId, dbId, timeoutS, sendBatchParallelism, singleTabletLoadPerSink, strictMode,
                 txnTimeout);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index a7a57aa8900..ac93d18243e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -260,15 +260,20 @@ public class StreamLoadPlanner {
             timeout *= 2;
         }
 
+        final boolean enableMemtableOnSinkNode =
+                destTable.getTableProperty().getUseSchemaLightChange()
+                ? taskInfo.isMemtableOnSinkNode() : false;
+        final boolean enableSingleReplicaLoad = enableMemtableOnSinkNode
+                ? false : Config.enable_single_replica_load;
         // create dest sink
         List<Long> partitionIds = getAllPartitionIds();
         OlapTableSink olapTableSink;
         if (taskInfo instanceof StreamLoadTask && ((StreamLoadTask) taskInfo).getGroupCommit() != null) {
             olapTableSink = new GroupCommitBlockSink(destTable, tupleDesc, partitionIds,
-                    Config.enable_single_replica_load, ((StreamLoadTask) taskInfo).getGroupCommit(),
+                    enableSingleReplicaLoad, ((StreamLoadTask) taskInfo).getGroupCommit(),
                     taskInfo.getMaxFilterRatio());
         } else {
-            olapTableSink = new OlapTableSink(destTable, tupleDesc, partitionIds, Config.enable_single_replica_load);
+            olapTableSink = new OlapTableSink(destTable, tupleDesc, partitionIds, enableSingleReplicaLoad);
         }
         int txnTimeout = timeout == 0 ? ConnectContext.get().getExecTimeout() : timeout;
         olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), timeout, taskInfo.getSendBatchParallelism(),
@@ -324,10 +329,7 @@ public class StreamLoadPlanner {
         queryOptions.setEnablePipelineEngine(Config.enable_pipeline_load);
         queryOptions.setBeExecVersion(Config.be_exec_version);
         queryOptions.setIsReportSuccess(taskInfo.getEnableProfile());
-        boolean isEnableMemtableOnSinkNode =
-                destTable.getTableProperty().getUseSchemaLightChange()
-                ? taskInfo.isMemtableOnSinkNode() : false;
-        queryOptions.setEnableMemtableOnSinkNode(isEnableMemtableOnSinkNode);
+        queryOptions.setEnableMemtableOnSinkNode(enableMemtableOnSinkNode);
         params.setQueryOptions(queryOptions);
         TQueryGlobals queryGlobals = new TQueryGlobals();
         queryGlobals.setNowString(TimeUtils.DATETIME_FORMAT.format(LocalDateTime.now()));
@@ -489,15 +491,20 @@ public class StreamLoadPlanner {
             timeout *= 2;
         }
 
+        final boolean enableMemtableOnSinkNode =
+                destTable.getTableProperty().getUseSchemaLightChange()
+                ? taskInfo.isMemtableOnSinkNode() : false;
+        final boolean enableSingleReplicaLoad = enableMemtableOnSinkNode
+                ? false : Config.enable_single_replica_load;
         // create dest sink
         List<Long> partitionIds = getAllPartitionIds();
         OlapTableSink olapTableSink;
         if (taskInfo instanceof StreamLoadTask && ((StreamLoadTask) taskInfo).getGroupCommit() != null) {
             olapTableSink = new GroupCommitBlockSink(destTable, tupleDesc, partitionIds,
-                    Config.enable_single_replica_load, ((StreamLoadTask) taskInfo).getGroupCommit(),
+                    enableSingleReplicaLoad, ((StreamLoadTask) taskInfo).getGroupCommit(),
                     taskInfo.getMaxFilterRatio());
         } else {
-            olapTableSink = new OlapTableSink(destTable, tupleDesc, partitionIds, Config.enable_single_replica_load);
+            olapTableSink = new OlapTableSink(destTable, tupleDesc, partitionIds, enableSingleReplicaLoad);
         }
         int txnTimeout = timeout == 0 ? ConnectContext.get().getExecTimeout() : timeout;
         olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), timeout,
@@ -556,10 +563,7 @@ public class StreamLoadPlanner {
         queryOptions.setEnablePipelineEngine(Config.enable_pipeline_load);
         queryOptions.setBeExecVersion(Config.be_exec_version);
         queryOptions.setIsReportSuccess(taskInfo.getEnableProfile());
-        boolean isEnableMemtableOnSinkNode =
-                destTable.getTableProperty().getUseSchemaLightChange()
-                ? taskInfo.isMemtableOnSinkNode() : false;
-        queryOptions.setEnableMemtableOnSinkNode(isEnableMemtableOnSinkNode);
+        queryOptions.setEnableMemtableOnSinkNode(enableMemtableOnSinkNode);
 
         pipParams.setQueryOptions(queryOptions);
         TQueryGlobals queryGlobals = new TQueryGlobals();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index eb1d511f45e..4ab10095866 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -2893,6 +2893,14 @@ public class SessionVariable implements Serializable, Writable {
         this.enableSingleReplicaInsert = enableSingleReplicaInsert;
     }
 
+    public boolean isEnableMemtableOnSinkNode() {
+        return enableMemtableOnSinkNode;
+    }
+
+    public void setEnableMemtableOnSinkNode(boolean enableMemtableOnSinkNode) {
+        this.enableMemtableOnSinkNode = enableMemtableOnSinkNode;
+    }
+
     public boolean isEnableRuntimeFilterPrune() {
         return enableRuntimeFilterPrune;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to