This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 9aafcb18bd9 [fix](move-memtable) disable move memtable when light schema change is false (#29362)
9aafcb18bd9 is described below
commit 9aafcb18bd950022e5810a480def5eb832d1cb96
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Thu Jan 4 23:03:35 2024 +0800
[fix](move-memtable) disable move memtable when light schema change is false (#29362)
---
.../apache/doris/load/loadv2/BrokerLoadJob.java | 4 +-
.../plans/commands/InsertIntoTableCommand.java | 7 +-
.../apache/doris/planner/StreamLoadPlanner.java | 11 +-
.../main/java/org/apache/doris/qe/Coordinator.java | 4 +
.../java/org/apache/doris/qe/StmtExecutor.java | 8 +
.../test_disable_move_memtable.groovy | 324 +++++++++++++++++++++
6 files changed, 353 insertions(+), 5 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index da89db859d5..0cbb4e0cfb5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -209,13 +209,15 @@ public class BrokerLoadJob extends BulkLoadJob {
List<BrokerFileGroup> brokerFileGroups = entry.getValue();
long tableId = aggKey.getTableId();
OlapTable table = (OlapTable) db.getTableNullable(tableId);
+ boolean isEnableMemtableOnSinkNode = ((OlapTable)
table).getTableProperty().getUseSchemaLightChange()
+ ? this.enableMemTableOnSinkNode : false;
// Generate loading task and init the plan of task
LoadLoadingTask task = new LoadLoadingTask(db, table,
brokerDesc,
brokerFileGroups, getDeadlineMs(), getExecMemLimit(),
isStrictMode(), isPartialUpdate(), transactionId,
this, getTimeZone(), getTimeout(),
getLoadParallelism(), getSendBatchParallelism(),
getMaxFilterRatio() <= 0, enableProfile ? jobProfile :
null, isSingleTabletLoadPerSink(),
- useNewLoadScanNode(), getPriority(),
enableMemTableOnSinkNode);
+ useNewLoadScanNode(), getPriority(),
isEnableMemtableOnSinkNode);
UUID uuid = UUID.randomUUID();
TUniqueId loadId = new
TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java
index 0427fea005a..d50ee2096ca 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java
@@ -136,6 +136,7 @@ public class InsertIntoTableCommand extends Command
implements ForwardWithSync,
PhysicalOlapTableSink<?> physicalOlapTableSink;
DataSink sink;
InsertExecutor insertExecutor;
+ Table targetTable;
TableIf targetTableIf = InsertExecutor.getTargetTable(logicalQuery,
ctx);
// should lock target table until we begin transaction.
targetTableIf.readLock();
@@ -159,7 +160,7 @@ public class InsertIntoTableCommand extends Command
implements ForwardWithSync,
Preconditions.checkArgument(plan.isPresent(), "insert into command
must contain OlapTableSinkNode");
physicalOlapTableSink = plan.get();
- Table targetTable = physicalOlapTableSink.getTargetTable();
+ targetTable = physicalOlapTableSink.getTargetTable();
// check auth
if (!Env.getCurrentEnv().getAccessManager()
.checkTblPriv(ConnectContext.get(),
targetTable.getQualifiedDbName(), targetTable.getName(),
@@ -187,6 +188,10 @@ public class InsertIntoTableCommand extends Command
implements ForwardWithSync,
targetTableIf.readUnlock();
}
+ boolean isEnableMemtableOnSinkNode =
+ ((OlapTable)
targetTable).getTableProperty().getUseSchemaLightChange()
+ ?
insertExecutor.getCoordinator().getQueryOptions().isEnableMemtableOnSinkNode()
: false;
+
insertExecutor.getCoordinator().getQueryOptions().setEnableMemtableOnSinkNode(isEnableMemtableOnSinkNode);
executor.setProfileType(ProfileType.LOAD);
// We exposed @StmtExecutor#cancel as a unified entry point for
statement interruption
// so we need to set this here
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index bf3261d3944..ca4e3de9f58 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -319,8 +319,10 @@ public class StreamLoadPlanner {
queryOptions.setEnablePipelineEngine(Config.enable_pipeline_load);
queryOptions.setBeExecVersion(Config.be_exec_version);
queryOptions.setIsReportSuccess(taskInfo.getEnableProfile());
-
queryOptions.setEnableMemtableOnSinkNode(taskInfo.isMemtableOnSinkNode());
-
+ boolean isEnableMemtableOnSinkNode =
+ destTable.getTableProperty().getUseSchemaLightChange()
+ ? taskInfo.isMemtableOnSinkNode() : false;
+ queryOptions.setEnableMemtableOnSinkNode(isEnableMemtableOnSinkNode);
params.setQueryOptions(queryOptions);
TQueryGlobals queryGlobals = new TQueryGlobals();
queryGlobals.setNowString(TimeUtils.DATETIME_FORMAT.format(LocalDateTime.now()));
@@ -542,7 +544,10 @@ public class StreamLoadPlanner {
queryOptions.setEnablePipelineEngine(Config.enable_pipeline_load);
queryOptions.setBeExecVersion(Config.be_exec_version);
queryOptions.setIsReportSuccess(taskInfo.getEnableProfile());
-
queryOptions.setEnableMemtableOnSinkNode(taskInfo.isMemtableOnSinkNode());
+ boolean isEnableMemtableOnSinkNode =
+ destTable.getTableProperty().getUseSchemaLightChange()
+ ? taskInfo.isMemtableOnSinkNode() : false;
+ queryOptions.setEnableMemtableOnSinkNode(isEnableMemtableOnSinkNode);
pipParams.setQueryOptions(queryOptions);
TQueryGlobals queryGlobals = new TQueryGlobals();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 9bfca2802eb..d84b7038410 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -408,6 +408,10 @@ public class Coordinator implements CoordInterface {
return scanRangeNum;
}
+ public TQueryOptions getQueryOptions() {
+ return this.queryOptions;
+ }
+
public void setQueryId(TUniqueId queryId) {
this.queryId = queryId;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index a617d742f23..59909e59b41 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -79,6 +79,7 @@ import org.apache.doris.analysis.UseStmt;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Table;
@@ -1996,6 +1997,13 @@ public class StmtExecutor {
QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
coord);
+ Table table = insertStmt.getTargetTable();
+ if (table instanceof OlapTable) {
+ boolean isEnableMemtableOnSinkNode =
+ ((OlapTable)
table).getTableProperty().getUseSchemaLightChange()
+ ?
coord.getQueryOptions().isEnableMemtableOnSinkNode() : false;
+
coord.getQueryOptions().setEnableMemtableOnSinkNode(isEnableMemtableOnSinkNode);
+ }
coord.exec();
int execTimeout = context.getExecTimeout();
LOG.debug("Insert {} execution timeout:{}",
DebugUtil.printId(context.queryId()), execTimeout);
diff --git
a/regression-test/suites/fault_injection_p0/test_disable_move_memtable.groovy
b/regression-test/suites/fault_injection_p0/test_disable_move_memtable.groovy
new file mode 100644
index 00000000000..267c8fdaba8
--- /dev/null
+++
b/regression-test/suites/fault_injection_p0/test_disable_move_memtable.groovy
@@ -0,0 +1,324 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.apache.doris.regression.util.Http
+
+suite("test_disable_move_memtable", "nonConcurrent") {
+ sql """ set enable_memtable_on_sink_node=true """
+ sql """ DROP TABLE IF EXISTS `baseall` """
+ sql """ DROP TABLE IF EXISTS `test` """
+ sql """ DROP TABLE IF EXISTS `baseall1` """
+ sql """ DROP TABLE IF EXISTS `test1` """
+ sql """ DROP TABLE IF EXISTS `brokerload` """
+ sql """ DROP TABLE IF EXISTS `brokerload1` """
+ sql """ sync """
+ sql """
+ CREATE TABLE IF NOT EXISTS `baseall` (
+ `k0` boolean null comment "",
+ `k1` tinyint(4) null comment "",
+ `k2` smallint(6) null comment "",
+ `k3` int(11) null comment "",
+ `k4` bigint(20) null comment "",
+ `k5` decimal(9, 3) null comment "",
+ `k6` char(5) null comment "",
+ `k10` date null comment "",
+ `k11` datetime null comment "",
+ `k7` varchar(20) null comment "",
+ `k8` double max null comment "",
+ `k9` float sum null comment "",
+ `k12` string replace null comment "",
+ `k13` largeint(40) replace null comment ""
+ ) engine=olap
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties(
+ "light_schema_change" = "true",
+ "replication_num" = "1"
+ )
+ """
+ sql """
+ CREATE TABLE IF NOT EXISTS `test` (
+ `k0` boolean null comment "",
+ `k1` tinyint(4) null comment "",
+ `k2` smallint(6) null comment "",
+ `k3` int(11) null comment "",
+ `k4` bigint(20) null comment "",
+ `k5` decimal(9, 3) null comment "",
+ `k6` char(5) null comment "",
+ `k10` date null comment "",
+ `k11` datetime null comment "",
+ `k7` varchar(20) null comment "",
+ `k8` double max null comment "",
+ `k9` float sum null comment "",
+ `k12` string replace_if_not_null null comment "",
+ `k13` largeint(40) replace null comment ""
+ ) engine=olap
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties(
+ "light_schema_change" = "true",
+ "replication_num" = "1"
+ )
+ """
+ sql """
+ CREATE TABLE IF NOT EXISTS `baseall1` (
+ `k0` boolean null comment "",
+ `k1` tinyint(4) null comment "",
+ `k2` smallint(6) null comment "",
+ `k3` int(11) null comment "",
+ `k4` bigint(20) null comment "",
+ `k5` decimal(9, 3) null comment "",
+ `k6` char(5) null comment "",
+ `k10` date null comment "",
+ `k11` datetime null comment "",
+ `k7` varchar(20) null comment "",
+ `k8` double max null comment "",
+ `k9` float sum null comment "",
+ `k12` string replace null comment "",
+ `k13` largeint(40) replace null comment ""
+ ) engine=olap
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties(
+ "light_schema_change" = "false",
+ "replication_num" = "1"
+ )
+ """
+ sql """
+ CREATE TABLE IF NOT EXISTS `test1` (
+ `k0` boolean null comment "",
+ `k1` tinyint(4) null comment "",
+ `k2` smallint(6) null comment "",
+ `k3` int(11) null comment "",
+ `k4` bigint(20) null comment "",
+ `k5` decimal(9, 3) null comment "",
+ `k6` char(5) null comment "",
+ `k10` date null comment "",
+ `k11` datetime null comment "",
+ `k7` varchar(20) null comment "",
+ `k8` double max null comment "",
+ `k9` float sum null comment "",
+ `k12` string replace_if_not_null null comment "",
+ `k13` largeint(40) replace null comment ""
+ ) engine=olap
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties(
+ "light_schema_change" = "false",
+ "replication_num" = "1"
+ )
+ """
+ sql """
+ CREATE TABLE IF NOT EXISTS brokerload (
+ user_id bigint,
+ date date,
+ group_id bigint,
+ modify_date date,
+ keyword VARCHAR(128)
+ ) ENGINE=OLAP
+ UNIQUE KEY(user_id, date, group_id)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH (user_id) BUCKETS 32
+ PROPERTIES (
+ "replication_num" = "1",
+ "light_schema_change" = "true"
+ );
+ """
+ sql """
+ CREATE TABLE IF NOT EXISTS brokerload1 (
+ user_id bigint,
+ date date,
+ group_id bigint,
+ modify_date date,
+ keyword VARCHAR(128)
+ ) ENGINE=OLAP
+ UNIQUE KEY(user_id, date, group_id)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH (user_id) BUCKETS 32
+ PROPERTIES (
+ "replication_num" = "1",
+ "light_schema_change" = "false"
+ );
+ """
+
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ streamLoad {
+ table "baseall"
+ db "regression_test_fault_injection_p0"
+ set 'column_separator', ','
+ file "baseall.txt"
+ }
+ sql """ sync """
+
+ def insert_into_value_with_injection = { injection, tableName, error_msg->
+ try {
+ GetDebugPoint().enableDebugPointForAllBEs(injection)
+ sql """ insert into ${tableName} values(true, 10, 1000, 1, 1, 1,
'a', 2024-01-01, 2024-01-01, 'a', 1, 1, "hello", 1) """
+ } catch(Exception e) {
+ logger.info(e.getMessage())
+ assertTrue(e.getMessage().contains(error_msg))
+ } finally {
+ GetDebugPoint().disableDebugPointForAllBEs(injection)
+ }
+ }
+
+ def insert_into_select_with_injection = { injection, tableName, error_msg->
+ try {
+ GetDebugPoint().enableDebugPointForAllBEs(injection)
+ sql "insert into ${tableName} select * from baseall where k1 <= 3"
+ } catch(Exception e) {
+ logger.info(e.getMessage())
+ assertTrue(e.getMessage().contains(error_msg))
+ } finally {
+ GetDebugPoint().disableDebugPointForAllBEs(injection)
+ }
+ }
+
+ def stream_load_with_injection = { injection, tableName, res->
+ try {
+ GetDebugPoint().enableDebugPointForAllBEs(injection)
+ streamLoad {
+ table tableName
+ db "regression_test_fault_injection_p0"
+ set 'column_separator', ','
+ set 'memtable_on_sink_node', 'true'
+ file "baseall.txt"
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("res: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("${res}".toString(),
json.Status.toLowerCase().toString())
+ }
+ }
+ } finally {
+ GetDebugPoint().disableDebugPointForAllBEs(injection)
+ }
+ }
+
+ def load_from_hdfs_norm = {tableName, label, hdfsFilePath, format,
brokerName, hdfsUser, hdfsPasswd ->
+ def result1= sql """
+ LOAD LABEL ${label} (
+ DATA INFILE("${hdfsFilePath}")
+ INTO TABLE ${tableName}
+ FORMAT as "${format}"
+ PROPERTIES ("num_as_string"="true")
+ )
+ with BROKER "${brokerName}" (
+ "username"="${hdfsUser}",
+ "password"="${hdfsPasswd}")
+ PROPERTIES (
+ "timeout"="1200",
+ "max_filter_ratio"="0.1");
+ """
+ log.info("result1: ${result1}")
+ assertTrue(result1.size() == 1)
+ assertTrue(result1[0].size() == 1)
+ assertTrue(result1[0][0] == 0, "Query OK, 0 rows affected")
+ }
+
+ def check_load_result = {checklabel, testTablex, res ->
+ max_try_milli_secs = 10000
+ while(max_try_milli_secs) {
+ result = sql "show load where label = '${checklabel}'"
+ log.info("result: ${result}")
+ if(result[0][2].toString() == "${res}".toString()) {
+ break
+ } else {
+ sleep(1000) // wait 1 second every time
+ max_try_milli_secs -= 1000
+ if(max_try_milli_secs <= 0) {
+ assertEquals(1, 2)
+ }
+ }
+ }
+ }
+
+ def broker_load_with_injection = { injection, tableName, res->
+ try {
+ GetDebugPoint().enableDebugPointForAllBEs(injection)
+ if (enableHdfs()) {
+ brokerName = getBrokerName()
+ hdfsUser = getHdfsUser()
+ hdfsPasswd = getHdfsPasswd()
+ def hdfs_csv_file_path = uploadToHdfs
"load_p0/broker_load/broker_load_with_properties.json"
+ def test_load_label =
UUID.randomUUID().toString().replaceAll("-", "")
+ load_from_hdfs_norm.call(tableName, test_load_label,
hdfs_csv_file_path, "json",
+ brokerName, hdfsUser, hdfsPasswd)
+ check_load_result.call(test_load_label, tableName, res)
+ }
+ } finally {
+ GetDebugPoint().disableDebugPointForAllBEs(injection)
+ }
+ }
+
+ sql """ set enable_nereids_planner=true """
+ sql """ set enable_nereids_dml=true """
+
insert_into_value_with_injection("VTabletWriterV2._init._output_tuple_desc_null",
"test", "unknown destination tuple descriptor")
+
insert_into_value_with_injection("VTabletWriterV2._init._output_tuple_desc_null",
"test1", "success")
+ sql """ set group_commit = sync_mode """
+
insert_into_value_with_injection("VTabletWriterV2._init._output_tuple_desc_null",
"test", "unknown destination tuple descriptor")
+
insert_into_value_with_injection("VTabletWriterV2._init._output_tuple_desc_null",
"test1", "success")
+ sql """ set group_commit = off_mode """
+ sql """ set enable_nereids_planner=false """
+ sql """ set enable_nereids_dml=false """
+
insert_into_value_with_injection("VTabletWriterV2._init._output_tuple_desc_null",
"test", "unknown destination tuple descriptor")
+
insert_into_value_with_injection("VTabletWriterV2._init._output_tuple_desc_null",
"test1", "success")
+ sql """ set group_commit = sync_mode """
+
insert_into_value_with_injection("VTabletWriterV2._init._output_tuple_desc_null",
"test", "unknown destination tuple descriptor")
+
insert_into_value_with_injection("VTabletWriterV2._init._output_tuple_desc_null",
"test1", "success")
+ sql """ set group_commit = off_mode """
+
+ sql """ set enable_nereids_planner=true """
+ sql """ set enable_nereids_dml=true """
+
insert_into_select_with_injection("VTabletWriterV2._init._output_tuple_desc_null",
"test", "unknown destination tuple descriptor")
+
insert_into_select_with_injection("VTabletWriterV2._init._output_tuple_desc_null",
"test1", "success")
+ sql """ set group_commit = sync_mode """
+
insert_into_select_with_injection("VTabletWriterV2._init._output_tuple_desc_null",
"test", "unknown destination tuple descriptor")
+
insert_into_select_with_injection("VTabletWriterV2._init._output_tuple_desc_null",
"test1", "success")
+ sql """ set group_commit = off_mode """
+ sql """ set enable_nereids_planner=false """
+ sql """ set enable_nereids_dml=false """
+
insert_into_select_with_injection("VTabletWriterV2._init._output_tuple_desc_null",
"test", "unknown destination tuple descriptor")
+
insert_into_select_with_injection("VTabletWriterV2._init._output_tuple_desc_null",
"test1", "success")
+ sql """ set group_commit = sync_mode """
+
insert_into_select_with_injection("VTabletWriterV2._init._output_tuple_desc_null",
"test", "unknown destination tuple descriptor")
+
insert_into_select_with_injection("VTabletWriterV2._init._output_tuple_desc_null",
"test1", "success")
+ sql """ set group_commit = off_mode """
+
+ sql """ set enable_nereids_planner=true """
+ sql """ set enable_nereids_dml=true """
+
stream_load_with_injection("VTabletWriterV2._init._output_tuple_desc_null",
"baseall", "fail")
+
stream_load_with_injection("VTabletWriterV2._init._output_tuple_desc_null",
"baseall1", "success")
+ sql """ set enable_nereids_planner=false """
+ sql """ set enable_nereids_dml=false """
+
stream_load_with_injection("VTabletWriterV2._init._output_tuple_desc_null",
"baseall", "fail")
+
stream_load_with_injection("VTabletWriterV2._init._output_tuple_desc_null",
"baseall1", "success")
+
+ sql """ set enable_nereids_planner=true """
+ sql """ set enable_nereids_dml=true """
+
broker_load_with_injection("VTabletWriterV2._init._output_tuple_desc_null",
"baseall", "CANCELLED")
+
broker_load_with_injection("VTabletWriterV2._init._output_tuple_desc_null",
"baseall1", "FINISHED")
+ sql """ set enable_nereids_planner=false """
+ sql """ set enable_nereids_dml=false """
+
broker_load_with_injection("VTabletWriterV2._init._output_tuple_desc_null",
"brokerload", "CANCELLED")
+
broker_load_with_injection("VTabletWriterV2._init._output_tuple_desc_null",
"brokerload1", "FINISHED")
+
+ sql """ set enable_memtable_on_sink_node=false """
+ sql """ DROP TABLE IF EXISTS `baseall` """
+ sql """ DROP TABLE IF EXISTS `test` """
+ sql """ DROP TABLE IF EXISTS `baseall1` """
+ sql """ DROP TABLE IF EXISTS `test1` """
+ sql """ DROP TABLE IF EXISTS `brokerload` """
+ sql """ DROP TABLE IF EXISTS `brokerload1` """
+ sql """ sync """
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]