This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 646735ef5f6 branch-4.1: [fix](variant) Avoid mutating shared variant columns (#64198)
646735ef5f6 is described below

commit 646735ef5f671ce30d3d71b34552d7ac74735449
Author: lihangyu <[email protected]>
AuthorDate: Mon Jun 8 16:16:32 2026 +0800

    branch-4.1: [fix](variant) Avoid mutating shared variant columns (#64198)
    
    cherry-pick #64092
---
 be/src/core/data_type/data_type_variant.cpp        | 38 +++++++++------
 be/src/exprs/function/cast/cast_to_variant.h       | 55 +++++++++++++++------
 be/test/core/column/column_variant_test.cpp        | 57 ++++++++++++++++++++++
 .../function/cast/function_variant_cast_test.cpp   | 57 ++++++++++++++++++++++
 4 files changed, 178 insertions(+), 29 deletions(-)

diff --git a/be/src/core/data_type/data_type_variant.cpp b/be/src/core/data_type/data_type_variant.cpp
index 3a8ff4ce716..c83efe4cc45 100644
--- a/be/src/core/data_type/data_type_variant.cpp
+++ b/be/src/core/data_type/data_type_variant.cpp
@@ -62,12 +62,16 @@ bool DataTypeVariant::equals(const IDataType& rhs) const {
 
 int64_t DataTypeVariant::get_uncompressed_serialized_bytes(const IColumn& 
column,
                                                            int 
be_exec_version) const {
-    const auto& column_variant = assert_cast<const ColumnVariant&>(column);
-    if (!column_variant.is_finalized()) {
-        const_cast<ColumnVariant&>(column_variant).finalize();
+    const auto* column_variant = assert_cast<const ColumnVariant*>(&column);
+    MutableColumnPtr finalized_column;
+    if (!column_variant->is_finalized()) {
+        // Local exchange can share the same block across downstream tasks. 
Serialize a private
+        // finalized copy so serialization never mutates shared variant 
columns.
+        finalized_column = column_variant->clone_finalized();
+        column_variant = assert_cast<const 
ColumnVariant*>(finalized_column.get());
     }
 
-    const auto& subcolumns = column_variant.get_subcolumns();
+    const auto& subcolumns = column_variant->get_subcolumns();
     size_t size = 0;
 
     size += sizeof(uint32_t);
@@ -94,24 +98,28 @@ int64_t DataTypeVariant::get_uncompressed_serialized_bytes(const IColumn& column
     // sparse column
     // TODO make compability with sparse column
     size += 
ColumnVariant::get_binary_column_type()->get_uncompressed_serialized_bytes(
-            *column_variant.get_sparse_column(), be_exec_version);
+            *column_variant->get_sparse_column(), be_exec_version);
 
     size += 
ColumnVariant::get_binary_column_type()->get_uncompressed_serialized_bytes(
-            *column_variant.get_doc_value_column(), be_exec_version);
+            *column_variant->get_doc_value_column(), be_exec_version);
     return size;
 }
 
 char* DataTypeVariant::serialize(const IColumn& column, char* buf, int 
be_exec_version) const {
-    const auto& column_variant = assert_cast<const ColumnVariant&>(column);
-    if (!column_variant.is_finalized()) {
-        const_cast<ColumnVariant&>(column_variant).finalize();
+    const auto* column_variant = assert_cast<const ColumnVariant*>(&column);
+    MutableColumnPtr finalized_column;
+    if (!column_variant->is_finalized()) {
+        // Local exchange can share the same block across downstream tasks. 
Serialize a private
+        // finalized copy so serialization never mutates shared variant 
columns.
+        finalized_column = column_variant->clone_finalized();
+        column_variant = assert_cast<const 
ColumnVariant*>(finalized_column.get());
     }
 #ifndef NDEBUG
     // DCHECK size
-    column_variant.check_consistency();
+    column_variant->check_consistency();
 #endif
 
-    const auto& subcolumns = column_variant.get_subcolumns();
+    const auto& subcolumns = column_variant->get_subcolumns();
 
     char* size_pos = buf;
     buf += sizeof(uint32_t);
@@ -144,15 +152,15 @@ char* DataTypeVariant::serialize(const IColumn& column, char* buf, int be_exec_v
     // Safe case
     unaligned_store<uint32_t>(size_pos, static_cast<UInt32>(num_of_columns));
     // serialize num of rows, only take effect when subcolumns empty
-    unaligned_store<uint32_t>(buf, static_cast<UInt32>(column_variant.rows()));
+    unaligned_store<uint32_t>(buf, 
static_cast<UInt32>(column_variant->rows()));
     buf += sizeof(uint32_t);
 
     // serialize sparse column
     // TODO make compability with sparse column
-    buf = 
ColumnVariant::get_binary_column_type()->serialize(*column_variant.get_sparse_column(),
-                                                             buf, 
be_exec_version);
-    buf = 
ColumnVariant::get_binary_column_type()->serialize(*column_variant.get_doc_value_column(),
+    buf = 
ColumnVariant::get_binary_column_type()->serialize(*column_variant->get_sparse_column(),
                                                              buf, 
be_exec_version);
+    buf = ColumnVariant::get_binary_column_type()->serialize(
+            *column_variant->get_doc_value_column(), buf, be_exec_version);
     return buf;
 }
 
diff --git a/be/src/exprs/function/cast/cast_to_variant.h b/be/src/exprs/function/cast/cast_to_variant.h
index 3c0e2c1ee92..bbb0462bb7f 100644
--- a/be/src/exprs/function/cast/cast_to_variant.h
+++ b/be/src/exprs/function/cast/cast_to_variant.h
@@ -32,30 +32,52 @@ inline Status cast_from_variant_impl(FunctionContext* context, Block& block,
     const auto& col_with_type_and_name = block.get_by_position(arguments[0]);
     const auto& col_from = col_with_type_and_name.column;
     const IColumn* variant_column = col_from.get();
-    if (const auto* nullable = 
check_and_get_column<ColumnNullable>(*variant_column)) {
+    const auto* nullable = 
check_and_get_column<ColumnNullable>(*variant_column);
+    if (nullable != nullptr) {
         variant_column = &nullable->get_nested_column();
     }
-    const auto& variant = assert_cast<const ColumnVariant&>(*variant_column);
+    const auto* variant = assert_cast<const ColumnVariant*>(variant_column);
     ColumnPtr col_to = data_type_to->create_column();
 
-    if (!variant.is_finalized()) {
-        // ColumnVariant should be finalized before parsing, finalize maybe 
modify original column structure
-        variant.assume_mutable()->finalize();
+    ColumnPtr finalized_input_column;
+    if (!variant->is_finalized()) {
+        // Local exchange can share the same input block across multiple 
downstream tasks.
+        // Finalize a private copy so variant casts never mutate shared input 
columns.
+        auto finalized_variant = variant->clone_finalized();
+        variant = assert_cast<const ColumnVariant*>(finalized_variant.get());
+        if (nullable != nullptr) {
+            auto cloned_null_map =
+                    
nullable->get_null_map_column_ptr()->clone_resized(input_rows_count);
+            finalized_input_column = 
ColumnNullable::create(std::move(finalized_variant),
+                                                            
std::move(cloned_null_map));
+        } else {
+            finalized_input_column = std::move(finalized_variant);
+        }
     }
+    auto execute_on_finalized_input = [&](auto&& executor) -> Status {
+        if (!finalized_input_column) {
+            return executor(block);
+        }
+        Block finalized_block = block;
+        finalized_block.replace_by_position(arguments[0], 
finalized_input_column);
+        RETURN_IF_ERROR(executor(finalized_block));
+        block.replace_by_position(result, 
finalized_block.get_by_position(result).column);
+        return Status::OK();
+    };
 
     // It's important to convert as many elements as possible in this context. 
For instance,
     // if the root of this variant column is a number column, converting it to 
a number column
     // is acceptable. However, if the destination type is a string and root is 
none scalar root, then
     // we should convert the entire tree to a string.
-    bool is_root_valuable = variant.is_scalar_variant() ||
-                            (!variant.is_null_root() &&
-                             variant.get_root_type()->get_primitive_type() != 
INVALID_TYPE &&
+    bool is_root_valuable = variant->is_scalar_variant() ||
+                            (!variant->is_null_root() &&
+                             variant->get_root_type()->get_primitive_type() != 
INVALID_TYPE &&
                              
!is_string_type(data_type_to->get_primitive_type()) &&
                              data_type_to->get_primitive_type() != TYPE_JSONB);
 
     if (is_root_valuable) {
-        ColumnPtr nested = variant.get_root();
-        auto nested_from_type = variant.get_root_type();
+        ColumnPtr nested = variant->get_root();
+        auto nested_from_type = variant->get_root_type();
         // DCHECK(nested_from_type->is_nullable());
         DCHECK(!data_type_to->is_nullable());
         auto new_context = context == nullptr ? nullptr : context->clone();
@@ -84,16 +106,21 @@ inline Status cast_from_variant_impl(FunctionContext* context, Block& block,
                                       {0, 1}, input_rows_count);
         }
     } else {
-        if (variant.only_have_default_values()) {
+        if (variant->only_have_default_values()) {
             col_to->assume_mutable()->insert_many_defaults(input_rows_count);
             col_to = make_nullable(col_to, true);
         } else if (is_string_type(data_type_to->get_primitive_type())) {
             // serialize to string
-            return CastToStringFunction::execute_impl(context, block, 
arguments, result,
-                                                      input_rows_count);
+            return execute_on_finalized_input([&](Block& finalized_block) {
+                return CastToStringFunction::execute_impl(context, 
finalized_block, arguments,
+                                                          result, 
input_rows_count);
+            });
         } else if (data_type_to->get_primitive_type() == TYPE_JSONB) {
             // serialize to json by parsing
-            return cast_from_generic_to_jsonb(context, block, arguments, 
result, input_rows_count);
+            return execute_on_finalized_input([&](Block& finalized_block) {
+                return cast_from_generic_to_jsonb(context, finalized_block, 
arguments, result,
+                                                  input_rows_count);
+            });
         } else if (!data_type_to->is_nullable() &&
                    !is_string_type(data_type_to->get_primitive_type())) {
             // other types
diff --git a/be/test/core/column/column_variant_test.cpp b/be/test/core/column/column_variant_test.cpp
index c414579f22b..4f7f5aa2bdb 100644
--- a/be/test/core/column/column_variant_test.cpp
+++ b/be/test/core/column/column_variant_test.cpp
@@ -27,8 +27,11 @@
 #include <algorithm>
 #include <cstddef>
 #include <cstdint>
+#include <memory>
 
+#include "agent/be_exec_version_manager.h"
 #include "common/cast_set.h"
+#include "core/block/block.h"
 #include "core/column/column_variant.cpp"
 #include "core/column/common_column_test.h"
 #include "core/column/subcolumn_tree.h"
@@ -40,9 +43,11 @@
 #include "core/types.h"
 #include "core/value/jsonb_value.h"
 #include "exec/common/variant_util.h"
+#include "gen_cpp/data.pb.h"
 #include "storage/olap_common.h"
 #include "testutil/test_util.h"
 #include "testutil/variant_util.h"
+#include "util/block_compression.h"
 
 using namespace doris;
 namespace doris {
@@ -2039,6 +2044,58 @@ TEST_F(ColumnVariantTest, clone_finalized) {
     test_func(std::move(cloned_object));
 }
 
+TEST_F(ColumnVariantTest, clone_finalized_deep_copies_columns) {
+    auto source_column = VariantUtil::construct_advanced_varint_column();
+    source_column->finalize(ColumnVariant::FinalizeMode::READ_MODE);
+
+    auto cloned = source_column->clone_finalized();
+    auto* cloned_variant = assert_cast<ColumnVariant*>(cloned.get());
+    EXPECT_TRUE(cloned_variant->is_finalized());
+
+    for (const auto& source_subcolumn : source_column->get_subcolumns()) {
+        const auto* cloned_subcolumn =
+                
cloned_variant->get_subcolumns().find_exact(source_subcolumn->path);
+        ASSERT_NE(cloned_subcolumn, nullptr);
+        EXPECT_NE(source_subcolumn->data.get_finalized_column_ptr().get(),
+                  cloned_subcolumn->data.get_finalized_column_ptr().get())
+                << source_subcolumn->path.get_path();
+    }
+    EXPECT_NE(source_column->get_sparse_column().get(), 
cloned_variant->get_sparse_column().get());
+    EXPECT_NE(source_column->get_doc_value_column().get(),
+              cloned_variant->get_doc_value_column().get());
+}
+
+TEST_F(ColumnVariantTest, serialize_does_not_finalize_source_column) {
+    auto source_column = VariantUtil::construct_advanced_varint_column();
+    ASSERT_FALSE(source_column->is_finalized());
+
+    const int be_exec_version = BeExecVersionManager::get_newest_version();
+    const auto size =
+            dt_variant->get_uncompressed_serialized_bytes(*source_column, 
be_exec_version);
+    EXPECT_FALSE(source_column->is_finalized());
+
+    auto buffer = std::make_unique<char[]>(size);
+    dt_variant->serialize(*source_column, buffer.get(), be_exec_version);
+    EXPECT_FALSE(source_column->is_finalized());
+}
+
+TEST_F(ColumnVariantTest, block_serialize_does_not_finalize_source_column) {
+    auto source_column = VariantUtil::construct_advanced_varint_column();
+    ASSERT_FALSE(source_column->is_finalized());
+
+    Block block({{source_column->get_ptr(), dt_variant, "variant_col"}});
+    PBlock pblock;
+    size_t uncompressed_bytes = 0;
+    size_t compressed_bytes = 0;
+    int64_t compress_time = 0;
+    auto status = block.serialize(BeExecVersionManager::get_newest_version(), 
&pblock,
+                                  &uncompressed_bytes, &compressed_bytes, 
&compress_time,
+                                  segment_v2::NO_COMPRESSION);
+    ASSERT_TRUE(status.ok()) << status;
+    EXPECT_FALSE(source_column->is_finalized());
+    EXPECT_GT(pblock.column_values().size(), 0);
+}
+
 TEST_F(ColumnVariantTest, sanitize) {
     auto test_func = [](const auto& source_column) {
         auto src_size = source_column->size();
diff --git a/be/test/exprs/function/cast/function_variant_cast_test.cpp 
b/be/test/exprs/function/cast/function_variant_cast_test.cpp
index 2ee76058bc6..9fd12fc2576 100644
--- a/be/test/exprs/function/cast/function_variant_cast_test.cpp
+++ b/be/test/exprs/function/cast/function_variant_cast_test.cpp
@@ -288,6 +288,63 @@ TEST(FunctionVariantCast, CastFromVariant) {
     }
 }
 
+TEST(FunctionVariantCast, CastFromVariantDoesNotFinalizeSourceColumn) {
+    auto variant_type = std::make_shared<DataTypeVariant>();
+    auto int32_type = std::make_shared<DataTypeInt32>();
+    auto string_type = std::make_shared<DataTypeString>();
+    auto variant_col = construct_basic_varint_column();
+
+    ASSERT_FALSE(variant_col->is_finalized());
+
+    {
+        ColumnsWithTypeAndName arguments {{variant_col->get_ptr(), 
variant_type, "variant_col"},
+                                          {nullptr, int32_type, "int32_type"}};
+
+        auto function =
+                SimpleFunctionFactory::instance().get_function("CAST", 
arguments, int32_type);
+        ASSERT_NE(function, nullptr);
+
+        Block block {arguments};
+        size_t result_column = block.columns();
+        block.insert({nullptr, int32_type, "result"});
+
+        RuntimeState state;
+        auto ctx = FunctionContext::create_context(&state, {}, {});
+        ASSERT_TRUE(
+                function->execute(ctx.get(), block, {0}, result_column, 
variant_col->size()).ok());
+
+        EXPECT_FALSE(variant_col->is_finalized());
+
+        auto result_col = block.get_by_position(result_column).column;
+        ASSERT_NE(result_col.get(), nullptr);
+        ASSERT_EQ(result_col->size(), variant_col->size());
+    }
+
+    {
+        ColumnsWithTypeAndName arguments {{variant_col->get_ptr(), 
variant_type, "variant_col"},
+                                          {nullptr, string_type, 
"string_type"}};
+
+        auto function =
+                SimpleFunctionFactory::instance().get_function("CAST", 
arguments, string_type);
+        ASSERT_NE(function, nullptr);
+
+        Block block {arguments};
+        size_t result_column = block.columns();
+        block.insert({nullptr, string_type, "result"});
+
+        RuntimeState state;
+        auto ctx = FunctionContext::create_context(&state, {}, {});
+        ASSERT_TRUE(
+                function->execute(ctx.get(), block, {0}, result_column, 
variant_col->size()).ok());
+
+        EXPECT_FALSE(variant_col->is_finalized());
+
+        auto result_col = block.get_by_position(result_column).column;
+        ASSERT_NE(result_col.get(), nullptr);
+        ASSERT_EQ(result_col->size(), variant_col->size());
+    }
+}
+
 TEST(FunctionVariantCast, CastVariantWithNull) {
     auto variant_type = std::make_shared<DataTypeVariant>();
     auto int32_type = std::make_shared<DataTypeInt32>();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to