This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b7487430da1 Revert "[improve](move-memtable) cancel load rapidly when stream close wait (#29322)" (#29371)
b7487430da1 is described below

commit b7487430da1386e7fa73e14179a43acc0d646767
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Tue Jan 2 11:32:14 2024 +0800

    Revert "[improve](move-memtable) cancel load rapidly when stream close wait (#29322)" (#29371)
    
    This reverts commit bbf58c5aa42d40e66bc6ccc9ed91a4fcb4bdfff7.
---
 be/src/vec/sink/load_stream_stub.cpp               |  11 +--
 be/src/vec/sink/load_stream_stub.h                 |  54 ++---------
 be/src/vec/sink/load_stream_stub_pool.cpp          |  13 +--
 be/src/vec/sink/load_stream_stub_pool.h            |   4 +-
 be/src/vec/sink/writer/vtablet_writer_v2.cpp       |   5 +-
 be/test/io/fs/stream_sink_file_writer_test.cpp     |   6 +-
 be/test/vec/exec/load_stream_stub_pool_test.cpp    |   7 +-
 ...d_stream_stub_close_wait_fault_injection.groovy | 106 ---------------------
 8 files changed, 21 insertions(+), 185 deletions(-)

diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp
index a5f4f59dda4..4b557cae93d 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -90,22 +90,19 @@ int 
LoadStreamStub::LoadStreamReplyHandler::on_received_messages(brpc::StreamId
 void LoadStreamStub::LoadStreamReplyHandler::on_closed(brpc::StreamId id) {
     LOG(INFO) << "on_closed, load_id=" << _load_id << ", stream_id=" << id;
     std::lock_guard<bthread::Mutex> lock(_mutex);
-    
DBUG_EXECUTE_IF("LoadStreamStub::LoadStreamReplyHandler::on_closed.close_wait", 
{ return; });
     _is_closed.store(true);
     _close_cv.notify_all();
 }
 
-LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id, int num_use, 
RuntimeState* state)
-        : _state(state),
-          _use_cnt(num_use),
+LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id, int num_use)
+        : _use_cnt(num_use),
           _load_id(load_id),
           _src_id(src_id),
           _tablet_schema_for_index(std::make_shared<IndexToTabletSchema>()),
           _enable_unique_mow_for_index(std::make_shared<IndexToEnableMoW>()) 
{};
 
-LoadStreamStub::LoadStreamStub(LoadStreamStub& stub, RuntimeState* state)
-        : _state(state),
-          _use_cnt(stub._use_cnt.load()),
+LoadStreamStub::LoadStreamStub(LoadStreamStub& stub)
+        : _use_cnt(stub._use_cnt.load()),
           _load_id(stub._load_id),
           _src_id(stub._src_id),
           _tablet_schema_for_index(stub._tablet_schema_for_index),
diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h
index 9c45926d3cc..edbbbda1e64 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -57,7 +57,6 @@
 #include "gutil/ref_counted.h"
 #include "runtime/exec_env.h"
 #include "runtime/memory/mem_tracker.h"
-#include "runtime/runtime_state.h"
 #include "runtime/thread_context.h"
 #include "runtime/types.h"
 #include "util/countdown_latch.h"
@@ -102,29 +101,11 @@ private:
         Status close_wait(int64_t timeout_ms) {
             DCHECK(timeout_ms > 0) << "timeout_ms should be greator than 0";
             std::unique_lock<bthread::Mutex> lock(_mutex);
-            int ret = 0;
-            MonotonicStopWatch watch;
-            watch.start();
-            while (true) {
-                if (_is_closed) {
-                    return Status::OK();
-                }
-                if (_stub->is_cancelled()) {
-                    return _stub->cancelled_state();
-                }
-                if (_stub->runtime_state()->is_cancelled()) {
-                    return 
Status::Cancelled(_stub->runtime_state()->cancel_reason());
-                }
-                // wait 1s once time.
-                ret = _close_cv.wait_for(lock, 1);
-                if (ret == 0) {
-                    return Status::OK();
-                }
-                if (watch.elapsed_time() / 1000 / 1000 >= timeout_ms) {
-                    return Status::InternalError("stream close wait timeout, 
result: {}", ret);
-                }
+            if (_is_closed) {
+                return Status::OK();
             }
-            return Status::OK();
+            int ret = _close_cv.wait_for(lock, timeout_ms * 1000);
+            return ret == 0 ? Status::OK() : Status::Error<true>(ret, "stream 
close_wait timeout");
         };
 
         std::vector<int64_t> success_tablets() {
@@ -157,28 +138,10 @@ private:
 
 public:
     // construct new stub
-    LoadStreamStub(PUniqueId load_id, int64_t src_id, int num_use, 
RuntimeState* state);
+    LoadStreamStub(PUniqueId load_id, int64_t src_id, int num_use);
 
     // copy constructor, shared_ptr members are shared
-    LoadStreamStub(LoadStreamStub& stub, RuntimeState* state);
-
-    void cancel(Status status) {
-        _cancel = true;
-        _cancel_status = status;
-    }
-
-    RuntimeState* runtime_state() const { return _state; }
-
-    bool is_cancelled() const { return _cancel; }
-
-    Status cancelled_state() const { return _cancel_status; }
-
-    std::string cancel_reason() const {
-        if (_state == nullptr) {
-            return "";
-        }
-        return _state->cancel_reason();
-    }
+    LoadStreamStub(LoadStreamStub& stub);
 
 // for mock this class in UT
 #ifdef BE_TEST
@@ -268,11 +231,6 @@ private:
     Status _send_with_retry(butil::IOBuf& buf);
 
 protected:
-    RuntimeState* _state = nullptr;
-
-    bool _cancel = false;
-    Status _cancel_status;
-
     std::atomic<bool> _is_init;
     bthread::Mutex _mutex;
 
diff --git a/be/src/vec/sink/load_stream_stub_pool.cpp b/be/src/vec/sink/load_stream_stub_pool.cpp
index 94838acdc08..c867756d8fb 100644
--- a/be/src/vec/sink/load_stream_stub_pool.cpp
+++ b/be/src/vec/sink/load_stream_stub_pool.cpp
@@ -54,19 +54,13 @@ void LoadStreams::release() {
     }
 }
 
-void LoadStreams::cancel(Status status) {
-    for (auto& stream : _streams) {
-        stream->cancel(status);
-    }
-}
-
 LoadStreamStubPool::LoadStreamStubPool() = default;
 
 LoadStreamStubPool::~LoadStreamStubPool() = default;
 
 std::shared_ptr<LoadStreams> LoadStreamStubPool::get_or_create(PUniqueId 
load_id, int64_t src_id,
                                                                int64_t dst_id, 
int num_streams,
-                                                               int num_sink, 
RuntimeState* state) {
+                                                               int num_sink) {
     auto key = std::make_pair(UniqueId(load_id), dst_id);
     std::lock_guard<std::mutex> lock(_mutex);
     std::shared_ptr<LoadStreams> streams = _pool[key];
@@ -75,12 +69,11 @@ std::shared_ptr<LoadStreams> 
LoadStreamStubPool::get_or_create(PUniqueId load_id
     }
     DCHECK(num_streams > 0) << "stream num should be greater than 0";
     DCHECK(num_sink > 0) << "sink num should be greater than 0";
-    auto [it, _] =
-            _template_stubs.emplace(load_id, new LoadStreamStub {load_id, 
src_id, num_sink, state});
+    auto [it, _] = _template_stubs.emplace(load_id, new LoadStreamStub 
{load_id, src_id, num_sink});
     streams = std::make_shared<LoadStreams>(load_id, dst_id, num_sink, this);
     for (int32_t i = 0; i < num_streams; i++) {
         // copy construct, internal tablet schema map will be shared among all 
stubs
-        streams->streams().emplace_back(new LoadStreamStub {*it->second, 
state});
+        streams->streams().emplace_back(new LoadStreamStub {*it->second});
     }
     _pool[key] = streams;
     return streams;
diff --git a/be/src/vec/sink/load_stream_stub_pool.h b/be/src/vec/sink/load_stream_stub_pool.h
index b548d31a14f..e41083825be 100644
--- a/be/src/vec/sink/load_stream_stub_pool.h
+++ b/be/src/vec/sink/load_stream_stub_pool.h
@@ -78,8 +78,6 @@ public:
 
     void release();
 
-    void cancel(Status status);
-
     Streams& streams() { return _streams; }
 
 private:
@@ -97,7 +95,7 @@ public:
     ~LoadStreamStubPool();
 
     std::shared_ptr<LoadStreams> get_or_create(PUniqueId load_id, int64_t 
src_id, int64_t dst_id,
-                                               int num_streams, int num_sink, 
RuntimeState* state);
+                                               int num_streams, int num_sink);
 
     void erase(UniqueId load_id, int64_t dst_id);
 
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index 2550fb172c5..7a83f670fbe 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -113,7 +113,7 @@ Status VTabletWriterV2::_incremental_open_streams(
     }
     for (int64_t node_id : new_backends) {
         auto load_streams = 
ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create(
-                _load_id, _backend_id, node_id, _stream_per_node, 
_num_local_sink, _state);
+                _load_id, _backend_id, node_id, _stream_per_node, 
_num_local_sink);
         RETURN_IF_ERROR(_open_streams_to_backend(node_id, *load_streams));
         _streams_for_node[node_id] = load_streams;
     }
@@ -261,7 +261,7 @@ Status VTabletWriterV2::open(RuntimeState* state, 
RuntimeProfile* profile) {
 Status VTabletWriterV2::_open_streams(int64_t src_id) {
     for (auto& [dst_id, _] : _tablets_for_node) {
         auto streams = 
ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create(
-                _load_id, src_id, dst_id, _stream_per_node, _num_local_sink, 
_state);
+                _load_id, src_id, dst_id, _stream_per_node, _num_local_sink);
         RETURN_IF_ERROR(_open_streams_to_backend(dst_id, *streams));
         _streams_for_node[dst_id] = streams;
     }
@@ -457,7 +457,6 @@ Status VTabletWriterV2::_cancel(Status status) {
     }
     for (const auto& [_, streams] : _streams_for_node) {
         streams->release();
-        streams->cancel(status);
     }
     return Status::OK();
 }
diff --git a/be/test/io/fs/stream_sink_file_writer_test.cpp b/be/test/io/fs/stream_sink_file_writer_test.cpp
index a0daa15916e..7e5bdd350f5 100644
--- a/be/test/io/fs/stream_sink_file_writer_test.cpp
+++ b/be/test/io/fs/stream_sink_file_writer_test.cpp
@@ -51,8 +51,7 @@ static std::atomic<int64_t> g_num_request;
 class StreamSinkFileWriterTest : public testing::Test {
     class MockStreamStub : public LoadStreamStub {
     public:
-        MockStreamStub(PUniqueId load_id, int64_t src_id, RuntimeState* state)
-                : LoadStreamStub(load_id, src_id, 1, state) {};
+        MockStreamStub(PUniqueId load_id, int64_t src_id) : 
LoadStreamStub(load_id, src_id, 1) {};
 
         virtual ~MockStreamStub() = default;
 
@@ -86,9 +85,8 @@ protected:
     virtual void SetUp() {
         _load_id.set_hi(LOAD_ID_HI);
         _load_id.set_lo(LOAD_ID_LO);
-        RuntimeState state;
         for (int src_id = 0; src_id < NUM_STREAM; src_id++) {
-            _streams.emplace_back(new MockStreamStub(_load_id, src_id, 
&state));
+            _streams.emplace_back(new MockStreamStub(_load_id, src_id));
         }
     }
 
diff --git a/be/test/vec/exec/load_stream_stub_pool_test.cpp b/be/test/vec/exec/load_stream_stub_pool_test.cpp
index 7faa5fb2e5d..24da3bb6999 100644
--- a/be/test/vec/exec/load_stream_stub_pool_test.cpp
+++ b/be/test/vec/exec/load_stream_stub_pool_test.cpp
@@ -30,14 +30,13 @@ public:
 
 TEST_F(LoadStreamStubPoolTest, test) {
     LoadStreamStubPool pool;
-    RuntimeState state;
     int64_t src_id = 100;
     PUniqueId load_id;
     load_id.set_hi(1);
     load_id.set_hi(2);
-    auto streams1 = pool.get_or_create(load_id, src_id, 101, 5, 1, &state);
-    auto streams2 = pool.get_or_create(load_id, src_id, 102, 5, 1, &state);
-    auto streams3 = pool.get_or_create(load_id, src_id, 101, 5, 1, &state);
+    auto streams1 = pool.get_or_create(load_id, src_id, 101, 5, 1);
+    auto streams2 = pool.get_or_create(load_id, src_id, 102, 5, 1);
+    auto streams3 = pool.get_or_create(load_id, src_id, 101, 5, 1);
     EXPECT_EQ(2, pool.size());
     EXPECT_EQ(1, pool.templates_size());
     EXPECT_EQ(streams1, streams3);
diff --git a/regression-test/suites/fault_injection_p0/test_load_stream_stub_close_wait_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_load_stream_stub_close_wait_fault_injection.groovy
deleted file mode 100644
index 85ad5f7bc9d..00000000000
--- a/regression-test/suites/fault_injection_p0/test_load_stream_stub_close_wait_fault_injection.groovy
+++ /dev/null
@@ -1,106 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-import org.codehaus.groovy.runtime.IOGroovyMethods
-import org.apache.doris.regression.util.Http
-
-suite("test_load_stream_stub_close_wait_fault_injection", "nonConcurrent") {
-    sql """ set enable_memtable_on_sink_node=true """
-    sql """ DROP TABLE IF EXISTS `baseall` """
-    sql """ DROP TABLE IF EXISTS `test` """
-    sql """
-        CREATE TABLE IF NOT EXISTS `baseall` (
-            `k0` boolean null comment "",
-            `k1` tinyint(4) null comment "",
-            `k2` smallint(6) null comment "",
-            `k3` int(11) null comment "",
-            `k4` bigint(20) null comment "",
-            `k5` decimal(9, 3) null comment "",
-            `k6` char(5) null comment "",
-            `k10` date null comment "",
-            `k11` datetime null comment "",
-            `k7` varchar(20) null comment "",
-            `k8` double max null comment "",
-            `k9` float sum null comment "",
-            `k12` string replace null comment "",
-            `k13` largeint(40) replace null comment ""
-        ) engine=olap
-        DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
-        """
-    sql """
-        CREATE TABLE IF NOT EXISTS `test` (
-            `k0` boolean null comment "",
-            `k1` tinyint(4) null comment "",
-            `k2` smallint(6) null comment "",
-            `k3` int(11) null comment "",
-            `k4` bigint(20) null comment "",
-            `k5` decimal(9, 3) null comment "",
-            `k6` char(5) null comment "",
-            `k10` date null comment "",
-            `k11` datetime null comment "",
-            `k7` varchar(20) null comment "",
-            `k8` double max null comment "",
-            `k9` float sum null comment "",
-            `k12` string replace_if_not_null null comment "",
-            `k13` largeint(40) replace null comment ""
-        ) engine=olap
-        DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
-        """
-
-    GetDebugPoint().clearDebugPointsForAllBEs()
-    streamLoad {
-        table "baseall"
-        db "regression_test_fault_injection_p0"
-        set 'column_separator', ','
-        file "baseall.txt"
-    }
-
-    try {
-        
GetDebugPoint().enableDebugPointForAllBEs("LoadStreamStub::LoadStreamReplyHandler::on_closed.close_wait")
-        def thread1 = new Thread({
-            try {
-                def res = sql "insert into test select * from baseall where k1 
<= 3"
-                logger.info(res.toString())
-            } catch(Exception e) {
-                logger.info(e.getMessage())
-                assertTrue(e.getMessage().contains("Communications link 
failure"))
-            } finally {
-                
GetDebugPoint().disableDebugPointForAllBEs("LoadStreamStub::LoadStreamReplyHandler::on_closed.close_wait")
-            }
-        })
-        thread1.start()
-
-        sleep(1000)
-
-        def processList = sql "show processlist"
-        logger.info(processList.toString())
-        processList.each { item ->
-            logger.info(item[1].toString())
-            logger.info(item[11].toString())
-            if (item[11].toString() == "insert into test select * from baseall 
where k1 <= 3".toString()){
-                def res = sql "kill ${item[1]}"
-                logger.info(res.toString())
-            }
-        }
-    } catch(Exception e) {
-        logger.info(e.getMessage())
-    }
-
-    sql """ DROP TABLE IF EXISTS `baseall` """
-    sql """ DROP TABLE IF EXISTS `test` """
-    sql """ set enable_memtable_on_sink_node=false """
-}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to