This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 563c3f75ffd [feature](move-memtable) share delta writer v2 among sinks
(#24066)
563c3f75ffd is described below
commit 563c3f75ffd498c08a41cc9aaaef78755bae80d7
Author: Kaijie Chen <[email protected]>
AuthorDate: Wed Sep 13 14:39:29 2023 +0800
[feature](move-memtable) share delta writer v2 among sinks (#24066)
---
be/src/common/config.cpp | 6 +-
be/src/common/config.h | 6 +-
be/src/runtime/exec_env.cpp | 4 +
be/src/runtime/exec_env.h | 13 ++-
be/src/runtime/exec_env_init.cpp | 6 +
be/src/vec/sink/delta_writer_v2_pool.cpp | 87 ++++++++++++++
be/src/vec/sink/delta_writer_v2_pool.h | 111 ++++++++++++++++++
be/src/vec/sink/load_stream_stub.cpp | 4 +
be/src/vec/sink/load_stream_stub.h | 13 ++-
be/src/vec/sink/load_stream_stub_pool.cpp | 56 +++++++++
be/src/vec/sink/load_stream_stub_pool.h | 101 ++++++++++++++++
be/src/vec/sink/vtablet_sink_v2.cpp | 149 ++++++++++++------------
be/src/vec/sink/vtablet_sink_v2.h | 12 +-
be/test/vec/exec/delta_writer_v2_pool_test.cpp | 87 ++++++++++++++
be/test/vec/exec/load_stream_stub_pool_test.cpp | 54 +++++++++
15 files changed, 619 insertions(+), 90 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index eecc4a11185..7b10bc72123 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -730,7 +730,11 @@ DEFINE_mInt32(mem_tracker_consume_min_size_bytes,
"1048576");
// In most cases, it does not need to be modified.
DEFINE_mDouble(tablet_version_graph_orphan_vertex_ratio, "0.1");
-// number of brpc stream per OlapTableSinkV2
+// share brpc streams among the sinks of the same load when memtable_on_sink_node = true
+DEFINE_Bool(share_load_streams, "true");
+// share delta writers among the sinks of the same load when memtable_on_sink_node = true
+DEFINE_Bool(share_delta_writers, "true");
+// number of brpc streams per OlapTableSinkV2
+// (per load and destination backend if share_load_streams = true)
DEFINE_Int32(num_streams_per_sink, "5");
// timeout for open stream sink rpc in ms
DEFINE_Int64(open_stream_sink_timeout_ms, "500");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 83b7dbd95fc..eefcbb9b16a 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -789,7 +789,11 @@ DECLARE_mInt32(mem_tracker_consume_min_size_bytes);
// In most cases, it does not need to be modified.
DECLARE_mDouble(tablet_version_graph_orphan_vertex_ratio);
-// number of brpc stream per OlapTableSinkV2
+// share brpc streams among the sinks of the same load when memtable_on_sink_node = true
+DECLARE_Bool(share_load_streams);
+// share delta writers among the sinks of the same load when memtable_on_sink_node = true
+DECLARE_Bool(share_delta_writers);
+// number of brpc streams per OlapTableSinkV2
+// (per load and destination backend if share_load_streams = true)
DECLARE_Int32(num_streams_per_sink);
// timeout for open stream sink rpc in ms
DECLARE_Int64(open_stream_sink_timeout_ms);
diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/exec_env.cpp
index 27986f5de31..aab83fba5f0 100644
--- a/be/src/runtime/exec_env.cpp
+++ b/be/src/runtime/exec_env.cpp
@@ -29,9 +29,13 @@
#include "time.h"
#include "util/debug_util.h"
#include "util/time.h"
+#include "vec/sink/delta_writer_v2_pool.h"
+#include "vec/sink/load_stream_stub_pool.h"
namespace doris {
+// Defined out of line instead of `= default` in exec_env.h: the header only
+// forward-declares LoadStreamStubPool / DeltaWriterV2Pool, while the std::unique_ptr
+// members holding them need the complete types included above.
+ExecEnv::ExecEnv() = default;
+
ExecEnv::~ExecEnv() {
destroy();
}
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index fd54d441666..d52e7cd3612 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -50,6 +50,10 @@ class TaskScheduler;
namespace taskgroup {
class TaskGroupManager;
}
+namespace stream_load {
+class DeltaWriterV2Pool;
+class LoadStreamStubPool;
+} // namespace stream_load
namespace io {
class S3FileBufferPool;
class FileCacheFactory;
@@ -234,6 +238,11 @@ public:
vectorized::ZoneList& global_zone_cache() { return *_global_zone_cache; }
std::shared_mutex& zone_cache_rw_lock() { return _zone_cache_rw_lock; }
+ // Process-wide pools used to share brpc streams / delta writers among the sinks
+ // of one load. Both are created in _init() and reset in destroy(), so these
+ // accessors return nullptr outside that window.
+ stream_load::LoadStreamStubPool* load_stream_stub_pool() {
+ return _load_stream_stub_pool.get();
+ }
+ stream_load::DeltaWriterV2Pool* delta_writer_v2_pool() { return
_delta_writer_v2_pool.get(); }
+
void wait_for_all_tasks_done();
void update_frontends(const std::vector<TFrontendInfo>& new_infos);
@@ -257,7 +266,7 @@ public:
}
private:
- ExecEnv() = default;
+ // defined out of line because the pool types held below are only forward-declared here
+ ExecEnv();
[[nodiscard]] Status _init(const std::vector<StorePath>& store_paths);
void _destroy();
@@ -334,6 +343,8 @@ private:
// To save meta info of external file, such as parquet footer.
FileMetaCache* _file_meta_cache = nullptr;
std::unique_ptr<MemTableMemoryLimiter> _memtable_memory_limiter;
+ // owned here; exposed through load_stream_stub_pool() / delta_writer_v2_pool()
+ std::unique_ptr<stream_load::LoadStreamStubPool> _load_stream_stub_pool;
+ std::unique_ptr<stream_load::DeltaWriterV2Pool> _delta_writer_v2_pool;
std::unique_ptr<vectorized::ZoneList> _global_zone_cache;
std::shared_mutex _zone_cache_rw_lock;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index bcdbf498013..24234a90d95 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -92,6 +92,8 @@
#include "util/timezone_utils.h"
#include "vec/exec/scan/scanner_scheduler.h"
#include "vec/runtime/vdata_stream_mgr.h"
+#include "vec/sink/delta_writer_v2_pool.h"
+#include "vec/sink/load_stream_stub_pool.h"
#if !defined(__SANITIZE_ADDRESS__) && !defined(ADDRESS_SANITIZER) &&
!defined(LEAK_SANITIZER) && \
!defined(THREAD_SANITIZER) && !defined(USE_JEMALLOC)
@@ -207,6 +209,8 @@ Status ExecEnv::_init(const std::vector<StorePath>&
store_paths) {
_group_commit_mgr = new GroupCommitMgr(this);
_file_meta_cache = new
FileMetaCache(config::max_external_file_meta_cache_num);
_memtable_memory_limiter = std::make_unique<MemTableMemoryLimiter>();
+ // process-wide pools used by VOlapTableSinkV2 to share brpc streams and delta
+ // writers among the sinks of one load (config::share_load_streams /
+ // config::share_delta_writers)
+ _load_stream_stub_pool =
std::make_unique<stream_load::LoadStreamStubPool>();
+ _delta_writer_v2_pool = std::make_unique<stream_load::DeltaWriterV2Pool>();
_backend_client_cache->init_metrics("backend");
_frontend_client_cache->init_metrics("frontend");
@@ -543,6 +547,8 @@ void ExecEnv::destroy() {
_deregister_metrics();
SAFE_DELETE(_load_channel_mgr);
_memtable_memory_limiter.reset(nullptr);
+ // NOTE(review): the shared_ptr deleters handed out by both pools capture the
+ // pool's `this`; every sink must have released its DeltaWriterV2Map / Streams
+ // before these resets, otherwise a later release touches a destroyed pool.
+ _load_stream_stub_pool.reset();
+ _delta_writer_v2_pool.reset();
// shared_ptr maybe no need to be reset
// _brpc_iobuf_block_memory_tracker.reset();
diff --git a/be/src/vec/sink/delta_writer_v2_pool.cpp
b/be/src/vec/sink/delta_writer_v2_pool.cpp
new file mode 100644
index 00000000000..c1066174a8c
--- /dev/null
+++ b/be/src/vec/sink/delta_writer_v2_pool.cpp
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/sink/delta_writer_v2_pool.h"
+
+#include "olap/delta_writer_v2.h"
+
+namespace doris {
+class TExpr;
+
+namespace stream_load {
+
+// A new map starts with one user (its creator); further users register via grab().
+DeltaWriterV2Map::DeltaWriterV2Map(UniqueId load_id) : _load_id(load_id),
_use_cnt(1) {}
+
+DeltaWriterV2Map::~DeltaWriterV2Map() = default;
+
+// Returns the writer of `tablet_id`. `creator` is only invoked when the map has no
+// entry for the tablet yet; the pointer it returns is owned by the map from then on.
+// NOTE(review): a nullptr returned by `creator` is stored and returned as-is.
+DeltaWriterV2* DeltaWriterV2Map::get_or_create(int64_t tablet_id,
+ std::function<DeltaWriterV2*()>
creator) {
+ _map.lazy_emplace(tablet_id, [&](const
TabletToDeltaWriterV2Map::constructor& ctor) {
+ ctor(tablet_id, creator());
+ });
+ return _map.at(tablet_id).get();
+}
+
+// Drops one user. Only the last user touches the writers: close() is called on every
+// writer, then close_wait() on every writer. After the first error no further calls
+// are made and that error is returned.
+Status DeltaWriterV2Map::close() {
+ if (--_use_cnt > 0) {
+ return Status::OK();
+ }
+ Status status = Status::OK();
+ // phase 1: close() on every writer
+ _map.for_each([&status](auto& entry) {
+ if (status.ok()) {
+ status = entry.second->close();
+ }
+ });
+ if (!status.ok()) {
+ return status;
+ }
+ // phase 2: close_wait() on every writer
+ _map.for_each([&status](auto& entry) {
+ if (status.ok()) {
+ status = entry.second->close_wait();
+ }
+ });
+ return status;
+}
+
+// Cancels every writer in the map with `status`, regardless of how many users remain.
+void DeltaWriterV2Map::cancel(Status status) {
+ _map.for_each([&status](auto& entry) {
entry.second->cancel_with_status(status); });
+}
+
+DeltaWriterV2Pool::DeltaWriterV2Pool() = default;
+
+DeltaWriterV2Pool::~DeltaWriterV2Pool() = default;
+
+// Returns the DeltaWriterV2Map shared by every sink of `load_id`, creating it on first
+// use. Each call registers one user of the map (a new map starts with one user, an
+// existing one is grab()bed), to be balanced by DeltaWriterV2Map::close(). The map
+// unregisters itself from the pool when the last shared_ptr to it is released.
+std::shared_ptr<DeltaWriterV2Map> DeltaWriterV2Pool::get_or_create(PUniqueId load_id) {
+    UniqueId id {load_id};
+    std::lock_guard<std::mutex> lock(_mutex);
+    std::shared_ptr<DeltaWriterV2Map> map = _pool[id].lock();
+    if (map) {
+        map->grab();
+        return map;
+    }
+    // NOTE: the deleter captures `this`, so the pool must outlive every map it hands out.
+    auto deleter = [this](DeltaWriterV2Map* m) {
+        {
+            std::lock_guard<std::mutex> lock(_mutex);
+            auto it = _pool.find(m->unique_id());
+            // Between the last release of `m` and this deleter acquiring the lock,
+            // get_or_create() may already have registered a fresh map for the same
+            // load; only drop the entry if it still refers to an expired map.
+            if (it != _pool.end() && it->second.expired()) {
+                _pool.erase(it);
+            }
+        }
+        // destroy the delta writers outside the pool lock
+        delete m;
+    };
+    map = std::shared_ptr<DeltaWriterV2Map>(new DeltaWriterV2Map(id), deleter);
+    _pool[id] = map;
+    return map;
+}
+
+} // namespace stream_load
+} // namespace doris
diff --git a/be/src/vec/sink/delta_writer_v2_pool.h
b/be/src/vec/sink/delta_writer_v2_pool.h
new file mode 100644
index 00000000000..d0328d7d2b5
--- /dev/null
+++ b/be/src/vec/sink/delta_writer_v2_pool.h
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include <brpc/controller.h>
+#include <bthread/types.h>
+#include <butil/errno.h>
+#include <fmt/format.h>
+#include <gen_cpp/PaloInternalService_types.h>
+#include <gen_cpp/Types_types.h>
+#include <gen_cpp/internal_service.pb.h>
+#include <gen_cpp/types.pb.h>
+#include <glog/logging.h>
+#include <google/protobuf/stubs/callback.h>
+#include <parallel_hashmap/phmap.h>
+#include <stddef.h>
+#include <stdint.h>
+
+#include <atomic>
+// IWYU pragma: no_include <bits/chrono.h>
+
+#include <chrono> // IWYU pragma: keep
+#include <functional>
+#include <initializer_list>
+#include <map>
+#include <memory>
+#include <mutex>
+#include <ostream>
+#include <queue>
+#include <set>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <utility>
+#include <vector>
+
+#include "common/config.h"
+#include "util/uid_util.h"
+
+namespace doris {
+
+class DeltaWriterV2;
+
+namespace stream_load {
+
+// Per-load map from tablet id to the DeltaWriterV2 writing that tablet. One instance
+// may be shared by several sinks of the same load (see DeltaWriterV2Pool); _use_cnt
+// counts them so that only the last one closes the writers.
+class DeltaWriterV2Map {
+public:
+    // starts with one user, the caller that creates the map
+    explicit DeltaWriterV2Map(UniqueId load_id);
+
+    ~DeltaWriterV2Map();
+
+    // register one more user; each user is expected to call close() once
+    void grab() { ++_use_cnt; }
+
+    // get or create delta writer for the given tablet; `creator` is only called when the
+    // tablet has no writer yet, and the returned writer is owned by this DeltaWriterV2Map
+    DeltaWriterV2* get_or_create(int64_t tablet_id, std::function<DeltaWriterV2*()> creator);
+
+    // drop one user and close all delta writers in this DeltaWriterV2Map if there are no
+    // other users left
+    Status close();
+
+    // cancel all delta writers in this DeltaWriterV2Map, regardless of the remaining users
+    void cancel(Status status);
+
+    UniqueId unique_id() const { return _load_id; }
+
+    // number of tablets that currently have a writer
+    size_t size() const { return _map.size(); }
+
+private:
+    // internally locked (std::mutex policy) so concurrent sinks can look up / insert writers
+    using TabletToDeltaWriterV2Map = phmap::parallel_flat_hash_map<
+            int64_t, std::unique_ptr<DeltaWriterV2>, std::hash<int64_t>, std::equal_to<int64_t>,
+            std::allocator<phmap::Pair<const int64_t, std::unique_ptr<DeltaWriterV2>>>, 4,
+            std::mutex>;
+
+    UniqueId _load_id;
+    TabletToDeltaWriterV2Map _map;
+    // number of users that have not called close() yet
+    std::atomic<int> _use_cnt;
+};
+
+// Registry of the DeltaWriterV2Map of each load, so that the sinks of one load share
+// their delta writers (ExecEnv owns the process-wide instance). Only weak references
+// are kept: a map lives as long as some caller holds the shared_ptr from get_or_create().
+class DeltaWriterV2Pool {
+public:
+ DeltaWriterV2Pool();
+
+ ~DeltaWriterV2Pool();
+
+ // Returns the map for `load_id`, creating it if no live map exists; every call
+ // registers one more user of the map, balanced by DeltaWriterV2Map::close().
+ std::shared_ptr<DeltaWriterV2Map> get_or_create(PUniqueId load_id);
+
+ // number of load ids currently registered in the pool
+ size_t size() {
+ std::lock_guard<std::mutex> lock(_mutex);
+ return _pool.size();
+ }
+
+private:
+ // guards _pool
+ std::mutex _mutex;
+ std::unordered_map<UniqueId, std::weak_ptr<DeltaWriterV2Map>> _pool;
+};
+
+} // namespace stream_load
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/sink/load_stream_stub.cpp
b/be/src/vec/sink/load_stream_stub.cpp
index 63bea0b6c73..052aa1f256b 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -108,6 +108,7 @@ Status
LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
const NodeInfo& node_info, int64_t txn_id,
const OlapTableSchemaParam& schema,
const std::vector<PTabletID>& tablets_for_schema,
bool enable_profile) {
+ // Count every open() call, including those that return early below because the
+ // stub is already initialized: a shared stub is opened once per sink, and
+ // close_load() only sends CLOSE_LOAD when the last opener calls it.
+ // NOTE(review): the count is taken before open() can fail; confirm that a sink
+ // whose open() failed still calls close_load(), or the count never reaches zero.
+ _num_open++;
std::unique_lock<bthread::Mutex> lock(_mutex);
if (_is_init) {
return Status::OK();
@@ -188,6 +189,9 @@ Status LoadStreamStub::add_segment(int64_t partition_id,
int64_t index_id, int64
// CLOSE_LOAD
Status LoadStreamStub::close_load(const std::vector<PTabletID>&
tablets_to_commit) {
+ // Other openers still use this stub: do not send CLOSE_LOAD yet.
+ // NOTE(review): this caller's tablets_to_commit is dropped; only the last caller's
+ // list is sent, which is correct only if all sharers pass the same list.
+ if (--_num_open > 0) {
+ return Status::OK();
+ }
PStreamHeader header;
*header.mutable_load_id() = _load_id;
header.set_src_id(_src_id);
diff --git a/be/src/vec/sink/load_stream_stub.h
b/be/src/vec/sink/load_stream_stub.h
index 8a1ae79c529..20cf5fc02ae 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -26,6 +26,7 @@
#include <gen_cpp/types.pb.h>
#include <glog/logging.h>
#include <google/protobuf/stubs/callback.h>
+#include <parallel_hashmap/phmap.h>
#include <stddef.h>
#include <stdint.h>
@@ -71,8 +72,14 @@ class LoadStreamStub;
struct SegmentStatistics;
-using IndexToTabletSchema = std::unordered_map<int64_t,
std::shared_ptr<TabletSchema>>;
-using IndexToEnableMoW = std::unordered_map<int64_t, bool>;
+// Internally locked maps: stubs copy-constructed from one template share these maps,
+// and with share_load_streams those stubs can be used by several sinks concurrently.
+using IndexToTabletSchema = phmap::parallel_flat_hash_map<
+        int64_t, std::shared_ptr<TabletSchema>, std::hash<int64_t>, std::equal_to<int64_t>,
+        std::allocator<phmap::Pair<const int64_t, std::shared_ptr<TabletSchema>>>, 4, std::mutex>;
+
+using IndexToEnableMoW =
+        phmap::parallel_flat_hash_map<int64_t, bool, std::hash<int64_t>, std::equal_to<int64_t>,
+                                      std::allocator<phmap::Pair<const int64_t, bool>>, 4,
+                                      std::mutex>;
class LoadStreamStub {
private:
@@ -175,6 +182,8 @@ protected:
bthread::Mutex _mutex;
bthread::ConditionVariable _close_cv;
+    // open() calls not yet matched by close_load(); only the last close_load() sends
+    // CLOSE_LOAD. Explicitly zero-initialized like _pending_streams in the sink.
+    std::atomic<int> _num_open {0};
+
std::mutex _buffer_mutex;
std::mutex _send_mutex;
butil::IOBuf _buffer;
diff --git a/be/src/vec/sink/load_stream_stub_pool.cpp
b/be/src/vec/sink/load_stream_stub_pool.cpp
new file mode 100644
index 00000000000..848d038b2f9
--- /dev/null
+++ b/be/src/vec/sink/load_stream_stub_pool.cpp
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/sink/load_stream_stub_pool.h"
+
+#include "vec/sink/load_stream_stub.h"
+
+namespace doris {
+class TExpr;
+
+namespace stream_load {
+
+LoadStreamStubPool::LoadStreamStubPool() = default;
+
+LoadStreamStubPool::~LoadStreamStubPool() = default;
+
+// Returns the Streams shared by all sinks of `load_id` that write to backend `dst_id`,
+// creating max(1, config::num_streams_per_sink) stubs on first use. The stubs are
+// copy-constructed from one template stub per load so that their internal tablet schema
+// maps are shared across all destinations. `src_id` is only used when that template is
+// created. The Streams unregister themselves when the last shared_ptr is released.
+std::shared_ptr<Streams> LoadStreamStubPool::get_or_create(PUniqueId load_id, int64_t src_id,
+                                                           int64_t dst_id) {
+    auto key = std::make_pair(UniqueId(load_id), dst_id);
+    std::lock_guard<std::mutex> lock(_mutex);
+    std::shared_ptr<Streams> streams = _pool[key].lock();
+    if (streams) {
+        return streams;
+    }
+    int32_t num_streams = std::max(1, config::num_streams_per_sink);
+    // one template per load, shared by every destination backend of that load; only
+    // build a new stub when the load has none yet
+    std::unique_ptr<LoadStreamStub>& template_stub = _template_stubs[key.first];
+    if (template_stub == nullptr) {
+        template_stub.reset(new LoadStreamStub {load_id, src_id});
+    }
+    // NOTE: the deleter captures `this`, so the pool must outlive every Streams it hands out.
+    auto deleter = [this, key](Streams* s) {
+        {
+            std::lock_guard<std::mutex> lock(_mutex);
+            // a fresh Streams for the same key may have been registered after `s`
+            // expired; only drop the entry if it still refers to an expired one
+            auto it = _pool.find(key);
+            if (it != _pool.end() && it->second.expired()) {
+                _pool.erase(it);
+            }
+            // keep the template while Streams to other destinations of this load are alive
+            bool load_in_use = false;
+            for (const auto& [other_key, other_streams] : _pool) {
+                if (other_key.first == key.first && !other_streams.expired()) {
+                    load_in_use = true;
+                    break;
+                }
+            }
+            if (!load_in_use) {
+                _template_stubs.erase(key.first);
+            }
+        }
+        // release the stubs outside the pool lock
+        delete s;
+    };
+    streams = std::shared_ptr<Streams>(new Streams(), deleter);
+    streams->reserve(num_streams);
+    for (int32_t i = 0; i < num_streams; i++) {
+        // copy construct, internal tablet schema map will be shared among all stubs
+        streams->emplace_back(std::make_shared<LoadStreamStub>(*template_stub));
+    }
+    _pool[key] = streams;
+    return streams;
+}
+
+} // namespace stream_load
+} // namespace doris
diff --git a/be/src/vec/sink/load_stream_stub_pool.h
b/be/src/vec/sink/load_stream_stub_pool.h
new file mode 100644
index 00000000000..ae550340d25
--- /dev/null
+++ b/be/src/vec/sink/load_stream_stub_pool.h
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include <brpc/controller.h>
+#include <bthread/types.h>
+#include <butil/errno.h>
+#include <fmt/format.h>
+#include <gen_cpp/PaloInternalService_types.h>
+#include <gen_cpp/Types_types.h>
+#include <gen_cpp/internal_service.pb.h>
+#include <gen_cpp/types.pb.h>
+#include <glog/logging.h>
+#include <google/protobuf/stubs/callback.h>
+#include <stddef.h>
+#include <stdint.h>
+
+#include <atomic>
+// IWYU pragma: no_include <bits/chrono.h>
+#include <chrono> // IWYU pragma: keep
+#include <functional>
+#include <initializer_list>
+#include <map>
+#include <memory>
+#include <mutex>
+#include <ostream>
+#include <queue>
+#include <set>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <utility>
+#include <vector>
+
+#include "common/config.h"
+#include "common/status.h"
+#include "exec/data_sink.h"
+#include "exec/tablet_info.h"
+#include "gutil/ref_counted.h"
+#include "runtime/exec_env.h"
+#include "runtime/memory/mem_tracker.h"
+#include "runtime/thread_context.h"
+#include "runtime/types.h"
+#include "util/countdown_latch.h"
+#include "util/runtime_profile.h"
+#include "util/stopwatch.hpp"
+#include "vec/columns/column.h"
+#include "vec/common/allocator.h"
+#include "vec/core/block.h"
+#include "vec/data_types/data_type.h"
+#include "vec/exprs/vexpr_fwd.h"
+
+namespace doris {
+
+class LoadStreamStub;
+
+namespace stream_load {
+
+// The brpc stream stubs a sink uses to reach one destination backend.
+using Streams = std::vector<std::shared_ptr<LoadStreamStub>>;
+
+// Registry of the Streams of each (load id, destination backend) pair, so that the
+// sinks of one load share their brpc streams (ExecEnv owns the process-wide instance).
+// Only weak references are kept: a Streams lives as long as some caller holds the
+// shared_ptr returned by get_or_create().
+class LoadStreamStubPool {
+public:
+ LoadStreamStubPool();
+
+ ~LoadStreamStubPool();
+
+ // Returns the Streams for (load_id, dst_id), creating them if none are alive.
+ // src_id is only used when the per-load template stub is created.
+ std::shared_ptr<Streams> get_or_create(PUniqueId load_id, int64_t src_id,
int64_t dst_id);
+
+ // number of (load id, destination backend) entries currently registered
+ size_t size() {
+ std::lock_guard<std::mutex> lock(_mutex);
+ return _pool.size();
+ }
+
+ // for UT only
+ size_t templates_size() {
+ std::lock_guard<std::mutex> lock(_mutex);
+ return _template_stubs.size();
+ }
+
+private:
+ // guards _template_stubs and _pool
+ std::mutex _mutex;
+ // one template stub per load; the stubs in each Streams are copy-constructed from it
+ std::unordered_map<UniqueId, std::unique_ptr<LoadStreamStub>>
_template_stubs;
+ std::unordered_map<std::pair<UniqueId, int64_t>, std::weak_ptr<Streams>>
_pool;
+};
+
+} // namespace stream_load
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/sink/vtablet_sink_v2.cpp
b/be/src/vec/sink/vtablet_sink_v2.cpp
index a88f05b88d8..ebf50d222c3 100644
--- a/be/src/vec/sink/vtablet_sink_v2.cpp
+++ b/be/src/vec/sink/vtablet_sink_v2.cpp
@@ -30,6 +30,7 @@
#include <algorithm>
#include <execution>
#include <mutex>
+#include <ranges>
#include <string>
#include <unordered_map>
@@ -54,7 +55,9 @@
#include "util/uid_util.h"
#include "vec/core/block.h"
#include "vec/exprs/vexpr.h"
+#include "vec/sink/delta_writer_v2_pool.h"
#include "vec/sink/load_stream_stub.h"
+#include "vec/sink/load_stream_stub_pool.h"
#include "vec/sink/vtablet_block_convertor.h"
#include "vec/sink/vtablet_finder.h"
@@ -153,41 +156,51 @@ Status VOlapTableSinkV2::open(RuntimeState* state) {
SCOPED_TIMER(_open_timer);
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
- _stream_pool_for_node = std::make_shared<NodeToStreams>();
- _delta_writer_for_tablet = std::make_shared<DeltaWriterForTablet>();
+ // With share_delta_writers, every sink of this load in the process gets the same
+ // DeltaWriterV2Map from the ExecEnv pool; otherwise this sink owns a private map.
+ if (config::share_delta_writers) {
+ _delta_writer_for_tablet =
+
ExecEnv::GetInstance()->delta_writer_v2_pool()->get_or_create(_load_id);
+ } else {
+ _delta_writer_for_tablet =
std::make_shared<DeltaWriterV2Map>(_load_id);
+ }
_build_tablet_node_mapping();
- RETURN_IF_ERROR(_init_stream_pools());
+ // this backend's id becomes the source id of the shared streams
+ RETURN_IF_ERROR(_open_streams(state->backend_id()));
return Status::OK();
}
-Status VOlapTableSinkV2::_init_stream_pools() {
- // stub template is for sharing internal schema map among all stubs
- LoadStreamStub stub_template {_load_id, _sender_id};
- for (auto& [node_id, _] : _tablets_for_node) {
- auto node_info = _nodes_info->find_node(node_id);
+// Opens the streams to every destination backend in _tablets_for_node and records them
+// in _streams_for_node. With share_load_streams the Streams come from the ExecEnv pool
+// and may already have been opened by another sink of this load.
+// NOTE(review): the shared branch uses `src_id` (backend id) as the stub source id while
+// the private branch uses `_sender_id`; confirm both are intended.
+// NOTE(review): if an open() below fails, `streams` is never stored in _streams_for_node,
+// so close() will not call close_load() on stubs whose open count this sink incremented.
+Status VOlapTableSinkV2::_open_streams(int64_t src_id) {
+ for (auto& [dst_id, _] : _tablets_for_node) {
+ auto node_info = _nodes_info->find_node(dst_id);
if (node_info == nullptr) {
- return Status::InternalError("Unknown node {} in tablet location",
node_id);
+ return Status::InternalError("Unknown node {} in tablet location",
dst_id);
+ }
+ std::shared_ptr<Streams> streams;
+ if (config::share_load_streams) {
+ streams =
ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create(
+ _load_id, src_id, dst_id);
+ } else {
+ int32_t num_streams = std::max(1, config::num_streams_per_sink);
+ streams = std::make_shared<Streams>();
+ LoadStreamStub template_stub {_load_id, _sender_id};
+ for (int32_t i = 0; i < num_streams; i++) {
+ // copy construct, internal tablet schema map will be shared
among all stubs
+ streams->emplace_back(new LoadStreamStub {template_stub});
+ }
}
- Streams& stream_pool = (*_stream_pool_for_node)[node_id];
- RETURN_IF_ERROR(_init_stream_pool(*node_info, stream_pool,
stub_template));
- }
- return Status::OK();
-}
-
-Status VOlapTableSinkV2::_init_stream_pool(const NodeInfo& node_info, Streams&
stream_pool,
- LoadStreamStub& stub_template) {
- stream_pool.reserve(config::num_streams_per_sink);
- for (int i = 0; i < config::num_streams_per_sink; ++i) {
- // internal tablet schema map will be shared among all stubs
- auto stream = std::make_unique<LoadStreamStub>(stub_template);
// get tablet schema from each backend only in the 1st stream
- const std::vector<PTabletID>& tablets_for_schema =
- i == 0 ? _indexes_from_node[node_info.id] :
std::vector<PTabletID> {};
-
RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(),
node_info,
- _txn_id, *_schema, tablets_for_schema,
- _state->enable_profile()));
- stream_pool.emplace_back(std::move(stream));
+ for (auto& stream : *streams | std::ranges::views::take(1)) {
+ const std::vector<PTabletID>& tablets_for_schema =
_indexes_from_node[node_info->id];
+
RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(),
+ *node_info, _txn_id, *_schema,
tablets_for_schema,
+ _state->enable_profile()));
+ }
+ // open the remaining streams without requesting tablet schemas
+ for (auto& stream : *streams | std::ranges::views::drop(1)) {
+
RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(),
+ *node_info, _txn_id, *_schema, {},
+ _state->enable_profile()));
+ }
+ _streams_for_node[dst_id] = streams;
}
return Status::OK();
}
@@ -238,7 +251,7 @@ Status VOlapTableSinkV2::_select_streams(int64_t tablet_id,
Streams& streams) {
return Status::InternalError("unknown tablet location, tablet id =
{}", tablet_id);
}
for (auto& node_id : location->node_ids) {
-
streams.emplace_back(_stream_pool_for_node->at(node_id)[_stream_index]);
+        // at() instead of operator[]: an unknown node must not insert, and then
+        // dereference, a null Streams pointer
+        streams.emplace_back(_streams_for_node.at(node_id)->at(_stream_index));
}
-    _stream_index = (_stream_index + 1) % config::num_streams_per_sink;
+    // wrap with the same clamped count _open_streams() uses (at least 1), so a
+    // non-positive num_streams_per_sink cannot cause a modulo by zero
+    _stream_index = (_stream_index + 1) % std::max(1, config::num_streams_per_sink);
return Status::OK();
@@ -310,36 +323,27 @@ Status VOlapTableSinkV2::send(RuntimeState* state,
vectorized::Block* input_bloc
Status VOlapTableSinkV2::_write_memtable(std::shared_ptr<vectorized::Block>
block,
int64_t tablet_id, const Rows& rows,
const Streams& streams) {
- DeltaWriterV2* delta_writer = nullptr;
- {
- auto it = _delta_writer_for_tablet->find(tablet_id);
- if (it == _delta_writer_for_tablet->end()) {
- VLOG_DEBUG << "Creating DeltaWriterV2 for Tablet(tablet id: " <<
tablet_id
- << ", index id: " << rows.index_id << ")";
- WriteRequest req;
- req.partition_id = rows.partition_id;
- req.index_id = rows.index_id;
- req.tablet_id = tablet_id;
- req.txn_id = _txn_id;
- req.load_id = _load_id;
- req.tuple_desc = _output_tuple_desc;
- req.is_high_priority = _is_high_priority;
- req.table_schema_param = _schema.get();
- for (auto& index : _schema->indexes()) {
- if (index->index_id == rows.index_id) {
- req.slots = &index->slots;
- req.schema_hash = index->schema_hash;
- break;
- }
- }
- DeltaWriterV2::open(&req, streams, &delta_writer, _profile);
- _delta_writer_for_tablet->emplace(tablet_id, delta_writer);
- } else {
- VLOG_DEBUG << "Reusing DeltaWriterV2 for Tablet(tablet id: " <<
tablet_id
- << ", index id: " << rows.index_id << ")";
- delta_writer = it->second.get();
- }
- }
+    // The writer of a tablet lives in the (possibly shared) per-load map; the lambda
+    // only runs when no writer exists yet for this tablet.
+    // NOTE(review): with share_delta_writers, the writer is built from this sink's
+    // `streams` and `_profile` but may later be reused by other sinks of the same load.
+    DeltaWriterV2* delta_writer = _delta_writer_for_tablet->get_or_create(tablet_id, [&]() {
+        WriteRequest req;
+        req.partition_id = rows.partition_id;
+        req.index_id = rows.index_id;
+        req.tablet_id = tablet_id;
+        req.txn_id = _txn_id;
+        req.load_id = _load_id;
+        req.tuple_desc = _output_tuple_desc;
+        req.is_high_priority = _is_high_priority;
+        req.table_schema_param = _schema.get();
+        for (auto& index : _schema->indexes()) {
+            if (index->index_id == rows.index_id) {
+                req.slots = &index->slots;
+                req.schema_hash = index->schema_hash;
+                break;
+            }
+        }
+        // distinct name: the outer `delta_writer` is still being initialized here
+        DeltaWriterV2* new_writer = nullptr;
+        // NOTE(review): whatever open() returns is discarded; if it fails, new_writer
+        // may stay nullptr and be cached in the map for this tablet
+        DeltaWriterV2::open(&req, streams, &new_writer, _profile);
+        return new_writer;
+    });
{
SCOPED_TIMER(_wait_mem_limit_timer);
ExecEnv::GetInstance()->memtable_memory_limiter()->handle_memtable_flush();
@@ -352,12 +356,10 @@ Status
VOlapTableSinkV2::_write_memtable(std::shared_ptr<vectorized::Block> bloc
Status VOlapTableSinkV2::_cancel(Status status) {
LOG(INFO) << "canceled olap table sink. load_id=" << print_id(_load_id)
<< ", txn_id=" << _txn_id << ", due to error: " << status;
-
- if (_delta_writer_for_tablet.use_count() == 1) {
- std::for_each(std::begin(*_delta_writer_for_tablet),
std::end(*_delta_writer_for_tablet),
- [&status](auto&& entry) {
entry.second->cancel_with_status(status); });
+ // DeltaWriterV2Map::cancel() cancels every writer in the map without looking at its
+ // use count, so with share_delta_writers this also cancels the writers of the other
+ // sinks of this load. The map is released afterwards, so a second _cancel() does nothing.
+ if (_delta_writer_for_tablet) {
+ _delta_writer_for_tablet->cancel(status);
+ _delta_writer_for_tablet.reset();
}
- _delta_writer_for_tablet.reset();
return Status::OK();
}
@@ -382,37 +384,30 @@ Status VOlapTableSinkV2::close(RuntimeState* state,
Status exec_status) {
{
SCOPED_TIMER(_close_writer_timer);
- // close all delta writers
- if (_delta_writer_for_tablet.use_count() == 1) {
- std::for_each(std::begin(*_delta_writer_for_tablet),
- std::end(*_delta_writer_for_tablet),
- [](auto&& entry) { entry.second->close(); });
- std::for_each(std::begin(*_delta_writer_for_tablet),
- std::end(*_delta_writer_for_tablet),
- [](auto&& entry) { entry.second->close_wait();
});
- }
+        // Close all delta writers if this is the last user of the (possibly shared) map.
+        // _cancel() resets the map, hence the null check; a writer that fails to close
+        // fails the load instead of being silently ignored.
+        if (_delta_writer_for_tablet) {
+            RETURN_IF_ERROR(_delta_writer_for_tablet->close());
+        }
_delta_writer_for_tablet.reset();
}
{
+ // NOTE(review): with share_load_streams the stubs are shared, and
+ // LoadStreamStub::close_load() only sends CLOSE_LOAD for the last sink that opened a
+ // stub, so close_wait() below may depend on other sinks of this load closing too.
// send CLOSE_LOAD to all streams, return ERROR if any
- for (const auto& [node_id, stream_pool] : *_stream_pool_for_node) {
- RETURN_IF_ERROR(_close_load(stream_pool));
+ for (const auto& [_, streams] : _streams_for_node) {
+ RETURN_IF_ERROR(_close_load(*streams));
}
}
{
SCOPED_TIMER(_close_load_timer);
- for (const auto& [node_id, stream_pool] : *_stream_pool_for_node) {
- for (const auto& stream : stream_pool) {
+ for (const auto& [_, streams] : _streams_for_node) {
+ for (const auto& stream : *streams) {
stream->close_wait();
}
}
}
+ // NOTE(review): with shared streams every sink of the load collects the same
+ // success_tablets(); confirm duplicate commit infos are tolerated downstream.
std::vector<TTabletCommitInfo> tablet_commit_infos;
- for (const auto& [node_id, stream_pool] : *_stream_pool_for_node) {
- for (const auto& stream : stream_pool) {
+ for (const auto& [node_id, streams] : _streams_for_node) {
+ for (const auto& stream : *streams) {
for (auto tablet_id : stream->success_tablets()) {
TTabletCommitInfo commit_info;
commit_info.tabletId = tablet_id;
@@ -424,7 +419,7 @@ Status VOlapTableSinkV2::close(RuntimeState* state, Status
exec_status) {
state->tablet_commit_infos().insert(state->tablet_commit_infos().end(),
std::make_move_iterator(tablet_commit_infos.begin()),
std::make_move_iterator(tablet_commit_infos.end()));
- _stream_pool_for_node.reset();
+ _streams_for_node.clear();
// _number_input_rows don't contain num_rows_load_filtered and
num_rows_load_unselected in scan node
int64_t num_rows_load_total = _number_input_rows +
state->num_rows_load_filtered() +
diff --git a/be/src/vec/sink/vtablet_sink_v2.h
b/be/src/vec/sink/vtablet_sink_v2.h
index 047377f4e28..bf983ba693d 100644
--- a/be/src/vec/sink/vtablet_sink_v2.h
+++ b/be/src/vec/sink/vtablet_sink_v2.h
@@ -80,10 +80,9 @@ namespace stream_load {
class OlapTableBlockConvertor;
class OlapTabletFinder;
class VOlapTableSinkV2;
+class DeltaWriterV2Map;
-using DeltaWriterForTablet = std::unordered_map<int64_t,
std::unique_ptr<DeltaWriterV2>>;
using Streams = std::vector<std::shared_ptr<LoadStreamStub>>;
-using NodeToStreams = std::unordered_map<int64_t, Streams>;
using NodeIdForStream = std::unordered_map<brpc::StreamId, int64_t>;
using NodePartitionTabletMapping =
std::unordered_map<int64_t, std::unordered_map<int64_t,
std::unordered_set<int64_t>>>;
@@ -133,10 +132,7 @@ public:
Status send(RuntimeState* state, vectorized::Block* block, bool eos =
false) override;
private:
- Status _init_stream_pool(const NodeInfo& node_info, Streams& stream_pool,
- LoadStreamStub& stub_template);
-
- Status _init_stream_pools();
+ // open (or join, with share_load_streams) the streams to every destination backend;
+ // open() passes this backend's id as src_id
+ Status _open_streams(int64_t src_id);
void _build_tablet_node_mapping();
@@ -215,9 +211,9 @@ private:
std::unordered_map<int64_t, std::vector<PTabletID>> _tablets_for_node;
std::unordered_map<int64_t, std::vector<PTabletID>> _indexes_from_node;
- std::shared_ptr<NodeToStreams> _stream_pool_for_node;
+ // destination backend id -> streams to it; shared with the other sinks of this load
+ // when share_load_streams = true
+ std::unordered_map<int64_t, std::shared_ptr<Streams>> _streams_for_node;
size_t _stream_index = 0;
- std::shared_ptr<DeltaWriterForTablet> _delta_writer_for_tablet;
+ // tablet id -> delta writer; shared with the other sinks of this load when
+ // share_delta_writers = true
+ std::shared_ptr<DeltaWriterV2Map> _delta_writer_for_tablet;
std::atomic<int> _pending_streams {0};
diff --git a/be/test/vec/exec/delta_writer_v2_pool_test.cpp
b/be/test/vec/exec/delta_writer_v2_pool_test.cpp
new file mode 100644
index 00000000000..dfc3276ea75
--- /dev/null
+++ b/be/test/vec/exec/delta_writer_v2_pool_test.cpp
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "vec/sink/delta_writer_v2_pool.h"
+
+#include <gtest/gtest.h>
+
+#include "olap/delta_writer_v2.h"
+
+namespace doris {
+
+namespace stream_load {
+
+// Stateless fixture: every test below builds its own DeltaWriterV2Pool.
+class DeltaWriterV2PoolTest : public testing::Test {
+public:
+ ~DeltaWriterV2PoolTest() override = default;
+};
+
+TEST_F(DeltaWriterV2PoolTest, test_pool) {
+ DeltaWriterV2Pool pool;
+ PUniqueId load_id;
+ load_id.set_hi(1);
+ load_id.set_lo(2);
+ PUniqueId load_id2;
+ load_id2.set_hi(1);
+ load_id2.set_lo(3); // same hi as load_id; the two ids differ only in lo
+ auto map = pool.get_or_create(load_id);
+ auto map2 = pool.get_or_create(load_id2);
+ auto map3 = pool.get_or_create(load_id);
+ EXPECT_EQ(2, pool.size()); // the repeated load_id must not add a new entry
+ EXPECT_EQ(map, map3);
+ EXPECT_NE(map, map2);
+ map.reset();
+ map2.reset();
+ map3.reset(); // no references to either map remain after this point
+ EXPECT_EQ(0, pool.size());
+}
+
+TEST_F(DeltaWriterV2PoolTest, test_map) {
+ DeltaWriterV2Pool pool;
+ PUniqueId load_id;
+ load_id.set_hi(1);
+ load_id.set_lo(2);
+ auto map = pool.get_or_create(load_id);
+ EXPECT_EQ(1, pool.size());
+ WriteRequest req;
+ // Kept at test scope rather than inside the factory so it outlives the
+ // writers, in case DeltaWriterV2::open retains the pointer (TODO confirm).
+ RuntimeProfile profile("test");
+ // Writer factory handed to the map; for a tablet id the map already holds,
+ // the existing writer is returned instead (checked via writer == writer3).
+ auto create_writer = [&req, &profile]() {
+ DeltaWriterV2* writer = nullptr;
+ // open's status is deliberately not checked: the assertions below only
+ // compare writer identity, not writer state.
+ static_cast<void>(DeltaWriterV2::open(&req, {}, &writer, &profile));
+ return writer;
+ };
+ // Two distinct tablet ids, then a repeat of the first one.
+ auto writer = map->get_or_create(100, create_writer);
+ auto writer2 = map->get_or_create(101, create_writer);
+ auto writer3 = map->get_or_create(100, create_writer);
+ EXPECT_EQ(2, map->size());
+ // A repeated tablet id yields the existing writer; a new id gets its own.
+ EXPECT_EQ(writer, writer3);
+ EXPECT_NE(writer, writer2);
+ // Dropping the only reference to the map removes it from the pool.
+ map.reset();
+ EXPECT_EQ(0, pool.size());
+}
+
+} // namespace stream_load
+} // namespace doris
diff --git a/be/test/vec/exec/load_stream_stub_pool_test.cpp b/be/test/vec/exec/load_stream_stub_pool_test.cpp
new file mode 100644
index 00000000000..f1ccb70beeb
--- /dev/null
+++ b/be/test/vec/exec/load_stream_stub_pool_test.cpp
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "vec/sink/load_stream_stub_pool.h"
+
+#include <gtest/gtest.h>
+
+#include "vec/sink/load_stream_stub.h"
+
+namespace doris {
+
+namespace stream_load {
+
+// Stateless fixture: the test below builds its own LoadStreamStubPool.
+class LoadStreamStubPoolTest : public testing::Test {
+public:
+ ~LoadStreamStubPoolTest() override = default;
+};
+
+TEST_F(LoadStreamStubPoolTest, test) {
+ LoadStreamStubPool pool;
+ int64_t src_id = 100;
+ PUniqueId load_id;
+ load_id.set_hi(1);
+ load_id.set_lo(2);
+ auto streams1 = pool.get_or_create(load_id, src_id, 101);
+ auto streams2 = pool.get_or_create(load_id, src_id, 102);
+ auto streams3 = pool.get_or_create(load_id, src_id, 101);
+ EXPECT_EQ(2, pool.size()); // repeating 101 reuses the entry created for it
+ EXPECT_EQ(1, pool.templates_size()); // only one load id / src id pair is used here
+ EXPECT_EQ(streams1, streams3);
+ EXPECT_NE(streams1, streams2);
+ streams1.reset();
+ streams2.reset();
+ streams3.reset(); // last references to both stream sets dropped
+ EXPECT_EQ(0, pool.size());
+ EXPECT_EQ(0, pool.templates_size());
+}
+
+} // namespace stream_load
+} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]