This is an automated email from the ASF dual-hosted git repository.

eldenmoon pushed a commit to branch cs_opt_version-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/cs_opt_version-3.1 by this push:
     new 66a516e629c fix concurrent issue in VariantColumnReader (#58609)
66a516e629c is described below

commit 66a516e629c2a24d40e5a99be6d0f37e17557b82
Author: lihangyu <[email protected]>
AuthorDate: Tue Dec 2 13:53:38 2025 +0800

    fix concurrent issue in VariantColumnReader (#58609)
    
    ### What problem does this PR solve?
    
    Issue Number: close #xxx
    
    Related PR: #xxx
    
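    Problem Summary:
    
    `VariantColumnReader::load_external_meta_once()` populates
    `_subcolumns_meta_info` and `_statistics` while other threads may be
    reading them (e.g. via `get_subcolumn_meta_by_path()` or
    `get_metadata_size()`), which led to a data race and a crash in
    production. This change adds a `std::shared_mutex`
    (`_subcolumns_meta_mutex`): the loader takes it in exclusive mode and
    the readers of these structures take it in shared mode. A unit test
    that runs the loader and the readers concurrently is also added.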
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [ ] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
    - [ ] This is a refactor/code format and no logic has been changed.
            - [ ] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [ ] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [ ] No.
        - [ ] Yes. <!-- Add document PR link here. eg:
        https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR
    should merge into -->
---
 .../segment_v2/variant/variant_column_reader.cpp   |  22 ++++
 .../segment_v2/variant/variant_column_reader.h     |   6 +
 .../variant_column_writer_reader_test.cpp          | 121 +++++++++++++++++++++
 3 files changed, 149 insertions(+)

diff --git a/be/src/olap/rowset/segment_v2/variant/variant_column_reader.cpp 
b/be/src/olap/rowset/segment_v2/variant/variant_column_reader.cpp
index e872b2b9a64..f225a34834f 100644
--- a/be/src/olap/rowset/segment_v2/variant/variant_column_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/variant/variant_column_reader.cpp
@@ -49,6 +49,7 @@ namespace doris::segment_v2 {
 
 const SubcolumnColumnMetaInfo::Node* 
VariantColumnReader::get_subcolumn_meta_by_path(
         const vectorized::PathInData& relative_path) const {
+    std::shared_lock<std::shared_mutex> lock(_subcolumns_meta_mutex);
     const auto* node = _subcolumns_meta_info->find_leaf(relative_path);
     if (node) {
         return node;
@@ -62,6 +63,7 @@ const SubcolumnColumnMetaInfo::Node* 
VariantColumnReader::get_subcolumn_meta_by_
 
 bool VariantColumnReader::exist_in_sparse_column(
         const vectorized::PathInData& relative_path) const {
+    std::shared_lock<std::shared_mutex> lock(_subcolumns_meta_mutex);
     // Check if path exist in sparse column
     bool existed_in_sparse_column =
             !_statistics->sparse_column_non_null_size.empty() &&
@@ -77,6 +79,7 @@ bool VariantColumnReader::exist_in_sparse_column(
 }
 
 bool VariantColumnReader::is_exceeded_sparse_column_limit() const {
+    std::shared_lock<std::shared_mutex> lock(_subcolumns_meta_mutex);
     bool exceeded_sparse_column_limit = 
!_statistics->sparse_column_non_null_size.empty() &&
                                         
_statistics->sparse_column_non_null_size.size() >=
                                                 
_variant_sparse_column_statistics_size;
@@ -94,6 +97,7 @@ bool VariantColumnReader::is_exceeded_sparse_column_limit() 
const {
 }
 
 int64_t VariantColumnReader::get_metadata_size() const {
+    std::shared_lock<std::shared_mutex> lock(_subcolumns_meta_mutex);
     int64_t size = ColumnReader::get_metadata_size();
     if (_statistics) {
         for (const auto& [path, _] : _statistics->subcolumns_non_null_size) {
@@ -121,6 +125,11 @@ Status 
VariantColumnReader::_create_hierarchical_reader(ColumnIteratorUPtr* read
     // TODO(lhy): this will load all external meta if not loaded, and memory 
will be consumed.
     RETURN_IF_ERROR(load_external_meta_once());
 
+    // After external meta is loaded, protect reads from `_statistics` and
+    // `_subcolumns_meta_info` against concurrent writers.
+    // Lock only after load_external_meta_once() returns; it takes the unique lock itself.
+    std::shared_lock<std::shared_mutex> lock(_subcolumns_meta_mutex);
+
     // Node contains column with children columns or has correspoding sparse 
columns
     // Create reader with hirachical data.
     std::unique_ptr<SubstreamIterator> sparse_iter;
@@ -160,6 +169,7 @@ Status 
VariantColumnReader::_create_sparse_merge_reader(ColumnIteratorUPtr* iter
                                                         const TabletColumn& 
target_col,
                                                         ColumnIteratorUPtr 
inner_iter,
                                                         ColumnReaderCache* 
column_reader_cache) {
+    std::shared_lock<std::shared_mutex> lock(_subcolumns_meta_mutex);
     // Get subcolumns path set from tablet schema
     const auto& path_set_info = 
opts->tablet_schema->path_set_info(target_col.parent_unique_id());
 
@@ -197,6 +207,7 @@ Status 
VariantColumnReader::_create_sparse_merge_reader(ColumnIteratorUPtr* iter
 Status VariantColumnReader::_new_default_iter_with_same_nested(
         ColumnIteratorUPtr* iterator, const TabletColumn& tablet_column,
         const StorageReadOptions* opt, ColumnReaderCache* column_reader_cache) 
{
+    std::shared_lock<std::shared_mutex> lock(_subcolumns_meta_mutex);
     auto relative_path = tablet_column.path_info_ptr()->copy_pop_front();
     // We find node that represents the same Nested type as path.
     const auto* parent = _subcolumns_meta_info->find_best_match(relative_path);
@@ -252,6 +263,8 @@ Status 
VariantColumnReader::_build_read_plan_flat_leaves(ReadPlan* plan,
     // make sure external meta is loaded otherwise can't find any meta data 
for extracted columns
     RETURN_IF_ERROR(load_external_meta_once());
 
+    std::shared_lock<std::shared_mutex> lock(_subcolumns_meta_mutex);
+
     DCHECK(opts != nullptr);
     auto relative_path = target_col.path_info_ptr()->copy_pop_front();
     plan->relative_path = relative_path;
@@ -317,6 +330,7 @@ Status 
VariantColumnReader::_build_read_plan_flat_leaves(ReadPlan* plan,
 }
 
 bool VariantColumnReader::has_prefix_path(const vectorized::PathInData& 
relative_path) const {
+    std::shared_lock<std::shared_mutex> lock(_subcolumns_meta_mutex);
     if (relative_path.empty()) {
         return true;
     }
@@ -372,6 +386,7 @@ Status 
VariantColumnReader::new_iterator(ColumnIteratorUPtr* iterator,
 Status VariantColumnReader::_build_read_plan(ReadPlan* plan, const 
TabletColumn& target_col,
                                              const StorageReadOptions* opt,
                                              ColumnReaderCache* 
column_reader_cache) {
+    std::shared_lock<std::shared_mutex> lock(_subcolumns_meta_mutex);
     // root column use unique id, leaf column use parent_unique_id
     int32_t col_uid =
             target_col.unique_id() >= 0 ? target_col.unique_id() : 
target_col.parent_unique_id();
@@ -750,6 +765,10 @@ Status VariantColumnReader::load_external_meta_once() {
     if (!_ext_meta_reader || !_ext_meta_reader->available()) {
         return Status::OK();
     }
+    // Ensure only one writer can populate `_subcolumns_meta_info` / `_statistics`
+    // while readers of these structures hold shared locks.
+    // Callers must not already hold `_subcolumns_meta_mutex`: std::shared_mutex is not recursive.
+    std::unique_lock<std::shared_mutex> lock(_subcolumns_meta_mutex);
     return _ext_meta_reader->load_all_once(_subcolumns_meta_info.get(), 
_statistics.get());
 }
 
@@ -785,6 +804,7 @@ TabletIndexes 
VariantColumnReader::find_subcolumn_tablet_indexes(
 void VariantColumnReader::get_subcolumns_types(
         std::unordered_map<vectorized::PathInData, vectorized::DataTypes,
                            vectorized::PathInData::Hash>* subcolumns_types) 
const {
+    std::shared_lock<std::shared_mutex> lock(_subcolumns_meta_mutex);
     for (const auto& subcolumn_reader : *_subcolumns_meta_info) {
         auto& path_types = (*subcolumns_types)[subcolumn_reader->path];
         path_types.push_back(subcolumn_reader->data.file_column_type);
@@ -792,6 +812,7 @@ void VariantColumnReader::get_subcolumns_types(
 }
 
 void VariantColumnReader::get_typed_paths(std::unordered_set<std::string>* 
typed_paths) const {
+    std::shared_lock<std::shared_mutex> lock(_subcolumns_meta_mutex);
     for (const auto& entry : *_subcolumns_meta_info) {
         if (entry->path.get_is_typed()) {
             typed_paths->insert(entry->path.get_path());
@@ -802,6 +823,7 @@ void 
VariantColumnReader::get_typed_paths(std::unordered_set<std::string>* typed
 void VariantColumnReader::get_nested_paths(
         std::unordered_set<vectorized::PathInData, 
vectorized::PathInData::Hash>* nested_paths)
         const {
+    std::shared_lock<std::shared_mutex> lock(_subcolumns_meta_mutex);
     for (const auto& entry : *_subcolumns_meta_info) {
         if (entry->path.has_nested_part()) {
             nested_paths->insert(entry->path);
diff --git a/be/src/olap/rowset/segment_v2/variant/variant_column_reader.h 
b/be/src/olap/rowset/segment_v2/variant/variant_column_reader.h
index 01dff89763d..b8a98c9ac02 100644
--- a/be/src/olap/rowset/segment_v2/variant/variant_column_reader.h
+++ b/be/src/olap/rowset/segment_v2/variant/variant_column_reader.h
@@ -22,6 +22,7 @@
 
 #include <map>
 #include <memory>
+#include <shared_mutex>
 #include <string>
 #include <unordered_map>
 #include <vector>
@@ -195,6 +196,11 @@ private:
                                        const TabletColumn& target_col,
                                        ColumnIteratorUPtr inner_iter,
                                        ColumnReaderCache* column_reader_cache);
+
+    // Protect `_subcolumns_meta_info` and `_statistics` when loading external meta.
+    // std::shared_mutex is not recursive: a thread must never lock it while already holding it.
+    mutable std::shared_mutex _subcolumns_meta_mutex;
+
     std::unique_ptr<SubcolumnColumnMetaInfo> _subcolumns_meta_info;
     std::shared_ptr<ColumnReader> _sparse_column_reader;
     std::shared_ptr<ColumnReader> _root_column_reader;
diff --git 
a/be/test/olap/rowset/segment_v2/variant_column_writer_reader_test.cpp 
b/be/test/olap/rowset/segment_v2/variant_column_writer_reader_test.cpp
index 463c15e5bea..28669a3a73f 100644
--- a/be/test/olap/rowset/segment_v2/variant_column_writer_reader_test.cpp
+++ b/be/test/olap/rowset/segment_v2/variant_column_writer_reader_test.cpp
@@ -15,6 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <atomic>
+#include <thread>
+
 #include "gtest/gtest.h"
 #include "olap/rowset/segment_v2/column_meta_accessor.h"
 #include "olap/rowset/segment_v2/column_reader.h"
@@ -2568,4 +2571,122 @@ TEST_F(VariantColumnWriterReaderTest, 
test_read_with_checksum) {
     }
 }
 
+// Concurrently trigger external meta loading and subcolumn meta access to guard against
+// data races between the `load_external_meta_once` writer and readers such as
+// `get_subcolumn_meta_by_path` / `get_metadata_size`. This roughly simulates the
+// production crash stack where one thread was loading external meta while another
+// thread was reading from `_subcolumns_meta_info`.
+TEST_F(VariantColumnWriterReaderTest,
+       test_concurrent_load_external_meta_and_get_subcolumn_meta) {
+    // 1. create tablet_schema
+    TabletSchemaPB schema_pb;
+    schema_pb.set_keys_type(KeysType::DUP_KEYS);
+    SchemaUtils::construct_column(schema_pb.add_column(), 1, "VARIANT", "V1");
+    _tablet_schema = std::make_shared<TabletSchema>();
+    _tablet_schema->init_from_pb(schema_pb);
+
+    // 2. create tablet with external segment meta explicitly enabled so that
+    // VariantColumnReader builds a VariantExternalMetaReader.
+    TabletMetaSharedPtr tablet_meta(new TabletMeta(_tablet_schema));
+    bool external_segment_meta_used_default = true;
+    
_tablet_schema->set_external_segment_meta_used_default(external_segment_meta_used_default);
+    tablet_meta->_tablet_id = 20000;
+    _tablet = std::make_shared<Tablet>(*_engine_ref, tablet_meta, 
_data_dir.get());
+
+    EXPECT_TRUE(_tablet->init().ok());
+    
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_tablet->tablet_path()).ok());
+    
EXPECT_TRUE(io::global_local_filesystem()->create_directory(_tablet->tablet_path()).ok());
+
+    // 3. create file_writer
+    io::FileWriterPtr file_writer;
+    auto file_path = local_segment_path(_tablet->tablet_path(), "0", 0);
+    auto st = io::global_local_filesystem()->create_file(file_path, 
&file_writer);
+    EXPECT_TRUE(st.ok()) << st.msg();
+
+    // 4. create column_writer
+    SegmentFooterPB footer;
+    ColumnWriterOptions opts;
+    opts.meta = footer.add_columns();
+    opts.compression_type = CompressionTypePB::LZ4;
+    opts.file_writer = file_writer.get();
+    opts.footer = &footer;
+    RowsetWriterContext rowset_ctx;
+    rowset_ctx.write_type = DataWriteType::TYPE_DIRECT;
+    opts.rowset_ctx = &rowset_ctx;
+    opts.rowset_ctx->tablet_schema = _tablet_schema;
+    TabletColumn column = _tablet_schema->column(0);
+    _init_column_meta(opts.meta, 0, column, CompressionTypePB::LZ4);
+
+    std::unique_ptr<ColumnWriter> writer;
+    EXPECT_TRUE(ColumnWriter::create(opts, &column, file_writer.get(), 
&writer).ok());
+    EXPECT_TRUE(writer->init().ok());
+    EXPECT_TRUE(assert_cast<VariantColumnWriter*>(writer.get()) != nullptr);
+
+    // 5. write 200 rows of generated variant test data so the segment has subcolumns
+    auto olap_data_convertor = 
std::make_unique<vectorized::OlapBlockDataConvertor>();
+    auto block = _tablet_schema->create_block();
+    auto column_object = 
(*std::move(block.get_by_position(0).column)).mutate();
+    std::unordered_map<int, std::string> inserted_jsonstr;
+    auto path_with_size =
+            VariantUtil::fill_object_column_with_test_data(column_object, 200, 
&inserted_jsonstr);
+    olap_data_convertor->add_column_data_convertor(column);
+    olap_data_convertor->set_source_content(&block, 0, 200);
+    auto [result, accessor] = olap_data_convertor->convert_column_data(0);
+    EXPECT_TRUE(result.ok());
+    EXPECT_TRUE(accessor != nullptr);
+    EXPECT_TRUE(writer->append(accessor->get_nullmap(), accessor->get_data(), 
200).ok());
+    st = writer->finish();
+    EXPECT_TRUE(st.ok()) << st.msg();
+    st = writer->write_data();
+    EXPECT_TRUE(st.ok()) << st.msg();
+    st = writer->write_ordinal_index();
+    EXPECT_TRUE(st.ok()) << st.msg();
+    st = writer->write_zone_map();
+    EXPECT_TRUE(st.ok()) << st.msg();
+    EXPECT_TRUE(file_writer->close().ok());
+    footer.set_num_rows(200);
+
+    // 6. open a VariantColumnReader on this segment
+    io::FileReaderSPtr file_reader;
+    st = io::global_local_filesystem()->open_file(file_path, &file_reader);
+    EXPECT_TRUE(st.ok()) << st.msg();
+    std::shared_ptr<ColumnReader> column_reader;
+    st = create_variant_root_reader(footer, file_reader, _tablet_schema, 
&column_reader);
+    EXPECT_TRUE(st.ok()) << st.msg();
+
+    auto* variant_column_reader = 
assert_cast<VariantColumnReader*>(column_reader.get());
+    EXPECT_TRUE(variant_column_reader != nullptr);
+
+    // 7. Race load_external_meta_once (unique lock) against meta readers (shared lock).
+    const int rounds = 200;
+    std::atomic<bool> failed {false};
+    Status writer_status = Status::OK();
+
+    std::thread writer_thread([&] {
+        for (int i = 0; i < rounds && !failed.load(); ++i) {
+            Status s = variant_column_reader->load_external_meta_once();
+            if (!s.ok()) {
+                writer_status = s;
+                failed.store(true);
+                break;
+            }
+        }
+    });
+
+    std::thread reader_thread([&] {
+        for (int i = 0; i < rounds && !failed.load(); ++i) {
+            // Results are discarded; only the loader Status is asserted, races surface as crashes.
+            auto* node = 
variant_column_reader->get_subcolumn_meta_by_path(PathInData("key0"));
+            (void)node;
+            auto meta_size = variant_column_reader->get_metadata_size();
+            (void)meta_size;
+        }
+    });
+
+    writer_thread.join();
+    reader_thread.join();
+
+    EXPECT_TRUE(writer_status.ok());
+}
+
 } // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to