This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new bbf58c5aa42 [improve](move-memtable) cancel load rapidly when stream close wait (#29322)
bbf58c5aa42 is described below
commit bbf58c5aa42d40e66bc6ccc9ed91a4fcb4bdfff7
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Sun Dec 31 16:26:41 2023 +0800
[improve](move-memtable) cancel load rapidly when stream close wait (#29322)
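
Previously close_wait() blocked on the stream close condition variable for the full
timeout, so a load that had already been cancelled could sit in close wait until the
timeout expired. close_wait() now wakes up once per second to check whether the stub
or the query's RuntimeState has been cancelled and returns right away in that case,
failing fast instead of waiting out the timeout.

A minimal sketch of the idea, using std:: primitives instead of bthread/brpc and
illustrative names rather than the actual Doris API:

    #include <chrono>
    #include <condition_variable>
    #include <functional>
    #include <mutex>

    // Distinguishes a normal close from cancellation and timeout.
    enum class WaitResult { Closed, Cancelled, TimedOut };

    class CloseWaiter {
    public:
        // Wait until mark_closed() fires, the load is cancelled, or the timeout
        // expires. Polls the cancel check once per second so a cancelled load
        // does not wait out the full timeout.
        WaitResult close_wait(std::chrono::milliseconds timeout,
                              const std::function<bool()>& is_cancelled) {
            std::unique_lock<std::mutex> lock(_mutex);
            const auto deadline = std::chrono::steady_clock::now() + timeout;
            while (true) {
                if (_is_closed) {
                    return WaitResult::Closed;
                }
                if (is_cancelled()) {
                    return WaitResult::Cancelled;
                }
                if (std::chrono::steady_clock::now() >= deadline) {
                    return WaitResult::TimedOut;
                }
                // Wake up at least once per second to re-check cancellation.
                _close_cv.wait_for(lock, std::chrono::seconds(1));
            }
        }

        // Called from the stream's on_closed callback.
        void mark_closed() {
            std::lock_guard<std::mutex> lock(_mutex);
            _is_closed = true;
            _close_cv.notify_all();
        }

    private:
        std::mutex _mutex;
        std::condition_variable _close_cv;
        bool _is_closed = false;
    };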
---
be/src/vec/sink/load_stream_stub.cpp | 11 ++-
be/src/vec/sink/load_stream_stub.h | 54 +++++++++--
be/src/vec/sink/load_stream_stub_pool.cpp | 13 ++-
be/src/vec/sink/load_stream_stub_pool.h | 4 +-
be/src/vec/sink/writer/vtablet_writer_v2.cpp | 5 +-
be/test/io/fs/stream_sink_file_writer_test.cpp | 6 +-
be/test/vec/exec/load_stream_stub_pool_test.cpp | 7 +-
...d_stream_stub_close_wait_fault_injection.groovy | 106 +++++++++++++++++++++
8 files changed, 185 insertions(+), 21 deletions(-)
diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp
index 4b557cae93d..a5f4f59dda4 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -90,19 +90,22 @@ int LoadStreamStub::LoadStreamReplyHandler::on_received_messages(brpc::StreamId
void LoadStreamStub::LoadStreamReplyHandler::on_closed(brpc::StreamId id) {
LOG(INFO) << "on_closed, load_id=" << _load_id << ", stream_id=" << id;
std::lock_guard<bthread::Mutex> lock(_mutex);
+
DBUG_EXECUTE_IF("LoadStreamStub::LoadStreamReplyHandler::on_closed.close_wait", { return; });
_is_closed.store(true);
_close_cv.notify_all();
}
-LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id, int num_use)
- : _use_cnt(num_use),
+LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id, int num_use, RuntimeState* state)
+ : _state(state),
+ _use_cnt(num_use),
_load_id(load_id),
_src_id(src_id),
_tablet_schema_for_index(std::make_shared<IndexToTabletSchema>()),
_enable_unique_mow_for_index(std::make_shared<IndexToEnableMoW>())
{};
-LoadStreamStub::LoadStreamStub(LoadStreamStub& stub)
- : _use_cnt(stub._use_cnt.load()),
+LoadStreamStub::LoadStreamStub(LoadStreamStub& stub, RuntimeState* state)
+ : _state(state),
+ _use_cnt(stub._use_cnt.load()),
_load_id(stub._load_id),
_src_id(stub._src_id),
_tablet_schema_for_index(stub._tablet_schema_for_index),
diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h
index edbbbda1e64..9c45926d3cc 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -57,6 +57,7 @@
#include "gutil/ref_counted.h"
#include "runtime/exec_env.h"
#include "runtime/memory/mem_tracker.h"
+#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"
#include "runtime/types.h"
#include "util/countdown_latch.h"
@@ -101,11 +102,29 @@ private:
Status close_wait(int64_t timeout_ms) {
DCHECK(timeout_ms > 0) << "timeout_ms should be greator than 0";
std::unique_lock<bthread::Mutex> lock(_mutex);
- if (_is_closed) {
- return Status::OK();
+ int ret = 0;
+ MonotonicStopWatch watch;
+ watch.start();
+ while (true) {
+ if (_is_closed) {
+ return Status::OK();
+ }
+ if (_stub->is_cancelled()) {
+ return _stub->cancelled_state();
+ }
+ if (_stub->runtime_state()->is_cancelled()) {
+ return Status::Cancelled(_stub->runtime_state()->cancel_reason());
+ }
+ // wait up to 1s at a time.
+ ret = _close_cv.wait_for(lock, 1);
+ if (ret == 0) {
+ return Status::OK();
+ }
+ if (watch.elapsed_time() / 1000 / 1000 >= timeout_ms) {
+ return Status::InternalError("stream close wait timeout, result: {}", ret);
+ }
}
- int ret = _close_cv.wait_for(lock, timeout_ms * 1000);
- return ret == 0 ? Status::OK() : Status::Error<true>(ret, "stream close_wait timeout");
+ return Status::OK();
};
std::vector<int64_t> success_tablets() {
@@ -138,10 +157,28 @@ private:
public:
// construct new stub
- LoadStreamStub(PUniqueId load_id, int64_t src_id, int num_use);
+ LoadStreamStub(PUniqueId load_id, int64_t src_id, int num_use, RuntimeState* state);
// copy constructor, shared_ptr members are shared
- LoadStreamStub(LoadStreamStub& stub);
+ LoadStreamStub(LoadStreamStub& stub, RuntimeState* state);
+
+ void cancel(Status status) {
+ _cancel = true;
+ _cancel_status = status;
+ }
+
+ RuntimeState* runtime_state() const { return _state; }
+
+ bool is_cancelled() const { return _cancel; }
+
+ Status cancelled_state() const { return _cancel_status; }
+
+ std::string cancel_reason() const {
+ if (_state == nullptr) {
+ return "";
+ }
+ return _state->cancel_reason();
+ }
// for mock this class in UT
#ifdef BE_TEST
@@ -231,6 +268,11 @@ private:
Status _send_with_retry(butil::IOBuf& buf);
protected:
+ RuntimeState* _state = nullptr;
+
+ bool _cancel = false;
+ Status _cancel_status;
+
std::atomic<bool> _is_init;
bthread::Mutex _mutex;
diff --git a/be/src/vec/sink/load_stream_stub_pool.cpp b/be/src/vec/sink/load_stream_stub_pool.cpp
index c867756d8fb..94838acdc08 100644
--- a/be/src/vec/sink/load_stream_stub_pool.cpp
+++ b/be/src/vec/sink/load_stream_stub_pool.cpp
@@ -54,13 +54,19 @@ void LoadStreams::release() {
}
}
+void LoadStreams::cancel(Status status) {
+ for (auto& stream : _streams) {
+ stream->cancel(status);
+ }
+}
+
LoadStreamStubPool::LoadStreamStubPool() = default;
LoadStreamStubPool::~LoadStreamStubPool() = default;
std::shared_ptr<LoadStreams> LoadStreamStubPool::get_or_create(PUniqueId load_id, int64_t src_id,
int64_t dst_id, int num_streams,
- int num_sink) {
+ int num_sink, RuntimeState* state) {
auto key = std::make_pair(UniqueId(load_id), dst_id);
std::lock_guard<std::mutex> lock(_mutex);
std::shared_ptr<LoadStreams> streams = _pool[key];
@@ -69,11 +75,12 @@ std::shared_ptr<LoadStreams> LoadStreamStubPool::get_or_create(PUniqueId load_id
}
DCHECK(num_streams > 0) << "stream num should be greater than 0";
DCHECK(num_sink > 0) << "sink num should be greater than 0";
- auto [it, _] = _template_stubs.emplace(load_id, new LoadStreamStub {load_id, src_id, num_sink});
+ auto [it, _] =
+ _template_stubs.emplace(load_id, new LoadStreamStub {load_id, src_id, num_sink, state});
streams = std::make_shared<LoadStreams>(load_id, dst_id, num_sink, this);
for (int32_t i = 0; i < num_streams; i++) {
// copy construct, internal tablet schema map will be shared among all stubs
- streams->streams().emplace_back(new LoadStreamStub {*it->second});
- streams->streams().emplace_back(new LoadStreamStub {*it->second, state});
}
_pool[key] = streams;
return streams;
diff --git a/be/src/vec/sink/load_stream_stub_pool.h b/be/src/vec/sink/load_stream_stub_pool.h
index e41083825be..b548d31a14f 100644
--- a/be/src/vec/sink/load_stream_stub_pool.h
+++ b/be/src/vec/sink/load_stream_stub_pool.h
@@ -78,6 +78,8 @@ public:
void release();
+ void cancel(Status status);
+
Streams& streams() { return _streams; }
private:
@@ -95,7 +97,7 @@ public:
~LoadStreamStubPool();
std::shared_ptr<LoadStreams> get_or_create(PUniqueId load_id, int64_t src_id, int64_t dst_id,
- int num_streams, int num_sink);
+ int num_streams, int num_sink, RuntimeState* state);
void erase(UniqueId load_id, int64_t dst_id);
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index 77df9b07f18..b0a39ac2a76 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -113,7 +113,7 @@ Status VTabletWriterV2::_incremental_open_streams(
}
for (int64_t node_id : new_backends) {
auto load_streams = ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create(
- _load_id, _backend_id, node_id, _stream_per_node, _num_local_sink);
+ _load_id, _backend_id, node_id, _stream_per_node, _num_local_sink, _state);
RETURN_IF_ERROR(_open_streams_to_backend(node_id, *load_streams));
_streams_for_node[node_id] = load_streams;
}
@@ -261,7 +261,7 @@ Status VTabletWriterV2::open(RuntimeState* state, RuntimeProfile* profile) {
Status VTabletWriterV2::_open_streams(int64_t src_id) {
for (auto& [dst_id, _] : _tablets_for_node) {
auto streams = ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create(
- _load_id, src_id, dst_id, _stream_per_node, _num_local_sink);
+ _load_id, src_id, dst_id, _stream_per_node, _num_local_sink, _state);
RETURN_IF_ERROR(_open_streams_to_backend(dst_id, *streams));
_streams_for_node[dst_id] = streams;
}
@@ -457,6 +457,7 @@ Status VTabletWriterV2::_cancel(Status status) {
}
for (const auto& [_, streams] : _streams_for_node) {
streams->release();
+ streams->cancel(status);
}
return Status::OK();
}
diff --git a/be/test/io/fs/stream_sink_file_writer_test.cpp b/be/test/io/fs/stream_sink_file_writer_test.cpp
index 7e5bdd350f5..a0daa15916e 100644
--- a/be/test/io/fs/stream_sink_file_writer_test.cpp
+++ b/be/test/io/fs/stream_sink_file_writer_test.cpp
@@ -51,7 +51,8 @@ static std::atomic<int64_t> g_num_request;
class StreamSinkFileWriterTest : public testing::Test {
class MockStreamStub : public LoadStreamStub {
public:
- MockStreamStub(PUniqueId load_id, int64_t src_id) : LoadStreamStub(load_id, src_id, 1) {};
+ MockStreamStub(PUniqueId load_id, int64_t src_id, RuntimeState* state)
+ : LoadStreamStub(load_id, src_id, 1, state) {};
virtual ~MockStreamStub() = default;
@@ -85,8 +86,9 @@ protected:
virtual void SetUp() {
_load_id.set_hi(LOAD_ID_HI);
_load_id.set_lo(LOAD_ID_LO);
+ RuntimeState state;
for (int src_id = 0; src_id < NUM_STREAM; src_id++) {
- _streams.emplace_back(new MockStreamStub(_load_id, src_id));
- _streams.emplace_back(new MockStreamStub(_load_id, src_id, &state));
}
}
diff --git a/be/test/vec/exec/load_stream_stub_pool_test.cpp b/be/test/vec/exec/load_stream_stub_pool_test.cpp
index 24da3bb6999..7faa5fb2e5d 100644
--- a/be/test/vec/exec/load_stream_stub_pool_test.cpp
+++ b/be/test/vec/exec/load_stream_stub_pool_test.cpp
@@ -30,13 +30,14 @@ public:
TEST_F(LoadStreamStubPoolTest, test) {
LoadStreamStubPool pool;
+ RuntimeState state;
int64_t src_id = 100;
PUniqueId load_id;
load_id.set_hi(1);
load_id.set_hi(2);
- auto streams1 = pool.get_or_create(load_id, src_id, 101, 5, 1);
- auto streams2 = pool.get_or_create(load_id, src_id, 102, 5, 1);
- auto streams3 = pool.get_or_create(load_id, src_id, 101, 5, 1);
+ auto streams1 = pool.get_or_create(load_id, src_id, 101, 5, 1, &state);
+ auto streams2 = pool.get_or_create(load_id, src_id, 102, 5, 1, &state);
+ auto streams3 = pool.get_or_create(load_id, src_id, 101, 5, 1, &state);
EXPECT_EQ(2, pool.size());
EXPECT_EQ(1, pool.templates_size());
EXPECT_EQ(streams1, streams3);
diff --git a/regression-test/suites/fault_injection_p0/test_load_stream_stub_close_wait_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_load_stream_stub_close_wait_fault_injection.groovy
new file mode 100644
index 00000000000..85ad5f7bc9d
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/test_load_stream_stub_close_wait_fault_injection.groovy
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.apache.doris.regression.util.Http
+
+suite("test_load_stream_stub_close_wait_fault_injection", "nonConcurrent") {
+ sql """ set enable_memtable_on_sink_node=true """
+ sql """ DROP TABLE IF EXISTS `baseall` """
+ sql """ DROP TABLE IF EXISTS `test` """
+ sql """
+ CREATE TABLE IF NOT EXISTS `baseall` (
+ `k0` boolean null comment "",
+ `k1` tinyint(4) null comment "",
+ `k2` smallint(6) null comment "",
+ `k3` int(11) null comment "",
+ `k4` bigint(20) null comment "",
+ `k5` decimal(9, 3) null comment "",
+ `k6` char(5) null comment "",
+ `k10` date null comment "",
+ `k11` datetime null comment "",
+ `k7` varchar(20) null comment "",
+ `k8` double max null comment "",
+ `k9` float sum null comment "",
+ `k12` string replace null comment "",
+ `k13` largeint(40) replace null comment ""
+ ) engine=olap
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
+ """
+ sql """
+ CREATE TABLE IF NOT EXISTS `test` (
+ `k0` boolean null comment "",
+ `k1` tinyint(4) null comment "",
+ `k2` smallint(6) null comment "",
+ `k3` int(11) null comment "",
+ `k4` bigint(20) null comment "",
+ `k5` decimal(9, 3) null comment "",
+ `k6` char(5) null comment "",
+ `k10` date null comment "",
+ `k11` datetime null comment "",
+ `k7` varchar(20) null comment "",
+ `k8` double max null comment "",
+ `k9` float sum null comment "",
+ `k12` string replace_if_not_null null comment "",
+ `k13` largeint(40) replace null comment ""
+ ) engine=olap
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
+ """
+
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ streamLoad {
+ table "baseall"
+ db "regression_test_fault_injection_p0"
+ set 'column_separator', ','
+ file "baseall.txt"
+ }
+
+ try {
+ GetDebugPoint().enableDebugPointForAllBEs("LoadStreamStub::LoadStreamReplyHandler::on_closed.close_wait")
+ def thread1 = new Thread({
+ try {
+ def res = sql "insert into test select * from baseall where k1 <= 3"
+ logger.info(res.toString())
+ } catch(Exception e) {
+ logger.info(e.getMessage())
+ assertTrue(e.getMessage().contains("Communications link failure"))
+ } finally {
+ GetDebugPoint().disableDebugPointForAllBEs("LoadStreamStub::LoadStreamReplyHandler::on_closed.close_wait")
+ }
+ })
+ thread1.start()
+
+ sleep(1000)
+
+ def processList = sql "show processlist"
+ logger.info(processList.toString())
+ processList.each { item ->
+ logger.info(item[1].toString())
+ logger.info(item[11].toString())
+ if (item[11].toString() == "insert into test select * from baseall where k1 <= 3".toString()){
+ def res = sql "kill ${item[1]}"
+ logger.info(res.toString())
+ }
+ }
+ } catch(Exception e) {
+ logger.info(e.getMessage())
+ }
+
+ sql """ DROP TABLE IF EXISTS `baseall` """
+ sql """ DROP TABLE IF EXISTS `test` """
+ sql """ set enable_memtable_on_sink_node=false """
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]