This is an automated email from the ASF dual-hosted git repository.

leaves12138 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 4a77916  feat(core): introduce merge tree spill buffers (#67)
4a77916 is described below

commit 4a77916da8170ae2551666b57bed5e84a665e3eb
Author: Zhang Jiawei <[email protected]>
AuthorDate: Wed Jun 10 11:14:21 2026 +0800

    feat(core): introduce merge tree spill buffers (#67)
---
 .../core/mergetree/in_memory_sort_buffer.cpp       | 186 ++++++++++++
 src/paimon/core/mergetree/in_memory_sort_buffer.h  |  90 ++++++
 src/paimon/core/mergetree/sort_buffer.h            |  65 +++++
 src/paimon/core/mergetree/spill_channel_manager.h  |  61 ++++
 .../core/mergetree/spill_channel_manager_test.cpp  | 110 ++++++++
 src/paimon/core/mergetree/spill_reader.cpp         | 169 +++++++++++
 src/paimon/core/mergetree/spill_reader.h           |  96 +++++++
 .../core/mergetree/spill_reader_writer_test.cpp    | 311 +++++++++++++++++++++
 src/paimon/core/mergetree/spill_writer.cpp         | 127 +++++++++
 src/paimon/core/mergetree/spill_writer.h           |  82 ++++++
 10 files changed, 1297 insertions(+)

diff --git a/src/paimon/core/mergetree/in_memory_sort_buffer.cpp 
b/src/paimon/core/mergetree/in_memory_sort_buffer.cpp
new file mode 100644
index 0000000..66c6dce
--- /dev/null
+++ b/src/paimon/core/mergetree/in_memory_sort_buffer.cpp
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/core/mergetree/in_memory_sort_buffer.h"
+
+#include <cassert>
+#include <utility>
+
+#include "arrow/api.h"
+#include "arrow/array/array_binary.h"
+#include "arrow/array/array_nested.h"
+#include "arrow/c/bridge.h"
+#include "arrow/c/helpers.h"
+#include "arrow/util/checked_cast.h"
+#include "fmt/format.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/core/io/key_value_in_memory_record_reader.h"
+#include "paimon/core/io/key_value_record_reader.h"
+#include "paimon/data/decimal.h"
+
+namespace paimon {
+
+/// Builds an empty buffer.
+///
+/// All arguments are stored as-is; the only derived value is the sequence counter, which
+/// starts one past `last_sequence_number`.
+InMemorySortBuffer::InMemorySortBuffer(
+    int64_t last_sequence_number, const std::shared_ptr<arrow::DataType>& value_type,
+    const std::vector<std::string>& trimmed_primary_keys,
+    const std::vector<std::string>& user_defined_sequence_fields, bool sequence_fields_ascending,
+    const std::shared_ptr<FieldsComparator>& key_comparator, uint64_t write_buffer_size,
+    const std::shared_ptr<MemoryPool>& pool)
+    : pool_(pool),
+      value_type_(value_type),
+      trimmed_primary_keys_(trimmed_primary_keys),
+      user_defined_sequence_fields_(user_defined_sequence_fields),
+      sequence_fields_ascending_(sequence_fields_ascending),
+      key_comparator_(key_comparator),
+      write_buffer_size_(write_buffer_size),
+      // The first row written gets the number right after `last_sequence_number`.
+      next_sequence_number_(last_sequence_number + 1) {}
+
+/// Drops every buffered batch and zeroes the byte and row counters.
+/// The per-row estimate and the sequence counter are left untouched.
+void InMemorySortBuffer::Clear() {
+    total_row_count_ = 0;
+    current_memory_in_bytes_ = 0;
+    buffered_batches_.clear();
+}
+
+/// Returns the running sum of the estimated sizes of all batches currently buffered.
+uint64_t InMemorySortBuffer::GetMemorySize() const {
+    const uint64_t buffered_bytes = current_memory_in_bytes_;
+    return buffered_bytes;
+}
+
+/// This buffer never spills to disk, so the call does nothing and always reports `false`.
+Result<bool> InMemorySortBuffer::FlushMemory() {
+    const bool flushed = false;
+    return flushed;
+}
+
+/// Takes ownership of `moved_batch`, imports its data as an Arrow struct array of
+/// `value_type_` and appends it to the in-memory buffer.
+///
+/// @param moved_batch Batch to buffer. It must be non-null and its data must not have been
+///                    released yet. It is only consumed once both checks have passed.
+/// @return true while the estimated memory use stays below `write_buffer_size_`, false once
+///         the limit has been reached. Status::Invalid if the batch is null, already
+///         released, or does not import as a struct array; errors from the Arrow import or
+///         from the memory estimation are propagated.
+Result<bool> InMemorySortBuffer::Write(std::unique_ptr<RecordBatch>&& moved_batch) {
+    // Reject a null batch up front instead of dereferencing it below.
+    if (moved_batch == nullptr) {
+        return Status::Invalid("invalid batch: batch is null");
+    }
+    if (ArrowArrayIsReleased(moved_batch->GetData())) {
+        return Status::Invalid("invalid batch: data is released");
+    }
+    std::unique_ptr<RecordBatch> batch = std::move(moved_batch);
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Array> arrow_array,
+                                      arrow::ImportArray(batch->GetData(), value_type_));
+    auto value_struct_array = std::dynamic_pointer_cast<arrow::StructArray>(arrow_array);
+    if (value_struct_array == nullptr) {
+        return Status::Invalid("invalid RecordBatch: cannot cast to StructArray");
+    }
+    PAIMON_ASSIGN_OR_RAISE(int64_t memory_in_bytes, EstimateMemoryUse(value_struct_array));
+    current_memory_in_bytes_ += static_cast<uint64_t>(memory_in_bytes);
+
+    BufferedWriteBatch buffered_batch;
+    buffered_batch.first_sequence_number = next_sequence_number_;
+    buffered_batch.struct_array = std::move(value_struct_array);
+    buffered_batch.row_kinds = batch->GetRowKind();
+    // Every row consumes one sequence number, so the next batch starts after this one.
+    next_sequence_number_ += buffered_batch.struct_array->length();
+    total_row_count_ += buffered_batch.struct_array->length();
+    buffered_batches_.push_back(std::move(buffered_batch));
+    // Skip the average while no rows are buffered to avoid dividing by zero.
+    if (total_row_count_ > 0) {
+        estimated_memory_use_for_each_row_ = current_memory_in_bytes_ / total_row_count_;
+    }
+    return current_memory_in_bytes_ < write_buffer_size_;
+}
+
+/// Wraps each buffered batch in its own KeyValueInMemoryRecordReader, in write order.
+/// The buffer itself is not modified; an empty buffer yields an empty vector.
+Result<std::vector<std::unique_ptr<KeyValueRecordReader>>> InMemorySortBuffer::CreateReaders() {
+    std::vector<std::unique_ptr<KeyValueRecordReader>> readers;
+    readers.reserve(buffered_batches_.size());
+    for (auto& pending : buffered_batches_) {
+        readers.push_back(std::make_unique<KeyValueInMemoryRecordReader>(
+            pending.first_sequence_number, pending.struct_array, pending.row_kinds,
+            trimmed_primary_keys_, user_defined_sequence_fields_, sequence_fields_ascending_,
+            key_comparator_, pool_));
+    }
+    return readers;
+}
+
+/// True as soon as at least one batch is buffered.
+bool InMemorySortBuffer::HasData() const {
+    return buffered_batches_.size() != 0;
+}
+
+/// Returns the average bytes-per-row figure computed by the most recent Write() that had
+/// rows buffered; 0 before any such write.
+uint64_t InMemorySortBuffer::GetEstimateMemoryUseForEachRow() const {
+    const uint64_t bytes_per_row = estimated_memory_use_for_each_row_;
+    return bytes_per_row;
+}
+
+// TODO(jinli.zjw): Consider making the memory estimation more accurate.
+// https://github.com/alibaba/paimon-cpp/pull/206#discussion_r3021325389
+//
+// Rough size estimate for `array`: one validity bit per row (rounded up to whole bytes) plus
+// a type-dependent payload. Nested types recurse into their children. Types without a case
+// below are rejected with Status::Invalid.
+Result<int64_t> InMemorySortBuffer::EstimateMemoryUse(const std::shared_ptr<arrow::Array>& array) {
+    const int64_t row_count = array->length();
+    const int64_t validity_bytes = (row_count + 7) / 8;
+    // Fixed-width columns cost the validity bitmap plus `width` bytes per row.
+    const auto fixed_width_bytes = [validity_bytes, row_count](size_t width) -> int64_t {
+        return validity_bytes + row_count * static_cast<int64_t>(width);
+    };
+
+    switch (array->type()->id()) {
+        case arrow::Type::BOOL:
+            return fixed_width_bytes(sizeof(bool));
+        case arrow::Type::INT8:
+            return fixed_width_bytes(sizeof(int8_t));
+        case arrow::Type::INT16:
+            return fixed_width_bytes(sizeof(int16_t));
+        case arrow::Type::INT32:
+        case arrow::Type::DATE32:
+            return fixed_width_bytes(sizeof(int32_t));
+        case arrow::Type::INT64:
+        case arrow::Type::TIMESTAMP:
+            return fixed_width_bytes(sizeof(int64_t));
+        case arrow::Type::FLOAT:
+            return fixed_width_bytes(sizeof(float));
+        case arrow::Type::DOUBLE:
+            return fixed_width_bytes(sizeof(double));
+        case arrow::Type::DECIMAL:
+            return fixed_width_bytes(sizeof(Decimal::int128_t));
+        case arrow::Type::STRING:
+        case arrow::Type::BINARY: {
+            // Variable-width data: payload bytes plus one 32-bit offset per row.
+            const auto* binary = arrow::internal::checked_cast<const arrow::BinaryArray*>(array.get());
+            assert(binary);
+            const int64_t payload_bytes = binary->total_values_length();
+            const int64_t offset_bytes = row_count * static_cast<int64_t>(sizeof(int32_t));
+            return validity_bytes + payload_bytes + offset_bytes;
+        }
+        case arrow::Type::LIST: {
+            const auto* list = arrow::internal::checked_cast<const arrow::ListArray*>(array.get());
+            assert(list);
+            PAIMON_ASSIGN_OR_RAISE(int64_t element_bytes, EstimateMemoryUse(list->values()));
+            return validity_bytes + element_bytes;
+        }
+        case arrow::Type::MAP: {
+            const auto* map = arrow::internal::checked_cast<const arrow::MapArray*>(array.get());
+            assert(map);
+            PAIMON_ASSIGN_OR_RAISE(int64_t key_bytes, EstimateMemoryUse(map->keys()));
+            PAIMON_ASSIGN_OR_RAISE(int64_t item_bytes, EstimateMemoryUse(map->items()));
+            return validity_bytes + key_bytes + item_bytes;
+        }
+        case arrow::Type::STRUCT: {
+            const auto* record = arrow::internal::checked_cast<const arrow::StructArray*>(array.get());
+            assert(record);
+            int64_t children_bytes = 0;
+            for (const auto& child : record->fields()) {
+                PAIMON_ASSIGN_OR_RAISE(int64_t child_bytes, EstimateMemoryUse(child));
+                children_bytes += child_bytes;
+            }
+            return validity_bytes + children_bytes;
+        }
+        default:
+            return Status::Invalid(fmt::format("Do not support type {} in EstimateMemoryUse",
+                                               array->type()->ToString()));
+    }
+}
+
+}  // namespace paimon
diff --git a/src/paimon/core/mergetree/in_memory_sort_buffer.h 
b/src/paimon/core/mergetree/in_memory_sort_buffer.h
new file mode 100644
index 0000000..3632bb8
--- /dev/null
+++ b/src/paimon/core/mergetree/in_memory_sort_buffer.h
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "arrow/type_fwd.h"
+#include "paimon/core/mergetree/sort_buffer.h"
+#include "paimon/record_batch.h"
+#include "paimon/result.h"
+
+namespace arrow {
+class Array;
+class DataType;
+class Schema;
+class StructArray;
+}  // namespace arrow
+
+namespace paimon {
+class FieldsComparator;
+class KeyValueRecordReader;
+class MemoryPool;
+
+/// One batch held by InMemorySortBuffer, together with the bookkeeping needed to read it
+/// back later.
+struct BufferedWriteBatch {
+    /// Sequence number assigned to the first row of the batch.
+    int64_t first_sequence_number = 0;
+    /// The batch data, imported as an Arrow struct array.
+    std::shared_ptr<arrow::StructArray> struct_array;
+    /// Row kinds as reported by the source RecordBatch.
+    std::vector<RecordBatch::RowKind> row_kinds;
+};
+
+/// SortBuffer implementation that keeps every written RecordBatch in memory and exposes the
+/// buffered data as sorted KeyValueRecordReaders. Spilling to disk is not supported.
+class InMemorySortBuffer : public SortBuffer {
+ public:
+    /// @param last_sequence_number Rows written to this buffer are numbered starting at
+    ///                             `last_sequence_number + 1`.
+    /// @param value_type Arrow type that incoming batches are imported as.
+    /// @param write_buffer_size Byte limit that Write() compares the estimated usage against.
+    /// The remaining arguments are forwarded unchanged to the readers built by
+    /// CreateReaders().
+    InMemorySortBuffer(int64_t last_sequence_number,
+                       const std::shared_ptr<arrow::DataType>& value_type,
+                       const std::vector<std::string>& trimmed_primary_keys,
+                       const std::vector<std::string>& user_defined_sequence_fields,
+                       bool sequence_fields_ascending,
+                       const std::shared_ptr<FieldsComparator>& key_comparator,
+                       uint64_t write_buffer_size, const std::shared_ptr<MemoryPool>& pool);
+
+    void Clear() override;
+    uint64_t GetMemorySize() const override;
+    Result<bool> FlushMemory() override;
+    Result<bool> Write(std::unique_ptr<RecordBatch>&& batch) override;
+    Result<std::vector<std::unique_ptr<KeyValueRecordReader>>> CreateReaders() override;
+    bool HasData() const override;
+
+    /// Estimated average memory usage per buffered row, in bytes.
+    uint64_t GetEstimateMemoryUseForEachRow() const;
+
+ private:
+    /// Estimates the memory footprint of an Arrow array, in bytes.
+    static Result<int64_t> EstimateMemoryUse(const std::shared_ptr<arrow::Array>& array);
+
+    // Configuration fixed at construction time.
+    const std::shared_ptr<MemoryPool> pool_;
+    const std::shared_ptr<arrow::DataType> value_type_;
+    const std::vector<std::string> trimmed_primary_keys_;
+    const std::vector<std::string> user_defined_sequence_fields_;
+    const bool sequence_fields_ascending_;
+    const std::shared_ptr<FieldsComparator> key_comparator_;
+    const uint64_t write_buffer_size_;
+
+    // Mutable buffer state.
+    std::vector<BufferedWriteBatch> buffered_batches_;
+    uint64_t current_memory_in_bytes_ = 0;
+    uint64_t estimated_memory_use_for_each_row_ = 0;
+    int64_t total_row_count_ = 0;
+    int64_t next_sequence_number_ = 0;
+};
+
+}  // namespace paimon
diff --git a/src/paimon/core/mergetree/sort_buffer.h 
b/src/paimon/core/mergetree/sort_buffer.h
new file mode 100644
index 0000000..96ef36d
--- /dev/null
+++ b/src/paimon/core/mergetree/sort_buffer.h
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include "paimon/record_batch.h"
+#include "paimon/result.h"
+
+namespace paimon {
+class KeyValueRecordReader;
+
+/// Interface for a buffer that collects records and can hand them back through sorted
+/// readers. It hides whether the data lives purely in memory or is also spilled to disk.
+class SortBuffer {
+ public:
+    virtual ~SortBuffer() = default;
+
+    /// Resets the buffer, releasing all in-memory batches and on-disk spill files.
+    virtual void Clear() = 0;
+
+    /// Returns the current memory usage in bytes.
+    virtual uint64_t GetMemorySize() const = 0;
+
+    /// Spills the in-memory data to disk, if the implementation supports it.
+    /// @return true if the flush succeeded and the buffer can accept more data afterwards;
+    ///         false if the flush succeeded but no quota is left for another flush (on-disk),
+    ///         or if flushing is not supported at all (in-memory only);
+    ///         Status::Invalid if the flush failed.
+    virtual Result<bool> FlushMemory() = 0;
+
+    /// Appends a RecordBatch to the buffer.
+    /// @return true if the write succeeded and the buffer can accept more data afterwards;
+    ///         false if the write succeeded but no quota (memory or disk) is left for the
+    ///         next write;
+    ///         Status::Invalid if the write failed.
+    virtual Result<bool> Write(std::unique_ptr<RecordBatch>&& batch) = 0;
+
+    /// Creates sorted KeyValueRecordReaders over all buffered data (in-memory and on-disk).
+    /// The buffer is not cleared; the caller should invoke Clear() afterwards.
+    virtual Result<std::vector<std::unique_ptr<KeyValueRecordReader>>> CreateReaders() = 0;
+
+    /// Returns true if there is any data to output (in-memory or on-disk).
+    virtual bool HasData() const = 0;
+};
+
+}  // namespace paimon
diff --git a/src/paimon/core/mergetree/spill_channel_manager.h 
b/src/paimon/core/mergetree/spill_channel_manager.h
new file mode 100644
index 0000000..6f22c12
--- /dev/null
+++ b/src/paimon/core/mergetree/spill_channel_manager.h
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <unordered_set>
+
+#include "paimon/core/disk/file_io_channel.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/status.h"
+
+namespace paimon {
+
+/// Tracks a set of channel ids and deletes their backing files through `fs_`, either one at
+/// a time or all at once.
+class SpillChannelManager {
+ public:
+    /// @param fs File system used to delete the tracked files.
+    /// @param initial_capacity Number of channels to reserve room for up front.
+    SpillChannelManager(const std::shared_ptr<FileSystem>& fs, size_t initial_capacity)
+        : fs_(fs) {
+        channels_.reserve(initial_capacity);
+    }
+
+    /// Starts tracking `channel_id`; an id that is already tracked is not added twice.
+    void AddChannel(const FileIOChannel::ID& channel_id) {
+        channels_.insert(channel_id);
+    }
+
+    /// Deletes the file behind `channel_id` and stops tracking it.
+    /// If the delete fails, its status is returned and the channel stays tracked.
+    Status DeleteChannel(const FileIOChannel::ID& channel_id) {
+        PAIMON_RETURN_NOT_OK(fs_->Delete(channel_id.GetPath()));
+        channels_.erase(channel_id);
+        return Status::OK();
+    }
+
+    /// Attempts to delete the file of every tracked channel, then forgets all of them.
+    void Reset() {
+        for (const auto& tracked : channels_) {
+            // Best effort: the result of each delete is intentionally ignored.
+            [[maybe_unused]] auto delete_status = fs_->Delete(tracked.GetPath());
+        }
+        channels_.clear();
+    }
+
+    /// Read-only view of the currently tracked channels.
+    const std::unordered_set<FileIOChannel::ID, FileIOChannel::ID::Hash>& GetChannels() const {
+        return channels_;
+    }
+
+ private:
+    std::unordered_set<FileIOChannel::ID, FileIOChannel::ID::Hash> channels_;
+    std::shared_ptr<FileSystem> fs_;
+};
+
+}  // namespace paimon
diff --git a/src/paimon/core/mergetree/spill_channel_manager_test.cpp 
b/src/paimon/core/mergetree/spill_channel_manager_test.cpp
new file mode 100644
index 0000000..05e5de6
--- /dev/null
+++ b/src/paimon/core/mergetree/spill_channel_manager_test.cpp
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/core/mergetree/spill_channel_manager.h"
+
+#include <memory>
+
+#include "gtest/gtest.h"
+#include "paimon/core/disk/io_manager.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+/// Fixture that sets up a unique test directory, its file system and an IOManager rooted
+/// in that directory.
+class SpillChannelManagerTest : public ::testing::Test {
+ public:
+    void SetUp() override {
+        test_dir_ = UniqueTestDirectory::Create();
+        file_system_ = test_dir_->GetFileSystem();
+        io_manager_ = std::make_unique<IOManager>(test_dir_->Str(), test_dir_->GetFileSystem());
+    }
+
+    /// Allocates a fresh channel id and creates an empty file at its path.
+    FileIOChannel::ID CreateTempFile() {
+        EXPECT_OK_AND_ASSIGN(auto new_channel, io_manager_->CreateChannel());
+        // Create and immediately close the file so that it exists on disk.
+        EXPECT_OK_AND_ASSIGN(auto stream,
+                             file_system_->Create(new_channel.GetPath(), /*overwrite=*/false));
+        EXPECT_OK(stream->Close());
+        return new_channel;
+    }
+
+ protected:
+    std::shared_ptr<FileSystem> file_system_;
+    std::unique_ptr<UniqueTestDirectory> test_dir_;
+    std::unique_ptr<IOManager> io_manager_;
+};
+
+// Channels handed to AddChannel() must show up in GetChannels().
+TEST_F(SpillChannelManagerTest, AddAndGetChannels) {
+    SpillChannelManager manager(file_system_, 128);
+
+    const auto first = CreateTempFile();
+    const auto second = CreateTempFile();
+    manager.AddChannel(first);
+    manager.AddChannel(second);
+
+    const auto& tracked = manager.GetChannels();
+    ASSERT_EQ(tracked.size(), 2);
+    ASSERT_GT(tracked.count(first), 0);
+    ASSERT_GT(tracked.count(second), 0);
+}
+
+// DeleteChannel() must remove both the file on disk and the tracked entry.
+TEST_F(SpillChannelManagerTest, DeleteChannelRemovesFileAndEntry) {
+    SpillChannelManager manager(file_system_, 128);
+
+    const auto spill_file = CreateTempFile();
+    manager.AddChannel(spill_file);
+
+    ASSERT_OK_AND_ASSIGN(bool present_before, file_system_->Exists(spill_file.GetPath()));
+    ASSERT_TRUE(present_before);
+
+    ASSERT_OK(manager.DeleteChannel(spill_file));
+    ASSERT_TRUE(manager.GetChannels().empty());
+    ASSERT_OK_AND_ASSIGN(bool present_after, file_system_->Exists(spill_file.GetPath()));
+    ASSERT_FALSE(present_after);
+}
+
+// Reset() must delete the file of every tracked channel.
+TEST_F(SpillChannelManagerTest, ResetDeletesAllFiles) {
+    SpillChannelManager manager(file_system_, 128);
+
+    const FileIOChannel::ID spill_files[] = {CreateTempFile(), CreateTempFile(), CreateTempFile()};
+    for (const auto& spill_file : spill_files) {
+        manager.AddChannel(spill_file);
+    }
+
+    // Every file is present before the reset ...
+    for (const auto& spill_file : spill_files) {
+        ASSERT_OK_AND_ASSIGN(bool present, file_system_->Exists(spill_file.GetPath()));
+        ASSERT_TRUE(present);
+    }
+
+    manager.Reset();
+
+    // ... and gone afterwards.
+    for (const auto& spill_file : spill_files) {
+        ASSERT_OK_AND_ASSIGN(bool present, file_system_->Exists(spill_file.GetPath()));
+        ASSERT_FALSE(present);
+    }
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/core/mergetree/spill_reader.cpp 
b/src/paimon/core/mergetree/spill_reader.cpp
new file mode 100644
index 0000000..0b338c2
--- /dev/null
+++ b/src/paimon/core/mergetree/spill_reader.cpp
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/core/mergetree/spill_reader.h"
+
+#include "paimon/common/data/columnar/columnar_row_ref.h"
+#include "paimon/common/metrics/metrics_impl.h"
+#include "paimon/common/table/special_fields.h"
+#include "paimon/common/types/row_kind.h"
+#include "paimon/common/utils/arrow/arrow_input_stream_adapter.h"
+#include "paimon/common/utils/arrow/mem_utils.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+
+namespace paimon {
+
+/// Stores the collaborators, derives the Arrow memory pool from `pool` and creates a fresh
+/// metrics object. No file is opened here; that happens in Open().
+SpillReader::SpillReader(const std::shared_ptr<FileSystem>& fs,
+                         const std::shared_ptr<arrow::Schema>& key_schema,
+                         const std::shared_ptr<arrow::Schema>& value_schema,
+                         bool use_threads,
+                         const std::shared_ptr<MemoryPool>& pool)
+    : fs_(fs),
+      key_schema_(key_schema),
+      value_schema_(value_schema),
+      pool_(pool),
+      arrow_pool_(GetArrowPool(pool)),
+      use_threads_(use_threads),
+      metrics_(std::make_shared<MetricsImpl>()) {}
+
+/// Factory: constructs a reader and opens the spill file behind `channel_id`.
+/// @return the opened reader, or the error reported by Open().
+Result<std::unique_ptr<SpillReader>> SpillReader::Create(
+    const std::shared_ptr<FileSystem>& fs, const std::shared_ptr<arrow::Schema>& key_schema,
+    const std::shared_ptr<arrow::Schema>& value_schema, bool use_threads,
+    const FileIOChannel::ID& channel_id, const std::shared_ptr<MemoryPool>& pool) {
+    auto reader = std::unique_ptr<SpillReader>(
+        new SpillReader(fs, key_schema, value_schema, use_threads, pool));
+    PAIMON_RETURN_NOT_OK(reader->Open(channel_id));
+    return reader;
+}
+
+/// Opens the file behind `channel_id` as an Arrow IPC file and positions the reader at the
+/// first record batch.
+/// @return Status::OK on success, otherwise the first file-system or Arrow error hit.
+Status SpillReader::Open(const FileIOChannel::ID& channel_id) {
+    const std::string& spill_path = channel_id.GetPath();
+    PAIMON_ASSIGN_OR_RAISE(in_stream_, fs_->Open(spill_path));
+
+    // The stream adapter is constructed with the total file length.
+    PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<FileStatus> spill_status,
+                           fs_->GetFileStatus(spill_path));
+    const uint64_t spill_length = spill_status->GetLen();
+    arrow_input_stream_adapter_ =
+        std::make_shared<ArrowInputStreamAdapter>(in_stream_, arrow_pool_, spill_length);
+
+    auto read_options = arrow::ipc::IpcReadOptions::Defaults();
+    read_options.memory_pool = arrow_pool_.get();
+    read_options.use_threads = use_threads_;
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(
+        arrow_reader_,
+        arrow::ipc::RecordBatchFileReader::Open(arrow_input_stream_adapter_, read_options));
+
+    num_record_batches_ = arrow_reader_->num_record_batches();
+    current_batch_index_ = 0;
+    return Status::OK();
+}
+
+/// Binds the iterator to `reader`; the pointer is stored without taking ownership.
+SpillReader::Iterator::Iterator(SpillReader* reader)
+    : reader_(reader) {}
+
+/// True while the cursor has not reached the row count of the reader's current batch.
+Result<bool> SpillReader::Iterator::HasNext() const {
+    const bool rows_left = cursor_ < reader_->batch_length_;
+    return rows_left;
+}
+
+/// Builds the KeyValue for the row under the cursor and advances the cursor by one.
+///
+/// Key and value are ColumnarRowRef views into the reader's current batch contexts.
+///
+/// @return the next KeyValue; Status::Invalid if no row is left in the current batch, which
+///         includes the case where the reader has already reset its batch state; or the
+///         error from decoding the row kind byte.
+Result<KeyValue> SpillReader::Iterator::Next() {
+    // Same condition as HasNext(). Without this guard an exhausted iterator, or one whose
+    // reader has dropped its batch, would index out of range or dereference null arrays.
+    if (!(cursor_ < reader_->batch_length_)) {
+        return Status::Invalid("no more rows in current spill batch");
+    }
+    PAIMON_ASSIGN_OR_RAISE(const RowKind* row_kind,
+                           RowKind::FromByteValue(reader_->row_kind_array_->Value(cursor_)));
+    int64_t sequence_number = reader_->sequence_number_array_->Value(cursor_);
+    auto key = std::make_unique<ColumnarRowRef>(reader_->key_ctx_, cursor_);
+    auto value = std::make_unique<ColumnarRowRef>(reader_->value_ctx_, cursor_);
+    cursor_++;
+    return KeyValue(row_kind, sequence_number, KeyValue::UNKNOWN_LEVEL, std::move(key),
+                    std::move(value));
+}
+
+/// Loads the next record batch from the spill file and returns an iterator over its rows.
+///
+/// State belonging to the previously returned batch is dropped first. Once all batches have
+/// been read, a null iterator is returned.
+///
+/// @return the iterator, a null pointer at end of file, or Status::Invalid when a required
+///         column is missing or has an unexpected type; Arrow read errors are propagated.
+Result<std::unique_ptr<KeyValueRecordReader::Iterator>> SpillReader::NextBatch() {
+    Reset();
+    if (current_batch_index_ >= num_record_batches_) {
+        return std::unique_ptr<KeyValueRecordReader::Iterator>();
+    }
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::RecordBatch> batch,
+                                      arrow_reader_->ReadRecordBatch(current_batch_index_));
+    current_batch_index_++;
+    batch_length_ = batch->num_rows();
+
+    // System column: sequence number (int64).
+    auto sequence_column = batch->GetColumnByName(SpecialFields::SequenceNumber().Name());
+    if (sequence_column == nullptr) {
+        return Status::Invalid("cannot find _SEQUENCE_NUMBER column in spill file");
+    }
+    sequence_number_array_ =
+        std::dynamic_pointer_cast<arrow::NumericArray<arrow::Int64Type>>(sequence_column);
+    if (sequence_number_array_ == nullptr) {
+        return Status::Invalid("cannot cast _SEQUENCE_NUMBER column to int64 arrow array");
+    }
+
+    // System column: value kind (int8).
+    auto kind_column = batch->GetColumnByName(SpecialFields::ValueKind().Name());
+    if (kind_column == nullptr) {
+        return Status::Invalid("cannot find _VALUE_KIND column in spill file");
+    }
+    row_kind_array_ = std::dynamic_pointer_cast<arrow::NumericArray<arrow::Int8Type>>(kind_column);
+    if (row_kind_array_ == nullptr) {
+        return Status::Invalid("cannot cast _VALUE_KIND column to int8 arrow array");
+    }
+
+    // Picks the batch columns named by `schema`, in schema order. `role` ("key" or "value")
+    // only feeds the error message.
+    const auto collect_columns = [&batch](const arrow::Schema& schema,
+                                          const std::string& role) -> Result<arrow::ArrayVector> {
+        arrow::ArrayVector columns;
+        columns.reserve(schema.num_fields());
+        for (const auto& field : schema.fields()) {
+            auto column = batch->GetColumnByName(field->name());
+            if (!column) {
+                return Status::Invalid("cannot find " + role + " field " + field->name() +
+                                       " in spill file");
+            }
+            columns.emplace_back(column);
+        }
+        return columns;
+    };
+    PAIMON_ASSIGN_OR_RAISE(arrow::ArrayVector key_fields, collect_columns(*key_schema_, "key"));
+    PAIMON_ASSIGN_OR_RAISE(arrow::ArrayVector value_fields,
+                           collect_columns(*value_schema_, "value"));
+
+    key_ctx_ = std::make_shared<ColumnarBatchContext>(key_fields, pool_);
+    value_ctx_ = std::make_shared<ColumnarBatchContext>(value_fields, pool_);
+
+    return std::make_unique<SpillReader::Iterator>(this);
+}
+
+/// Returns the metrics object created together with this reader.
+std::shared_ptr<Metrics> SpillReader::GetReaderMetrics() const {
+    std::shared_ptr<Metrics> reader_metrics = metrics_;
+    return reader_metrics;
+}
+
+/// Drops the current batch state, then the Arrow reader and stream adapter, and finally
+/// closes the underlying input stream if one is open. A close failure is ignored.
+void SpillReader::Close() {
+    Reset();
+    arrow_reader_.reset();
+    arrow_input_stream_adapter_.reset();
+    if (!in_stream_) {
+        return;
+    }
+    [[maybe_unused]] auto close_status = in_stream_->Close();
+    in_stream_.reset();
+}
+
+/// Forgets the current batch: the row count goes back to zero and every array or context
+/// referring to the batch is released.
+void SpillReader::Reset() {
+    batch_length_ = 0;
+    row_kind_array_.reset();
+    sequence_number_array_.reset();
+    value_ctx_.reset();
+    key_ctx_.reset();
+}
+
+}  // namespace paimon
diff --git a/src/paimon/core/mergetree/spill_reader.h 
b/src/paimon/core/mergetree/spill_reader.h
new file mode 100644
index 0000000..6dba6fd
--- /dev/null
+++ b/src/paimon/core/mergetree/spill_reader.h
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+
+#include "arrow/array/array_primitive.h"
+#include "arrow/ipc/api.h"
+#include "paimon/common/data/columnar/columnar_batch_context.h"
+#include "paimon/core/disk/file_io_channel.h"
+#include "paimon/core/io/key_value_record_reader.h"
+#include "paimon/core/key_value.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/memory/memory_pool.h"
+
+namespace arrow {
+class MemoryPool;
+}  // namespace arrow
+
+namespace paimon {
+
+class ArrowInputStreamAdapter;
+class Metrics;
+
+/// Reads KeyValue records back from a spill file addressed by a FileIOChannel::ID.
+///
+/// Instances are obtained through Create() and cannot be copied. NextBatch() resolves the key
+/// and value columns of a record batch by field name and returns an Iterator over that batch.
+class SpillReader : public KeyValueRecordReader {
+ public:
+    /// Creates a reader for the spill file behind `channel_id`.
+    ///
+    /// @param fs file system handle kept by the reader
+    /// @param key_schema fields that are looked up by name as key columns
+    /// @param value_schema fields that are looked up by name as value columns
+    /// @param use_threads threading flag kept by the reader (presumably forwarded to the Arrow
+    ///        IPC read options; confirm in spill_reader.cpp)
+    /// @param channel_id identifies the spill file to read
+    /// @param pool paimon memory pool kept by the reader
+    /// @return the reader, or the error status reported while creating it
+    static Result<std::unique_ptr<SpillReader>> Create(
+        const std::shared_ptr<FileSystem>& fs, const std::shared_ptr<arrow::Schema>& key_schema,
+        const std::shared_ptr<arrow::Schema>& value_schema, bool use_threads,
+        const FileIOChannel::ID& channel_id, const std::shared_ptr<MemoryPool>& pool);
+
+    SpillReader(const SpillReader&) = delete;
+    SpillReader& operator=(const SpillReader&) = delete;
+
+    /// Row iterator handed out by NextBatch().
+    ///
+    /// It keeps a raw, non-owning pointer to the SpillReader, so the reader must outlive the
+    /// iterator. NOTE(review): the reader's per-batch members are replaced by every NextBatch()
+    /// call and cleared by Close(), so an iterator should presumably not be used after either;
+    /// confirm against spill_reader.cpp.
+    class Iterator : public KeyValueRecordReader::Iterator {
+     public:
+        explicit Iterator(SpillReader* reader);
+        Result<bool> HasNext() const override;
+        Result<KeyValue> Next() override;
+
+     private:
+        // Position inside the current batch.
+        int64_t cursor_ = 0;
+        // Back-pointer to the reader that created this iterator; not owned.
+        SpillReader* reader_ = nullptr;
+    };
+
+    /// Loads the next record batch and returns an iterator over its rows, or an error when a
+    /// required column cannot be found or cast.
+    Result<std::unique_ptr<KeyValueRecordReader::Iterator>> NextBatch() override;
+    /// Returns the metrics object held by this reader.
+    std::shared_ptr<Metrics> GetReaderMetrics() const override;
+    /// Drops per-batch state and releases the Arrow reader, the stream adapter and the stream.
+    void Close() override;
+
+ private:
+    SpillReader(const std::shared_ptr<FileSystem>& fs,
+                const std::shared_ptr<arrow::Schema>& key_schema,
+                const std::shared_ptr<arrow::Schema>& value_schema, bool use_threads,
+                const std::shared_ptr<MemoryPool>& pool);
+
+    Status Open(const FileIOChannel::ID& channel_id);
+    // Clears all members that describe the current record batch.
+    void Reset();
+
+    std::shared_ptr<FileSystem> fs_;
+    std::shared_ptr<arrow::Schema> key_schema_;
+    std::shared_ptr<arrow::Schema> value_schema_;
+    std::shared_ptr<MemoryPool> pool_;
+    std::shared_ptr<arrow::MemoryPool> arrow_pool_;
+    // Default-initialized like the other scalar members so it can never be read uninitialized.
+    bool use_threads_ = false;
+    std::shared_ptr<Metrics> metrics_;
+
+    // Input side; Close() releases these in the order reader, adapter, stream.
+    std::shared_ptr<InputStream> in_stream_;
+    std::shared_ptr<ArrowInputStreamAdapter> arrow_input_stream_adapter_;
+    std::shared_ptr<arrow::ipc::RecordBatchFileReader> arrow_reader_;
+    int32_t current_batch_index_ = 0;
+    int32_t num_record_batches_ = 0;
+
+    // State of the current record batch; cleared by Reset().
+    int64_t batch_length_ = 0;
+    std::shared_ptr<arrow::NumericArray<arrow::Int64Type>> sequence_number_array_;
+    std::shared_ptr<arrow::NumericArray<arrow::Int8Type>> row_kind_array_;
+    std::shared_ptr<ColumnarBatchContext> key_ctx_;
+    std::shared_ptr<ColumnarBatchContext> value_ctx_;
+};
+
+}  // namespace paimon
diff --git a/src/paimon/core/mergetree/spill_reader_writer_test.cpp 
b/src/paimon/core/mergetree/spill_reader_writer_test.cpp
new file mode 100644
index 0000000..e363389
--- /dev/null
+++ b/src/paimon/core/mergetree/spill_reader_writer_test.cpp
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "arrow/api.h"
+#include "arrow/ipc/json_simple.h"
+#include "gtest/gtest.h"
+#include "paimon/common/table/special_fields.h"
+#include "paimon/common/types/data_field.h"
+#include "paimon/core/disk/io_manager.h"
+#include "paimon/core/mergetree/spill_channel_manager.h"
+#include "paimon/core/mergetree/spill_reader.h"
+#include "paimon/core/mergetree/spill_writer.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+// Parameterized fixture for SpillWriter / SpillReader round-trip tests. The test parameter is
+// the compression name passed to SpillWriter::Create().
+class SpillReaderWriterTest : public ::testing::TestWithParam<std::string> {
+ public:
+    // Prepares the pools, a unique scratch directory, the channel plumbing and the schemas.
+    void SetUp() override {
+        read_pool_ = GetDefaultPool();
+        write_pool_ = GetDefaultPool();
+        test_dir_ = UniqueTestDirectory::Create();
+        file_system_ = test_dir_->GetFileSystem();
+
+        io_manager_ = std::make_unique<IOManager>(test_dir_->Str(), test_dir_->GetFileSystem());
+        ASSERT_OK_AND_ASSIGN(channel_enumerator_, io_manager_->CreateChannelEnumerator());
+        spill_channel_manager_ = std::make_shared<SpillChannelManager>(file_system_, 128);
+
+        // The key is f0 alone; the value row consists of f0 and f1.
+        key_fields_ = {DataField(0, arrow::field("f0", arrow::utf8()))};
+        value_fields_ = {DataField(0, arrow::field("f0", arrow::utf8())),
+                         DataField(1, arrow::field("f1", arrow::int32()))};
+
+        // Rows written by the tests are laid out as [sequence number, value kind, f0, f1].
+        value_schema_ = DataField::ConvertDataFieldsToArrowSchema(value_fields_);
+        key_schema_ = DataField::ConvertDataFieldsToArrowSchema(key_fields_);
+        write_schema_ = SpecialFields::CompleteSequenceAndValueKindField(value_schema_);
+        write_type_ = arrow::struct_(write_schema_->fields());
+    }
+
+    // Parses `json_data` as a struct array of the write type and repackages its child arrays as
+    // a record batch with `num_rows` rows.
+    std::shared_ptr<arrow::RecordBatch> CreateRecordBatch(const std::string& json_data,
+                                                          int64_t num_rows) const {
+        const auto struct_array = std::dynamic_pointer_cast<arrow::StructArray>(
+            arrow::ipc::internal::json::ArrayFromJSON(write_type_, json_data).ValueOrDie());
+        return arrow::RecordBatch::Make(write_schema_, num_rows, struct_array->fields());
+    }
+
+    // Creates a writer on the write schema, compressed with the test parameter at level 1.
+    Result<std::unique_ptr<SpillWriter>> CreateSpillWriter() const {
+        const std::string& compression = GetParam();
+        return SpillWriter::Create(file_system_, write_schema_, channel_enumerator_,
+                                   spill_channel_manager_, compression, /*compression_level=*/1,
+                                   /*use_threads=*/false, write_pool_);
+    }
+
+    // Writes all `batches` into one new spill file, closes it and returns its channel id.
+    FileIOChannel::ID WriteSpillFile(
+        const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches) {
+        EXPECT_OK_AND_ASSIGN(auto spill_writer, CreateSpillWriter());
+        for (const auto& record_batch : batches) {
+            EXPECT_OK(spill_writer->WriteBatch(record_batch));
+        }
+        EXPECT_OK(spill_writer->Close());
+        return spill_writer->GetChannelId();
+    }
+
+    // Opens a reader on `channel_id` using the fixture's key and value schemas.
+    Result<std::unique_ptr<SpillReader>> CreateSpillReader(
+        const FileIOChannel::ID& channel_id) const {
+        return SpillReader::Create(file_system_, key_schema_, value_schema_,
+                                   /*use_threads=*/false, channel_id, read_pool_);
+    }
+
+ protected:
+    std::shared_ptr<MemoryPool> read_pool_;
+    std::shared_ptr<MemoryPool> write_pool_;
+    std::shared_ptr<FileSystem> file_system_;
+    std::unique_ptr<UniqueTestDirectory> test_dir_;
+    std::unique_ptr<IOManager> io_manager_;
+    std::shared_ptr<FileIOChannel::Enumerator> channel_enumerator_;
+    std::shared_ptr<SpillChannelManager> spill_channel_manager_;
+
+    std::vector<DataField> value_fields_;
+    std::vector<DataField> key_fields_;
+    std::shared_ptr<arrow::Schema> write_schema_;
+    std::shared_ptr<arrow::DataType> write_type_;
+    std::shared_ptr<arrow::Schema> key_schema_;
+    std::shared_ptr<arrow::Schema> value_schema_;
+};
+
+// Writes two spill files (one batch, then two batches) and reads each back, checking file-size
+// growth, batch counts, row counts and the key of every row.
+TEST_P(SpillReaderWriterTest, TestWriteBatch) {
+    FileIOChannel::ID channel_id_1;
+    FileIOChannel::ID channel_id_2;
+
+    // First writer
+    {
+        ASSERT_OK_AND_ASSIGN(auto writer, CreateSpillWriter());
+
+        auto batch = CreateRecordBatch(R"([
+            [0, 1, "Alice", 10],
+            [1, 1, "Bob",   20]
+        ])",
+                                       2);
+        ASSERT_OK(writer->WriteBatch(batch));
+        ASSERT_OK_AND_ASSIGN(int64_t file_size, writer->GetFileSize());
+        ASSERT_GT(file_size, 0);
+        ASSERT_OK(writer->Close());
+        channel_id_1 = writer->GetChannelId();
+        ASSERT_GT(write_pool_->MaxMemoryUsage(), 0);
+    }
+    // Second writer
+    {
+        ASSERT_OK_AND_ASSIGN(auto writer, CreateSpillWriter());
+
+        auto batch_a = CreateRecordBatch(R"([
+            [2, 1, "Carol", 30],
+            [3, 1, "Dave",  40]
+        ])",
+                                         2);
+        auto batch_b = CreateRecordBatch(R"([
+            [4, 1, "Eve",   50],
+            [5, 1, "Frank", 60],
+            [6, 1, "Grace", 70]
+        ])",
+                                         3);
+        ASSERT_OK(writer->WriteBatch(batch_a));
+        ASSERT_OK_AND_ASSIGN(int64_t size_before, writer->GetFileSize());
+        ASSERT_OK(writer->WriteBatch(batch_b));
+        ASSERT_OK_AND_ASSIGN(int64_t size_after, writer->GetFileSize());
+        // The reported size must grow with every batch written.
+        ASSERT_GT(size_after, size_before);
+        ASSERT_OK(writer->Close());
+        channel_id_2 = writer->GetChannelId();
+    }
+    // Read back first writer's data
+    {
+        ASSERT_OK_AND_ASSIGN(auto reader, CreateSpillReader(channel_id_1));
+
+        std::vector<std::string_view> expected_keys = {"Alice", "Bob"};
+        int total_rows = 0;
+        int batch_count = 0;
+        while (true) {
+            ASSERT_OK_AND_ASSIGN(auto iter, reader->NextBatch());
+            if (iter == nullptr) {
+                break;
+            }
+            batch_count++;
+            while (true) {
+                ASSERT_OK_AND_ASSIGN(bool has_next, iter->HasNext());
+                if (!has_next) {
+                    break;
+                }
+                ASSERT_OK_AND_ASSIGN(auto kv, iter->Next());
+                // Fail the test instead of indexing past the expectations on surplus rows.
+                ASSERT_LT(total_rows, static_cast<int>(expected_keys.size()));
+                ASSERT_EQ(kv.key->GetStringView(0), expected_keys[total_rows]);
+                total_rows++;
+            }
+        }
+        ASSERT_EQ(batch_count, 1);
+        ASSERT_EQ(total_rows, 2);
+        reader->Close();
+        ASSERT_GT(read_pool_->MaxMemoryUsage(), 0);
+    }
+    // Read back second writer's data
+    {
+        ASSERT_OK_AND_ASSIGN(auto reader, CreateSpillReader(channel_id_2));
+
+        std::vector<std::string_view> expected_keys = {"Carol", "Dave", "Eve", "Frank", "Grace"};
+        int total_rows = 0;
+        int batch_count = 0;
+        while (true) {
+            ASSERT_OK_AND_ASSIGN(auto iter, reader->NextBatch());
+            if (iter == nullptr) {
+                break;
+            }
+            batch_count++;
+            while (true) {
+                ASSERT_OK_AND_ASSIGN(bool has_next, iter->HasNext());
+                if (!has_next) {
+                    break;
+                }
+                ASSERT_OK_AND_ASSIGN(auto kv, iter->Next());
+                // Fail the test instead of indexing past the expectations on surplus rows.
+                ASSERT_LT(total_rows, static_cast<int>(expected_keys.size()));
+                ASSERT_EQ(kv.key->GetStringView(0), expected_keys[total_rows]);
+                total_rows++;
+            }
+        }
+        ASSERT_EQ(batch_count, 2);
+        ASSERT_EQ(total_rows, 5);
+        reader->Close();
+    }
+}
+
+// Reads a two-batch spill file and checks key, value, sequence number and value kind of every
+// row; then checks that a file holding a single empty batch yields no rows.
+TEST_P(SpillReaderWriterTest, TestReadBatch) {
+    {
+        auto batch1 = CreateRecordBatch(R"([[0, 1, "Alice", 10], [1, 1, "Bob", 20]])", 2);
+        auto batch2 =
+            CreateRecordBatch(R"([[2, 1, "Carol", 30], [3, 2, "Dave", 40], [4, 3, "Eve", 50]])", 3);
+
+        auto channel_id = WriteSpillFile({batch1, batch2});
+        ASSERT_OK_AND_ASSIGN(auto reader, CreateSpillReader(channel_id));
+
+        std::vector<std::string_view> expected_keys = {"Alice", "Bob", "Carol", "Dave", "Eve"};
+        std::vector<int32_t> expected_values = {10, 20, 30, 40, 50};
+        std::vector<int64_t> expected_seqs = {0, 1, 2, 3, 4};
+        std::vector<int8_t> expected_kinds = {1, 1, 1, 2, 3};
+
+        int total_rows = 0;
+        int batch_count = 0;
+        while (true) {
+            ASSERT_OK_AND_ASSIGN(auto iter, reader->NextBatch());
+            if (iter == nullptr) break;
+            batch_count++;
+            while (true) {
+                ASSERT_OK_AND_ASSIGN(bool has_next, iter->HasNext());
+                if (!has_next) {
+                    break;
+                }
+                ASSERT_OK_AND_ASSIGN(auto kv, iter->Next());
+                // All four expectation vectors have the same length; fail the test instead of
+                // indexing past them if the reader returns surplus rows.
+                ASSERT_LT(total_rows, static_cast<int>(expected_keys.size()));
+                ASSERT_EQ(kv.key->GetStringView(0), expected_keys[total_rows]);
+                ASSERT_EQ(kv.value->GetStringView(0), expected_keys[total_rows]);
+                ASSERT_EQ(kv.value->GetInt(1), expected_values[total_rows]);
+                ASSERT_EQ(kv.sequence_number, expected_seqs[total_rows]);
+                ASSERT_EQ(kv.value_kind->ToByteValue(), expected_kinds[total_rows]);
+                total_rows++;
+            }
+        }
+        ASSERT_EQ(batch_count, 2);
+        ASSERT_EQ(total_rows, 5);
+        reader->Close();
+    }
+    {
+        auto empty_batch =
+            arrow::RecordBatch::Make(write_schema_, 0,
+                                     {arrow::MakeEmptyArray(arrow::int64()).ValueOrDie(),
+                                      arrow::MakeEmptyArray(arrow::int8()).ValueOrDie(),
+                                      arrow::MakeEmptyArray(arrow::utf8()).ValueOrDie(),
+                                      arrow::MakeEmptyArray(arrow::int32()).ValueOrDie()});
+
+        auto channel_id = WriteSpillFile({empty_batch});
+        ASSERT_OK_AND_ASSIGN(auto reader, CreateSpillReader(channel_id));
+
+        // Either no iterator is returned at all, or the returned one is already exhausted.
+        ASSERT_OK_AND_ASSIGN(auto iter, reader->NextBatch());
+        if (iter != nullptr) {
+            ASSERT_OK_AND_ASSIGN(bool has_next, iter->HasNext());
+            ASSERT_FALSE(has_next);
+        }
+        reader->Close();
+    }
+}
+
+// Closing a writer twice must succeed both times and leave the written row readable.
+TEST_P(SpillReaderWriterTest, TestWriterCloseIdempotency) {
+    ASSERT_OK_AND_ASSIGN(auto spill_writer, CreateSpillWriter());
+    auto single_row = CreateRecordBatch(R"([[0, 1, "Alice", 10]])", 1);
+    ASSERT_OK(spill_writer->WriteBatch(single_row));
+    ASSERT_OK(spill_writer->Close());
+    // The repeated close is expected to be a no-op.
+    ASSERT_OK(spill_writer->Close());
+
+    auto written_channel = spill_writer->GetChannelId();
+    ASSERT_OK_AND_ASSIGN(auto spill_reader, CreateSpillReader(written_channel));
+
+    // The single row must come back unchanged, followed by end of batch.
+    ASSERT_OK_AND_ASSIGN(auto row_iter, spill_reader->NextBatch());
+    ASSERT_NE(row_iter, nullptr);
+    ASSERT_OK_AND_ASSIGN(auto first_kv, row_iter->Next());
+    ASSERT_EQ(first_kv.key->GetStringView(0), "Alice");
+    ASSERT_OK_AND_ASSIGN(bool more_rows, row_iter->HasNext());
+    ASSERT_FALSE(more_rows);
+    spill_reader->Close();
+}
+
+// A reader whose key or value schema names a column missing from the spill file must fail in
+// NextBatch() with a message naming that field.
+TEST_P(SpillReaderWriterTest, TestReaderSchemaMismatchErrors) {
+    auto two_rows = CreateRecordBatch(R"([[0, 1, "Alice", 10], [1, 1, "Bob", 20]])", 2);
+    auto spill_channel = WriteSpillFile({two_rows});
+    {
+        // Key schema with a field the file does not contain.
+        auto bad_key_schema = arrow::schema({arrow::field("nonexistent_key", arrow::utf8())});
+        ASSERT_OK_AND_ASSIGN(auto key_mismatch_reader,
+                             SpillReader::Create(file_system_, bad_key_schema, value_schema_,
+                                                 /*use_threads=*/false, spill_channel,
+                                                 read_pool_));
+        ASSERT_NOK_WITH_MSG(key_mismatch_reader->NextBatch(),
+                            "cannot find key field nonexistent_key in spill file");
+    }
+    {
+        // Value schema whose second field the file does not contain.
+        auto bad_value_schema = arrow::schema(
+            {arrow::field("f0", arrow::utf8()), arrow::field("nonexistent_value", arrow::int32())});
+        ASSERT_OK_AND_ASSIGN(auto value_mismatch_reader,
+                             SpillReader::Create(file_system_, key_schema_, bad_value_schema,
+                                                 /*use_threads=*/false, spill_channel,
+                                                 read_pool_));
+        ASSERT_NOK_WITH_MSG(value_mismatch_reader->NextBatch(),
+                            "cannot find value field nonexistent_value in spill file");
+    }
+}
+
+// Runs every test of the suite once per compression name.
+INSTANTIATE_TEST_SUITE_P(CompressionTypes, SpillReaderWriterTest,
+                         ::testing::Values("zstd", "none", "uncompressed", "lz4"));
+
+}  // namespace paimon::test
diff --git a/src/paimon/core/mergetree/spill_writer.cpp 
b/src/paimon/core/mergetree/spill_writer.cpp
new file mode 100644
index 0000000..3e9b917
--- /dev/null
+++ b/src/paimon/core/mergetree/spill_writer.cpp
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/core/mergetree/spill_writer.h"
+
+#include "paimon/common/utils/arrow/arrow_output_stream_adapter.h"
+#include "paimon/common/utils/arrow/arrow_utils.h"
+#include "paimon/common/utils/arrow/mem_utils.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/common/utils/scope_guard.h"
+#include "paimon/core/mergetree/spill_channel_manager.h"
+
+namespace paimon {
+
+SpillWriter::SpillWriter(const std::shared_ptr<FileSystem>& fs,
+                         const std::shared_ptr<arrow::Schema>& schema,
+                         const std::shared_ptr<FileIOChannel::Enumerator>& 
channel_enumerator,
+                         const std::shared_ptr<SpillChannelManager>& 
spill_channel_manager,
+                         const std::string& compression, int32_t 
compression_level,
+                         bool use_threads, const std::shared_ptr<MemoryPool>& 
pool)
+    : fs_(fs),
+      schema_(schema),
+      channel_enumerator_(channel_enumerator),
+      spill_channel_manager_(spill_channel_manager),
+      compression_(compression),
+      compression_level_(compression_level),
+      use_threads_(use_threads),
+      arrow_pool_(GetArrowPool(pool)) {}
+
+// Factory for SpillWriter: validates the collaborators, constructs the writer and opens its
+// spill file.
+//
+// Returns Status::Invalid when `fs`, `schema`, `channel_enumerator` or `spill_channel_manager`
+// is null (Open() uses all four unconditionally), or the error reported by Open().
+Result<std::unique_ptr<SpillWriter>> SpillWriter::Create(
+    const std::shared_ptr<FileSystem>& fs, const std::shared_ptr<arrow::Schema>& schema,
+    const std::shared_ptr<FileIOChannel::Enumerator>& channel_enumerator,
+    const std::shared_ptr<SpillChannelManager>& spill_channel_manager,
+    const std::string& compression, int32_t compression_level, bool use_threads,
+    const std::shared_ptr<MemoryPool>& pool) {
+    if (fs == nullptr) {
+        return Status::Invalid("file system of spill writer must not be null");
+    }
+    if (schema == nullptr) {
+        return Status::Invalid("schema of spill writer must not be null");
+    }
+    if (channel_enumerator == nullptr) {
+        return Status::Invalid("channel enumerator of spill writer must not be null");
+    }
+    if (spill_channel_manager == nullptr) {
+        return Status::Invalid("spill channel manager of spill writer must not be null");
+    }
+    // The constructor is private, so std::make_unique cannot be used here.
+    std::unique_ptr<SpillWriter> writer(new SpillWriter(fs, schema, channel_enumerator,
+                                                        spill_channel_manager, compression,
+                                                        compression_level, use_threads, pool));
+    PAIMON_RETURN_NOT_OK(writer->Open());
+    return writer;
+}
+
+// Takes the next spill channel and opens an Arrow IPC file writer on it.
+//
+// Steps: obtain a channel id from the enumerator, resolve the compression codec, create the
+// file (never overwriting an existing one), wrap the stream for Arrow, create the IPC file
+// writer and finally register the channel with the spill channel manager. If a step fails,
+// whatever was opened so far is released and the error is returned.
+Status SpillWriter::Open() {
+    channel_id_ = channel_enumerator_->Next();
+    auto ipc_write_options = arrow::ipc::IpcWriteOptions::Defaults();
+    ipc_write_options.memory_pool = arrow_pool_.get();
+    ipc_write_options.use_threads = use_threads_;
+    // Becomes true only after fs_->Create() succeeded, so that a failure never deletes a file
+    // this writer did not create (for example a pre-existing file that made the non-overwriting
+    // Create() fail, or any file at that path when codec setup fails first).
+    bool file_created = false;
+    auto cleanup_guard = ScopeGuard([&]() {
+        arrow_writer_.reset();
+        arrow_output_stream_adapter_.reset();
+        if (out_stream_) {
+            // Best effort: the original error is the one reported to the caller.
+            [[maybe_unused]] auto status = out_stream_->Close();
+            out_stream_.reset();
+        }
+        if (file_created) {
+            [[maybe_unused]] auto status = fs_->Delete(channel_id_.GetPath());
+        }
+    });
+    PAIMON_ASSIGN_OR_RAISE(arrow::Compression::type arrow_compression,
+                           ArrowUtils::GetCompressionType(compression_));
+    // Codecs without level support must be created with Arrow's default-level marker.
+    if (!arrow::util::Codec::SupportsCompressionLevel(arrow_compression)) {
+        compression_level_ = arrow::util::Codec::UseDefaultCompressionLevel();
+    }
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(
+        ipc_write_options.codec, arrow::util::Codec::Create(arrow_compression, compression_level_));
+    PAIMON_ASSIGN_OR_RAISE(out_stream_, fs_->Create(channel_id_.GetPath(), /*overwrite=*/false));
+    file_created = true;
+    arrow_output_stream_adapter_ = std::make_shared<ArrowOutputStreamAdapter>(out_stream_);
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(
+        arrow_writer_,
+        arrow::ipc::MakeFileWriter(arrow_output_stream_adapter_, schema_, ipc_write_options));
+    spill_channel_manager_->AddChannel(channel_id_);
+    // Everything succeeded: keep the stream and the file.
+    cleanup_guard.Release();
+    return Status::OK();
+}
+
+// Appends `batch` to the spill file.
+//
+// Returns Status::Invalid for a null batch or when the writer has already been closed;
+// otherwise returns the converted status of the Arrow writer.
+Status SpillWriter::WriteBatch(const std::shared_ptr<arrow::RecordBatch>& batch) {
+    if (batch == nullptr) {
+        return Status::Invalid("cannot write a null record batch to spill file");
+    }
+    if (closed_ || arrow_writer_ == nullptr) {
+        return Status::Invalid("cannot write to a closed spill writer");
+    }
+    PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow_writer_->WriteRecordBatch(*batch));
+    return Status::OK();
+}
+
+// Closes the Arrow writer and then the output stream. Calling it again is a no-op that returns
+// OK.
+//
+// Both close steps are always attempted, so a failing Arrow writer no longer leaves the output
+// stream open. The writer is marked closed even on failure, so a later call never closes the
+// Arrow writer a second time. The first error encountered is returned.
+Status SpillWriter::Close() {
+    if (closed_) {
+        return Status::OK();
+    }
+    closed_ = true;
+
+    auto close_arrow_writer = [this]() -> Status {
+        if (arrow_writer_) {
+            PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow_writer_->Close());
+        }
+        return Status::OK();
+    };
+    auto close_out_stream = [this]() -> Status {
+        if (out_stream_) {
+            PAIMON_RETURN_NOT_OK(out_stream_->Close());
+        }
+        return Status::OK();
+    };
+
+    // Order matters: the Arrow writer emits its remaining data into the stream on Close().
+    Status writer_status = close_arrow_writer();
+    Status stream_status = close_out_stream();
+    PAIMON_RETURN_NOT_OK(writer_status);
+    return stream_status;
+}
+
+// Reports the size of the spill file. While the writer is open this is the position reported by
+// the output stream adapter; once closed (or without an adapter) it is the length reported by
+// the file system. Returns Status::Invalid when the channel id has an empty path.
+Result<int64_t> SpillWriter::GetFileSize() const {
+    const auto& spill_path = channel_id_.GetPath();
+    if (spill_path.empty()) {
+        return Status::Invalid("spill writer has no channel id");
+    }
+    const bool stream_is_open = !closed_ && arrow_output_stream_adapter_ != nullptr;
+    if (!stream_is_open) {
+        PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<FileStatus> file_status,
+                               fs_->GetFileStatus(spill_path));
+        return static_cast<int64_t>(file_status->GetLen());
+    }
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(int64_t stream_position,
+                                      arrow_output_stream_adapter_->Tell());
+    return stream_position;
+}
+
+// Returns a reference to the channel id assigned in Open(); it stays valid only as long as this
+// writer is alive.
+const FileIOChannel::ID& SpillWriter::GetChannelId() const {
+    const FileIOChannel::ID& assigned_channel = channel_id_;
+    return assigned_channel;
+}
+
+}  // namespace paimon
diff --git a/src/paimon/core/mergetree/spill_writer.h 
b/src/paimon/core/mergetree/spill_writer.h
new file mode 100644
index 0000000..6676dc2
--- /dev/null
+++ b/src/paimon/core/mergetree/spill_writer.h
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include "arrow/ipc/api.h"
+#include "paimon/core/disk/file_io_channel.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+
+namespace arrow {
+class RecordBatch;
+class Schema;
+}  // namespace arrow
+
+namespace paimon {
+
+class ArrowOutputStreamAdapter;
+class MemoryPool;
+class SpillChannelManager;
+
+/// Writes Arrow record batches into a spill file using the Arrow IPC file format.
+///
+/// A writer is obtained through Create(), which takes a channel id from the enumerator, creates
+/// the file and registers the channel with the SpillChannelManager. Writers are not copyable.
+class SpillWriter {
+ public:
+    /// Creates a writer and opens its spill file.
+    ///
+    /// @param fs file system used to create, inspect and (on failure) delete the spill file
+    /// @param schema Arrow schema of the batches to be written
+    /// @param channel_enumerator source of the channel id for the new file
+    /// @param spill_channel_manager receives the channel id once the file is open
+    /// @param compression compression name resolved through ArrowUtils::GetCompressionType
+    /// @param compression_level level for the codec; replaced by Arrow's default marker when
+    ///        the codec does not support levels
+    /// @param use_threads copied into the Arrow IPC write options
+    /// @param pool paimon memory pool from which the Arrow memory pool is derived
+    /// @return the opened writer, or the error that prevented opening it
+    static Result<std::unique_ptr<SpillWriter>> Create(
+        const std::shared_ptr<FileSystem>& fs, const std::shared_ptr<arrow::Schema>& schema,
+        const std::shared_ptr<FileIOChannel::Enumerator>& channel_enumerator,
+        const std::shared_ptr<SpillChannelManager>& spill_channel_manager,
+        const std::string& compression, int32_t compression_level, bool use_threads,
+        const std::shared_ptr<MemoryPool>& pool);
+
+    SpillWriter(const SpillWriter&) = delete;
+    SpillWriter& operator=(const SpillWriter&) = delete;
+
+    /// Appends one record batch to the spill file.
+    Status WriteBatch(const std::shared_ptr<arrow::RecordBatch>& batch);
+    /// Closes the Arrow writer and the output stream; a call after a successful close returns
+    /// OK without doing anything.
+    Status Close();
+    /// Returns the stream position while open, or the file length from the file system after
+    /// closing.
+    Result<int64_t> GetFileSize() const;
+    /// Returns the id of the channel this writer writes to.
+    const FileIOChannel::ID& GetChannelId() const;
+
+ private:
+    SpillWriter(const std::shared_ptr<FileSystem>& fs, const std::shared_ptr<arrow::Schema>& schema,
+                const std::shared_ptr<FileIOChannel::Enumerator>& channel_enumerator,
+                const std::shared_ptr<SpillChannelManager>& spill_channel_manager,
+                const std::string& compression, int32_t compression_level, bool use_threads,
+                const std::shared_ptr<MemoryPool>& pool);
+
+    // Allocates the channel, creates the file and sets up the Arrow IPC writer.
+    Status Open();
+
+    // Collaborators and settings handed in through Create().
+    std::shared_ptr<FileSystem> fs_;
+    std::shared_ptr<arrow::Schema> schema_;
+    std::shared_ptr<FileIOChannel::Enumerator> channel_enumerator_;
+    std::shared_ptr<SpillChannelManager> spill_channel_manager_;
+    std::string compression_;
+    int32_t compression_level_;
+    bool use_threads_;
+    // Output side. The declaration order is significant: arrow_writer_ is declared after
+    // arrow_pool_ and the stream members, so it is destroyed before them.
+    std::shared_ptr<OutputStream> out_stream_;
+    std::shared_ptr<ArrowOutputStreamAdapter> arrow_output_stream_adapter_;
+    std::unique_ptr<arrow::MemoryPool> arrow_pool_;
+    std::shared_ptr<arrow::ipc::RecordBatchWriter> arrow_writer_;
+    FileIOChannel::ID channel_id_;
+    // Set by Close() so that repeated calls become no-ops.
+    bool closed_ = false;
+};
+
+}  // namespace paimon

Reply via email to