This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 60ce22f15e869d8d1a8ed5a9a0048f67d6a79422 Author: zclllyybb <[email protected]> AuthorDate: Mon Jan 22 17:46:26 2024 +0800 [fix](auto-partition) Fix a concurrent bug (#30086) During incremental opening, a new packet could be sent from the sender to the receiver by mistake because the send closure was re-constructed. This is now fixed. --- be/src/runtime/tablets_channel.cpp | 2 +- be/src/vec/sink/writer/vtablet_writer.cpp | 51 +++++---- be/src/vec/sink/writer/vtablet_writer.h | 4 +- .../auto_partition/ddl/stress_destination.sql | 20 ++++ .../auto_partition/ddl/stress_source.sql | 19 ++++ .../doris_dbgen_conf/stress_test_insert_into.yaml | 21 ++++ .../auto_partition/sql/multi_thread_load.groovy | 3 +- .../sql/stress_test_insert_into.groovy | 118 +++++++++++++++++++++ 8 files changed, 215 insertions(+), 23 deletions(-) diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp index 7ead68d916f..d248ad09d7a 100644 --- a/be/src/runtime/tablets_channel.cpp +++ b/be/src/runtime/tablets_channel.cpp @@ -152,7 +152,7 @@ Status BaseTabletsChannel::open(const PTabletWriterOpenRequest& request) { Status BaseTabletsChannel::incremental_open(const PTabletWriterOpenRequest& params) { SCOPED_TIMER(_incremental_open_timer); if (_state == kInitialized) { // haven't opened - return open(params); + RETURN_IF_ERROR(open(params)); } std::lock_guard<std::mutex> l(_lock); std::vector<SlotDescriptor*>* index_slots = nullptr; diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp index 0ca2f7d8ffb..8eb1c4d743a 100644 --- a/be/src/vec/sink/writer/vtablet_writer.cpp +++ b/be/src/vec/sink/writer/vtablet_writer.cpp @@ -310,6 +310,10 @@ void VNodeChannel::clear_all_blocks() { // no need to set _cancel_msg because the error will be // returned directly via "TabletSink::prepare()" method. 
Status VNodeChannel::init(RuntimeState* state) { + if (_inited) { + return Status::OK(); + } + SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get()); _task_exec_ctx = state->get_task_execution_context(); _tuple_desc = _parent->_output_tuple_desc; @@ -348,6 +352,25 @@ Status VNodeChannel::init(RuntimeState* state) { _cur_add_block_request->set_backend_id(_node_id); _cur_add_block_request->set_eos(false); + // add block closure + _send_block_callback = WriteBlockCallback<PTabletWriterAddBlockResult>::create_shared(); + _send_block_callback->addFailedHandler([this](bool is_last_rpc) { + auto ctx_lock = _task_exec_ctx.lock(); + if (ctx_lock == nullptr) { + return; + } + _add_block_failed_callback(is_last_rpc); + }); + + _send_block_callback->addSuccessHandler( + [this](const PTabletWriterAddBlockResult& result, bool is_last_rpc) { + auto ctx_lock = _task_exec_ctx.lock(); + if (ctx_lock == nullptr) { + return; + } + _add_block_success_callback(result, is_last_rpc); + }); + _name = fmt::format("VNodeChannel[{}-{}]", _index_channel->_index_id, _node_id); // The node channel will send _batch_size rows of data each rpc. When the // number of tablets is large, the number of data rows received by each @@ -356,18 +379,23 @@ Status VNodeChannel::init(RuntimeState* state) { // a relatively large value to improve the import performance. 
_batch_size = std::max(_batch_size, 8192); + _inited = true; return Status::OK(); } void VNodeChannel::_open_internal(bool is_incremental) { + if (_tablets_wait_open.empty()) { + return; + } SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get()); auto request = std::make_shared<PTabletWriterOpenRequest>(); request->set_allocated_id(&_parent->_load_id); request->set_index_id(_index_channel->_index_id); request->set_txn_id(_parent->_txn_id); request->set_allocated_schema(_parent->_schema->to_protobuf()); + std::set<int64_t> deduper; - for (auto& tablet : _all_tablets) { + for (auto& tablet : _tablets_wait_open) { if (deduper.contains(tablet.tablet_id)) { continue; } @@ -375,7 +403,10 @@ void VNodeChannel::_open_internal(bool is_incremental) { ptablet->set_partition_id(tablet.partition_id); ptablet->set_tablet_id(tablet.tablet_id); deduper.insert(tablet.tablet_id); + _all_tablets.push_back(std::move(tablet)); } + _tablets_wait_open.clear(); + request->set_num_senders(_parent->_num_senders); request->set_need_gen_rollup(false); // Useless but it is a required field in pb request->set_load_mem_limit(_parent->_load_mem_limit); @@ -444,24 +475,6 @@ Status VNodeChannel::open_wait() { } } - // add block closure - _send_block_callback = WriteBlockCallback<PTabletWriterAddBlockResult>::create_shared(); - _send_block_callback->addFailedHandler([this](bool is_last_rpc) { - auto ctx_lock = _task_exec_ctx.lock(); - if (ctx_lock == nullptr) { - return; - } - _add_block_failed_callback(is_last_rpc); - }); - - _send_block_callback->addSuccessHandler( - [this](const PTabletWriterAddBlockResult& result, bool is_last_rpc) { - auto ctx_lock = _task_exec_ctx.lock(); - if (ctx_lock == nullptr) { - return; - } - _add_block_success_callback(result, is_last_rpc); - }); return status; } diff --git a/be/src/vec/sink/writer/vtablet_writer.h b/be/src/vec/sink/writer/vtablet_writer.h index cd2eafb1f24..68534cd3a16 100644 --- a/be/src/vec/sink/writer/vtablet_writer.h +++ 
b/be/src/vec/sink/writer/vtablet_writer.h @@ -219,7 +219,7 @@ public: ~VNodeChannel(); // called before open, used to add tablet located in this backend. called by IndexChannel::init - void add_tablet(const TTabletWithPartition& tablet) { _all_tablets.emplace_back(tablet); } + void add_tablet(const TTabletWithPartition& tablet) { _tablets_wait_open.emplace_back(tablet); } std::string debug_tablets() const { std::stringstream ss; for (const auto& tab : _all_tablets) { @@ -368,6 +368,7 @@ protected: std::vector<std::shared_ptr<DummyBrpcCallback<PTabletWriterOpenResult>>> _open_callbacks; std::vector<TTabletWithPartition> _all_tablets; + std::vector<TTabletWithPartition> _tablets_wait_open; // map from tablet_id to node_id where slave replicas locate in std::unordered_map<int64_t, std::vector<int64_t>> _slave_tablet_nodes; std::vector<TTabletCommitInfo> _tablet_commit_infos; @@ -388,6 +389,7 @@ protected: // The IndexChannel is definitely accessible until the NodeChannel is closed. std::mutex _closed_lock; bool _is_closed = false; + bool _inited = false; RuntimeState* _state = nullptr; // A context lock for callbacks, the callback has to lock the ctx, to avoid diff --git a/regression-test/suites/partition_p1/auto_partition/ddl/stress_destination.sql b/regression-test/suites/partition_p1/auto_partition/ddl/stress_destination.sql new file mode 100644 index 00000000000..f1dbb790987 --- /dev/null +++ b/regression-test/suites/partition_p1/auto_partition/ddl/stress_destination.sql @@ -0,0 +1,20 @@ +CREATE TABLE `stress_destination` ( + `create_date` DATE NOT NULL, + `parent_org_id` VARCHAR(96) NULL, + `org_id` VARCHAR(100) NULL, + `org_name` VARCHAR(192) NULL, + `create_month` VARCHAR(11) NOT NULL, + `org_type` VARCHAR(192) NULL, + `sms_total` INT NULL, + `success_sms_total` INT NULL, + `sms_price_total` DOUBLE NULL, + `sms_total_sum` INT NULL, + `has_sub` INT NULL, + `order_num` INT NULL +) ENGINE=OLAP +UNIQUE KEY(`create_date`, `parent_org_id`, `org_id`) +AUTO PARTITION 
BY RANGE date_trunc(`create_date`,'day')() +DISTRIBUTED BY HASH(`create_date`, `org_id`) BUCKETS AUTO +PROPERTIES ( +"replication_allocation" = "tag.location.default: 1" +); diff --git a/regression-test/suites/partition_p1/auto_partition/ddl/stress_source.sql b/regression-test/suites/partition_p1/auto_partition/ddl/stress_source.sql new file mode 100644 index 00000000000..d095e4119c7 --- /dev/null +++ b/regression-test/suites/partition_p1/auto_partition/ddl/stress_source.sql @@ -0,0 +1,19 @@ +CREATE TABLE `stress_source` ( + `create_date` DATE NOT NULL, + `parent_org_id` VARCHAR(96) NULL, + `org_id` VARCHAR(100) NULL, + `org_name` VARCHAR(192) NULL, + `create_month` VARCHAR(11) NOT NULL, + `org_type` VARCHAR(192) NULL, + `sms_total` INT NULL, + `success_sms_total` INT NULL, + `sms_price_total` DOUBLE NULL, + `sms_total_sum` INT NULL, + `has_sub` INT NULL, + `order_num` INT NULL +) ENGINE=OLAP +UNIQUE KEY(`create_date`, `parent_org_id`, `org_id`) +DISTRIBUTED BY HASH(`create_date`, `org_id`) BUCKETS AUTO +PROPERTIES ( +"replication_allocation" = "tag.location.default: 1" +); diff --git a/regression-test/suites/partition_p1/auto_partition/doris_dbgen_conf/stress_test_insert_into.yaml b/regression-test/suites/partition_p1/auto_partition/doris_dbgen_conf/stress_test_insert_into.yaml new file mode 100644 index 00000000000..a45fb01c316 --- /dev/null +++ b/regression-test/suites/partition_p1/auto_partition/doris_dbgen_conf/stress_test_insert_into.yaml @@ -0,0 +1,21 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +tables: + stress_source: + create_date: + range: {min: "2023-07-01", max: "2024-01-10"} \ No newline at end of file diff --git a/regression-test/suites/partition_p1/auto_partition/sql/multi_thread_load.groovy b/regression-test/suites/partition_p1/auto_partition/sql/multi_thread_load.groovy index 25a9a49305d..aebc022feeb 100644 --- a/regression-test/suites/partition_p1/auto_partition/sql/multi_thread_load.groovy +++ b/regression-test/suites/partition_p1/auto_partition/sql/multi_thread_load.groovy @@ -20,9 +20,8 @@ import java.nio.file.Files import java.nio.file.Paths import java.net.URL import java.io.File -import java.util.concurrent.locks.ReentrantLock -suite("multi_thread_load") { +suite("multi_thread_load", "nonConcurrent") { // stress case should use resource fully // get doris-db from s3 def dirPath = context.file.parent def fatherPath = context.file.parentFile.parentFile.getPath() diff --git a/regression-test/suites/partition_p1/auto_partition/sql/stress_test_insert_into.groovy b/regression-test/suites/partition_p1/auto_partition/sql/stress_test_insert_into.groovy new file mode 100644 index 00000000000..d2ce87248e0 --- /dev/null +++ b/regression-test/suites/partition_p1/auto_partition/sql/stress_test_insert_into.groovy @@ -0,0 +1,118 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import groovy.io.FileType +import java.nio.file.Files +import java.nio.file.Paths +import java.net.URL +import java.io.File + +suite("stress_test_insert_into") { + // get doris-db from s3 + def dirPath = context.file.parent + def fatherPath = context.file.parentFile.parentFile.getPath() + def fileName = "doris-dbgen" + def fileUrl = "${getS3Url()}/regression/doris-dbgen-23-10-18/doris-dbgen-23-10-20/doris-dbgen" + def filePath = Paths.get(dirPath, fileName) + if (!Files.exists(filePath)) { + new URL(fileUrl).withInputStream { inputStream -> + Files.copy(inputStream, filePath) + } + def file = new File(dirPath + "/" + fileName) + file.setExecutable(true) + } + + def rows = 100000 + + // load data via doris-dbgen + def doris_dbgen_create_data = { db_name, tb_name -> + def tableName = tb_name + + def jdbcUrl = context.config.jdbcUrl + def urlWithoutSchema = jdbcUrl.substring(jdbcUrl.indexOf("://") + 3) + def sql_ip = urlWithoutSchema.substring(0, urlWithoutSchema.indexOf(":")) + def sql_port + if (urlWithoutSchema.indexOf("/") >= 0) { + // e.g: jdbc:mysql://locahost:8080/?a=b + sql_port = urlWithoutSchema.substring(urlWithoutSchema.indexOf(":") + 1, urlWithoutSchema.indexOf("/")) + } else { + // e.g: jdbc:mysql://locahost:8080 + sql_port = urlWithoutSchema.substring(urlWithoutSchema.indexOf(":") + 1) + } + String feHttpAddress = context.config.feHttpAddress + def http_port = 
feHttpAddress.substring(feHttpAddress.indexOf(":") + 1) + + String realDb = db_name + String user = context.config.jdbcUser + String password = context.config.jdbcPassword + + def cm + if (password) { + cm = """ + ${dirPath}/doris-dbgen gen + --host ${sql_ip} + --sql-port ${sql_port} + --user ${user} + --password ${password} + --database ${realDb} + --table ${tableName} + --rows ${rows} + --http-port ${http_port} + --config ${fatherPath}/doris_dbgen_conf/stress_test_insert_into.yaml + """ + } else { + cm = """ + ${dirPath}/doris-dbgen gen + --host ${sql_ip} + --sql-port ${sql_port} + --user ${user} + --database ${realDb} + --table ${tableName} + --rows ${rows} + --http-port ${http_port} + --config ${fatherPath}/doris_dbgen_conf/stress_test_insert_into.yaml + """ + } + + logger.info("datagen: " + cm) + def proc = cm.execute() + def sout = new StringBuilder(), serr = new StringBuilder() + proc.consumeProcessOutput(sout, serr) + proc.waitForOrKill(1800000) + } + + def database_name = "regression_test_auto_partition_concurrent" + def table_src = "stress_source" + def table_dest = "stress_destination" + + sql """create database if not exists ${database_name};""" + sql """use ${database_name};""" + sql """drop table if exists ${table_src};""" + sql """drop table if exists ${table_dest};""" + sql new File("""${fatherPath}/ddl/stress_source.sql""").text + sql new File("""${fatherPath}/ddl/stress_destination.sql""").text + doris_dbgen_create_data(database_name, table_src) + + // TEST-BODY + def count_src = sql " select count() from ${table_src}; " + sql " insert into ${table_dest} select * from ${table_src} " + def count_dest = sql " select count() from ${table_dest}; " + // check data count + assertTrue(count_src[0][0] > 0) + assertEquals(count_src[0][0], count_dest[0][0]) + logger.info("got rows: ${count_src[0][0]}") +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email 
protected]
