This is an automated email from the ASF dual-hosted git repository.
eldenmoon pushed a commit to branch cs_opt_version-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/cs_opt_version-3.1 by this
push:
new 66a516e629c fix concurrent issue in VariantColumnReader (#58609)
66a516e629c is described below
commit 66a516e629c2a24d40e5a99be6d0f37e17557b82
Author: lihangyu <[email protected]>
AuthorDate: Tue Dec 2 13:53:38 2025 +0800
fix concurrent issue in VariantColumnReader (#58609)
### What problem does this PR solve?
Issue Number: close #xxx
Related PR: #xxx
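Problem Summary:
`VariantColumnReader::load_external_meta_once()` populates `_subcolumns_meta_info` and
`_statistics` while other threads may be reading them (e.g. through
`get_subcolumn_meta_by_path()` or `get_metadata_size()`). This data race led to a crash in
production. This change guards both structures with a `std::shared_mutex`:
- every reader takes a shared lock;
- the external-meta loader takes an exclusive lock.
A unit test that runs the loader and the readers concurrently is added.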
### Release note
None
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [x] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [ ] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [ ] No.
- [ ] Yes. <!-- Add document PR link here. eg: https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR should merge into -->
---
.../segment_v2/variant/variant_column_reader.cpp | 22 ++++
.../segment_v2/variant/variant_column_reader.h | 6 +
.../variant_column_writer_reader_test.cpp | 121 +++++++++++++++++++++
3 files changed, 149 insertions(+)
diff --git a/be/src/olap/rowset/segment_v2/variant/variant_column_reader.cpp
b/be/src/olap/rowset/segment_v2/variant/variant_column_reader.cpp
index e872b2b9a64..f225a34834f 100644
--- a/be/src/olap/rowset/segment_v2/variant/variant_column_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/variant/variant_column_reader.cpp
@@ -49,6 +49,7 @@ namespace doris::segment_v2 {
const SubcolumnColumnMetaInfo::Node*
VariantColumnReader::get_subcolumn_meta_by_path(
const vectorized::PathInData& relative_path) const {
+ std::shared_lock<std::shared_mutex> lock(_subcolumns_meta_mutex);
const auto* node = _subcolumns_meta_info->find_leaf(relative_path);
if (node) {
return node;
@@ -62,6 +63,7 @@ const SubcolumnColumnMetaInfo::Node*
VariantColumnReader::get_subcolumn_meta_by_
bool VariantColumnReader::exist_in_sparse_column(
const vectorized::PathInData& relative_path) const {
+ std::shared_lock<std::shared_mutex> lock(_subcolumns_meta_mutex);
// Check if path exist in sparse column
bool existed_in_sparse_column =
!_statistics->sparse_column_non_null_size.empty() &&
@@ -77,6 +79,7 @@ bool VariantColumnReader::exist_in_sparse_column(
}
bool VariantColumnReader::is_exceeded_sparse_column_limit() const {
+ std::shared_lock<std::shared_mutex> lock(_subcolumns_meta_mutex);
bool exceeded_sparse_column_limit =
!_statistics->sparse_column_non_null_size.empty() &&
_statistics->sparse_column_non_null_size.size() >=
_variant_sparse_column_statistics_size;
@@ -94,6 +97,7 @@ bool VariantColumnReader::is_exceeded_sparse_column_limit()
const {
}
int64_t VariantColumnReader::get_metadata_size() const {
+ std::shared_lock<std::shared_mutex> lock(_subcolumns_meta_mutex);
int64_t size = ColumnReader::get_metadata_size();
if (_statistics) {
for (const auto& [path, _] : _statistics->subcolumns_non_null_size) {
@@ -121,6 +125,11 @@ Status
VariantColumnReader::_create_hierarchical_reader(ColumnIteratorUPtr* read
// TODO(lhy): this will load all external meta if not loaded, and memory
will be consumed.
RETURN_IF_ERROR(load_external_meta_once());
+ // After external meta is loaded, protect reads from `_statistics` and
+ // `_subcolumns_meta_info` against concurrent writers.
+ // Take the shared lock only after load_external_meta_once() returns; that call locks the same mutex exclusively.
+ std::shared_lock<std::shared_mutex> lock(_subcolumns_meta_mutex);
+
// Node contains column with children columns or has correspoding sparse
columns
// Create reader with hirachical data.
std::unique_ptr<SubstreamIterator> sparse_iter;
@@ -160,6 +169,7 @@ Status
VariantColumnReader::_create_sparse_merge_reader(ColumnIteratorUPtr* iter
const TabletColumn&
target_col,
ColumnIteratorUPtr
inner_iter,
ColumnReaderCache*
column_reader_cache) {
+ std::shared_lock<std::shared_mutex> lock(_subcolumns_meta_mutex);
// Get subcolumns path set from tablet schema
const auto& path_set_info =
opts->tablet_schema->path_set_info(target_col.parent_unique_id());
@@ -197,6 +207,7 @@ Status
VariantColumnReader::_create_sparse_merge_reader(ColumnIteratorUPtr* iter
Status VariantColumnReader::_new_default_iter_with_same_nested(
ColumnIteratorUPtr* iterator, const TabletColumn& tablet_column,
const StorageReadOptions* opt, ColumnReaderCache* column_reader_cache)
{
+ std::shared_lock<std::shared_mutex> lock(_subcolumns_meta_mutex);
auto relative_path = tablet_column.path_info_ptr()->copy_pop_front();
// We find node that represents the same Nested type as path.
const auto* parent = _subcolumns_meta_info->find_best_match(relative_path);
@@ -252,6 +263,8 @@ Status
VariantColumnReader::_build_read_plan_flat_leaves(ReadPlan* plan,
// make sure external meta is loaded otherwise can't find any meta data
for extracted columns
RETURN_IF_ERROR(load_external_meta_once());
+ std::shared_lock<std::shared_mutex> lock(_subcolumns_meta_mutex);
+
DCHECK(opts != nullptr);
auto relative_path = target_col.path_info_ptr()->copy_pop_front();
plan->relative_path = relative_path;
@@ -317,6 +330,7 @@ Status
VariantColumnReader::_build_read_plan_flat_leaves(ReadPlan* plan,
}
bool VariantColumnReader::has_prefix_path(const vectorized::PathInData&
relative_path) const {
+ std::shared_lock<std::shared_mutex> lock(_subcolumns_meta_mutex);
if (relative_path.empty()) {
return true;
}
@@ -372,6 +386,7 @@ Status
VariantColumnReader::new_iterator(ColumnIteratorUPtr* iterator,
Status VariantColumnReader::_build_read_plan(ReadPlan* plan, const
TabletColumn& target_col,
const StorageReadOptions* opt,
ColumnReaderCache*
column_reader_cache) {
+ std::shared_lock<std::shared_mutex> lock(_subcolumns_meta_mutex);
// root column use unique id, leaf column use parent_unique_id
int32_t col_uid =
target_col.unique_id() >= 0 ? target_col.unique_id() :
target_col.parent_unique_id();
@@ -750,6 +765,10 @@ Status VariantColumnReader::load_external_meta_once() {
if (!_ext_meta_reader || !_ext_meta_reader->available()) {
return Status::OK();
}
+ // Ensure only one writer can populate `_subcolumns_meta_info` /
`_statistics`
+ // while readers of these structures hold shared locks.
+ // Readers blocked on their shared locks resume only after this call returns.
+ std::unique_lock<std::shared_mutex> lock(_subcolumns_meta_mutex);
return _ext_meta_reader->load_all_once(_subcolumns_meta_info.get(),
_statistics.get());
}
@@ -785,6 +804,7 @@ TabletIndexes
VariantColumnReader::find_subcolumn_tablet_indexes(
void VariantColumnReader::get_subcolumns_types(
std::unordered_map<vectorized::PathInData, vectorized::DataTypes,
vectorized::PathInData::Hash>* subcolumns_types)
const {
+ std::shared_lock<std::shared_mutex> lock(_subcolumns_meta_mutex);
for (const auto& subcolumn_reader : *_subcolumns_meta_info) {
auto& path_types = (*subcolumns_types)[subcolumn_reader->path];
path_types.push_back(subcolumn_reader->data.file_column_type);
@@ -792,6 +812,7 @@ void VariantColumnReader::get_subcolumns_types(
}
void VariantColumnReader::get_typed_paths(std::unordered_set<std::string>*
typed_paths) const {
+ std::shared_lock<std::shared_mutex> lock(_subcolumns_meta_mutex);
for (const auto& entry : *_subcolumns_meta_info) {
if (entry->path.get_is_typed()) {
typed_paths->insert(entry->path.get_path());
@@ -802,6 +823,7 @@ void
VariantColumnReader::get_typed_paths(std::unordered_set<std::string>* typed
void VariantColumnReader::get_nested_paths(
std::unordered_set<vectorized::PathInData,
vectorized::PathInData::Hash>* nested_paths)
const {
+ std::shared_lock<std::shared_mutex> lock(_subcolumns_meta_mutex);
for (const auto& entry : *_subcolumns_meta_info) {
if (entry->path.has_nested_part()) {
nested_paths->insert(entry->path);
diff --git a/be/src/olap/rowset/segment_v2/variant/variant_column_reader.h
b/be/src/olap/rowset/segment_v2/variant/variant_column_reader.h
index 01dff89763d..b8a98c9ac02 100644
--- a/be/src/olap/rowset/segment_v2/variant/variant_column_reader.h
+++ b/be/src/olap/rowset/segment_v2/variant/variant_column_reader.h
@@ -22,6 +22,7 @@
#include <map>
#include <memory>
+#include <shared_mutex>
#include <string>
#include <unordered_map>
#include <vector>
@@ -195,6 +196,11 @@ private:
const TabletColumn& target_col,
ColumnIteratorUPtr inner_iter,
ColumnReaderCache* column_reader_cache);
+
+ // Protect `_subcolumns_meta_info` and `_statistics` when loading external
meta.
+ // Readers take it shared; load_external_meta_once() takes it exclusively.
+ mutable std::shared_mutex _subcolumns_meta_mutex;
+
std::unique_ptr<SubcolumnColumnMetaInfo> _subcolumns_meta_info;
std::shared_ptr<ColumnReader> _sparse_column_reader;
std::shared_ptr<ColumnReader> _root_column_reader;
diff --git
a/be/test/olap/rowset/segment_v2/variant_column_writer_reader_test.cpp
b/be/test/olap/rowset/segment_v2/variant_column_writer_reader_test.cpp
index 463c15e5bea..28669a3a73f 100644
--- a/be/test/olap/rowset/segment_v2/variant_column_writer_reader_test.cpp
+++ b/be/test/olap/rowset/segment_v2/variant_column_writer_reader_test.cpp
@@ -15,6 +15,9 @@
// specific language governing permissions and limitations
// under the License.
+#include <atomic>
+#include <thread>
+
#include "gtest/gtest.h"
#include "olap/rowset/segment_v2/column_meta_accessor.h"
#include "olap/rowset/segment_v2/column_reader.h"
@@ -2568,4 +2571,122 @@ TEST_F(VariantColumnWriterReaderTest,
test_read_with_checksum) {
}
}
+// Concurrently trigger external meta loading and subcolumn meta access to
guard against
+// data races between `load_external_meta_once` writer and readers like
+// `get_subcolumn_meta_by_path` / `get_metadata_size`. This roughly simulates
the
+// production crash stack where one thread was loading external meta while
another
+// thread was reading from `_subcolumns_meta_info`.
+TEST_F(VariantColumnWriterReaderTest,
+ test_concurrent_load_external_meta_and_get_subcolumn_meta) {
+ // 1. create tablet_schema
+ TabletSchemaPB schema_pb;
+ schema_pb.set_keys_type(KeysType::DUP_KEYS);
+ SchemaUtils::construct_column(schema_pb.add_column(), 1, "VARIANT", "V1");
+ _tablet_schema = std::make_shared<TabletSchema>();
+ _tablet_schema->init_from_pb(schema_pb);
+
+ // 2. create tablet with external segment meta explicitly enabled so that
+ // VariantColumnReader builds a VariantExternalMetaReader.
+ TabletMetaSharedPtr tablet_meta(new TabletMeta(_tablet_schema));
+ bool external_segment_meta_used_default = true;
+
_tablet_schema->set_external_segment_meta_used_default(external_segment_meta_used_default);
+ tablet_meta->_tablet_id = 20000;
+ _tablet = std::make_shared<Tablet>(*_engine_ref, tablet_meta,
_data_dir.get());
+
+ EXPECT_TRUE(_tablet->init().ok());
+
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_tablet->tablet_path()).ok());
+
EXPECT_TRUE(io::global_local_filesystem()->create_directory(_tablet->tablet_path()).ok());
+
+ // 3. create file_writer
+ io::FileWriterPtr file_writer;
+ auto file_path = local_segment_path(_tablet->tablet_path(), "0", 0);
+ auto st = io::global_local_filesystem()->create_file(file_path,
&file_writer);
+ EXPECT_TRUE(st.ok()) << st.msg();
+
+ // 4. create column_writer
+ SegmentFooterPB footer;
+ ColumnWriterOptions opts;
+ opts.meta = footer.add_columns();
+ opts.compression_type = CompressionTypePB::LZ4;
+ opts.file_writer = file_writer.get();
+ opts.footer = &footer;
+ RowsetWriterContext rowset_ctx;
+ rowset_ctx.write_type = DataWriteType::TYPE_DIRECT;
+ opts.rowset_ctx = &rowset_ctx;
+ opts.rowset_ctx->tablet_schema = _tablet_schema;
+ TabletColumn column = _tablet_schema->column(0);
+ _init_column_meta(opts.meta, 0, column, CompressionTypePB::LZ4);
+
+ std::unique_ptr<ColumnWriter> writer;
+ EXPECT_TRUE(ColumnWriter::create(opts, &column, file_writer.get(),
&writer).ok());
+ EXPECT_TRUE(writer->init().ok());
+ EXPECT_TRUE(assert_cast<VariantColumnWriter*>(writer.get()) != nullptr);
+
+ // 5. write a small amount of data to build some subcolumns
+ auto olap_data_convertor =
std::make_unique<vectorized::OlapBlockDataConvertor>();
+ auto block = _tablet_schema->create_block();
+ auto column_object =
(*std::move(block.get_by_position(0).column)).mutate();
+ std::unordered_map<int, std::string> inserted_jsonstr;
+ auto path_with_size =
+ VariantUtil::fill_object_column_with_test_data(column_object, 200,
&inserted_jsonstr);
+ olap_data_convertor->add_column_data_convertor(column);
+ olap_data_convertor->set_source_content(&block, 0, 200);
+ auto [result, accessor] = olap_data_convertor->convert_column_data(0);
+ EXPECT_TRUE(result.ok());
+ EXPECT_TRUE(accessor != nullptr);
+ EXPECT_TRUE(writer->append(accessor->get_nullmap(), accessor->get_data(),
200).ok());
+ st = writer->finish();
+ EXPECT_TRUE(st.ok()) << st.msg();
+ st = writer->write_data();
+ EXPECT_TRUE(st.ok()) << st.msg();
+ st = writer->write_ordinal_index();
+ EXPECT_TRUE(st.ok()) << st.msg();
+ st = writer->write_zone_map();
+ EXPECT_TRUE(st.ok()) << st.msg();
+ EXPECT_TRUE(file_writer->close().ok());
+ footer.set_num_rows(200);
+
+ // 6. open a VariantColumnReader on this segment
+ io::FileReaderSPtr file_reader;
+ st = io::global_local_filesystem()->open_file(file_path, &file_reader);
+ EXPECT_TRUE(st.ok()) << st.msg();
+ std::shared_ptr<ColumnReader> column_reader;
+ st = create_variant_root_reader(footer, file_reader, _tablet_schema,
&column_reader);
+ EXPECT_TRUE(st.ok()) << st.msg();
+
+ auto* variant_column_reader =
assert_cast<VariantColumnReader*>(column_reader.get());
+ EXPECT_TRUE(variant_column_reader != nullptr);
+
+ // 7. run load_external_meta_once and subcolumn meta access concurrently.
+ const int rounds = 200;
+ std::atomic<bool> failed {false};
+ Status writer_status = Status::OK();
+
+ std::thread writer_thread([&] {
+ for (int i = 0; i < rounds && !failed.load(); ++i) {
+ Status s = variant_column_reader->load_external_meta_once();
+ if (!s.ok()) {
+ writer_status = s;
+ failed.store(true);
+ break;
+ }
+ }
+ });
+
+ std::thread reader_thread([&] {
+ for (int i = 0; i < rounds && !failed.load(); ++i) {
+ // Access subcolumn meta and metadata size repeatedly.
+ auto* node =
variant_column_reader->get_subcolumn_meta_by_path(PathInData("key0"));
+ (void)node;
+ auto meta_size = variant_column_reader->get_metadata_size();
+ (void)meta_size;
+ }
+ });
+
+ writer_thread.join();
+ reader_thread.join();
+
+ EXPECT_TRUE(writer_status.ok());
+}
+
} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]