This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new b19abac5e2e [fix](move-memtable) pass num local sink to backends
(#26897)
b19abac5e2e is described below
commit b19abac5e2ebee26f667b094e77189127c7ebfc6
Author: Kaijie Chen <[email protected]>
AuthorDate: Tue Nov 14 08:28:49 2023 +0800
[fix](move-memtable) pass num local sink to backends (#26897)
---
be/src/olap/delta_writer_v2.cpp | 3 ++
be/src/pipeline/pipeline_fragment_context.cpp | 1 +
.../pipeline_x/pipeline_x_fragment_context.cpp | 1 +
be/src/runtime/plan_fragment_executor.cpp | 1 +
be/src/runtime/runtime_state.h | 5 +++
be/src/vec/sink/delta_writer_v2_pool.cpp | 35 ++++++++++++------
be/src/vec/sink/delta_writer_v2_pool.h | 17 ++++-----
be/src/vec/sink/load_stream_stub.cpp | 23 ++++++++----
be/src/vec/sink/load_stream_stub.h | 9 +++--
be/src/vec/sink/load_stream_stub_pool.cpp | 41 +++++++++++++++-------
be/src/vec/sink/load_stream_stub_pool.h | 26 ++++++++++++--
be/src/vec/sink/vtablet_sink_v2.cpp | 33 ++++++++++-------
be/src/vec/sink/vtablet_sink_v2.h | 13 +++++--
be/test/io/fs/stream_sink_file_writer_test.cpp | 2 +-
be/test/vec/exec/delta_writer_v2_pool_test.cpp | 8 ++---
be/test/vec/exec/load_stream_stub_pool_test.cpp | 12 +++----
.../apache/doris/planner/StreamLoadPlanner.java | 2 ++
.../main/java/org/apache/doris/qe/Coordinator.java | 18 ++++++----
gensrc/thrift/PaloInternalService.thrift | 3 ++
19 files changed, 177 insertions(+), 76 deletions(-)
diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp
index c87cf7510a4..0a4108970a6 100644
--- a/be/src/olap/delta_writer_v2.cpp
+++ b/be/src/olap/delta_writer_v2.cpp
@@ -100,6 +100,9 @@ Status DeltaWriterV2::init() {
return Status::OK();
}
// build tablet schema in request level
+ if (_streams.size() == 0 || _streams[0]->tablet_schema(_req.index_id) ==
nullptr) {
+ return Status::InternalError("failed to find tablet schema for {}",
_req.index_id);
+ }
_build_current_tablet_schema(_req.index_id, _req.table_schema_param,
*_streams[0]->tablet_schema(_req.index_id));
RowsetWriterContext context;
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index 02b991a7aa6..9610122bc02 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -310,6 +310,7 @@ Status PipelineFragmentContext::prepare(const
doris::TPipelineFragmentParams& re
_runtime_state->set_num_per_fragment_instances(request.num_senders);
_runtime_state->set_load_stream_per_node(request.load_stream_per_node);
_runtime_state->set_total_load_streams(request.total_load_streams);
+ _runtime_state->set_num_local_sink(request.num_local_sink);
if (request.fragment.__isset.output_sink) {
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(DataSink::create_data_sink(
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index aa029709717..47805a04772 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -203,6 +203,7 @@ Status PipelineXFragmentContext::prepare(const
doris::TPipelineFragmentParams& r
_runtime_state->set_num_per_fragment_instances(request.num_senders);
_runtime_state->set_load_stream_per_node(request.load_stream_per_node);
_runtime_state->set_total_load_streams(request.total_load_streams);
+ _runtime_state->set_num_local_sink(request.num_local_sink);
// 2. Build pipelines with operators in this fragment.
auto root_pipeline = add_pipeline();
diff --git a/be/src/runtime/plan_fragment_executor.cpp
b/be/src/runtime/plan_fragment_executor.cpp
index 9327f96a5d5..3af870c7f07 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -214,6 +214,7 @@ Status PlanFragmentExecutor::prepare(const
TExecPlanFragmentParams& request) {
_runtime_state->set_num_per_fragment_instances(params.num_senders);
_runtime_state->set_load_stream_per_node(request.load_stream_per_node);
_runtime_state->set_total_load_streams(request.total_load_streams);
+ _runtime_state->set_num_local_sink(request.num_local_sink);
// set up sink, if required
if (request.fragment.__isset.output_sink) {
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index aae2fb3cce4..3b420511fa0 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -319,6 +319,10 @@ public:
int total_load_streams() const { return _total_load_streams; }
+ void set_num_local_sink(int num_local_sink) { _num_local_sink =
num_local_sink; }
+
+ int num_local_sink() const { return _num_local_sink; }
+
bool disable_stream_preaggregations() const {
return _query_options.disable_stream_preaggregations;
}
@@ -553,6 +557,7 @@ private:
int _num_per_fragment_instances = 0;
int _load_stream_per_node = 0;
int _total_load_streams = 0;
+ int _num_local_sink = 0;
// The backend id on which this fragment instance runs
int64_t _backend_id = -1;
diff --git a/be/src/vec/sink/delta_writer_v2_pool.cpp
b/be/src/vec/sink/delta_writer_v2_pool.cpp
index b9057136d97..dc9a2765a55 100644
--- a/be/src/vec/sink/delta_writer_v2_pool.cpp
+++ b/be/src/vec/sink/delta_writer_v2_pool.cpp
@@ -25,7 +25,8 @@ class TExpr;
namespace vectorized {
-DeltaWriterV2Map::DeltaWriterV2Map(UniqueId load_id) : _load_id(load_id),
_use_cnt(1) {}
+DeltaWriterV2Map::DeltaWriterV2Map(UniqueId load_id, int num_use,
DeltaWriterV2Pool* pool)
+ : _load_id(load_id), _use_cnt(num_use), _pool(pool) {}
DeltaWriterV2Map::~DeltaWriterV2Map() = default;
@@ -38,9 +39,15 @@ DeltaWriterV2* DeltaWriterV2Map::get_or_create(
}
Status DeltaWriterV2Map::close(RuntimeProfile* profile) {
- if (--_use_cnt > 0) {
+ int num_use = --_use_cnt;
+ if (num_use > 0) {
+ LOG(INFO) << "not closing DeltaWriterV2Map << " << _load_id << " ,
use_cnt = " << num_use;
return Status::OK();
}
+ LOG(INFO) << "closing DeltaWriterV2Map " << _load_id;
+ if (_pool != nullptr) {
+ _pool->erase(_load_id);
+ }
Status status = Status::OK();
_map.for_each([&status](auto& entry) {
if (status.ok()) {
@@ -59,6 +66,11 @@ Status DeltaWriterV2Map::close(RuntimeProfile* profile) {
}
void DeltaWriterV2Map::cancel(Status status) {
+ int num_use = --_use_cnt;
+ LOG(INFO) << "cancelling DeltaWriterV2Map " << _load_id << ", use_cnt = "
<< num_use;
+ if (num_use == 0 && _pool != nullptr) {
+ _pool->erase(_load_id);
+ }
_map.for_each([&status](auto& entry) {
static_cast<void>(entry.second->cancel_with_status(status));
});
@@ -68,23 +80,24 @@ DeltaWriterV2Pool::DeltaWriterV2Pool() = default;
DeltaWriterV2Pool::~DeltaWriterV2Pool() = default;
-std::shared_ptr<DeltaWriterV2Map> DeltaWriterV2Pool::get_or_create(PUniqueId
load_id) {
+std::shared_ptr<DeltaWriterV2Map> DeltaWriterV2Pool::get_or_create(PUniqueId
load_id,
+ int
num_sink) {
UniqueId id {load_id};
std::lock_guard<std::mutex> lock(_mutex);
- std::shared_ptr<DeltaWriterV2Map> map = _pool[id].lock();
+ std::shared_ptr<DeltaWriterV2Map> map = _pool[id];
if (map) {
- map->grab();
return map;
}
- auto deleter = [this](DeltaWriterV2Map* m) {
- std::lock_guard<std::mutex> lock(_mutex);
- _pool.erase(m->unique_id());
- delete m;
- };
- map = std::shared_ptr<DeltaWriterV2Map>(new DeltaWriterV2Map(id), deleter);
+ map = std::make_shared<DeltaWriterV2Map>(id, num_sink, this);
_pool[id] = map;
return map;
}
+void DeltaWriterV2Pool::erase(UniqueId load_id) {
+ std::lock_guard<std::mutex> lock(_mutex);
+ LOG(INFO) << "erasing DeltaWriterV2Map, load_id = " << load_id;
+ _pool.erase(load_id);
+}
+
} // namespace vectorized
} // namespace doris
diff --git a/be/src/vec/sink/delta_writer_v2_pool.h
b/be/src/vec/sink/delta_writer_v2_pool.h
index 8439062440a..f05b144200f 100644
--- a/be/src/vec/sink/delta_writer_v2_pool.h
+++ b/be/src/vec/sink/delta_writer_v2_pool.h
@@ -58,26 +58,24 @@ class RuntimeProfile;
namespace vectorized {
+class DeltaWriterV2Pool;
+
class DeltaWriterV2Map {
public:
- DeltaWriterV2Map(UniqueId load_id);
+ DeltaWriterV2Map(UniqueId load_id, int num_use = 1, DeltaWriterV2Pool*
pool = nullptr);
~DeltaWriterV2Map();
- void grab() { ++_use_cnt; }
-
// get or create delta writer for the given tablet, memory is managed by
DeltaWriterV2Map
DeltaWriterV2* get_or_create(int64_t tablet_id,
std::function<std::unique_ptr<DeltaWriterV2>()> creator);
// close all delta writers in this DeltaWriterV2Map if there is no other
users
- Status close(RuntimeProfile* profile);
+ Status close(RuntimeProfile* profile = nullptr);
// cancel all delta writers in this DeltaWriterV2Map
void cancel(Status status);
- UniqueId unique_id() const { return _load_id; }
-
size_t size() const { return _map.size(); }
private:
@@ -89,6 +87,7 @@ private:
UniqueId _load_id;
TabletToDeltaWriterV2Map _map;
std::atomic<int> _use_cnt;
+ DeltaWriterV2Pool* _pool;
};
class DeltaWriterV2Pool {
@@ -97,7 +96,9 @@ public:
~DeltaWriterV2Pool();
- std::shared_ptr<DeltaWriterV2Map> get_or_create(PUniqueId load_id);
+ std::shared_ptr<DeltaWriterV2Map> get_or_create(PUniqueId load_id, int
num_sink = 1);
+
+ void erase(UniqueId load_id);
size_t size() {
std::lock_guard<std::mutex> lock(_mutex);
@@ -106,7 +107,7 @@ public:
private:
std::mutex _mutex;
- std::unordered_map<UniqueId, std::weak_ptr<DeltaWriterV2Map>> _pool;
+ std::unordered_map<UniqueId, std::shared_ptr<DeltaWriterV2Map>> _pool;
};
} // namespace vectorized
diff --git a/be/src/vec/sink/load_stream_stub.cpp
b/be/src/vec/sink/load_stream_stub.cpp
index c2f7f246f30..75b814dc23f 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -83,14 +83,16 @@ void
LoadStreamStub::LoadStreamReplyHandler::on_closed(brpc::StreamId id) {
_close_cv.notify_all();
}
-LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id)
- : _load_id(load_id),
+LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id, int num_use)
+ : _use_cnt(num_use),
+ _load_id(load_id),
_src_id(src_id),
_tablet_schema_for_index(std::make_shared<IndexToTabletSchema>()),
_enable_unique_mow_for_index(std::make_shared<IndexToEnableMoW>())
{};
LoadStreamStub::LoadStreamStub(LoadStreamStub& stub)
- : _load_id(stub._load_id),
+ : _use_cnt(stub._use_cnt.load()),
+ _load_id(stub._load_id),
_src_id(stub._src_id),
_tablet_schema_for_index(stub._tablet_schema_for_index),
_enable_unique_mow_for_index(stub._enable_unique_mow_for_index) {};
@@ -107,7 +109,6 @@ Status
LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
const OlapTableSchemaParam& schema,
const std::vector<PTabletID>& tablets_for_schema,
int total_streams,
bool enable_profile) {
- _num_open++;
std::unique_lock<bthread::Mutex> lock(_mutex);
if (_is_init.load()) {
return Status::OK();
@@ -190,15 +191,23 @@ Status LoadStreamStub::add_segment(int64_t partition_id,
int64_t index_id, int64
// CLOSE_LOAD
Status LoadStreamStub::close_load(const std::vector<PTabletID>&
tablets_to_commit) {
- if (--_num_open > 0) {
+ {
+ std::lock_guard<std::mutex> lock(_tablets_to_commit_mutex);
+ _tablets_to_commit.insert(_tablets_to_commit.end(),
tablets_to_commit.begin(),
+ tablets_to_commit.end());
+ }
+ if (--_use_cnt > 0) {
return Status::OK();
}
PStreamHeader header;
*header.mutable_load_id() = _load_id;
header.set_src_id(_src_id);
header.set_opcode(doris::PStreamHeader::CLOSE_LOAD);
- for (const auto& tablet : tablets_to_commit) {
- *header.add_tablets_to_commit() = tablet;
+ {
+ std::lock_guard<std::mutex> lock(_tablets_to_commit_mutex);
+ for (const auto& tablet : _tablets_to_commit) {
+ *header.add_tablets_to_commit() = tablet;
+ }
}
return _encode_and_send(header);
}
diff --git a/be/src/vec/sink/load_stream_stub.h
b/be/src/vec/sink/load_stream_stub.h
index 3650b2aeae2..b17acea4cb3 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -134,7 +134,7 @@ private:
public:
// construct new stub
- LoadStreamStub(PUniqueId load_id, int64_t src_id);
+ LoadStreamStub(PUniqueId load_id, int64_t src_id, int num_use);
// copy constructor, shared_ptr members are shared
LoadStreamStub(LoadStreamStub& stub);
@@ -177,7 +177,7 @@ public:
}
std::shared_ptr<TabletSchema> tablet_schema(int64_t index_id) const {
- return _tablet_schema_for_index->at(index_id);
+ return (*_tablet_schema_for_index)[index_id];
}
bool enable_unique_mow(int64_t index_id) const {
@@ -203,7 +203,10 @@ protected:
std::atomic<bool> _is_init;
bthread::Mutex _mutex;
- std::atomic<int> _num_open;
+ std::atomic<int> _use_cnt;
+
+ std::mutex _tablets_to_commit_mutex;
+ std::vector<PTabletID> _tablets_to_commit;
std::mutex _buffer_mutex;
std::mutex _send_mutex;
diff --git a/be/src/vec/sink/load_stream_stub_pool.cpp
b/be/src/vec/sink/load_stream_stub_pool.cpp
index 5848ca5a51a..ec7e53211fb 100644
--- a/be/src/vec/sink/load_stream_stub_pool.cpp
+++ b/be/src/vec/sink/load_stream_stub_pool.cpp
@@ -24,33 +24,50 @@ class TExpr;
namespace stream_load {
+LoadStreams::LoadStreams(UniqueId load_id, int64_t dst_id, int num_use,
LoadStreamStubPool* pool)
+ : _load_id(load_id), _dst_id(dst_id), _use_cnt(num_use), _pool(pool) {}
+
+void LoadStreams::release() {
+ int num_use = --_use_cnt;
+ if (num_use == 0) {
+ LOG(INFO) << "releasing streams for load_id = " << _load_id << ",
dst_id = " << _dst_id;
+ _pool->erase(_load_id, _dst_id);
+ } else {
+ LOG(INFO) << "no releasing streams for load_id = " << _load_id << ",
dst_id = " << _dst_id
+ << ", use_cnt = " << num_use;
+ }
+}
+
LoadStreamStubPool::LoadStreamStubPool() = default;
LoadStreamStubPool::~LoadStreamStubPool() = default;
-std::shared_ptr<Streams> LoadStreamStubPool::get_or_create(PUniqueId load_id,
int64_t src_id,
- int64_t dst_id, int
num_streams) {
+
+std::shared_ptr<LoadStreams> LoadStreamStubPool::get_or_create(PUniqueId
load_id, int64_t src_id,
+ int64_t dst_id,
int num_streams,
+ int num_sink) {
auto key = std::make_pair(UniqueId(load_id), dst_id);
std::lock_guard<std::mutex> lock(_mutex);
- std::shared_ptr<Streams> streams = _pool[key].lock();
+ std::shared_ptr<LoadStreams> streams = _pool[key];
if (streams) {
return streams;
}
DCHECK(num_streams > 0) << "stream num should be greater than 0";
- auto [it, _] = _template_stubs.emplace(load_id, new LoadStreamStub
{load_id, src_id});
- auto deleter = [this, key](Streams* s) {
- std::lock_guard<std::mutex> lock(_mutex);
- _pool.erase(key);
- _template_stubs.erase(key.first);
- delete s;
- };
- streams = std::shared_ptr<Streams>(new Streams(), deleter);
+ DCHECK(num_sink > 0) << "sink num should be greater than 0";
+ auto [it, _] = _template_stubs.emplace(load_id, new LoadStreamStub
{load_id, src_id, num_sink});
+ streams = std::make_shared<LoadStreams>(load_id, dst_id, num_sink, this);
for (int32_t i = 0; i < num_streams; i++) {
// copy construct, internal tablet schema map will be shared among all
stubs
- streams->emplace_back(new LoadStreamStub {*it->second});
+ streams->streams().emplace_back(new LoadStreamStub {*it->second});
}
_pool[key] = streams;
return streams;
}
+void LoadStreamStubPool::erase(UniqueId load_id, int64_t dst_id) {
+ std::lock_guard<std::mutex> lock(_mutex);
+ _pool.erase(std::make_pair(load_id, dst_id));
+ _template_stubs.erase(load_id);
+}
+
} // namespace stream_load
} // namespace doris
diff --git a/be/src/vec/sink/load_stream_stub_pool.h
b/be/src/vec/sink/load_stream_stub_pool.h
index 73b41fdd61a..2cf55be4915 100644
--- a/be/src/vec/sink/load_stream_stub_pool.h
+++ b/be/src/vec/sink/load_stream_stub_pool.h
@@ -70,16 +70,36 @@ class LoadStreamStub;
namespace stream_load {
+class LoadStreamStubPool;
+
using Streams = std::vector<std::shared_ptr<LoadStreamStub>>;
+class LoadStreams {
+public:
+ LoadStreams(UniqueId load_id, int64_t dst_id, int num_use,
LoadStreamStubPool* pool);
+
+ void release();
+
+ Streams& streams() { return _streams; }
+
+private:
+ Streams _streams;
+ UniqueId _load_id;
+ int64_t _dst_id;
+ std::atomic<int> _use_cnt;
+ LoadStreamStubPool* _pool;
+};
+
class LoadStreamStubPool {
public:
LoadStreamStubPool();
~LoadStreamStubPool();
- std::shared_ptr<Streams> get_or_create(PUniqueId load_id, int64_t src_id,
int64_t dst_id,
- int num_streams);
+ std::shared_ptr<LoadStreams> get_or_create(PUniqueId load_id, int64_t
src_id, int64_t dst_id,
+ int num_streams, int num_sink);
+
+ void erase(UniqueId load_id, int64_t dst_id);
size_t size() {
std::lock_guard<std::mutex> lock(_mutex);
@@ -95,7 +115,7 @@ public:
private:
std::mutex _mutex;
std::unordered_map<UniqueId, std::unique_ptr<LoadStreamStub>>
_template_stubs;
- std::unordered_map<std::pair<UniqueId, int64_t>, std::weak_ptr<Streams>>
_pool;
+ std::unordered_map<std::pair<UniqueId, int64_t>,
std::shared_ptr<LoadStreams>> _pool;
};
} // namespace stream_load
diff --git a/be/src/vec/sink/vtablet_sink_v2.cpp
b/be/src/vec/sink/vtablet_sink_v2.cpp
index 3aa23e10595..4e93d11e2e8 100644
--- a/be/src/vec/sink/vtablet_sink_v2.cpp
+++ b/be/src/vec/sink/vtablet_sink_v2.cpp
@@ -155,10 +155,12 @@ Status VOlapTableSinkV2::prepare(RuntimeState* state) {
_num_senders = state->num_per_fragment_instances();
_stream_per_node = state->load_stream_per_node();
_total_streams = state->total_load_streams();
+ _num_local_sink = state->num_local_sink();
DCHECK(_stream_per_node > 0) << "load stream per node should be greator
than 0";
DCHECK(_total_streams > 0) << "total load streams should be greator than
0";
+ DCHECK(_num_local_sink > 0) << "num local sink should be greator than 0";
LOG(INFO) << "num senders: " << _num_senders << ", stream per node: " <<
_stream_per_node
- << ", total_streams " << _total_streams;
+ << ", total_streams " << _total_streams << ", num_local_sink: "
<< _num_local_sink;
_is_high_priority =
(state->execution_timeout() <=
config::load_task_high_priority_threshold_second);
@@ -197,8 +199,8 @@ Status VOlapTableSinkV2::prepare(RuntimeState* state) {
// Prepare the exprs to run.
RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state,
_row_desc));
if (config::share_delta_writers) {
- _delta_writer_for_tablet =
-
ExecEnv::GetInstance()->delta_writer_v2_pool()->get_or_create(_load_id);
+ _delta_writer_for_tablet =
ExecEnv::GetInstance()->delta_writer_v2_pool()->get_or_create(
+ _load_id, _num_local_sink);
} else {
_delta_writer_for_tablet =
std::make_shared<DeltaWriterV2Map>(_load_id);
}
@@ -226,18 +228,17 @@ Status VOlapTableSinkV2::_open_streams(int64_t src_id) {
if (node_info == nullptr) {
return Status::InternalError("Unknown node {} in tablet location",
dst_id);
}
- std::shared_ptr<Streams> streams;
- streams =
ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create(
- _load_id, src_id, dst_id, _stream_per_node);
+ auto streams =
ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create(
+ _load_id, src_id, dst_id, _stream_per_node, _num_local_sink);
// get tablet schema from each backend only in the 1st stream
- for (auto& stream : *streams | std::ranges::views::take(1)) {
+ for (auto& stream : streams->streams() | std::ranges::views::take(1)) {
const std::vector<PTabletID>& tablets_for_schema =
_indexes_from_node[node_info->id];
RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(),
*node_info, _txn_id, *_schema,
tablets_for_schema,
_total_streams,
_state->enable_profile()));
}
// for the rest streams, open without getting tablet schema
- for (auto& stream : *streams | std::ranges::views::drop(1)) {
+ for (auto& stream : streams->streams() | std::ranges::views::drop(1)) {
RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(),
*node_info, _txn_id, *_schema, {},
_total_streams,
_state->enable_profile()));
@@ -300,7 +301,7 @@ Status VOlapTableSinkV2::_select_streams(int64_t tablet_id,
Streams& streams) {
return Status::InternalError("unknown tablet location, tablet id =
{}", tablet_id);
}
for (auto& node_id : location->node_ids) {
- streams.emplace_back(_streams_for_node[node_id]->at(_stream_index));
+
streams.emplace_back(_streams_for_node[node_id]->streams().at(_stream_index));
}
_stream_index = (_stream_index + 1) % _stream_per_node;
return Status::OK();
@@ -393,6 +394,9 @@ Status VOlapTableSinkV2::_cancel(Status status) {
_delta_writer_for_tablet->cancel(status);
_delta_writer_for_tablet.reset();
}
+ for (const auto& [_, streams] : _streams_for_node) {
+ streams->release();
+ }
return Status::OK();
}
@@ -415,6 +419,11 @@ Status VOlapTableSinkV2::close(RuntimeState* state, Status
exec_status) {
COUNTER_SET(_row_distribution_timer,
(int64_t)_row_distribution_watch.elapsed_time());
COUNTER_SET(_validate_data_timer,
_block_convertor->validate_data_ns());
+ // release streams from the pool first, to prevent memory leak
+ for (const auto& [_, streams] : _streams_for_node) {
+ streams->release();
+ }
+
{
SCOPED_TIMER(_close_writer_timer);
// close all delta writers if this is the last user
@@ -425,14 +434,14 @@ Status VOlapTableSinkV2::close(RuntimeState* state,
Status exec_status) {
{
// send CLOSE_LOAD to all streams, return ERROR if any
for (const auto& [_, streams] : _streams_for_node) {
- RETURN_IF_ERROR(_close_load(*streams));
+ RETURN_IF_ERROR(_close_load(streams->streams()));
}
}
{
SCOPED_TIMER(_close_load_timer);
for (const auto& [_, streams] : _streams_for_node) {
- for (const auto& stream : *streams) {
+ for (const auto& stream : streams->streams()) {
RETURN_IF_ERROR(stream->close_wait());
}
}
@@ -440,7 +449,7 @@ Status VOlapTableSinkV2::close(RuntimeState* state, Status
exec_status) {
std::vector<TTabletCommitInfo> tablet_commit_infos;
for (const auto& [node_id, streams] : _streams_for_node) {
- for (const auto& stream : *streams) {
+ for (const auto& stream : streams->streams()) {
for (auto tablet_id : stream->success_tablets()) {
TTabletCommitInfo commit_info;
commit_info.tabletId = tablet_id;
diff --git a/be/src/vec/sink/vtablet_sink_v2.h
b/be/src/vec/sink/vtablet_sink_v2.h
index 6f369286677..71f85c8156c 100644
--- a/be/src/vec/sink/vtablet_sink_v2.h
+++ b/be/src/vec/sink/vtablet_sink_v2.h
@@ -77,6 +77,10 @@ class TExpr;
class TabletSchema;
class TupleDescriptor;
+namespace stream_load {
+class LoadStreams;
+}
+
namespace vectorized {
class OlapTableBlockConvertor;
@@ -156,8 +160,9 @@ private:
// To support multiple senders, we maintain a channel for each sender.
int _sender_id = -1;
int _num_senders = -1;
- int _stream_per_node = 0;
- int _total_streams = 0;
+ int _stream_per_node = -1;
+ int _total_streams = -1;
+ int _num_local_sink = -1;
bool _is_high_priority = false;
bool _write_file_cache = false;
@@ -204,7 +209,9 @@ private:
std::unordered_map<int64_t, std::vector<PTabletID>> _tablets_for_node;
std::unordered_map<int64_t, std::vector<PTabletID>> _indexes_from_node;
- std::unordered_map<int64_t, std::shared_ptr<Streams>> _streams_for_node;
+ std::unordered_map<int64_t,
std::shared_ptr<::doris::stream_load::LoadStreams>>
+ _streams_for_node;
+
size_t _stream_index = 0;
std::shared_ptr<DeltaWriterV2Map> _delta_writer_for_tablet;
diff --git a/be/test/io/fs/stream_sink_file_writer_test.cpp
b/be/test/io/fs/stream_sink_file_writer_test.cpp
index c6ac4a3f501..c52b59f01e4 100644
--- a/be/test/io/fs/stream_sink_file_writer_test.cpp
+++ b/be/test/io/fs/stream_sink_file_writer_test.cpp
@@ -51,7 +51,7 @@ static std::atomic<int64_t> g_num_request;
class StreamSinkFileWriterTest : public testing::Test {
class MockStreamStub : public LoadStreamStub {
public:
- MockStreamStub(PUniqueId load_id, int64_t src_id) :
LoadStreamStub(load_id, src_id) {};
+ MockStreamStub(PUniqueId load_id, int64_t src_id) :
LoadStreamStub(load_id, src_id, 1) {};
virtual ~MockStreamStub() = default;
diff --git a/be/test/vec/exec/delta_writer_v2_pool_test.cpp
b/be/test/vec/exec/delta_writer_v2_pool_test.cpp
index 30b56b65d11..d44fd17a761 100644
--- a/be/test/vec/exec/delta_writer_v2_pool_test.cpp
+++ b/be/test/vec/exec/delta_writer_v2_pool_test.cpp
@@ -42,9 +42,9 @@ TEST_F(DeltaWriterV2PoolTest, test_pool) {
EXPECT_EQ(2, pool.size());
EXPECT_EQ(map, map3);
EXPECT_NE(map, map2);
- map.reset();
- map2.reset();
- map3.reset();
+ EXPECT_TRUE(map->close().ok());
+ EXPECT_TRUE(map2->close().ok());
+ EXPECT_TRUE(map3->close().ok());
EXPECT_EQ(0, pool.size());
}
@@ -62,7 +62,7 @@ TEST_F(DeltaWriterV2PoolTest, test_map) {
EXPECT_EQ(2, map->size());
EXPECT_EQ(writer, writer3);
EXPECT_NE(writer, writer2);
- map.reset();
+ static_cast<void>(map->close());
EXPECT_EQ(0, pool.size());
}
diff --git a/be/test/vec/exec/load_stream_stub_pool_test.cpp
b/be/test/vec/exec/load_stream_stub_pool_test.cpp
index 929b906aab7..73f09508896 100644
--- a/be/test/vec/exec/load_stream_stub_pool_test.cpp
+++ b/be/test/vec/exec/load_stream_stub_pool_test.cpp
@@ -36,16 +36,16 @@ TEST_F(LoadStreamStubPoolTest, test) {
PUniqueId load_id;
load_id.set_hi(1);
load_id.set_hi(2);
- auto streams1 = pool.get_or_create(load_id, src_id, 101, 5);
- auto streams2 = pool.get_or_create(load_id, src_id, 102, 5);
- auto streams3 = pool.get_or_create(load_id, src_id, 101, 5);
+ auto streams1 = pool.get_or_create(load_id, src_id, 101, 5, 1);
+ auto streams2 = pool.get_or_create(load_id, src_id, 102, 5, 1);
+ auto streams3 = pool.get_or_create(load_id, src_id, 101, 5, 1);
EXPECT_EQ(2, pool.size());
EXPECT_EQ(1, pool.templates_size());
EXPECT_EQ(streams1, streams3);
EXPECT_NE(streams1, streams2);
- streams1.reset();
- streams2.reset();
- streams3.reset();
+ streams1->release();
+ streams2->release();
+ streams3->release();
EXPECT_EQ(0, pool.size());
EXPECT_EQ(0, pool.templates_size());
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index f4131235da3..ce8b08086ab 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -300,6 +300,7 @@ public class StreamLoadPlanner {
params.setParams(execParams);
params.setLoadStreamPerNode(taskInfo.getStreamPerNode());
params.setTotalLoadStreams(taskInfo.getStreamPerNode());
+ params.setNumLocalSink(1);
TQueryOptions queryOptions = new TQueryOptions();
queryOptions.setQueryType(TQueryType.LOAD);
queryOptions.setQueryTimeout(timeout);
@@ -503,6 +504,7 @@ public class StreamLoadPlanner {
pipParams.setNumSenders(1);
pipParams.setLoadStreamPerNode(taskInfo.getStreamPerNode());
pipParams.setTotalLoadStreams(taskInfo.getStreamPerNode());
+ pipParams.setNumLocalSink(1);
TPipelineInstanceParams localParams = new TPipelineInstanceParams();
localParams.setFragmentInstanceId(new TUniqueId(loadId.hi, loadId.lo +
fragmentInstanceIdIndex));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 90fb91304c2..868a9928220 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -683,7 +683,7 @@ public class Coordinator implements CoordInterface {
int backendIdx = 0;
int profileFragmentId = 0;
long memoryLimit = queryOptions.getMemLimit();
- Set<Long> backendsWithOlapTableSink = Sets.newHashSet();
+ Map<Long, Integer> numSinkOnBackend = Maps.newHashMap();
beToExecStates.clear();
// If #fragments >=2, use twoPhaseExecution with
exec_plan_fragments_prepare and exec_plan_fragments_start,
// else use exec_plan_fragments directly.
@@ -753,7 +753,7 @@ public class Coordinator implements CoordInterface {
states.addState(execState);
if (tParam.getFragment().getOutputSink() != null
&& tParam.getFragment().getOutputSink().getType()
== TDataSinkType.OLAP_TABLE_SINK) {
-
backendsWithOlapTableSink.add(execState.backend.getId());
+ numSinkOnBackend.merge(execState.backend.getId(), 1,
Integer::sum);
}
++backendIdx;
}
@@ -765,7 +765,10 @@ public class Coordinator implements CoordInterface {
if (tParam.getFragment().getOutputSink() != null
&& tParam.getFragment().getOutputSink().getType()
== TDataSinkType.OLAP_TABLE_SINK) {
tParam.setLoadStreamPerNode(loadStreamPerNode);
-
tParam.setTotalLoadStreams(backendsWithOlapTableSink.size() *
loadStreamPerNode);
+ tParam.setTotalLoadStreams(numSinkOnBackend.size() *
loadStreamPerNode);
+
tParam.setNumLocalSink(numSinkOnBackend.get(tParam.getBackendId()));
+ LOG.info("num local sink for backend {} is {}",
tParam.getBackendId(),
+ numSinkOnBackend.get(tParam.getBackendId()));
}
}
profileFragmentId += 1;
@@ -844,7 +847,7 @@ public class Coordinator implements CoordInterface {
}
}
- Set<Long> backendsWithOlapTableSink = Sets.newHashSet();
+ int numBackendsWithSink = 0;
// 3. group PipelineExecContext by BE.
// So that we can use one RPC to send all fragment instances
of a BE.
for (Map.Entry<TNetworkAddress, TPipelineFragmentParams> entry
: tParams.entrySet()) {
@@ -881,7 +884,7 @@ public class Coordinator implements CoordInterface {
if (entry.getValue().getFragment().getOutputSink() != null
&&
entry.getValue().getFragment().getOutputSink().getType()
== TDataSinkType.OLAP_TABLE_SINK) {
- backendsWithOlapTableSink.add(backendId);
+ numBackendsWithSink++;
}
++backendIdx;
}
@@ -894,7 +897,10 @@ public class Coordinator implements CoordInterface {
&&
entry.getValue().getFragment().getOutputSink().getType()
== TDataSinkType.OLAP_TABLE_SINK) {
entry.getValue().setLoadStreamPerNode(loadStreamPerNode);
-
entry.getValue().setTotalLoadStreams(backendsWithOlapTableSink.size() *
loadStreamPerNode);
+
entry.getValue().setTotalLoadStreams(numBackendsWithSink * loadStreamPerNode);
+
entry.getValue().setNumLocalSink(entry.getValue().getLocalParams().size());
+ LOG.info("num local sink for backend {} is {}",
entry.getValue().getBackendId(),
+ entry.getValue().getNumLocalSink());
}
}
diff --git a/gensrc/thrift/PaloInternalService.thrift
b/gensrc/thrift/PaloInternalService.thrift
index e55ab89e328..7c55842735c 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -461,6 +461,8 @@ struct TExecPlanFragmentParams {
// total num of load streams the downstream backend will see
27: optional i32 total_load_streams
+
+ 28: optional i32 num_local_sink
}
struct TExecPlanFragmentParamsList {
@@ -678,6 +680,7 @@ struct TPipelineFragmentParams {
30: optional bool group_commit = false;
31: optional i32 load_stream_per_node // num load stream for each sink
backend
32: optional i32 total_load_streams // total num of load streams the
downstream backend will see
+ 33: optional i32 num_local_sink
}
struct TPipelineFragmentParamsList {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]