This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 03a4fe6f39 [enhancement](streamload) make stream load context as 
shared ptr and save it in global load mgr (#16996)
03a4fe6f39 is described below

commit 03a4fe6f39d86419b05697db2d45857fc9f5de89
Author: yiguolei <[email protected]>
AuthorDate: Fri Feb 24 11:15:29 2023 +0800

    [enhancement](streamload) make stream load context as shared ptr and save 
it in global load mgr (#16996)
---
 be/src/http/action/stream_load.cpp                 | 42 ++++++------
 be/src/http/action/stream_load.h                   | 10 +--
 be/src/http/action/stream_load_2pc.cpp             | 20 +-----
 be/src/http/action/stream_load_2pc.h               |  1 -
 be/src/http/http_handler.h                         |  4 +-
 be/src/http/http_request.h                         |  7 +-
 be/src/io/file_factory.cpp                         |  6 +-
 be/src/runtime/fragment_mgr.cpp                    |  7 +-
 be/src/runtime/routine_load/data_consumer.cpp      |  8 +--
 be/src/runtime/routine_load/data_consumer.h        | 27 ++++----
 .../runtime/routine_load/data_consumer_group.cpp   |  4 +-
 be/src/runtime/routine_load/data_consumer_group.h  |  6 +-
 be/src/runtime/routine_load/data_consumer_pool.cpp |  5 +-
 be/src/runtime/routine_load/data_consumer_pool.h   |  5 +-
 .../routine_load/routine_load_task_executor.cpp    | 78 +++++++++-------------
 .../routine_load/routine_load_task_executor.h      | 16 +++--
 be/src/runtime/stream_load/new_load_stream_mgr.h   | 15 ++---
 be/src/runtime/stream_load/stream_load_context.h   | 12 ++--
 .../runtime/stream_load/stream_load_executor.cpp   | 13 ++--
 be/src/runtime/stream_load/stream_load_executor.h  |  2 +-
 20 files changed, 130 insertions(+), 158 deletions(-)

diff --git a/be/src/http/action/stream_load.cpp 
b/be/src/http/action/stream_load.cpp
index a55c0e22c2..c0fc86b68b 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -144,7 +144,8 @@ StreamLoadAction::~StreamLoadAction() {
 }
 
 void StreamLoadAction::handle(HttpRequest* req) {
-    StreamLoadContext* ctx = (StreamLoadContext*)req->handler_ctx();
+    std::shared_ptr<StreamLoadContext> ctx =
+            std::static_pointer_cast<StreamLoadContext>(req->handler_ctx());
     if (ctx == nullptr) {
         return;
     }
@@ -161,7 +162,7 @@ void StreamLoadAction::handle(HttpRequest* req) {
 
     if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) {
         if (ctx->need_rollback) {
-            _exec_env->stream_load_executor()->rollback_txn(ctx);
+            _exec_env->stream_load_executor()->rollback_txn(ctx.get());
             ctx->need_rollback = false;
         }
         if (ctx->body_sink.get() != nullptr) {
@@ -185,7 +186,7 @@ void StreamLoadAction::handle(HttpRequest* req) {
     streaming_load_current_processing->increment(-1);
 }
 
-Status StreamLoadAction::_handle(StreamLoadContext* ctx) {
+Status StreamLoadAction::_handle(std::shared_ptr<StreamLoadContext> ctx) {
     if (ctx->body_bytes > 0 && ctx->receive_bytes != ctx->body_bytes) {
         LOG(WARNING) << "recevie body don't equal with body bytes, 
body_bytes=" << ctx->body_bytes
                      << ", receive_bytes=" << ctx->receive_bytes << ", id=" << 
ctx->id;
@@ -206,12 +207,12 @@ Status StreamLoadAction::_handle(StreamLoadContext* ctx) {
 
     if (ctx->two_phase_commit) {
         int64_t pre_commit_start_time = MonotonicNanos();
-        
RETURN_IF_ERROR(_exec_env->stream_load_executor()->pre_commit_txn(ctx));
+        
RETURN_IF_ERROR(_exec_env->stream_load_executor()->pre_commit_txn(ctx.get()));
         ctx->pre_commit_txn_cost_nanos = MonotonicNanos() - 
pre_commit_start_time;
     } else {
         // If put file success we need commit this load
         int64_t commit_and_publish_start_time = MonotonicNanos();
-        RETURN_IF_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx));
+        
RETURN_IF_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx.get()));
         ctx->commit_and_publish_txn_cost_nanos = MonotonicNanos() - 
commit_and_publish_start_time;
     }
     return Status::OK();
@@ -220,8 +221,7 @@ Status StreamLoadAction::_handle(StreamLoadContext* ctx) {
 int StreamLoadAction::on_header(HttpRequest* req) {
     streaming_load_current_processing->increment(1);
 
-    StreamLoadContext* ctx = new StreamLoadContext(_exec_env);
-    ctx->ref();
+    std::shared_ptr<StreamLoadContext> ctx = 
std::make_shared<StreamLoadContext>(_exec_env);
     req->set_handler_ctx(ctx);
 
     ctx->load_type = TLoadType::MANUL_LOAD;
@@ -243,7 +243,7 @@ int StreamLoadAction::on_header(HttpRequest* req) {
     if (!st.ok()) {
         ctx->status = std::move(st);
         if (ctx->need_rollback) {
-            _exec_env->stream_load_executor()->rollback_txn(ctx);
+            _exec_env->stream_load_executor()->rollback_txn(ctx.get());
             ctx->need_rollback = false;
         }
         if (ctx->body_sink.get() != nullptr) {
@@ -265,7 +265,7 @@ int StreamLoadAction::on_header(HttpRequest* req) {
     return 0;
 }
 
-Status StreamLoadAction::_on_header(HttpRequest* http_req, StreamLoadContext* 
ctx) {
+Status StreamLoadAction::_on_header(HttpRequest* http_req, 
std::shared_ptr<StreamLoadContext> ctx) {
     // auth information
     if (!parse_basic_auth(*http_req, &ctx->auth)) {
         LOG(WARNING) << "parse basic authorization failed." << ctx->brief();
@@ -334,7 +334,7 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, 
StreamLoadContext* ct
 
     // begin transaction
     int64_t begin_txn_start_time = MonotonicNanos();
-    RETURN_IF_ERROR(_exec_env->stream_load_executor()->begin_txn(ctx));
+    RETURN_IF_ERROR(_exec_env->stream_load_executor()->begin_txn(ctx.get()));
     ctx->begin_txn_cost_nanos = MonotonicNanos() - begin_txn_start_time;
 
     // process put file
@@ -342,7 +342,8 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, 
StreamLoadContext* ct
 }
 
 void StreamLoadAction::on_chunk_data(HttpRequest* req) {
-    StreamLoadContext* ctx = (StreamLoadContext*)req->handler_ctx();
+    std::shared_ptr<StreamLoadContext> ctx =
+            std::static_pointer_cast<StreamLoadContext>(req->handler_ctx());
     if (ctx == nullptr || !ctx->status.ok()) {
         return;
     }
@@ -367,8 +368,8 @@ void StreamLoadAction::on_chunk_data(HttpRequest* req) {
     ctx->read_data_cost_nanos += (MonotonicNanos() - start_read_data_time);
 }
 
-void StreamLoadAction::free_handler_ctx(void* param) {
-    StreamLoadContext* ctx = (StreamLoadContext*)param;
+void StreamLoadAction::free_handler_ctx(std::shared_ptr<void> param) {
+    std::shared_ptr<StreamLoadContext> ctx = 
std::static_pointer_cast<StreamLoadContext>(param);
     if (ctx == nullptr) {
         return;
     }
@@ -376,12 +377,12 @@ void StreamLoadAction::free_handler_ctx(void* param) {
     if (ctx->body_sink != nullptr) {
         ctx->body_sink->cancel("sender is gone");
     }
-    if (ctx->unref()) {
-        delete ctx;
-    }
+    // remove stream load context from stream load manager and the resource 
will be released
+    ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id);
 }
 
-Status StreamLoadAction::_process_put(HttpRequest* http_req, 
StreamLoadContext* ctx) {
+Status StreamLoadAction::_process_put(HttpRequest* http_req,
+                                      std::shared_ptr<StreamLoadContext> ctx) {
     // Now we use stream
     ctx->use_streaming = is_format_support_streaming(ctx->format);
 
@@ -399,10 +400,10 @@ Status StreamLoadAction::_process_put(HttpRequest* 
http_req, StreamLoadContext*
         auto pipe = std::make_shared<io::StreamLoadPipe>(
                 io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 
/* min_chunk_size */,
                 ctx->body_bytes /* total_length */);
-        RETURN_IF_ERROR(_exec_env->new_load_stream_mgr()->put(ctx->id, pipe));
-
         request.fileType = TFileType::FILE_STREAM;
         ctx->body_sink = pipe;
+        ctx->pipe = pipe;
+        RETURN_IF_ERROR(_exec_env->new_load_stream_mgr()->put(ctx->id, ctx));
     } else {
         RETURN_IF_ERROR(_data_saved_path(http_req, &request.path));
         auto file_sink = std::make_shared<MessageBodyFileSink>(request.path);
@@ -621,7 +622,8 @@ Status StreamLoadAction::_data_saved_path(HttpRequest* req, 
std::string* file_pa
     return Status::OK();
 }
 
-void StreamLoadAction::_save_stream_load_record(StreamLoadContext* ctx, const 
std::string& str) {
+void 
StreamLoadAction::_save_stream_load_record(std::shared_ptr<StreamLoadContext> 
ctx,
+                                                const std::string& str) {
     auto stream_load_recorder = 
StorageEngine::instance()->get_stream_load_recorder();
     if (stream_load_recorder != nullptr) {
         std::string key =
diff --git a/be/src/http/action/stream_load.h b/be/src/http/action/stream_load.h
index c7b7a7d142..2dc3c762ad 100644
--- a/be/src/http/action/stream_load.h
+++ b/be/src/http/action/stream_load.h
@@ -42,14 +42,14 @@ public:
     int on_header(HttpRequest* req) override;
 
     void on_chunk_data(HttpRequest* req) override;
-    void free_handler_ctx(void* ctx) override;
+    void free_handler_ctx(std::shared_ptr<void> ctx) override;
 
 private:
-    Status _on_header(HttpRequest* http_req, StreamLoadContext* ctx);
-    Status _handle(StreamLoadContext* ctx);
+    Status _on_header(HttpRequest* http_req, 
std::shared_ptr<StreamLoadContext> ctx);
+    Status _handle(std::shared_ptr<StreamLoadContext> ctx);
     Status _data_saved_path(HttpRequest* req, std::string* file_path);
-    Status _process_put(HttpRequest* http_req, StreamLoadContext* ctx);
-    void _save_stream_load_record(StreamLoadContext* ctx, const std::string& 
str);
+    Status _process_put(HttpRequest* http_req, 
std::shared_ptr<StreamLoadContext> ctx);
+    void _save_stream_load_record(std::shared_ptr<StreamLoadContext> ctx, 
const std::string& str);
 
 private:
     ExecEnv* _exec_env;
diff --git a/be/src/http/action/stream_load_2pc.cpp 
b/be/src/http/action/stream_load_2pc.cpp
index bcc4d9f288..3a5690cb8f 100644
--- a/be/src/http/action/stream_load_2pc.cpp
+++ b/be/src/http/action/stream_load_2pc.cpp
@@ -40,9 +40,7 @@ void StreamLoad2PCAction::handle(HttpRequest* req) {
     Status status = Status::OK();
     std::string status_result;
 
-    StreamLoadContext* ctx = new StreamLoadContext(_exec_env);
-    ctx->ref();
-    req->set_handler_ctx(ctx);
+    std::shared_ptr<StreamLoadContext> ctx = 
std::make_shared<StreamLoadContext>(_exec_env);
     ctx->db = req->param(HTTP_DB_KEY);
     std::string req_txn_id = req->header(HTTP_TXN_ID_KEY);
     try {
@@ -66,7 +64,7 @@ void StreamLoad2PCAction::handle(HttpRequest* req) {
         status = Status::InternalError("no valid Basic authorization");
     }
 
-    status = _exec_env->stream_load_executor()->operate_txn_2pc(ctx);
+    status = _exec_env->stream_load_executor()->operate_txn_2pc(ctx.get());
 
     if (!status.ok()) {
         status_result = status.to_json();
@@ -93,18 +91,4 @@ std::string StreamLoad2PCAction::get_success_info(const 
std::string txn_id,
     return s.GetString();
 }
 
-void StreamLoad2PCAction::free_handler_ctx(void* param) {
-    StreamLoadContext* ctx = (StreamLoadContext*)param;
-    if (ctx == nullptr) {
-        return;
-    }
-    // sender is gone, make receiver know it
-    if (ctx->body_sink != nullptr) {
-        ctx->body_sink->cancel("sender is gone");
-    }
-    if (ctx->unref()) {
-        delete ctx;
-    }
-}
-
 } // namespace doris
diff --git a/be/src/http/action/stream_load_2pc.h 
b/be/src/http/action/stream_load_2pc.h
index 960850e20e..1a1013a12b 100644
--- a/be/src/http/action/stream_load_2pc.h
+++ b/be/src/http/action/stream_load_2pc.h
@@ -33,7 +33,6 @@ public:
 
     void handle(HttpRequest* req) override;
     std::string get_success_info(const std::string txn_id, const std::string 
txn_operation);
-    void free_handler_ctx(void* param) override;
 
 private:
     ExecEnv* _exec_env;
diff --git a/be/src/http/http_handler.h b/be/src/http/http_handler.h
index c2c108f1a0..f8058fb8a6 100644
--- a/be/src/http/http_handler.h
+++ b/be/src/http/http_handler.h
@@ -17,6 +17,8 @@
 
 #pragma once
 
+#include <memory>
+
 namespace doris {
 
 class HttpRequest;
@@ -37,7 +39,7 @@ public:
     virtual int on_header(HttpRequest* req) { return 0; }
 
     virtual void on_chunk_data(HttpRequest* req) {}
-    virtual void free_handler_ctx(void* handler_ctx) {}
+    virtual void free_handler_ctx(std::shared_ptr<void> handler_ctx) {}
 };
 
 } // namespace doris
diff --git a/be/src/http/http_request.h b/be/src/http/http_request.h
index 1503e4303a..a26be3a22c 100644
--- a/be/src/http/http_request.h
+++ b/be/src/http/http_request.h
@@ -21,6 +21,7 @@
 
 #include <boost/algorithm/string.hpp>
 #include <map>
+#include <memory>
 #include <string>
 
 #include "http/http_common.h"
@@ -74,8 +75,8 @@ public:
 
     struct evhttp_request* get_evhttp_request() const { return _ev_req; }
 
-    void* handler_ctx() const { return _handler_ctx; }
-    void set_handler_ctx(void* ctx) {
+    std::shared_ptr<void> handler_ctx() const { return _handler_ctx; }
+    void set_handler_ctx(std::shared_ptr<void> ctx) {
         DCHECK(_handler != nullptr);
         _handler_ctx = ctx;
     }
@@ -94,7 +95,7 @@ private:
     struct evhttp_request* _ev_req = nullptr;
     HttpHandler* _handler = nullptr;
 
-    void* _handler_ctx = nullptr;
+    std::shared_ptr<void> _handler_ctx;
     std::string _request_body;
 };
 
diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp
index 9324a83f27..3cb865be40 100644
--- a/be/src/io/file_factory.cpp
+++ b/be/src/io/file_factory.cpp
@@ -38,6 +38,7 @@
 #include "olap/iterators.h"
 #include "runtime/exec_env.h"
 #include "runtime/stream_load/new_load_stream_mgr.h"
+#include "runtime/stream_load/stream_load_context.h"
 
 namespace doris {
 
@@ -190,10 +191,11 @@ Status FileFactory::create_file_reader(RuntimeProfile* 
/*profile*/,
 
 // file scan node/stream load pipe
 Status FileFactory::create_pipe_reader(const TUniqueId& load_id, 
io::FileReaderSPtr* file_reader) {
-    *file_reader = ExecEnv::GetInstance()->new_load_stream_mgr()->get(load_id);
-    if (!(*file_reader)) {
+    auto stream_load_ctx = 
ExecEnv::GetInstance()->new_load_stream_mgr()->get(load_id);
+    if (!stream_load_ctx) {
         return Status::InternalError("unknown stream load id: {}", 
UniqueId(load_id).to_string());
     }
+    *file_reader = stream_load_ctx->pipe;
     return Status::OK();
 }
 
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index f2a2c9e798..02644f186d 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -516,7 +516,8 @@ void 
FragmentMgr::_exec_actual(std::shared_ptr<FragmentExecState> exec_state,
 
 Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params) {
     if (params.txn_conf.need_txn) {
-        StreamLoadContext* stream_load_ctx = new StreamLoadContext(_exec_env);
+        std::shared_ptr<StreamLoadContext> stream_load_ctx =
+                std::make_shared<StreamLoadContext>(_exec_env);
         stream_load_ctx->db = params.txn_conf.db;
         stream_load_ctx->db_id = params.txn_conf.db_id;
         stream_load_ctx->table = params.txn_conf.tbl;
@@ -536,9 +537,11 @@ Status FragmentMgr::exec_plan_fragment(const 
TExecPlanFragmentParams& params) {
                 io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 
/* min_chunk_size */,
                 -1 /* total_length */, true /* use_proto */);
         stream_load_ctx->body_sink = pipe;
+        stream_load_ctx->pipe = pipe;
         stream_load_ctx->max_filter_ratio = params.txn_conf.max_filter_ratio;
 
-        
RETURN_IF_ERROR(_exec_env->new_load_stream_mgr()->put(stream_load_ctx->id, 
pipe));
+        RETURN_IF_ERROR(
+                _exec_env->new_load_stream_mgr()->put(stream_load_ctx->id, 
stream_load_ctx));
 
         
RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(stream_load_ctx));
         set_pipe(params.params.fragment_instance_id, pipe,
diff --git a/be/src/runtime/routine_load/data_consumer.cpp 
b/be/src/runtime/routine_load/data_consumer.cpp
index f2e3464f47..88c00044e1 100644
--- a/be/src/runtime/routine_load/data_consumer.cpp
+++ b/be/src/runtime/routine_load/data_consumer.cpp
@@ -36,7 +36,7 @@ namespace doris {
 static const std::string PROP_GROUP_ID = "group.id";
 // init kafka consumer will only set common configs such as
 // brokers, groupid
-Status KafkaDataConsumer::init(StreamLoadContext* ctx) {
+Status KafkaDataConsumer::init(std::shared_ptr<StreamLoadContext> ctx) {
     std::unique_lock<std::mutex> l(_lock);
     if (_init) {
         // this consumer has already been initialized.
@@ -139,7 +139,7 @@ Status KafkaDataConsumer::init(StreamLoadContext* ctx) {
 
 Status KafkaDataConsumer::assign_topic_partitions(
         const std::map<int32_t, int64_t>& begin_partition_offset, const 
std::string& topic,
-        StreamLoadContext* ctx) {
+        std::shared_ptr<StreamLoadContext> ctx) {
     DCHECK(_k_consumer);
     // create TopicPartitions
     std::stringstream ss;
@@ -380,7 +380,7 @@ Status KafkaDataConsumer::get_latest_offsets_for_partitions(
     return Status::OK();
 }
 
-Status KafkaDataConsumer::cancel(StreamLoadContext* ctx) {
+Status KafkaDataConsumer::cancel(std::shared_ptr<StreamLoadContext> ctx) {
     std::unique_lock<std::mutex> l(_lock);
     if (!_init) {
         return Status::InternalError("consumer is not initialized");
@@ -413,7 +413,7 @@ Status 
KafkaDataConsumer::commit(std::vector<RdKafka::TopicPartition*>& offset)
 
 // if the kafka brokers and topic are same,
 // we considered this consumer as matched, thus can be reused.
-bool KafkaDataConsumer::match(StreamLoadContext* ctx) {
+bool KafkaDataConsumer::match(std::shared_ptr<StreamLoadContext> ctx) {
     if (ctx->load_src_type != TLoadSourceType::KAFKA) {
         return false;
     }
diff --git a/be/src/runtime/routine_load/data_consumer.h 
b/be/src/runtime/routine_load/data_consumer.h
index afd4d9f6f7..4ad3d6821f 100644
--- a/be/src/runtime/routine_load/data_consumer.h
+++ b/be/src/runtime/routine_load/data_consumer.h
@@ -33,7 +33,7 @@ class Status;
 
 class DataConsumer {
 public:
-    DataConsumer(StreamLoadContext* ctx)
+    DataConsumer()
             : _id(UniqueId::gen_uid()),
               _grp_id(UniqueId::gen_uid()),
               _has_grp(false),
@@ -44,18 +44,18 @@ public:
     virtual ~DataConsumer() {}
 
     // init the consumer with the given parameters
-    virtual Status init(StreamLoadContext* ctx) = 0;
+    virtual Status init(std::shared_ptr<StreamLoadContext> ctx) = 0;
     // start consuming
-    virtual Status consume(StreamLoadContext* ctx) = 0;
+    virtual Status consume(std::shared_ptr<StreamLoadContext> ctx) = 0;
     // cancel the consuming process.
     // if the consumer is not initialized, or the consuming
     // process is already finished, call cancel() will
     // return ERROR
-    virtual Status cancel(StreamLoadContext* ctx) = 0;
+    virtual Status cancel(std::shared_ptr<StreamLoadContext> ctx) = 0;
     // reset the data consumer before being reused
     virtual Status reset() = 0;
     // return true the if the consumer match the need
-    virtual bool match(StreamLoadContext* ctx) = 0;
+    virtual bool match(std::shared_ptr<StreamLoadContext> ctx) = 0;
 
     const UniqueId& id() { return _id; }
     time_t last_visit_time() { return _last_visit_time; }
@@ -109,10 +109,8 @@ public:
 
 class KafkaDataConsumer : public DataConsumer {
 public:
-    KafkaDataConsumer(StreamLoadContext* ctx)
-            : DataConsumer(ctx),
-              _brokers(ctx->kafka_info->brokers),
-              _topic(ctx->kafka_info->topic) {}
+    KafkaDataConsumer(std::shared_ptr<StreamLoadContext> ctx)
+            : _brokers(ctx->kafka_info->brokers), 
_topic(ctx->kafka_info->topic) {}
 
     virtual ~KafkaDataConsumer() {
         VLOG_NOTICE << "deconstruct consumer";
@@ -123,18 +121,19 @@ public:
         }
     }
 
-    virtual Status init(StreamLoadContext* ctx) override;
+    Status init(std::shared_ptr<StreamLoadContext> ctx) override;
     // TODO(cmy): currently do not implement single consumer start method, 
using group_consume
-    virtual Status consume(StreamLoadContext* ctx) override { return 
Status::OK(); }
-    virtual Status cancel(StreamLoadContext* ctx) override;
+    Status consume(std::shared_ptr<StreamLoadContext> ctx) override { return 
Status::OK(); }
+    Status cancel(std::shared_ptr<StreamLoadContext> ctx) override;
     // reassign partition topics
     virtual Status reset() override;
-    virtual bool match(StreamLoadContext* ctx) override;
+    bool match(std::shared_ptr<StreamLoadContext> ctx) override;
     // commit kafka offset
     Status commit(std::vector<RdKafka::TopicPartition*>& offset);
 
     Status assign_topic_partitions(const std::map<int32_t, int64_t>& 
begin_partition_offset,
-                                   const std::string& topic, 
StreamLoadContext* ctx);
+                                   const std::string& topic,
+                                   std::shared_ptr<StreamLoadContext> ctx);
 
     // start the consumer and put msgs to queue
     Status group_consume(BlockingQueue<RdKafka::Message*>* queue, int64_t 
max_running_time_ms);
diff --git a/be/src/runtime/routine_load/data_consumer_group.cpp 
b/be/src/runtime/routine_load/data_consumer_group.cpp
index 869d427568..2d22fa93ab 100644
--- a/be/src/runtime/routine_load/data_consumer_group.cpp
+++ b/be/src/runtime/routine_load/data_consumer_group.cpp
@@ -24,7 +24,7 @@
 
 namespace doris {
 
-Status KafkaDataConsumerGroup::assign_topic_partitions(StreamLoadContext* ctx) 
{
+Status 
KafkaDataConsumerGroup::assign_topic_partitions(std::shared_ptr<StreamLoadContext>
 ctx) {
     DCHECK(ctx->kafka_info);
     DCHECK(_consumers.size() >= 1);
 
@@ -63,7 +63,7 @@ KafkaDataConsumerGroup::~KafkaDataConsumerGroup() {
     DCHECK(_queue.get_size() == 0);
 }
 
-Status KafkaDataConsumerGroup::start_all(StreamLoadContext* ctx) {
+Status KafkaDataConsumerGroup::start_all(std::shared_ptr<StreamLoadContext> 
ctx) {
     Status result_st = Status::OK();
     // start all consumers
     for (auto& consumer : _consumers) {
diff --git a/be/src/runtime/routine_load/data_consumer_group.h 
b/be/src/runtime/routine_load/data_consumer_group.h
index b105a3c9db..bd866f0468 100644
--- a/be/src/runtime/routine_load/data_consumer_group.h
+++ b/be/src/runtime/routine_load/data_consumer_group.h
@@ -46,7 +46,7 @@ public:
     }
 
     // start all consumers
-    virtual Status start_all(StreamLoadContext* ctx) { return Status::OK(); }
+    virtual Status start_all(std::shared_ptr<StreamLoadContext> ctx) { return 
Status::OK(); }
 
 protected:
     UniqueId _grp_id;
@@ -68,9 +68,9 @@ public:
 
     virtual ~KafkaDataConsumerGroup();
 
-    virtual Status start_all(StreamLoadContext* ctx) override;
+    Status start_all(std::shared_ptr<StreamLoadContext> ctx) override;
     // assign topic partitions to all consumers equally
-    Status assign_topic_partitions(StreamLoadContext* ctx);
+    Status assign_topic_partitions(std::shared_ptr<StreamLoadContext> ctx);
 
 private:
     // start a single consumer
diff --git a/be/src/runtime/routine_load/data_consumer_pool.cpp 
b/be/src/runtime/routine_load/data_consumer_pool.cpp
index 31f666d3ab..579edfd201 100644
--- a/be/src/runtime/routine_load/data_consumer_pool.cpp
+++ b/be/src/runtime/routine_load/data_consumer_pool.cpp
@@ -22,7 +22,8 @@
 
 namespace doris {
 
-Status DataConsumerPool::get_consumer(StreamLoadContext* ctx, 
std::shared_ptr<DataConsumer>* ret) {
+Status DataConsumerPool::get_consumer(std::shared_ptr<StreamLoadContext> ctx,
+                                      std::shared_ptr<DataConsumer>* ret) {
     std::unique_lock<std::mutex> l(_lock);
 
     // check if there is an available consumer.
@@ -58,7 +59,7 @@ Status DataConsumerPool::get_consumer(StreamLoadContext* ctx, 
std::shared_ptr<Da
     return Status::OK();
 }
 
-Status DataConsumerPool::get_consumer_grp(StreamLoadContext* ctx,
+Status DataConsumerPool::get_consumer_grp(std::shared_ptr<StreamLoadContext> 
ctx,
                                           std::shared_ptr<DataConsumerGroup>* 
ret) {
     if (ctx->load_src_type != TLoadSourceType::KAFKA) {
         return Status::InternalError(
diff --git a/be/src/runtime/routine_load/data_consumer_pool.h 
b/be/src/runtime/routine_load/data_consumer_pool.h
index 5ea39d0a6c..6f8cf79f74 100644
--- a/be/src/runtime/routine_load/data_consumer_pool.h
+++ b/be/src/runtime/routine_load/data_consumer_pool.h
@@ -49,10 +49,11 @@ public:
 
     // get a already initialized consumer from cache,
     // if not found in cache, create a new one.
-    Status get_consumer(StreamLoadContext* ctx, std::shared_ptr<DataConsumer>* 
ret);
+    Status get_consumer(std::shared_ptr<StreamLoadContext> ctx, 
std::shared_ptr<DataConsumer>* ret);
 
     // get several consumers and put them into group
-    Status get_consumer_grp(StreamLoadContext* ctx, 
std::shared_ptr<DataConsumerGroup>* ret);
+    Status get_consumer_grp(std::shared_ptr<StreamLoadContext> ctx,
+                            std::shared_ptr<DataConsumerGroup>* ret);
 
     // return the consumer to the pool
     void return_consumer(std::shared_ptr<DataConsumer> consumer);
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp 
b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index da8842ac59..86f56ee756 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -55,19 +55,13 @@ RoutineLoadTaskExecutor::~RoutineLoadTaskExecutor() {
     _thread_pool.join();
 
     LOG(INFO) << _task_map.size() << " not executed tasks left, cleanup";
-    for (auto it = _task_map.begin(); it != _task_map.end(); ++it) {
-        auto ctx = it->second;
-        if (ctx->unref()) {
-            delete ctx;
-        }
-    }
     _task_map.clear();
 }
 
 // Create a temp StreamLoadContext and set some kafka connection info in it.
 // So that we can use this ctx to get kafka data consumer instance.
 Status RoutineLoadTaskExecutor::_prepare_ctx(const PKafkaMetaProxyRequest& 
request,
-                                             StreamLoadContext* ctx) {
+                                             
std::shared_ptr<StreamLoadContext> ctx) {
     ctx->load_type = TLoadType::ROUTINE_LOAD;
     ctx->load_src_type = TLoadSourceType::KAFKA;
     ctx->label = "NaN";
@@ -93,11 +87,11 @@ Status 
RoutineLoadTaskExecutor::get_kafka_partition_meta(const PKafkaMetaProxyRe
     CHECK(request.has_kafka_info());
 
     // This context is meaningless, just for unifing the interface
-    StreamLoadContext ctx(_exec_env);
-    RETURN_IF_ERROR(_prepare_ctx(request, &ctx));
+    std::shared_ptr<StreamLoadContext> ctx = 
std::make_shared<StreamLoadContext>(_exec_env);
+    RETURN_IF_ERROR(_prepare_ctx(request, ctx));
 
     std::shared_ptr<DataConsumer> consumer;
-    RETURN_IF_ERROR(_data_consumer_pool.get_consumer(&ctx, &consumer));
+    RETURN_IF_ERROR(_data_consumer_pool.get_consumer(ctx, &consumer));
 
     Status st = 
std::static_pointer_cast<KafkaDataConsumer>(consumer)->get_partition_meta(
             partition_ids);
@@ -112,11 +106,11 @@ Status 
RoutineLoadTaskExecutor::get_kafka_partition_offsets_for_times(
     CHECK(request.has_kafka_info());
 
     // This context is meaningless, just for unifing the interface
-    StreamLoadContext ctx(_exec_env);
-    RETURN_IF_ERROR(_prepare_ctx(request, &ctx));
+    std::shared_ptr<StreamLoadContext> ctx = 
std::make_shared<StreamLoadContext>(_exec_env);
+    RETURN_IF_ERROR(_prepare_ctx(request, ctx));
 
     std::shared_ptr<DataConsumer> consumer;
-    RETURN_IF_ERROR(_data_consumer_pool.get_consumer(&ctx, &consumer));
+    RETURN_IF_ERROR(_data_consumer_pool.get_consumer(ctx, &consumer));
 
     Status st = 
std::static_pointer_cast<KafkaDataConsumer>(consumer)->get_offsets_for_times(
             std::vector<PIntegerPair>(request.offset_times().begin(), 
request.offset_times().end()),
@@ -132,11 +126,11 @@ Status 
RoutineLoadTaskExecutor::get_kafka_latest_offsets_for_partitions(
     CHECK(request.has_kafka_info());
 
     // This context is meaningless, just for unifing the interface
-    StreamLoadContext ctx(_exec_env);
-    RETURN_IF_ERROR(_prepare_ctx(request, &ctx));
+    std::shared_ptr<StreamLoadContext> ctx = 
std::make_shared<StreamLoadContext>(_exec_env);
+    RETURN_IF_ERROR(_prepare_ctx(request, ctx));
 
     std::shared_ptr<DataConsumer> consumer;
-    RETURN_IF_ERROR(_data_consumer_pool.get_consumer(&ctx, &consumer));
+    RETURN_IF_ERROR(_data_consumer_pool.get_consumer(ctx, &consumer));
 
     Status st =
             std::static_pointer_cast<KafkaDataConsumer>(consumer)
@@ -168,7 +162,7 @@ Status RoutineLoadTaskExecutor::submit_task(const 
TRoutineLoadTask& task) {
     }
 
     // create the context
-    StreamLoadContext* ctx = new StreamLoadContext(_exec_env);
+    std::shared_ptr<StreamLoadContext> ctx = 
std::make_shared<StreamLoadContext>(_exec_env);
     ctx->load_type = TLoadType::ROUTINE_LOAD;
     ctx->load_src_type = task.type;
     ctx->job_id = task.job_id;
@@ -212,35 +206,28 @@ Status RoutineLoadTaskExecutor::submit_task(const 
TRoutineLoadTask& task) {
         break;
     default:
         LOG(WARNING) << "unknown load source type: " << task.type;
-        delete ctx;
         return Status::InternalError("unknown load source type");
     }
 
     VLOG_CRITICAL << "receive a new routine load task: " << ctx->brief();
     // register the task
-    ctx->ref();
     _task_map[ctx->id] = ctx;
 
     // offer the task to thread pool
-    if 
(!_thread_pool.offer(std::bind<void>(&RoutineLoadTaskExecutor::exec_task, this, 
ctx,
-                                            &_data_consumer_pool, 
[this](StreamLoadContext* ctx) {
-                                                std::unique_lock<std::mutex> 
l(_lock);
-                                                _task_map.erase(ctx->id);
-                                                LOG(INFO) << "finished routine 
load task "
-                                                          << ctx->brief()
-                                                          << ", status: " << 
ctx->status
-                                                          << ", current tasks 
num: "
-                                                          << _task_map.size();
-                                                if (ctx->unref()) {
-                                                    delete ctx;
-                                                }
-                                            }))) {
+    if (!_thread_pool.offer(std::bind<void>(
+                &RoutineLoadTaskExecutor::exec_task, this, ctx, 
&_data_consumer_pool,
+                [this](std::shared_ptr<StreamLoadContext> ctx) {
+                    std::unique_lock<std::mutex> l(_lock);
+                    ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id);
+                    _task_map.erase(ctx->id);
+                    LOG(INFO) << "finished routine load task " << ctx->brief()
+                              << ", status: " << ctx->status
+                              << ", current tasks num: " << _task_map.size();
+                }))) {
         // failed to submit task, clear and return
         LOG(WARNING) << "failed to submit routine load task: " << ctx->brief();
+        ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id);
         _task_map.erase(ctx->id);
-        if (ctx->unref()) {
-            delete ctx;
-        }
         return Status::InternalError("failed to submit routine load task");
     } else {
         LOG(INFO) << "submit a new routine load task: " << ctx->brief()
@@ -249,8 +236,8 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) {
     }
 }
 
-void RoutineLoadTaskExecutor::exec_task(StreamLoadContext* ctx, DataConsumerPool* consumer_pool,
-                                        ExecFinishCallback cb) {
+void RoutineLoadTaskExecutor::exec_task(std::shared_ptr<StreamLoadContext> ctx,
+                                        DataConsumerPool* consumer_pool, ExecFinishCallback cb) {
 #define HANDLE_ERROR(stmt, err_msg)                                        \
     do {                                                                   \
         Status _status_ = (stmt);                                          \
@@ -292,7 +279,7 @@ void RoutineLoadTaskExecutor::exec_task(StreamLoadContext* ctx, DataConsumerPool
     ctx->body_sink = pipe;
 
     // must put pipe before executing plan fragment
-    HANDLE_ERROR(_exec_env->new_load_stream_mgr()->put(ctx->id, pipe), "failed to add pipe");
+    HANDLE_ERROR(_exec_env->new_load_stream_mgr()->put(ctx->id, ctx), "failed to add pipe");
 
 #ifndef BE_TEST
     // execute plan fragment, async
@@ -316,7 +303,7 @@ void RoutineLoadTaskExecutor::exec_task(StreamLoadContext* ctx, DataConsumerPool
     consumer_pool->return_consumers(consumer_grp.get());
 
     // commit txn
-    HANDLE_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx), "commit failed");
+    HANDLE_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx.get()), "commit failed");
 
     // commit kafka offset
     switch (ctx->load_src_type) {
@@ -358,12 +345,12 @@ void RoutineLoadTaskExecutor::exec_task(StreamLoadContext* ctx, DataConsumerPool
     cb(ctx);
 }
 
-void RoutineLoadTaskExecutor::err_handler(StreamLoadContext* ctx, const Status& st,
+void RoutineLoadTaskExecutor::err_handler(std::shared_ptr<StreamLoadContext> ctx, const Status& st,
                                           const std::string& err_msg) {
     LOG(WARNING) << err_msg << ", routine load task: " << ctx->brief(true);
     ctx->status = st;
     if (ctx->need_rollback) {
-        _exec_env->stream_load_executor()->rollback_txn(ctx);
+        _exec_env->stream_load_executor()->rollback_txn(ctx.get());
         ctx->need_rollback = false;
     }
     if (ctx->body_sink != nullptr) {
@@ -372,10 +359,10 @@ void RoutineLoadTaskExecutor::err_handler(StreamLoadContext* ctx, const Status&
 }
 
 // for test only
-Status RoutineLoadTaskExecutor::_execute_plan_for_test(StreamLoadContext* ctx) {
+Status RoutineLoadTaskExecutor::_execute_plan_for_test(std::shared_ptr<StreamLoadContext> ctx) {
     auto mock_consumer = [this, ctx]() {
-        ctx->ref();
-        std::shared_ptr<io::StreamLoadPipe> pipe = _exec_env->new_load_stream_mgr()->get(ctx->id);
+        std::shared_ptr<io::StreamLoadPipe> pipe = std::static_pointer_cast<io::StreamLoadPipe>(
+                _exec_env->new_load_stream_mgr()->get(ctx->id)->body_sink);
         std::stringstream ss;
         while (true) {
             char one;
@@ -403,9 +390,6 @@ Status RoutineLoadTaskExecutor::_execute_plan_for_test(StreamLoadContext* ctx) {
                 ss << one;
             }
         }
-        if (ctx->unref()) {
-            delete ctx;
-        }
     };
 
     std::thread t1(mock_consumer);
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.h b/be/src/runtime/routine_load/routine_load_task_executor.h
index 6c391a6888..8b46e7bd8d 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.h
+++ b/be/src/runtime/routine_load/routine_load_task_executor.h
@@ -19,6 +19,7 @@
 
 #include <functional>
 #include <map>
+#include <memory>
 #include <mutex>
 
 #include "gen_cpp/internal_service.pb.h"
@@ -40,7 +41,7 @@ class TRoutineLoadTask;
 // to FE finally.
 class RoutineLoadTaskExecutor {
 public:
-    typedef std::function<void(StreamLoadContext*)> ExecFinishCallback;
+    using ExecFinishCallback = std::function<void(std::shared_ptr<StreamLoadContext>)>;
 
     RoutineLoadTaskExecutor(ExecEnv* exec_env);
 
@@ -60,14 +61,17 @@ public:
 
 private:
     // execute the task
-    void exec_task(StreamLoadContext* ctx, DataConsumerPool* pool, ExecFinishCallback cb);
+    void exec_task(std::shared_ptr<StreamLoadContext> ctx, DataConsumerPool* pool,
+                   ExecFinishCallback cb);
 
-    void err_handler(StreamLoadContext* ctx, const Status& st, const std::string& err_msg);
+    void err_handler(std::shared_ptr<StreamLoadContext> ctx, const Status& st,
+                     const std::string& err_msg);
 
     // for test only
-    Status _execute_plan_for_test(StreamLoadContext* ctx);
+    Status _execute_plan_for_test(std::shared_ptr<StreamLoadContext> ctx);
     // create a dummy StreamLoadContext for PKafkaMetaProxyRequest
-    Status _prepare_ctx(const PKafkaMetaProxyRequest& request, StreamLoadContext* ctx);
+    Status _prepare_ctx(const PKafkaMetaProxyRequest& request,
+                        std::shared_ptr<StreamLoadContext> ctx);
 
 private:
     ExecEnv* _exec_env;
@@ -76,7 +80,7 @@ private:
 
     std::mutex _lock;
     // task id -> load context
-    std::unordered_map<UniqueId, StreamLoadContext*> _task_map;
+    std::unordered_map<UniqueId, std::shared_ptr<StreamLoadContext>> _task_map;
 };
 
 } // namespace doris
diff --git a/be/src/runtime/stream_load/new_load_stream_mgr.h b/be/src/runtime/stream_load/new_load_stream_mgr.h
index 9ab2030487..bb245232dc 100644
--- a/be/src/runtime/stream_load/new_load_stream_mgr.h
+++ b/be/src/runtime/stream_load/new_load_stream_mgr.h
@@ -21,19 +21,20 @@
 #include <mutex>
 #include <unordered_map>
 
-#include "io/fs/stream_load_pipe.h"
+#include "common/status.h"
 #include "util/doris_metrics.h"
 #include "util/uid_util.h"
 
 namespace doris {
 
+class StreamLoadContext;
 // used to register all streams in process so that other module can get this stream
 class NewLoadStreamMgr {
 public:
     NewLoadStreamMgr();
     ~NewLoadStreamMgr();
 
-    Status put(const UniqueId& id, std::shared_ptr<io::StreamLoadPipe> stream) {
+    Status put(const UniqueId& id, std::shared_ptr<StreamLoadContext> stream) {
         std::lock_guard<std::mutex> l(_lock);
         auto it = _stream_map.find(id);
         if (it != std::end(_stream_map)) {
@@ -44,15 +45,13 @@ public:
         return Status::OK();
     }
 
-    std::shared_ptr<io::StreamLoadPipe> get(const UniqueId& id) {
+    std::shared_ptr<StreamLoadContext> get(const UniqueId& id) {
         std::lock_guard<std::mutex> l(_lock);
         auto it = _stream_map.find(id);
         if (it == std::end(_stream_map)) {
-            return nullptr;
+            return std::shared_ptr<StreamLoadContext>(nullptr);
         }
-        auto stream = it->second;
-        _stream_map.erase(it);
-        return stream;
+        return it->second;
     }
 
     void remove(const UniqueId& id) {
@@ -66,6 +65,6 @@ public:
 
 private:
     std::mutex _lock;
-    std::unordered_map<UniqueId, std::shared_ptr<io::StreamLoadPipe>> _stream_map;
+    std::unordered_map<UniqueId, std::shared_ptr<StreamLoadContext>> _stream_map;
 };
 } // namespace doris
diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h
index ef3602ab8f..8951630afb 100644
--- a/be/src/runtime/stream_load/stream_load_context.h
+++ b/be/src/runtime/stream_load/stream_load_context.h
@@ -27,7 +27,9 @@
 #include "common/utils.h"
 #include "gen_cpp/BackendService_types.h"
 #include "gen_cpp/FrontendService_types.h"
+#include "io/fs/stream_load_pipe.h"
 #include "runtime/exec_env.h"
+#include "runtime/message_body_sink.h"
 #include "runtime/stream_load/new_load_stream_mgr.h"
 #include "runtime/stream_load/stream_load_executor.h"
 #include "service/backend_options.h"
@@ -82,7 +84,7 @@ class MessageBodySink;
 
 class StreamLoadContext {
 public:
-    StreamLoadContext(ExecEnv* exec_env) : id(UniqueId::gen_uid()), _exec_env(exec_env), _refs(0) {
+    StreamLoadContext(ExecEnv* exec_env) : id(UniqueId::gen_uid()), _exec_env(exec_env) {
         start_millis = UnixMillis();
     }
 
@@ -91,8 +93,6 @@ public:
             _exec_env->stream_load_executor()->rollback_txn(this);
             need_rollback = false;
         }
-
-        _exec_env->new_load_stream_mgr()->remove(id);
     }
 
     std::string to_json() const;
@@ -109,10 +109,6 @@ public:
     // also print the load source info if detail is set to true
     std::string brief(bool detail = false) const;
 
-    void ref() { _refs.fetch_add(1); }
-    // If unref() returns true, this object should be delete
-    bool unref() { return _refs.fetch_sub(1) == 1; }
-
 public:
     // load type, eg: ROUTINE LOAD/MANUAL LOAD
     TLoadType::type load_type;
@@ -164,6 +160,7 @@ public:
     TFileCompressType::type compress_type = TFileCompressType::UNKNOWN;
 
     std::shared_ptr<MessageBodySink> body_sink;
+    std::shared_ptr<io::StreamLoadPipe> pipe;
 
     TStreamLoadPutResult put_result;
 
@@ -211,7 +208,6 @@ public:
 
 private:
     ExecEnv* _exec_env;
-    std::atomic<int> _refs;
 };
 
 } // namespace doris
diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp
index 61244bbd9f..e9d6c105f9 100644
--- a/be/src/runtime/stream_load/stream_load_executor.cpp
+++ b/be/src/runtime/stream_load/stream_load_executor.cpp
@@ -39,15 +39,15 @@ TLoadTxnRollbackResult k_stream_load_rollback_result;
 Status k_stream_load_plan_status;
 #endif
 
-Status StreamLoadExecutor::execute_plan_fragment(StreamLoadContext* ctx) {
+Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadContext> ctx) {
 // submit this params
 #ifndef BE_TEST
-    ctx->ref();
     ctx->start_write_data_nanos = MonotonicNanos();
     LOG(INFO) << "begin to execute job. label=" << ctx->label << ", txn_id=" << ctx->txn_id
               << ", query_id=" << print_id(ctx->put_result.params.params.query_id);
     auto st = _exec_env->fragment_mgr()->exec_plan_fragment(
             ctx->put_result.params, [ctx, this](RuntimeState* state, Status* status) {
+                ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id);
                 ctx->commit_infos = std::move(state->tablet_commit_infos());
                 if (status->ok()) {
                     ctx->number_total_rows = state->num_rows_load_total();
@@ -110,19 +110,14 @@ Status StreamLoadExecutor::execute_plan_fragment(StreamLoadContext* ctx) {
                 if (ctx->need_commit_self && ctx->body_sink != nullptr) {
                     if (ctx->body_sink->cancelled() || !status->ok()) {
                         ctx->status = *status;
-                        this->rollback_txn(ctx);
+                        this->rollback_txn(ctx.get());
                     } else {
-                        this->commit_txn(ctx);
+                        this->commit_txn(ctx.get());
                     }
                 }
-
-                if (ctx->unref()) {
-                    delete ctx;
-                }
             });
     if (!st.ok()) {
         // no need to check unref's return value
-        ctx->unref();
         return st;
     }
 #else
diff --git a/be/src/runtime/stream_load/stream_load_executor.h b/be/src/runtime/stream_load/stream_load_executor.h
index 2c464dea75..b90cef0a06 100644
--- a/be/src/runtime/stream_load/stream_load_executor.h
+++ b/be/src/runtime/stream_load/stream_load_executor.h
@@ -44,7 +44,7 @@ public:
 
     void rollback_txn(StreamLoadContext* ctx);
 
-    Status execute_plan_fragment(StreamLoadContext* ctx);
+    Status execute_plan_fragment(std::shared_ptr<StreamLoadContext> ctx);
 
 private:
     // collect the load statistics from context and set them to stat


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to