github-actions[bot] commented on code in PR #60978: URL: https://github.com/apache/doris/pull/60978#discussion_r2876960788
########## be/src/vec/sink/writer/iceberg/viceberg_sort_writer.cpp: ########## @@ -0,0 +1,362 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/sink/writer/iceberg/viceberg_sort_writer.h" + +#include "runtime/exec_env.h" +#include "runtime/runtime_state.h" +#include "vec/spill/spill_stream.h" +#include "vec/spill/spill_stream_manager.h" + +namespace doris::vectorized { + +Status VIcebergSortWriter::open(RuntimeState* state, RuntimeProfile* profile, + const RowDescriptor* row_desc) { + // row_desc is required for initializing sort expressions + DCHECK(row_desc != nullptr); + _runtime_state = state; + _profile = profile; + _row_desc = row_desc; + + // Initialize sort expressions from sort_info (contains ordering columns, asc/desc, nulls first/last) + RETURN_IF_ERROR(_vsort_exec_exprs.init(_sort_info, &_pool)); + RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, *row_desc, *row_desc)); + RETURN_IF_ERROR(_vsort_exec_exprs.open(state)); + + // Create FullSorter for in-memory sorting with spill support enabled. 
+ // Parameters: limit=-1 (no limit), offset=0 (no offset) + _sorter = vectorized::FullSorter::create_unique(_vsort_exec_exprs, -1, 0, &_pool, + _sort_info.is_asc_order, _sort_info.nulls_first, + *row_desc, state, _profile); + _sorter->init_profile(_profile); + // Enable spill support so the sorter can be used with the spill framework + _sorter->set_enable_spill(); + _do_spill_count_counter = ADD_COUNTER(_profile, "IcebergDoSpillCount", TUnit::UNIT); + + // Open the underlying partition writer that handles actual file I/O + RETURN_IF_ERROR(_iceberg_partition_writer->open(state, profile, row_desc)); + return Status::OK(); +} + +Status VIcebergSortWriter::write(vectorized::Block& block) { + // Append incoming block data to the sorter's internal buffer + RETURN_IF_ERROR(_sorter->append_block(&block)); + _update_spill_block_batch_row_count(block); + + // When accumulated data size reaches the target file size threshold, + // sort the data in memory and flush it directly to a Parquet/ORC file. + // This avoids holding too much data in memory before writing. + if (_sorter->data_size() >= _target_file_size_bytes) { + return _flush_to_file(); + } + + // If data size is below threshold, wait for more data. + // Note: trigger_spill() may be called externally by the memory management + // system if memory pressure is high. + return Status::OK(); +} + +Status VIcebergSortWriter::close(const Status& status) { + // Track the actual internal status of operations performed during close. + // This is important because if intermediate operations (like do_sort()) fail, + // we need to propagate the actual error status to the underlying partition writer's + // close() call, rather than the original status parameter which could be OK. + Status internal_status = Status::OK(); + + // Defer ensures the underlying partition writer is always closed and + // spill streams are cleaned up, regardless of whether intermediate operations succeed. 
+ // Uses internal_status to propagate any errors that occurred during close operations. + Defer defer {[&]() { + // If any intermediate operation failed, pass that error to the partition writer; + // otherwise, pass the original status from the caller. + Status st = + _iceberg_partition_writer->close(internal_status.ok() ? status : internal_status); + if (!st.ok()) { + LOG(WARNING) << fmt::format("_iceberg_partition_writer close failed, reason: {}", + st.to_string()); Review Comment: **[Medium]** The Defer lambda catches the return status of `_iceberg_partition_writer->close()` but only logs a warning and discards it. If the underlying writer's close fails (e.g., a Parquet file flush error), the caller of `VIcebergSortWriter::close()` will receive `Status::OK()` (from line 140) while the actual file close error is silently lost. This could mask data loss—the caller thinks the write succeeded but the file was not properly closed. Consider propagating this error, e.g., by capturing it in a local variable and returning it from `close()`. Note: This was the same behavior in the original code before this PR, but since this PR explicitly aims to fix error handling in `close()` (item 4 in the PR description), it would be good to address this too. ########## be/src/vec/sink/writer/iceberg/viceberg_sort_writer.cpp: ########## @@ -0,0 +1,362 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/sink/writer/iceberg/viceberg_sort_writer.h" + +#include "runtime/exec_env.h" +#include "runtime/runtime_state.h" +#include "vec/spill/spill_stream.h" +#include "vec/spill/spill_stream_manager.h" + +namespace doris::vectorized { + +Status VIcebergSortWriter::open(RuntimeState* state, RuntimeProfile* profile, + const RowDescriptor* row_desc) { + // row_desc is required for initializing sort expressions + DCHECK(row_desc != nullptr); + _runtime_state = state; + _profile = profile; + _row_desc = row_desc; + + // Initialize sort expressions from sort_info (contains ordering columns, asc/desc, nulls first/last) + RETURN_IF_ERROR(_vsort_exec_exprs.init(_sort_info, &_pool)); + RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, *row_desc, *row_desc)); + RETURN_IF_ERROR(_vsort_exec_exprs.open(state)); + + // Create FullSorter for in-memory sorting with spill support enabled. 
+ // Parameters: limit=-1 (no limit), offset=0 (no offset) + _sorter = vectorized::FullSorter::create_unique(_vsort_exec_exprs, -1, 0, &_pool, + _sort_info.is_asc_order, _sort_info.nulls_first, + *row_desc, state, _profile); + _sorter->init_profile(_profile); + // Enable spill support so the sorter can be used with the spill framework + _sorter->set_enable_spill(); + _do_spill_count_counter = ADD_COUNTER(_profile, "IcebergDoSpillCount", TUnit::UNIT); + + // Open the underlying partition writer that handles actual file I/O + RETURN_IF_ERROR(_iceberg_partition_writer->open(state, profile, row_desc)); + return Status::OK(); +} + +Status VIcebergSortWriter::write(vectorized::Block& block) { + // Append incoming block data to the sorter's internal buffer + RETURN_IF_ERROR(_sorter->append_block(&block)); + _update_spill_block_batch_row_count(block); + + // When accumulated data size reaches the target file size threshold, + // sort the data in memory and flush it directly to a Parquet/ORC file. + // This avoids holding too much data in memory before writing. + if (_sorter->data_size() >= _target_file_size_bytes) { + return _flush_to_file(); + } + + // If data size is below threshold, wait for more data. + // Note: trigger_spill() may be called externally by the memory management + // system if memory pressure is high. + return Status::OK(); +} + +Status VIcebergSortWriter::close(const Status& status) { + // Track the actual internal status of operations performed during close. + // This is important because if intermediate operations (like do_sort()) fail, + // we need to propagate the actual error status to the underlying partition writer's + // close() call, rather than the original status parameter which could be OK. + Status internal_status = Status::OK(); + + // Defer ensures the underlying partition writer is always closed and + // spill streams are cleaned up, regardless of whether intermediate operations succeed. 
+ // Uses internal_status to propagate any errors that occurred during close operations. + Defer defer {[&]() { + // If any intermediate operation failed, pass that error to the partition writer; + // otherwise, pass the original status from the caller. + Status st = + _iceberg_partition_writer->close(internal_status.ok() ? status : internal_status); + if (!st.ok()) { + LOG(WARNING) << fmt::format("_iceberg_partition_writer close failed, reason: {}", + st.to_string()); + } + _cleanup_spill_streams(); + }}; + + // If the original status is already an error or the query is cancelled, + // skip all close operations and propagate the original error + if (!status.ok() || _runtime_state->is_cancelled()) { + return status; + } + + // If sorter was never initialized (e.g., no data was written), nothing to do + if (_sorter == nullptr) { + return Status::OK(); + } + + // Check if there is any remaining data in the sorter (either unsorted or already sorted blocks) + if (!_sorter->merge_sort_state()->unsorted_block()->empty() || + !_sorter->merge_sort_state()->get_sorted_block().empty()) { + if (_sorted_streams.empty()) { + // No spill has occurred, all data is in memory. + // Sort the remaining data, prepare for reading, and write to file. + internal_status = _sorter->do_sort(); + if (!internal_status.ok()) { + return internal_status; + } + internal_status = _sorter->prepare_for_read(false); + if (!internal_status.ok()) { + return internal_status; + } + internal_status = _write_sorted_data(); + return internal_status; + } + + // Some data has already been spilled to disk. + // Spill the remaining in-memory data to a new spill stream. 
+ internal_status = _do_spill(); + if (!internal_status.ok()) { + return internal_status; + } + } + + // Merge all spilled streams using multi-way merge sort and output final sorted data to files + if (!_sorted_streams.empty()) { + internal_status = _combine_files_output(); + if (!internal_status.ok()) { + return internal_status; + } + } + + return Status::OK(); +} + +void VIcebergSortWriter::_update_spill_block_batch_row_count(const vectorized::Block& block) { + auto rows = block.rows(); + // Calculate average row size from the first non-empty block to determine + // the optimal batch size for spill operations + if (rows > 0 && 0 == _avg_row_bytes) { + _avg_row_bytes = std::max(1UL, block.bytes() / rows); + int64_t spill_batch_bytes = _runtime_state->spill_sort_batch_bytes(); // default 8MB + // Calculate how many rows fit in one spill batch (ceiling division) + _spill_block_batch_row_count = (spill_batch_bytes + _avg_row_bytes - 1) / _avg_row_bytes; + } +} + +Status VIcebergSortWriter::_flush_to_file() { + // Sort the accumulated data in memory + RETURN_IF_ERROR(_sorter->do_sort()); + // Prepare the sorted data for sequential reading (builds merge tree if needed) + RETURN_IF_ERROR(_sorter->prepare_for_read(false)); + // Write the sorted data to the current Parquet/ORC file + RETURN_IF_ERROR(_write_sorted_data()); + // Close the current file (it has reached the target size) and open a new writer + RETURN_IF_ERROR(_close_current_writer_and_open_next()); + // Reset the sorter state to accept new data for the next file + _sorter->reset(); + return Status::OK(); +} + +Status VIcebergSortWriter::_write_sorted_data() { + // Read sorted blocks from the sorter one by one and write them + // to the underlying partition writer (Parquet/ORC file) + bool eos = false; + Block block; + while (!eos && !_runtime_state->is_cancelled()) { + RETURN_IF_ERROR(_sorter->get_next(_runtime_state, &block, &eos)); + RETURN_IF_ERROR(_iceberg_partition_writer->write(block)); + 
block.clear_column_data(); + } + return Status::OK(); +} + +Status VIcebergSortWriter::_close_current_writer_and_open_next() { + // Save the current file name and index before closing, so the next file + // can use an incremented index (e.g., file_0, file_1, file_2, ...) + std::string current_file_name = _iceberg_partition_writer->file_name(); + int current_file_index = _iceberg_partition_writer->file_name_index(); + RETURN_IF_ERROR(_iceberg_partition_writer->close(Status::OK())); + + // Use the lambda to create a new partition writer with the next file index + _iceberg_partition_writer = _create_writer_lambda(&current_file_name, current_file_index + 1); + if (!_iceberg_partition_writer) { + return Status::InternalError("Failed to create new partition writer"); + } + + RETURN_IF_ERROR(_iceberg_partition_writer->open(_runtime_state, _profile, _row_desc)); + return Status::OK(); +} + +int32_t VIcebergSortWriter::_get_spill_batch_size() const { + // Clamp the batch row count to int32_t max to prevent overflow + if (_spill_block_batch_row_count > std::numeric_limits<int32_t>::max()) { + return std::numeric_limits<int32_t>::max(); + } + return static_cast<int32_t>(_spill_block_batch_row_count); +} + +Status VIcebergSortWriter::_do_spill() { + COUNTER_UPDATE(_do_spill_count_counter, 1); + + // Explicitly sort the data before preparing for spill read. + // Although FullSorter::prepare_for_read(is_spill=true) internally calls do_sort() + // when there is unsorted data (see sorter.cpp), we call do_sort() explicitly here + // for clarity and to guarantee that the data written to the spill stream is sorted. + // This ensures correctness of the subsequent multi-way merge phase. 
+ RETURN_IF_ERROR(_sorter->do_sort()); + + // prepare_for_read(is_spill=true) adjusts limit/offset for spill mode + // and builds the merge tree for reading sorted data + RETURN_IF_ERROR(_sorter->prepare_for_read(true)); + int32_t batch_size = _get_spill_batch_size(); + + // Register a new spill stream to store the sorted data on disk + SpillStreamSPtr spilling_stream; + RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream( + _runtime_state, spilling_stream, print_id(_runtime_state->query_id()), "iceberg-sort", + 1 /* node_id */, batch_size, _runtime_state->spill_sort_batch_bytes(), _profile)); + _sorted_streams.emplace_back(spilling_stream); + + // Read sorted data from the sorter in batches and write to the spill stream + bool eos = false; + Block block; + while (!eos && !_runtime_state->is_cancelled()) { + RETURN_IF_ERROR(_sorter->merge_sort_read_for_spill( + _runtime_state, &block, (int)_spill_block_batch_row_count, &eos)); + RETURN_IF_ERROR(spilling_stream->spill_block(_runtime_state, block, eos)); Review Comment: **[Medium]** This uses a C-style cast `(int)_spill_block_batch_row_count` which is a narrowing conversion from `size_t` to `int`. The `_get_spill_batch_size()` method was specifically created (a few lines above) to handle this conversion safely with bounds checking. This line should use it: ```cpp RETURN_IF_ERROR(_sorter->merge_sort_read_for_spill( _runtime_state, &block, _get_spill_batch_size(), &eos)); ``` Using the C-style cast defeats the purpose of having the safe helper method. ########## be/src/vec/sink/writer/iceberg/viceberg_sort_writer.cpp: ########## @@ -0,0 +1,362 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/sink/writer/iceberg/viceberg_sort_writer.h" + +#include "runtime/exec_env.h" +#include "runtime/runtime_state.h" +#include "vec/spill/spill_stream.h" +#include "vec/spill/spill_stream_manager.h" Review Comment: **[Low]** This new `.cpp` file is missing `compile_check_begin.h` / `compile_check_end.h` includes. The codebase convention (and the code-review checklist) requires new files to include these paired headers to enable `-Wconversion` as an error, which would have caught the narrowing cast issue on line 234. Consider adding: ```cpp #include "common/compile_check_begin.h" ``` after the last include, and: ```cpp #include "common/compile_check_end.h" ``` at the end of the file after the namespace closing brace. 
########## be/src/pipeline/exec/spill_iceberg_table_sink_operator.cpp: ########## @@ -182,7 +182,7 @@ void SpillIcebergTableSinkLocalState::_init_spill_counters() { ADD_COUNTER_WITH_LEVEL(profile, "SpillReadTaskCount", TUnit::UNIT, 1); ADD_TIMER_WITH_LEVEL(profile, "SpillReadTaskWaitInQueueTime", 1); ADD_TIMER_WITH_LEVEL(profile, "SpillReadFileTime", 1); - ADD_TIMER_WITH_LEVEL(profile, "SpillReadDerializeBlockTime", 1); + ADD_TIMER_WITH_LEVEL(profile, "SpillReadDeserializeBlockTime", 1); ADD_COUNTER_WITH_LEVEL(profile, "SpillReadBlockCount", TUnit::UNIT, 1); Review Comment: **[Medium]** The typo fix from `SpillReadDerializeBlockTime` to `SpillReadDeserializeBlockTime` is correct, but it's **incomplete across the codebase**. Two test files still use the old misspelled name and need to be updated as well: 1. `be/test/pipeline/operator/spillable_operator_test_helper.cpp` line 56: `"SpillReadDerializeBlockTime"` 2. `be/test/pipeline/exec/multi_cast_data_streamer_test.cpp` lines 59 and 103: `"SpillReadDerializeBlockTime"` After this PR, the production code registers the counter as `SpillReadDeserializeBlockTime`, but these tests will still register counters with the old name `SpillReadDerializeBlockTime`. This will cause `get_counter()` lookups to return `nullptr` when the spill reader tries to find its counter by the new name, potentially causing test failures or null dereferences. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
