This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 69a01e0cf5c [improve](move-memtable) skip load stream stub close wait
when cancel (#29427)
69a01e0cf5c is described below
commit 69a01e0cf5c704c07b78a87ea315effacfceb08b
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Tue Jan 2 23:35:50 2024 +0800
[improve](move-memtable) skip load stream stub close wait when cancel
(#29427)
---
be/src/vec/sink/load_stream_stub.cpp | 1 -
be/src/vec/sink/load_stream_stub.h | 5 ++
be/src/vec/sink/load_stream_stub_pool.cpp | 16 ++--
be/src/vec/sink/load_stream_stub_pool.h | 2 +-
be/src/vec/sink/writer/vtablet_writer_v2.cpp | 4 +-
be/test/vec/exec/load_stream_stub_pool_test.cpp | 7 +-
...d_stream_stub_close_wait_fault_injection.groovy | 88 ++++++++++++++++++++++
7 files changed, 109 insertions(+), 14 deletions(-)
diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp
index 6863faa4ac3..9ad8d8805ca 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -21,7 +21,6 @@
#include "olap/rowset/rowset_writer.h"
#include "util/brpc_client_cache.h"
-#include "util/debug_points.h"
#include "util/network_util.h"
#include "util/thrift_util.h"
#include "util/uid_util.h"
diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h
index 5c9b3d2f31d..a5c9d7464a4 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -60,6 +60,7 @@
#include "runtime/thread_context.h"
#include "runtime/types.h"
#include "util/countdown_latch.h"
+#include "util/debug_points.h"
#include "util/runtime_profile.h"
#include "util/stopwatch.hpp"
#include "vec/columns/column.h"
@@ -183,6 +184,10 @@ public:
// wait remote to close stream,
// remote will close stream when it receives CLOSE_LOAD
Status close_wait(int64_t timeout_ms = 0) {
+ DBUG_EXECUTE_IF("LoadStreamStub::close_wait.long_wait", {
+ while (true) {
+ };
+ });
if (!_is_init.load() || _handler.is_closed()) {
return Status::OK();
}
diff --git a/be/src/vec/sink/load_stream_stub_pool.cpp b/be/src/vec/sink/load_stream_stub_pool.cpp
index c867756d8fb..3cf168a131e 100644
--- a/be/src/vec/sink/load_stream_stub_pool.cpp
+++ b/be/src/vec/sink/load_stream_stub_pool.cpp
@@ -26,7 +26,7 @@ class TExpr;
LoadStreams::LoadStreams(UniqueId load_id, int64_t dst_id, int num_use,
LoadStreamStubPool* pool)
: _load_id(load_id), _dst_id(dst_id), _use_cnt(num_use), _pool(pool) {}
-void LoadStreams::release() {
+void LoadStreams::release(Status status) {
int num_use = --_use_cnt;
DBUG_EXECUTE_IF("LoadStreams.release.keeping_streams", { num_use = 1; });
if (num_use == 0) {
@@ -39,12 +39,14 @@ void LoadStreams::release() {
LOG(WARNING) << "close stream failed " << st;
}
}
- for (auto& stream : _streams) {
- auto st = stream->close_wait();
- DBUG_EXECUTE_IF("LoadStreams.release.close_wait_failed",
- { st = Status::InternalError("stream close wait timeout"); });
- if (!st.ok()) {
- LOG(WARNING) << "close wait failed " << st;
+ if (status.ok()) {
+ for (auto& stream : _streams) {
+ auto st = stream->close_wait();
+ DBUG_EXECUTE_IF("LoadStreams.release.close_wait_failed",
+ { st = Status::InternalError("stream close wait timeout"); });
+ if (!st.ok()) {
+ LOG(WARNING) << "close wait failed " << st;
+ }
}
}
_pool->erase(_load_id, _dst_id);
diff --git a/be/src/vec/sink/load_stream_stub_pool.h b/be/src/vec/sink/load_stream_stub_pool.h
index e41083825be..b34383b25f9 100644
--- a/be/src/vec/sink/load_stream_stub_pool.h
+++ b/be/src/vec/sink/load_stream_stub_pool.h
@@ -76,7 +76,7 @@ class LoadStreams {
public:
LoadStreams(UniqueId load_id, int64_t dst_id, int num_use,
LoadStreamStubPool* pool);
- void release();
+ void release(Status status);
Streams& streams() { return _streams; }
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index d1e65c804d7..e02fba7c21c 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -456,7 +456,7 @@ Status VTabletWriterV2::_cancel(Status status) {
_delta_writer_for_tablet.reset();
}
for (const auto& [_, streams] : _streams_for_node) {
- streams->release();
+ streams->release(status);
}
return Status::OK();
}
@@ -513,7 +513,7 @@ Status VTabletWriterV2::close(Status exec_status) {
// defer stream release to prevent memory leak
Defer defer([&] {
for (const auto& [_, streams] : _streams_for_node) {
- streams->release();
+ streams->release(status);
}
_streams_for_node.clear();
});
diff --git a/be/test/vec/exec/load_stream_stub_pool_test.cpp b/be/test/vec/exec/load_stream_stub_pool_test.cpp
index 24da3bb6999..bea5443b4ff 100644
--- a/be/test/vec/exec/load_stream_stub_pool_test.cpp
+++ b/be/test/vec/exec/load_stream_stub_pool_test.cpp
@@ -32,6 +32,7 @@ TEST_F(LoadStreamStubPoolTest, test) {
LoadStreamStubPool pool;
int64_t src_id = 100;
PUniqueId load_id;
+ Status st = Status::OK();
load_id.set_hi(1);
load_id.set_hi(2);
auto streams1 = pool.get_or_create(load_id, src_id, 101, 5, 1);
@@ -41,9 +42,9 @@ TEST_F(LoadStreamStubPoolTest, test) {
EXPECT_EQ(1, pool.templates_size());
EXPECT_EQ(streams1, streams3);
EXPECT_NE(streams1, streams2);
- streams1->release();
- streams2->release();
- streams3->release();
+ streams1->release(st);
+ streams2->release(st);
+ streams3->release(st);
EXPECT_EQ(0, pool.size());
EXPECT_EQ(0, pool.templates_size());
}
diff --git a/regression-test/suites/fault_injection_p0/test_load_stream_stub_close_wait_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_load_stream_stub_close_wait_fault_injection.groovy
new file mode 100644
index 00000000000..4a87f1daf6b
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/test_load_stream_stub_close_wait_fault_injection.groovy
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.apache.doris.regression.util.Http
+
+suite("test_load_stream_stub_close_wait_fault_injection", "nonConcurrent") {
+ sql """ set enable_memtable_on_sink_node=true """
+ sql """ DROP TABLE IF EXISTS `baseall` """
+ sql """ DROP TABLE IF EXISTS `test` """
+ sql """
+ CREATE TABLE IF NOT EXISTS `baseall` (
+ `k0` boolean null comment "",
+ `k1` tinyint(4) null comment "",
+ `k2` smallint(6) null comment "",
+ `k3` int(11) null comment "",
+ `k4` bigint(20) null comment "",
+ `k5` decimal(9, 3) null comment "",
+ `k6` char(5) null comment "",
+ `k10` date null comment "",
+ `k11` datetime null comment "",
+ `k7` varchar(20) null comment "",
+ `k8` double max null comment "",
+ `k9` float sum null comment "",
+ `k12` string replace null comment "",
+ `k13` largeint(40) replace null comment ""
+ ) engine=olap
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
+ """
+ sql """
+ CREATE TABLE IF NOT EXISTS `test` (
+ `k0` boolean null comment "",
+ `k1` tinyint(4) null comment "",
+ `k2` smallint(6) null comment "",
+ `k3` int(11) null comment "",
+ `k4` bigint(20) null comment "",
+ `k5` decimal(9, 3) null comment "",
+ `k6` char(5) null comment "",
+ `k10` date null comment "",
+ `k11` datetime null comment "",
+ `k7` varchar(20) null comment "",
+ `k8` double max null comment "",
+ `k9` float sum null comment "",
+ `k12` string replace_if_not_null null comment "",
+ `k13` largeint(40) replace null comment ""
+ ) engine=olap
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
+ """
+
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ streamLoad {
+ table "baseall"
+ db "regression_test_fault_injection_p0"
+ set 'column_separator', ','
+ file "baseall.txt"
+ }
+
+ try {
+ GetDebugPoint().enableDebugPointForAllBEs("VTabletWriterV2.close.cancel")
+ GetDebugPoint().enableDebugPointForAllBEs("LoadStreamStub::close_wait.long_wait")
+ def res = sql "insert into test select * from baseall where k1 <= 3"
+ logger.info(res.toString())
+ } catch(Exception e) {
+ logger.info(e.getMessage())
+ assertTrue(e.getMessage().contains("cancel"))
+ } finally {
+ GetDebugPoint().disableDebugPointForAllBEs("VTabletWriterV2.close.cancel")
+ GetDebugPoint().disableDebugPointForAllBEs("LoadStreamStub::close_wait.long_wait")
+ }
+
+ sql """ DROP TABLE IF EXISTS `baseall` """
+ sql """ DROP TABLE IF EXISTS `test` """
+ sql """ set enable_memtable_on_sink_node=false """
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]