This is an automated email from the ASF dual-hosted git repository.
zhangstar333 pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new bcfa30d9c54 branch-4.1: [bug](iceberg) fix iceberg sink writer with spill report error (#62899) (#63993)
bcfa30d9c54 is described below
commit bcfa30d9c54f6290dbb449c75c187c19d91dc87d
Author: zhangstar333 <[email protected]>
AuthorDate: Tue Jun 2 21:12:44 2026 +0800
branch-4.1: [bug](iceberg) fix iceberg sink writer with spill report error (#62899) (#63993)
cherry-pick from master (#62899)
---
.../operator/spill_iceberg_table_sink_operator.cpp | 33 +++++++++++-----------
.../sink/writer/iceberg/viceberg_sort_writer.cpp | 29 +++++++++++++++++++
.../sink/writer/iceberg/viceberg_sort_writer.h | 25 ++++++++++++----
.../sink/writer/iceberg/viceberg_table_writer.cpp | 6 ++--
.../sink/writer/iceberg/viceberg_table_writer.h | 10 +++++--
5 files changed, 75 insertions(+), 28 deletions(-)
diff --git a/be/src/exec/operator/spill_iceberg_table_sink_operator.cpp b/be/src/exec/operator/spill_iceberg_table_sink_operator.cpp
index c557ac58e60..ab02a0a1f36 100644
--- a/be/src/exec/operator/spill_iceberg_table_sink_operator.cpp
+++ b/be/src/exec/operator/spill_iceberg_table_sink_operator.cpp
@@ -54,43 +54,42 @@ bool SpillIcebergTableSinkLocalState::is_blockable() const {
}
size_t SpillIcebergTableSinkLocalState::get_reserve_mem_size(RuntimeState* state, bool eos) {
- if (!_writer || !_writer->current_writer()) {
+ if (!_writer) {
return 0;
}
-
- auto* sort_writer = dynamic_cast<VIcebergSortWriter*>(_writer->current_writer().get());
- if (!sort_writer || !sort_writer->sorter()) {
+ auto current_writer = _writer->current_writer();
+ auto* sort_writer = dynamic_cast<VIcebergSortWriter*>(current_writer.get());
+ if (!sort_writer) {
return 0;
}
- return sort_writer->sorter()->get_reserve_mem_size(state, eos);
+ return sort_writer->get_reserve_mem_size(state, eos);
}
size_t SpillIcebergTableSinkLocalState::get_revocable_mem_size(RuntimeState* state) const {
- if (!_writer || !_writer->current_writer()) {
+ if (!_writer) {
return 0;
}
-
- auto* sort_writer = dynamic_cast<VIcebergSortWriter*>(_writer->current_writer().get());
- if (!sort_writer || !sort_writer->sorter()) {
+ auto current_writer = _writer->current_writer();
+ auto* sort_writer = dynamic_cast<VIcebergSortWriter*>(current_writer.get());
+ if (!sort_writer) {
return 0;
}
- return sort_writer->sorter()->data_size();
+ return sort_writer->data_size();
}
Status SpillIcebergTableSinkLocalState::revoke_memory(RuntimeState* state) {
- if (!_writer || !_writer->current_writer()) {
+ if (!_writer) {
return Status::OK();
}
-
- auto* sort_writer = dynamic_cast<VIcebergSortWriter*>(_writer->current_writer().get());
-
- if (!sort_writer || !sort_writer->sorter()) {
+ auto current_writer = _writer->current_writer();
+ auto* sort_writer = dynamic_cast<VIcebergSortWriter*>(current_writer.get());
+ if (!sort_writer) {
return Status::OK();
}
- auto exception_catch_func = [sort_writer]() {
+ auto exception_catch_func = [current_writer, sort_writer]() {
auto status = [&]() {
RETURN_IF_CATCH_EXCEPTION({ return sort_writer->trigger_spill(); });
}();
@@ -175,4 +174,4 @@ void SpillIcebergTableSinkLocalState::_init_spill_counters() {
}
#include "common/compile_check_end.h"
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/exec/sink/writer/iceberg/viceberg_sort_writer.cpp b/be/src/exec/sink/writer/iceberg/viceberg_sort_writer.cpp
index 15f72b0f7cc..db453c511fa 100644
--- a/be/src/exec/sink/writer/iceberg/viceberg_sort_writer.cpp
+++ b/be/src/exec/sink/writer/iceberg/viceberg_sort_writer.cpp
@@ -56,6 +56,8 @@ Status VIcebergSortWriter::open(RuntimeState* state, RuntimeProfile* profile,
}
Status VIcebergSortWriter::write(Block& block) {
+ std::lock_guard<std::mutex> lock(_sorter_mutex);
+
// Append incoming block data to the sorter's internal buffer
RETURN_IF_ERROR(_sorter->append_block(&block));
_update_spill_block_batch_row_count(block);
@@ -73,7 +75,34 @@ Status VIcebergSortWriter::write(Block& block) {
return Status::OK();
}
+size_t VIcebergSortWriter::data_size() const {
+ std::lock_guard<std::mutex> lock(_sorter_mutex);
+ return _sorter == nullptr ? 0 : _sorter->data_size();
+}
+
+size_t VIcebergSortWriter::get_reserve_mem_size(RuntimeState* state, bool eos) const {
+ std::lock_guard<std::mutex> lock(_sorter_mutex);
+ return _sorter == nullptr ? 0 : _sorter->get_reserve_mem_size(state, eos);
+}
+
+Status VIcebergSortWriter::trigger_spill() {
+ std::lock_guard<std::mutex> lock(_sorter_mutex);
+ if (_closed || _sorter == nullptr) {
+ return Status::OK();
+ }
+ return _do_spill();
+}
+
Status VIcebergSortWriter::close(const Status& status) {
+ std::lock_guard<std::mutex> lock(_sorter_mutex);
+ if (_closed) {
+ return Status::OK();
+ }
+ Defer mark_closed {[&]() { _closed = true; }};
+ return _close_locked(status);
+}
+
+Status VIcebergSortWriter::_close_locked(const Status& status) {
// Track the actual internal status of operations performed during close.
// This is important because if intermediate operations (like do_sort()) fail,
// we need to propagate the actual error status to the underlying partition writer's
diff --git a/be/src/exec/sink/writer/iceberg/viceberg_sort_writer.h b/be/src/exec/sink/writer/iceberg/viceberg_sort_writer.h
index 45858f473c9..e1e512f0a0c 100644
--- a/be/src/exec/sink/writer/iceberg/viceberg_sort_writer.h
+++ b/be/src/exec/sink/writer/iceberg/viceberg_sort_writer.h
@@ -18,7 +18,7 @@
#pragma once
#include <cstdint>
-#include <limits>
+#include <mutex>
#include <utility>
#include <vector>
@@ -101,12 +101,12 @@ public:
inline size_t written_len() const override { return _iceberg_partition_writer->written_len(); }
- // Returns a raw pointer to the FullSorter, used by SpillIcebergTableSinkLocalState
- // to query memory usage (data_size, get_reserve_mem_size)
- auto sorter() const { return _sorter.get(); }
+ size_t data_size() const;
+
+ size_t get_reserve_mem_size(RuntimeState* state, bool eos) const;
// Called by the memory management system to trigger spilling data to disk
- Status trigger_spill() { return _do_spill(); }
+ Status trigger_spill();
private:
// Calculate average row size from the first non-empty block to determine
@@ -129,6 +129,8 @@ private:
// Explicitly calls do_sort() before prepare_for_read() to guarantee sorted output.
Status _do_spill();
+ Status _close_locked(const Status& status);
+
// Merge all spilled streams and output final sorted data to Parquet/ORC files.
// Handles file splitting when output exceeds target file size.
Status _combine_files_output();
@@ -168,6 +170,17 @@ private:
std::unique_ptr<FullSorter> _sorter;
std::unique_ptr<VSortedRunMerger> _merger;
+ // Serialize all accesses to _sorter because async writes and revoke spills run on
+ // different thread pools but touch the same FullSorter instance.
+ mutable std::mutex _sorter_mutex;
+
+ // Set to true once close() has finished tearing down the sorter / underlying writer.
+ // Late-arriving revoke spills (which run on a different thread than the async writer)
+ // must become no-ops after close, otherwise they would write to a fresh spill stream
+ // whose data never gets merged out (close has already produced the final output and
+ // cleaned up spill files).
+ bool _closed = false;
+
// Queue of spill files waiting to be merged (FIFO order)
std::deque<SpillFileSPtr> _sorted_spill_files;
// Files currently being consumed by the merger
@@ -185,4 +198,4 @@ private:
RuntimeProfile::Counter* _do_spill_count_counter = nullptr;
};
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/exec/sink/writer/iceberg/viceberg_table_writer.cpp b/be/src/exec/sink/writer/iceberg/viceberg_table_writer.cpp
index 91ce0911200..c79aa3262bf 100644
--- a/be/src/exec/sink/writer/iceberg/viceberg_table_writer.cpp
+++ b/be/src/exec/sink/writer/iceberg/viceberg_table_writer.cpp
@@ -284,7 +284,7 @@ Status VIcebergTableWriter::_write_prepared_block(Block& output_block) {
SCOPED_RAW_TIMER(&_partition_writers_write_ns);
output_block.erase(_non_write_columns_indices);
RETURN_IF_ERROR(writer->write(output_block));
- _current_writer = writer;
+ _current_writer.store(writer);
return Status::OK();
}
@@ -327,7 +327,7 @@ Status VIcebergTableWriter::_write_prepared_block(Block& output_block) {
SCOPED_RAW_TIMER(&_partition_writers_write_ns);
output_block.erase(_non_write_columns_indices);
RETURN_IF_ERROR(writer->write(output_block));
- _current_writer = writer;
+ _current_writer.store(writer);
return Status::OK();
}
@@ -430,7 +430,7 @@ Status VIcebergTableWriter::_write_prepared_block(Block& output_block) {
Block filtered_block;
RETURN_IF_ERROR(_filter_block(output_block, &it->second, &filtered_block));
RETURN_IF_ERROR(it->first->write(filtered_block));
- _current_writer = it->first;
+ _current_writer.store(it->first);
}
return Status::OK();
}
diff --git a/be/src/exec/sink/writer/iceberg/viceberg_table_writer.h b/be/src/exec/sink/writer/iceberg/viceberg_table_writer.h
index f94ce4feb6b..cc7cec1fdad 100644
--- a/be/src/exec/sink/writer/iceberg/viceberg_table_writer.h
+++ b/be/src/exec/sink/writer/iceberg/viceberg_table_writer.h
@@ -19,6 +19,7 @@
#include <gen_cpp/DataSinks_types.h>
+#include "common/atomic_shared_ptr.h"
#include "core/block/block.h"
#include "core/column/column.h"
#include "exec/sink/writer/async_result_writer.h"
@@ -66,12 +67,17 @@ public:
// Getter for the current partition writer.
// Used by SpillIcebergTableSinkLocalState to access the current writer for
// memory management operations (get_reserve_mem_size, revocable_mem_size, etc.).
- const std::shared_ptr<IPartitionWriterBase>& current_writer() const { return _current_writer; }
+ // Returns a snapshot by value: the async writer thread updates _current_writer
+ // concurrently with the spill/revoke path, so callers must hold their own copy
+ // while operating on it instead of dereferencing the underlying member directly.
+ std::shared_ptr<IPartitionWriterBase> current_writer() const { return _current_writer.load(); }
private:
// The currently active partition writer (may be VIcebergPartitionWriter or VIcebergSortWriter).
// Updated during write() to track which writer received the most recent data.
- std::shared_ptr<IPartitionWriterBase> _current_writer;
+ // Wrapped in atomic_shared_ptr because revoke_memory / get_revocable_mem_size run on
+ // a different thread than the async writer that assigns to it.
+ doris::atomic_shared_ptr<IPartitionWriterBase> _current_writer;
class IcebergPartitionColumn {
public:
IcebergPartitionColumn(const iceberg::PartitionField& field,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]