This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new e1a9ec6231 [branch-2.0-pick] pick "[Fix](merge-on-write) Check the
returned filtered rows from different replicas" #24191 (#24400)
e1a9ec6231 is described below
commit e1a9ec6231ce2b930e017072a48b8c1a89ca5f56
Author: bobhan1 <[email protected]>
AuthorDate: Thu Sep 14 21:26:30 2023 +0800
[branch-2.0-pick] pick "[Fix](merge-on-write) Check the returned filtered
rows from different replicas" #24191 (#24400)
---
be/src/runtime/runtime_state.h | 4 +-
be/src/vec/sink/vtablet_sink.cpp | 53 +++++++++++++++++++++-
be/src/vec/sink/vtablet_sink.h | 17 +++++++
.../test_partial_update_strict_mode.groovy | 9 ++--
4 files changed, 75 insertions(+), 8 deletions(-)
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index ac9c2bcdb0..9ef2eb3c7a 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -269,8 +269,8 @@ public:
_num_rows_load_unselected.fetch_add(num_rows);
}
- void update_num_rows_filtered_in_strict_mode_partial_update(int64_t
num_rows) {
- _num_rows_filtered_in_strict_mode_partial_update += num_rows;
+ void set_num_rows_filtered_in_strict_mode_partial_update(int64_t num_rows)
{
+ _num_rows_filtered_in_strict_mode_partial_update = num_rows;
}
void set_per_fragment_instance_idx(int idx) { _per_fragment_instance_idx =
idx; }
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index 08fb10a099..ba720967a2 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -201,6 +201,13 @@ void IndexChannel::set_tablets_received_rows(
}
}
+// Records the filtered-row counts reported by one node.
+// `tablets_filtered_rows` holds <tablet_id, filtered_rows_num> pairs; each pair is
+// appended to that tablet's replica list as <node_id, filtered_rows_num>.
+void IndexChannel::set_tablets_filtered_rows(
+        const std::vector<std::pair<int64_t, int64_t>>& tablets_filtered_rows, int64_t node_id) {
+    for (const auto& reported : tablets_filtered_rows) {
+        // operator[] creates the replica list the first time a tablet is seen
+        auto& replicas = _tablets_filtered_rows[reported.first];
+        replicas.emplace_back(node_id, reported.second);
+    }
+}
+
+
Status IndexChannel::check_tablet_received_rows_consistency() {
for (auto& tablet : _tablets_received_rows) {
for (size_t i = 0; i < tablet.second.size(); i++) {
@@ -225,6 +232,30 @@ Status
IndexChannel::check_tablet_received_rows_consistency() {
return Status::OK();
}
+// Checks that all replicas of each tablet reported the same number of filtered rows.
+// Every replica is compared against the first one recorded for its tablet.
+// Returns Status::OK() when all counts agree, otherwise an InternalError that names
+// the tablet and the two disagreeing replicas.
+Status IndexChannel::check_tablet_filtered_rows_consistency() {
+    for (const auto& [tablet_id, replicas] : _tablets_filtered_rows) {
+        for (size_t i = 0; i < replicas.size(); i++) {
+            const auto& [node_id, rows_num] = replicas[i];
+            VLOG_NOTICE << "check_tablet_filtered_rows_consistency, load_id: " << _parent->_load_id
+                        << ", txn_id: " << std::to_string(_parent->_txn_id)
+                        << ", tablet_id: " << tablet_id
+                        << ", node_id: " << node_id
+                        << ", rows_num: " << rows_num;
+            // the first replica is the reference value, nothing to compare it with
+            if (i == 0) {
+                continue;
+            }
+            const auto& [ref_node_id, ref_rows_num] = replicas[0];
+            if (rows_num != ref_rows_num) {
+                return Status::InternalError(
+                        "rows num filtered by multi replicas doesn't match, load_id={}, txn_id={}, "
+                        "tablet_id={}, node_id={}, rows_num={}, node_id={}, rows_num={}",
+                        print_id(_parent->_load_id), _parent->_txn_id, tablet_id,
+                        node_id, rows_num, ref_node_id, ref_rows_num);
+            }
+        }
+    }
+    return Status::OK();
+}
+
+
VNodeChannel::VNodeChannel(VOlapTableSink* parent, IndexChannel*
index_channel, int64_t node_id)
: _parent(parent), _index_channel(index_channel), _node_id(node_id) {
_node_channel_tracker = std::make_shared<MemTracker>(fmt::format(
@@ -432,8 +463,8 @@ Status VNodeChannel::open_wait() {
tablet.received_rows());
}
if (tablet.has_num_rows_filtered()) {
-
_state->update_num_rows_filtered_in_strict_mode_partial_update(
- tablet.num_rows_filtered());
+ _tablets_filtered_rows.emplace_back(tablet.tablet_id(),
+
tablet.num_rows_filtered());
}
VLOG_CRITICAL << "master replica commit info: tabletId="
<< tablet.tablet_id()
<< ", backendId=" << _node_id
@@ -866,6 +897,7 @@ Status VNodeChannel::close_wait(RuntimeState* state) {
_index_channel->set_error_tablet_in_state(state);
_index_channel->set_tablets_received_rows(_tablets_received_rows,
_node_id);
+ _index_channel->set_tablets_filtered_rows(_tablets_filtered_rows,
_node_id);
return Status::OK();
}
@@ -1398,6 +1430,8 @@ Status
VOlapTableSink::_cancel_channel_and_check_intolerable_failure(
status = index_st;
} else if (Status st = ich->check_tablet_received_rows_consistency();
!st.ok()) {
status = st;
+ } else if (Status st = ich->check_tablet_filtered_rows_consistency();
!st.ok()) {
+ status = st;
}
return status;
}
@@ -1521,6 +1555,21 @@ Status VOlapTableSink::close(RuntimeState* state, Status
exec_status) {
&total_wait_exec_time_ns,
&wait_exec_time,
&total_add_batch_num);
});
+
+ // Due to the non-determinism of compaction, the rowsets of
each replica may be different from each other on different
+ // BE nodes. The number of rows filtered in SegmentWriter
depends on the historical rowsets located in the corresponding
+ // BE node. So we check the number of rows filtered on each
successful BE to ensure the consistency of the current load
+ if (!_write_single_replica && _schema->is_strict_mode() &&
+ _schema->is_partial_update()) {
+ if (Status st =
index_channel->check_tablet_filtered_rows_consistency();
+ !st.ok()) {
+ status = st;
+ } else {
+
state->set_num_rows_filtered_in_strict_mode_partial_update(
+ index_channel->num_rows_filtered());
+ }
+ }
+
num_node_channels += index_channel->num_node_channels();
if (add_batch_exec_time > max_add_batch_exec_time_ns) {
max_add_batch_exec_time_ns = add_batch_exec_time;
diff --git a/be/src/vec/sink/vtablet_sink.h b/be/src/vec/sink/vtablet_sink.h
index 7b851c420e..26d2ba55f3 100644
--- a/be/src/vec/sink/vtablet_sink.h
+++ b/be/src/vec/sink/vtablet_sink.h
@@ -37,6 +37,7 @@
#include <map>
#include <memory>
#include <mutex>
+#include <numeric>
#include <ostream>
#include <queue>
#include <set>
@@ -370,6 +371,8 @@ protected:
RuntimeState* _state;
// rows number received per tablet, tablet_id -> rows_num
std::vector<std::pair<int64_t, int64_t>> _tablets_received_rows;
+ // rows number filtered per tablet, tablet_id -> filtered_rows_num
+ std::vector<std::pair<int64_t, int64_t>> _tablets_filtered_rows;
std::unique_ptr<vectorized::MutableBlock> _cur_mutable_block;
PTabletWriterAddBlockRequest _cur_add_block_request;
@@ -418,9 +421,20 @@ public:
void set_tablets_received_rows(
const std::vector<std::pair<int64_t, int64_t>>&
tablets_received_rows, int64_t node_id);
+ void set_tablets_filtered_rows(
+ const std::vector<std::pair<int64_t, int64_t>>&
tablets_filtered_rows, int64_t node_id);
+    // Returns the total number of filtered rows over all tablets of this index.
+    // Only the first replica's count of each tablet is added; the visible caller
+    // runs check_tablet_filtered_rows_consistency() first, so the replicas agree.
+    // NOTE(review): CHECK aborts the process when no tablet reported a filtered-row
+    // count - confirm that cannot happen for a load that reaches this point.
+    int64_t num_rows_filtered() {
+        CHECK(!_tablets_filtered_rows.empty());
+        // the Unique table has no roll up or materialized view
+        // we just add up filtered rows from all partitions
+        // The seed must be int64_t: std::accumulate keeps the running sum in the type
+        // of the initial value, so a plain `0` would narrow every partial sum to int.
+        return std::accumulate(_tablets_filtered_rows.cbegin(), _tablets_filtered_rows.cend(),
+                               int64_t {0},
+                               [](int64_t sum, const auto& a) { return sum + a.second[0].second; });
+    }
// check whether the rows num written by different replicas is consistent
Status check_tablet_received_rows_consistency();
+ // check whether the rows num filtered by different replicas is consistent
+ Status check_tablet_filtered_rows_consistency();
vectorized::VExprContextSPtr get_where_clause() { return _where_clause; }
@@ -456,6 +470,9 @@ private:
// rows num received by DeltaWriter per tablet, tablet_id -> <node_Id,
rows_num>
// used to verify whether the rows num received by different replicas is
consistent
std::map<int64_t, std::vector<std::pair<int64_t, int64_t>>>
_tablets_received_rows;
+ // rows num filtered by DeltaWriter per tablet, tablet_id -> <node_Id,
filtered_rows_num>
+ // used to verify whether the rows num filtered by different replicas is
consistent
+ std::map<int64_t, std::vector<std::pair<int64_t, int64_t>>>
_tablets_filtered_rows;
};
// Write block data to Olap Table.
diff --git
a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_strict_mode.groovy
b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_strict_mode.groovy
index 256cf018d5..3b7a875f2c 100644
---
a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_strict_mode.groovy
+++
b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_strict_mode.groovy
@@ -30,7 +30,7 @@ suite("test_partial_update_strict_mode", "p0") {
`last_access_time` datetime NULL
) ENGINE = OLAP UNIQUE KEY(`id`)
COMMENT 'OLAP' DISTRIBUTED BY HASH(`id`)
- BUCKETS AUTO PROPERTIES (
+ BUCKETS 4 PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"storage_format" = "V2",
"enable_unique_key_merge_on_write" = "true",
@@ -80,7 +80,7 @@ suite("test_partial_update_strict_mode", "p0") {
`last_access_time` datetime NULL
) ENGINE = OLAP UNIQUE KEY(`id`)
COMMENT 'OLAP' DISTRIBUTED BY HASH(`id`)
- BUCKETS AUTO PROPERTIES (
+ BUCKETS 4 PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"storage_format" = "V2",
"enable_unique_key_merge_on_write" = "true",
@@ -129,7 +129,7 @@ suite("test_partial_update_strict_mode", "p0") {
`last_access_time` datetime NULL
) ENGINE = OLAP UNIQUE KEY(`id`)
COMMENT 'OLAP' DISTRIBUTED BY HASH(`id`)
- BUCKETS AUTO PROPERTIES (
+ BUCKETS 4 PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"storage_format" = "V2",
"enable_unique_key_merge_on_write" = "true",
@@ -179,7 +179,7 @@ suite("test_partial_update_strict_mode", "p0") {
`last_access_time` datetime NULL
) ENGINE = OLAP UNIQUE KEY(`id`)
COMMENT 'OLAP' DISTRIBUTED BY HASH(`id`)
- BUCKETS AUTO PROPERTIES (
+ BUCKETS 4 PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"storage_format" = "V2",
"enable_unique_key_merge_on_write" = "true",
@@ -211,4 +211,5 @@ suite("test_partial_update_strict_mode", "p0") {
sql "sync"
qt_sql """select * from ${tableName4} order by id;"""
+ sql """ DROP TABLE IF EXISTS ${tableName4}; """
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]