This is an automated email from the ASF dual-hosted git repository.

Gabriel39 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 5cc48ea03a7 [bug](iceberg) fix iceberg sink writer with spill report 
error (#62899)
5cc48ea03a7 is described below

commit 5cc48ea03a77167cf6abd96adae4d13a4d648c63
Author: zhangstar333 <[email protected]>
AuthorDate: Mon May 25 14:56:00 2026 +0800

    [bug](iceberg) fix iceberg sink writer with spill report error (#62899)
    
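    The spill (revoke) thread and the async write thread are different threads,
    but both touch the same FullSorter instance, so add a mutex that serializes
    write(), close(), trigger_spill(), data_size() and get_reserve_mem_size(),
    and publish the current partition writer through an atomic shared_ptr.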
    
    ```
    mysql> create table web_sales
        -> order by (ws_sold_date_sk, ws_item_sk, ws_order_number)
        -> PROPERTIES (
        ->   'write-format'='parquet'
        -> )
        -> as select * from tpcds_sf1000_parquet.web_sales;
    ERROR 1105 (HY000): errCode = 2, detailMessage = 
(172.20.49.239)[INTERNAL_ERROR][E6] Size of permutation (40800) is less than 
required (48960)        0#  doris::Exception::Exception(int, 
std::basic_string_view<char, std::char_traits<char> > const&, bool) at 
/home/zcp/repo_center/doris_branch-4.1/doris/be/src/common/exception.cpp:0
            1#  doris::Exception::Exception(int, std::basic_string_view<char, 
std::char_traits<char> > const&) at 
/usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/basic_stri
    mysql>
    
    ```
---
 .../operator/spill_iceberg_table_sink_operator.cpp | 33 +++++++++++-----------
 .../sink/writer/iceberg/viceberg_sort_writer.cpp   | 29 +++++++++++++++++++
 .../sink/writer/iceberg/viceberg_sort_writer.h     | 25 ++++++++++++----
 .../sink/writer/iceberg/viceberg_table_writer.cpp  |  6 ++--
 .../sink/writer/iceberg/viceberg_table_writer.h    | 10 +++++--
 5 files changed, 75 insertions(+), 28 deletions(-)

diff --git a/be/src/exec/operator/spill_iceberg_table_sink_operator.cpp 
b/be/src/exec/operator/spill_iceberg_table_sink_operator.cpp
index 8d2a2041ea9..67040e2762d 100644
--- a/be/src/exec/operator/spill_iceberg_table_sink_operator.cpp
+++ b/be/src/exec/operator/spill_iceberg_table_sink_operator.cpp
@@ -53,43 +53,42 @@ bool SpillIcebergTableSinkLocalState::is_blockable() const {
 }
 
 size_t SpillIcebergTableSinkLocalState::get_reserve_mem_size(RuntimeState* 
state, bool eos) {
-    if (!_writer || !_writer->current_writer()) {
+    if (!_writer) {
         return 0;
     }
-
-    auto* sort_writer = 
dynamic_cast<VIcebergSortWriter*>(_writer->current_writer().get());
-    if (!sort_writer || !sort_writer->sorter()) {
+    auto current_writer = _writer->current_writer();
+    auto* sort_writer = 
dynamic_cast<VIcebergSortWriter*>(current_writer.get());
+    if (!sort_writer) {
         return 0;
     }
 
-    return sort_writer->sorter()->get_reserve_mem_size(state, eos);
+    return sort_writer->get_reserve_mem_size(state, eos);
 }
 
 size_t SpillIcebergTableSinkLocalState::get_revocable_mem_size(RuntimeState* 
state) const {
-    if (!_writer || !_writer->current_writer()) {
+    if (!_writer) {
         return 0;
     }
-
-    auto* sort_writer = 
dynamic_cast<VIcebergSortWriter*>(_writer->current_writer().get());
-    if (!sort_writer || !sort_writer->sorter()) {
+    auto current_writer = _writer->current_writer();
+    auto* sort_writer = 
dynamic_cast<VIcebergSortWriter*>(current_writer.get());
+    if (!sort_writer) {
         return 0;
     }
 
-    return sort_writer->sorter()->data_size();
+    return sort_writer->data_size();
 }
 
 Status SpillIcebergTableSinkLocalState::revoke_memory(RuntimeState* state) {
-    if (!_writer || !_writer->current_writer()) {
+    if (!_writer) {
         return Status::OK();
     }
-
-    auto* sort_writer = 
dynamic_cast<VIcebergSortWriter*>(_writer->current_writer().get());
-
-    if (!sort_writer || !sort_writer->sorter()) {
+    auto current_writer = _writer->current_writer();
+    auto* sort_writer = 
dynamic_cast<VIcebergSortWriter*>(current_writer.get());
+    if (!sort_writer) {
         return Status::OK();
     }
 
-    auto exception_catch_func = [sort_writer]() {
+    auto exception_catch_func = [current_writer, sort_writer]() {
         auto status = [&]() {
             RETURN_IF_CATCH_EXCEPTION({ return sort_writer->trigger_spill(); 
});
         }();
@@ -173,4 +172,4 @@ void 
SpillIcebergTableSinkLocalState::_init_spill_counters() {
     ADD_COUNTER_WITH_LEVEL(profile, "SpillWriteFileCurrentCount", TUnit::UNIT, 
1);
 }
 
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/exec/sink/writer/iceberg/viceberg_sort_writer.cpp 
b/be/src/exec/sink/writer/iceberg/viceberg_sort_writer.cpp
index 3d36e1f10eb..3827cb6d925 100644
--- a/be/src/exec/sink/writer/iceberg/viceberg_sort_writer.cpp
+++ b/be/src/exec/sink/writer/iceberg/viceberg_sort_writer.cpp
@@ -55,6 +55,8 @@ Status VIcebergSortWriter::open(RuntimeState* state, 
RuntimeProfile* profile,
 }
 
 Status VIcebergSortWriter::write(Block& block) {
+    std::lock_guard<std::mutex> lock(_sorter_mutex);
+
     // Append incoming block data to the sorter's internal buffer
     RETURN_IF_ERROR(_sorter->append_block(&block));
     _update_spill_block_batch_row_count(block);
@@ -72,7 +74,34 @@ Status VIcebergSortWriter::write(Block& block) {
     return Status::OK();
 }
 
+size_t VIcebergSortWriter::data_size() const {
+    std::lock_guard<std::mutex> lock(_sorter_mutex);
+    return _sorter == nullptr ? 0 : _sorter->data_size();
+}
+
+size_t VIcebergSortWriter::get_reserve_mem_size(RuntimeState* state, bool eos) 
const {
+    std::lock_guard<std::mutex> lock(_sorter_mutex);
+    return _sorter == nullptr ? 0 : _sorter->get_reserve_mem_size(state, eos);
+}
+
+Status VIcebergSortWriter::trigger_spill() {
+    std::lock_guard<std::mutex> lock(_sorter_mutex);
+    if (_closed || _sorter == nullptr) {
+        return Status::OK();
+    }
+    return _do_spill();
+}
+
 Status VIcebergSortWriter::close(const Status& status) {
+    std::lock_guard<std::mutex> lock(_sorter_mutex);
+    if (_closed) {
+        return Status::OK();
+    }
+    Defer mark_closed {[&]() { _closed = true; }};
+    return _close_locked(status);
+}
+
+Status VIcebergSortWriter::_close_locked(const Status& status) {
     // Track the actual internal status of operations performed during close.
     // This is important because if intermediate operations (like do_sort()) 
fail,
     // we need to propagate the actual error status to the underlying 
partition writer's
diff --git a/be/src/exec/sink/writer/iceberg/viceberg_sort_writer.h 
b/be/src/exec/sink/writer/iceberg/viceberg_sort_writer.h
index 45858f473c9..e1e512f0a0c 100644
--- a/be/src/exec/sink/writer/iceberg/viceberg_sort_writer.h
+++ b/be/src/exec/sink/writer/iceberg/viceberg_sort_writer.h
@@ -18,7 +18,7 @@
 #pragma once
 
 #include <cstdint>
-#include <limits>
+#include <mutex>
 #include <utility>
 #include <vector>
 
@@ -101,12 +101,12 @@ public:
 
     inline size_t written_len() const override { return 
_iceberg_partition_writer->written_len(); }
 
-    // Returns a raw pointer to the FullSorter, used by 
SpillIcebergTableSinkLocalState
-    // to query memory usage (data_size, get_reserve_mem_size)
-    auto sorter() const { return _sorter.get(); }
+    size_t data_size() const;
+
+    size_t get_reserve_mem_size(RuntimeState* state, bool eos) const;
 
     // Called by the memory management system to trigger spilling data to disk
-    Status trigger_spill() { return _do_spill(); }
+    Status trigger_spill();
 
 private:
     // Calculate average row size from the first non-empty block to determine
@@ -129,6 +129,8 @@ private:
     // Explicitly calls do_sort() before prepare_for_read() to guarantee 
sorted output.
     Status _do_spill();
 
+    Status _close_locked(const Status& status);
+
     // Merge all spilled streams and output final sorted data to Parquet/ORC 
files.
     // Handles file splitting when output exceeds target file size.
     Status _combine_files_output();
@@ -168,6 +170,17 @@ private:
     std::unique_ptr<FullSorter> _sorter;
     std::unique_ptr<VSortedRunMerger> _merger;
 
+    // Serialize all accesses to _sorter because async writes and revoke 
spills run on
+    // different thread pools but touch the same FullSorter instance.
+    mutable std::mutex _sorter_mutex;
+
+    // Set to true once close() has finished tearing down the sorter / 
underlying writer.
+    // Late-arriving revoke spills (which run on a different thread than the 
async writer)
+    // must become no-ops after close, otherwise they would write to a fresh 
spill stream
+    // whose data never gets merged out (close has already produced the final 
output and
+    // cleaned up spill files).
+    bool _closed = false;
+
     // Queue of spill files waiting to be merged (FIFO order)
     std::deque<SpillFileSPtr> _sorted_spill_files;
     // Files currently being consumed by the merger
@@ -185,4 +198,4 @@ private:
     RuntimeProfile::Counter* _do_spill_count_counter = nullptr;
 };
 
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/exec/sink/writer/iceberg/viceberg_table_writer.cpp 
b/be/src/exec/sink/writer/iceberg/viceberg_table_writer.cpp
index 6a67b6ca8bd..5584491e99c 100644
--- a/be/src/exec/sink/writer/iceberg/viceberg_table_writer.cpp
+++ b/be/src/exec/sink/writer/iceberg/viceberg_table_writer.cpp
@@ -283,7 +283,7 @@ Status VIcebergTableWriter::_write_prepared_block(Block& 
output_block) {
         SCOPED_RAW_TIMER(&_partition_writers_write_ns);
         output_block.erase(_non_write_columns_indices);
         RETURN_IF_ERROR(writer->write(output_block));
-        _current_writer = writer;
+        _current_writer.store(writer);
         return Status::OK();
     }
 
@@ -326,7 +326,7 @@ Status VIcebergTableWriter::_write_prepared_block(Block& 
output_block) {
         SCOPED_RAW_TIMER(&_partition_writers_write_ns);
         output_block.erase(_non_write_columns_indices);
         RETURN_IF_ERROR(writer->write(output_block));
-        _current_writer = writer;
+        _current_writer.store(writer);
         return Status::OK();
     }
 
@@ -429,7 +429,7 @@ Status VIcebergTableWriter::_write_prepared_block(Block& 
output_block) {
         Block filtered_block;
         RETURN_IF_ERROR(_filter_block(output_block, &it->second, 
&filtered_block));
         RETURN_IF_ERROR(it->first->write(filtered_block));
-        _current_writer = it->first;
+        _current_writer.store(it->first);
     }
     return Status::OK();
 }
diff --git a/be/src/exec/sink/writer/iceberg/viceberg_table_writer.h 
b/be/src/exec/sink/writer/iceberg/viceberg_table_writer.h
index f94ce4feb6b..cc7cec1fdad 100644
--- a/be/src/exec/sink/writer/iceberg/viceberg_table_writer.h
+++ b/be/src/exec/sink/writer/iceberg/viceberg_table_writer.h
@@ -19,6 +19,7 @@
 
 #include <gen_cpp/DataSinks_types.h>
 
+#include "common/atomic_shared_ptr.h"
 #include "core/block/block.h"
 #include "core/column/column.h"
 #include "exec/sink/writer/async_result_writer.h"
@@ -66,12 +67,17 @@ public:
     // Getter for the current partition writer.
     // Used by SpillIcebergTableSinkLocalState to access the current writer for
     // memory management operations (get_reserve_mem_size, revocable_mem_size, 
etc.).
-    const std::shared_ptr<IPartitionWriterBase>& current_writer() const { 
return _current_writer; }
+    // Returns a snapshot by value: the async writer thread updates 
_current_writer
+    // concurrently with the spill/revoke path, so callers must hold their own 
copy
+    // while operating on it instead of dereferencing the underlying member 
directly.
+    std::shared_ptr<IPartitionWriterBase> current_writer() const { return 
_current_writer.load(); }
 
 private:
     // The currently active partition writer (may be VIcebergPartitionWriter 
or VIcebergSortWriter).
     // Updated during write() to track which writer received the most recent 
data.
-    std::shared_ptr<IPartitionWriterBase> _current_writer;
+    // Wrapped in atomic_shared_ptr because revoke_memory / 
get_revocable_mem_size run on
+    // a different thread than the async writer that assigns to it.
+    doris::atomic_shared_ptr<IPartitionWriterBase> _current_writer;
     class IcebergPartitionColumn {
     public:
         IcebergPartitionColumn(const iceberg::PartitionField& field,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to