This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 25b1bc76c0a [fix](scan) Fix incorrect query results due to data race
of compaction and parallel scanners building (#40552)
25b1bc76c0a is described below
commit 25b1bc76c0adfee4442083902b8008b703261210
Author: Siyang Tang <[email protected]>
AuthorDate: Wed Sep 11 21:59:31 2024 +0800
[fix](scan) Fix incorrect query results due to data race of compaction and
parallel scanners building (#40552)
## Proposed changes
Capture rowset splits and delete predicates atomically in
`ParallelScannerBuilder::_load` as a single read source.
This prevents reading stale rowsets after (base) compaction has already
eliminated the delete predicates that apply to them.
---
be/src/olap/parallel_scanner_builder.cpp | 66 +++++++++++++++-----------------
be/src/olap/parallel_scanner_builder.h | 6 ++-
2 files changed, 35 insertions(+), 37 deletions(-)
diff --git a/be/src/olap/parallel_scanner_builder.cpp
b/be/src/olap/parallel_scanner_builder.cpp
index 10bd61cd8d5..33e2762aa44 100644
--- a/be/src/olap/parallel_scanner_builder.cpp
+++ b/be/src/olap/parallel_scanner_builder.cpp
@@ -17,9 +17,12 @@
#include "parallel_scanner_builder.h"
+#include <cstddef>
+
#include "cloud/cloud_storage_engine.h"
#include "cloud/cloud_tablet_hotspot.h"
#include "cloud/config.h"
+#include "common/status.h"
#include "olap/rowset/beta_rowset.h"
#include "pipeline/exec/olap_scan_operator.h"
#include "vec/exec/scan/new_olap_scanner.h"
@@ -42,35 +45,28 @@ Status
ParallelScannerBuilder::_build_scanners_by_rowid(std::list<VScannerSPtr>&
DCHECK_GE(_rows_per_scanner, _min_rows_per_scanner);
for (auto&& [tablet, version] : _tablets) {
- DCHECK(_all_rowsets.contains(tablet->tablet_id()));
- auto& rowsets = _all_rowsets[tablet->tablet_id()];
-
- TabletReader::ReadSource reade_source_with_delete_info;
+ DCHECK(_all_read_sources.contains(tablet->tablet_id()));
+ auto& entire_read_source = _all_read_sources[tablet->tablet_id()];
if (config::is_cloud_mode()) {
// FIXME(plat1ko): Avoid pointer cast
ExecEnv::GetInstance()->storage_engine().to_cloud().tablet_hotspot().count(*tablet);
}
- if (!_state->skip_delete_predicate()) {
- RETURN_IF_ERROR(tablet->capture_rs_readers(
- {0, version}, &reade_source_with_delete_info.rs_splits,
false));
- reade_source_with_delete_info.fill_delete_predicates();
- }
-
- TabletReader::ReadSource read_source;
-
+ // `rs_splits` in `entire read source` will be devided into several
partitial read sources
+ // to build several parallel scanners, based on segment rows number.
All the partitial read sources
+ // share the same delete predicates from their corresponding entire
read source.
+ TabletReader::ReadSource partitial_read_source;
int64_t rows_collected = 0;
- for (auto& rowset : rowsets) {
- auto beta_rowset = std::dynamic_pointer_cast<BetaRowset>(rowset);
- RowsetReaderSharedPtr reader;
- RETURN_IF_ERROR(beta_rowset->create_reader(&reader));
- const auto rowset_id = beta_rowset->rowset_id();
+ for (auto& rs_split : entire_read_source.rs_splits) {
+ auto reader = rs_split.rs_reader;
+ auto rowset = reader->rowset();
+ const auto rowset_id = rowset->rowset_id();
DCHECK(_segment_cache_handles.contains(rowset_id));
auto& segment_cache_handle = _segment_cache_handles[rowset_id];
- if (beta_rowset->num_rows() == 0) {
+ if (rowset->num_rows() == 0) {
continue;
}
@@ -110,14 +106,14 @@ Status
ParallelScannerBuilder::_build_scanners_by_rowid(std::list<VScannerSPtr>&
DCHECK_EQ(split.segment_offsets.second -
split.segment_offsets.first,
split.segment_row_ranges.size());
- read_source.rs_splits.emplace_back(std::move(split));
+
partitial_read_source.rs_splits.emplace_back(std::move(split));
scanners.emplace_back(
_build_scanner(tablet, version, _key_ranges,
-
{std::move(read_source.rs_splits),
-
reade_source_with_delete_info.delete_predicates}));
+
{std::move(partitial_read_source.rs_splits),
+
entire_read_source.delete_predicates}));
- read_source = TabletReader::ReadSource();
+ partitial_read_source = {};
split = RowSetSplits(reader->clone());
row_ranges = RowRanges();
@@ -141,25 +137,24 @@ Status
ParallelScannerBuilder::_build_scanners_by_rowid(std::list<VScannerSPtr>&
DCHECK_GT(split.segment_offsets.second,
split.segment_offsets.first);
DCHECK_EQ(split.segment_row_ranges.size(),
split.segment_offsets.second -
split.segment_offsets.first);
- read_source.rs_splits.emplace_back(std::move(split));
+ partitial_read_source.rs_splits.emplace_back(std::move(split));
}
} // end `for (auto& rowset : rowsets)`
DCHECK_LE(rows_collected, _rows_per_scanner);
if (rows_collected > 0) {
- DCHECK_GT(read_source.rs_splits.size(), 0);
+ DCHECK_GT(partitial_read_source.rs_splits.size(), 0);
#ifndef NDEBUG
- for (auto& split : read_source.rs_splits) {
+ for (auto& split : partitial_read_source.rs_splits) {
DCHECK(split.rs_reader != nullptr);
DCHECK_LT(split.segment_offsets.first,
split.segment_offsets.second);
DCHECK_EQ(split.segment_row_ranges.size(),
split.segment_offsets.second -
split.segment_offsets.first);
}
#endif
- scanners.emplace_back(
- _build_scanner(tablet, version, _key_ranges,
- {std::move(read_source.rs_splits),
-
reade_source_with_delete_info.delete_predicates}));
+ scanners.emplace_back(_build_scanner(tablet, version, _key_ranges,
+
{std::move(partitial_read_source.rs_splits),
+
entire_read_source.delete_predicates}));
}
}
@@ -173,16 +168,17 @@ Status ParallelScannerBuilder::_load() {
_total_rows = 0;
for (auto&& [tablet, version] : _tablets) {
const auto tablet_id = tablet->tablet_id();
- auto& rowsets = _all_rowsets[tablet_id];
- {
- std::shared_lock read_lock(tablet->get_header_lock());
- RETURN_IF_ERROR(tablet->capture_consistent_rowsets_unlocked({0,
version}, &rowsets));
+ auto& read_source = _all_read_sources[tablet_id];
+ RETURN_IF_ERROR(tablet->capture_rs_readers({0, version},
&read_source.rs_splits, false));
+ if (!_state->skip_delete_predicate()) {
+ read_source.fill_delete_predicates();
}
-
bool enable_segment_cache =
_state->query_options().__isset.enable_segment_cache
?
_state->query_options().enable_segment_cache
: true;
- for (auto& rowset : rowsets) {
+
+ for (auto& rs_split : read_source.rs_splits) {
+ auto rowset = rs_split.rs_reader->rowset();
RETURN_IF_ERROR(rowset->load());
const auto rowset_id = rowset->rowset_id();
auto& segment_cache_handle = _segment_cache_handles[rowset_id];
diff --git a/be/src/olap/parallel_scanner_builder.h
b/be/src/olap/parallel_scanner_builder.h
index eb25e183df2..934d769ed59 100644
--- a/be/src/olap/parallel_scanner_builder.h
+++ b/be/src/olap/parallel_scanner_builder.h
@@ -19,8 +19,10 @@
#include <memory>
#include <string>
+#include <unordered_map>
#include <utility>
+#include "olap/rowset/rowset_fwd.h"
#include "olap/rowset/segment_v2/row_ranges.h"
#include "olap/segment_loader.h"
#include "olap/tablet.h"
@@ -90,7 +92,7 @@ private:
bool _is_preaggregation;
std::vector<TabletWithVersion> _tablets;
std::vector<OlapScanRange*> _key_ranges;
- std::unordered_map<int64_t, std::vector<RowsetSharedPtr>> _all_rowsets;
+ std::unordered_map<int64_t, TabletReader::ReadSource> _all_read_sources;
};
-} // namespace doris
\ No newline at end of file
+} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]