This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 9841ac5f1fa [fix](move-memtable) ignore single replica load when move memtable (#32845)
9841ac5f1fa is described below
commit 9841ac5f1faa67d09355b87866a247441de9c720
Author: Kaijie Chen <[email protected]>
AuthorDate: Wed Apr 17 10:52:11 2024 +0800
[fix](move-memtable) ignore single replica load when move memtable (#32845)
Co-authored-by: Xin Liao <[email protected]>
---
.../apache/doris/analysis/NativeInsertStmt.java | 7 ++++--
.../apache/doris/load/loadv2/LoadLoadingTask.java | 2 +-
.../doris/load/loadv2/LoadingTaskPlanner.java | 10 +++++---
.../apache/doris/planner/StreamLoadPlanner.java | 28 ++++++++++++----------
.../java/org/apache/doris/qe/SessionVariable.java | 8 +++++++
5 files changed, 37 insertions(+), 18 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
index 0004822d146..3cfc3341ae0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
@@ -1023,13 +1023,16 @@ public class NativeInsertStmt extends InsertStmt {
}
if (targetTable instanceof OlapTable) {
OlapTableSink sink;
+ final boolean enableSingleReplicaLoad =
+
analyzer.getContext().getSessionVariable().isEnableMemtableOnSinkNode()
+ ? false :
analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert();
if (isGroupCommitStreamLoadSql) {
sink = new GroupCommitBlockSink((OlapTable) targetTable,
olapTuple,
- targetPartitionIds,
analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert(),
+ targetPartitionIds, enableSingleReplicaLoad,
ConnectContext.get().getSessionVariable().getGroupCommit(), 0);
} else {
sink = new OlapTableSink((OlapTable) targetTable, olapTuple,
targetPartitionIds,
-
analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert());
+ enableSingleReplicaLoad);
}
dataSink = sink;
sink.setPartialUpdateInputColumns(isPartialUpdate,
partialUpdateCols);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
index 28769741874..4bd396fad1b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
@@ -118,7 +118,7 @@ public class LoadLoadingTask extends LoadTask {
this.loadId = loadId;
planner = new LoadingTaskPlanner(callback.getCallbackId(), txnId,
db.getId(), table, brokerDesc, fileGroups,
strictMode, isPartialUpdate, timezone, this.timeoutS,
this.loadParallelism, this.sendBatchParallelism,
- this.useNewLoadScanNode, userInfo, singleTabletLoadPerSink);
+ this.useNewLoadScanNode, userInfo, singleTabletLoadPerSink,
enableMemTableOnSinkNode);
planner.plan(loadId, fileStatusList, fileNum);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
index 2cb61141b96..f773b9b7430 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
@@ -73,6 +73,7 @@ public class LoadingTaskPlanner {
private final int sendBatchParallelism;
private final boolean useNewLoadScanNode;
private final boolean singleTabletLoadPerSink;
+ private final boolean enableMemtableOnSinkNode;
private UserIdentity userInfo;
// Something useful
// ConnectContext here is just a dummy object to avoid some NPE problem,
like ctx.getDatabase()
@@ -89,7 +90,7 @@ public class LoadingTaskPlanner {
BrokerDesc brokerDesc, List<BrokerFileGroup> brokerFileGroups,
boolean strictMode, boolean isPartialUpdate, String timezone, long
timeoutS, int loadParallelism,
int sendBatchParallelism, boolean useNewLoadScanNode, UserIdentity
userInfo,
- boolean singleTabletLoadPerSink) {
+ boolean singleTabletLoadPerSink, boolean enableMemtableOnSinkNode)
{
this.loadJobId = loadJobId;
this.txnId = txnId;
this.dbId = dbId;
@@ -103,8 +104,9 @@ public class LoadingTaskPlanner {
this.loadParallelism = loadParallelism;
this.sendBatchParallelism = sendBatchParallelism;
this.useNewLoadScanNode = useNewLoadScanNode;
- this.singleTabletLoadPerSink = singleTabletLoadPerSink;
this.userInfo = userInfo;
+ this.singleTabletLoadPerSink = singleTabletLoadPerSink;
+ this.enableMemtableOnSinkNode = enableMemtableOnSinkNode;
if (Env.getCurrentEnv().getAccessManager()
.checkDbPriv(userInfo, InternalCatalog.INTERNAL_CATALOG_NAME,
Env.getCurrentInternalCatalog().getDbNullable(dbId).getFullName(),
@@ -206,8 +208,10 @@ public class LoadingTaskPlanner {
// 2. Olap table sink
List<Long> partitionIds = getAllPartitionIds();
+ final boolean enableSingleReplicaLoad = this.enableMemtableOnSinkNode
+ ? false : Config.enable_single_replica_load;
OlapTableSink olapTableSink = new OlapTableSink(table, destTupleDesc,
partitionIds,
- Config.enable_single_replica_load);
+ enableSingleReplicaLoad);
long txnTimeout = timeoutS == 0 ?
ConnectContext.get().getExecTimeout() : timeoutS;
olapTableSink.init(loadId, txnId, dbId, timeoutS,
sendBatchParallelism, singleTabletLoadPerSink, strictMode,
txnTimeout);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index a7a57aa8900..ac93d18243e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -260,15 +260,20 @@ public class StreamLoadPlanner {
timeout *= 2;
}
+ final boolean enableMemtableOnSinkNode =
+ destTable.getTableProperty().getUseSchemaLightChange()
+ ? taskInfo.isMemtableOnSinkNode() : false;
+ final boolean enableSingleReplicaLoad = enableMemtableOnSinkNode
+ ? false : Config.enable_single_replica_load;
// create dest sink
List<Long> partitionIds = getAllPartitionIds();
OlapTableSink olapTableSink;
if (taskInfo instanceof StreamLoadTask && ((StreamLoadTask)
taskInfo).getGroupCommit() != null) {
olapTableSink = new GroupCommitBlockSink(destTable, tupleDesc,
partitionIds,
- Config.enable_single_replica_load, ((StreamLoadTask)
taskInfo).getGroupCommit(),
+ enableSingleReplicaLoad, ((StreamLoadTask)
taskInfo).getGroupCommit(),
taskInfo.getMaxFilterRatio());
} else {
- olapTableSink = new OlapTableSink(destTable, tupleDesc,
partitionIds, Config.enable_single_replica_load);
+ olapTableSink = new OlapTableSink(destTable, tupleDesc,
partitionIds, enableSingleReplicaLoad);
}
int txnTimeout = timeout == 0 ? ConnectContext.get().getExecTimeout()
: timeout;
olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), timeout,
taskInfo.getSendBatchParallelism(),
@@ -324,10 +329,7 @@ public class StreamLoadPlanner {
queryOptions.setEnablePipelineEngine(Config.enable_pipeline_load);
queryOptions.setBeExecVersion(Config.be_exec_version);
queryOptions.setIsReportSuccess(taskInfo.getEnableProfile());
- boolean isEnableMemtableOnSinkNode =
- destTable.getTableProperty().getUseSchemaLightChange()
- ? taskInfo.isMemtableOnSinkNode() : false;
- queryOptions.setEnableMemtableOnSinkNode(isEnableMemtableOnSinkNode);
+ queryOptions.setEnableMemtableOnSinkNode(enableMemtableOnSinkNode);
params.setQueryOptions(queryOptions);
TQueryGlobals queryGlobals = new TQueryGlobals();
queryGlobals.setNowString(TimeUtils.DATETIME_FORMAT.format(LocalDateTime.now()));
@@ -489,15 +491,20 @@ public class StreamLoadPlanner {
timeout *= 2;
}
+ final boolean enableMemtableOnSinkNode =
+ destTable.getTableProperty().getUseSchemaLightChange()
+ ? taskInfo.isMemtableOnSinkNode() : false;
+ final boolean enableSingleReplicaLoad = enableMemtableOnSinkNode
+ ? false : Config.enable_single_replica_load;
// create dest sink
List<Long> partitionIds = getAllPartitionIds();
OlapTableSink olapTableSink;
if (taskInfo instanceof StreamLoadTask && ((StreamLoadTask)
taskInfo).getGroupCommit() != null) {
olapTableSink = new GroupCommitBlockSink(destTable, tupleDesc,
partitionIds,
- Config.enable_single_replica_load, ((StreamLoadTask)
taskInfo).getGroupCommit(),
+ enableSingleReplicaLoad, ((StreamLoadTask)
taskInfo).getGroupCommit(),
taskInfo.getMaxFilterRatio());
} else {
- olapTableSink = new OlapTableSink(destTable, tupleDesc,
partitionIds, Config.enable_single_replica_load);
+ olapTableSink = new OlapTableSink(destTable, tupleDesc,
partitionIds, enableSingleReplicaLoad);
}
int txnTimeout = timeout == 0 ? ConnectContext.get().getExecTimeout()
: timeout;
olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), timeout,
@@ -556,10 +563,7 @@ public class StreamLoadPlanner {
queryOptions.setEnablePipelineEngine(Config.enable_pipeline_load);
queryOptions.setBeExecVersion(Config.be_exec_version);
queryOptions.setIsReportSuccess(taskInfo.getEnableProfile());
- boolean isEnableMemtableOnSinkNode =
- destTable.getTableProperty().getUseSchemaLightChange()
- ? taskInfo.isMemtableOnSinkNode() : false;
- queryOptions.setEnableMemtableOnSinkNode(isEnableMemtableOnSinkNode);
+ queryOptions.setEnableMemtableOnSinkNode(enableMemtableOnSinkNode);
pipParams.setQueryOptions(queryOptions);
TQueryGlobals queryGlobals = new TQueryGlobals();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index eb1d511f45e..4ab10095866 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -2893,6 +2893,14 @@ public class SessionVariable implements Serializable, Writable {
this.enableSingleReplicaInsert = enableSingleReplicaInsert;
}
+ public boolean isEnableMemtableOnSinkNode() {
+ return enableMemtableOnSinkNode;
+ }
+
+ public void setEnableMemtableOnSinkNode(boolean enableMemtableOnSinkNode) {
+ this.enableMemtableOnSinkNode = enableMemtableOnSinkNode;
+ }
+
public boolean isEnableRuntimeFilterPrune() {
return enableRuntimeFilterPrune;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]