This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e37a86a361 [fix](variant) Avoid mutating shared variant columns (#64092)
6e37a86a361 is described below

commit 6e37a86a36135690922638d82c054d75520fdaaa
Author: lihangyu <[email protected]>
AuthorDate: Fri Jun 5 18:34:10 2026 +0800

    [fix](variant) Avoid mutating shared variant columns (#64092)
    
    ### What problem does this PR solve?
    
    Issue Number: None
    
    Related PR: None
    
    Problem Summary: Queries that evaluate VARIANT expressions after a local
    exchange may share input blocks across downstream pipeline tasks.
    Variant casts and Variant serialization finalized their source columns
    in place, so one consumer could mutate a shared input column while
    another consumer still expected the original column shape and row count.
    This made local-shuffle query results unstable and could cause later
    operators to observe changed Variant column contents or sizes. This
    change confines the fix to Variant handling: the cast and serialization
    paths now finalize a private copy of the Variant column instead of
    mutating the source column.
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test:
        - PATH=/tmp/doris-clang-format-bin:$PATH build-support/clang-format.sh
        - git diff --check HEAD^
        - ./build.sh --be
        - ./run-be-ut.sh --run --filter='ColumnVariantTest.serialize_does_not_finalize_source_column:ColumnVariantTest.block_serialize_does_not_finalize_source_column:FunctionVariantCast.CastFromVariantDoesNotFinalizeSourceColumn:FunctionVariantCast.CastFromVariant'
    - Manual test: with the column_nullable.cpp and column_nullable_test.cpp
      changes reverted from this PR, the first local-shuffle reproduction on a
      local 1 FE + 4 BE cluster passed 16x100 concurrent executions on BE
      version doris-0.0.0-5078f25a971fc.
    - Manual test: with the column_nullable.cpp and column_nullable_test.cpp
      changes reverted from this PR, the second local-shuffle reproduction on a
      local 1 FE + 4 BE cluster matched the local-off baseline for 100
      iterations on BE version doris-0.0.0-5078f25a971fc.
    - Behavior changed: No
    - Does this need documentation: No
---
 be/src/core/data_type/data_type_variant.cpp        | 42 +++++++------
 be/src/exprs/function/cast/cast_to_variant.h       | 68 +++++++++++++---------
 be/test/core/column/column_variant_test.cpp        | 57 ++++++++++++++++++
 .../function/cast/function_variant_cast_test.cpp   | 57 ++++++++++++++++++
 4 files changed, 177 insertions(+), 47 deletions(-)

diff --git a/be/src/core/data_type/data_type_variant.cpp 
b/be/src/core/data_type/data_type_variant.cpp
index 133226def49..fff6ff53408 100644
--- a/be/src/core/data_type/data_type_variant.cpp
+++ b/be/src/core/data_type/data_type_variant.cpp
@@ -61,14 +61,16 @@ bool DataTypeVariant::equals(const IDataType& rhs) const {
 
 int64_t DataTypeVariant::get_uncompressed_serialized_bytes(const IColumn& 
column,
                                                            int 
be_exec_version) const {
-    const auto& column_variant = assert_cast<const ColumnVariant&>(column);
-    if (!column_variant.is_finalized()) {
-        // Icolumn originates from MutablePtr or block, and therefore can be 
modified.
-        // todo: We should reconsider the logic here, why are we using 
finalize() in this context?
-        const_cast<ColumnVariant&>(column_variant).finalize();
+    const auto* column_variant = assert_cast<const ColumnVariant*>(&column);
+    MutableColumnPtr finalized_column;
+    if (!column_variant->is_finalized()) {
+        // Local exchange can share the same block across downstream tasks. 
Serialize a private
+        // finalized copy so serialization never mutates shared variant 
columns.
+        finalized_column = column_variant->clone_finalized();
+        column_variant = assert_cast<const 
ColumnVariant*>(finalized_column.get());
     }
 
-    const auto& subcolumns = column_variant.get_subcolumns();
+    const auto& subcolumns = column_variant->get_subcolumns();
     size_t size = 0;
 
     size += sizeof(uint32_t);
@@ -95,26 +97,28 @@ int64_t 
DataTypeVariant::get_uncompressed_serialized_bytes(const IColumn& column
     // sparse column
     // TODO make compability with sparse column
     size += 
ColumnVariant::get_binary_column_type()->get_uncompressed_serialized_bytes(
-            *column_variant.get_sparse_column(), be_exec_version);
+            *column_variant->get_sparse_column(), be_exec_version);
 
     size += 
ColumnVariant::get_binary_column_type()->get_uncompressed_serialized_bytes(
-            *column_variant.get_doc_value_column(), be_exec_version);
+            *column_variant->get_doc_value_column(), be_exec_version);
     return size;
 }
 
 char* DataTypeVariant::serialize(const IColumn& column, char* buf, int 
be_exec_version) const {
-    const auto& column_variant = assert_cast<const ColumnVariant&>(column);
-    if (!column_variant.is_finalized()) {
-        // Icolumn originates from block, and therefore can be modified.
-        // todo: We should reconsider the logic here, why are we using 
finalize() in this context?
-        const_cast<ColumnVariant&>(column_variant).finalize();
+    const auto* column_variant = assert_cast<const ColumnVariant*>(&column);
+    MutableColumnPtr finalized_column;
+    if (!column_variant->is_finalized()) {
+        // Local exchange can share the same block across downstream tasks. 
Serialize a private
+        // finalized copy so serialization never mutates shared variant 
columns.
+        finalized_column = column_variant->clone_finalized();
+        column_variant = assert_cast<const 
ColumnVariant*>(finalized_column.get());
     }
 #ifndef NDEBUG
     // DCHECK size
-    column_variant.check_consistency();
+    column_variant->check_consistency();
 #endif
 
-    const auto& subcolumns = column_variant.get_subcolumns();
+    const auto& subcolumns = column_variant->get_subcolumns();
 
     char* size_pos = buf;
     buf += sizeof(uint32_t);
@@ -147,15 +151,15 @@ char* DataTypeVariant::serialize(const IColumn& column, 
char* buf, int be_exec_v
     // Safe case
     unaligned_store<uint32_t>(size_pos, static_cast<UInt32>(num_of_columns));
     // serialize num of rows, only take effect when subcolumns empty
-    unaligned_store<uint32_t>(buf, static_cast<UInt32>(column_variant.rows()));
+    unaligned_store<uint32_t>(buf, 
static_cast<UInt32>(column_variant->rows()));
     buf += sizeof(uint32_t);
 
     // serialize sparse column
     // TODO make compability with sparse column
-    buf = 
ColumnVariant::get_binary_column_type()->serialize(*column_variant.get_sparse_column(),
-                                                             buf, 
be_exec_version);
-    buf = 
ColumnVariant::get_binary_column_type()->serialize(*column_variant.get_doc_value_column(),
+    buf = 
ColumnVariant::get_binary_column_type()->serialize(*column_variant->get_sparse_column(),
                                                              buf, 
be_exec_version);
+    buf = ColumnVariant::get_binary_column_type()->serialize(
+            *column_variant->get_doc_value_column(), buf, be_exec_version);
     return buf;
 }
 
diff --git a/be/src/exprs/function/cast/cast_to_variant.h 
b/be/src/exprs/function/cast/cast_to_variant.h
index 6c6ed1743fc..0efc29047b1 100644
--- a/be/src/exprs/function/cast/cast_to_variant.h
+++ b/be/src/exprs/function/cast/cast_to_variant.h
@@ -32,45 +32,52 @@ inline Status cast_from_variant_impl(FunctionContext* 
context, Block& block,
     auto& col_with_type_and_name = block.get_by_position(arguments[0]);
     auto& col_from = col_with_type_and_name.column;
     const IColumn* variant_column = col_from.get();
-    if (const auto* nullable = 
check_and_get_column<ColumnNullable>(*variant_column)) {
+    const auto* nullable = 
check_and_get_column<ColumnNullable>(*variant_column);
+    if (nullable != nullptr) {
         variant_column = &nullable->get_nested_column();
     }
+    const auto* variant = assert_cast<const ColumnVariant*>(variant_column);
+    ColumnPtr col_to = data_type_to->create_column();
 
-    if (!assert_cast<const ColumnVariant&>(*variant_column).is_finalized()) {
-        // ColumnVariant should be finalized before parsing, finalize maybe 
modify original column structure
-        auto mutable_column = 
IColumn::mutate(std::move(col_with_type_and_name.column));
-        if (auto* nullable = 
check_and_get_column<ColumnNullable>(*mutable_column)) {
-            const auto& const_nullable = *nullable;
-            auto nested_column = 
IColumn::mutate(const_nullable.get_nested_column_ptr());
-            assert_cast<ColumnVariant&>(*nested_column).finalize();
-            ColumnPtr nested_column_ptr = std::move(nested_column);
-            nullable->change_nested_column(nested_column_ptr);
+    ColumnPtr finalized_input_column;
+    if (!variant->is_finalized()) {
+        // Local exchange can share the same input block across multiple 
downstream tasks.
+        // Finalize a private copy so variant casts never mutate shared input 
columns.
+        auto finalized_variant = variant->clone_finalized();
+        variant = assert_cast<const ColumnVariant*>(finalized_variant.get());
+        if (nullable != nullptr) {
+            auto cloned_null_map =
+                    
nullable->get_null_map_column_ptr()->clone_resized(input_rows_count);
+            finalized_input_column = 
ColumnNullable::create(std::move(finalized_variant),
+                                                            
std::move(cloned_null_map));
         } else {
-            assert_cast<ColumnVariant&>(*mutable_column).finalize();
+            finalized_input_column = std::move(finalized_variant);
         }
-        col_with_type_and_name.column = std::move(mutable_column);
-    }
-
-    variant_column = col_with_type_and_name.column.get();
-    if (const auto* nullable = 
check_and_get_column<ColumnNullable>(*variant_column)) {
-        variant_column = &nullable->get_nested_column();
     }
-    const auto& variant = assert_cast<const ColumnVariant&>(*variant_column);
-    ColumnPtr col_to = data_type_to->create_column();
+    auto execute_on_finalized_input = [&](auto&& executor) -> Status {
+        if (!finalized_input_column) {
+            return executor(block);
+        }
+        Block finalized_block = block;
+        finalized_block.replace_by_position(arguments[0], 
finalized_input_column);
+        RETURN_IF_ERROR(executor(finalized_block));
+        block.replace_by_position(result, 
finalized_block.get_by_position(result).column);
+        return Status::OK();
+    };
 
     // It's important to convert as many elements as possible in this context. 
For instance,
     // if the root of this variant column is a number column, converting it to 
a number column
     // is acceptable. However, if the destination type is a string and root is 
none scalar root, then
     // we should convert the entire tree to a string.
-    bool is_root_valuable = variant.is_scalar_variant() ||
-                            (!variant.is_null_root() &&
-                             variant.get_root_type()->get_primitive_type() != 
INVALID_TYPE &&
+    bool is_root_valuable = variant->is_scalar_variant() ||
+                            (!variant->is_null_root() &&
+                             variant->get_root_type()->get_primitive_type() != 
INVALID_TYPE &&
                              
!is_string_type(data_type_to->get_primitive_type()) &&
                              data_type_to->get_primitive_type() != TYPE_JSONB);
 
     if (is_root_valuable) {
-        ColumnPtr nested = variant.get_root();
-        auto nested_from_type = variant.get_root_type();
+        ColumnPtr nested = variant->get_root();
+        auto nested_from_type = variant->get_root_type();
         // DCHECK(nested_from_type->is_nullable());
         DCHECK(!data_type_to->is_nullable());
         auto new_context = context == nullptr ? nullptr : context->clone();
@@ -105,16 +112,21 @@ inline Status cast_from_variant_impl(FunctionContext* 
context, Block& block,
                                       {0, 1}, input_rows_count);
         }
     } else {
-        if (variant.only_have_default_values()) {
+        if (variant->only_have_default_values()) {
             col_to->assert_mutable()->insert_many_defaults(input_rows_count);
             col_to = make_nullable(col_to, true);
         } else if (is_string_type(data_type_to->get_primitive_type())) {
             // serialize to string
-            return CastToStringFunction::execute_impl(context, block, 
arguments, result,
-                                                      input_rows_count);
+            return execute_on_finalized_input([&](Block& finalized_block) {
+                return CastToStringFunction::execute_impl(context, 
finalized_block, arguments,
+                                                          result, 
input_rows_count);
+            });
         } else if (data_type_to->get_primitive_type() == TYPE_JSONB) {
             // serialize to json by parsing
-            return cast_from_generic_to_jsonb(context, block, arguments, 
result, input_rows_count);
+            return execute_on_finalized_input([&](Block& finalized_block) {
+                return cast_from_generic_to_jsonb(context, finalized_block, 
arguments, result,
+                                                  input_rows_count);
+            });
         } else if (!data_type_to->is_nullable() &&
                    !is_string_type(data_type_to->get_primitive_type())) {
             // other types
diff --git a/be/test/core/column/column_variant_test.cpp 
b/be/test/core/column/column_variant_test.cpp
index dff9e2c0ae5..4db42b8a3ef 100644
--- a/be/test/core/column/column_variant_test.cpp
+++ b/be/test/core/column/column_variant_test.cpp
@@ -27,8 +27,11 @@
 #include <algorithm>
 #include <cstddef>
 #include <cstdint>
+#include <memory>
 
+#include "agent/be_exec_version_manager.h"
 #include "common/cast_set.h"
+#include "core/block/block.h"
 #include "core/column/column_variant.cpp"
 #include "core/column/common_column_test.h"
 #include "core/column/subcolumn_tree.h"
@@ -40,9 +43,11 @@
 #include "core/types.h"
 #include "core/value/jsonb_value.h"
 #include "exec/common/variant_util.h"
+#include "gen_cpp/data.pb.h"
 #include "storage/olap_common.h"
 #include "testutil/test_util.h"
 #include "testutil/variant_util.h"
+#include "util/block_compression.h"
 
 using namespace doris;
 namespace doris {
@@ -2025,6 +2030,58 @@ TEST_F(ColumnVariantTest, clone_finalized) {
     test_func(std::move(cloned_object));
 }
 
+TEST_F(ColumnVariantTest, clone_finalized_deep_copies_columns) {
+    auto source_column = VariantUtil::construct_advanced_varint_column();
+    source_column->finalize(ColumnVariant::FinalizeMode::READ_MODE);
+
+    auto cloned = source_column->clone_finalized();
+    auto* cloned_variant = assert_cast<ColumnVariant*>(cloned.get());
+    EXPECT_TRUE(cloned_variant->is_finalized());
+
+    for (const auto& source_subcolumn : source_column->get_subcolumns()) {
+        const auto* cloned_subcolumn =
+                
cloned_variant->get_subcolumns().find_exact(source_subcolumn->path);
+        ASSERT_NE(cloned_subcolumn, nullptr);
+        EXPECT_NE(source_subcolumn->data.get_finalized_column_ptr().get(),
+                  cloned_subcolumn->data.get_finalized_column_ptr().get())
+                << source_subcolumn->path.get_path();
+    }
+    EXPECT_NE(source_column->get_sparse_column().get(), 
cloned_variant->get_sparse_column().get());
+    EXPECT_NE(source_column->get_doc_value_column().get(),
+              cloned_variant->get_doc_value_column().get());
+}
+
+TEST_F(ColumnVariantTest, serialize_does_not_finalize_source_column) {
+    auto source_column = VariantUtil::construct_advanced_varint_column();
+    ASSERT_FALSE(source_column->is_finalized());
+
+    const int be_exec_version = BeExecVersionManager::get_newest_version();
+    const auto size =
+            dt_variant->get_uncompressed_serialized_bytes(*source_column, 
be_exec_version);
+    EXPECT_FALSE(source_column->is_finalized());
+
+    auto buffer = std::make_unique<char[]>(size);
+    dt_variant->serialize(*source_column, buffer.get(), be_exec_version);
+    EXPECT_FALSE(source_column->is_finalized());
+}
+
+TEST_F(ColumnVariantTest, block_serialize_does_not_finalize_source_column) {
+    auto source_column = VariantUtil::construct_advanced_varint_column();
+    ASSERT_FALSE(source_column->is_finalized());
+
+    Block block({{source_column->get_ptr(), dt_variant, "variant_col"}});
+    PBlock pblock;
+    size_t uncompressed_bytes = 0;
+    size_t compressed_bytes = 0;
+    int64_t compress_time = 0;
+    auto status = block.serialize(BeExecVersionManager::get_newest_version(), 
&pblock,
+                                  &uncompressed_bytes, &compressed_bytes, 
&compress_time,
+                                  segment_v2::NO_COMPRESSION);
+    ASSERT_TRUE(status.ok()) << status;
+    EXPECT_FALSE(source_column->is_finalized());
+    EXPECT_GT(pblock.column_values().size(), 0);
+}
+
 TEST_F(ColumnVariantTest, sanitize) {
     auto test_func = [](const auto& source_column) {
         auto src_size = source_column->size();
diff --git a/be/test/exprs/function/cast/function_variant_cast_test.cpp 
b/be/test/exprs/function/cast/function_variant_cast_test.cpp
index 960637bf150..51034ad6e03 100644
--- a/be/test/exprs/function/cast/function_variant_cast_test.cpp
+++ b/be/test/exprs/function/cast/function_variant_cast_test.cpp
@@ -284,6 +284,63 @@ TEST(FunctionVariantCast, CastFromVariant) {
     }
 }
 
+TEST(FunctionVariantCast, CastFromVariantDoesNotFinalizeSourceColumn) {
+    auto variant_type = std::make_shared<DataTypeVariant>();
+    auto int32_type = std::make_shared<DataTypeInt32>();
+    auto string_type = std::make_shared<DataTypeString>();
+    auto variant_col = construct_basic_varint_column();
+
+    ASSERT_FALSE(variant_col->is_finalized());
+
+    {
+        ColumnsWithTypeAndName arguments {{variant_col->get_ptr(), 
variant_type, "variant_col"},
+                                          {nullptr, int32_type, "int32_type"}};
+
+        auto function =
+                SimpleFunctionFactory::instance().get_function("CAST", 
arguments, int32_type);
+        ASSERT_NE(function, nullptr);
+
+        Block block {arguments};
+        size_t result_column = block.columns();
+        block.insert({nullptr, int32_type, "result"});
+
+        RuntimeState state;
+        auto ctx = FunctionContext::create_context(&state, {}, {});
+        ASSERT_TRUE(
+                function->execute(ctx.get(), block, {0}, result_column, 
variant_col->size()).ok());
+
+        EXPECT_FALSE(variant_col->is_finalized());
+
+        auto result_col = block.get_by_position(result_column).column;
+        ASSERT_NE(result_col.get(), nullptr);
+        ASSERT_EQ(result_col->size(), variant_col->size());
+    }
+
+    {
+        ColumnsWithTypeAndName arguments {{variant_col->get_ptr(), 
variant_type, "variant_col"},
+                                          {nullptr, string_type, 
"string_type"}};
+
+        auto function =
+                SimpleFunctionFactory::instance().get_function("CAST", 
arguments, string_type);
+        ASSERT_NE(function, nullptr);
+
+        Block block {arguments};
+        size_t result_column = block.columns();
+        block.insert({nullptr, string_type, "result"});
+
+        RuntimeState state;
+        auto ctx = FunctionContext::create_context(&state, {}, {});
+        ASSERT_TRUE(
+                function->execute(ctx.get(), block, {0}, result_column, 
variant_col->size()).ok());
+
+        EXPECT_FALSE(variant_col->is_finalized());
+
+        auto result_col = block.get_by_position(result_column).column;
+        ASSERT_NE(result_col.get(), nullptr);
+        ASSERT_EQ(result_col->size(), variant_col->size());
+    }
+}
+
 TEST(FunctionVariantCast, CastVariantWithNull) {
     auto variant_type = std::make_shared<DataTypeVariant>();
     auto int32_type = std::make_shared<DataTypeInt32>();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to