This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 960e80cc1b1 [refactor](move-memtable) remove phmap and use shared ptr in delta writer v2 (#30949)
960e80cc1b1 is described below
commit 960e80cc1b1e0ed8259db2d286d1b144042980c4
Author: Kaijie Chen <[email protected]>
AuthorDate: Wed Feb 7 18:04:48 2024 +0800
[refactor](move-memtable) remove phmap and use shared ptr in delta writer v2 (#30949)
* [refactor](move-memtable) remove phmap and use shared ptr in delta writer v2 pool
* add ENABLE_FACTORY_CREATOR to DeltaWriterV2
---
be/src/olap/delta_writer_v2.h | 2 ++
be/src/vec/sink/delta_writer_v2_pool.cpp | 41 +++++++++++++---------------
be/src/vec/sink/delta_writer_v2_pool.h | 15 ++++------
be/src/vec/sink/writer/vtablet_writer_v2.cpp | 4 +--
4 files changed, 28 insertions(+), 34 deletions(-)
diff --git a/be/src/olap/delta_writer_v2.h b/be/src/olap/delta_writer_v2.h
index c20c7f6745e..d9a63593fa8 100644
--- a/be/src/olap/delta_writer_v2.h
+++ b/be/src/olap/delta_writer_v2.h
@@ -62,6 +62,8 @@ class Block;
// Writer for a particular (load, index, tablet).
// This class is NOT thread-safe, external synchronization is required.
class DeltaWriterV2 {
+ ENABLE_FACTORY_CREATOR(DeltaWriterV2);
+
public:
DeltaWriterV2(WriteRequest* req, const
std::vector<std::shared_ptr<LoadStreamStub>>& streams,
RuntimeState* state);
diff --git a/be/src/vec/sink/delta_writer_v2_pool.cpp
b/be/src/vec/sink/delta_writer_v2_pool.cpp
index df9c0fc1c8c..cfb2b5294c7 100644
--- a/be/src/vec/sink/delta_writer_v2_pool.cpp
+++ b/be/src/vec/sink/delta_writer_v2_pool.cpp
@@ -30,12 +30,15 @@ DeltaWriterV2Map::DeltaWriterV2Map(UniqueId load_id, int
num_use, DeltaWriterV2P
DeltaWriterV2Map::~DeltaWriterV2Map() = default;
-DeltaWriterV2* DeltaWriterV2Map::get_or_create(
+std::shared_ptr<DeltaWriterV2> DeltaWriterV2Map::get_or_create(
int64_t tablet_id, std::function<std::unique_ptr<DeltaWriterV2>()>
creator) {
- _map.lazy_emplace(tablet_id, [&](const
TabletToDeltaWriterV2Map::constructor& ctor) {
- ctor(tablet_id, creator());
- });
- return _map.at(tablet_id).get();
+ std::lock_guard lock(_mutex);
+ if (_map.contains(tablet_id)) {
+ return _map.at(tablet_id);
+ }
+ std::shared_ptr<DeltaWriterV2> writer = creator();
+ _map[tablet_id] = writer;
+ return writer;
}
Status DeltaWriterV2Map::close(RuntimeProfile* profile) {
@@ -48,22 +51,15 @@ Status DeltaWriterV2Map::close(RuntimeProfile* profile) {
_pool->erase(_load_id);
}
LOG(INFO) << "closing DeltaWriterV2Map, load_id=" << _load_id;
- Status status = Status::OK();
- _map.for_each([&status](auto& entry) {
- if (status.ok()) {
- status = entry.second->close();
- }
- });
- if (!status.ok()) {
- return status;
+ std::lock_guard lock(_mutex);
+ for (auto& [_, writer] : _map) {
+ RETURN_IF_ERROR(writer->close());
}
LOG(INFO) << "close-waiting DeltaWriterV2Map, load_id=" << _load_id;
- _map.for_each([&status, profile](auto& entry) {
- if (status.ok()) {
- status = entry.second->close_wait(profile);
- }
- });
- return status;
+ for (auto& [_, writer] : _map) {
+ RETURN_IF_ERROR(writer->close_wait(profile));
+ }
+ return Status::OK();
}
void DeltaWriterV2Map::cancel(Status status) {
@@ -72,9 +68,10 @@ void DeltaWriterV2Map::cancel(Status status) {
if (num_use == 0 && _pool != nullptr) {
_pool->erase(_load_id);
}
- _map.for_each([&status](auto& entry) {
- static_cast<void>(entry.second->cancel_with_status(status));
- });
+ std::lock_guard lock(_mutex);
+ for (auto& [_, writer] : _map) {
+ static_cast<void>(writer->cancel_with_status(status));
+ }
}
DeltaWriterV2Pool::DeltaWriterV2Pool() = default;
diff --git a/be/src/vec/sink/delta_writer_v2_pool.h
b/be/src/vec/sink/delta_writer_v2_pool.h
index b2e267bcfd7..912b9216e9f 100644
--- a/be/src/vec/sink/delta_writer_v2_pool.h
+++ b/be/src/vec/sink/delta_writer_v2_pool.h
@@ -26,7 +26,6 @@
#include <gen_cpp/types.pb.h>
#include <glog/logging.h>
#include <google/protobuf/stubs/callback.h>
-#include <parallel_hashmap/phmap.h>
#include <stddef.h>
#include <stdint.h>
@@ -67,8 +66,8 @@ public:
~DeltaWriterV2Map();
// get or create delta writer for the given tablet, memory is managed by
DeltaWriterV2Map
- DeltaWriterV2* get_or_create(int64_t tablet_id,
-
std::function<std::unique_ptr<DeltaWriterV2>()> creator);
+ std::shared_ptr<DeltaWriterV2> get_or_create(
+ int64_t tablet_id, std::function<std::unique_ptr<DeltaWriterV2>()>
creator);
// close all delta writers in this DeltaWriterV2Map if there is no other
users
Status close(RuntimeProfile* profile = nullptr);
@@ -79,13 +78,9 @@ public:
size_t size() const { return _map.size(); }
private:
- using TabletToDeltaWriterV2Map = phmap::parallel_flat_hash_map<
- int64_t, std::unique_ptr<DeltaWriterV2>, std::hash<int64_t>,
std::equal_to<int64_t>,
- std::allocator<phmap::Pair<const int64_t,
std::unique_ptr<DeltaWriterV2>>>, 4,
- std::mutex>;
-
UniqueId _load_id;
- TabletToDeltaWriterV2Map _map;
+ std::mutex _mutex;
+ std::unordered_map<int64_t, std::shared_ptr<DeltaWriterV2>> _map;
std::atomic<int> _use_cnt;
DeltaWriterV2Pool* _pool = nullptr;
};
@@ -111,4 +106,4 @@ private:
};
} // namespace vectorized
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index af278228eb0..4cbca9cd8af 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -420,7 +420,7 @@ Status VTabletWriterV2::write(Block& input_block) {
Status VTabletWriterV2::_write_memtable(std::shared_ptr<vectorized::Block>
block, int64_t tablet_id,
const Rows& rows, const Streams&
streams) {
- DeltaWriterV2* delta_writer =
_delta_writer_for_tablet->get_or_create(tablet_id, [&]() {
+ auto delta_writer = _delta_writer_for_tablet->get_or_create(tablet_id,
[&]() {
WriteRequest req {
.tablet_id = tablet_id,
.txn_id = _txn_id,
@@ -446,7 +446,7 @@ Status
VTabletWriterV2::_write_memtable(std::shared_ptr<vectorized::Block> block
<< " not found in schema, load_id=" <<
print_id(_load_id);
return std::unique_ptr<DeltaWriterV2>(nullptr);
}
- return std::make_unique<DeltaWriterV2>(&req, streams, _state);
+ return DeltaWriterV2::create_unique(&req, streams, _state);
});
if (delta_writer == nullptr) {
LOG(WARNING) << "failed to open DeltaWriter for tablet " << tablet_id
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]