This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 646735ef5f6 branch-4.1: [fix](variant) Avoid mutating shared variant columns (#64198)
646735ef5f6 is described below
commit 646735ef5f671ce30d3d71b34552d7ac74735449
Author: lihangyu <[email protected]>
AuthorDate: Mon Jun 8 16:16:32 2026 +0800
branch-4.1: [fix](variant) Avoid mutating shared variant columns (#64198)
cherry-pick #64092
---
be/src/core/data_type/data_type_variant.cpp | 38 +++++++++------
be/src/exprs/function/cast/cast_to_variant.h | 55 +++++++++++++++------
be/test/core/column/column_variant_test.cpp | 57 ++++++++++++++++++++++
.../function/cast/function_variant_cast_test.cpp | 57 ++++++++++++++++++++++
4 files changed, 178 insertions(+), 29 deletions(-)
diff --git a/be/src/core/data_type/data_type_variant.cpp b/be/src/core/data_type/data_type_variant.cpp
index 3a8ff4ce716..c83efe4cc45 100644
--- a/be/src/core/data_type/data_type_variant.cpp
+++ b/be/src/core/data_type/data_type_variant.cpp
@@ -62,12 +62,16 @@ bool DataTypeVariant::equals(const IDataType& rhs) const {
int64_t DataTypeVariant::get_uncompressed_serialized_bytes(const IColumn&
column,
int
be_exec_version) const {
- const auto& column_variant = assert_cast<const ColumnVariant&>(column);
- if (!column_variant.is_finalized()) {
- const_cast<ColumnVariant&>(column_variant).finalize();
+ const auto* column_variant = assert_cast<const ColumnVariant*>(&column);
+ MutableColumnPtr finalized_column;
+ if (!column_variant->is_finalized()) {
+ // Local exchange can share the same block across downstream tasks.
Serialize a private
+ // finalized copy so serialization never mutates shared variant
columns.
+ finalized_column = column_variant->clone_finalized();
+ column_variant = assert_cast<const
ColumnVariant*>(finalized_column.get());
}
- const auto& subcolumns = column_variant.get_subcolumns();
+ const auto& subcolumns = column_variant->get_subcolumns();
size_t size = 0;
size += sizeof(uint32_t);
@@ -94,24 +98,28 @@ int64_t DataTypeVariant::get_uncompressed_serialized_bytes(const IColumn& column
// sparse column
// TODO make compability with sparse column
size +=
ColumnVariant::get_binary_column_type()->get_uncompressed_serialized_bytes(
- *column_variant.get_sparse_column(), be_exec_version);
+ *column_variant->get_sparse_column(), be_exec_version);
size +=
ColumnVariant::get_binary_column_type()->get_uncompressed_serialized_bytes(
- *column_variant.get_doc_value_column(), be_exec_version);
+ *column_variant->get_doc_value_column(), be_exec_version);
return size;
}
char* DataTypeVariant::serialize(const IColumn& column, char* buf, int
be_exec_version) const {
- const auto& column_variant = assert_cast<const ColumnVariant&>(column);
- if (!column_variant.is_finalized()) {
- const_cast<ColumnVariant&>(column_variant).finalize();
+ const auto* column_variant = assert_cast<const ColumnVariant*>(&column);
+ MutableColumnPtr finalized_column;
+ if (!column_variant->is_finalized()) {
+ // Local exchange can share the same block across downstream tasks.
Serialize a private
+ // finalized copy so serialization never mutates shared variant
columns.
+ finalized_column = column_variant->clone_finalized();
+ column_variant = assert_cast<const
ColumnVariant*>(finalized_column.get());
}
#ifndef NDEBUG
// DCHECK size
- column_variant.check_consistency();
+ column_variant->check_consistency();
#endif
- const auto& subcolumns = column_variant.get_subcolumns();
+ const auto& subcolumns = column_variant->get_subcolumns();
char* size_pos = buf;
buf += sizeof(uint32_t);
@@ -144,15 +152,15 @@ char* DataTypeVariant::serialize(const IColumn& column, char* buf, int be_exec_v
// Safe case
unaligned_store<uint32_t>(size_pos, static_cast<UInt32>(num_of_columns));
// serialize num of rows, only take effect when subcolumns empty
- unaligned_store<uint32_t>(buf, static_cast<UInt32>(column_variant.rows()));
+ unaligned_store<uint32_t>(buf,
static_cast<UInt32>(column_variant->rows()));
buf += sizeof(uint32_t);
// serialize sparse column
// TODO make compability with sparse column
- buf =
ColumnVariant::get_binary_column_type()->serialize(*column_variant.get_sparse_column(),
- buf,
be_exec_version);
- buf =
ColumnVariant::get_binary_column_type()->serialize(*column_variant.get_doc_value_column(),
+ buf =
ColumnVariant::get_binary_column_type()->serialize(*column_variant->get_sparse_column(),
buf,
be_exec_version);
+ buf = ColumnVariant::get_binary_column_type()->serialize(
+ *column_variant->get_doc_value_column(), buf, be_exec_version);
return buf;
}
diff --git a/be/src/exprs/function/cast/cast_to_variant.h b/be/src/exprs/function/cast/cast_to_variant.h
index 3c0e2c1ee92..bbb0462bb7f 100644
--- a/be/src/exprs/function/cast/cast_to_variant.h
+++ b/be/src/exprs/function/cast/cast_to_variant.h
@@ -32,30 +32,52 @@ inline Status cast_from_variant_impl(FunctionContext* context, Block& block,
const auto& col_with_type_and_name = block.get_by_position(arguments[0]);
const auto& col_from = col_with_type_and_name.column;
const IColumn* variant_column = col_from.get();
- if (const auto* nullable =
check_and_get_column<ColumnNullable>(*variant_column)) {
+ const auto* nullable =
check_and_get_column<ColumnNullable>(*variant_column);
+ if (nullable != nullptr) {
variant_column = &nullable->get_nested_column();
}
- const auto& variant = assert_cast<const ColumnVariant&>(*variant_column);
+ const auto* variant = assert_cast<const ColumnVariant*>(variant_column);
ColumnPtr col_to = data_type_to->create_column();
- if (!variant.is_finalized()) {
- // ColumnVariant should be finalized before parsing, finalize maybe
modify original column structure
- variant.assume_mutable()->finalize();
+ ColumnPtr finalized_input_column;
+ if (!variant->is_finalized()) {
+ // Local exchange can share the same input block across multiple
downstream tasks.
+ // Finalize a private copy so variant casts never mutate shared input
columns.
+ auto finalized_variant = variant->clone_finalized();
+ variant = assert_cast<const ColumnVariant*>(finalized_variant.get());
+ if (nullable != nullptr) {
+ auto cloned_null_map =
+
nullable->get_null_map_column_ptr()->clone_resized(input_rows_count);
+ finalized_input_column =
ColumnNullable::create(std::move(finalized_variant),
+
std::move(cloned_null_map));
+ } else {
+ finalized_input_column = std::move(finalized_variant);
+ }
}
+ auto execute_on_finalized_input = [&](auto&& executor) -> Status {
+ if (!finalized_input_column) {
+ return executor(block);
+ }
+ Block finalized_block = block;
+ finalized_block.replace_by_position(arguments[0],
finalized_input_column);
+ RETURN_IF_ERROR(executor(finalized_block));
+ block.replace_by_position(result,
finalized_block.get_by_position(result).column);
+ return Status::OK();
+ };
// It's important to convert as many elements as possible in this context.
For instance,
// if the root of this variant column is a number column, converting it to
a number column
// is acceptable. However, if the destination type is a string and root is
none scalar root, then
// we should convert the entire tree to a string.
- bool is_root_valuable = variant.is_scalar_variant() ||
- (!variant.is_null_root() &&
- variant.get_root_type()->get_primitive_type() !=
INVALID_TYPE &&
+ bool is_root_valuable = variant->is_scalar_variant() ||
+ (!variant->is_null_root() &&
+ variant->get_root_type()->get_primitive_type() !=
INVALID_TYPE &&
!is_string_type(data_type_to->get_primitive_type()) &&
data_type_to->get_primitive_type() != TYPE_JSONB);
if (is_root_valuable) {
- ColumnPtr nested = variant.get_root();
- auto nested_from_type = variant.get_root_type();
+ ColumnPtr nested = variant->get_root();
+ auto nested_from_type = variant->get_root_type();
// DCHECK(nested_from_type->is_nullable());
DCHECK(!data_type_to->is_nullable());
auto new_context = context == nullptr ? nullptr : context->clone();
@@ -84,16 +106,21 @@ inline Status cast_from_variant_impl(FunctionContext* context, Block& block,
{0, 1}, input_rows_count);
}
} else {
- if (variant.only_have_default_values()) {
+ if (variant->only_have_default_values()) {
col_to->assume_mutable()->insert_many_defaults(input_rows_count);
col_to = make_nullable(col_to, true);
} else if (is_string_type(data_type_to->get_primitive_type())) {
// serialize to string
- return CastToStringFunction::execute_impl(context, block,
arguments, result,
- input_rows_count);
+ return execute_on_finalized_input([&](Block& finalized_block) {
+ return CastToStringFunction::execute_impl(context,
finalized_block, arguments,
+ result,
input_rows_count);
+ });
} else if (data_type_to->get_primitive_type() == TYPE_JSONB) {
// serialize to json by parsing
- return cast_from_generic_to_jsonb(context, block, arguments,
result, input_rows_count);
+ return execute_on_finalized_input([&](Block& finalized_block) {
+ return cast_from_generic_to_jsonb(context, finalized_block,
arguments, result,
+ input_rows_count);
+ });
} else if (!data_type_to->is_nullable() &&
!is_string_type(data_type_to->get_primitive_type())) {
// other types
diff --git a/be/test/core/column/column_variant_test.cpp b/be/test/core/column/column_variant_test.cpp
index c414579f22b..4f7f5aa2bdb 100644
--- a/be/test/core/column/column_variant_test.cpp
+++ b/be/test/core/column/column_variant_test.cpp
@@ -27,8 +27,11 @@
#include <algorithm>
#include <cstddef>
#include <cstdint>
+#include <memory>
+#include "agent/be_exec_version_manager.h"
#include "common/cast_set.h"
+#include "core/block/block.h"
#include "core/column/column_variant.cpp"
#include "core/column/common_column_test.h"
#include "core/column/subcolumn_tree.h"
@@ -40,9 +43,11 @@
#include "core/types.h"
#include "core/value/jsonb_value.h"
#include "exec/common/variant_util.h"
+#include "gen_cpp/data.pb.h"
#include "storage/olap_common.h"
#include "testutil/test_util.h"
#include "testutil/variant_util.h"
+#include "util/block_compression.h"
using namespace doris;
namespace doris {
@@ -2039,6 +2044,58 @@ TEST_F(ColumnVariantTest, clone_finalized) {
test_func(std::move(cloned_object));
}
+TEST_F(ColumnVariantTest, clone_finalized_deep_copies_columns) {
+ auto source_column = VariantUtil::construct_advanced_varint_column();
+ source_column->finalize(ColumnVariant::FinalizeMode::READ_MODE);
+
+ auto cloned = source_column->clone_finalized();
+ auto* cloned_variant = assert_cast<ColumnVariant*>(cloned.get());
+ EXPECT_TRUE(cloned_variant->is_finalized());
+
+ for (const auto& source_subcolumn : source_column->get_subcolumns()) {
+ const auto* cloned_subcolumn =
+
cloned_variant->get_subcolumns().find_exact(source_subcolumn->path);
+ ASSERT_NE(cloned_subcolumn, nullptr);
+ EXPECT_NE(source_subcolumn->data.get_finalized_column_ptr().get(),
+ cloned_subcolumn->data.get_finalized_column_ptr().get())
+ << source_subcolumn->path.get_path();
+ }
+ EXPECT_NE(source_column->get_sparse_column().get(),
cloned_variant->get_sparse_column().get());
+ EXPECT_NE(source_column->get_doc_value_column().get(),
+ cloned_variant->get_doc_value_column().get());
+}
+
+TEST_F(ColumnVariantTest, serialize_does_not_finalize_source_column) {
+ auto source_column = VariantUtil::construct_advanced_varint_column();
+ ASSERT_FALSE(source_column->is_finalized());
+
+ const int be_exec_version = BeExecVersionManager::get_newest_version();
+ const auto size =
+ dt_variant->get_uncompressed_serialized_bytes(*source_column,
be_exec_version);
+ EXPECT_FALSE(source_column->is_finalized());
+
+ auto buffer = std::make_unique<char[]>(size);
+ dt_variant->serialize(*source_column, buffer.get(), be_exec_version);
+ EXPECT_FALSE(source_column->is_finalized());
+}
+
+TEST_F(ColumnVariantTest, block_serialize_does_not_finalize_source_column) {
+ auto source_column = VariantUtil::construct_advanced_varint_column();
+ ASSERT_FALSE(source_column->is_finalized());
+
+ Block block({{source_column->get_ptr(), dt_variant, "variant_col"}});
+ PBlock pblock;
+ size_t uncompressed_bytes = 0;
+ size_t compressed_bytes = 0;
+ int64_t compress_time = 0;
+ auto status = block.serialize(BeExecVersionManager::get_newest_version(),
&pblock,
+ &uncompressed_bytes, &compressed_bytes,
&compress_time,
+ segment_v2::NO_COMPRESSION);
+ ASSERT_TRUE(status.ok()) << status;
+ EXPECT_FALSE(source_column->is_finalized());
+ EXPECT_GT(pblock.column_values().size(), 0);
+}
+
TEST_F(ColumnVariantTest, sanitize) {
auto test_func = [](const auto& source_column) {
auto src_size = source_column->size();
diff --git a/be/test/exprs/function/cast/function_variant_cast_test.cpp b/be/test/exprs/function/cast/function_variant_cast_test.cpp
index 2ee76058bc6..9fd12fc2576 100644
--- a/be/test/exprs/function/cast/function_variant_cast_test.cpp
+++ b/be/test/exprs/function/cast/function_variant_cast_test.cpp
@@ -288,6 +288,63 @@ TEST(FunctionVariantCast, CastFromVariant) {
}
}
+TEST(FunctionVariantCast, CastFromVariantDoesNotFinalizeSourceColumn) {
+ auto variant_type = std::make_shared<DataTypeVariant>();
+ auto int32_type = std::make_shared<DataTypeInt32>();
+ auto string_type = std::make_shared<DataTypeString>();
+ auto variant_col = construct_basic_varint_column();
+
+ ASSERT_FALSE(variant_col->is_finalized());
+
+ {
+ ColumnsWithTypeAndName arguments {{variant_col->get_ptr(),
variant_type, "variant_col"},
+ {nullptr, int32_type, "int32_type"}};
+
+ auto function =
+ SimpleFunctionFactory::instance().get_function("CAST",
arguments, int32_type);
+ ASSERT_NE(function, nullptr);
+
+ Block block {arguments};
+ size_t result_column = block.columns();
+ block.insert({nullptr, int32_type, "result"});
+
+ RuntimeState state;
+ auto ctx = FunctionContext::create_context(&state, {}, {});
+ ASSERT_TRUE(
+ function->execute(ctx.get(), block, {0}, result_column,
variant_col->size()).ok());
+
+ EXPECT_FALSE(variant_col->is_finalized());
+
+ auto result_col = block.get_by_position(result_column).column;
+ ASSERT_NE(result_col.get(), nullptr);
+ ASSERT_EQ(result_col->size(), variant_col->size());
+ }
+
+ {
+ ColumnsWithTypeAndName arguments {{variant_col->get_ptr(),
variant_type, "variant_col"},
+ {nullptr, string_type,
"string_type"}};
+
+ auto function =
+ SimpleFunctionFactory::instance().get_function("CAST",
arguments, string_type);
+ ASSERT_NE(function, nullptr);
+
+ Block block {arguments};
+ size_t result_column = block.columns();
+ block.insert({nullptr, string_type, "result"});
+
+ RuntimeState state;
+ auto ctx = FunctionContext::create_context(&state, {}, {});
+ ASSERT_TRUE(
+ function->execute(ctx.get(), block, {0}, result_column,
variant_col->size()).ok());
+
+ EXPECT_FALSE(variant_col->is_finalized());
+
+ auto result_col = block.get_by_position(result_column).column;
+ ASSERT_NE(result_col.get(), nullptr);
+ ASSERT_EQ(result_col->size(), variant_col->size());
+ }
+}
+
TEST(FunctionVariantCast, CastVariantWithNull) {
auto variant_type = std::make_shared<DataTypeVariant>();
auto int32_type = std::make_shared<DataTypeInt32>();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]