This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 395840cbbb [Chore](refactor) Split IndexChannel from vtablet_sink.h into vtablet_sink.cc (#22848)
395840cbbb is described below
commit 395840cbbba9438793b4240be0da9c0525dcea33
Author: Jack Drogon <[email protected]>
AuthorDate: Sun Aug 13 10:21:12 2023 +0800
[Chore](refactor) Split IndexChannel from vtablet_sink.h into vtablet_sink.cc (#22848)
Signed-off-by: Jack Drogon <[email protected]>
---
be/src/vec/sink/vtablet_sink.cpp | 78 +++++++++++++++++++++++++++++++++++++++-
be/src/vec/sink/vtablet_sink.h | 78 ----------------------------------------
2 files changed, 77 insertions(+), 79 deletions(-)
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index 39589a239d..536a8cf330 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -101,7 +101,83 @@ class TExpr;
namespace stream_load {
-IndexChannel::~IndexChannel() = default;
+class IndexChannel {
+public:
+ IndexChannel(VOlapTableSink* parent, int64_t index_id,
+ const vectorized::VExprContextSPtr& where_clause)
+ : _parent(parent), _index_id(index_id), _where_clause(where_clause) {
+ _index_channel_tracker =
+ std::make_unique<MemTracker>("IndexChannel:indexID=" + std::to_string(_index_id));
+ }
+ ~IndexChannel() = default;
+
+ Status init(RuntimeState* state, const std::vector<TTabletWithPartition>& tablets);
+
+ void for_each_node_channel(
+ const std::function<void(const std::shared_ptr<VNodeChannel>&)>& func) {
+ for (auto& it : _node_channels) {
+ func(it.second);
+ }
+ }
+
+ void mark_as_failed(int64_t node_id, const std::string& host, const std::string& err,
+ int64_t tablet_id = -1);
+ Status check_intolerable_failure();
+
+ // set error tablet info in runtime state, so that it can be returned to FE.
+ void set_error_tablet_in_state(RuntimeState* state);
+
+ size_t num_node_channels() const { return _node_channels.size(); }
+
+ size_t get_pending_bytes() const {
+ size_t mem_consumption = 0;
+ for (auto& kv : _node_channels) {
+ mem_consumption += kv.second->get_pending_bytes();
+ }
+ return mem_consumption;
+ }
+
+ void set_tablets_received_rows(
+ const std::vector<std::pair<int64_t, int64_t>>& tablets_received_rows, int64_t node_id);
+
+ // check whether the rows num written by different replicas is consistent
+ Status check_tablet_received_rows_consistency();
+
+ vectorized::VExprContextSPtr get_where_clause() { return _where_clause; }
+
+private:
+ friend class VNodeChannel;
+ friend class VOlapTableSink;
+
+ VOlapTableSink* _parent;
+ int64_t _index_id;
+ vectorized::VExprContextSPtr _where_clause;
+
+ // from backend channel to tablet_id
+ // ATTN: must be placed before `_node_channels` and `_channels_by_tablet`.
+ // Because the destruct order of objects is opposite to the creation order.
+ // So NodeChannel will be destructured first.
+ // And the destructor function of NodeChannel waits for all RPCs to finish.
+ // This ensures that it is safe to use `_tablets_by_channel` in the callback function for the end of the RPC.
+ std::unordered_map<int64_t, std::unordered_set<int64_t>> _tablets_by_channel;
+ // BeId -> channel
+ std::unordered_map<int64_t, std::shared_ptr<VNodeChannel>> _node_channels;
+ // from tablet_id to backend channel
+ std::unordered_map<int64_t, std::vector<std::shared_ptr<VNodeChannel>>> _channels_by_tablet;
+
+ // lock to protect _failed_channels and _failed_channels_msgs
+ mutable doris::SpinLock _fail_lock;
+ // key is tablet_id, value is a set of failed node id
+ std::unordered_map<int64_t, std::unordered_set<int64_t>> _failed_channels;
+ // key is tablet_id, value is error message
+ std::unordered_map<int64_t, std::string> _failed_channels_msgs;
+ Status _intolerable_failure_status = Status::OK();
+
+ std::unique_ptr<MemTracker> _index_channel_tracker;
+ // rows num received by DeltaWriter per tablet, tablet_id -> <node_Id, rows_num>
+ // used to verify whether the rows num received by different replicas is consistent
+ std::map<int64_t, std::vector<std::pair<int64_t, int64_t>>> _tablets_received_rows;
+};
Status IndexChannel::init(RuntimeState* state, const std::vector<TTabletWithPartition>& tablets) {
SCOPED_CONSUME_MEM_TRACKER(_index_channel_tracker.get());
diff --git a/be/src/vec/sink/vtablet_sink.h b/be/src/vec/sink/vtablet_sink.h
index be50285fd8..8b45f251ee 100644
--- a/be/src/vec/sink/vtablet_sink.h
+++ b/be/src/vec/sink/vtablet_sink.h
@@ -382,84 +382,6 @@ protected:
ReusableClosure<PTabletWriterAddBlockResult>* _add_block_closure = nullptr;
};
-class IndexChannel {
-public:
- IndexChannel(VOlapTableSink* parent, int64_t index_id,
- const vectorized::VExprContextSPtr& where_clause)
- : _parent(parent), _index_id(index_id), _where_clause(where_clause) {
- _index_channel_tracker =
- std::make_unique<MemTracker>("IndexChannel:indexID=" + std::to_string(_index_id));
- }
- ~IndexChannel();
-
- Status init(RuntimeState* state, const std::vector<TTabletWithPartition>& tablets);
-
- void for_each_node_channel(
- const std::function<void(const std::shared_ptr<VNodeChannel>&)>& func) {
- for (auto& it : _node_channels) {
- func(it.second);
- }
- }
-
- void mark_as_failed(int64_t node_id, const std::string& host, const std::string& err,
- int64_t tablet_id = -1);
- Status check_intolerable_failure();
-
- // set error tablet info in runtime state, so that it can be returned to FE.
- void set_error_tablet_in_state(RuntimeState* state);
-
- size_t num_node_channels() const { return _node_channels.size(); }
-
- size_t get_pending_bytes() const {
- size_t mem_consumption = 0;
- for (auto& kv : _node_channels) {
- mem_consumption += kv.second->get_pending_bytes();
- }
- return mem_consumption;
- }
-
- void set_tablets_received_rows(
- const std::vector<std::pair<int64_t, int64_t>>& tablets_received_rows, int64_t node_id);
-
- // check whether the rows num written by different replicas is consistent
- Status check_tablet_received_rows_consistency();
-
- vectorized::VExprContextSPtr get_where_clause() { return _where_clause; }
-
-private:
- friend class VNodeChannel;
- friend class VOlapTableSink;
-
- VOlapTableSink* _parent;
- int64_t _index_id;
- vectorized::VExprContextSPtr _where_clause;
-
- // from backend channel to tablet_id
- // ATTN: must be placed before `_node_channels` and `_channels_by_tablet`.
- // Because the destruct order of objects is opposite to the creation order.
- // So NodeChannel will be destructured first.
- // And the destructor function of NodeChannel waits for all RPCs to finish.
- // This ensures that it is safe to use `_tablets_by_channel` in the callback function for the end of the RPC.
- std::unordered_map<int64_t, std::unordered_set<int64_t>> _tablets_by_channel;
- // BeId -> channel
- std::unordered_map<int64_t, std::shared_ptr<VNodeChannel>> _node_channels;
- // from tablet_id to backend channel
- std::unordered_map<int64_t, std::vector<std::shared_ptr<VNodeChannel>>> _channels_by_tablet;
-
- // lock to protect _failed_channels and _failed_channels_msgs
- mutable doris::SpinLock _fail_lock;
- // key is tablet_id, value is a set of failed node id
- std::unordered_map<int64_t, std::unordered_set<int64_t>> _failed_channels;
- // key is tablet_id, value is error message
- std::unordered_map<int64_t, std::string> _failed_channels_msgs;
- Status _intolerable_failure_status = Status::OK();
-
- std::unique_ptr<MemTracker> _index_channel_tracker;
- // rows num received by DeltaWriter per tablet, tablet_id -> <node_Id, rows_num>
- // used to verify whether the rows num received by different replicas is consistent
- std::map<int64_t, std::vector<std::pair<int64_t, int64_t>>> _tablets_received_rows;
-};
-
// Write block data to Olap Table.
// When OlapTableSink::open() called, there will be a consumer thread running in the background.
// When you call VOlapTableSink::send(), you will be the producer who products pending batches.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]