This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch spill_repartition
in repository https://gitbox.apache.org/repos/asf/doris.git

commit c0380ad4594c74e444d707e43537a15b98c61bf1
Author: yiguolei <[email protected]>
AuthorDate: Wed Mar 4 13:11:14 2026 +0800

    change spill file to shared ptr
---
 be/src/pipeline/exec/multi_cast_data_streamer.cpp  |  2 +-
 be/src/pipeline/exec/multi_cast_data_streamer.h    |  2 +-
 .../exec/partitioned_aggregation_sink_operator.cpp |  9 ++++++
 .../exec/partitioned_aggregation_sink_operator.h   |  2 +-
 .../partitioned_aggregation_source_operator.cpp    |  5 ++++
 .../exec/partitioned_aggregation_source_operator.h |  2 +-
 .../exec/partitioned_hash_join_probe_operator.cpp  | 20 +++++++++++--
 .../exec/partitioned_hash_join_probe_operator.h    |  6 ++--
 .../exec/partitioned_hash_join_sink_operator.cpp   |  9 ++++++
 .../exec/partitioned_hash_join_sink_operator.h     |  2 +-
 be/src/pipeline/exec/spill_sort_sink_operator.cpp  |  5 ++++
 be/src/pipeline/exec/spill_sort_sink_operator.h    |  2 +-
 .../pipeline/exec/spill_sort_source_operator.cpp   | 13 +++++++-
 be/src/pipeline/exec/spill_sort_source_operator.h  |  2 +-
 be/src/vec/spill/spill_file.cpp                    |  9 +++---
 be/src/vec/spill/spill_file.h                      | 12 ++++----
 be/src/vec/spill/spill_file_reader.h               |  2 +-
 be/src/vec/spill/spill_file_writer.cpp             | 35 +++++++++++++++-------
 be/src/vec/spill/spill_file_writer.h               | 10 +++----
 be/src/vec/spill/spill_repartitioner.cpp           |  2 +-
 be/src/vec/spill/spill_repartitioner.h             |  6 ++--
 21 files changed, 113 insertions(+), 44 deletions(-)

diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.cpp b/be/src/pipeline/exec/multi_cast_data_streamer.cpp
index b98763746fe..9006a236d28 100644
--- a/be/src/pipeline/exec/multi_cast_data_streamer.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_streamer.cpp
@@ -252,7 +252,7 @@ Status MultiCastDataStreamer::_start_spill_task(RuntimeState* state,
     auto spill_func = [state, blocks = std::move(blocks), spill_file = std::move(spill_file),
                        sink_profile]() mutable {
         const auto blocks_count = blocks.size();
-        vectorized::SpillFileWriterUPtr writer;
+        vectorized::SpillFileWriterSPtr writer;
         RETURN_IF_ERROR(spill_file->create_writer(state, sink_profile, writer));
         for (auto& block : blocks) {
             if (state->is_cancelled()) break;
diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.h b/be/src/pipeline/exec/multi_cast_data_streamer.h
index 5e8526c8dda..6c123148319 100644
--- a/be/src/pipeline/exec/multi_cast_data_streamer.h
+++ b/be/src/pipeline/exec/multi_cast_data_streamer.h
@@ -45,7 +45,7 @@ struct MultiCastBlock {
 };
 
 struct SpillingReader {
-    vectorized::SpillFileReaderUPtr reader;
+    vectorized::SpillFileReaderSPtr reader;
     vectorized::SpillFileSPtr spill_file;
     int64_t block_offset {0};
     bool all_data_read {false};
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index 25d49935fde..ac729bf911c 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -77,6 +77,15 @@ Status PartitionedAggSinkLocalState::close(RuntimeState* state, Status exec_stat
     if (Base::_closed) {
         return Status::OK();
     }
+
+    for (auto& writer : _spill_writers) {
+        if (writer) {
+            RETURN_IF_ERROR(writer->close());
+            writer.reset();
+        }
+    }
+    _spill_writers.clear();
+
     return Base::close(state, exec_status);
 }
 
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
index 6ac9759ebfe..5538c296562 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -97,7 +97,7 @@ public:
 
     RuntimeProfile::Counter* _spill_serialize_hash_table_timer = nullptr;
 
-    std::vector<vectorized::SpillFileWriterUPtr> _spill_writers;
+    std::vector<vectorized::SpillFileWriterSPtr> _spill_writers;
 
     std::atomic<bool> _eos = false;
 };
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index fd3f6c284be..fe2ebb2df78 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -101,6 +101,11 @@ Status PartitionedAggLocalState::close(RuntimeState* state) {
         return Status::OK();
     }
 
+    if (_current_reader) {
+        RETURN_IF_ERROR(_current_reader->close());
+        _current_reader.reset();
+    }
+
     // Clean up partition queue resources.
     for (auto& partition : _partition_queue) {
         if (partition.spill_file) {
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
index ff40cd411d4..518e59b5ac5 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
@@ -116,7 +116,7 @@ private:
     SpillRepartitioner _repartitioner;
 
     // Persistent reader for _recover_blocks_from_partition (survives across yield calls)
-    vectorized::SpillFileReaderUPtr _current_reader;
+    vectorized::SpillFileReaderSPtr _current_reader;
 };
 
 class AggSourceOperatorX;
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index 49156b83c9e..b24f86846d5 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -186,6 +186,24 @@ Status PartitionedHashJoinProbeLocalState::close(RuntimeState* state) {
     if (_closed) {
         return Status::OK();
     }
+
+    for (auto& writer : _probe_writers) {
+        if (writer) {
+            RETURN_IF_ERROR(writer->close());
+            writer.reset();
+        }
+    }
+    _probe_writers.clear();
+
+    if (_current_build_reader) {
+        RETURN_IF_ERROR(_current_build_reader->close());
+        _current_build_reader.reset();
+    }
+    if (_current_probe_reader) {
+        RETURN_IF_ERROR(_current_probe_reader->close());
+        _current_probe_reader.reset();
+    }
+
     // Clean up any remaining spill partition queue entries
     for (auto& entry : _spill_partition_queue) {
         if (entry.build_file) {
@@ -203,8 +221,6 @@ Status PartitionedHashJoinProbeLocalState::close(RuntimeState* state) {
         ExecEnv::GetInstance()->spill_file_mgr()->delete_spill_file(_current_partition.probe_file);
     }
     _current_partition = JoinSpillPartitionInfo {};
-    _current_build_reader.reset();
-    _current_probe_reader.reset();
     _queue_probe_blocks.clear();
 
     RETURN_IF_ERROR(PipelineXSpillLocalState::close(state));
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index a375fe1d1f7..bbaccf5906b 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -158,14 +158,14 @@ private:
     std::map<uint32_t, std::vector<vectorized::Block>> _probe_blocks;
 
     std::vector<vectorized::SpillFileSPtr> _probe_spilling_groups;
-    std::vector<vectorized::SpillFileWriterUPtr> _probe_writers;
+    std::vector<vectorized::SpillFileWriterSPtr> _probe_writers;
 
     std::unique_ptr<vectorized::PartitionerBase> _partitioner;
     std::unique_ptr<RuntimeProfile> _internal_runtime_profile;
 
     // Persistent readers for recovery across scheduling slices
-    vectorized::SpillFileReaderUPtr _current_build_reader;
-    vectorized::SpillFileReaderUPtr _current_probe_reader;
+    vectorized::SpillFileReaderSPtr _current_build_reader;
+    vectorized::SpillFileReaderSPtr _current_probe_reader;
 
     // ---- Spill partition queue state ----
     // Whether _spill_partition_queue has been initialized from spilled build groups +
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index 5f8526b4317..cac453c8b69 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -84,6 +84,15 @@ Status PartitionedHashJoinSinkLocalState::close(RuntimeState* state, Status exec
         RETURN_IF_ERROR(p._inner_sink_operator->close(_shared_state->_inner_runtime_state.get(),
                                                       exec_status));
     }
+
+    for (auto& writer : _build_writers) {
+        if (writer) {
+            RETURN_IF_ERROR(writer->close());
+            writer.reset();
+        }
+    }
+    _build_writers.clear();
+
     return PipelineXSpillSinkLocalState::close(state, exec_status);
 }
 
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index 61f275c7156..12d4810bbdd 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -94,7 +94,7 @@ protected:
     RuntimeProfile::Counter* _in_mem_rows_counter = nullptr;
     RuntimeProfile::Counter* _memory_usage_reserved = nullptr;
 
-    std::vector<vectorized::SpillFileWriterUPtr> _build_writers;
+    std::vector<vectorized::SpillFileWriterSPtr> _build_writers;
 };
 
 class PartitionedHashJoinSinkOperatorX
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index 87aa89021b4..5ce2130eff4 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -66,6 +66,11 @@ void SpillSortSinkLocalState::update_profile(RuntimeProfile* child_profile) {
 #undef UPDATE_PROFILE
 
 Status SpillSortSinkLocalState::close(RuntimeState* state, Status execsink_status) {
+    if (_spilling_writer) {
+        RETURN_IF_ERROR(_spilling_writer->close());
+        _spilling_writer.reset();
+    }
+    _spilling_file.reset();
     return Base::close(state, execsink_status);
 }
 
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.h b/be/src/pipeline/exec/spill_sort_sink_operator.h
index 19f9a0c85c9..36710e14582 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.h
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.h
@@ -62,7 +62,7 @@ private:
     RuntimeProfile::Counter* _spill_merge_sort_timer = nullptr;
 
     vectorized::SpillFileSPtr _spilling_file;
-    vectorized::SpillFileWriterUPtr _spilling_writer;
+    vectorized::SpillFileWriterSPtr _spilling_writer;
 
     std::atomic<bool> _eos = false;
 };
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp b/be/src/pipeline/exec/spill_sort_source_operator.cpp
index 06150cce856..9b4eb4b0d0a 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp
@@ -64,6 +64,17 @@ Status SpillSortLocalState::close(RuntimeState* state) {
     if (_closed) {
         return Status::OK();
     }
+
+    for (auto& reader : _current_merging_readers) {
+        if (reader) {
+            RETURN_IF_ERROR(reader->close());
+            reader.reset();
+        }
+    }
+    _current_merging_readers.clear();
+    _current_merging_files.clear();
+    _merger.reset();
+
     return Base::close(state);
 }
 
@@ -100,7 +111,7 @@ Status SpillSortLocalState::execute_merge_sort_spill_files(RuntimeState* state)
                                          
ExecEnv::GetInstance()->spill_file_mgr()->next_id());
         RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file(relative_path,
                                                                                
     tmp_file));
-        vectorized::SpillFileWriterUPtr tmp_writer;
+        vectorized::SpillFileWriterSPtr tmp_writer;
         RETURN_IF_ERROR(tmp_file->create_writer(state, operator_profile(), tmp_writer));
         _shared_state->sorted_spill_groups.emplace_back(tmp_file);
 
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.h b/be/src/pipeline/exec/spill_sort_source_operator.h
index 3b2943033a2..fe13c2aad20 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.h
+++ b/be/src/pipeline/exec/spill_sort_source_operator.h
@@ -63,7 +63,7 @@ protected:
 
     std::vector<vectorized::SpillFileSPtr> _current_merging_files;
     /// Readers held alive during merge; one per SpillFile, reads parts sequentially.
-    std::vector<vectorized::SpillFileReaderUPtr> _current_merging_readers;
+    std::vector<vectorized::SpillFileReaderSPtr> _current_merging_readers;
     std::unique_ptr<vectorized::VSortedRunMerger> _merger;
 
     std::unique_ptr<RuntimeProfile> _internal_runtime_profile;
diff --git a/be/src/vec/spill/spill_file.cpp b/be/src/vec/spill/spill_file.cpp
index 7be97d738c3..81d8d322478 100644
--- a/be/src/vec/spill/spill_file.cpp
+++ b/be/src/vec/spill/spill_file.cpp
@@ -65,17 +65,18 @@ void SpillFile::gc() {
 }
 
 Status SpillFile::create_writer(RuntimeState* state, RuntimeProfile* profile,
-                                SpillFileWriterUPtr& writer) {
-    writer = std::make_unique<SpillFileWriter>(this, state, profile, _data_dir, _spill_dir);
+                                SpillFileWriterSPtr& writer) {
+    writer = std::make_shared<SpillFileWriter>(shared_from_this(), state, profile, _data_dir,
+                                               _spill_dir);
     // record active writer
     _active_writer = writer.get();
     return Status::OK();
 }
 
-SpillFileReaderUPtr SpillFile::create_reader(RuntimeState* state, RuntimeProfile* profile) const {
+SpillFileReaderSPtr SpillFile::create_reader(RuntimeState* state, RuntimeProfile* profile) const {
     // It's a programming error to create a reader while a writer is still active.
     DCHECK(_active_writer == nullptr) << "create_reader() called while writer still active";
-    return std::make_unique<SpillFileReader>(state, profile, _spill_dir, _part_count);
+    return std::make_shared<SpillFileReader>(state, profile, _spill_dir, _part_count);
 }
 
 void SpillFile::finish_writing(int64_t total_written_bytes, size_t part_count) {
diff --git a/be/src/vec/spill/spill_file.h b/be/src/vec/spill/spill_file.h
index 2d938d98fdb..362b522f10c 100644
--- a/be/src/vec/spill/spill_file.h
+++ b/be/src/vec/spill/spill_file.h
@@ -32,8 +32,8 @@ class Block;
 class SpillDataDir;
 class SpillFileWriter;
 class SpillFileReader;
-using SpillFileWriterUPtr = std::unique_ptr<SpillFileWriter>;
-using SpillFileReaderUPtr = std::unique_ptr<SpillFileReader>;
+using SpillFileWriterSPtr = std::shared_ptr<SpillFileWriter>;
+using SpillFileReaderSPtr = std::shared_ptr<SpillFileReader>;
 
 /// SpillFile represents a logical spill file that may consist of multiple
 /// physical "part" files on disk. Parts are managed automatically by
@@ -46,7 +46,7 @@ using SpillFileReaderUPtr = std::unique_ptr<SpillFileReader>;
 ///   +-- ...
 ///
 /// Writing workflow:
-///   SpillFileWriterUPtr writer;
+///   SpillFileWriterSPtr writer;
 ///   RETURN_IF_ERROR(spill_file->create_writer(state, profile, writer));
 ///   RETURN_IF_ERROR(writer->write_block(state, block)); // auto-rotates parts
 ///   RETURN_IF_ERROR(writer->close());                   // finalizes all parts
@@ -55,7 +55,7 @@ using SpillFileReaderUPtr = std::unique_ptr<SpillFileReader>;
 ///   auto reader = spill_file->create_reader(state, profile);
 ///   RETURN_IF_ERROR(reader->open());
 ///   while (!eos) { RETURN_IF_ERROR(reader->read(&block, &eos)); }
-class SpillFile {
+class SpillFile : public std::enable_shared_from_this<SpillFile> {
 public:
     // to avoid too many small file writes
     static constexpr size_t MIN_SPILL_WRITE_BATCH_MEM = 512 * 1024;
@@ -80,11 +80,11 @@ public:
     /// Create a SpillFileWriter that automatically manages multi-part rotation.
     /// Only one writer should exist per SpillFile at a time.
     /// Part size threshold is read from config::spill_file_part_size_bytes.
-    Status create_writer(RuntimeState* state, RuntimeProfile* profile, SpillFileWriterUPtr& writer);
+    Status create_writer(RuntimeState* state, RuntimeProfile* profile, SpillFileWriterSPtr& writer);
 
     /// Create a SpillFileReader that reads sequentially across all parts.
     /// The caller should call reader->open() before reading.
-    SpillFileReaderUPtr create_reader(RuntimeState* state, RuntimeProfile* profile) const;
+    SpillFileReaderSPtr create_reader(RuntimeState* state, RuntimeProfile* profile) const;
 
 private:
     friend class SpillFileWriter;
diff --git a/be/src/vec/spill/spill_file_reader.h b/be/src/vec/spill/spill_file_reader.h
index e76bf8355db..c77b58a3290 100644
--- a/be/src/vec/spill/spill_file_reader.h
+++ b/be/src/vec/spill/spill_file_reader.h
@@ -107,7 +107,7 @@ private:
     std::shared_ptr<ResourceContext> _resource_ctx = nullptr;
 };
 
-using SpillFileReaderUPtr = std::unique_ptr<SpillFileReader>;
+using SpillFileReaderSPtr = std::shared_ptr<SpillFileReader>;
 
 } // namespace doris::vectorized
 #include "common/compile_check_end.h"
diff --git a/be/src/vec/spill/spill_file_writer.cpp b/be/src/vec/spill/spill_file_writer.cpp
index 3413142f9c8..d0ef50dbe60 100644
--- a/be/src/vec/spill/spill_file_writer.cpp
+++ b/be/src/vec/spill/spill_file_writer.cpp
@@ -32,10 +32,10 @@
 namespace doris::vectorized {
 #include "common/compile_check_begin.h"
 
-SpillFileWriter::SpillFileWriter(SpillFile* spill_file, RuntimeState* state,
+SpillFileWriter::SpillFileWriter(const std::shared_ptr<SpillFile>& spill_file, RuntimeState* state,
                                  RuntimeProfile* profile, SpillDataDir* data_dir,
                                  const std::string& spill_dir)
-        : _spill_file(spill_file),
+        : _spill_file_wptr(spill_file),
           _data_dir(data_dir),
           _spill_dir(spill_dir),
           _max_part_size(config::spill_file_part_size_bytes),
@@ -46,8 +46,9 @@ SpillFileWriter::SpillFileWriter(SpillFile* spill_file, RuntimeState* state,
     _memory_used_counter = common_profile->get_counter("MemoryUsage");
 
     // Register this writer as the active writer for the SpillFile.
-    if (_spill_file) {
-        _spill_file->_active_writer = this;
+    auto spill_file_locked = _spill_file_wptr.lock();
+    if (spill_file_locked) {
+        spill_file_locked->_active_writer = this;
     }
 
     // Custom (spill-specific) counters
@@ -63,7 +64,14 @@ SpillFileWriter::SpillFileWriter(SpillFile* spill_file, RuntimeState* state,
 }
 
 SpillFileWriter::~SpillFileWriter() {
-    DCHECK(_closed) << "SpillFileWriter destroyed without close(), possible memory leak";
+    if (_closed) {
+        return;
+    }
+    Status st = close();
+    if (!st.ok()) {
+        LOG(WARNING) << "SpillFileWriter::~SpillFileWriter() failed: " << st.to_string()
+                     << ", spill_dir=" << _spill_dir;
+    }
 }
 
 Status SpillFileWriter::_open_next_part() {
@@ -158,13 +166,18 @@ Status SpillFileWriter::close() {
 
     RETURN_IF_ERROR(_close_current_part());
 
-    if (!_spill_file || _spill_file->_active_writer != this) {
-        return Status::Error<INTERNAL_ERROR>(
-                "SpillFileWriter close() called but not registered as active writer, possible "
-                "double close or logic error");
+    // Use weak_ptr lock to safely access SpillFile during close.
+    // If the SpillFile has already been destroyed, we skip the callback.
+    auto spill_file = _spill_file_wptr.lock();
+    if (spill_file) {
+        if (spill_file->_active_writer != this) {
+            return Status::Error<INTERNAL_ERROR>(
+                    "SpillFileWriter close() called but not registered as active writer, possible "
+                    "double close or logic error");
+        }
+        spill_file->finish_writing(_total_written_bytes, _total_parts);
+        spill_file->_active_writer = nullptr;
     }
-    _spill_file->finish_writing(_total_written_bytes, _total_parts);
-    _spill_file->_active_writer = nullptr;
 
     return Status::OK();
 }
diff --git a/be/src/vec/spill/spill_file_writer.h b/be/src/vec/spill/spill_file_writer.h
index 2e45c8ae32c..8492d11ba13 100644
--- a/be/src/vec/spill/spill_file_writer.h
+++ b/be/src/vec/spill/spill_file_writer.h
@@ -38,7 +38,7 @@ class SpillFile;
 /// (config::spill_file_part_size_bytes).
 ///
 /// Usage:
-///   SpillFileWriterUPtr writer;
+///   SpillFileWriterSPtr writer;
 ///   RETURN_IF_ERROR(spill_file->create_writer(state, profile, writer));
 ///   RETURN_IF_ERROR(writer->write_block(state, block));
 ///   RETURN_IF_ERROR(writer->close());
@@ -48,8 +48,8 @@ class SpillFile;
 /// directory.
 class SpillFileWriter {
 public:
-    SpillFileWriter(SpillFile* spill_file, RuntimeState* state, RuntimeProfile* profile,
-                    SpillDataDir* data_dir, const std::string& spill_dir);
+    SpillFileWriter(const std::shared_ptr<SpillFile>& spill_file, RuntimeState* state,
+                    RuntimeProfile* profile, SpillDataDir* data_dir, const std::string& spill_dir);
 
     ~SpillFileWriter();
 
@@ -75,7 +75,7 @@ private:
     Status _write_internal(const Block& block);
 
     // ── Back-reference ──
-    SpillFile* _spill_file; // non-owning; for close() callback
+    std::weak_ptr<SpillFile> _spill_file_wptr; // weak ref; use lock() in close()
 
     // ── Configuration ──
     SpillDataDir* _data_dir = nullptr;
@@ -109,7 +109,7 @@ private:
 
     std::shared_ptr<ResourceContext> _resource_ctx = nullptr;
 };
-using SpillFileWriterUPtr = std::unique_ptr<SpillFileWriter>;
+using SpillFileWriterSPtr = std::shared_ptr<SpillFileWriter>;
 } // namespace vectorized
 } // namespace doris
 
diff --git a/be/src/vec/spill/spill_repartitioner.cpp b/be/src/vec/spill/spill_repartitioner.cpp
index 452ce9a8c71..780927e117c 100644
--- a/be/src/vec/spill/spill_repartitioner.cpp
+++ b/be/src/vec/spill/spill_repartitioner.cpp
@@ -134,7 +134,7 @@ Status SpillRepartitioner::repartition(RuntimeState* state,
     return Status::OK();
 }
 
-Status SpillRepartitioner::repartition(RuntimeState* state, vectorized::SpillFileReaderUPtr& reader,
+Status SpillRepartitioner::repartition(RuntimeState* state, vectorized::SpillFileReaderSPtr& reader,
                                        bool* done) {
     DCHECK(_output_spill_files != nullptr) << "setup_output() must be called first";
     DCHECK(reader != nullptr) << "reader must not be null";
diff --git a/be/src/vec/spill/spill_repartitioner.h b/be/src/vec/spill/spill_repartitioner.h
index 0effe325f54..f6894dc3637 100644
--- a/be/src/vec/spill/spill_repartitioner.h
+++ b/be/src/vec/spill/spill_repartitioner.h
@@ -107,7 +107,7 @@ public:
     /// and wants to repartition only the remaining data without re-reading
     /// from the beginning. Ownership of the reader is transferred on completion.
     /// Call repeatedly until done == true.
-    Status repartition(RuntimeState* state, vectorized::SpillFileReaderUPtr& reader, bool* done);
+    Status repartition(RuntimeState* state, vectorized::SpillFileReaderSPtr& reader, bool* done);
 
     /// Route a single in-memory block into output files via persistent writers.
     Status route_block(RuntimeState* state, vectorized::Block& block);
@@ -158,11 +158,11 @@ private:
 
     // ── Persistent state across repartition/route_block calls ──────
     // Output writers (one per partition), created by setup_output()
-    std::vector<vectorized::SpillFileWriterUPtr> _output_writers;
+    std::vector<vectorized::SpillFileWriterSPtr> _output_writers;
     // Pointer to caller's output SpillFiles vector (for finalize)
     std::vector<vectorized::SpillFileSPtr>* _output_spill_files = nullptr;
     // Input reader for repartition(), persists across yield calls
-    vectorized::SpillFileReaderUPtr _input_reader;
+    vectorized::SpillFileReaderSPtr _input_reader;
     vectorized::SpillFileSPtr _current_input_file;
 };
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to