This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 69a01e0cf5c [improve](move-memtable) skip load stream stub close wait when cancel (#29427)
69a01e0cf5c is described below

commit 69a01e0cf5c704c07b78a87ea315effacfceb08b
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Tue Jan 2 23:35:50 2024 +0800

    [improve](move-memtable) skip load stream stub close wait when cancel (#29427)
---
 be/src/vec/sink/load_stream_stub.cpp               |  1 -
 be/src/vec/sink/load_stream_stub.h                 |  5 ++
 be/src/vec/sink/load_stream_stub_pool.cpp          | 16 ++--
 be/src/vec/sink/load_stream_stub_pool.h            |  2 +-
 be/src/vec/sink/writer/vtablet_writer_v2.cpp       |  4 +-
 be/test/vec/exec/load_stream_stub_pool_test.cpp    |  7 +-
 ...d_stream_stub_close_wait_fault_injection.groovy | 88 ++++++++++++++++++++++
 7 files changed, 109 insertions(+), 14 deletions(-)

diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp
index 6863faa4ac3..9ad8d8805ca 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -21,7 +21,6 @@
 
 #include "olap/rowset/rowset_writer.h"
 #include "util/brpc_client_cache.h"
-#include "util/debug_points.h"
 #include "util/network_util.h"
 #include "util/thrift_util.h"
 #include "util/uid_util.h"
diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h
index 5c9b3d2f31d..a5c9d7464a4 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -60,6 +60,7 @@
 #include "runtime/thread_context.h"
 #include "runtime/types.h"
 #include "util/countdown_latch.h"
+#include "util/debug_points.h"
 #include "util/runtime_profile.h"
 #include "util/stopwatch.hpp"
 #include "vec/columns/column.h"
@@ -183,6 +184,10 @@ public:
     // wait remote to close stream,
     // remote will close stream when it receives CLOSE_LOAD
     Status close_wait(int64_t timeout_ms = 0) {
+        DBUG_EXECUTE_IF("LoadStreamStub::close_wait.long_wait", {
+            while (true) {
+            };
+        });
         if (!_is_init.load() || _handler.is_closed()) {
             return Status::OK();
         }
diff --git a/be/src/vec/sink/load_stream_stub_pool.cpp b/be/src/vec/sink/load_stream_stub_pool.cpp
index c867756d8fb..3cf168a131e 100644
--- a/be/src/vec/sink/load_stream_stub_pool.cpp
+++ b/be/src/vec/sink/load_stream_stub_pool.cpp
@@ -26,7 +26,7 @@ class TExpr;
 LoadStreams::LoadStreams(UniqueId load_id, int64_t dst_id, int num_use, 
LoadStreamStubPool* pool)
         : _load_id(load_id), _dst_id(dst_id), _use_cnt(num_use), _pool(pool) {}
 
-void LoadStreams::release() {
+void LoadStreams::release(Status status) {
     int num_use = --_use_cnt;
     DBUG_EXECUTE_IF("LoadStreams.release.keeping_streams", { num_use = 1; });
     if (num_use == 0) {
@@ -39,12 +39,14 @@ void LoadStreams::release() {
                 LOG(WARNING) << "close stream failed " << st;
             }
         }
-        for (auto& stream : _streams) {
-            auto st = stream->close_wait();
-            DBUG_EXECUTE_IF("LoadStreams.release.close_wait_failed",
-                            { st = Status::InternalError("stream close wait 
timeout"); });
-            if (!st.ok()) {
-                LOG(WARNING) << "close wait failed " << st;
+        if (status.ok()) {
+            for (auto& stream : _streams) {
+                auto st = stream->close_wait();
+                DBUG_EXECUTE_IF("LoadStreams.release.close_wait_failed",
+                                { st = Status::InternalError("stream close 
wait timeout"); });
+                if (!st.ok()) {
+                    LOG(WARNING) << "close wait failed " << st;
+                }
             }
         }
         _pool->erase(_load_id, _dst_id);
diff --git a/be/src/vec/sink/load_stream_stub_pool.h b/be/src/vec/sink/load_stream_stub_pool.h
index e41083825be..b34383b25f9 100644
--- a/be/src/vec/sink/load_stream_stub_pool.h
+++ b/be/src/vec/sink/load_stream_stub_pool.h
@@ -76,7 +76,7 @@ class LoadStreams {
 public:
     LoadStreams(UniqueId load_id, int64_t dst_id, int num_use, 
LoadStreamStubPool* pool);
 
-    void release();
+    void release(Status status);
 
     Streams& streams() { return _streams; }
 
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index d1e65c804d7..e02fba7c21c 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -456,7 +456,7 @@ Status VTabletWriterV2::_cancel(Status status) {
         _delta_writer_for_tablet.reset();
     }
     for (const auto& [_, streams] : _streams_for_node) {
-        streams->release();
+        streams->release(status);
     }
     return Status::OK();
 }
@@ -513,7 +513,7 @@ Status VTabletWriterV2::close(Status exec_status) {
         // defer stream release to prevent memory leak
         Defer defer([&] {
             for (const auto& [_, streams] : _streams_for_node) {
-                streams->release();
+                streams->release(status);
             }
             _streams_for_node.clear();
         });
diff --git a/be/test/vec/exec/load_stream_stub_pool_test.cpp b/be/test/vec/exec/load_stream_stub_pool_test.cpp
index 24da3bb6999..bea5443b4ff 100644
--- a/be/test/vec/exec/load_stream_stub_pool_test.cpp
+++ b/be/test/vec/exec/load_stream_stub_pool_test.cpp
@@ -32,6 +32,7 @@ TEST_F(LoadStreamStubPoolTest, test) {
     LoadStreamStubPool pool;
     int64_t src_id = 100;
     PUniqueId load_id;
+    Status st = Status::OK();
     load_id.set_hi(1);
     load_id.set_hi(2);
     auto streams1 = pool.get_or_create(load_id, src_id, 101, 5, 1);
@@ -41,9 +42,9 @@ TEST_F(LoadStreamStubPoolTest, test) {
     EXPECT_EQ(1, pool.templates_size());
     EXPECT_EQ(streams1, streams3);
     EXPECT_NE(streams1, streams2);
-    streams1->release();
-    streams2->release();
-    streams3->release();
+    streams1->release(st);
+    streams2->release(st);
+    streams3->release(st);
     EXPECT_EQ(0, pool.size());
     EXPECT_EQ(0, pool.templates_size());
 }
diff --git a/regression-test/suites/fault_injection_p0/test_load_stream_stub_close_wait_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_load_stream_stub_close_wait_fault_injection.groovy
new file mode 100644
index 00000000000..4a87f1daf6b
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/test_load_stream_stub_close_wait_fault_injection.groovy
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.apache.doris.regression.util.Http
+
+suite("test_load_stream_stub_close_wait_fault_injection", "nonConcurrent") {
+    sql """ set enable_memtable_on_sink_node=true """
+    sql """ DROP TABLE IF EXISTS `baseall` """
+    sql """ DROP TABLE IF EXISTS `test` """
+    sql """
+        CREATE TABLE IF NOT EXISTS `baseall` (
+            `k0` boolean null comment "",
+            `k1` tinyint(4) null comment "",
+            `k2` smallint(6) null comment "",
+            `k3` int(11) null comment "",
+            `k4` bigint(20) null comment "",
+            `k5` decimal(9, 3) null comment "",
+            `k6` char(5) null comment "",
+            `k10` date null comment "",
+            `k11` datetime null comment "",
+            `k7` varchar(20) null comment "",
+            `k8` double max null comment "",
+            `k9` float sum null comment "",
+            `k12` string replace null comment "",
+            `k13` largeint(40) replace null comment ""
+        ) engine=olap
+        DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
+        """
+    sql """
+        CREATE TABLE IF NOT EXISTS `test` (
+            `k0` boolean null comment "",
+            `k1` tinyint(4) null comment "",
+            `k2` smallint(6) null comment "",
+            `k3` int(11) null comment "",
+            `k4` bigint(20) null comment "",
+            `k5` decimal(9, 3) null comment "",
+            `k6` char(5) null comment "",
+            `k10` date null comment "",
+            `k11` datetime null comment "",
+            `k7` varchar(20) null comment "",
+            `k8` double max null comment "",
+            `k9` float sum null comment "",
+            `k12` string replace_if_not_null null comment "",
+            `k13` largeint(40) replace null comment ""
+        ) engine=olap
+        DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
+        """
+
+    GetDebugPoint().clearDebugPointsForAllBEs()
+    streamLoad {
+        table "baseall"
+        db "regression_test_fault_injection_p0"
+        set 'column_separator', ','
+        file "baseall.txt"
+    }
+
+    try {
+        
GetDebugPoint().enableDebugPointForAllBEs("VTabletWriterV2.close.cancel")
+        
GetDebugPoint().enableDebugPointForAllBEs("LoadStreamStub::close_wait.long_wait")
+        def res = sql "insert into test select * from baseall where k1 <= 3"
+        logger.info(res.toString())
+    } catch(Exception e) {
+        logger.info(e.getMessage())
+        assertTrue(e.getMessage().contains("cancel"))
+    } finally {
+        
GetDebugPoint().disableDebugPointForAllBEs("VTabletWriterV2.close.cancel")
+        
GetDebugPoint().disableDebugPointForAllBEs("LoadStreamStub::close_wait.long_wait")
+    }
+
+    sql """ DROP TABLE IF EXISTS `baseall` """
+    sql """ DROP TABLE IF EXISTS `test` """
+    sql """ set enable_memtable_on_sink_node=false """
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to