This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 03a4fe6f39 [enhancement](streamload) make stream load context a shared ptr and save it in the global load mgr (#16996)
03a4fe6f39 is described below
commit 03a4fe6f39d86419b05697db2d45857fc9f5de89
Author: yiguolei <[email protected]>
AuthorDate: Fri Feb 24 11:15:29 2023 +0800
[enhancement](streamload) make stream load context a shared ptr and save it in the global load mgr (#16996)
---
be/src/http/action/stream_load.cpp | 42 ++++++------
be/src/http/action/stream_load.h | 10 +--
be/src/http/action/stream_load_2pc.cpp | 20 +-----
be/src/http/action/stream_load_2pc.h | 1 -
be/src/http/http_handler.h | 4 +-
be/src/http/http_request.h | 7 +-
be/src/io/file_factory.cpp | 6 +-
be/src/runtime/fragment_mgr.cpp | 7 +-
be/src/runtime/routine_load/data_consumer.cpp | 8 +--
be/src/runtime/routine_load/data_consumer.h | 27 ++++----
.../runtime/routine_load/data_consumer_group.cpp | 4 +-
be/src/runtime/routine_load/data_consumer_group.h | 6 +-
be/src/runtime/routine_load/data_consumer_pool.cpp | 5 +-
be/src/runtime/routine_load/data_consumer_pool.h | 5 +-
.../routine_load/routine_load_task_executor.cpp | 78 +++++++++-------------
.../routine_load/routine_load_task_executor.h | 16 +++--
be/src/runtime/stream_load/new_load_stream_mgr.h | 15 ++---
be/src/runtime/stream_load/stream_load_context.h | 12 ++--
.../runtime/stream_load/stream_load_executor.cpp | 13 ++--
be/src/runtime/stream_load/stream_load_executor.h | 2 +-
20 files changed, 130 insertions(+), 158 deletions(-)
diff --git a/be/src/http/action/stream_load.cpp
b/be/src/http/action/stream_load.cpp
index a55c0e22c2..c0fc86b68b 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -144,7 +144,8 @@ StreamLoadAction::~StreamLoadAction() {
}
void StreamLoadAction::handle(HttpRequest* req) {
- StreamLoadContext* ctx = (StreamLoadContext*)req->handler_ctx();
+ std::shared_ptr<StreamLoadContext> ctx =
+ std::static_pointer_cast<StreamLoadContext>(req->handler_ctx());
if (ctx == nullptr) {
return;
}
@@ -161,7 +162,7 @@ void StreamLoadAction::handle(HttpRequest* req) {
if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) {
if (ctx->need_rollback) {
- _exec_env->stream_load_executor()->rollback_txn(ctx);
+ _exec_env->stream_load_executor()->rollback_txn(ctx.get());
ctx->need_rollback = false;
}
if (ctx->body_sink.get() != nullptr) {
@@ -185,7 +186,7 @@ void StreamLoadAction::handle(HttpRequest* req) {
streaming_load_current_processing->increment(-1);
}
-Status StreamLoadAction::_handle(StreamLoadContext* ctx) {
+Status StreamLoadAction::_handle(std::shared_ptr<StreamLoadContext> ctx) {
if (ctx->body_bytes > 0 && ctx->receive_bytes != ctx->body_bytes) {
LOG(WARNING) << "recevie body don't equal with body bytes,
body_bytes=" << ctx->body_bytes
<< ", receive_bytes=" << ctx->receive_bytes << ", id=" <<
ctx->id;
@@ -206,12 +207,12 @@ Status StreamLoadAction::_handle(StreamLoadContext* ctx) {
if (ctx->two_phase_commit) {
int64_t pre_commit_start_time = MonotonicNanos();
-
RETURN_IF_ERROR(_exec_env->stream_load_executor()->pre_commit_txn(ctx));
+
RETURN_IF_ERROR(_exec_env->stream_load_executor()->pre_commit_txn(ctx.get()));
ctx->pre_commit_txn_cost_nanos = MonotonicNanos() -
pre_commit_start_time;
} else {
// If put file success we need commit this load
int64_t commit_and_publish_start_time = MonotonicNanos();
- RETURN_IF_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx));
+
RETURN_IF_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx.get()));
ctx->commit_and_publish_txn_cost_nanos = MonotonicNanos() -
commit_and_publish_start_time;
}
return Status::OK();
@@ -220,8 +221,7 @@ Status StreamLoadAction::_handle(StreamLoadContext* ctx) {
int StreamLoadAction::on_header(HttpRequest* req) {
streaming_load_current_processing->increment(1);
- StreamLoadContext* ctx = new StreamLoadContext(_exec_env);
- ctx->ref();
+ std::shared_ptr<StreamLoadContext> ctx =
std::make_shared<StreamLoadContext>(_exec_env);
req->set_handler_ctx(ctx);
ctx->load_type = TLoadType::MANUL_LOAD;
@@ -243,7 +243,7 @@ int StreamLoadAction::on_header(HttpRequest* req) {
if (!st.ok()) {
ctx->status = std::move(st);
if (ctx->need_rollback) {
- _exec_env->stream_load_executor()->rollback_txn(ctx);
+ _exec_env->stream_load_executor()->rollback_txn(ctx.get());
ctx->need_rollback = false;
}
if (ctx->body_sink.get() != nullptr) {
@@ -265,7 +265,7 @@ int StreamLoadAction::on_header(HttpRequest* req) {
return 0;
}
-Status StreamLoadAction::_on_header(HttpRequest* http_req, StreamLoadContext*
ctx) {
+Status StreamLoadAction::_on_header(HttpRequest* http_req,
std::shared_ptr<StreamLoadContext> ctx) {
// auth information
if (!parse_basic_auth(*http_req, &ctx->auth)) {
LOG(WARNING) << "parse basic authorization failed." << ctx->brief();
@@ -334,7 +334,7 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req,
StreamLoadContext* ct
// begin transaction
int64_t begin_txn_start_time = MonotonicNanos();
- RETURN_IF_ERROR(_exec_env->stream_load_executor()->begin_txn(ctx));
+ RETURN_IF_ERROR(_exec_env->stream_load_executor()->begin_txn(ctx.get()));
ctx->begin_txn_cost_nanos = MonotonicNanos() - begin_txn_start_time;
// process put file
@@ -342,7 +342,8 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req,
StreamLoadContext* ct
}
void StreamLoadAction::on_chunk_data(HttpRequest* req) {
- StreamLoadContext* ctx = (StreamLoadContext*)req->handler_ctx();
+ std::shared_ptr<StreamLoadContext> ctx =
+ std::static_pointer_cast<StreamLoadContext>(req->handler_ctx());
if (ctx == nullptr || !ctx->status.ok()) {
return;
}
@@ -367,8 +368,8 @@ void StreamLoadAction::on_chunk_data(HttpRequest* req) {
ctx->read_data_cost_nanos += (MonotonicNanos() - start_read_data_time);
}
-void StreamLoadAction::free_handler_ctx(void* param) {
- StreamLoadContext* ctx = (StreamLoadContext*)param;
+void StreamLoadAction::free_handler_ctx(std::shared_ptr<void> param) {
+ std::shared_ptr<StreamLoadContext> ctx =
std::static_pointer_cast<StreamLoadContext>(param);
if (ctx == nullptr) {
return;
}
@@ -376,12 +377,12 @@ void StreamLoadAction::free_handler_ctx(void* param) {
if (ctx->body_sink != nullptr) {
ctx->body_sink->cancel("sender is gone");
}
- if (ctx->unref()) {
- delete ctx;
- }
+ // remove stream load context from stream load manager and the resource
will be released
+ ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id);
}
-Status StreamLoadAction::_process_put(HttpRequest* http_req,
StreamLoadContext* ctx) {
+Status StreamLoadAction::_process_put(HttpRequest* http_req,
+ std::shared_ptr<StreamLoadContext> ctx) {
// Now we use stream
ctx->use_streaming = is_format_support_streaming(ctx->format);
@@ -399,10 +400,10 @@ Status StreamLoadAction::_process_put(HttpRequest*
http_req, StreamLoadContext*
auto pipe = std::make_shared<io::StreamLoadPipe>(
io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024
/* min_chunk_size */,
ctx->body_bytes /* total_length */);
- RETURN_IF_ERROR(_exec_env->new_load_stream_mgr()->put(ctx->id, pipe));
-
request.fileType = TFileType::FILE_STREAM;
ctx->body_sink = pipe;
+ ctx->pipe = pipe;
+ RETURN_IF_ERROR(_exec_env->new_load_stream_mgr()->put(ctx->id, ctx));
} else {
RETURN_IF_ERROR(_data_saved_path(http_req, &request.path));
auto file_sink = std::make_shared<MessageBodyFileSink>(request.path);
@@ -621,7 +622,8 @@ Status StreamLoadAction::_data_saved_path(HttpRequest* req,
std::string* file_pa
return Status::OK();
}
-void StreamLoadAction::_save_stream_load_record(StreamLoadContext* ctx, const
std::string& str) {
+void
StreamLoadAction::_save_stream_load_record(std::shared_ptr<StreamLoadContext>
ctx,
+ const std::string& str) {
auto stream_load_recorder =
StorageEngine::instance()->get_stream_load_recorder();
if (stream_load_recorder != nullptr) {
std::string key =
diff --git a/be/src/http/action/stream_load.h b/be/src/http/action/stream_load.h
index c7b7a7d142..2dc3c762ad 100644
--- a/be/src/http/action/stream_load.h
+++ b/be/src/http/action/stream_load.h
@@ -42,14 +42,14 @@ public:
int on_header(HttpRequest* req) override;
void on_chunk_data(HttpRequest* req) override;
- void free_handler_ctx(void* ctx) override;
+ void free_handler_ctx(std::shared_ptr<void> ctx) override;
private:
- Status _on_header(HttpRequest* http_req, StreamLoadContext* ctx);
- Status _handle(StreamLoadContext* ctx);
+ Status _on_header(HttpRequest* http_req,
std::shared_ptr<StreamLoadContext> ctx);
+ Status _handle(std::shared_ptr<StreamLoadContext> ctx);
Status _data_saved_path(HttpRequest* req, std::string* file_path);
- Status _process_put(HttpRequest* http_req, StreamLoadContext* ctx);
- void _save_stream_load_record(StreamLoadContext* ctx, const std::string&
str);
+ Status _process_put(HttpRequest* http_req,
std::shared_ptr<StreamLoadContext> ctx);
+ void _save_stream_load_record(std::shared_ptr<StreamLoadContext> ctx,
const std::string& str);
private:
ExecEnv* _exec_env;
diff --git a/be/src/http/action/stream_load_2pc.cpp
b/be/src/http/action/stream_load_2pc.cpp
index bcc4d9f288..3a5690cb8f 100644
--- a/be/src/http/action/stream_load_2pc.cpp
+++ b/be/src/http/action/stream_load_2pc.cpp
@@ -40,9 +40,7 @@ void StreamLoad2PCAction::handle(HttpRequest* req) {
Status status = Status::OK();
std::string status_result;
- StreamLoadContext* ctx = new StreamLoadContext(_exec_env);
- ctx->ref();
- req->set_handler_ctx(ctx);
+ std::shared_ptr<StreamLoadContext> ctx =
std::make_shared<StreamLoadContext>(_exec_env);
ctx->db = req->param(HTTP_DB_KEY);
std::string req_txn_id = req->header(HTTP_TXN_ID_KEY);
try {
@@ -66,7 +64,7 @@ void StreamLoad2PCAction::handle(HttpRequest* req) {
status = Status::InternalError("no valid Basic authorization");
}
- status = _exec_env->stream_load_executor()->operate_txn_2pc(ctx);
+ status = _exec_env->stream_load_executor()->operate_txn_2pc(ctx.get());
if (!status.ok()) {
status_result = status.to_json();
@@ -93,18 +91,4 @@ std::string StreamLoad2PCAction::get_success_info(const
std::string txn_id,
return s.GetString();
}
-void StreamLoad2PCAction::free_handler_ctx(void* param) {
- StreamLoadContext* ctx = (StreamLoadContext*)param;
- if (ctx == nullptr) {
- return;
- }
- // sender is gone, make receiver know it
- if (ctx->body_sink != nullptr) {
- ctx->body_sink->cancel("sender is gone");
- }
- if (ctx->unref()) {
- delete ctx;
- }
-}
-
} // namespace doris
diff --git a/be/src/http/action/stream_load_2pc.h
b/be/src/http/action/stream_load_2pc.h
index 960850e20e..1a1013a12b 100644
--- a/be/src/http/action/stream_load_2pc.h
+++ b/be/src/http/action/stream_load_2pc.h
@@ -33,7 +33,6 @@ public:
void handle(HttpRequest* req) override;
std::string get_success_info(const std::string txn_id, const std::string
txn_operation);
- void free_handler_ctx(void* param) override;
private:
ExecEnv* _exec_env;
diff --git a/be/src/http/http_handler.h b/be/src/http/http_handler.h
index c2c108f1a0..f8058fb8a6 100644
--- a/be/src/http/http_handler.h
+++ b/be/src/http/http_handler.h
@@ -17,6 +17,8 @@
#pragma once
+#include <memory>
+
namespace doris {
class HttpRequest;
@@ -37,7 +39,7 @@ public:
virtual int on_header(HttpRequest* req) { return 0; }
virtual void on_chunk_data(HttpRequest* req) {}
- virtual void free_handler_ctx(void* handler_ctx) {}
+ virtual void free_handler_ctx(std::shared_ptr<void> handler_ctx) {}
};
} // namespace doris
diff --git a/be/src/http/http_request.h b/be/src/http/http_request.h
index 1503e4303a..a26be3a22c 100644
--- a/be/src/http/http_request.h
+++ b/be/src/http/http_request.h
@@ -21,6 +21,7 @@
#include <boost/algorithm/string.hpp>
#include <map>
+#include <memory>
#include <string>
#include "http/http_common.h"
@@ -74,8 +75,8 @@ public:
struct evhttp_request* get_evhttp_request() const { return _ev_req; }
- void* handler_ctx() const { return _handler_ctx; }
- void set_handler_ctx(void* ctx) {
+ std::shared_ptr<void> handler_ctx() const { return _handler_ctx; }
+ void set_handler_ctx(std::shared_ptr<void> ctx) {
DCHECK(_handler != nullptr);
_handler_ctx = ctx;
}
@@ -94,7 +95,7 @@ private:
struct evhttp_request* _ev_req = nullptr;
HttpHandler* _handler = nullptr;
- void* _handler_ctx = nullptr;
+ std::shared_ptr<void> _handler_ctx;
std::string _request_body;
};
diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp
index 9324a83f27..3cb865be40 100644
--- a/be/src/io/file_factory.cpp
+++ b/be/src/io/file_factory.cpp
@@ -38,6 +38,7 @@
#include "olap/iterators.h"
#include "runtime/exec_env.h"
#include "runtime/stream_load/new_load_stream_mgr.h"
+#include "runtime/stream_load/stream_load_context.h"
namespace doris {
@@ -190,10 +191,11 @@ Status FileFactory::create_file_reader(RuntimeProfile*
/*profile*/,
// file scan node/stream load pipe
Status FileFactory::create_pipe_reader(const TUniqueId& load_id,
io::FileReaderSPtr* file_reader) {
- *file_reader = ExecEnv::GetInstance()->new_load_stream_mgr()->get(load_id);
- if (!(*file_reader)) {
+ auto stream_load_ctx =
ExecEnv::GetInstance()->new_load_stream_mgr()->get(load_id);
+ if (!stream_load_ctx) {
return Status::InternalError("unknown stream load id: {}",
UniqueId(load_id).to_string());
}
+ *file_reader = stream_load_ctx->pipe;
return Status::OK();
}
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index f2a2c9e798..02644f186d 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -516,7 +516,8 @@ void
FragmentMgr::_exec_actual(std::shared_ptr<FragmentExecState> exec_state,
Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params) {
if (params.txn_conf.need_txn) {
- StreamLoadContext* stream_load_ctx = new StreamLoadContext(_exec_env);
+ std::shared_ptr<StreamLoadContext> stream_load_ctx =
+ std::make_shared<StreamLoadContext>(_exec_env);
stream_load_ctx->db = params.txn_conf.db;
stream_load_ctx->db_id = params.txn_conf.db_id;
stream_load_ctx->table = params.txn_conf.tbl;
@@ -536,9 +537,11 @@ Status FragmentMgr::exec_plan_fragment(const
TExecPlanFragmentParams& params) {
io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024
/* min_chunk_size */,
-1 /* total_length */, true /* use_proto */);
stream_load_ctx->body_sink = pipe;
+ stream_load_ctx->pipe = pipe;
stream_load_ctx->max_filter_ratio = params.txn_conf.max_filter_ratio;
-
RETURN_IF_ERROR(_exec_env->new_load_stream_mgr()->put(stream_load_ctx->id,
pipe));
+ RETURN_IF_ERROR(
+ _exec_env->new_load_stream_mgr()->put(stream_load_ctx->id,
stream_load_ctx));
RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(stream_load_ctx));
set_pipe(params.params.fragment_instance_id, pipe,
diff --git a/be/src/runtime/routine_load/data_consumer.cpp
b/be/src/runtime/routine_load/data_consumer.cpp
index f2e3464f47..88c00044e1 100644
--- a/be/src/runtime/routine_load/data_consumer.cpp
+++ b/be/src/runtime/routine_load/data_consumer.cpp
@@ -36,7 +36,7 @@ namespace doris {
static const std::string PROP_GROUP_ID = "group.id";
// init kafka consumer will only set common configs such as
// brokers, groupid
-Status KafkaDataConsumer::init(StreamLoadContext* ctx) {
+Status KafkaDataConsumer::init(std::shared_ptr<StreamLoadContext> ctx) {
std::unique_lock<std::mutex> l(_lock);
if (_init) {
// this consumer has already been initialized.
@@ -139,7 +139,7 @@ Status KafkaDataConsumer::init(StreamLoadContext* ctx) {
Status KafkaDataConsumer::assign_topic_partitions(
const std::map<int32_t, int64_t>& begin_partition_offset, const
std::string& topic,
- StreamLoadContext* ctx) {
+ std::shared_ptr<StreamLoadContext> ctx) {
DCHECK(_k_consumer);
// create TopicPartitions
std::stringstream ss;
@@ -380,7 +380,7 @@ Status KafkaDataConsumer::get_latest_offsets_for_partitions(
return Status::OK();
}
-Status KafkaDataConsumer::cancel(StreamLoadContext* ctx) {
+Status KafkaDataConsumer::cancel(std::shared_ptr<StreamLoadContext> ctx) {
std::unique_lock<std::mutex> l(_lock);
if (!_init) {
return Status::InternalError("consumer is not initialized");
@@ -413,7 +413,7 @@ Status
KafkaDataConsumer::commit(std::vector<RdKafka::TopicPartition*>& offset)
// if the kafka brokers and topic are same,
// we considered this consumer as matched, thus can be reused.
-bool KafkaDataConsumer::match(StreamLoadContext* ctx) {
+bool KafkaDataConsumer::match(std::shared_ptr<StreamLoadContext> ctx) {
if (ctx->load_src_type != TLoadSourceType::KAFKA) {
return false;
}
diff --git a/be/src/runtime/routine_load/data_consumer.h
b/be/src/runtime/routine_load/data_consumer.h
index afd4d9f6f7..4ad3d6821f 100644
--- a/be/src/runtime/routine_load/data_consumer.h
+++ b/be/src/runtime/routine_load/data_consumer.h
@@ -33,7 +33,7 @@ class Status;
class DataConsumer {
public:
- DataConsumer(StreamLoadContext* ctx)
+ DataConsumer()
: _id(UniqueId::gen_uid()),
_grp_id(UniqueId::gen_uid()),
_has_grp(false),
@@ -44,18 +44,18 @@ public:
virtual ~DataConsumer() {}
// init the consumer with the given parameters
- virtual Status init(StreamLoadContext* ctx) = 0;
+ virtual Status init(std::shared_ptr<StreamLoadContext> ctx) = 0;
// start consuming
- virtual Status consume(StreamLoadContext* ctx) = 0;
+ virtual Status consume(std::shared_ptr<StreamLoadContext> ctx) = 0;
// cancel the consuming process.
// if the consumer is not initialized, or the consuming
// process is already finished, call cancel() will
// return ERROR
- virtual Status cancel(StreamLoadContext* ctx) = 0;
+ virtual Status cancel(std::shared_ptr<StreamLoadContext> ctx) = 0;
// reset the data consumer before being reused
virtual Status reset() = 0;
// return true the if the consumer match the need
- virtual bool match(StreamLoadContext* ctx) = 0;
+ virtual bool match(std::shared_ptr<StreamLoadContext> ctx) = 0;
const UniqueId& id() { return _id; }
time_t last_visit_time() { return _last_visit_time; }
@@ -109,10 +109,8 @@ public:
class KafkaDataConsumer : public DataConsumer {
public:
- KafkaDataConsumer(StreamLoadContext* ctx)
- : DataConsumer(ctx),
- _brokers(ctx->kafka_info->brokers),
- _topic(ctx->kafka_info->topic) {}
+ KafkaDataConsumer(std::shared_ptr<StreamLoadContext> ctx)
+ : _brokers(ctx->kafka_info->brokers),
_topic(ctx->kafka_info->topic) {}
virtual ~KafkaDataConsumer() {
VLOG_NOTICE << "deconstruct consumer";
@@ -123,18 +121,19 @@ public:
}
}
- virtual Status init(StreamLoadContext* ctx) override;
+ Status init(std::shared_ptr<StreamLoadContext> ctx) override;
// TODO(cmy): currently do not implement single consumer start method,
using group_consume
- virtual Status consume(StreamLoadContext* ctx) override { return
Status::OK(); }
- virtual Status cancel(StreamLoadContext* ctx) override;
+ Status consume(std::shared_ptr<StreamLoadContext> ctx) override { return
Status::OK(); }
+ Status cancel(std::shared_ptr<StreamLoadContext> ctx) override;
// reassign partition topics
virtual Status reset() override;
- virtual bool match(StreamLoadContext* ctx) override;
+ bool match(std::shared_ptr<StreamLoadContext> ctx) override;
// commit kafka offset
Status commit(std::vector<RdKafka::TopicPartition*>& offset);
Status assign_topic_partitions(const std::map<int32_t, int64_t>&
begin_partition_offset,
- const std::string& topic,
StreamLoadContext* ctx);
+ const std::string& topic,
+ std::shared_ptr<StreamLoadContext> ctx);
// start the consumer and put msgs to queue
Status group_consume(BlockingQueue<RdKafka::Message*>* queue, int64_t
max_running_time_ms);
diff --git a/be/src/runtime/routine_load/data_consumer_group.cpp
b/be/src/runtime/routine_load/data_consumer_group.cpp
index 869d427568..2d22fa93ab 100644
--- a/be/src/runtime/routine_load/data_consumer_group.cpp
+++ b/be/src/runtime/routine_load/data_consumer_group.cpp
@@ -24,7 +24,7 @@
namespace doris {
-Status KafkaDataConsumerGroup::assign_topic_partitions(StreamLoadContext* ctx)
{
+Status
KafkaDataConsumerGroup::assign_topic_partitions(std::shared_ptr<StreamLoadContext>
ctx) {
DCHECK(ctx->kafka_info);
DCHECK(_consumers.size() >= 1);
@@ -63,7 +63,7 @@ KafkaDataConsumerGroup::~KafkaDataConsumerGroup() {
DCHECK(_queue.get_size() == 0);
}
-Status KafkaDataConsumerGroup::start_all(StreamLoadContext* ctx) {
+Status KafkaDataConsumerGroup::start_all(std::shared_ptr<StreamLoadContext>
ctx) {
Status result_st = Status::OK();
// start all consumers
for (auto& consumer : _consumers) {
diff --git a/be/src/runtime/routine_load/data_consumer_group.h
b/be/src/runtime/routine_load/data_consumer_group.h
index b105a3c9db..bd866f0468 100644
--- a/be/src/runtime/routine_load/data_consumer_group.h
+++ b/be/src/runtime/routine_load/data_consumer_group.h
@@ -46,7 +46,7 @@ public:
}
// start all consumers
- virtual Status start_all(StreamLoadContext* ctx) { return Status::OK(); }
+ virtual Status start_all(std::shared_ptr<StreamLoadContext> ctx) { return
Status::OK(); }
protected:
UniqueId _grp_id;
@@ -68,9 +68,9 @@ public:
virtual ~KafkaDataConsumerGroup();
- virtual Status start_all(StreamLoadContext* ctx) override;
+ Status start_all(std::shared_ptr<StreamLoadContext> ctx) override;
// assign topic partitions to all consumers equally
- Status assign_topic_partitions(StreamLoadContext* ctx);
+ Status assign_topic_partitions(std::shared_ptr<StreamLoadContext> ctx);
private:
// start a single consumer
diff --git a/be/src/runtime/routine_load/data_consumer_pool.cpp
b/be/src/runtime/routine_load/data_consumer_pool.cpp
index 31f666d3ab..579edfd201 100644
--- a/be/src/runtime/routine_load/data_consumer_pool.cpp
+++ b/be/src/runtime/routine_load/data_consumer_pool.cpp
@@ -22,7 +22,8 @@
namespace doris {
-Status DataConsumerPool::get_consumer(StreamLoadContext* ctx,
std::shared_ptr<DataConsumer>* ret) {
+Status DataConsumerPool::get_consumer(std::shared_ptr<StreamLoadContext> ctx,
+ std::shared_ptr<DataConsumer>* ret) {
std::unique_lock<std::mutex> l(_lock);
// check if there is an available consumer.
@@ -58,7 +59,7 @@ Status DataConsumerPool::get_consumer(StreamLoadContext* ctx,
std::shared_ptr<Da
return Status::OK();
}
-Status DataConsumerPool::get_consumer_grp(StreamLoadContext* ctx,
+Status DataConsumerPool::get_consumer_grp(std::shared_ptr<StreamLoadContext>
ctx,
std::shared_ptr<DataConsumerGroup>*
ret) {
if (ctx->load_src_type != TLoadSourceType::KAFKA) {
return Status::InternalError(
diff --git a/be/src/runtime/routine_load/data_consumer_pool.h
b/be/src/runtime/routine_load/data_consumer_pool.h
index 5ea39d0a6c..6f8cf79f74 100644
--- a/be/src/runtime/routine_load/data_consumer_pool.h
+++ b/be/src/runtime/routine_load/data_consumer_pool.h
@@ -49,10 +49,11 @@ public:
// get a already initialized consumer from cache,
// if not found in cache, create a new one.
- Status get_consumer(StreamLoadContext* ctx, std::shared_ptr<DataConsumer>*
ret);
+ Status get_consumer(std::shared_ptr<StreamLoadContext> ctx,
std::shared_ptr<DataConsumer>* ret);
// get several consumers and put them into group
- Status get_consumer_grp(StreamLoadContext* ctx,
std::shared_ptr<DataConsumerGroup>* ret);
+ Status get_consumer_grp(std::shared_ptr<StreamLoadContext> ctx,
+ std::shared_ptr<DataConsumerGroup>* ret);
// return the consumer to the pool
void return_consumer(std::shared_ptr<DataConsumer> consumer);
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp
b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index da8842ac59..86f56ee756 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -55,19 +55,13 @@ RoutineLoadTaskExecutor::~RoutineLoadTaskExecutor() {
_thread_pool.join();
LOG(INFO) << _task_map.size() << " not executed tasks left, cleanup";
- for (auto it = _task_map.begin(); it != _task_map.end(); ++it) {
- auto ctx = it->second;
- if (ctx->unref()) {
- delete ctx;
- }
- }
_task_map.clear();
}
// Create a temp StreamLoadContext and set some kafka connection info in it.
// So that we can use this ctx to get kafka data consumer instance.
Status RoutineLoadTaskExecutor::_prepare_ctx(const PKafkaMetaProxyRequest&
request,
- StreamLoadContext* ctx) {
+
std::shared_ptr<StreamLoadContext> ctx) {
ctx->load_type = TLoadType::ROUTINE_LOAD;
ctx->load_src_type = TLoadSourceType::KAFKA;
ctx->label = "NaN";
@@ -93,11 +87,11 @@ Status
RoutineLoadTaskExecutor::get_kafka_partition_meta(const PKafkaMetaProxyRe
CHECK(request.has_kafka_info());
// This context is meaningless, just for unifing the interface
- StreamLoadContext ctx(_exec_env);
- RETURN_IF_ERROR(_prepare_ctx(request, &ctx));
+ std::shared_ptr<StreamLoadContext> ctx =
std::make_shared<StreamLoadContext>(_exec_env);
+ RETURN_IF_ERROR(_prepare_ctx(request, ctx));
std::shared_ptr<DataConsumer> consumer;
- RETURN_IF_ERROR(_data_consumer_pool.get_consumer(&ctx, &consumer));
+ RETURN_IF_ERROR(_data_consumer_pool.get_consumer(ctx, &consumer));
Status st =
std::static_pointer_cast<KafkaDataConsumer>(consumer)->get_partition_meta(
partition_ids);
@@ -112,11 +106,11 @@ Status
RoutineLoadTaskExecutor::get_kafka_partition_offsets_for_times(
CHECK(request.has_kafka_info());
// This context is meaningless, just for unifing the interface
- StreamLoadContext ctx(_exec_env);
- RETURN_IF_ERROR(_prepare_ctx(request, &ctx));
+ std::shared_ptr<StreamLoadContext> ctx =
std::make_shared<StreamLoadContext>(_exec_env);
+ RETURN_IF_ERROR(_prepare_ctx(request, ctx));
std::shared_ptr<DataConsumer> consumer;
- RETURN_IF_ERROR(_data_consumer_pool.get_consumer(&ctx, &consumer));
+ RETURN_IF_ERROR(_data_consumer_pool.get_consumer(ctx, &consumer));
Status st =
std::static_pointer_cast<KafkaDataConsumer>(consumer)->get_offsets_for_times(
std::vector<PIntegerPair>(request.offset_times().begin(),
request.offset_times().end()),
@@ -132,11 +126,11 @@ Status
RoutineLoadTaskExecutor::get_kafka_latest_offsets_for_partitions(
CHECK(request.has_kafka_info());
// This context is meaningless, just for unifing the interface
- StreamLoadContext ctx(_exec_env);
- RETURN_IF_ERROR(_prepare_ctx(request, &ctx));
+ std::shared_ptr<StreamLoadContext> ctx =
std::make_shared<StreamLoadContext>(_exec_env);
+ RETURN_IF_ERROR(_prepare_ctx(request, ctx));
std::shared_ptr<DataConsumer> consumer;
- RETURN_IF_ERROR(_data_consumer_pool.get_consumer(&ctx, &consumer));
+ RETURN_IF_ERROR(_data_consumer_pool.get_consumer(ctx, &consumer));
Status st =
std::static_pointer_cast<KafkaDataConsumer>(consumer)
@@ -168,7 +162,7 @@ Status RoutineLoadTaskExecutor::submit_task(const
TRoutineLoadTask& task) {
}
// create the context
- StreamLoadContext* ctx = new StreamLoadContext(_exec_env);
+ std::shared_ptr<StreamLoadContext> ctx =
std::make_shared<StreamLoadContext>(_exec_env);
ctx->load_type = TLoadType::ROUTINE_LOAD;
ctx->load_src_type = task.type;
ctx->job_id = task.job_id;
@@ -212,35 +206,28 @@ Status RoutineLoadTaskExecutor::submit_task(const
TRoutineLoadTask& task) {
break;
default:
LOG(WARNING) << "unknown load source type: " << task.type;
- delete ctx;
return Status::InternalError("unknown load source type");
}
VLOG_CRITICAL << "receive a new routine load task: " << ctx->brief();
// register the task
- ctx->ref();
_task_map[ctx->id] = ctx;
// offer the task to thread pool
- if
(!_thread_pool.offer(std::bind<void>(&RoutineLoadTaskExecutor::exec_task, this,
ctx,
- &_data_consumer_pool,
[this](StreamLoadContext* ctx) {
- std::unique_lock<std::mutex>
l(_lock);
- _task_map.erase(ctx->id);
- LOG(INFO) << "finished routine
load task "
- << ctx->brief()
- << ", status: " <<
ctx->status
- << ", current tasks
num: "
- << _task_map.size();
- if (ctx->unref()) {
- delete ctx;
- }
- }))) {
+ if (!_thread_pool.offer(std::bind<void>(
+ &RoutineLoadTaskExecutor::exec_task, this, ctx,
&_data_consumer_pool,
+ [this](std::shared_ptr<StreamLoadContext> ctx) {
+ std::unique_lock<std::mutex> l(_lock);
+ ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id);
+ _task_map.erase(ctx->id);
+ LOG(INFO) << "finished routine load task " << ctx->brief()
+ << ", status: " << ctx->status
+ << ", current tasks num: " << _task_map.size();
+ }))) {
// failed to submit task, clear and return
LOG(WARNING) << "failed to submit routine load task: " << ctx->brief();
+ ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id);
_task_map.erase(ctx->id);
- if (ctx->unref()) {
- delete ctx;
- }
return Status::InternalError("failed to submit routine load task");
} else {
LOG(INFO) << "submit a new routine load task: " << ctx->brief()
@@ -249,8 +236,8 @@ Status RoutineLoadTaskExecutor::submit_task(const
TRoutineLoadTask& task) {
}
}
-void RoutineLoadTaskExecutor::exec_task(StreamLoadContext* ctx,
DataConsumerPool* consumer_pool,
- ExecFinishCallback cb) {
+void RoutineLoadTaskExecutor::exec_task(std::shared_ptr<StreamLoadContext> ctx,
+ DataConsumerPool* consumer_pool,
ExecFinishCallback cb) {
#define HANDLE_ERROR(stmt, err_msg) \
do { \
Status _status_ = (stmt); \
@@ -292,7 +279,7 @@ void RoutineLoadTaskExecutor::exec_task(StreamLoadContext*
ctx, DataConsumerPool
ctx->body_sink = pipe;
// must put pipe before executing plan fragment
- HANDLE_ERROR(_exec_env->new_load_stream_mgr()->put(ctx->id, pipe), "failed
to add pipe");
+ HANDLE_ERROR(_exec_env->new_load_stream_mgr()->put(ctx->id, ctx), "failed
to add pipe");
#ifndef BE_TEST
// execute plan fragment, async
@@ -316,7 +303,7 @@ void RoutineLoadTaskExecutor::exec_task(StreamLoadContext*
ctx, DataConsumerPool
consumer_pool->return_consumers(consumer_grp.get());
// commit txn
- HANDLE_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx), "commit
failed");
+ HANDLE_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx.get()),
"commit failed");
// commit kafka offset
switch (ctx->load_src_type) {
@@ -358,12 +345,12 @@ void
RoutineLoadTaskExecutor::exec_task(StreamLoadContext* ctx, DataConsumerPool
cb(ctx);
}
-void RoutineLoadTaskExecutor::err_handler(StreamLoadContext* ctx, const
Status& st,
+void RoutineLoadTaskExecutor::err_handler(std::shared_ptr<StreamLoadContext>
ctx, const Status& st,
const std::string& err_msg) {
LOG(WARNING) << err_msg << ", routine load task: " << ctx->brief(true);
ctx->status = st;
if (ctx->need_rollback) {
- _exec_env->stream_load_executor()->rollback_txn(ctx);
+ _exec_env->stream_load_executor()->rollback_txn(ctx.get());
ctx->need_rollback = false;
}
if (ctx->body_sink != nullptr) {
@@ -372,10 +359,10 @@ void
RoutineLoadTaskExecutor::err_handler(StreamLoadContext* ctx, const Status&
}
// for test only
-Status RoutineLoadTaskExecutor::_execute_plan_for_test(StreamLoadContext* ctx)
{
+Status
RoutineLoadTaskExecutor::_execute_plan_for_test(std::shared_ptr<StreamLoadContext>
ctx) {
auto mock_consumer = [this, ctx]() {
- ctx->ref();
- std::shared_ptr<io::StreamLoadPipe> pipe =
_exec_env->new_load_stream_mgr()->get(ctx->id);
+ std::shared_ptr<io::StreamLoadPipe> pipe =
std::static_pointer_cast<io::StreamLoadPipe>(
+ _exec_env->new_load_stream_mgr()->get(ctx->id)->body_sink);
std::stringstream ss;
while (true) {
char one;
@@ -403,9 +390,6 @@ Status
RoutineLoadTaskExecutor::_execute_plan_for_test(StreamLoadContext* ctx) {
ss << one;
}
}
- if (ctx->unref()) {
- delete ctx;
- }
};
std::thread t1(mock_consumer);
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.h
b/be/src/runtime/routine_load/routine_load_task_executor.h
index 6c391a6888..8b46e7bd8d 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.h
+++ b/be/src/runtime/routine_load/routine_load_task_executor.h
@@ -19,6 +19,7 @@
#include <functional>
#include <map>
+#include <memory>
#include <mutex>
#include "gen_cpp/internal_service.pb.h"
@@ -40,7 +41,7 @@ class TRoutineLoadTask;
// to FE finally.
class RoutineLoadTaskExecutor {
public:
- typedef std::function<void(StreamLoadContext*)> ExecFinishCallback;
+ using ExecFinishCallback =
std::function<void(std::shared_ptr<StreamLoadContext>)>;
RoutineLoadTaskExecutor(ExecEnv* exec_env);
@@ -60,14 +61,17 @@ public:
private:
// execute the task
- void exec_task(StreamLoadContext* ctx, DataConsumerPool* pool,
ExecFinishCallback cb);
+ void exec_task(std::shared_ptr<StreamLoadContext> ctx, DataConsumerPool*
pool,
+ ExecFinishCallback cb);
- void err_handler(StreamLoadContext* ctx, const Status& st, const
std::string& err_msg);
+ void err_handler(std::shared_ptr<StreamLoadContext> ctx, const Status& st,
+ const std::string& err_msg);
// for test only
- Status _execute_plan_for_test(StreamLoadContext* ctx);
+ Status _execute_plan_for_test(std::shared_ptr<StreamLoadContext> ctx);
// create a dummy StreamLoadContext for PKafkaMetaProxyRequest
- Status _prepare_ctx(const PKafkaMetaProxyRequest& request,
StreamLoadContext* ctx);
+ Status _prepare_ctx(const PKafkaMetaProxyRequest& request,
+ std::shared_ptr<StreamLoadContext> ctx);
private:
ExecEnv* _exec_env;
@@ -76,7 +80,7 @@ private:
std::mutex _lock;
// task id -> load context
- std::unordered_map<UniqueId, StreamLoadContext*> _task_map;
+ std::unordered_map<UniqueId, std::shared_ptr<StreamLoadContext>> _task_map;
};
} // namespace doris
diff --git a/be/src/runtime/stream_load/new_load_stream_mgr.h
b/be/src/runtime/stream_load/new_load_stream_mgr.h
index 9ab2030487..bb245232dc 100644
--- a/be/src/runtime/stream_load/new_load_stream_mgr.h
+++ b/be/src/runtime/stream_load/new_load_stream_mgr.h
@@ -21,19 +21,20 @@
#include <mutex>
#include <unordered_map>
-#include "io/fs/stream_load_pipe.h"
+#include "common/status.h"
#include "util/doris_metrics.h"
#include "util/uid_util.h"
namespace doris {
+class StreamLoadContext;
// used to register all streams in process so that other module can get this
stream
class NewLoadStreamMgr {
public:
NewLoadStreamMgr();
~NewLoadStreamMgr();
- Status put(const UniqueId& id, std::shared_ptr<io::StreamLoadPipe> stream)
{
+ Status put(const UniqueId& id, std::shared_ptr<StreamLoadContext> stream) {
std::lock_guard<std::mutex> l(_lock);
auto it = _stream_map.find(id);
if (it != std::end(_stream_map)) {
@@ -44,15 +45,13 @@ public:
return Status::OK();
}
- std::shared_ptr<io::StreamLoadPipe> get(const UniqueId& id) {
+ std::shared_ptr<StreamLoadContext> get(const UniqueId& id) {
std::lock_guard<std::mutex> l(_lock);
auto it = _stream_map.find(id);
if (it == std::end(_stream_map)) {
- return nullptr;
+ return std::shared_ptr<StreamLoadContext>(nullptr);
}
- auto stream = it->second;
- _stream_map.erase(it);
- return stream;
+ return it->second;
}
void remove(const UniqueId& id) {
@@ -66,6 +65,6 @@ public:
private:
std::mutex _lock;
- std::unordered_map<UniqueId, std::shared_ptr<io::StreamLoadPipe>>
_stream_map;
+ std::unordered_map<UniqueId, std::shared_ptr<StreamLoadContext>>
_stream_map;
};
} // namespace doris
diff --git a/be/src/runtime/stream_load/stream_load_context.h
b/be/src/runtime/stream_load/stream_load_context.h
index ef3602ab8f..8951630afb 100644
--- a/be/src/runtime/stream_load/stream_load_context.h
+++ b/be/src/runtime/stream_load/stream_load_context.h
@@ -27,7 +27,9 @@
#include "common/utils.h"
#include "gen_cpp/BackendService_types.h"
#include "gen_cpp/FrontendService_types.h"
+#include "io/fs/stream_load_pipe.h"
#include "runtime/exec_env.h"
+#include "runtime/message_body_sink.h"
#include "runtime/stream_load/new_load_stream_mgr.h"
#include "runtime/stream_load/stream_load_executor.h"
#include "service/backend_options.h"
@@ -82,7 +84,7 @@ class MessageBodySink;
class StreamLoadContext {
public:
- StreamLoadContext(ExecEnv* exec_env) : id(UniqueId::gen_uid()),
_exec_env(exec_env), _refs(0) {
+ StreamLoadContext(ExecEnv* exec_env) : id(UniqueId::gen_uid()),
_exec_env(exec_env) {
start_millis = UnixMillis();
}
@@ -91,8 +93,6 @@ public:
_exec_env->stream_load_executor()->rollback_txn(this);
need_rollback = false;
}
-
- _exec_env->new_load_stream_mgr()->remove(id);
}
std::string to_json() const;
@@ -109,10 +109,6 @@ public:
// also print the load source info if detail is set to true
std::string brief(bool detail = false) const;
- void ref() { _refs.fetch_add(1); }
- // If unref() returns true, this object should be delete
- bool unref() { return _refs.fetch_sub(1) == 1; }
-
public:
// load type, eg: ROUTINE LOAD/MANUAL LOAD
TLoadType::type load_type;
@@ -164,6 +160,7 @@ public:
TFileCompressType::type compress_type = TFileCompressType::UNKNOWN;
std::shared_ptr<MessageBodySink> body_sink;
+ std::shared_ptr<io::StreamLoadPipe> pipe;
TStreamLoadPutResult put_result;
@@ -211,7 +208,6 @@ public:
private:
ExecEnv* _exec_env;
- std::atomic<int> _refs;
};
} // namespace doris
diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp
b/be/src/runtime/stream_load/stream_load_executor.cpp
index 61244bbd9f..e9d6c105f9 100644
--- a/be/src/runtime/stream_load/stream_load_executor.cpp
+++ b/be/src/runtime/stream_load/stream_load_executor.cpp
@@ -39,15 +39,15 @@ TLoadTxnRollbackResult k_stream_load_rollback_result;
Status k_stream_load_plan_status;
#endif
-Status StreamLoadExecutor::execute_plan_fragment(StreamLoadContext* ctx) {
+Status
StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadContext>
ctx) {
// submit this params
#ifndef BE_TEST
- ctx->ref();
ctx->start_write_data_nanos = MonotonicNanos();
LOG(INFO) << "begin to execute job. label=" << ctx->label << ", txn_id="
<< ctx->txn_id
<< ", query_id=" <<
print_id(ctx->put_result.params.params.query_id);
auto st = _exec_env->fragment_mgr()->exec_plan_fragment(
ctx->put_result.params, [ctx, this](RuntimeState* state, Status*
status) {
+ ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id);
ctx->commit_infos = std::move(state->tablet_commit_infos());
if (status->ok()) {
ctx->number_total_rows = state->num_rows_load_total();
@@ -110,19 +110,14 @@ Status
StreamLoadExecutor::execute_plan_fragment(StreamLoadContext* ctx) {
if (ctx->need_commit_self && ctx->body_sink != nullptr) {
if (ctx->body_sink->cancelled() || !status->ok()) {
ctx->status = *status;
- this->rollback_txn(ctx);
+ this->rollback_txn(ctx.get());
} else {
- this->commit_txn(ctx);
+ this->commit_txn(ctx.get());
}
}
-
- if (ctx->unref()) {
- delete ctx;
- }
});
if (!st.ok()) {
// no need to check unref's return value
- ctx->unref();
return st;
}
#else
diff --git a/be/src/runtime/stream_load/stream_load_executor.h
b/be/src/runtime/stream_load/stream_load_executor.h
index 2c464dea75..b90cef0a06 100644
--- a/be/src/runtime/stream_load/stream_load_executor.h
+++ b/be/src/runtime/stream_load/stream_load_executor.h
@@ -44,7 +44,7 @@ public:
void rollback_txn(StreamLoadContext* ctx);
- Status execute_plan_fragment(StreamLoadContext* ctx);
+ Status execute_plan_fragment(std::shared_ptr<StreamLoadContext> ctx);
private:
// collect the load statistics from context and set them to stat
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]