This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new e9c8dd4aa81 [refine](column) backport strong typed column helpers to 
4.1 (#64960)
e9c8dd4aa81 is described below

commit e9c8dd4aa814b08e02057fa4d155f583e6b0be95
Author: Mryange <[email protected]>
AuthorDate: Tue Jun 30 11:59:58 2026 +0800

    [refine](column) backport strong typed column helpers to 4.1 (#64960)
    
    ### What problem does this PR solve?
    
    
    Related PR: #63491, #63678, #63946, #58092
    
    Problem Summary:
    
    Backport the column strong-typing refinements for branch-4.1: use strong
    typed nullable null maps, array offsets, and map offsets, and apply the
    column check helper for type casts. Branch-4.1 was missing the
    COW/Column cast helper introduced by #58092, so this also ports the
    minimal helper API needed by the picked changes and adjusts an old array
    distance code path to use the already strong typed offsets pointer.
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [ ] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
    - [ ] This is a refactor/code format and no logic has been changed.
            - [ ] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [ ] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [ ] No.
    - [ ] Yes. <!-- Add document PR link here. eg:
    https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR
    should merge into -->
---
 be/src/core/column/column.h                        |  14 ++-
 be/src/core/column/column_array.cpp                | 102 ++++++++++++---------
 be/src/core/column/column_array.h                  |  30 +++---
 be/src/core/column/column_const.cpp                |   2 +-
 be/src/core/column/column_const.h                  |   4 +-
 be/src/core/column/column_decimal.h                |   2 +-
 be/src/core/column/column_map.cpp                  |  44 +++++----
 be/src/core/column/column_map.h                    |  47 ++++++----
 be/src/core/column/column_nullable.cpp             |  64 ++++++++-----
 be/src/core/column/column_nullable.h               |  35 ++++---
 be/src/core/column/column_struct.cpp               |   2 +-
 be/src/core/column/column_struct.h                 |   2 +-
 be/src/core/column/column_variant.cpp              |   2 +-
 be/src/core/column/column_variant.h                |   6 +-
 be/src/core/cow.h                                  |  11 ++-
 be/src/core/data_type/data_type_array.cpp          |  12 ++-
 be/src/core/data_type/data_type_map.cpp            |  12 ++-
 be/src/exec/operator/table_function_operator.cpp   |   9 +-
 .../exprs/function/array/function_array_distance.h |   4 +-
 .../exprs/function/array/function_array_exists.cpp |   3 +-
 .../exprs/function/array/function_array_filter.cpp |   8 +-
 .../function/array/function_array_flatten.cpp      |   4 +-
 .../exprs/function/function_always_not_nullable.h  |   3 +-
 be/src/exprs/function/function_hll.cpp             |   3 +-
 be/src/exprs/function/function_quantile_state.cpp  |   3 +-
 be/src/exprs/function/if.cpp                       |  26 +++---
 .../lambda_function/varray_filter_function.cpp     |   8 +-
 .../exprs/lambda_function/varray_map_function.cpp  |   8 +-
 .../exprs/lambda_function/varray_sort_function.cpp |   4 +-
 .../exprs/table_function/python_udtf_function.cpp  |   7 +-
 be/src/exprs/table_function/udf_table_function.cpp |   7 +-
 be/src/exprs/table_function/vexplode.cpp           |   7 +-
 be/src/exprs/table_function/vexplode_bitmap.cpp    |   8 +-
 .../exprs/table_function/vexplode_json_object.cpp  |   7 +-
 be/src/exprs/table_function/vexplode_map.cpp       |   7 +-
 be/src/exprs/table_function/vexplode_numbers.cpp   |   4 +-
 be/src/exprs/table_function/vexplode_numbers.h     |  13 ++-
 be/src/exprs/table_function/vexplode_v2.cpp        |  12 +--
 be/src/exprs/table_function/vjson_each.cpp         |   6 +-
 be/src/exprs/vcase_expr.h                          |   5 +-
 be/src/exprs/vcompound_pred.h                      |   5 +-
 be/src/exprs/vcondition_expr.cpp                   |  26 ++++--
 be/src/storage/segment/column_reader.cpp           |  53 +++++++----
 be/test/core/column/column_array_test.cpp          |   8 +-
 be/test/core/column/column_map_test.cpp            |   8 +-
 .../exec/operator/table_function_operator_test.cpp |   3 +-
 be/test/exprs/function/function_test_util.cpp      |   6 +-
 be/test/format/table/hive/hive_reader_test.cpp     |   8 +-
 .../format/table/iceberg/iceberg_reader_test.cpp   |   8 +-
 49 files changed, 391 insertions(+), 291 deletions(-)

diff --git a/be/src/core/column/column.h b/be/src/core/column/column.h
index 2d4a5eb1d16..033960ab5e0 100644
--- a/be/src/core/column/column.h
+++ b/be/src/core/column/column.h
@@ -803,7 +803,19 @@ ColumnType::Ptr check_and_get_column_ptr(const ColumnPtr& 
column) {
     if (raw_type_ptr == nullptr) {
         return nullptr;
     }
-    return typename ColumnType::Ptr(raw_type_ptr);
+    return ColumnType::cast_to_column_ptr(raw_type_ptr);
+}
+
+template <typename ColumnType>
+ColumnType::Ptr cast_to_column(const ColumnPtr& column) {
+    const ColumnType* raw_type_ptr = assert_cast<const 
ColumnType*>(column.get());
+    return ColumnType::cast_to_column_ptr(raw_type_ptr);
+}
+
+template <typename ColumnType>
+ColumnType::MutablePtr cast_to_column(MutableColumnPtr column) {
+    ColumnType* raw_type_ptr = assert_cast<ColumnType*>(column.get());
+    return ColumnType::cast_to_column_mutptr(raw_type_ptr);
 }
 
 /// True if column's an ColumnConst instance. It's just a syntax sugar for 
type check.
diff --git a/be/src/core/column/column_array.cpp 
b/be/src/core/column/column_array.cpp
index 951f637b80c..c018313d749 100644
--- a/be/src/core/column/column_array.cpp
+++ b/be/src/core/column/column_array.cpp
@@ -50,7 +50,8 @@ namespace doris {
 namespace {
 
 const ColumnArray::ColumnOffsets& check_array_offsets_column(const IColumn& 
offsets_column) {
-    const auto* offsets_concrete = typeid_cast<const 
ColumnArray::ColumnOffsets*>(&offsets_column);
+    const auto* offsets_concrete =
+            check_and_get_column<ColumnArray::ColumnOffsets>(&offsets_column);
     if (!offsets_concrete) {
         throw doris::Exception(ErrorCode::INTERNAL_ERROR, "offsets_column must 
be a ColumnUInt64");
         __builtin_unreachable();
@@ -58,6 +59,21 @@ const ColumnArray::ColumnOffsets& 
check_array_offsets_column(const IColumn& offs
     return *offsets_concrete;
 }
 
+ColumnArray::ColumnOffsets::Ptr check_array_offsets_column_ptr(const 
ColumnPtr& offsets_column) {
+    return ColumnArray::ColumnOffsets::cast_to_column_ptr(
+            &check_array_offsets_column(*offsets_column));
+}
+
+ColumnArray::ColumnOffsets::MutablePtr assert_mutable_array_offsets(
+        MutableColumnPtr&& offsets_column) {
+    check_array_offsets_column(*offsets_column);
+    auto mutable_offsets = ColumnArray::ColumnOffsets::cast_to_column_mutptr(
+            assert_cast<ColumnArray::ColumnOffsets*, 
TypeCheckOnRelease::DISABLE>(
+                    offsets_column.get()));
+    offsets_column = nullptr;
+    return mutable_offsets;
+}
+
 void validate_array_offsets(const IColumn& nested_column, const IColumn& 
offsets_column) {
     const auto& offsets_concrete = check_array_offsets_column(offsets_column);
     if (!offsets_concrete.empty()) {
@@ -84,7 +100,9 @@ void check_empty_array_data_without_offsets(const IColumn& 
nested_column) {
 } // namespace
 
 ColumnArray::ColumnArray(MutableColumnPtr&& nested_column, MutableColumnPtr&& 
offsets_column)
-        : data(std::move(nested_column)), offsets(std::move(offsets_column)) {
+        : data(std::move(nested_column)) {
+    static_cast<ColumnOffsets::Ptr&>(offsets) =
+            assert_mutable_array_offsets(std::move(offsets_column));
     // TODO(lihangyu) : we need to check the nullable attribute of array's 
data column.
     // but currently ColumnMap<ColumnString, ColumnString> is used to store 
sparse data of variant type,
     // so I temporarily disable this check.
@@ -99,7 +117,7 @@ ColumnArray::ColumnArray(MutableColumnPtr&& nested_column, 
MutableColumnPtr&& of
     // #endif
     check_const_only_in_top_level();
     validate_array_offsets(*static_cast<const IColumn::Ptr&>(data),
-                           *static_cast<const IColumn::Ptr&>(offsets));
+                           *static_cast<const ColumnOffsets::Ptr&>(offsets));
 
     /** NOTE
       * Arrays with constant value are possible and used in implementation of 
higher order functions (see FunctionReplicate).
@@ -110,15 +128,15 @@ ColumnArray::ColumnArray(MutableColumnPtr&& 
nested_column, MutableColumnPtr&& of
 ColumnArray::ColumnArray(MutableColumnPtr&& nested_column) : 
data(std::move(nested_column)) {
     data = data->convert_to_full_column_if_const();
     check_empty_array_data_without_offsets(*data);
-    offsets = ColumnOffsets::create();
+    static_cast<ColumnOffsets::Ptr&>(offsets) = ColumnOffsets::create();
 }
 
 ColumnArray::ColumnArray(SharedTag, ColumnPtr nested_column, ColumnPtr 
offsets_column) {
     static_cast<IColumn::Ptr&>(data) = std::move(nested_column);
-    static_cast<IColumn::Ptr&>(offsets) = std::move(offsets_column);
+    static_cast<ColumnOffsets::Ptr&>(offsets) = 
check_array_offsets_column_ptr(offsets_column);
     check_const_only_in_top_level();
     validate_array_offsets(*static_cast<const IColumn::Ptr&>(data),
-                           *static_cast<const IColumn::Ptr&>(offsets));
+                           *static_cast<const ColumnOffsets::Ptr&>(offsets));
 }
 
 std::string ColumnArray::get_name() const {
@@ -834,37 +852,37 @@ size_t filter_generic_inplace(const Filter& filter, 
IColumn& src_data, ColumnOff
 ColumnArrayDataOffsets filter_return_new_dispatch(const Filter& filt, ssize_t 
result_size_hint,
                                                   const ColumnPtr& data,
                                                   const ColumnOffsets* 
offsets) {
-    if (typeid_cast<const ColumnUInt8*>(data.get()))
+    if (check_and_get_column<ColumnUInt8>(data.get()))
         return filter_number_return_new<TYPE_BOOLEAN>(filt, result_size_hint, 
data, offsets);
-    if (typeid_cast<const ColumnInt8*>(data.get()))
+    if (check_and_get_column<ColumnInt8>(data.get()))
         return filter_number_return_new<TYPE_TINYINT>(filt, result_size_hint, 
data, offsets);
-    if (typeid_cast<const ColumnInt16*>(data.get()))
+    if (check_and_get_column<ColumnInt16>(data.get()))
         return filter_number_return_new<TYPE_SMALLINT>(filt, result_size_hint, 
data, offsets);
-    if (typeid_cast<const ColumnInt32*>(data.get()))
+    if (check_and_get_column<ColumnInt32>(data.get()))
         return filter_number_return_new<TYPE_INT>(filt, result_size_hint, 
data, offsets);
-    if (typeid_cast<const ColumnInt64*>(data.get()))
+    if (check_and_get_column<ColumnInt64>(data.get()))
         return filter_number_return_new<TYPE_BIGINT>(filt, result_size_hint, 
data, offsets);
-    if (typeid_cast<const ColumnFloat32*>(data.get()))
+    if (check_and_get_column<ColumnFloat32>(data.get()))
         return filter_number_return_new<TYPE_FLOAT>(filt, result_size_hint, 
data, offsets);
-    if (typeid_cast<const ColumnFloat64*>(data.get()))
+    if (check_and_get_column<ColumnFloat64>(data.get()))
         return filter_number_return_new<TYPE_DOUBLE>(filt, result_size_hint, 
data, offsets);
-    if (typeid_cast<const ColumnDate*>(data.get()))
+    if (check_and_get_column<ColumnDate>(data.get()))
         return filter_number_return_new<TYPE_DATE>(filt, result_size_hint, 
data, offsets);
-    if (typeid_cast<const ColumnDateV2*>(data.get()))
+    if (check_and_get_column<ColumnDateV2>(data.get()))
         return filter_number_return_new<TYPE_DATEV2>(filt, result_size_hint, 
data, offsets);
-    if (typeid_cast<const ColumnDateTime*>(data.get()))
+    if (check_and_get_column<ColumnDateTime>(data.get()))
         return filter_number_return_new<TYPE_DATETIME>(filt, result_size_hint, 
data, offsets);
-    if (typeid_cast<const ColumnDateTimeV2*>(data.get()))
+    if (check_and_get_column<ColumnDateTimeV2>(data.get()))
         return filter_number_return_new<TYPE_DATETIMEV2>(filt, 
result_size_hint, data, offsets);
-    if (typeid_cast<const ColumnTimeStampTz*>(data.get()))
+    if (check_and_get_column<ColumnTimeStampTz>(data.get()))
         return filter_number_return_new<TYPE_TIMESTAMPTZ>(filt, 
result_size_hint, data, offsets);
-    if (typeid_cast<const ColumnTimeV2*>(data.get()))
+    if (check_and_get_column<ColumnTimeV2>(data.get()))
         return filter_number_return_new<TYPE_TIMEV2>(filt, result_size_hint, 
data, offsets);
-    if (typeid_cast<const ColumnTime*>(data.get()))
+    if (check_and_get_column<ColumnTime>(data.get()))
         return filter_number_return_new<TYPE_TIME>(filt, result_size_hint, 
data, offsets);
-    if (typeid_cast<const ColumnIPv4*>(data.get()))
+    if (check_and_get_column<ColumnIPv4>(data.get()))
         return filter_number_return_new<TYPE_IPV4>(filt, result_size_hint, 
data, offsets);
-    if (typeid_cast<const ColumnString*>(data.get()))
+    if (check_and_get_column<ColumnString>(data.get()))
         return filter_string_return_new(filt, result_size_hint, data, offsets);
     return filter_generic_return_new(filt, result_size_hint, data, offsets);
 }
@@ -882,7 +900,7 @@ ColumnPtr ColumnArray::filter(const Filter& filt, ssize_t 
result_size_hint) cons
                                      res_null_map->get_data(), filt, 
result_size_hint);
 
         auto src_data = nullable_data_column->get_nested_column_ptr();
-        const auto* src_offsets = assert_cast<const 
ColumnOffsets*>(offsets.get());
+        const auto* src_offsets = offsets.get();
         auto array_of_nested =
                 filter_return_new_dispatch(filt, result_size_hint, src_data, 
src_offsets);
 
@@ -891,7 +909,7 @@ ColumnPtr ColumnArray::filter(const Filter& filt, ssize_t 
result_size_hint) cons
                 array_of_nested.offsets);
     } else {
         // filter offsets
-        const auto* src_offsets = assert_cast<const 
ColumnOffsets*>(offsets.get());
+        const auto* src_offsets = offsets.get();
         auto array_of_nested =
                 filter_return_new_dispatch(filt, result_size_hint, data, 
src_offsets);
         return ColumnArray::create(array_of_nested.data, 
std::move(array_of_nested.offsets));
@@ -902,35 +920,35 @@ ColumnPtr ColumnArray::filter(const Filter& filt, ssize_t 
result_size_hint) cons
 // In order not to destroy the original logic, the original logic is still 
retained here.
 size_t filter_inplace_dispatch(const Filter& filter, IColumn& src_data,
                                ColumnOffsets& src_offsets) {
-    if (typeid_cast<ColumnUInt8*>(&src_data))
+    if (check_and_get_column<ColumnUInt8>(&src_data))
         return filter_number_inplace<TYPE_BOOLEAN>(filter, src_data, 
src_offsets);
-    if (typeid_cast<ColumnInt8*>(&src_data))
+    if (check_and_get_column<ColumnInt8>(&src_data))
         return filter_number_inplace<TYPE_TINYINT>(filter, src_data, 
src_offsets);
-    if (typeid_cast<ColumnInt16*>(&src_data))
+    if (check_and_get_column<ColumnInt16>(&src_data))
         return filter_number_inplace<TYPE_SMALLINT>(filter, src_data, 
src_offsets);
-    if (typeid_cast<ColumnInt32*>(&src_data))
+    if (check_and_get_column<ColumnInt32>(&src_data))
         return filter_number_inplace<TYPE_INT>(filter, src_data, src_offsets);
-    if (typeid_cast<ColumnInt64*>(&src_data))
+    if (check_and_get_column<ColumnInt64>(&src_data))
         return filter_number_inplace<TYPE_BIGINT>(filter, src_data, 
src_offsets);
-    if (typeid_cast<ColumnFloat32*>(&src_data))
+    if (check_and_get_column<ColumnFloat32>(&src_data))
         return filter_number_inplace<TYPE_FLOAT>(filter, src_data, 
src_offsets);
-    if (typeid_cast<ColumnFloat64*>(&src_data))
+    if (check_and_get_column<ColumnFloat64>(&src_data))
         return filter_number_inplace<TYPE_DOUBLE>(filter, src_data, 
src_offsets);
-    if (typeid_cast<ColumnDate*>(&src_data))
+    if (check_and_get_column<ColumnDate>(&src_data))
         return filter_number_inplace<TYPE_DATE>(filter, src_data, src_offsets);
-    if (typeid_cast<ColumnDateV2*>(&src_data))
+    if (check_and_get_column<ColumnDateV2>(&src_data))
         return filter_number_inplace<TYPE_DATEV2>(filter, src_data, 
src_offsets);
-    if (typeid_cast<ColumnDateTime*>(&src_data))
+    if (check_and_get_column<ColumnDateTime>(&src_data))
         return filter_number_inplace<TYPE_DATETIME>(filter, src_data, 
src_offsets);
-    if (typeid_cast<ColumnDateTimeV2*>(&src_data))
+    if (check_and_get_column<ColumnDateTimeV2>(&src_data))
         return filter_number_inplace<TYPE_DATETIMEV2>(filter, src_data, 
src_offsets);
-    if (typeid_cast<ColumnTimeStampTz*>(&src_data))
+    if (check_and_get_column<ColumnTimeStampTz>(&src_data))
         return filter_number_inplace<TYPE_TIMESTAMPTZ>(filter, src_data, 
src_offsets);
-    if (typeid_cast<ColumnTimeV2*>(&src_data))
+    if (check_and_get_column<ColumnTimeV2>(&src_data))
         return filter_number_inplace<TYPE_TIMEV2>(filter, src_data, 
src_offsets);
-    if (typeid_cast<ColumnTime*>(&src_data))
+    if (check_and_get_column<ColumnTime>(&src_data))
         return filter_number_inplace<TYPE_TIME>(filter, src_data, src_offsets);
-    if (typeid_cast<ColumnIPv4*>(&src_data))
+    if (check_and_get_column<ColumnIPv4>(&src_data))
         return filter_number_inplace<TYPE_IPV4>(filter, src_data, src_offsets);
     return filter_generic_inplace(filter, src_data, src_offsets);
 }
@@ -940,17 +958,17 @@ size_t ColumnArray::filter(const Filter& filter) {
         return 0;
     }
 
-    if (auto* nullable_data_column = typeid_cast<ColumnNullable*>(data.get())) 
{
+    if (auto* nullable_data_column = 
check_and_get_column<ColumnNullable>(data.get())) {
         const auto result_size = filter_arrays_impl_only_data(
                 nullable_data_column->get_null_map_data(), get_offsets(), 
filter);
 
         auto& src_data = nullable_data_column->get_nested_column();
-        auto& src_offsets = assert_cast<ColumnOffsets&>(*offsets);
+        auto& src_offsets = *offsets;
         filter_inplace_dispatch(filter, src_data, src_offsets);
         return result_size;
     } else {
         auto& src_data = get_data();
-        auto& src_offsets = assert_cast<ColumnOffsets&>(*offsets);
+        auto& src_offsets = *offsets;
 
         return filter_inplace_dispatch(filter, src_data, src_offsets);
     }
diff --git a/be/src/core/column/column_array.h 
b/be/src/core/column/column_array.h
index e4357418950..46008ac99c4 100644
--- a/be/src/core/column/column_array.h
+++ b/be/src/core/column/column_array.h
@@ -37,6 +37,7 @@
 #include "core/field.h"
 #include "core/string_ref.h"
 #include "core/types.h"
+#include "util/defer_op.h"
 
 class SipHash;
 
@@ -193,24 +194,22 @@ public:
     IColumn& get_data() { return *data; }
     const IColumn& get_data() const { return *data; }
 
-    IColumn& get_offsets_column() { return *offsets; }
-    const IColumn& get_offsets_column() const { return *offsets; }
+    ColumnOffsets& get_offsets_column() { return *offsets; }
+    const ColumnOffsets& get_offsets_column() const { return *offsets; }
 
-    Offsets64& ALWAYS_INLINE get_offsets() {
-        return assert_cast<ColumnOffsets&, 
TypeCheckOnRelease::DISABLE>(*offsets).get_data();
-    }
+    Offsets64& ALWAYS_INLINE get_offsets() { return offsets->get_data(); }
 
-    const Offsets64& ALWAYS_INLINE get_offsets() const {
-        return assert_cast<const ColumnOffsets&, 
TypeCheckOnRelease::DISABLE>(*offsets).get_data();
-    }
+    const Offsets64& ALWAYS_INLINE get_offsets() const { return 
offsets->get_data(); }
 
     bool has_equal_offsets(const ColumnArray& other) const;
 
     const ColumnPtr& get_data_ptr() const { return data; }
     ColumnPtr& get_data_ptr() { return data; }
 
-    const ColumnPtr& get_offsets_ptr() const { return offsets; }
-    ColumnPtr& get_offsets_ptr() { return offsets; }
+    const ColumnOffsets::Ptr& get_offsets_ptr() const {
+        return static_cast<const ColumnOffsets::Ptr&>(offsets);
+    }
+    ColumnOffsets::Ptr& get_offsets_ptr() { return 
static_cast<ColumnOffsets::Ptr&>(offsets); }
 
     size_t ALWAYS_INLINE offset_at(ssize_t i) const { return get_offsets()[i - 
1]; }
     size_t ALWAYS_INLINE size_at(ssize_t i) const {
@@ -218,7 +217,12 @@ public:
     }
 
     void for_each_subcolumn(ColumnCallback callback) override {
-        callback(offsets);
+        IColumn::WrappedPtr 
offsets_column(std::move(static_cast<ColumnOffsets::Ptr&>(offsets)));
+        Defer defer([&] {
+            static_cast<ColumnOffsets::Ptr&>(offsets) =
+                    cast_to_column<ColumnOffsets>(static_cast<const 
IColumn::Ptr&>(offsets_column));
+        });
+        callback(offsets_column);
         callback(data);
     }
 
@@ -264,8 +268,8 @@ public:
 private:
     // [2,1,5,9,1]\n[1,2,4] --> data column [2,1,5,9,1,1,2,4], offset[-1] = 0, 
offset[0] = 5, offset[1] = 8
     // [[2,1,5],[9,1]]\n[[1,2]] --> data column [3 column array], offset[-1] = 
0, offset[0] = 2, offset[1] = 3
-    WrappedPtr data;
-    WrappedPtr offsets;
+    IColumn::WrappedPtr data;
+    ColumnOffsets::WrappedPtr offsets;
 };
 
 } // namespace doris
diff --git a/be/src/core/column/column_const.cpp 
b/be/src/core/column/column_const.cpp
index b056d4dbe0a..18efa4310a4 100644
--- a/be/src/core/column/column_const.cpp
+++ b/be/src/core/column/column_const.cpp
@@ -36,7 +36,7 @@ namespace doris {
 
 ColumnPtr squash_const(const ColumnPtr& col) {
     ColumnPtr res = col;
-    while (const auto* c = typeid_cast<const ColumnConst*>(res.get())) {
+    while (const auto* c = check_and_get_column<ColumnConst>(res.get())) {
         res = c->get_data_column_ptr();
     }
     return res;
diff --git a/be/src/core/column/column_const.h 
b/be/src/core/column/column_const.h
index 74dd0cb914e..dd5296cc1ef 100644
--- a/be/src/core/column/column_const.h
+++ b/be/src/core/column/column_const.h
@@ -105,7 +105,7 @@ class ColumnConst final : public COWHelper<IColumn, 
ColumnConst> {
 private:
     friend class COWHelper<IColumn, ColumnConst>;
     using Self = ColumnConst;
-    WrappedPtr data;
+    IColumn::WrappedPtr data;
     size_t s;
 
     ColumnConst(const ColumnPtr& data, size_t s_, bool create_with_empty = 
false,
@@ -266,7 +266,7 @@ public:
     void for_each_subcolumn(ColumnCallback callback) override { 
callback(data); }
 
     bool structure_equals(const IColumn& rhs) const override {
-        if (const auto* rhs_concrete = typeid_cast<const ColumnConst*>(&rhs)) {
+        if (const auto* rhs_concrete = 
check_and_get_column<ColumnConst>(&rhs)) {
             return data->structure_equals(*rhs_concrete->data);
         }
         return false;
diff --git a/be/src/core/column/column_decimal.h 
b/be/src/core/column/column_decimal.h
index abf705511bc..f474f1b8a8b 100644
--- a/be/src/core/column/column_decimal.h
+++ b/be/src/core/column/column_decimal.h
@@ -205,7 +205,7 @@ public:
     MutableColumnPtr permute(const IColumn::Permutation& perm, size_t limit) 
const override;
 
     bool structure_equals(const IColumn& rhs) const override {
-        if (auto rhs_concrete = typeid_cast<const ColumnDecimal<T>*>(&rhs))
+        if (auto rhs_concrete = check_and_get_column<ColumnDecimal<T>>(&rhs))
             return scale == rhs_concrete->scale;
         return false;
     }
diff --git a/be/src/core/column/column_map.cpp 
b/be/src/core/column/column_map.cpp
index 4825bd0d812..059063b8d0a 100644
--- a/be/src/core/column/column_map.cpp
+++ b/be/src/core/column/column_map.cpp
@@ -52,6 +52,18 @@ const ColumnMap::COffsets& check_map_offsets_column(const 
IColumn& offsets_colum
     return *offsets_concrete;
 }
 
+ColumnMap::COffsets::Ptr check_map_offsets_column_ptr(const ColumnPtr& 
offsets_column) {
+    return 
ColumnMap::COffsets::cast_to_column_ptr(&check_map_offsets_column(*offsets_column));
+}
+
+ColumnMap::COffsets::MutablePtr assert_mutable_map_offsets(MutableColumnPtr&& 
offsets_column) {
+    check_map_offsets_column(*offsets_column);
+    auto mutable_offsets = ColumnMap::COffsets::cast_to_column_mutptr(
+            assert_cast<ColumnMap::COffsets*, 
TypeCheckOnRelease::DISABLE>(offsets_column.get()));
+    offsets_column = nullptr;
+    return mutable_offsets;
+}
+
 void validate_map_columns(const IColumn& keys, const IColumn& values, const 
IColumn& offsets) {
     const auto& offsets_concrete = check_map_offsets_column(offsets);
 
@@ -86,21 +98,21 @@ std::string ColumnMap::get_name() const {
 ColumnMap::ColumnMap(MutableColumnPtr&& keys, MutableColumnPtr&& values, 
MutableColumnPtr&& offsets)
         : keys_column(std::move(keys)),
           values_column(std::move(values)),
-          offsets_column(std::move(offsets)) {
+          offsets_column(assert_mutable_map_offsets(std::move(offsets))) {
     check_const_only_in_top_level();
     validate_map_columns(*static_cast<const IColumn::Ptr&>(keys_column),
                          *static_cast<const IColumn::Ptr&>(values_column),
-                         *static_cast<const IColumn::Ptr&>(offsets_column));
+                         *static_cast<const COffsets::Ptr&>(offsets_column));
 }
 
 ColumnMap::ColumnMap(SharedTag, ColumnPtr keys, ColumnPtr values, ColumnPtr 
offsets) {
     static_cast<IColumn::Ptr&>(keys_column) = std::move(keys);
     static_cast<IColumn::Ptr&>(values_column) = std::move(values);
-    static_cast<IColumn::Ptr&>(offsets_column) = std::move(offsets);
+    static_cast<COffsets::Ptr&>(offsets_column) = 
check_map_offsets_column_ptr(offsets);
     check_const_only_in_top_level();
     validate_map_columns(*static_cast<const IColumn::Ptr&>(keys_column),
                          *static_cast<const IColumn::Ptr&>(values_column),
-                         *static_cast<const IColumn::Ptr&>(offsets_column));
+                         *static_cast<const COffsets::Ptr&>(offsets_column));
 }
 
 // todo. here to resize every row map
@@ -547,10 +559,10 @@ void ColumnMap::insert_range_from_ignore_overflow(const 
IColumn& src, size_t sta
 
 ColumnPtr ColumnMap::filter(const Filter& filt, ssize_t result_size_hint) 
const {
     auto k_arr = ColumnArray::create(static_cast<const 
IColumn::Ptr&>(keys_column),
-                                     static_cast<const 
IColumn::Ptr&>(offsets_column))
+                                     static_cast<const 
IColumn::Ptr&>(get_offsets_ptr()))
                          ->filter(filt, result_size_hint);
     auto v_arr = ColumnArray::create(static_cast<const 
IColumn::Ptr&>(values_column),
-                                     static_cast<const 
IColumn::Ptr&>(offsets_column))
+                                     static_cast<const 
IColumn::Ptr&>(get_offsets_ptr()))
                          ->filter(filt, result_size_hint);
     return ColumnMap::create(assert_cast<const 
ColumnArray&>(*k_arr).get_data_ptr(),
                              assert_cast<const 
ColumnArray&>(*v_arr).get_data_ptr(),
@@ -560,7 +572,7 @@ ColumnPtr ColumnMap::filter(const Filter& filt, ssize_t 
result_size_hint) const
 size_t ColumnMap::filter(const Filter& filter) {
     // Move subcolumns out of this ColumnMap to get exclusive ownership, then 
write back.
     auto keys_mut = 
IColumn::mutate(std::move(static_cast<IColumn::Ptr&>(keys_column)));
-    auto offsets_mut = 
IColumn::mutate(std::move(static_cast<IColumn::Ptr&>(offsets_column)));
+    auto offsets_mut = IColumn::mutate(std::move(get_offsets_ptr()));
     auto values_mut = 
IColumn::mutate(std::move(static_cast<IColumn::Ptr&>(values_column)));
     // Clone offsets for values (both keys and values share the same offsets 
structure)
     MutableColumnPtr copied_off = offsets_mut->clone_empty();
@@ -571,19 +583,19 @@ size_t ColumnMap::filter(const Filter& filter) {
     v_arr->filter(filter);
     // Put filtered subcolumns back
     static_cast<IColumn::Ptr&>(keys_column) = k_arr->get_data_ptr();
-    static_cast<IColumn::Ptr&>(offsets_column) = k_arr->get_offsets_ptr();
+    offsets_column = k_arr->get_offsets_ptr();
     static_cast<IColumn::Ptr&>(values_column) = v_arr->get_data_ptr();
-    // Use const access to avoid assert_mutable_ref() on the just-written-back 
offsets_column
-    // (k_arr still holds a ref, so use_count > 1 until k_arr goes out of 
scope)
-    return static_cast<const IColumn::Ptr&>(offsets_column)->size();
+    // Read through a const view because k_arr still shares the 
just-written-back offsets
+    // until it goes out of scope, so mutable WrappedPtr access would hit 
assert_mutable_ref().
+    return static_cast<const COffsets::Ptr&>(offsets_column)->size();
 }
 
 MutableColumnPtr ColumnMap::permute(const Permutation& perm, size_t limit) 
const {
     auto k_arr = ColumnArray::create(static_cast<const 
IColumn::Ptr&>(keys_column),
-                                     static_cast<const 
IColumn::Ptr&>(offsets_column))
+                                     static_cast<const 
IColumn::Ptr&>(get_offsets_ptr()))
                          ->permute(perm, limit);
     auto v_arr = ColumnArray::create(static_cast<const 
IColumn::Ptr&>(values_column),
-                                     static_cast<const 
IColumn::Ptr&>(offsets_column))
+                                     static_cast<const 
IColumn::Ptr&>(get_offsets_ptr()))
                          ->permute(perm, limit);
 
     return ColumnMap::create(assert_cast<const 
ColumnArray&>(*k_arr).get_data_ptr(),
@@ -593,9 +605,9 @@ MutableColumnPtr ColumnMap::permute(const Permutation& 
perm, size_t limit) const
 
 Status ColumnMap::deduplicate_keys(bool recursive) {
     const IColumn& ck = *static_cast<const IColumn::Ptr&>(keys_column);
-    const IColumn& co = *static_cast<const IColumn::Ptr&>(offsets_column);
+    const auto& offsets = static_cast<const ColumnMap*>(this)->get_offsets();
     const auto inner_rows = ck.size();
-    const auto rows = co.size();
+    const auto rows = offsets.size();
 
     if (recursive) {
         const auto& values_ptr = static_cast<const 
IColumn::Ptr&>(values_column);
@@ -651,8 +663,6 @@ Status ColumnMap::deduplicate_keys(bool recursive) {
     auto& new_offsets_data = new_offsets->get_data();
 
     IColumn::Filter filter(inner_rows, 1);
-    const auto& offsets = static_cast<const ColumnMap*>(this)->get_offsets();
-
     Offset64 offset = 0;
     bool has_duplicated_key = false;
 
diff --git a/be/src/core/column/column_map.h b/be/src/core/column/column_map.h
index 45646e2019d..b40811acdcf 100644
--- a/be/src/core/column/column_map.h
+++ b/be/src/core/column/column_map.h
@@ -43,6 +43,7 @@
 #include "core/string_ref.h"
 #include "core/types.h"
 #include "exec/common/sip_hash.h"
+#include "util/defer_op.h"
 
 class SipHash;
 
@@ -74,9 +75,14 @@ public:
     std::string get_name() const override;
 
     void for_each_subcolumn(ColumnCallback callback) override {
+        IColumn::WrappedPtr 
offsets(std::move(static_cast<COffsets::Ptr&>(offsets_column)));
+        Defer defer([&] {
+            static_cast<COffsets::Ptr&>(offsets_column) =
+                    cast_to_column<COffsets>(static_cast<const 
IColumn::Ptr&>(offsets));
+        });
         callback(keys_column);
         callback(values_column);
-        callback(offsets_column);
+        callback(offsets);
     }
 
     void sanity_check() const override {
@@ -131,18 +137,17 @@ public:
                                "Method replace_column_data is not supported 
for " + get_name());
     }
 
-    ColumnArray::Offsets64& ALWAYS_INLINE get_offsets() {
-        return assert_cast<COffsets&, 
TypeCheckOnRelease::DISABLE>(*offsets_column).get_data();
-    }
+    ColumnArray::Offsets64& ALWAYS_INLINE get_offsets() { return 
offsets_column->get_data(); }
     const ColumnArray::Offsets64& ALWAYS_INLINE get_offsets() const {
-        return assert_cast<const COffsets&, 
TypeCheckOnRelease::DISABLE>(*offsets_column)
-                .get_data();
+        return offsets_column->get_data();
     }
-    IColumn& get_offsets_column() { return *offsets_column; }
-    const IColumn& get_offsets_column() const { return *offsets_column; }
+    COffsets& get_offsets_column() { return *offsets_column; }
+    const COffsets& get_offsets_column() const { return *offsets_column; }
 
-    const ColumnPtr& get_offsets_ptr() const { return offsets_column; }
-    ColumnPtr& get_offsets_ptr() { return offsets_column; }
+    const COffsets::Ptr& get_offsets_ptr() const {
+        return static_cast<const COffsets::Ptr&>(offsets_column);
+    }
+    COffsets::Ptr& get_offsets_ptr() { return 
static_cast<COffsets::Ptr&>(offsets_column); }
 
     size_t ALWAYS_INLINE offset_at(ssize_t i) const { return get_offsets()[i - 
1]; }
 
@@ -179,9 +184,13 @@ public:
     IColumn& get_keys() { return *keys_column; }
 
     const ColumnPtr get_keys_array_ptr() const {
-        return ColumnArray::create(keys_column, offsets_column);
+        return ColumnArray::create(keys_column,
+                                   static_cast<const 
IColumn::Ptr&>(get_offsets_ptr()));
+    }
+    ColumnPtr get_keys_array_ptr() {
+        return ColumnArray::create(keys_column,
+                                   static_cast<const 
IColumn::Ptr&>(get_offsets_ptr()));
     }
-    ColumnPtr get_keys_array_ptr() { return ColumnArray::create(keys_column, 
offsets_column); }
 
     const ColumnPtr& get_values_ptr() const { return values_column; }
     ColumnPtr& get_values_ptr() { return values_column; }
@@ -190,9 +199,13 @@ public:
     IColumn& get_values() { return *values_column; }
 
     const ColumnPtr get_values_array_ptr() const {
-        return ColumnArray::create(values_column, offsets_column);
+        return ColumnArray::create(values_column,
+                                   static_cast<const 
IColumn::Ptr&>(get_offsets_ptr()));
+    }
+    ColumnPtr get_values_array_ptr() {
+        return ColumnArray::create(values_column,
+                                   static_cast<const 
IColumn::Ptr&>(get_offsets_ptr()));
     }
-    ColumnPtr get_values_array_ptr() { return 
ColumnArray::create(values_column, offsets_column); }
 
     size_t ALWAYS_INLINE size_at(ssize_t i) const {
         return get_offsets()[i] - get_offsets()[i - 1];
@@ -239,9 +252,9 @@ public:
 private:
     friend class COWHelper<IColumn, ColumnMap>;
 
-    WrappedPtr keys_column;    // nullable
-    WrappedPtr values_column;  // nullable
-    WrappedPtr offsets_column; // offset
+    IColumn::WrappedPtr keys_column;     // nullable
+    IColumn::WrappedPtr values_column;   // nullable
+    COffsets::WrappedPtr offsets_column; // offset
 
     ColumnMap(MutableColumnPtr&& keys, MutableColumnPtr&& values, 
MutableColumnPtr&& offsets);
     ColumnMap(SharedTag, ColumnPtr keys, ColumnPtr values, ColumnPtr offsets);
diff --git a/be/src/core/column/column_nullable.cpp 
b/be/src/core/column/column_nullable.cpp
index 148fbb0dcf8..c7ce9ab8f26 100644
--- a/be/src/core/column/column_nullable.cpp
+++ b/be/src/core/column/column_nullable.cpp
@@ -43,6 +43,10 @@ const ColumnUInt8& check_nullable_null_map_column(const 
IColumn& null_map) {
     return *concrete;
 }
 
+ColumnUInt8::Ptr check_nullable_null_map_column_ptr(const ColumnPtr& null_map) 
{
+    return 
ColumnUInt8::cast_to_column_ptr(&check_nullable_null_map_column(*null_map));
+}
+
 void check_nullable_sizes(const IColumn& nested_column, const IColumn& 
null_map) {
     const auto& null_map_concrete = check_nullable_null_map_column(null_map);
     if (nested_column.size() != null_map_concrete.size()) {
@@ -55,24 +59,36 @@ void check_nullable_sizes(const IColumn& nested_column, 
const IColumn& null_map)
 
 } // namespace
 
+namespace {
+ColumnUInt8::MutablePtr assert_mutable_null_map(MutableColumnPtr&& null_map) {
+    if (is_column_const(*null_map)) [[unlikely]] {
+        throw doris::Exception(ErrorCode::INTERNAL_ERROR,
+                               "ColumnNullable cannot have constant null map");
+    }
+    auto mutable_null_map = ColumnUInt8::cast_to_column_mutptr(
+            assert_cast<ColumnUInt8*, 
TypeCheckOnRelease::DISABLE>(null_map.get()));
+    null_map = nullptr;
+    return mutable_null_map;
+}
+} // namespace
+
 ColumnNullable::ColumnNullable(MutableColumnPtr&& nested_column_, 
MutableColumnPtr&& null_map_)
+        : ColumnNullable(std::move(nested_column_), 
assert_mutable_null_map(std::move(null_map_))) {
+}
+
+ColumnNullable::ColumnNullable(MutableColumnPtr&& nested_column_,
+                               ColumnUInt8::MutablePtr&& null_map_)
         : _nested_column(std::move(nested_column_)), 
_null_map(std::move(null_map_)) {
     check_const_only_in_top_level();
     // after convert const column to full column, it may be a nullable column
     if (_nested_column->is_nullable()) {
-        assert_cast<ColumnNullable&>(*_nested_column)
-                .apply_null_map(static_cast<const 
ColumnUInt8&>(get_null_map_column()));
-        _null_map = 
assert_cast<ColumnNullable&>(*_nested_column).get_null_map_column_ptr();
-        _nested_column = 
assert_cast<ColumnNullable&>(*_nested_column).get_nested_column_ptr();
-    }
-
-    if (is_column_const(get_null_map_column())) [[unlikely]] {
-        throw doris::Exception(ErrorCode::INTERNAL_ERROR,
-                               "ColumnNullable cannot have constant null map");
-        __builtin_unreachable();
+        auto& nested_nullable = assert_cast<ColumnNullable&>(*_nested_column);
+        nested_nullable.apply_null_map(get_null_map_column());
+        _null_map = nested_nullable._null_map;
+        _nested_column = nested_nullable.get_nested_column_ptr();
     }
     check_nullable_sizes(*static_cast<const IColumn::Ptr&>(_nested_column),
-                         *static_cast<const IColumn::Ptr&>(_null_map));
+                         *static_cast<const ColumnUInt8::Ptr&>(_null_map));
 }
 
 ColumnNullable::ColumnNullable(SharedTag, ColumnPtr nested_column_, ColumnPtr 
null_map_) {
@@ -80,8 +96,9 @@ ColumnNullable::ColumnNullable(SharedTag, ColumnPtr 
nested_column_, ColumnPtr nu
 
     if (const auto* nullable_nested = 
check_and_get_column<ColumnNullable>(nested_column_.get())) {
         auto merged_null_map = null_map_->clone_empty();
-        merged_null_map->insert_range_from(*null_map_, 0, null_map_->size());
-        auto& merged_null_map_data = 
assert_cast<ColumnUInt8&>(*merged_null_map).get_data();
+        auto merged_null_map_ptr = 
assert_mutable_null_map(std::move(merged_null_map));
+        merged_null_map_ptr->insert_range_from(*null_map_, 0, 
null_map_->size());
+        auto& merged_null_map_data = merged_null_map_ptr->get_data();
         const auto& nested_null_map_data = 
nullable_nested->get_null_map_data();
         DCHECK_EQ(merged_null_map_data.size(), nested_null_map_data.size());
         for (size_t i = 0; i != merged_null_map_data.size(); ++i) {
@@ -89,21 +106,21 @@ ColumnNullable::ColumnNullable(SharedTag, ColumnPtr 
nested_column_, ColumnPtr nu
         }
 
         static_cast<IColumn::Ptr&>(_nested_column) = 
nullable_nested->get_nested_column_ptr();
-        static_cast<IColumn::Ptr&>(_null_map) = std::move(merged_null_map);
+        static_cast<ColumnUInt8::Ptr&>(_null_map) = 
std::move(merged_null_map_ptr);
     } else {
         static_cast<IColumn::Ptr&>(_nested_column) = std::move(nested_column_);
-        static_cast<IColumn::Ptr&>(_null_map) = std::move(null_map_);
+        static_cast<ColumnUInt8::Ptr&>(_null_map) = 
check_nullable_null_map_column_ptr(null_map_);
     }
 
     check_const_only_in_top_level();
     check_nullable_sizes(*static_cast<const IColumn::Ptr&>(_nested_column),
-                         *static_cast<const IColumn::Ptr&>(_null_map));
+                         *static_cast<const ColumnUInt8::Ptr&>(_null_map));
 }
 
 void ColumnNullable::replace_columns(ColumnPtr nested_column, ColumnPtr 
null_map) {
     check_nullable_sizes(*nested_column, *null_map);
     static_cast<IColumn::Ptr&>(_nested_column) = std::move(nested_column);
-    static_cast<IColumn::Ptr&>(_null_map) = std::move(null_map);
+    static_cast<ColumnUInt8::Ptr&>(_null_map) = 
check_nullable_null_map_column_ptr(null_map);
     check_const_only_in_top_level();
 }
 
@@ -176,7 +193,8 @@ void ColumnNullable::update_crc32c_batch(uint32_t* 
__restrict hashes,
         // before this normalized nested column is written back.
         auto nested_mut = std::move(*static_cast<const 
IColumn::Ptr&>(_nested_column)).mutate();
         nested_mut->replace_column_null_data(real_null_data);
-        static_cast<IColumn::Ptr&>(const_cast<WrappedPtr&>(_nested_column)) = 
std::move(nested_mut);
+        
static_cast<IColumn::Ptr&>(const_cast<IColumn::WrappedPtr&>(_nested_column)) =
+                std::move(nested_mut);
         _nested_column->update_crc32c_batch(hashes, nullptr);
     } else {
         auto s = size();
@@ -431,10 +449,10 @@ void 
ColumnNullable::append_data_by_selector(IColumn::MutablePtr& res,
                                              const IColumn::Selector& 
selector, size_t begin,
                                              size_t end) const {
     auto& res_column = assert_cast<ColumnNullable&>(*res);
-    auto res_nested_column = res_column.get_nested_column_ptr();
+    MutableColumnPtr res_nested_column = res_column.get_nested_column_ptr();
     get_nested_column().append_data_by_selector(res_nested_column, selector, 
begin, end);
-    auto res_null_map = res_column.get_null_map_column_ptr();
-    get_null_map_column().append_data_by_selector(res_null_map, selector, 
begin, end);
+    IColumn::MutablePtr res_null_map_column = 
res_column.get_null_map_column_ptr();
+    get_null_map_column().append_data_by_selector(res_null_map_column, 
selector, begin, end);
 }
 
 void ColumnNullable::pop_back(size_t n) {
@@ -459,8 +477,8 @@ Status ColumnNullable::filter_by_selector(const uint16_t* 
sel, size_t sel_size,
     auto* nullable_col_ptr = assert_cast<ColumnNullable*>(col_ptr);
     // Access the nested column via const path to avoid assert_mutable_ref 
(which requires
     // exclusive ownership). The output col_ptr was just created, so its 
nested column is exclusive.
-    IColumn* nest_col_raw = const_cast<IColumn*>(
-            static_cast<const 
WrappedPtr&>(nullable_col_ptr->_nested_column).get());
+    auto nest_col_raw = const_cast<IColumn*>(
+            static_cast<const 
IColumn::WrappedPtr&>(nullable_col_ptr->_nested_column).get());
 
     /// `get_null_map_data` will set `_need_update_has_null` to true
     auto& res_nullmap = nullable_col_ptr->get_null_map_data();
diff --git a/be/src/core/column/column_nullable.h 
b/be/src/core/column/column_nullable.h
index d1bf9749e57..bc0a02cd95f 100644
--- a/be/src/core/column/column_nullable.h
+++ b/be/src/core/column/column_nullable.h
@@ -27,11 +27,13 @@
 #include "core/column/column_vector.h"
 #include "core/cow.h"
 #include "core/data_type/define_primitive_type.h"
+#include "core/data_type/primitive_type.h"
 #include "core/field.h"
 #include "core/string_ref.h"
 #include "core/typeid_cast.h"
 #include "core/types.h"
 #include "storage/olap_common.h"
+#include "util/defer_op.h"
 
 class SipHash;
 
@@ -59,6 +61,7 @@ private:
     ColumnNullable(MutableColumnPtr&& nested_column_, MutableColumnPtr&& 
null_map_);
     struct SharedTag {};
     ColumnNullable(SharedTag, ColumnPtr nested_column_, ColumnPtr null_map_);
+    ColumnNullable(MutableColumnPtr&& nested_column_, 
ColumnUInt8::MutablePtr&& null_map_);
     ColumnNullable(const ColumnNullable&) = default;
 
 public:
@@ -70,6 +73,11 @@ public:
         return Base::create(SharedTag {}, nested_column_, null_map_);
     }
 
+    static MutablePtr create(MutableColumnPtr&& nested_column_,
+                             ColumnUInt8::MutablePtr&& null_map_) {
+        return Base::create(std::move(nested_column_), std::move(null_map_));
+    }
+
     template <typename... Args>
     static MutablePtr create(Args&&... args)
         requires IsMutableColumns<Args...>::value
@@ -240,11 +248,16 @@ public:
 
     void for_each_subcolumn(ColumnCallback callback) override {
         callback(_nested_column);
-        callback(_null_map);
+
+        IColumn::WrappedPtr 
null_map(std::move(static_cast<ColumnUInt8::Ptr&>(_null_map)));
+        Defer defer([&] {
+            _null_map = cast_to_column<ColumnUInt8>(static_cast<const 
IColumn::Ptr&>(null_map));
+        });
+        callback(null_map);
     }
 
     bool structure_equals(const IColumn& rhs) const override {
-        if (const auto* rhs_nullable = typeid_cast<const 
ColumnNullable*>(&rhs)) {
+        if (const auto* rhs_nullable = 
check_and_get_column<ColumnNullable>(&rhs)) {
             return 
_nested_column->structure_equals(*rhs_nullable->_nested_column);
         }
         return false;
@@ -374,16 +387,16 @@ public:
     }
 
     // return the column that represents the byte map. if want use null_map, 
just call this.
-    const ColumnPtr& get_null_map_column_ptr() const { return _null_map; }
-    const ColumnUInt8& get_null_map_column() const {
-        return assert_cast<const ColumnUInt8&, 
TypeCheckOnRelease::DISABLE>(*_null_map);
-    }
+    const ColumnUInt8::Ptr& get_null_map_column_ptr() const { return 
_null_map; }
+    const ColumnUInt8& get_null_map_column() const { return *_null_map; }
     const NullMap& get_null_map_data() const { return 
get_null_map_column().get_data(); }
 
-    MutableColumnPtr get_null_map_column_ptr() { return 
_null_map->assert_mutable(); }
-    ColumnUInt8& get_null_map_column() {
-        return assert_cast<ColumnUInt8&, 
TypeCheckOnRelease::DISABLE>(*_null_map);
+    ColumnUInt8::MutablePtr get_null_map_column_ptr() {
+        auto null_map = _null_map->assert_mutable();
+        return ColumnUInt8::cast_to_column_mutptr(
+                assert_cast<ColumnUInt8*, 
TypeCheckOnRelease::DISABLE>(null_map.get()));
     }
+    ColumnUInt8& get_null_map_column() { return *_null_map; }
     NullMap& get_null_map_data() { return get_null_map_column().get_data(); }
 
     // push not null value wouldn't change the nullity. no need to update 
_has_null
@@ -397,8 +410,8 @@ private:
     template <bool negative>
     void apply_null_map_impl(const ColumnUInt8& map);
 
-    WrappedPtr _nested_column;
-    WrappedPtr _null_map;
+    IColumn::WrappedPtr _nested_column;
+    ColumnUInt8::WrappedPtr _null_map;
 };
 
 ColumnPtr make_nullable(const ColumnPtr& column, bool is_nullable = false);
diff --git a/be/src/core/column/column_struct.cpp 
b/be/src/core/column/column_struct.cpp
index 74e1a34914d..6be7514001d 100644
--- a/be/src/core/column/column_struct.cpp
+++ b/be/src/core/column/column_struct.cpp
@@ -386,7 +386,7 @@ void ColumnStruct::for_each_subcolumn(ColumnCallback 
callback) {
 }
 
 bool ColumnStruct::structure_equals(const IColumn& rhs) const {
-    if (const auto* rhs_tuple = typeid_cast<const ColumnStruct*>(&rhs)) {
+    if (const auto* rhs_tuple = check_and_get_column<ColumnStruct>(&rhs)) {
         const size_t tuple_size = columns.size();
         if (tuple_size != rhs_tuple->columns.size()) {
             return false;
diff --git a/be/src/core/column/column_struct.h 
b/be/src/core/column/column_struct.h
index 61a2902b0ec..d63f3445f47 100644
--- a/be/src/core/column/column_struct.h
+++ b/be/src/core/column/column_struct.h
@@ -51,7 +51,7 @@ class ColumnStruct final : public COWHelper<IColumn, 
ColumnStruct> {
 private:
     friend class COWHelper<IColumn, ColumnStruct>;
 
-    using TupleColumns = std::vector<WrappedPtr>;
+    using TupleColumns = std::vector<IColumn::WrappedPtr>;
     TupleColumns columns;
 
     template <bool positive>
diff --git a/be/src/core/column/column_variant.cpp 
b/be/src/core/column/column_variant.cpp
index be504a08701..84b009b093e 100644
--- a/be/src/core/column/column_variant.cpp
+++ b/be/src/core/column/column_variant.cpp
@@ -2122,7 +2122,7 @@ void ColumnVariant::clear_sparse_column() {
 }
 
 void ColumnVariant::ensure_binary_columns_rows() {
-    auto resize_if_empty = [this](WrappedPtr& column) {
+    auto resize_if_empty = [this](IColumn::WrappedPtr& column) {
         const auto& const_column = static_cast<const IColumn::Ptr&>(column);
         if (const_column->size() == num_rows) {
             return;
diff --git a/be/src/core/column/column_variant.h 
b/be/src/core/column/column_variant.h
index 1ae92afd54c..1d5c4eed137 100644
--- a/be/src/core/column/column_variant.h
+++ b/be/src/core/column/column_variant.h
@@ -247,7 +247,7 @@ public:
         /// Parts of column. Parts should be in increasing order in terms of 
subtypes/supertypes.
         /// That means that the least common type for i-th prefix is the type 
of i-th part
         /// and it's the supertype for all type of column from 0 to i-1.
-        std::vector<WrappedPtr> data;
+        std::vector<IColumn::WrappedPtr> data;
         std::vector<DataTypePtr> data_types;
         std::vector<DataTypeSerDeSPtr> data_serdes;
         /// Until we insert any non-default field we don't know further
@@ -275,10 +275,10 @@ private:
 
     // It's filled when the number of subcolumns reaches the limit.
     // It has type Map(String, String) and stores a map (path, binary 
serialized subcolumn value) for each row.
-    WrappedPtr serialized_sparse_column = ColumnMap::create(
+    IColumn::WrappedPtr serialized_sparse_column = ColumnMap::create(
             ColumnString::create(), ColumnString::create(), 
ColumnArray::ColumnOffsets::create());
 
-    WrappedPtr serialized_doc_value_column = ColumnMap::create(
+    IColumn::WrappedPtr serialized_doc_value_column = ColumnMap::create(
             ColumnString::create(), ColumnString::create(), 
ColumnArray::ColumnOffsets::create());
 
     // if `_max_subcolumns_count == 0`, all subcolumns are materialized.
diff --git a/be/src/core/cow.h b/be/src/core/cow.h
index 567682d1ac0..31daddc5c4e 100644
--- a/be/src/core/cow.h
+++ b/be/src/core/cow.h
@@ -352,13 +352,13 @@ protected:
                 : value(std::forward<std::initializer_list<U>>(arg)) {}
 
         const T* get() const { return value.get(); }
-        T* get() { return &value->assert_mutable_ref(); }
+        T* get() { return &static_cast<T&>(value->assert_mutable_ref()); }
 
         const T* operator->() const { return get(); }
         T* operator->() { return get(); }
 
         const T& operator*() const { return *value; }
-        T& operator*() { return value->assert_mutable_ref(); }
+        T& operator*() { return static_cast<T&>(value->assert_mutable_ref()); }
 
         operator const immutable_ptr<T>&() const { return value; }
         operator immutable_ptr<T>&() { return value; }
@@ -420,6 +420,7 @@ public:
     static_assert(std::is_base_of_v<doris::IColumn, Base>, "COWHelper only use 
in IColumn");
     using Ptr = typename Base::template immutable_ptr<Derived>;
     using MutablePtr = typename Base::template mutable_ptr<Derived>;
+    using WrappedPtr = typename Base::template chameleon_ptr<Derived>;
 
 #include "common/compile_check_avoid_begin.h"
 
@@ -434,6 +435,12 @@ public:
     }
 #include "common/compile_check_avoid_end.h"
 
+    static Ptr cast_to_column_ptr(const Derived* raw_type_ptr) { return 
Ptr(raw_type_ptr); }
+
+    static MutablePtr cast_to_column_mutptr(Derived* raw_type_ptr) {
+        return MutablePtr(raw_type_ptr);
+    }
+
     typename Base::MutablePtr clone() const override {
         return typename Base::MutablePtr(new Derived(static_cast<const 
Derived&>(*this)));
     }
diff --git a/be/src/core/data_type/data_type_array.cpp 
b/be/src/core/data_type/data_type_array.cpp
index 6e86067597a..bf6543065c7 100644
--- a/be/src/core/data_type/data_type_array.cpp
+++ b/be/src/core/data_type/data_type_array.cpp
@@ -154,7 +154,11 @@ const char* DataTypeArray::deserialize(const char* buf, 
MutableColumnPtr* column
         // children
         auto nested_column = std::move(*data_column->get_data_ptr()).mutate();
         buf = get_nested_type()->deserialize(buf, &nested_column, 
be_exec_version);
-        data_column->get_offsets_ptr() = std::move(offsets_column);
+        auto typed_offsets_column = 
ColumnArray::ColumnOffsets::cast_to_column_mutptr(
+                assert_cast<ColumnArray::ColumnOffsets*, 
TypeCheckOnRelease::DISABLE>(
+                        offsets_column.get()));
+        offsets_column = nullptr;
+        data_column->get_offsets_ptr() = std::move(typed_offsets_column);
         data_column->get_data_ptr() = std::move(nested_column);
         return buf;
     } else {
@@ -172,7 +176,11 @@ const char* DataTypeArray::deserialize(const char* buf, 
MutableColumnPtr* column
         // children
         auto nested_column = std::move(*data_column->get_data_ptr()).mutate();
         buf = get_nested_type()->deserialize(buf, &nested_column, 
be_exec_version);
-        data_column->get_offsets_ptr() = std::move(offsets_column);
+        auto typed_offsets_column = 
ColumnArray::ColumnOffsets::cast_to_column_mutptr(
+                assert_cast<ColumnArray::ColumnOffsets*, 
TypeCheckOnRelease::DISABLE>(
+                        offsets_column.get()));
+        offsets_column = nullptr;
+        data_column->get_offsets_ptr() = std::move(typed_offsets_column);
         data_column->get_data_ptr() = std::move(nested_column);
         return buf;
     }
diff --git a/be/src/core/data_type/data_type_map.cpp 
b/be/src/core/data_type/data_type_map.cpp
index 55f14a43104..f3afbd7af9b 100644
--- a/be/src/core/data_type/data_type_map.cpp
+++ b/be/src/core/data_type/data_type_map.cpp
@@ -165,7 +165,11 @@ const char* DataTypeMap::deserialize(const char* buf, 
MutableColumnPtr* column,
         auto nested_values_column = 
std::move(*map_column->get_values_ptr()).mutate();
         buf = get_key_type()->deserialize(buf, &nested_keys_column, 
be_exec_version);
         buf = get_value_type()->deserialize(buf, &nested_values_column, 
be_exec_version);
-        map_column->get_offsets_ptr() = std::move(offsets_column);
+        auto typed_offsets_column = ColumnMap::COffsets::cast_to_column_mutptr(
+                assert_cast<ColumnMap::COffsets*, TypeCheckOnRelease::DISABLE>(
+                        offsets_column.get()));
+        offsets_column = nullptr;
+        map_column->get_offsets_ptr() = std::move(typed_offsets_column);
         map_column->get_keys_ptr() = std::move(nested_keys_column);
         map_column->get_values_ptr() = std::move(nested_values_column);
         return buf;
@@ -185,7 +189,11 @@ const char* DataTypeMap::deserialize(const char* buf, 
MutableColumnPtr* column,
         auto nested_values_column = 
std::move(*map_column->get_values_ptr()).mutate();
         buf = get_key_type()->deserialize(buf, &nested_keys_column, 
be_exec_version);
         buf = get_value_type()->deserialize(buf, &nested_values_column, 
be_exec_version);
-        map_column->get_offsets_ptr() = std::move(offsets_column);
+        auto typed_offsets_column = ColumnMap::COffsets::cast_to_column_mutptr(
+                assert_cast<ColumnMap::COffsets*, TypeCheckOnRelease::DISABLE>(
+                        offsets_column.get()));
+        offsets_column = nullptr;
+        map_column->get_offsets_ptr() = std::move(typed_offsets_column);
         map_column->get_keys_ptr() = std::move(nested_keys_column);
         map_column->get_values_ptr() = std::move(nested_values_column);
         return buf;
diff --git a/be/src/exec/operator/table_function_operator.cpp 
b/be/src/exec/operator/table_function_operator.cpp
index d70f6538b7d..46077373f37 100644
--- a/be/src/exec/operator/table_function_operator.cpp
+++ b/be/src/exec/operator/table_function_operator.cpp
@@ -283,8 +283,7 @@ Status 
TableFunctionLocalState::_get_expanded_block_block_fast_path(
         if (out_col->is_nullable()) {
             auto* nullable = assert_cast<ColumnNullable*>(out_col.get());
             struct_col_ptr = 
assert_cast<ColumnStruct*>(nullable->get_nested_column_ptr().get());
-            outer_struct_nullmap_ptr =
-                    
assert_cast<ColumnUInt8*>(nullable->get_null_map_column_ptr().get());
+            outer_struct_nullmap_ptr = 
nullable->get_null_map_column_ptr().get();
         } else {
             struct_col_ptr = assert_cast<ColumnStruct*>(out_col.get());
         }
@@ -344,8 +343,7 @@ Status 
TableFunctionLocalState::_get_expanded_block_block_fast_path(
             val_nullable->get_nested_column_ptr()->insert_range_from(
                     *_block_fast_path_ctx.nested_col, 
segment_ctx.seg_nested_start,
                     segment_ctx.seg_nested_count);
-            auto* val_nullmap =
-                    
assert_cast<ColumnUInt8*>(val_nullable->get_null_map_column_ptr().get());
+            auto* val_nullmap = val_nullable->get_null_map_column_ptr().get();
             auto& val_nullmap_data = val_nullmap->get_data();
             const size_t old_size = val_nullmap_data.size();
             val_nullmap_data.resize(old_size + segment_ctx.seg_nested_count);
@@ -366,8 +364,7 @@ Status 
TableFunctionLocalState::_get_expanded_block_block_fast_path(
             out_nullable->get_nested_column_ptr()->insert_range_from(
                     *_block_fast_path_ctx.nested_col, 
segment_ctx.seg_nested_start,
                     segment_ctx.seg_nested_count);
-            auto* nullmap_column =
-                    
assert_cast<ColumnUInt8*>(out_nullable->get_null_map_column_ptr().get());
+            auto* nullmap_column = 
out_nullable->get_null_map_column_ptr().get();
             auto& nullmap_data = nullmap_column->get_data();
             const size_t old_size = nullmap_data.size();
             nullmap_data.resize(old_size + segment_ctx.seg_nested_count);
diff --git a/be/src/exprs/function/array/function_array_distance.h 
b/be/src/exprs/function/array/function_array_distance.h
index 8749364d51a..e40618267b2 100644
--- a/be/src/exprs/function/array/function_array_distance.h
+++ b/be/src/exprs/function/array/function_array_distance.h
@@ -196,7 +196,7 @@ public:
             materialized_col1 = arg1.column->convert_to_full_column_if_const();
             arr1 = _extract_array_column(materialized_col1.get(), "First 
argument", get_name());
             float1 = _extract_float_data(arr1, "First argument", get_name());
-            offset1 = assert_cast<const 
ColumnArray::ColumnOffsets*>(arr1->get_offsets_ptr().get());
+            offset1 = arr1->get_offsets_ptr().get();
             offsets_data1 = &offset1->get_data();
             float_data1 = float1->get_data().data();
         }
@@ -205,7 +205,7 @@ public:
             materialized_col2 = arg2.column->convert_to_full_column_if_const();
             arr2 = _extract_array_column(materialized_col2.get(), "Second 
argument", get_name());
             float2 = _extract_float_data(arr2, "Second argument", get_name());
-            offset2 = assert_cast<const 
ColumnArray::ColumnOffsets*>(arr2->get_offsets_ptr().get());
+            offset2 = arr2->get_offsets_ptr().get();
             offsets_data2 = &offset2->get_data();
             float_data2 = float2->get_data().data();
         }
diff --git a/be/src/exprs/function/array/function_array_exists.cpp 
b/be/src/exprs/function/array/function_array_exists.cpp
index a3d5f36e344..9009ba2f755 100644
--- a/be/src/exprs/function/array/function_array_exists.cpp
+++ b/be/src/exprs/function/array/function_array_exists.cpp
@@ -68,8 +68,7 @@ public:
         const auto first_column =
                 
block.get_by_position(arguments[0]).column->convert_to_full_column_if_const();
         const ColumnArray& first_col_array = assert_cast<const 
ColumnArray&>(*first_column);
-        const auto& first_off_data = assert_cast<const 
ColumnArray::ColumnOffsets&>(
-                first_col_array.get_offsets_column());
+        const auto& first_off_data = first_col_array.get_offsets_column();
 
         const auto& nested_nullable_column =
                 assert_cast<const 
ColumnNullable&>(*first_col_array.get_data_ptr());
diff --git a/be/src/exprs/function/array/function_array_filter.cpp 
b/be/src/exprs/function/array/function_array_filter.cpp
index 71a1a9a1bf8..9d9329fa010 100644
--- a/be/src/exprs/function/array/function_array_filter.cpp
+++ b/be/src/exprs/function/array/function_array_filter.cpp
@@ -70,16 +70,12 @@ public:
                 
block.get_by_position(arguments[1]).column->convert_to_full_column_if_const();
 
         const ColumnArray& first_col_array = assert_cast<const 
ColumnArray&>(*first_column);
-        const auto& first_off_data =
-                assert_cast<const 
ColumnArray::ColumnOffsets&>(first_col_array.get_offsets_column())
-                        .get_data();
+        const auto& first_off_data = 
first_col_array.get_offsets_column().get_data();
         const auto& first_nested_nullable_column =
                 assert_cast<const 
ColumnNullable&>(*first_col_array.get_data_ptr());
 
         const ColumnArray& second_col_array = assert_cast<const 
ColumnArray&>(*second_column);
-        const auto& second_off_data = assert_cast<const 
ColumnArray::ColumnOffsets&>(
-                                              
second_col_array.get_offsets_column())
-                                              .get_data();
+        const auto& second_off_data = 
second_col_array.get_offsets_column().get_data();
         const auto& second_nested_null_map_data =
                 assert_cast<const 
ColumnNullable&>(*second_col_array.get_data_ptr())
                         .get_null_map_column()
diff --git a/be/src/exprs/function/array/function_array_flatten.cpp 
b/be/src/exprs/function/array/function_array_flatten.cpp
index fafea56df5c..d5f5f253156 100644
--- a/be/src/exprs/function/array/function_array_flatten.cpp
+++ b/be/src/exprs/function/array/function_array_flatten.cpp
@@ -63,9 +63,7 @@ public:
         auto* src_data_type_array =
                 assert_cast<const 
DataTypeArray*>(remove_nullable(src_data_type).get());
 
-        auto result_column_offsets = assert_cast<const 
ColumnArray::ColumnOffsets&>(
-                                             
src_column_array_ptr->get_offsets_column())
-                                             .clone();
+        auto result_column_offsets = 
src_column_array_ptr->get_offsets_column().clone();
         auto* offsets = 
assert_cast<ColumnArray::ColumnOffsets*>(result_column_offsets.get())
                                 ->get_data()
                                 .data();
diff --git a/be/src/exprs/function/function_always_not_nullable.h 
b/be/src/exprs/function/function_always_not_nullable.h
index 2277a921e94..20db2fd8d29 100644
--- a/be/src/exprs/function/function_always_not_nullable.h
+++ b/be/src/exprs/function/function_always_not_nullable.h
@@ -51,8 +51,7 @@ public:
             const ColumnNullable* col_nullable = 
check_and_get_column<ColumnNullable>(column.get());
             const ColumnType* col =
                     
check_and_get_column<ColumnType>(col_nullable->get_nested_column_ptr().get());
-            const ColumnUInt8* col_nullmap = check_and_get_column<ColumnUInt8>(
-                    col_nullable->get_null_map_column_ptr().get());
+            const ColumnUInt8* col_nullmap = 
col_nullable->get_null_map_column_ptr().get();
 
             if (col != nullptr && col_nullmap != nullptr) {
                 if constexpr (WithReturn) {
diff --git a/be/src/exprs/function/function_hll.cpp 
b/be/src/exprs/function/function_hll.cpp
index f8423c4cbf7..2ba0d12556c 100644
--- a/be/src/exprs/function/function_hll.cpp
+++ b/be/src/exprs/function/function_hll.cpp
@@ -100,8 +100,7 @@ public:
                     check_and_get_column<ColumnNullable>(column.get())) {
             const ColumnHLL* col =
                     
check_and_get_column<ColumnHLL>(col_nullable->get_nested_column_ptr().get());
-            const ColumnUInt8* col_nullmap = check_and_get_column<ColumnUInt8>(
-                    col_nullable->get_null_map_column_ptr().get());
+            const ColumnUInt8* col_nullmap = 
col_nullable->get_null_map_column_ptr().get();
 
             if (col != nullptr && col_nullmap != nullptr) {
                 Function::vector_nullable(col->get_data(), 
col_nullmap->get_data(), column_result);
diff --git a/be/src/exprs/function/function_quantile_state.cpp 
b/be/src/exprs/function/function_quantile_state.cpp
index af1b80822f0..a0ad4ff0b7c 100644
--- a/be/src/exprs/function/function_quantile_state.cpp
+++ b/be/src/exprs/function/function_quantile_state.cpp
@@ -95,8 +95,7 @@ public:
         const NullMap* nullmap = nullptr;
         if constexpr (is_nullable) {
             col_nullable = check_and_get_column<ColumnNullable>(column.get());
-            col_nullmap = check_and_get_column<ColumnUInt8>(
-                    col_nullable->get_null_map_column_ptr().get());
+            col_nullmap = col_nullable->get_null_map_column_ptr().get();
             col = 
check_and_get_column<ColumnFloat64>(col_nullable->get_nested_column_ptr().get());
             if (col == nullptr || col_nullmap == nullptr) {
                 return type_error();
diff --git a/be/src/exprs/function/if.cpp b/be/src/exprs/function/if.cpp
index 603879dda18..aaa37057968 100644
--- a/be/src/exprs/function/if.cpp
+++ b/be/src/exprs/function/if.cpp
@@ -240,7 +240,7 @@ public:
             return Status::OK();
         }
 
-        const auto* cond_col = typeid_cast<const 
ColumnUInt8*>(arg_cond.column.get());
+        const auto* cond_col = 
check_and_get_column<ColumnUInt8>(arg_cond.column.get());
         const ColumnConst* cond_const_col =
                 check_and_get_column_const<ColumnUInt8>(arg_cond.column.get());
 
@@ -361,19 +361,23 @@ public:
             // b. create a const_nullmap_column: it's a not nullable column or 
a const nullable column, contain a const value
             Block temporary_block;
             temporary_block.insert(arg_cond);
-            auto then_nested_null_map =
-                    (then_type_is_nullable && !then_column_is_const_nullable)
-                            ? then_is_nullable->get_null_map_column_ptr()
-                            : 
DataTypeUInt8().create_column_const_with_default_value(
-                                      input_rows_count);
+            ColumnPtr then_nested_null_map;
+            if (then_type_is_nullable && !then_column_is_const_nullable) {
+                then_nested_null_map = 
then_is_nullable->get_null_map_column_ptr();
+            } else {
+                then_nested_null_map =
+                        
DataTypeUInt8().create_column_const_with_default_value(input_rows_count);
+            }
             temporary_block.insert({then_nested_null_map, 
std::make_shared<DataTypeUInt8>(),
                                     "then_column_null_map"});
 
-            auto else_nested_null_map =
-                    (else_type_is_nullable && !else_column_is_const_nullable)
-                            ? else_is_nullable->get_null_map_column_ptr()
-                            : 
DataTypeUInt8().create_column_const_with_default_value(
-                                      input_rows_count);
+            ColumnPtr else_nested_null_map;
+            if (else_type_is_nullable && !else_column_is_const_nullable) {
+                else_nested_null_map = 
else_is_nullable->get_null_map_column_ptr();
+            } else {
+                else_nested_null_map =
+                        
DataTypeUInt8().create_column_const_with_default_value(input_rows_count);
+            }
             temporary_block.insert({else_nested_null_map, 
std::make_shared<DataTypeUInt8>(),
                                     "else_column_null_map"});
             temporary_block.insert(
diff --git a/be/src/exprs/lambda_function/varray_filter_function.cpp 
b/be/src/exprs/lambda_function/varray_filter_function.cpp
index 1f37bd253e0..60aeb26f201 100644
--- a/be/src/exprs/lambda_function/varray_filter_function.cpp
+++ b/be/src/exprs/lambda_function/varray_filter_function.cpp
@@ -85,9 +85,7 @@ public:
                                              column_array_nullmap.get_data());
         }
         const auto& first_col_array = assert_cast<const 
ColumnArray&>(*first_arg_column);
-        const auto& first_off_data =
-                assert_cast<const 
ColumnArray::ColumnOffsets&>(first_col_array.get_offsets_column())
-                        .get_data();
+        const auto& first_off_data = 
first_col_array.get_offsets_column().get_data();
         const auto& first_nested_nullable_column =
                 assert_cast<const 
ColumnNullable&>(*first_col_array.get_data_ptr());
 
@@ -109,9 +107,7 @@ public:
                                              column_array_nullmap.get_data());
         }
         const auto& second_col_array = assert_cast<const 
ColumnArray&>(*second_arg_column);
-        const auto& second_off_data = assert_cast<const 
ColumnArray::ColumnOffsets&>(
-                                              
second_col_array.get_offsets_column())
-                                              .get_data();
+        const auto& second_off_data = 
second_col_array.get_offsets_column().get_data();
         const auto& second_nested_null_map_data =
                 assert_cast<const 
ColumnNullable&>(*second_col_array.get_data_ptr())
                         .get_null_map_column()
diff --git a/be/src/exprs/lambda_function/varray_map_function.cpp 
b/be/src/exprs/lambda_function/varray_map_function.cpp
index d934ee77bc9..3125593bde0 100644
--- a/be/src/exprs/lambda_function/varray_map_function.cpp
+++ b/be/src/exprs/lambda_function/varray_map_function.cpp
@@ -147,9 +147,8 @@ public:
                                      ->get_nested_type();
 
                 // need to union nullmap from all columns
-                VectorizedUtils::update_null_map(
-                        outside_null_map->get_data(),
-                        assert_cast<const 
ColumnUInt8&>(*column_array_nullmap).get_data());
+                VectorizedUtils::update_null_map(outside_null_map->get_data(),
+                                                 
column_array_nullmap->get_data());
             }
 
             // here is the array column
@@ -159,8 +158,7 @@ public:
             if (i == 0) {
                 nested_array_column_rows = col_array.get_data_ptr()->size();
                 first_array_offsets = col_array.get_offsets_ptr();
-                const auto& off_data = assert_cast<const 
ColumnArray::ColumnOffsets&>(
-                        col_array.get_offsets_column());
+                const auto& off_data = col_array.get_offsets_column();
                 array_column_offset = 
off_data.clone_resized(col_array.get_offsets_column().size());
                 args_info.offsets_ptr = &col_array.get_offsets();
             } else {
diff --git a/be/src/exprs/lambda_function/varray_sort_function.cpp 
b/be/src/exprs/lambda_function/varray_sort_function.cpp
index b5bf523264a..6c2c657bdb4 100644
--- a/be/src/exprs/lambda_function/varray_sort_function.cpp
+++ b/be/src/exprs/lambda_function/varray_sort_function.cpp
@@ -97,9 +97,7 @@ public:
         }
 
         const auto& col_array = assert_cast<const ColumnArray&>(*arg_column);
-        const auto& off_data =
-                assert_cast<const 
ColumnArray::ColumnOffsets&>(col_array.get_offsets_column())
-                        .get_data();
+        const auto& off_data = col_array.get_offsets_column().get_data();
 
         const auto& nested_nullable_column =
                 assert_cast<const ColumnNullable&>(*col_array.get_data_ptr());
diff --git a/be/src/exprs/table_function/python_udtf_function.cpp 
b/be/src/exprs/table_function/python_udtf_function.cpp
index 45a47f59064..9f917ab94fa 100644
--- a/be/src/exprs/table_function/python_udtf_function.cpp
+++ b/be/src/exprs/table_function/python_udtf_function.cpp
@@ -196,9 +196,9 @@ void 
PythonUDTFFunction::get_same_many_values(MutableColumnPtr& column, int leng
         if (_is_nullable) {
             auto* nullable_column = assert_cast<ColumnNullable*>(column.get());
             auto nested_column = nullable_column->get_nested_column_ptr();
-            auto nullmap_column = nullable_column->get_null_map_column_ptr();
+            auto* nullmap_column = 
nullable_column->get_null_map_column_ptr().get();
             nested_column->insert_many_from(*_array_column_detail.nested_col, 
pos, length);
-            
assert_cast<ColumnUInt8*>(nullmap_column.get())->insert_many_defaults(length);
+            nullmap_column->insert_many_defaults(length);
         } else {
             column->insert_many_from(*_array_column_detail.nested_col, pos, 
length);
         }
@@ -216,8 +216,7 @@ int PythonUDTFFunction::get_value(MutableColumnPtr& column, 
int max_step) {
         if (_is_nullable) {
             auto* nullable_column = assert_cast<ColumnNullable*>(column.get());
             auto nested_column = nullable_column->get_nested_column_ptr();
-            auto* nullmap_column =
-                    
assert_cast<ColumnUInt8*>(nullable_column->get_null_map_column_ptr().get());
+            auto* nullmap_column = 
nullable_column->get_null_map_column_ptr().get();
 
             nested_column->insert_range_from(*_array_column_detail.nested_col, 
pos, max_step);
             size_t old_size = nullmap_column->size();
diff --git a/be/src/exprs/table_function/udf_table_function.cpp 
b/be/src/exprs/table_function/udf_table_function.cpp
index 00e3fe410ac..cacc3046e75 100644
--- a/be/src/exprs/table_function/udf_table_function.cpp
+++ b/be/src/exprs/table_function/udf_table_function.cpp
@@ -162,9 +162,9 @@ void 
UDFTableFunction::get_same_many_values(MutableColumnPtr& column, int length
         if (_is_nullable) {
             auto* nullable_column = assert_cast<ColumnNullable*>(column.get());
             auto nested_column = nullable_column->get_nested_column_ptr();
-            auto nullmap_column = nullable_column->get_null_map_column_ptr();
+            auto* nullmap_column = 
nullable_column->get_null_map_column_ptr().get();
             nested_column->insert_many_from(*_array_column_detail.nested_col, 
pos, length);
-            
assert_cast<ColumnUInt8*>(nullmap_column.get())->insert_many_defaults(length);
+            nullmap_column->insert_many_defaults(length);
         } else {
             column->insert_many_from(*_array_column_detail.nested_col, pos, 
length);
         }
@@ -181,8 +181,7 @@ int UDFTableFunction::get_value(MutableColumnPtr& column, 
int max_step) {
         if (_is_nullable) {
             auto* nullable_column = assert_cast<ColumnNullable*>(column.get());
             auto nested_column = nullable_column->get_nested_column_ptr();
-            auto* nullmap_column =
-                    
assert_cast<ColumnUInt8*>(nullable_column->get_null_map_column_ptr().get());
+            auto* nullmap_column = 
nullable_column->get_null_map_column_ptr().get();
             nested_column->insert_range_from(*_array_column_detail.nested_col, 
pos, max_step);
             size_t old_size = nullmap_column->size();
             nullmap_column->resize(old_size + max_step);
diff --git a/be/src/exprs/table_function/vexplode.cpp 
b/be/src/exprs/table_function/vexplode.cpp
index b4e23ba0977..27ba939ad1b 100644
--- a/be/src/exprs/table_function/vexplode.cpp
+++ b/be/src/exprs/table_function/vexplode.cpp
@@ -139,8 +139,8 @@ void 
VExplodeTableFunction::get_same_many_values(MutableColumnPtr& column, int l
             assert_cast<ColumnNullable*>(column.get())
                     ->get_nested_column_ptr()
                     ->insert_many_from(*_detail.nested_col, pos, length);
-            assert_cast<ColumnUInt8*>(
-                    
assert_cast<ColumnNullable*>(column.get())->get_null_map_column_ptr().get())
+            assert_cast<ColumnNullable*>(column.get())
+                    ->get_null_map_column_ptr()
                     ->insert_many_defaults(length);
         } else {
             column->insert_many_from(*_detail.nested_col, pos, length);
@@ -158,8 +158,7 @@ int VExplodeTableFunction::get_value(MutableColumnPtr& 
column, int max_step) {
         if (_is_nullable) {
             auto* nullable_column = assert_cast<ColumnNullable*>(column.get());
             auto nested_column = nullable_column->get_nested_column_ptr();
-            auto* nullmap_column =
-                    
assert_cast<ColumnUInt8*>(nullable_column->get_null_map_column_ptr().get());
+            auto* nullmap_column = 
nullable_column->get_null_map_column_ptr().get();
             nested_column->insert_range_from(*_detail.nested_col, pos, 
max_step);
             size_t old_size = nullmap_column->size();
             nullmap_column->resize(old_size + max_step);
diff --git a/be/src/exprs/table_function/vexplode_bitmap.cpp 
b/be/src/exprs/table_function/vexplode_bitmap.cpp
index 741be03cab8..0956c6223a4 100644
--- a/be/src/exprs/table_function/vexplode_bitmap.cpp
+++ b/be/src/exprs/table_function/vexplode_bitmap.cpp
@@ -76,8 +76,8 @@ void 
VExplodeBitmapTableFunction::get_same_many_values(MutableColumnPtr& column,
             assert_cast<ColumnInt64*>(
                     
assert_cast<ColumnNullable*>(column.get())->get_nested_column_ptr().get())
                     ->insert_many_vals(**_cur_iter, length);
-            assert_cast<ColumnUInt8*>(
-                    
assert_cast<ColumnNullable*>(column.get())->get_null_map_column_ptr().get())
+            assert_cast<ColumnNullable*>(column.get())
+                    ->get_null_map_column_ptr()
                     ->insert_many_defaults(length);
         } else {
             
assert_cast<ColumnInt64*>(column.get())->insert_many_vals(**_cur_iter, length);
@@ -115,8 +115,8 @@ int 
VExplodeBitmapTableFunction::get_value(MutableColumnPtr& column, int max_ste
         if (_is_nullable) {
             target = assert_cast<ColumnInt64*>(
                     
assert_cast<ColumnNullable*>(column.get())->get_nested_column_ptr().get());
-            assert_cast<ColumnUInt8*>(
-                    
assert_cast<ColumnNullable*>(column.get())->get_null_map_column_ptr().get())
+            assert_cast<ColumnNullable*>(column.get())
+                    ->get_null_map_column_ptr()
                     ->insert_many_defaults(max_step);
         } else {
             target = assert_cast<ColumnInt64*>(column.get());
diff --git a/be/src/exprs/table_function/vexplode_json_object.cpp 
b/be/src/exprs/table_function/vexplode_json_object.cpp
index f2f927a2212..bea8378c03f 100644
--- a/be/src/exprs/table_function/vexplode_json_object.cpp
+++ b/be/src/exprs/table_function/vexplode_json_object.cpp
@@ -108,8 +108,8 @@ void 
VExplodeJsonObjectTableFunction::get_same_many_values(MutableColumnPtr& col
         // make map kv value into struct
         ret = assert_cast<ColumnStruct*>(
                 
assert_cast<ColumnNullable*>(column.get())->get_nested_column_ptr().get());
-        assert_cast<ColumnUInt8*>(
-                
assert_cast<ColumnNullable*>(column.get())->get_null_map_column_ptr().get())
+        assert_cast<ColumnNullable*>(column.get())
+                ->get_null_map_column_ptr()
                 ->insert_many_defaults(length);
     } else if (is_column<ColumnStruct>(column.get())) {
         ret = assert_cast<ColumnStruct*>(column.get());
@@ -138,8 +138,7 @@ int 
VExplodeJsonObjectTableFunction::get_value(MutableColumnPtr& column, int max
             auto* nullable_column = assert_cast<ColumnNullable*>(column.get());
             struct_column =
                     
assert_cast<ColumnStruct*>(nullable_column->get_nested_column_ptr().get());
-            auto* nullmap_column =
-                    
assert_cast<ColumnUInt8*>(nullable_column->get_null_map_column_ptr().get());
+            auto* nullmap_column = 
nullable_column->get_null_map_column_ptr().get();
             // here nullmap_column insert max_step many defaults as if 
MAP[row_idx] is NULL
             // will be not update value, _cur_size = 0, means current_empty;
             // so here could insert directly
diff --git a/be/src/exprs/table_function/vexplode_map.cpp 
b/be/src/exprs/table_function/vexplode_map.cpp
index 9451693cf8f..09c9007f95b 100644
--- a/be/src/exprs/table_function/vexplode_map.cpp
+++ b/be/src/exprs/table_function/vexplode_map.cpp
@@ -102,8 +102,8 @@ void 
VExplodeMapTableFunction::get_same_many_values(MutableColumnPtr& column, in
         // make map kv value into struct
         ret = assert_cast<ColumnStruct*>(
                 
assert_cast<ColumnNullable*>(column.get())->get_nested_column_ptr().get());
-        assert_cast<ColumnUInt8*>(
-                
assert_cast<ColumnNullable*>(column.get())->get_null_map_column_ptr().get())
+        assert_cast<ColumnNullable*>(column.get())
+                ->get_null_map_column_ptr()
                 ->insert_many_defaults(length);
     } else if (is_column<ColumnStruct>(column.get())) {
         ret = assert_cast<ColumnStruct*>(column.get());
@@ -132,8 +132,7 @@ int VExplodeMapTableFunction::get_value(MutableColumnPtr& 
column, int max_step)
             auto* nullable_column = assert_cast<ColumnNullable*>(column.get());
             struct_column =
                     
assert_cast<ColumnStruct*>(nullable_column->get_nested_column_ptr().get());
-            auto* nullmap_column =
-                    
assert_cast<ColumnUInt8*>(nullable_column->get_null_map_column_ptr().get());
+            auto* nullmap_column = 
nullable_column->get_null_map_column_ptr().get();
             // here nullmap_column insert max_step many defaults as if 
MAP[row_idx] is NULL
             // will be not update value, _cur_size = 0, means current_empty;
             // so here could insert directly
diff --git a/be/src/exprs/table_function/vexplode_numbers.cpp 
b/be/src/exprs/table_function/vexplode_numbers.cpp
index cb7997fd2de..43a93ffe877 100644
--- a/be/src/exprs/table_function/vexplode_numbers.cpp
+++ b/be/src/exprs/table_function/vexplode_numbers.cpp
@@ -104,8 +104,8 @@ void 
VExplodeNumbersTableFunction::get_same_many_values(MutableColumnPtr& column
             assert_cast<ColumnInt32*>(
                     
assert_cast<ColumnNullable*>(column.get())->get_nested_column_ptr().get())
                     ->insert_many_vals(static_cast<int32_t>(_cur_offset), 
length);
-            assert_cast<ColumnUInt8*>(
-                    
assert_cast<ColumnNullable*>(column.get())->get_null_map_column_ptr().get())
+            assert_cast<ColumnNullable*>(column.get())
+                    ->get_null_map_column_ptr()
                     ->insert_many_defaults(length);
         } else {
             assert_cast<ColumnInt32*>(column.get())
diff --git a/be/src/exprs/table_function/vexplode_numbers.h 
b/be/src/exprs/table_function/vexplode_numbers.h
index d09f4aac196..4108416bb70 100644
--- a/be/src/exprs/table_function/vexplode_numbers.h
+++ b/be/src/exprs/table_function/vexplode_numbers.h
@@ -46,11 +46,11 @@ public:
         max_step = std::min(max_step, (int)(_cur_size - _cur_offset));
         if (_is_const) {
             if (_is_nullable) {
-                static_cast<ColumnInt32*>(
-                        
static_cast<ColumnNullable*>(column.get())->get_nested_column_ptr().get())
+                assert_cast<ColumnInt32*>(
+                        
assert_cast<ColumnNullable*>(column.get())->get_nested_column_ptr().get())
                         ->insert_range_from(*_elements_column, _cur_offset, 
max_step);
-                static_cast<ColumnUInt8*>(
-                        
static_cast<ColumnNullable*>(column.get())->get_null_map_column_ptr().get())
+                assert_cast<ColumnNullable*>(column.get())
+                        ->get_null_map_column_ptr()
                         ->insert_many_defaults(max_step);
             } else {
                 static_cast<ColumnInt32*>(column.get())
@@ -67,9 +67,8 @@ public:
                     target = 
assert_cast<ColumnInt32*>(assert_cast<ColumnNullable*>(column.get())
                                                                
->get_nested_column_ptr()
                                                                .get());
-                    
assert_cast<ColumnUInt8*>(assert_cast<ColumnNullable*>(column.get())
-                                                      
->get_null_map_column_ptr()
-                                                      .get())
+                    assert_cast<ColumnNullable*>(column.get())
+                            ->get_null_map_column_ptr()
                             ->insert_many_defaults(max_step);
                 } else {
                     target = assert_cast<ColumnInt32*>(column.get());
diff --git a/be/src/exprs/table_function/vexplode_v2.cpp 
b/be/src/exprs/table_function/vexplode_v2.cpp
index b3c6d0b26c4..431fb6fb535 100644
--- a/be/src/exprs/table_function/vexplode_v2.cpp
+++ b/be/src/exprs/table_function/vexplode_v2.cpp
@@ -165,8 +165,7 @@ void 
VExplodeV2TableFunction::get_same_many_values(MutableColumnPtr& column, int
             auto* nullable_column = assert_cast<ColumnNullable*>(column.get());
             struct_column =
                     
assert_cast<ColumnStruct*>(nullable_column->get_nested_column_ptr().get());
-            auto* nullmap_column =
-                    
assert_cast<ColumnUInt8*>(nullable_column->get_null_map_column_ptr().get());
+            auto* nullmap_column = 
nullable_column->get_null_map_column_ptr().get();
             nullmap_column->insert_many_defaults(length);
 
         } else {
@@ -194,8 +193,7 @@ void 
VExplodeV2TableFunction::get_same_many_values(MutableColumnPtr& column, int
             struct_field.insert_many_defaults(length);
         } else {
             auto* nullable_column = 
assert_cast<ColumnNullable*>(struct_field.get_ptr().get());
-            auto* nullmap_column =
-                    
assert_cast<ColumnUInt8*>(nullable_column->get_null_map_column_ptr().get());
+            auto* nullmap_column = 
nullable_column->get_null_map_column_ptr().get();
             // only need to check if the value at position pos is null
             if (element_size < _cur_offset ||
                 (detail.nested_nullmap_data && 
detail.nested_nullmap_data[pos])) {
@@ -225,8 +223,7 @@ int VExplodeV2TableFunction::get_value(MutableColumnPtr& 
column, int max_step) {
                 auto* nullable_column = 
assert_cast<ColumnNullable*>(column.get());
                 struct_column =
                         
assert_cast<ColumnStruct*>(nullable_column->get_nested_column_ptr().get());
-                auto* nullmap_column =
-                        
assert_cast<ColumnUInt8*>(nullable_column->get_null_map_column_ptr().get());
+                auto* nullmap_column = 
nullable_column->get_null_map_column_ptr().get();
                 nullmap_column->insert_many_defaults(max_step);
 
             } else {
@@ -255,8 +252,7 @@ int VExplodeV2TableFunction::get_value(MutableColumnPtr& 
column, int max_step) {
                 struct_field.insert_many_defaults(max_step);
             } else {
                 auto* nullable_column = 
assert_cast<ColumnNullable*>(struct_field.get_ptr().get());
-                auto* nullmap_column =
-                        
assert_cast<ColumnUInt8*>(nullable_column->get_null_map_column_ptr().get());
+                auto* nullmap_column = 
nullable_column->get_null_map_column_ptr().get();
                 if (element_size >= _cur_offset + max_step) {
                     
nullable_column->get_nested_column_ptr()->insert_range_from(*detail.nested_col,
                                                                                
 pos, max_step);
diff --git a/be/src/exprs/table_function/vjson_each.cpp 
b/be/src/exprs/table_function/vjson_each.cpp
index 741caed6965..649e6f5f19e 100644
--- a/be/src/exprs/table_function/vjson_each.cpp
+++ b/be/src/exprs/table_function/vjson_each.cpp
@@ -163,8 +163,7 @@ void 
VJsonEachTableFunction<TEXT_MODE>::get_same_many_values(MutableColumnPtr& c
     if (_is_nullable) {
         auto* nullable = assert_cast<ColumnNullable*>(column.get());
         ret = 
assert_cast<ColumnStruct*>(nullable->get_nested_column_ptr().get());
-        assert_cast<ColumnUInt8*>(nullable->get_null_map_column_ptr().get())
-                ->insert_many_defaults(length);
+        nullable->get_null_map_column_ptr()->insert_many_defaults(length);
     } else {
         ret = assert_cast<ColumnStruct*>(column.get());
     }
@@ -185,8 +184,7 @@ int 
VJsonEachTableFunction<TEXT_MODE>::get_value(MutableColumnPtr& column, int m
         if (_is_nullable) {
             auto* nullable_col = assert_cast<ColumnNullable*>(column.get());
             struct_col = 
assert_cast<ColumnStruct*>(nullable_col->get_nested_column_ptr().get());
-            
assert_cast<ColumnUInt8*>(nullable_col->get_null_map_column_ptr().get())
-                    ->insert_many_defaults(max_step);
+            
nullable_col->get_null_map_column_ptr()->insert_many_defaults(max_step);
         } else {
             struct_col = assert_cast<ColumnStruct*>(column.get());
         }
diff --git a/be/src/exprs/vcase_expr.h b/be/src/exprs/vcase_expr.h
index bab1ad48ac2..7c29c5bc65d 100644
--- a/be/src/exprs/vcase_expr.h
+++ b/be/src/exprs/vcase_expr.h
@@ -277,10 +277,7 @@ private:
                     continue;
                 }
                 const auto* __restrict cond_raw_nullmap =
-                        assert_cast<const ColumnUInt8*, 
TypeCheckOnRelease::DISABLE>(
-                                
column_nullable_ptr->get_null_map_column_ptr().get())
-                                ->get_data()
-                                .data();
+                        
column_nullable_ptr->get_null_map_column_ptr()->get_data().data();
                 for (int row_idx = 0; row_idx < rows_count; row_idx++) {
                     then_idx_ptr[row_idx] |= (!then_idx_ptr[row_idx] * 
cond_raw_data[row_idx] *
                                               !cond_raw_nullmap[row_idx]) *
diff --git a/be/src/exprs/vcompound_pred.h b/be/src/exprs/vcompound_pred.h
index 3796cc3dfb6..45759e2639b 100644
--- a/be/src/exprs/vcompound_pred.h
+++ b/be/src/exprs/vcompound_pred.h
@@ -406,10 +406,7 @@ private:
                     assert_cast<const 
ColumnUInt8*>(nullable_column->get_nested_column_ptr().get())
                             ->get_data()
                             .data();
-            auto* null_map = assert_cast<const ColumnUInt8*>(
-                                     
nullable_column->get_null_map_column_ptr().get())
-                                     ->get_data()
-                                     .data();
+            auto* null_map = 
nullable_column->get_null_map_column_ptr()->get_data().data();
             return std::make_pair(data_column, null_map);
         } else {
             auto* data_column = assert_cast<const 
ColumnUInt8*>(column.get())->get_data().data();
diff --git a/be/src/exprs/vcondition_expr.cpp b/be/src/exprs/vcondition_expr.cpp
index 729fcd352e2..becd215c78c 100644
--- a/be/src/exprs/vcondition_expr.cpp
+++ b/be/src/exprs/vcondition_expr.cpp
@@ -194,7 +194,7 @@ Status VectorizedIfExpr::execute_for_null_then_else(Block& 
block,
         return Status::OK();
     }
 
-    const auto* cond_col = typeid_cast<const 
ColumnUInt8*>(arg_cond.column.get());
+    const auto* cond_col = 
check_and_get_column<ColumnUInt8>(arg_cond.column.get());
     const ColumnConst* cond_const_col =
             check_and_get_column_const<ColumnUInt8>(arg_cond.column.get());
 
@@ -311,17 +311,23 @@ Status 
VectorizedIfExpr::execute_for_nullable_then_else(Block& block,
         // b. create a const_nullmap_column: it's a not nullable column or a 
const nullable column, contain a const value
         Block temporary_block;
         temporary_block.insert(arg_cond);
-        auto then_nested_null_map =
-                (then_type_is_nullable && !then_column_is_const_nullable)
-                        ? then_is_nullable->get_null_map_column_ptr()
-                        : 
DataTypeUInt8().create_column_const_with_default_value(input_rows_count);
+        ColumnPtr then_nested_null_map;
+        if (then_type_is_nullable && !then_column_is_const_nullable) {
+            then_nested_null_map = then_is_nullable->get_null_map_column_ptr();
+        } else {
+            then_nested_null_map =
+                    
DataTypeUInt8().create_column_const_with_default_value(input_rows_count);
+        }
         temporary_block.insert(
                 {then_nested_null_map, std::make_shared<DataTypeUInt8>(), 
"then_column_null_map"});
 
-        auto else_nested_null_map =
-                (else_type_is_nullable && !else_column_is_const_nullable)
-                        ? else_is_nullable->get_null_map_column_ptr()
-                        : 
DataTypeUInt8().create_column_const_with_default_value(input_rows_count);
+        ColumnPtr else_nested_null_map;
+        if (else_type_is_nullable && !else_column_is_const_nullable) {
+            else_nested_null_map = else_is_nullable->get_null_map_column_ptr();
+        } else {
+            else_nested_null_map =
+                    
DataTypeUInt8().create_column_const_with_default_value(input_rows_count);
+        }
         temporary_block.insert(
                 {else_nested_null_map, std::make_shared<DataTypeUInt8>(), 
"else_column_null_map"});
         temporary_block.insert(
@@ -775,4 +781,4 @@ Status VectorizedCoalesceExpr::execute_column(VExprContext* 
context, const Block
     return Status::OK();
 }
 
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/storage/segment/column_reader.cpp 
b/be/src/storage/segment/column_reader.cpp
index 6ef3f861bb5..9ca528cc122 100644
--- a/be/src/storage/segment/column_reader.cpp
+++ b/be/src/storage/segment/column_reader.cpp
@@ -955,7 +955,13 @@ Status MapFileColumnIterator::next_batch(size_t* n, 
MutableColumnPtr& dst, bool*
     auto& column_map = assert_cast<ColumnMap&, TypeCheckOnRelease::DISABLE>(
             dst->is_nullable() ? 
static_cast<ColumnNullable&>(*dst).get_nested_column() : *dst);
     auto column_offsets_ptr = 
IColumn::mutate(std::move(column_map.get_offsets_ptr()));
-    Defer defer_offsets {[&] { column_map.get_offsets_ptr() = 
std::move(column_offsets_ptr); }};
+    Defer defer_offsets {[&] {
+        auto typed_column_offsets_ptr = 
ColumnMap::COffsets::cast_to_column_mutptr(
+                assert_cast<ColumnMap::COffsets*, TypeCheckOnRelease::DISABLE>(
+                        column_offsets_ptr.get()));
+        column_offsets_ptr = nullptr;
+        column_map.get_offsets_ptr() = std::move(typed_column_offsets_ptr);
+    }};
     bool offsets_has_null = false;
     ssize_t start = column_offsets_ptr->size();
     RETURN_IF_ERROR(_offsets_iterator->next_batch(n, column_offsets_ptr, 
&offsets_has_null));
@@ -990,11 +996,11 @@ Status MapFileColumnIterator::next_batch(size_t* n, 
MutableColumnPtr& dst, bool*
         // if so, we should set null_map to all null by default
         if (_null_iterator) {
             bool null_signs_has_null = false;
+            MutableColumnPtr null_map_column = std::move(null_map_ptr);
             RETURN_IF_ERROR(
-                    _null_iterator->next_batch(&num_read, null_map_ptr, 
&null_signs_has_null));
+                    _null_iterator->next_batch(&num_read, null_map_column, 
&null_signs_has_null));
         } else {
-            auto& null_map = assert_cast<ColumnUInt8&, 
TypeCheckOnRelease::DISABLE>(*null_map_ptr);
-            null_map.insert_many_vals(0, num_read);
+            null_map_ptr->insert_many_vals(0, num_read);
         }
         DCHECK(num_read == *n);
     }
@@ -1015,7 +1021,12 @@ Status MapFileColumnIterator::read_by_rowids(const 
rowid_t* rowids, const size_t
     auto& column_map = assert_cast<ColumnMap&, TypeCheckOnRelease::DISABLE>(
             dst->is_nullable() ? 
static_cast<ColumnNullable&>(*dst).get_nested_column() : *dst);
     auto offsets_ptr = 
IColumn::mutate(std::move(column_map.get_offsets_ptr()));
-    Defer defer_offsets {[&] { column_map.get_offsets_ptr() = 
std::move(offsets_ptr); }};
+    Defer defer_offsets {[&] {
+        auto typed_offsets_ptr = ColumnMap::COffsets::cast_to_column_mutptr(
+                assert_cast<ColumnMap::COffsets*, 
TypeCheckOnRelease::DISABLE>(offsets_ptr.get()));
+        offsets_ptr = nullptr;
+        column_map.get_offsets_ptr() = std::move(typed_offsets_ptr);
+    }};
     auto& offsets = static_cast<ColumnArray::ColumnOffsets&>(*offsets_ptr);
     size_t base = offsets.get_data().empty() ? 0 : offsets.get_data().back();
 
@@ -1029,12 +1040,13 @@ Status MapFileColumnIterator::read_by_rowids(const 
rowid_t* rowids, const size_t
         }
         auto null_map_ptr = 
static_cast<ColumnNullable&>(*dst).get_null_map_column_ptr();
         size_t null_before = null_map_ptr->size();
-        RETURN_IF_ERROR(_null_iterator->read_by_rowids(rowids, count, 
null_map_ptr));
+        auto* null_map_col = null_map_ptr.get();
+        MutableColumnPtr null_map_column = std::move(null_map_ptr);
+        RETURN_IF_ERROR(_null_iterator->read_by_rowids(rowids, count, 
null_map_column));
         // extract a light-weight view to decide element reads
-        auto& null_map_col = assert_cast<ColumnUInt8&>(*null_map_ptr);
         null_mask.reserve(count);
         for (size_t i = 0; i < count; ++i) {
-            null_mask.push_back(null_map_col.get_element(null_before + i));
+            null_mask.push_back(null_map_col->get_element(null_before + i));
         }
     } else if (dst->is_nullable()) {
         // in not-null to null linked-schemachange mode,
@@ -1042,8 +1054,7 @@ Status MapFileColumnIterator::read_by_rowids(const 
rowid_t* rowids, const size_t
         // so may dst from changed meta which is nullable but old data is not 
nullable,
         // if so, we should set null_map to all null by default
         auto null_map_ptr = 
static_cast<ColumnNullable&>(*dst).get_null_map_column_ptr();
-        auto& null_map = assert_cast<ColumnUInt8&>(*null_map_ptr);
-        null_map.insert_many_vals(0, count);
+        null_map_ptr->insert_many_vals(0, count);
     }
 
     // 2. bulk read start ordinals for requested rows
@@ -1310,11 +1321,11 @@ Status StructFileColumnIterator::next_batch(size_t* n, 
MutableColumnPtr& dst, bo
         // if so, we should set null_map to all null by default
         if (_null_iterator) {
             bool null_signs_has_null = false;
+            MutableColumnPtr null_map_column = std::move(null_map_ptr);
             RETURN_IF_ERROR(
-                    _null_iterator->next_batch(&num_read, null_map_ptr, 
&null_signs_has_null));
+                    _null_iterator->next_batch(&num_read, null_map_column, 
&null_signs_has_null));
         } else {
-            auto& null_map = assert_cast<ColumnUInt8&, 
TypeCheckOnRelease::DISABLE>(*null_map_ptr);
-            null_map.insert_many_vals(0, num_read);
+            null_map_ptr->insert_many_vals(0, num_read);
         }
         DCHECK(num_read == *n);
     }
@@ -1602,8 +1613,14 @@ Status ArrayFileColumnIterator::next_batch(size_t* n, 
MutableColumnPtr& dst, boo
             dst->is_nullable() ? 
static_cast<ColumnNullable&>(*dst).get_nested_column() : *dst);
 
     bool offsets_has_null = false;
-    auto column_offsets_ptr = 
IColumn::mutate(std::move(column_array.get_offsets_ptr()));
-    Defer defer_offsets {[&] { column_array.get_offsets_ptr() = 
std::move(column_offsets_ptr); }};
+    auto column_offsets_ptr = 
std::move(*column_array.get_offsets_ptr()).mutate();
+    Defer defer_offsets {[&] {
+        auto typed_column_offsets_ptr = 
ColumnArray::ColumnOffsets::cast_to_column_mutptr(
+                assert_cast<ColumnArray::ColumnOffsets*, 
TypeCheckOnRelease::DISABLE>(
+                        column_offsets_ptr.get()));
+        column_offsets_ptr = nullptr;
+        column_array.get_offsets_ptr() = std::move(typed_column_offsets_ptr);
+    }};
     ssize_t start = column_offsets_ptr->size();
     RETURN_IF_ERROR(_offset_iterator->next_batch(n, column_offsets_ptr, 
&offsets_has_null));
     if (*n == 0) {
@@ -1631,11 +1648,11 @@ Status ArrayFileColumnIterator::next_batch(size_t* n, 
MutableColumnPtr& dst, boo
         // if so, we should set null_map to all null by default
         if (_null_iterator) {
             bool null_signs_has_null = false;
+            MutableColumnPtr null_map_column = std::move(null_map_ptr);
             RETURN_IF_ERROR(
-                    _null_iterator->next_batch(&num_read, null_map_ptr, 
&null_signs_has_null));
+                    _null_iterator->next_batch(&num_read, null_map_column, 
&null_signs_has_null));
         } else {
-            auto& null_map = assert_cast<ColumnUInt8&, 
TypeCheckOnRelease::DISABLE>(*null_map_ptr);
-            null_map.insert_many_vals(0, num_read);
+            null_map_ptr->insert_many_vals(0, num_read);
         }
         DCHECK(num_read == *n);
     }
diff --git a/be/test/core/column/column_array_test.cpp 
b/be/test/core/column/column_array_test.cpp
index ab0984a02b2..d5c483e6b86 100644
--- a/be/test/core/column/column_array_test.cpp
+++ b/be/test/core/column/column_array_test.cpp
@@ -995,8 +995,8 @@ TEST_F(ColumnArrayTest, ArrayTypeTesterase) {
     // std::cout << tmp2.dump_data(0, tmp2.rows());
     auto* column_result = assert_cast<ColumnArray*>(column_res.get());
     auto& column_offsets_res = column_result->get_offsets_column();
-    auto& offset_data_res = assert_cast<ColumnOffset64&>(column_offsets_res);
-    auto& offset_data = assert_cast<ColumnOffset64&>(column_offsets);
+    auto& offset_data_res = column_offsets_res;
+    auto& offset_data = column_offsets;
     auto& column_data_res = assert_cast<ColumnInt32&>(
             
assert_cast<ColumnNullable&>(column_result->get_data()).get_nested_column());
     auto& column_data_origin = assert_cast<ColumnInt32&>(
@@ -1060,8 +1060,8 @@ TEST_F(ColumnArrayTest, ArrayTypeTest2erase) {
 
     auto* column_result = assert_cast<ColumnArray*>(column_res.get());
     auto& column_offsets_res = column_result->get_offsets_column();
-    auto& offset_data_res = assert_cast<ColumnOffset64&>(column_offsets_res);
-    auto& offset_data = assert_cast<ColumnOffset64&>(column_offsets);
+    auto& offset_data_res = column_offsets_res;
+    auto& offset_data = column_offsets;
     auto& column_data_res = assert_cast<ColumnInt32&>(
             
assert_cast<ColumnNullable&>(column_result->get_data()).get_nested_column());
     auto& column_data_origin = assert_cast<ColumnInt32&>(
diff --git a/be/test/core/column/column_map_test.cpp 
b/be/test/core/column/column_map_test.cpp
index 4ac21457eff..9e47be872aa 100644
--- a/be/test/core/column/column_map_test.cpp
+++ b/be/test/core/column/column_map_test.cpp
@@ -105,8 +105,8 @@ TEST_F(ColumnMapTest, MapTypeTesterase) {
     EXPECT_EQ(column_map->size(), 2);
     auto* column_result = assert_cast<ColumnMap*>(column_res.get());
     auto& column_offsets_res = column_result->get_offsets_column();
-    auto& offset_data_res = assert_cast<ColumnOffset64&>(column_offsets_res);
-    auto& offset_data = assert_cast<ColumnOffset64&>(column_offsets);
+    auto& offset_data_res = column_offsets_res;
+    auto& offset_data = column_offsets;
 
     auto& column_data_res = assert_cast<ColumnInt64&>(
             
assert_cast<ColumnNullable&>(assert_cast<ColumnMap&>(*column_res).get_values())
@@ -196,8 +196,8 @@ TEST_F(ColumnMapTest, MapTypeTest2erase) {
     EXPECT_EQ(column_map->size(), 3);
     auto* column_result = assert_cast<ColumnMap*>(column_res.get());
     auto& column_offsets_res = column_result->get_offsets_column();
-    auto& offset_data_res = assert_cast<ColumnOffset64&>(column_offsets_res);
-    auto& offset_data = assert_cast<ColumnOffset64&>(column_offsets);
+    auto& offset_data_res = column_offsets_res;
+    auto& offset_data = column_offsets;
 
     auto& column_data_res = assert_cast<ColumnInt64&>(
             
assert_cast<ColumnNullable&>(assert_cast<ColumnMap&>(*column_res).get_values())
diff --git a/be/test/exec/operator/table_function_operator_test.cpp 
b/be/test/exec/operator/table_function_operator_test.cpp
index a52cbb3a710..24217b7eb8d 100644
--- a/be/test/exec/operator/table_function_operator_test.cpp
+++ b/be/test/exec/operator/table_function_operator_test.cpp
@@ -145,8 +145,7 @@ public:
             auto* nullable_column = assert_cast<ColumnNullable*>(column.get());
             
nullable_column->get_nested_column_ptr()->insert_range_from(*_detail.nested_col,
 pos,
                                                                         
max_step);
-            auto* nullmap_column =
-                    
assert_cast<ColumnUInt8*>(nullable_column->get_null_map_column_ptr().get());
+            auto* nullmap_column = 
nullable_column->get_null_map_column_ptr().get();
             const size_t old_size = nullmap_column->size();
             nullmap_column->resize(old_size + max_step);
             if (_detail.nested_nullmap_data != nullptr) {
diff --git a/be/test/exprs/function/function_test_util.cpp 
b/be/test/exprs/function/function_test_util.cpp
index d1a2a24df1c..b04dc5944fd 100644
--- a/be/test/exprs/function/function_test_util.cpp
+++ b/be/test/exprs/function/function_test_util.cpp
@@ -390,8 +390,7 @@ bool insert_cell(MutableColumnPtr& column, DataTypePtr 
type_ptr, const AnyType&
         auto* nullable_column = assert_cast<ColumnNullable*>(column.get());
         auto col_type = remove_nullable(type_ptr);
         auto col = nullable_column->get_nested_column_ptr();
-        auto* nullmap_column =
-                
assert_cast<ColumnUInt8*>(nullable_column->get_null_map_column_ptr().get());
+        auto* nullmap_column = 
nullable_column->get_null_map_column_ptr().get();
         bool ok = insert_cell(col, col_type, cell, datetime_is_string_format);
         nullmap_column->insert_value(ok ? 0 : 1);
     } else {
@@ -553,8 +552,7 @@ bool insert_cell(MutableColumnPtr& column, DataTypePtr 
type_ptr, const AnyType&
             auto* nullable_column = assert_cast<ColumnNullable*>(column.get());
             auto* struct_column =
                     
assert_cast<ColumnStruct*>(nullable_column->get_nested_column_ptr().get());
-            auto* nullmap_column =
-                    
assert_cast<ColumnUInt8*>(nullable_column->get_null_map_column_ptr().get());
+            auto* nullmap_column = 
nullable_column->get_null_map_column_ptr().get();
             nullmap_column->insert_default();
             for (size_t i = 0; i < v.size(); ++i) {
                 auto& field = v[i];
diff --git a/be/test/format/table/hive/hive_reader_test.cpp 
b/be/test/format/table/hive/hive_reader_test.cpp
index 231cfaebac1..93d8c4221a8 100644
--- a/be/test/format/table/hive/hive_reader_test.cpp
+++ b/be/test/format/table/hive/hive_reader_test.cpp
@@ -412,7 +412,8 @@ protected:
                     EXPECT_GT(col->size(), 0) << name << " column/subcolumn 
size should be > 0";
 
                     // Check if it's a nullable column
-                    if (const auto* nullable_col = typeid_cast<const 
ColumnNullable*>(col.get())) {
+                    if (const auto* nullable_col =
+                                
check_and_get_column<ColumnNullable>(col.get())) {
                         auto nested_type =
                                 assert_cast<const 
DataTypeNullable*>(type.get())->get_nested_type();
 
@@ -429,7 +430,8 @@ protected:
                                           nested_name, depth + 
(is_complex_type ? 1 : 0));
                     }
                     // Check if it's a struct column
-                    else if (const auto* struct_col = typeid_cast<const 
ColumnStruct*>(col.get())) {
+                    else if (const auto* struct_col =
+                                     
check_and_get_column<ColumnStruct>(col.get())) {
                         auto struct_type = assert_cast<const 
DataTypeStruct*>(type.get());
                         for (size_t i = 0; i < struct_col->tuple_size(); ++i) {
                             std::string field_name = 
struct_type->get_element_name(i);
@@ -439,7 +441,7 @@ protected:
                         }
                     }
                     // Check if it's an array column
-                    else if (const auto* array_col = typeid_cast<const 
ColumnArray*>(col.get())) {
+                    else if (const auto* array_col = 
check_and_get_column<ColumnArray>(col.get())) {
                         auto array_type = assert_cast<const 
DataTypeArray*>(type.get());
                         auto element_type = array_type->get_nested_type();
                         print_column_rows(array_col->get_data_ptr(), 
element_type, name + ".data",
diff --git a/be/test/format/table/iceberg/iceberg_reader_test.cpp 
b/be/test/format/table/iceberg/iceberg_reader_test.cpp
index a88709f1bb6..a7fa2c85dad 100644
--- a/be/test/format/table/iceberg/iceberg_reader_test.cpp
+++ b/be/test/format/table/iceberg/iceberg_reader_test.cpp
@@ -476,7 +476,8 @@ protected:
                     EXPECT_GT(col->size(), 0) << name << " column/subcolumn 
size should be > 0";
 
                     // Check if it's a nullable column
-                    if (const auto* nullable_col = typeid_cast<const 
ColumnNullable*>(col.get())) {
+                    if (const auto* nullable_col =
+                                
check_and_get_column<ColumnNullable>(col.get())) {
                         auto nested_type =
                                 assert_cast<const 
DataTypeNullable*>(type.get())->get_nested_type();
 
@@ -493,7 +494,8 @@ protected:
                                           nested_name, depth + 
(is_complex_type ? 1 : 0));
                     }
                     // Check if it's a struct column
-                    else if (const auto* struct_col = typeid_cast<const 
ColumnStruct*>(col.get())) {
+                    else if (const auto* struct_col =
+                                     
check_and_get_column<ColumnStruct>(col.get())) {
                         auto struct_type = assert_cast<const 
DataTypeStruct*>(type.get());
                         for (size_t i = 0; i < struct_col->tuple_size(); ++i) {
                             std::string field_name = 
struct_type->get_element_name(i);
@@ -503,7 +505,7 @@ protected:
                         }
                     }
                     // Check if it's an array column
-                    else if (const auto* array_col = typeid_cast<const 
ColumnArray*>(col.get())) {
+                    else if (const auto* array_col = 
check_and_get_column<ColumnArray>(col.get())) {
                         auto array_type = assert_cast<const 
DataTypeArray*>(type.get());
                         auto element_type = array_type->get_nested_type();
                         print_column_rows(array_col->get_data_ptr(), 
element_type, name + ".data",


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to