This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d046a49ba5 [refactor](move-memtable) simplify LoadStreamStub::open 
(#34488)
8d046a49ba5 is described below

commit 8d046a49ba5c4084a4933e9a8a8266179e5539d4
Author: Kaijie Chen <[email protected]>
AuthorDate: Wed May 8 09:49:07 2024 +0800

    [refactor](move-memtable) simplify LoadStreamStub::open (#34488)
---
 be/src/vec/sink/load_stream_stub.cpp         |  5 ++---
 be/src/vec/sink/load_stream_stub.h           |  5 ++---
 be/src/vec/sink/writer/vtablet_writer_v2.cpp | 12 ++++++------
 3 files changed, 10 insertions(+), 12 deletions(-)

diff --git a/be/src/vec/sink/load_stream_stub.cpp 
b/be/src/vec/sink/load_stream_stub.cpp
index 78e1bc691cc..155ce2de349 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -141,8 +141,7 @@ LoadStreamStub::~LoadStreamStub() {
 }
 
 // open_load_stream
-Status LoadStreamStub::open(std::shared_ptr<LoadStreamStub> self,
-                            BrpcClientCache<PBackendService_Stub>* 
client_cache,
+Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* 
client_cache,
                             const NodeInfo& node_info, int64_t txn_id,
                             const OlapTableSchemaParam& schema,
                             const std::vector<PTabletID>& tablets_for_schema, 
int total_streams,
@@ -157,7 +156,7 @@ Status LoadStreamStub::open(std::shared_ptr<LoadStreamStub> 
self,
     opt.max_buf_size = config::load_stream_max_buf_size;
     opt.idle_timeout_ms = idle_timeout_ms;
     opt.messages_in_batch = config::load_stream_messages_in_batch;
-    opt.handler = new LoadStreamReplyHandler(_load_id, _dst_id, self);
+    opt.handler = new LoadStreamReplyHandler(_load_id, _dst_id, 
shared_from_this());
     brpc::Controller cntl;
     if (int ret = brpc::StreamCreate(&_stream_id, cntl, &opt)) {
         delete opt.handler;
diff --git a/be/src/vec/sink/load_stream_stub.h 
b/be/src/vec/sink/load_stream_stub.h
index 8ef40b84145..1f0d2e459d3 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -104,7 +104,7 @@ private:
     std::weak_ptr<LoadStreamStub> _stub;
 };
 
-class LoadStreamStub {
+class LoadStreamStub : public std::enable_shared_from_this<LoadStreamStub> {
     friend class LoadStreamReplyHandler;
 
 public:
@@ -125,8 +125,7 @@ public:
             ~LoadStreamStub();
 
     // open_load_stream
-    Status open(std::shared_ptr<LoadStreamStub> self,
-                BrpcClientCache<PBackendService_Stub>* client_cache, const 
NodeInfo& node_info,
+    Status open(BrpcClientCache<PBackendService_Stub>* client_cache, const 
NodeInfo& node_info,
                 int64_t txn_id, const OlapTableSchemaParam& schema,
                 const std::vector<PTabletID>& tablets_for_schema, int 
total_streams,
                 int64_t idle_timeout_ms, bool enable_profile);
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp 
b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index 603066154fb..f00626dd723 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -278,15 +278,15 @@ Status VTabletWriterV2::_open_streams_to_backend(int64_t 
dst_id, Streams& stream
     // get tablet schema from each backend only in the 1st stream
     for (auto& stream : streams | std::ranges::views::take(1)) {
         const std::vector<PTabletID>& tablets_for_schema = 
_indexes_from_node[node_info->id];
-        RETURN_IF_ERROR(stream->open(stream, 
_state->exec_env()->brpc_internal_client_cache(),
-                                     *node_info, _txn_id, *_schema, 
tablets_for_schema,
-                                     _total_streams, idle_timeout_ms, 
_state->enable_profile()));
+        
RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(), 
*node_info,
+                                     _txn_id, *_schema, tablets_for_schema, 
_total_streams,
+                                     idle_timeout_ms, 
_state->enable_profile()));
     }
     // for the rest streams, open without getting tablet schema
     for (auto& stream : streams | std::ranges::views::drop(1)) {
-        RETURN_IF_ERROR(stream->open(stream, 
_state->exec_env()->brpc_internal_client_cache(),
-                                     *node_info, _txn_id, *_schema, {}, 
_total_streams,
-                                     idle_timeout_ms, 
_state->enable_profile()));
+        
RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(), 
*node_info,
+                                     _txn_id, *_schema, {}, _total_streams, 
idle_timeout_ms,
+                                     _state->enable_profile()));
     }
     return Status::OK();
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to