github-actions[bot] commented on code in PR #63001:
URL: https://github.com/apache/doris/pull/63001#discussion_r3278167125


##########
be/src/storage/iterator/vcollect_iterator.cpp:
##########
@@ -292,7 +292,7 @@ Status VCollectIterator::_topn_next(Block* block) {
             }
         }
     }
-    MutableBlock mutable_block = MutableBlock::build_mutable_block(&clone_block);
+    MutableBlock mutable_block = MutableBlock::build_mutable_block(std::move(clone_block));
 

Review Comment:
   `clone_block` is consumed here, but the loop below still checks `column_idx 
< clone_block.columns()`. After the move, `clone_block` is a moved-from/empty 
block, so any non-empty `read_orderby_key_columns` can trip the check even 
though the mutable block was built from the correct schema. Please capture the 
column count before moving, or validate against `mutable_block.columns()` 
instead.
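
   A minimal sketch of the "capture the count before moving" option. `ToyBlock`, `ToyMutableBlock`, and `indexes_in_range` are hypothetical stand-ins, not Doris types; the only point is that the bound is read before the move and never from the moved-from object.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a block: just a list of column names.
struct ToyBlock {
    std::vector<std::string> cols;
    std::size_t columns() const { return cols.size(); }
};

// Hypothetical stand-in for a mutable block that takes ownership of the columns.
struct ToyMutableBlock {
    std::vector<std::string> cols;
    explicit ToyMutableBlock(ToyBlock&& b) : cols(std::move(b.cols)) {}
    std::size_t columns() const { return cols.size(); }
};

// Returns true when every requested column index is within the schema.
bool indexes_in_range(ToyBlock clone_block, const std::vector<std::size_t>& idxs) {
    const std::size_t num_columns = clone_block.columns(); // read before the move
    ToyMutableBlock mutable_block(std::move(clone_block));
    for (std::size_t idx : idxs) {
        // Compare against the saved count (or mutable_block.columns()),
        // never against the moved-from clone_block.
        if (idx >= num_columns || idx >= mutable_block.columns()) return false;
    }
    return true;
}
```

   In this toy version either bound alone would be sufficient; both are shown only to mirror the two options suggested above.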



##########
be/src/exec/operator/aggregation_sink_operator.cpp:
##########
@@ -299,16 +299,20 @@ Status AggSinkLocalState::_merge_with_serialized_key_helper(Block* block) {
 
     for (int i = 0; i < key_size; ++i) {
         if constexpr (for_spill) {
-            key_columns[i] = block->get_by_position(i).column.get();
             key_locs[i] = i;
         } else {
             int& result_column_id = key_locs[i];
             RETURN_IF_ERROR(
                     Base::_shared_state->probe_expr_ctxs[i]->execute(block, &result_column_id));
             block->replace_by_position_if_const(result_column_id);
-            key_columns[i] = block->get_by_position(result_column_id).column.get();
         }
-        key_columns[i]->assume_mutable()->replace_float_special_values();
+        {
+            auto mutable_col =
+                    IColumn::mutate(std::move(block->get_by_position(key_locs[i]).column));
+            mutable_col->replace_float_special_values();

Review Comment:
   This sink-side key normalization consumes the live input block slot before 
`replace_float_special_values()` is guaranteed to finish. If detaching a 
shared/complex key column or normalizing nested values throws, the aggregation 
sink returns/unwinds with that input slot moved-from. This is a separate path 
from the existing aggregation source output comments; please mutate through a 
scoped column guard or detach from a copied owner and replace only after 
normalization succeeds.
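
   One way to read "detach from a copied owner and replace only after normalization succeeds", as a sketch only. `Column`, `ColumnPtr`, `normalize`, and `normalize_slot` are hypothetical and use `std::shared_ptr` rather than Doris's COW pointers. The sketch always copies, which a real COW path would avoid for exclusively owned columns.

```cpp
#include <cassert>
#include <cmath>
#include <memory>
#include <stdexcept>
#include <utility>
#include <vector>

using Column = std::vector<double>;               // toy column
using ColumnPtr = std::shared_ptr<const Column>;  // toy shared, immutable handle

// Toy normalization: maps NaN and -0.0 to +0.0.
// Throws when `fail` is set, to simulate an error during normalization.
void normalize(Column& col, bool fail) {
    for (double& v : col) {
        if (std::isnan(v) || v == 0.0) v = 0.0;
    }
    if (fail) throw std::runtime_error("simulated failure");
}

// Works on a private copy and publishes it only once normalize() has returned.
void normalize_slot(ColumnPtr& slot, bool fail) {
    auto detached = std::make_shared<Column>(*slot); // slot is still intact here
    normalize(*detached, fail);                      // may throw; slot is untouched
    slot = std::move(detached);                      // commit step, does not throw
}
```

   If `normalize` throws, `slot` still points at the original column, so the caller's block never observes an empty entry.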



##########
be/src/exec/operator/distinct_streaming_aggregation_operator.cpp:
##########
@@ -162,7 +162,13 @@ Status DistinctStreamingAggLocalState::_distinct_pre_agg_with_serialized_key(
                     in_block->get_by_position(result_column_id)
                             .column->convert_to_full_column_if_const();
             key_columns[i] = in_block->get_by_position(result_column_id).column.get();
-            key_columns[i]->assume_mutable()->replace_float_special_values();
+            {
+                auto mutable_col = IColumn::mutate(
+                        std::move(in_block->get_by_position(result_column_id).column));
+                mutable_col->replace_float_special_values();

Review Comment:
   The distinct streaming pre-agg path has the same live-slot gap: it moves the 
expression result column out of `in_block`, then calls 
`replace_float_special_values()`. Any exception during COW detach or nested 
normalization leaves `in_block` with an empty slot. Please use a scoped 
mutation guard or commit the replacement only after the error-producing work 
succeeds.



##########
be/src/core/data_type/data_type_array.cpp:
##########
@@ -120,15 +120,18 @@ const char* DataTypeArray::deserialize(const char* buf, MutableColumnPtr* column
     buf = deserialize_const_flag_and_row_num(buf, column, &real_have_saved_num);
 
     auto* data_column = assert_cast<ColumnArray*>(origin_column);
-    auto& offsets = data_column->get_offsets();
 
     // offsets
+    auto offsets_column = std::move(*data_column->get_offsets_ptr()).mutate();
+    auto& offsets = assert_cast<ColumnArray::ColumnOffsets&>(*offsets_column).get_data();

Review Comment:
   `offsets_column` may be the live offsets subcolumn when the array is 
exclusive, and it is resized/copied before nested deserialization succeeds. If 
nested deserialization fails on malformed input or OOM, the `ColumnArray` keeps 
the new offsets with the old child data, violating the offsets/data-size 
invariant. Please build offsets and nested data in temporaries and publish both 
together only after all deserialization succeeds, or restore the original 
subcolumns on failure.
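
   A sketch of the "build in temporaries, publish together" option under simplified assumptions. `ToyArrayColumn` and `append_rows` are hypothetical; a truncated `input` stands in for malformed serialized data. Both parts are staged in local copies and swapped in only after every row has been read.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <utility>
#include <vector>

// Toy array column: offsets.back() must always equal data.size().
struct ToyArrayColumn {
    std::vector<std::size_t> offsets;
    std::vector<int> data;
    bool consistent() const {
        return offsets.empty() ? data.empty() : offsets.back() == data.size();
    }
};

// Appends one row per entry of `sizes`, reading the elements from `input`.
// Throws if `input` is too short (stands in for malformed input).
void append_rows(ToyArrayColumn& col, const std::vector<std::size_t>& sizes,
                 const std::vector<int>& input) {
    // Stage both parts in temporaries; `col` is not touched yet.
    std::vector<std::size_t> new_offsets = col.offsets;
    std::vector<int> new_data = col.data;
    std::size_t pos = 0;
    for (std::size_t n : sizes) {
        if (n > input.size() - pos) throw std::runtime_error("truncated input");
        new_data.insert(new_data.end(), input.begin() + pos, input.begin() + pos + n);
        pos += n;
        new_offsets.push_back(new_data.size());
    }
    // Publish both together; std::vector::swap does not throw.
    col.offsets.swap(new_offsets);
    col.data.swap(new_data);
}
```

   The copy of the existing contents is the price of this simple version; a restore guard that truncates both subcolumns back to their old sizes on failure would avoid it.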



##########
be/src/format/json/new_json_reader.cpp:
##########
@@ -452,17 +452,38 @@ Status NewJsonReader::_get_range_params() {
     return Status::OK();
 }
 
-static Status ignore_malformed_json_append_null(Block& block) {
-    for (auto& column : block.get_columns()) {
-        if (!column->is_nullable()) [[unlikely]] {
+Status json_reader_detail::append_null_for_malformed_json(Block& block) {
+    for (int i = 0; i < block.columns(); ++i) {
+        auto& column_with_type = block.get_by_position(i);
+        if (!column_with_type.column->is_nullable()) [[unlikely]] {
             return Status::DataQualityError("malformed json, but the column `{}` is not nullable.",
-                                            column->get_name());
+                                            column_with_type.column->get_name());
         }
-        static_cast<ColumnNullable*>(column->assume_mutable().get())->insert_default();
+        auto column = IColumn::mutate(std::move(column_with_type.column));
+        assert_cast<ColumnNullable*>(column.get())->insert_default();

Review Comment:
   This helper now moves each live block slot out before `insert_default()` 
succeeds. With `openx_json_ignore_malformed`, an allocation failure while 
appending the null/default row skips the write-back and leaves the reusable 
`Block` with a moved-from column; earlier columns may also already have the 
extra row. The cleanup helpers added below have the same issue around 
`pop_back()`. Please use 
`block.mutate_column_scoped(i)`/`mutate_columns_scoped()` or otherwise restore 
the slot on every throwing exit.
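
   To illustrate the "restore the slot on every throwing exit" idea, here is a small RAII sketch. `ScopedColumn` and `append_default_row` are hypothetical and are not the `mutate_column_scoped` API named above; the guard's destructor puts the column back whether the scope ends normally or by exception.

```cpp
#include <cassert>
#include <memory>
#include <stdexcept>
#include <utility>
#include <vector>

using Column = std::vector<int>;            // toy column
using ColumnPtr = std::unique_ptr<Column>;  // toy owning slot

// Hypothetical guard: takes the column out of `slot` and always puts it back
// in its destructor, on normal exit and during stack unwinding alike.
class ScopedColumn {
public:
    explicit ScopedColumn(ColumnPtr& slot) : slot_(slot), col_(std::move(slot)) {}
    ~ScopedColumn() { slot_ = std::move(col_); }
    ScopedColumn(const ScopedColumn&) = delete;
    ScopedColumn& operator=(const ScopedColumn&) = delete;
    Column& operator*() { return *col_; }

private:
    ColumnPtr& slot_;
    ColumnPtr col_;
};

// Appends a default row to every column; `fail_at` simulates an allocation
// failure while processing that column index.
void append_default_row(std::vector<ColumnPtr>& block, int fail_at) {
    for (int i = 0; i < static_cast<int>(block.size()); ++i) {
        ScopedColumn col(block[i]);
        if (i == fail_at) throw std::runtime_error("simulated failure");
        (*col).push_back(0);
    }
}
```

   The guard only keeps every slot populated. It does not undo the extra row already appended to earlier columns, so that part still needs a separate rollback.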



##########
be/src/exec/operator/distinct_streaming_aggregation_operator.cpp:
##########
@@ -210,18 +216,22 @@ Status DistinctStreamingAggLocalState::_distinct_pre_agg_with_serialized_key(
             if (out_block->rows() + _distinct_row.size() > batch_size) {
                 size_t split_size = batch_size - out_block->rows();
                 for (int i = 0; i < key_size; ++i) {
-                    auto output_dst = out_block->get_by_position(i).column->assume_mutable();
+                    auto output_dst =
+                            IColumn::mutate(std::move(out_block->get_by_position(i).column));
                     key_columns[i]->append_data_by_selector(output_dst, _distinct_row, 0,

Review Comment:
   In the split mem-reuse path, `out_block` is consumed before 
`append_data_by_selector()` finishes, and `_cache_block` is consumed before the 
second append finishes. If either append allocates and throws, one of the 
reusable blocks keeps a moved-from column. Please guard both destination slots 
and release/commit only after the corresponding append succeeds.



##########
be/src/exec/operator/join/process_hash_table_probe_impl.h:
##########
@@ -164,7 +164,10 @@ void ProcessHashTableProbe<JoinOpType>::probe_side_output_column(MutableColumns&
     for (int i = 0; i < _left_output_slot_flags.size(); ++i) {
         if (_left_output_slot_flags[i]) {
             if (_parent_operator->need_finalize_variant_column()) {
-                std::move(*probe_block.get_by_position(i).column).mutate()->finalize();
+                auto mutable_column =
+                        IColumn::mutate(std::move(probe_block.get_by_position(i).column));
+                mutable_column->finalize();

Review Comment:
   Probe-side variant finalization now moves the reusable probe block slot out 
before `finalize()` succeeds. If variant finalization allocates or throws, 
`_probe_block` retains a moved-from slot. This is distinct from the existing 
build-side finalize thread; please use a scoped column mutation or finalize a 
detached temporary and assign back only after success.



##########
be/src/core/data_type/data_type_map.cpp:
##########
@@ -129,16 +129,20 @@ const char* DataTypeMap::deserialize(const char* buf, MutableColumnPtr* column,
     buf = deserialize_const_flag_and_row_num(buf, column, &real_have_saved_num);
 
     auto* map_column = assert_cast<ColumnMap*>(origin_column);
-    auto& map_offsets = map_column->get_offsets();
     // offsets
+    auto offsets_column = std::move(*map_column->get_offsets_ptr()).mutate();
+    auto& map_offsets = assert_cast<ColumnMap::COffsets&>(*offsets_column).get_data();

Review Comment:
   The map deserializer can now partially mutate live subcolumns: offsets are 
resized/copied before key/value deserialization completes, and keys may be 
updated before values fail. On an error/OOM path this leaves offsets, keys, and 
values out of sync. Please deserialize into temporary owners and publish 
offsets/keys/values atomically after all three parts succeed, or install a 
restore guard.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

