This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 60ce22f15e869d8d1a8ed5a9a0048f67d6a79422
Author: zclllyybb <[email protected]>
AuthorDate: Mon Jan 22 17:46:26 2024 +0800

    [fix](auto-partition) Fix a concurrent bug (#30086)
    
    During an incremental open, a new packet could be mistakenly sent from 
sender to receiver because the send closure was re-constructed.
    This is now fixed.
---
 be/src/runtime/tablets_channel.cpp                 |   2 +-
 be/src/vec/sink/writer/vtablet_writer.cpp          |  51 +++++----
 be/src/vec/sink/writer/vtablet_writer.h            |   4 +-
 .../auto_partition/ddl/stress_destination.sql      |  20 ++++
 .../auto_partition/ddl/stress_source.sql           |  19 ++++
 .../doris_dbgen_conf/stress_test_insert_into.yaml  |  21 ++++
 .../auto_partition/sql/multi_thread_load.groovy    |   3 +-
 .../sql/stress_test_insert_into.groovy             | 118 +++++++++++++++++++++
 8 files changed, 215 insertions(+), 23 deletions(-)

diff --git a/be/src/runtime/tablets_channel.cpp 
b/be/src/runtime/tablets_channel.cpp
index 7ead68d916f..d248ad09d7a 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -152,7 +152,7 @@ Status BaseTabletsChannel::open(const 
PTabletWriterOpenRequest& request) {
 Status BaseTabletsChannel::incremental_open(const PTabletWriterOpenRequest& 
params) {
     SCOPED_TIMER(_incremental_open_timer);
     if (_state == kInitialized) { // haven't opened
-        return open(params);
+        RETURN_IF_ERROR(open(params));
     }
     std::lock_guard<std::mutex> l(_lock);
     std::vector<SlotDescriptor*>* index_slots = nullptr;
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp 
b/be/src/vec/sink/writer/vtablet_writer.cpp
index 0ca2f7d8ffb..8eb1c4d743a 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -310,6 +310,10 @@ void VNodeChannel::clear_all_blocks() {
 // no need to set _cancel_msg because the error will be
 // returned directly via "TabletSink::prepare()" method.
 Status VNodeChannel::init(RuntimeState* state) {
+    if (_inited) {
+        return Status::OK();
+    }
+
     SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
     _task_exec_ctx = state->get_task_execution_context();
     _tuple_desc = _parent->_output_tuple_desc;
@@ -348,6 +352,25 @@ Status VNodeChannel::init(RuntimeState* state) {
     _cur_add_block_request->set_backend_id(_node_id);
     _cur_add_block_request->set_eos(false);
 
+    // add block closure
+    _send_block_callback = 
WriteBlockCallback<PTabletWriterAddBlockResult>::create_shared();
+    _send_block_callback->addFailedHandler([this](bool is_last_rpc) {
+        auto ctx_lock = _task_exec_ctx.lock();
+        if (ctx_lock == nullptr) {
+            return;
+        }
+        _add_block_failed_callback(is_last_rpc);
+    });
+
+    _send_block_callback->addSuccessHandler(
+            [this](const PTabletWriterAddBlockResult& result, bool 
is_last_rpc) {
+                auto ctx_lock = _task_exec_ctx.lock();
+                if (ctx_lock == nullptr) {
+                    return;
+                }
+                _add_block_success_callback(result, is_last_rpc);
+            });
+
     _name = fmt::format("VNodeChannel[{}-{}]", _index_channel->_index_id, 
_node_id);
     // The node channel will send _batch_size rows of data each rpc. When the
     // number of tablets is large, the number of data rows received by each
@@ -356,18 +379,23 @@ Status VNodeChannel::init(RuntimeState* state) {
     // a relatively large value to improve the import performance.
     _batch_size = std::max(_batch_size, 8192);
 
+    _inited = true;
     return Status::OK();
 }
 
 void VNodeChannel::_open_internal(bool is_incremental) {
+    if (_tablets_wait_open.empty()) {
+        return;
+    }
     SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
     auto request = std::make_shared<PTabletWriterOpenRequest>();
     request->set_allocated_id(&_parent->_load_id);
     request->set_index_id(_index_channel->_index_id);
     request->set_txn_id(_parent->_txn_id);
     request->set_allocated_schema(_parent->_schema->to_protobuf());
+
     std::set<int64_t> deduper;
-    for (auto& tablet : _all_tablets) {
+    for (auto& tablet : _tablets_wait_open) {
         if (deduper.contains(tablet.tablet_id)) {
             continue;
         }
@@ -375,7 +403,10 @@ void VNodeChannel::_open_internal(bool is_incremental) {
         ptablet->set_partition_id(tablet.partition_id);
         ptablet->set_tablet_id(tablet.tablet_id);
         deduper.insert(tablet.tablet_id);
+        _all_tablets.push_back(std::move(tablet));
     }
+    _tablets_wait_open.clear();
+
     request->set_num_senders(_parent->_num_senders);
     request->set_need_gen_rollup(false); // Useless but it is a required field 
in pb
     request->set_load_mem_limit(_parent->_load_mem_limit);
@@ -444,24 +475,6 @@ Status VNodeChannel::open_wait() {
         }
     }
 
-    // add block closure
-    _send_block_callback = 
WriteBlockCallback<PTabletWriterAddBlockResult>::create_shared();
-    _send_block_callback->addFailedHandler([this](bool is_last_rpc) {
-        auto ctx_lock = _task_exec_ctx.lock();
-        if (ctx_lock == nullptr) {
-            return;
-        }
-        _add_block_failed_callback(is_last_rpc);
-    });
-
-    _send_block_callback->addSuccessHandler(
-            [this](const PTabletWriterAddBlockResult& result, bool 
is_last_rpc) {
-                auto ctx_lock = _task_exec_ctx.lock();
-                if (ctx_lock == nullptr) {
-                    return;
-                }
-                _add_block_success_callback(result, is_last_rpc);
-            });
     return status;
 }
 
diff --git a/be/src/vec/sink/writer/vtablet_writer.h 
b/be/src/vec/sink/writer/vtablet_writer.h
index cd2eafb1f24..68534cd3a16 100644
--- a/be/src/vec/sink/writer/vtablet_writer.h
+++ b/be/src/vec/sink/writer/vtablet_writer.h
@@ -219,7 +219,7 @@ public:
     ~VNodeChannel();
 
     // called before open, used to add tablet located in this backend. called 
by IndexChannel::init
-    void add_tablet(const TTabletWithPartition& tablet) { 
_all_tablets.emplace_back(tablet); }
+    void add_tablet(const TTabletWithPartition& tablet) { 
_tablets_wait_open.emplace_back(tablet); }
     std::string debug_tablets() const {
         std::stringstream ss;
         for (const auto& tab : _all_tablets) {
@@ -368,6 +368,7 @@ protected:
     std::vector<std::shared_ptr<DummyBrpcCallback<PTabletWriterOpenResult>>> 
_open_callbacks;
 
     std::vector<TTabletWithPartition> _all_tablets;
+    std::vector<TTabletWithPartition> _tablets_wait_open;
     // map from tablet_id to node_id where slave replicas locate in
     std::unordered_map<int64_t, std::vector<int64_t>> _slave_tablet_nodes;
     std::vector<TTabletCommitInfo> _tablet_commit_infos;
@@ -388,6 +389,7 @@ protected:
     // The IndexChannel is definitely accessible until the NodeChannel is 
closed.
     std::mutex _closed_lock;
     bool _is_closed = false;
+    bool _inited = false;
 
     RuntimeState* _state = nullptr;
     // A context lock for callbacks, the callback has to lock the ctx, to avoid
diff --git 
a/regression-test/suites/partition_p1/auto_partition/ddl/stress_destination.sql 
b/regression-test/suites/partition_p1/auto_partition/ddl/stress_destination.sql
new file mode 100644
index 00000000000..f1dbb790987
--- /dev/null
+++ 
b/regression-test/suites/partition_p1/auto_partition/ddl/stress_destination.sql
@@ -0,0 +1,20 @@
+CREATE TABLE `stress_destination` (
+  `create_date` DATE NOT NULL,
+  `parent_org_id` VARCHAR(96) NULL,
+  `org_id` VARCHAR(100) NULL,
+  `org_name` VARCHAR(192) NULL,
+  `create_month` VARCHAR(11) NOT NULL,
+  `org_type` VARCHAR(192) NULL,
+  `sms_total` INT NULL,
+  `success_sms_total` INT NULL,
+  `sms_price_total` DOUBLE NULL,
+  `sms_total_sum` INT NULL,
+  `has_sub` INT NULL,
+  `order_num` INT NULL
+) ENGINE=OLAP
+UNIQUE KEY(`create_date`, `parent_org_id`, `org_id`)
+AUTO PARTITION BY RANGE date_trunc(`create_date`,'day')()
+DISTRIBUTED BY HASH(`create_date`, `org_id`) BUCKETS AUTO
+PROPERTIES (
+"replication_allocation" = "tag.location.default: 1"
+);
diff --git 
a/regression-test/suites/partition_p1/auto_partition/ddl/stress_source.sql 
b/regression-test/suites/partition_p1/auto_partition/ddl/stress_source.sql
new file mode 100644
index 00000000000..d095e4119c7
--- /dev/null
+++ b/regression-test/suites/partition_p1/auto_partition/ddl/stress_source.sql
@@ -0,0 +1,19 @@
+CREATE TABLE `stress_source` (
+  `create_date` DATE NOT NULL,
+  `parent_org_id` VARCHAR(96) NULL,
+  `org_id` VARCHAR(100) NULL,
+  `org_name` VARCHAR(192) NULL,
+  `create_month` VARCHAR(11) NOT NULL,
+  `org_type` VARCHAR(192) NULL,
+  `sms_total` INT NULL,
+  `success_sms_total` INT NULL,
+  `sms_price_total` DOUBLE NULL,
+  `sms_total_sum` INT NULL,
+  `has_sub` INT NULL,
+  `order_num` INT NULL
+) ENGINE=OLAP
+UNIQUE KEY(`create_date`, `parent_org_id`, `org_id`)
+DISTRIBUTED BY HASH(`create_date`, `org_id`) BUCKETS AUTO
+PROPERTIES (
+"replication_allocation" = "tag.location.default: 1"
+);
diff --git 
a/regression-test/suites/partition_p1/auto_partition/doris_dbgen_conf/stress_test_insert_into.yaml
 
b/regression-test/suites/partition_p1/auto_partition/doris_dbgen_conf/stress_test_insert_into.yaml
new file mode 100644
index 00000000000..a45fb01c316
--- /dev/null
+++ 
b/regression-test/suites/partition_p1/auto_partition/doris_dbgen_conf/stress_test_insert_into.yaml
@@ -0,0 +1,21 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+tables:
+        stress_source:
+                create_date:
+                        range: {min: "2023-07-01", max: "2024-01-10"}
\ No newline at end of file
diff --git 
a/regression-test/suites/partition_p1/auto_partition/sql/multi_thread_load.groovy
 
b/regression-test/suites/partition_p1/auto_partition/sql/multi_thread_load.groovy
index 25a9a49305d..aebc022feeb 100644
--- 
a/regression-test/suites/partition_p1/auto_partition/sql/multi_thread_load.groovy
+++ 
b/regression-test/suites/partition_p1/auto_partition/sql/multi_thread_load.groovy
@@ -20,9 +20,8 @@ import java.nio.file.Files
 import java.nio.file.Paths
 import java.net.URL
 import java.io.File
-import java.util.concurrent.locks.ReentrantLock
 
-suite("multi_thread_load") {
+suite("multi_thread_load", "nonConcurrent") { // stress case should use 
resource fully
     // get doris-db from s3
     def dirPath = context.file.parent
     def fatherPath = context.file.parentFile.parentFile.getPath()
diff --git 
a/regression-test/suites/partition_p1/auto_partition/sql/stress_test_insert_into.groovy
 
b/regression-test/suites/partition_p1/auto_partition/sql/stress_test_insert_into.groovy
new file mode 100644
index 00000000000..d2ce87248e0
--- /dev/null
+++ 
b/regression-test/suites/partition_p1/auto_partition/sql/stress_test_insert_into.groovy
@@ -0,0 +1,118 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import groovy.io.FileType
+import java.nio.file.Files
+import java.nio.file.Paths
+import java.net.URL
+import java.io.File
+
+suite("stress_test_insert_into") {
+    // get doris-db from s3
+    def dirPath = context.file.parent
+    def fatherPath = context.file.parentFile.parentFile.getPath()
+    def fileName = "doris-dbgen"
+    def fileUrl = 
"${getS3Url()}/regression/doris-dbgen-23-10-18/doris-dbgen-23-10-20/doris-dbgen"
+    def filePath = Paths.get(dirPath, fileName)
+    if (!Files.exists(filePath)) {
+        new URL(fileUrl).withInputStream { inputStream ->
+            Files.copy(inputStream, filePath)
+        }
+        def file = new File(dirPath + "/" + fileName)
+        file.setExecutable(true)
+    }
+
+    def rows = 100000
+
+    // load data via doris-dbgen
+    def doris_dbgen_create_data = { db_name, tb_name ->
+        def tableName = tb_name
+
+        def jdbcUrl = context.config.jdbcUrl
+        def urlWithoutSchema = jdbcUrl.substring(jdbcUrl.indexOf("://") + 3)
+        def sql_ip = urlWithoutSchema.substring(0, 
urlWithoutSchema.indexOf(":"))
+        def sql_port
+        if (urlWithoutSchema.indexOf("/") >= 0) {
+            // e.g: jdbc:mysql://locahost:8080/?a=b
+            sql_port = 
urlWithoutSchema.substring(urlWithoutSchema.indexOf(":") + 1, 
urlWithoutSchema.indexOf("/"))
+        } else {
+            // e.g: jdbc:mysql://locahost:8080
+            sql_port = 
urlWithoutSchema.substring(urlWithoutSchema.indexOf(":") + 1)
+        }
+        String feHttpAddress = context.config.feHttpAddress
+        def http_port = feHttpAddress.substring(feHttpAddress.indexOf(":") + 1)
+
+        String realDb = db_name
+        String user = context.config.jdbcUser
+        String password = context.config.jdbcPassword
+
+        def cm 
+        if (password) {
+            cm = """
+                ${dirPath}/doris-dbgen gen
+                    --host ${sql_ip}
+                    --sql-port ${sql_port}
+                    --user ${user}
+                    --password ${password}
+                    --database ${realDb}
+                    --table ${tableName}
+                    --rows ${rows}
+                    --http-port ${http_port}
+                    --config 
${fatherPath}/doris_dbgen_conf/stress_test_insert_into.yaml
+                """
+        } else {
+            cm = """
+                ${dirPath}/doris-dbgen gen
+                    --host ${sql_ip}
+                    --sql-port ${sql_port}
+                    --user ${user}
+                    --database ${realDb}
+                    --table ${tableName}
+                    --rows ${rows}
+                    --http-port ${http_port}
+                    --config 
${fatherPath}/doris_dbgen_conf/stress_test_insert_into.yaml
+                """
+        }
+
+        logger.info("datagen: " + cm)
+        def proc = cm.execute()
+        def sout = new StringBuilder(), serr = new StringBuilder()
+        proc.consumeProcessOutput(sout, serr)
+        proc.waitForOrKill(1800000)
+    }
+
+    def database_name = "regression_test_auto_partition_concurrent"
+    def table_src = "stress_source"
+    def table_dest = "stress_destination"
+
+    sql """create database if not exists ${database_name};"""
+    sql """use ${database_name};"""
+    sql """drop table if exists ${table_src};"""
+    sql """drop table if exists ${table_dest};"""
+    sql new File("""${fatherPath}/ddl/stress_source.sql""").text
+    sql new File("""${fatherPath}/ddl/stress_destination.sql""").text
+    doris_dbgen_create_data(database_name, table_src)
+
+    // TEST-BODY
+    def count_src = sql " select count() from ${table_src}; "
+    sql " insert into ${table_dest} select * from ${table_src} "
+    def count_dest = sql " select count() from ${table_dest}; "
+    // check data count
+    assertTrue(count_src[0][0] > 0)
+    assertEquals(count_src[0][0], count_dest[0][0])
+    logger.info("got rows: ${count_src[0][0]}")
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to