This is an automated email from the ASF dual-hosted git repository.
leaves12138 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 4a77916 feat(core): introduce merge tree spill buffers (#67)
4a77916 is described below
commit 4a77916da8170ae2551666b57bed5e84a665e3eb
Author: Zhang Jiawei <[email protected]>
AuthorDate: Wed Jun 10 11:14:21 2026 +0800
feat(core): introduce merge tree spill buffers (#67)
---
.../core/mergetree/in_memory_sort_buffer.cpp | 186 ++++++++++++
src/paimon/core/mergetree/in_memory_sort_buffer.h | 90 ++++++
src/paimon/core/mergetree/sort_buffer.h | 65 +++++
src/paimon/core/mergetree/spill_channel_manager.h | 61 ++++
.../core/mergetree/spill_channel_manager_test.cpp | 110 ++++++++
src/paimon/core/mergetree/spill_reader.cpp | 169 +++++++++++
src/paimon/core/mergetree/spill_reader.h | 96 +++++++
.../core/mergetree/spill_reader_writer_test.cpp | 311 +++++++++++++++++++++
src/paimon/core/mergetree/spill_writer.cpp | 127 +++++++++
src/paimon/core/mergetree/spill_writer.h | 82 ++++++
10 files changed, 1297 insertions(+)
diff --git a/src/paimon/core/mergetree/in_memory_sort_buffer.cpp b/src/paimon/core/mergetree/in_memory_sort_buffer.cpp
new file mode 100644
index 0000000..66c6dce
--- /dev/null
+++ b/src/paimon/core/mergetree/in_memory_sort_buffer.cpp
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/core/mergetree/in_memory_sort_buffer.h"
+
+#include <cassert>
+#include <utility>
+
+#include "arrow/api.h"
+#include "arrow/array/array_binary.h"
+#include "arrow/array/array_nested.h"
+#include "arrow/c/bridge.h"
+#include "arrow/c/helpers.h"
+#include "arrow/util/checked_cast.h"
+#include "fmt/format.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/core/io/key_value_in_memory_record_reader.h"
+#include "paimon/core/io/key_value_record_reader.h"
+#include "paimon/data/decimal.h"
+
+namespace paimon {
+
+InMemorySortBuffer::InMemorySortBuffer(int64_t last_sequence_number,
+ const std::shared_ptr<arrow::DataType>&
value_type,
+ const std::vector<std::string>&
trimmed_primary_keys,
+ const std::vector<std::string>&
user_defined_sequence_fields,
+ bool sequence_fields_ascending,
+ const
std::shared_ptr<FieldsComparator>& key_comparator,
+ uint64_t write_buffer_size,
+ const std::shared_ptr<MemoryPool>& pool)
+ : pool_(pool),
+ value_type_(value_type),
+ trimmed_primary_keys_(trimmed_primary_keys),
+ user_defined_sequence_fields_(user_defined_sequence_fields),
+ sequence_fields_ascending_(sequence_fields_ascending),
+ key_comparator_(key_comparator),
+ write_buffer_size_(write_buffer_size),
+ next_sequence_number_(last_sequence_number + 1) {}
+
+void InMemorySortBuffer::Clear() {
+    // Forget the row/byte accounting and drop every buffered batch.
+    // next_sequence_number_ and estimated_memory_use_for_each_row_ are not touched here,
+    // so rows written after Clear() keep receiving increasing sequence numbers.
+    total_row_count_ = 0;
+    current_memory_in_bytes_ = 0;
+    buffered_batches_.clear();
+}
+
+uint64_t InMemorySortBuffer::GetMemorySize() const {
+    // Running estimate accumulated by Write() and zeroed by Clear().
+    return this->current_memory_in_bytes_;
+}
+
+Result<bool> InMemorySortBuffer::FlushMemory() {
+    // This buffer has nowhere to spill to. Per the SortBuffer contract an unsupported
+    // flush is reported as `false`, not as an error.
+    constexpr bool kFlushSupported = false;
+    return kFlushSupported;
+}
+
+/// Import `moved_batch` as a StructArray of `value_type_` and append it to the buffer.
+/// @return true if the estimated memory is still below write_buffer_size_ afterwards,
+///         false once the buffer is full; Status::Invalid for a null or released batch,
+///         or if the imported data is not a StructArray.
+Result<bool> InMemorySortBuffer::Write(std::unique_ptr<RecordBatch>&& moved_batch) {
+    // Reject a null batch up front; every check below dereferences it.
+    if (moved_batch == nullptr) {
+        return Status::Invalid("invalid batch: batch is null");
+    }
+    if (ArrowArrayIsReleased(moved_batch->GetData())) {
+        return Status::Invalid("invalid batch: data is released");
+    }
+    // Take ownership so the RecordBatch is destroyed when this call returns.
+    std::unique_ptr<RecordBatch> batch = std::move(moved_batch);
+    // ImportArray moves the C ArrowArray held by `batch` into the imported array.
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Array> arrow_array,
+                                      arrow::ImportArray(batch->GetData(), value_type_));
+    auto value_struct_array = std::dynamic_pointer_cast<arrow::StructArray>(arrow_array);
+    if (value_struct_array == nullptr) {
+        return Status::Invalid("invalid RecordBatch: cannot cast to StructArray");
+    }
+    PAIMON_ASSIGN_OR_RAISE(int64_t memory_in_bytes, EstimateMemoryUse(value_struct_array));
+    current_memory_in_bytes_ += static_cast<uint64_t>(memory_in_bytes);
+
+    // Rows of this batch get consecutive sequence numbers starting at next_sequence_number_.
+    const int64_t num_rows = value_struct_array->length();
+    BufferedWriteBatch buffered_batch;
+    buffered_batch.first_sequence_number = next_sequence_number_;
+    buffered_batch.struct_array = std::move(value_struct_array);
+    buffered_batch.row_kinds = batch->GetRowKind();
+    next_sequence_number_ += num_rows;
+    total_row_count_ += num_rows;
+    buffered_batches_.push_back(std::move(buffered_batch));
+
+    // Integer average over every row buffered since the last Clear().
+    if (total_row_count_ > 0) {
+        estimated_memory_use_for_each_row_ =
+            current_memory_in_bytes_ / static_cast<uint64_t>(total_row_count_);
+    }
+    return current_memory_in_bytes_ < write_buffer_size_;
+}
+
+/// Wrap every buffered batch in its own KeyValueInMemoryRecordReader.
+/// The buffered batches stay in place; an empty buffer yields an empty vector.
+Result<std::vector<std::unique_ptr<KeyValueRecordReader>>> InMemorySortBuffer::CreateReaders() {
+    std::vector<std::unique_ptr<KeyValueRecordReader>> readers;
+    readers.reserve(buffered_batches_.size());
+    for (auto& entry : buffered_batches_) {
+        // Each reader receives the batch data plus the key/sequence configuration.
+        readers.emplace_back(std::make_unique<KeyValueInMemoryRecordReader>(
+            entry.first_sequence_number, entry.struct_array, entry.row_kinds,
+            trimmed_primary_keys_, user_defined_sequence_fields_, sequence_fields_ascending_,
+            key_comparator_, pool_));
+    }
+    return readers;
+}
+
+bool InMemorySortBuffer::HasData() const {
+    // True as soon as any batch has been buffered, even a zero-row one.
+    return buffered_batches_.size() != 0;
+}
+
+uint64_t InMemorySortBuffer::GetEstimateMemoryUseForEachRow() const {
+    // Integer average recomputed by Write(); 0 until the first row arrives and
+    // not reset by Clear().
+    return this->estimated_memory_use_for_each_row_;
+}
+
+// TODO(jinli.zjw): Consider making the memory estimation more accurate.
+// https://github.com/alibaba/paimon-cpp/pull/206#discussion_r3021325389
+/// Estimate the memory footprint of `array` in bytes.
+///
+/// Every array is charged one validity bit per slot (rounded up to whole bytes) plus a
+/// per-type payload. Nested types (list, map, struct) add the estimates of their children.
+/// @return Status::Invalid for a null array or a type that is not handled here.
+Result<int64_t> InMemorySortBuffer::EstimateMemoryUse(const std::shared_ptr<arrow::Array>& array) {
+    if (array == nullptr) {
+        return Status::Invalid("EstimateMemoryUse: array is null");
+    }
+    const int64_t length = array->length();
+    const int64_t null_bits_size_in_bytes = (length + 7) / 8;
+    // Fixed-width layouts: validity bitmap plus `width` bytes per slot.
+    auto fixed_width = [length, null_bits_size_in_bytes](size_t width) -> int64_t {
+        return null_bits_size_in_bytes + length * static_cast<int64_t>(width);
+    };
+    switch (array->type()->id()) {
+        case arrow::Type::type::BOOL:
+            // Charged sizeof(bool) per value, although Arrow bit-packs booleans.
+            return fixed_width(sizeof(bool));
+        case arrow::Type::type::INT8:
+            return fixed_width(sizeof(int8_t));
+        case arrow::Type::type::INT16:
+            return fixed_width(sizeof(int16_t));
+        case arrow::Type::type::INT32:
+        case arrow::Type::type::DATE32:
+        case arrow::Type::type::TIME32:
+            return fixed_width(sizeof(int32_t));
+        case arrow::Type::type::INT64:
+        case arrow::Type::type::DATE64:
+        case arrow::Type::type::TIME64:
+        case arrow::Type::type::TIMESTAMP:
+            return fixed_width(sizeof(int64_t));
+        case arrow::Type::type::FLOAT:
+            return fixed_width(sizeof(float));
+        case arrow::Type::type::DOUBLE:
+            return fixed_width(sizeof(double));
+        case arrow::Type::type::DECIMAL:
+            return fixed_width(sizeof(Decimal::int128_t));
+        case arrow::Type::type::STRING:
+        case arrow::Type::type::BINARY: {
+            // Value bytes plus one 32-bit offset per slot.
+            const auto* binary_array =
+                arrow::internal::checked_cast<const arrow::BinaryArray*>(array.get());
+            assert(binary_array);
+            const int64_t value_length = binary_array->total_values_length();
+            const int64_t offset_length = length * static_cast<int64_t>(sizeof(int32_t));
+            return null_bits_size_in_bytes + value_length + offset_length;
+        }
+        case arrow::Type::type::LARGE_STRING:
+        case arrow::Type::type::LARGE_BINARY: {
+            // Same layout as STRING/BINARY, but with 64-bit offsets.
+            const auto* large_binary_array =
+                arrow::internal::checked_cast<const arrow::LargeBinaryArray*>(array.get());
+            assert(large_binary_array);
+            const int64_t value_length = large_binary_array->total_values_length();
+            const int64_t offset_length = length * static_cast<int64_t>(sizeof(int64_t));
+            return null_bits_size_in_bytes + value_length + offset_length;
+        }
+        case arrow::Type::type::LIST: {
+            const auto* list_array =
+                arrow::internal::checked_cast<const arrow::ListArray*>(array.get());
+            assert(list_array);
+            PAIMON_ASSIGN_OR_RAISE(int64_t value_mem, EstimateMemoryUse(list_array->values()));
+            return null_bits_size_in_bytes + value_mem;
+        }
+        case arrow::Type::type::MAP: {
+            const auto* map_array =
+                arrow::internal::checked_cast<const arrow::MapArray*>(array.get());
+            assert(map_array);
+            PAIMON_ASSIGN_OR_RAISE(int64_t key_mem, EstimateMemoryUse(map_array->keys()));
+            PAIMON_ASSIGN_OR_RAISE(int64_t item_mem, EstimateMemoryUse(map_array->items()));
+            return null_bits_size_in_bytes + key_mem + item_mem;
+        }
+        case arrow::Type::type::STRUCT: {
+            const auto* struct_array =
+                arrow::internal::checked_cast<const arrow::StructArray*>(array.get());
+            assert(struct_array);
+            int64_t struct_mem = 0;
+            for (const auto& field : struct_array->fields()) {
+                PAIMON_ASSIGN_OR_RAISE(int64_t field_mem, EstimateMemoryUse(field));
+                struct_mem += field_mem;
+            }
+            return null_bits_size_in_bytes + struct_mem;
+        }
+        default:
+            return Status::Invalid(fmt::format("Do not support type {} in EstimateMemoryUse",
+                                               array->type()->ToString()));
+    }
+}
+
+} // namespace paimon
diff --git a/src/paimon/core/mergetree/in_memory_sort_buffer.h b/src/paimon/core/mergetree/in_memory_sort_buffer.h
new file mode 100644
index 0000000..3632bb8
--- /dev/null
+++ b/src/paimon/core/mergetree/in_memory_sort_buffer.h
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "arrow/type_fwd.h"
+#include "paimon/core/mergetree/sort_buffer.h"
+#include "paimon/record_batch.h"
+#include "paimon/result.h"
+
+namespace arrow {
+class Array;
+class DataType;
+class Schema;
+class StructArray;
+} // namespace arrow
+
+namespace paimon {
+class FieldsComparator;
+class KeyValueRecordReader;
+class MemoryPool;
+
+/// A buffered write batch that records the data and the sequence number of its first row.
+struct BufferedWriteBatch {
+    // Sequence number of the batch's first row. InMemorySortBuffer::Write numbers the
+    // remaining rows consecutively after it.
+    int64_t first_sequence_number = 0;
+    // The imported batch data.
+    std::shared_ptr<arrow::StructArray> struct_array;
+    // Per-row kinds, copied from RecordBatch::GetRowKind() by InMemorySortBuffer::Write.
+    std::vector<RecordBatch::RowKind> row_kinds;
+};
+
+/// Pure in-memory SortBuffer: buffers RecordBatches and exposes them as sorted
+/// KeyValueRecordReaders. Does not support spill to disk.
+class InMemorySortBuffer : public SortBuffer {
+ public:
+    /// @param last_sequence_number sequence number already used; the first buffered row
+    ///                             receives last_sequence_number + 1.
+    /// @param value_type           Arrow type that incoming batches are imported as
+    ///                             (expected to be a struct type).
+    /// @param write_buffer_size    memory threshold in bytes; Write() returns false once the
+    ///                             estimated usage reaches it.
+    InMemorySortBuffer(int64_t last_sequence_number,
+                       const std::shared_ptr<arrow::DataType>& value_type,
+                       const std::vector<std::string>& trimmed_primary_keys,
+                       const std::vector<std::string>& user_defined_sequence_fields,
+                       bool sequence_fields_ascending,
+                       const std::shared_ptr<FieldsComparator>& key_comparator,
+                       uint64_t write_buffer_size, const std::shared_ptr<MemoryPool>& pool);
+
+    void Clear() override;
+    uint64_t GetMemorySize() const override;
+    /// Always returns false: this buffer cannot spill.
+    Result<bool> FlushMemory() override;
+    Result<bool> Write(std::unique_ptr<RecordBatch>&& batch) override;
+    /// Returns one reader per buffered batch.
+    Result<std::vector<std::unique_ptr<KeyValueRecordReader>>> CreateReaders() override;
+    bool HasData() const override;
+    /// Get the estimated average memory usage per row in bytes.
+    uint64_t GetEstimateMemoryUseForEachRow() const;
+
+ private:
+    /// Estimate memory usage of an Arrow array.
+    static Result<int64_t> EstimateMemoryUse(const std::shared_ptr<arrow::Array>& array);
+
+    // Construction-time configuration. Declaration order must match the constructor's
+    // initializer list.
+    const std::shared_ptr<MemoryPool> pool_;
+    const std::shared_ptr<arrow::DataType> value_type_;
+    const std::vector<std::string> trimmed_primary_keys_;
+    const std::vector<std::string> user_defined_sequence_fields_;
+    const bool sequence_fields_ascending_;
+    const std::shared_ptr<FieldsComparator> key_comparator_;
+    const uint64_t write_buffer_size_;
+
+    // Buffered data and accounting; reset by Clear() (except the two noted below).
+    std::vector<BufferedWriteBatch> buffered_batches_;
+    uint64_t current_memory_in_bytes_ = 0;
+    // Last computed average; not reset by Clear().
+    uint64_t estimated_memory_use_for_each_row_ = 0;
+    int64_t total_row_count_ = 0;
+    // Sequence number for the next written row; not reset by Clear().
+    int64_t next_sequence_number_ = 0;
+};
+
+} // namespace paimon
diff --git a/src/paimon/core/mergetree/sort_buffer.h b/src/paimon/core/mergetree/sort_buffer.h
new file mode 100644
index 0000000..96ef36d
--- /dev/null
+++ b/src/paimon/core/mergetree/sort_buffer.h
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include "paimon/record_batch.h"
+#include "paimon/result.h"
+
+namespace paimon {
+class KeyValueRecordReader;
+
+/// SortBuffer is the interface for managing buffered records with sorting capability.
+/// It abstracts over the in-memory and external (spilling) sort buffer implementations.
+class SortBuffer {
+ public:
+    virtual ~SortBuffer() = default;
+
+    /// Reset the buffer, releasing all in-memory batches and on-disk spill files.
+    virtual void Clear() = 0;
+
+    /// Return the current memory usage in bytes.
+    virtual uint64_t GetMemorySize() const = 0;
+
+    /// Spill in-memory data to disk if this implementation supports it.
+    /// @return true if the flush succeeded and the buffer can accept more data afterwards;
+    ///         false if the flush succeeded but no disk quota is left for another flush
+    ///         (on-disk implementations);
+    ///         false if flushing is not supported (in-memory-only implementations);
+    ///         Status::Invalid if the flush failed.
+    virtual Result<bool> FlushMemory() = 0;
+
+    /// Append a RecordBatch to the buffer.
+    /// @return true if the write succeeded and the buffer can accept more data afterwards;
+    ///         false if the write succeeded but no quota (memory or disk) is left for the
+    ///         next write;
+    ///         Status::Invalid if the write failed.
+    virtual Result<bool> Write(std::unique_ptr<RecordBatch>&& batch) = 0;
+
+    /// Create sorted KeyValueRecordReaders from all buffered data (in-memory + on-disk).
+    /// This does not clear the buffer; the caller should invoke Clear() afterwards.
+    virtual Result<std::vector<std::unique_ptr<KeyValueRecordReader>>> CreateReaders() = 0;
+
+    /// Return true if there is any data to output (in-memory or on-disk).
+    virtual bool HasData() const = 0;
+};
+
+} // namespace paimon
diff --git a/src/paimon/core/mergetree/spill_channel_manager.h b/src/paimon/core/mergetree/spill_channel_manager.h
new file mode 100644
index 0000000..6f22c12
--- /dev/null
+++ b/src/paimon/core/mergetree/spill_channel_manager.h
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <unordered_set>
+
+#include "paimon/core/disk/file_io_channel.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/status.h"
+
+namespace paimon {
+
+/// Tracks spill channels (temporary files) and deletes them through a FileSystem.
+class SpillChannelManager {
+    using ChannelSet = std::unordered_set<FileIOChannel::ID, FileIOChannel::ID::Hash>;
+
+ public:
+    /// @param fs               file system used to delete channel files
+    /// @param initial_capacity number of channels to reserve room for up front
+    SpillChannelManager(const std::shared_ptr<FileSystem>& fs, size_t initial_capacity)
+        : fs_(fs) {
+        channels_.reserve(initial_capacity);
+    }
+
+    /// Start tracking `channel_id`; adding an id that is already tracked has no effect.
+    void AddChannel(const FileIOChannel::ID& channel_id) {
+        channels_.insert(channel_id);
+    }
+
+    /// Delete the channel's file, then stop tracking it.
+    /// If the delete fails the channel stays tracked, so Reset() will try again.
+    Status DeleteChannel(const FileIOChannel::ID& channel_id) {
+        PAIMON_RETURN_NOT_OK(fs_->Delete(channel_id.GetPath()));
+        channels_.erase(channel_id);
+        return Status::OK();
+    }
+
+    /// Best-effort removal of every tracked file: failures are ignored so that each
+    /// channel gets a deletion attempt, and the tracked set is emptied regardless.
+    void Reset() {
+        for (const FileIOChannel::ID& id : channels_) {
+            [[maybe_unused]] auto ignored = fs_->Delete(id.GetPath());
+        }
+        channels_.clear();
+    }
+
+    /// The channels currently tracked.
+    const ChannelSet& GetChannels() const {
+        return channels_;
+    }
+
+ private:
+    ChannelSet channels_;
+    std::shared_ptr<FileSystem> fs_;
+};
+
+} // namespace paimon
diff --git a/src/paimon/core/mergetree/spill_channel_manager_test.cpp b/src/paimon/core/mergetree/spill_channel_manager_test.cpp
new file mode 100644
index 0000000..05e5de6
--- /dev/null
+++ b/src/paimon/core/mergetree/spill_channel_manager_test.cpp
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/core/mergetree/spill_channel_manager.h"
+
+#include <memory>
+
+#include "gtest/gtest.h"
+#include "paimon/core/disk/io_manager.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+/// Fixture: a fresh scratch directory per test with an IOManager rooted at it.
+class SpillChannelManagerTest : public ::testing::Test {
+ public:
+    void SetUp() override {
+        test_dir_ = UniqueTestDirectory::Create();
+        file_system_ = test_dir_->GetFileSystem();
+        io_manager_ = std::make_unique<IOManager>(test_dir_->Str(), test_dir_->GetFileSystem());
+    }
+
+    /// Allocate a new channel id and materialize an empty file at its path.
+    FileIOChannel::ID CreateTempFile() {
+        EXPECT_OK_AND_ASSIGN(auto id, io_manager_->CreateChannel());
+        EXPECT_OK_AND_ASSIGN(auto output_stream,
+                             file_system_->Create(id.GetPath(), /*overwrite=*/false));
+        EXPECT_OK(output_stream->Close());
+        return id;
+    }
+
+ protected:
+    std::shared_ptr<FileSystem> file_system_;
+    std::unique_ptr<UniqueTestDirectory> test_dir_;
+    std::unique_ptr<IOManager> io_manager_;
+};
+
+// AddChannel tracks each distinct channel exactly once.
+TEST_F(SpillChannelManagerTest, AddAndGetChannels) {
+    SpillChannelManager manager(file_system_, /*initial_capacity=*/128);
+
+    auto channel1 = CreateTempFile();
+    auto channel2 = CreateTempFile();
+
+    manager.AddChannel(channel1);
+    manager.AddChannel(channel2);
+    // Re-adding a tracked channel must not create a duplicate entry.
+    manager.AddChannel(channel1);
+
+    const auto& channels = manager.GetChannels();
+    // Unsigned literals keep the size_t comparisons free of sign-compare warnings.
+    ASSERT_EQ(channels.size(), 2U);
+    ASSERT_EQ(channels.count(channel1), 1U);
+    ASSERT_EQ(channels.count(channel2), 1U);
+}
+
+// DeleteChannel removes both the file on disk and the tracking entry.
+TEST_F(SpillChannelManagerTest, DeleteChannelRemovesFileAndEntry) {
+    SpillChannelManager manager(file_system_, /*initial_capacity=*/128);
+
+    auto channel = CreateTempFile();
+    manager.AddChannel(channel);
+    ASSERT_EQ(manager.GetChannels().size(), 1U);
+
+    ASSERT_OK_AND_ASSIGN(bool exists_before, file_system_->Exists(channel.GetPath()));
+    ASSERT_TRUE(exists_before);
+
+    ASSERT_OK(manager.DeleteChannel(channel));
+    ASSERT_TRUE(manager.GetChannels().empty());
+    ASSERT_OK_AND_ASSIGN(bool exists_after, file_system_->Exists(channel.GetPath()));
+    ASSERT_FALSE(exists_after);
+}
+
+// Reset deletes every tracked file and forgets all channels.
+TEST_F(SpillChannelManagerTest, ResetDeletesAllFiles) {
+    SpillChannelManager manager(file_system_, /*initial_capacity=*/128);
+
+    auto channel1 = CreateTempFile();
+    auto channel2 = CreateTempFile();
+    auto channel3 = CreateTempFile();
+
+    manager.AddChannel(channel1);
+    manager.AddChannel(channel2);
+    manager.AddChannel(channel3);
+    ASSERT_EQ(manager.GetChannels().size(), 3U);
+
+    ASSERT_OK_AND_ASSIGN(bool e1, file_system_->Exists(channel1.GetPath()));
+    ASSERT_OK_AND_ASSIGN(bool e2, file_system_->Exists(channel2.GetPath()));
+    ASSERT_OK_AND_ASSIGN(bool e3, file_system_->Exists(channel3.GetPath()));
+    ASSERT_TRUE(e1);
+    ASSERT_TRUE(e2);
+    ASSERT_TRUE(e3);
+
+    manager.Reset();
+
+    // The tracked set must be emptied as well as the files removed.
+    ASSERT_TRUE(manager.GetChannels().empty());
+    ASSERT_OK_AND_ASSIGN(bool a1, file_system_->Exists(channel1.GetPath()));
+    ASSERT_OK_AND_ASSIGN(bool a2, file_system_->Exists(channel2.GetPath()));
+    ASSERT_OK_AND_ASSIGN(bool a3, file_system_->Exists(channel3.GetPath()));
+    ASSERT_FALSE(a1);
+    ASSERT_FALSE(a2);
+    ASSERT_FALSE(a3);
+}
+
+} // namespace paimon::test
diff --git a/src/paimon/core/mergetree/spill_reader.cpp b/src/paimon/core/mergetree/spill_reader.cpp
new file mode 100644
index 0000000..0b338c2
--- /dev/null
+++ b/src/paimon/core/mergetree/spill_reader.cpp
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/core/mergetree/spill_reader.h"
+
+#include "paimon/common/data/columnar/columnar_row_ref.h"
+#include "paimon/common/metrics/metrics_impl.h"
+#include "paimon/common/table/special_fields.h"
+#include "paimon/common/types/row_kind.h"
+#include "paimon/common/utils/arrow/arrow_input_stream_adapter.h"
+#include "paimon/common/utils/arrow/mem_utils.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+
+namespace paimon {
+
+SpillReader::SpillReader(const std::shared_ptr<FileSystem>& fs,
+ const std::shared_ptr<arrow::Schema>& key_schema,
+ const std::shared_ptr<arrow::Schema>& value_schema,
bool use_threads,
+ const std::shared_ptr<MemoryPool>& pool)
+ : fs_(fs),
+ key_schema_(key_schema),
+ value_schema_(value_schema),
+ pool_(pool),
+ arrow_pool_(GetArrowPool(pool)),
+ use_threads_(use_threads),
+ metrics_(std::make_shared<MetricsImpl>()) {}
+
+/// Construct a reader and open the spill file named by `channel_id`.
+Result<std::unique_ptr<SpillReader>> SpillReader::Create(
+    const std::shared_ptr<FileSystem>& fs, const std::shared_ptr<arrow::Schema>& key_schema,
+    const std::shared_ptr<arrow::Schema>& value_schema, bool use_threads,
+    const FileIOChannel::ID& channel_id, const std::shared_ptr<MemoryPool>& pool) {
+    // The constructor is private, which rules out std::make_unique here.
+    auto reader = std::unique_ptr<SpillReader>(
+        new SpillReader(fs, key_schema, value_schema, use_threads, pool));
+    PAIMON_RETURN_NOT_OK(reader->Open(channel_id));
+    return reader;
+}
+
+/// Open `channel_id` as an Arrow IPC file and rewind to its first record batch.
+Status SpillReader::Open(const FileIOChannel::ID& channel_id) {
+    const std::string& path = channel_id.GetPath();
+
+    // The IPC options do not depend on the file, so prepare them up front.
+    arrow::ipc::IpcReadOptions read_options = arrow::ipc::IpcReadOptions::Defaults();
+    read_options.memory_pool = arrow_pool_.get();
+    read_options.use_threads = use_threads_;
+
+    PAIMON_ASSIGN_OR_RAISE(in_stream_, fs_->Open(path));
+    PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<FileStatus> file_status, fs_->GetFileStatus(path));
+    const uint64_t file_length = file_status->GetLen();
+    arrow_input_stream_adapter_ =
+        std::make_shared<ArrowInputStreamAdapter>(in_stream_, arrow_pool_, file_length);
+
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(
+        arrow_reader_,
+        arrow::ipc::RecordBatchFileReader::Open(arrow_input_stream_adapter_, read_options));
+    num_record_batches_ = arrow_reader_->num_record_batches();
+    current_batch_index_ = 0;
+    return Status::OK();
+}
+
+SpillReader::Iterator::Iterator(SpillReader* reader) : reader_(reader) {}
+
+Result<bool> SpillReader::Iterator::HasNext() const {
+    // Rows remain while the cursor is still inside the reader's current batch.
+    return reader_->batch_length_ > cursor_;
+}
+
+/// Materialize the row at the cursor as a KeyValue. The cursor only advances
+/// once the row kind has been decoded successfully.
+Result<KeyValue> SpillReader::Iterator::Next() {
+    const int64_t row = cursor_;
+    PAIMON_ASSIGN_OR_RAISE(const RowKind* kind,
+                           RowKind::FromByteValue(reader_->row_kind_array_->Value(row)));
+    const int64_t seq = reader_->sequence_number_array_->Value(row);
+    auto key_row = std::make_unique<ColumnarRowRef>(reader_->key_ctx_, row);
+    auto value_row = std::make_unique<ColumnarRowRef>(reader_->value_ctx_, row);
+    ++cursor_;
+    return KeyValue(kind, seq, KeyValue::UNKNOWN_LEVEL, std::move(key_row),
+                    std::move(value_row));
+}
+
+namespace {
+
+/// Look up every field of `schema` in `record_batch` by name, in schema order.
+/// `role` ("key" or "value") is only used to build the error message.
+Result<arrow::ArrayVector> CollectColumns(const arrow::RecordBatch& record_batch,
+                                          const arrow::Schema& schema, const std::string& role) {
+    arrow::ArrayVector columns;
+    columns.reserve(schema.num_fields());
+    for (const auto& field : schema.fields()) {
+        auto column = record_batch.GetColumnByName(field->name());
+        if (!column) {
+            return Status::Invalid("cannot find " + role + " field " + field->name() +
+                                   " in spill file");
+        }
+        columns.emplace_back(std::move(column));
+    }
+    return columns;
+}
+
+}  // namespace
+
+/// Read the next record batch of the spill file and return an iterator over its rows.
+/// @return a null iterator once every batch has been consumed (or after Close());
+///         Status::Invalid if a required column is missing or has an unexpected type.
+///         On error the reader is left with no current batch.
+Result<std::unique_ptr<KeyValueRecordReader::Iterator>> SpillReader::NextBatch() {
+    Reset();
+    if (!arrow_reader_ || current_batch_index_ >= num_record_batches_) {
+        return std::unique_ptr<KeyValueRecordReader::Iterator>();
+    }
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::RecordBatch> record_batch,
+                                      arrow_reader_->ReadRecordBatch(current_batch_index_));
+    // Advance even if validation below fails, so the next call moves on to the next batch.
+    current_batch_index_++;
+
+    // Validate everything into locals first; member state is committed only on success.
+    auto sequence_number_col =
+        record_batch->GetColumnByName(SpecialFields::SequenceNumber().Name());
+    if (!sequence_number_col) {
+        return Status::Invalid("cannot find _SEQUENCE_NUMBER column in spill file");
+    }
+    auto sequence_number_array =
+        std::dynamic_pointer_cast<arrow::NumericArray<arrow::Int64Type>>(sequence_number_col);
+    if (!sequence_number_array) {
+        return Status::Invalid("cannot cast _SEQUENCE_NUMBER column to int64 arrow array");
+    }
+
+    auto value_kind_col = record_batch->GetColumnByName(SpecialFields::ValueKind().Name());
+    if (!value_kind_col) {
+        return Status::Invalid("cannot find _VALUE_KIND column in spill file");
+    }
+    auto row_kind_array =
+        std::dynamic_pointer_cast<arrow::NumericArray<arrow::Int8Type>>(value_kind_col);
+    if (!row_kind_array) {
+        return Status::Invalid("cannot cast _VALUE_KIND column to int8 arrow array");
+    }
+
+    PAIMON_ASSIGN_OR_RAISE(arrow::ArrayVector key_fields,
+                           CollectColumns(*record_batch, *key_schema_, "key"));
+    PAIMON_ASSIGN_OR_RAISE(arrow::ArrayVector value_fields,
+                           CollectColumns(*record_batch, *value_schema_, "value"));
+
+    // Commit the validated batch.
+    batch_length_ = record_batch->num_rows();
+    sequence_number_array_ = std::move(sequence_number_array);
+    row_kind_array_ = std::move(row_kind_array);
+    key_ctx_ = std::make_shared<ColumnarBatchContext>(std::move(key_fields), pool_);
+    value_ctx_ = std::make_shared<ColumnarBatchContext>(std::move(value_fields), pool_);
+
+    return std::make_unique<SpillReader::Iterator>(this);
+}
+
+std::shared_ptr<Metrics> SpillReader::GetReaderMetrics() const {
+    // Metrics object created once in the constructor and shared with callers.
+    return this->metrics_;
+}
+
+/// Release the current batch, the Arrow file reader and the underlying input stream.
+/// Safe to call more than once.
+void SpillReader::Close() {
+    Reset();
+    arrow_reader_.reset();
+    arrow_input_stream_adapter_.reset();
+    // With the file reader gone, make any later NextBatch() report end-of-data
+    // instead of reading through a null arrow_reader_.
+    current_batch_index_ = 0;
+    num_record_batches_ = 0;
+    if (in_stream_) {
+        // Close() returns void, so the stream's close status is intentionally dropped.
+        [[maybe_unused]] auto status = in_stream_->Close();
+        in_stream_.reset();
+    }
+}
+
+/// Forget all per-batch state; called at the start of NextBatch() and by Close().
+void SpillReader::Reset() {
+    batch_length_ = 0;
+    row_kind_array_ = nullptr;
+    sequence_number_array_ = nullptr;
+    value_ctx_ = nullptr;
+    key_ctx_ = nullptr;
+}
+
+} // namespace paimon
diff --git a/src/paimon/core/mergetree/spill_reader.h b/src/paimon/core/mergetree/spill_reader.h
new file mode 100644
index 0000000..6dba6fd
--- /dev/null
+++ b/src/paimon/core/mergetree/spill_reader.h
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+
+#include "arrow/array/array_primitive.h"
+#include "arrow/ipc/api.h"
+#include "paimon/common/data/columnar/columnar_batch_context.h"
+#include "paimon/core/disk/file_io_channel.h"
+#include "paimon/core/io/key_value_record_reader.h"
+#include "paimon/core/key_value.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/memory/memory_pool.h"
+
+namespace arrow {
+class MemoryPool;
+} // namespace arrow
+
+namespace paimon {
+
+class ArrowInputStreamAdapter;
+class Metrics;
+
+/// KeyValueRecordReader over a spill file stored in the Arrow IPC file format.
+/// Each record batch must carry _SEQUENCE_NUMBER (int64) and _VALUE_KIND (int8)
+/// columns, plus the key and value fields, which are looked up by name.
+class SpillReader : public KeyValueRecordReader {
+ public:
+    /// Create a reader and open the spill file identified by `channel_id`.
+    /// @param key_schema   fields used to build the key row of each KeyValue
+    /// @param value_schema fields used to build the value row of each KeyValue
+    /// @param use_threads  forwarded to Arrow's IPC read options
+    /// @return the opened reader, or the error raised while opening the file
+    static Result<std::unique_ptr<SpillReader>> Create(
+        const std::shared_ptr<FileSystem>& fs, const std::shared_ptr<arrow::Schema>& key_schema,
+        const std::shared_ptr<arrow::Schema>& value_schema, bool use_threads,
+        const FileIOChannel::ID& channel_id, const std::shared_ptr<MemoryPool>& pool);
+
+    SpillReader(const SpillReader&) = delete;
+    SpillReader& operator=(const SpillReader&) = delete;
+
+    /// Iterates over the rows of the reader's current record batch.
+    /// It reads the reader's per-batch state directly, so it is only meaningful until
+    /// the next NextBatch() or Close() call on the owning reader.
+    class Iterator : public KeyValueRecordReader::Iterator {
+     public:
+        explicit Iterator(SpillReader* reader);
+        Result<bool> HasNext() const override;
+        Result<KeyValue> Next() override;
+
+     private:
+        // Index of the next row to return within the current batch.
+        int64_t cursor_ = 0;
+        // Non-owning; the reader must outlive the iterator.
+        SpillReader* reader_ = nullptr;
+    };
+
+    /// Read the next record batch from the spill file.
+    /// @return an iterator over its rows, or a null iterator once all batches are consumed
+    Result<std::unique_ptr<KeyValueRecordReader::Iterator>> NextBatch() override;
+    std::shared_ptr<Metrics> GetReaderMetrics() const override;
+    /// Release the current batch and close the underlying input stream.
+    void Close() override;
+
+ private:
+    SpillReader(const std::shared_ptr<FileSystem>& fs,
+                const std::shared_ptr<arrow::Schema>& key_schema,
+                const std::shared_ptr<arrow::Schema>& value_schema, bool use_threads,
+                const std::shared_ptr<MemoryPool>& pool);
+
+    /// Open the spill file and position the reader at its first record batch.
+    Status Open(const FileIOChannel::ID& channel_id);
+    /// Drop all state belonging to the current record batch.
+    void Reset();
+
+    // Configuration; declaration order matches the constructor's initializer list.
+    std::shared_ptr<FileSystem> fs_;
+    std::shared_ptr<arrow::Schema> key_schema_;
+    std::shared_ptr<arrow::Schema> value_schema_;
+    std::shared_ptr<MemoryPool> pool_;
+    // Arrow-facing pool derived from `pool_`, used for IPC reads.
+    std::shared_ptr<arrow::MemoryPool> arrow_pool_;
+    bool use_threads_;
+    std::shared_ptr<Metrics> metrics_;
+
+    // Open file state.
+    std::shared_ptr<InputStream> in_stream_;
+    std::shared_ptr<ArrowInputStreamAdapter> arrow_input_stream_adapter_;
+    std::shared_ptr<arrow::ipc::RecordBatchFileReader> arrow_reader_;
+    // Index of the next record batch to read, and the total number of batches.
+    int32_t current_batch_index_ = 0;
+    int32_t num_record_batches_ = 0;
+
+    // Columns of the current record batch; cleared by Reset().
+    int64_t batch_length_ = 0;
+    std::shared_ptr<arrow::NumericArray<arrow::Int64Type>> sequence_number_array_;
+    std::shared_ptr<arrow::NumericArray<arrow::Int8Type>> row_kind_array_;
+    std::shared_ptr<ColumnarBatchContext> key_ctx_;
+    std::shared_ptr<ColumnarBatchContext> value_ctx_;
+};
+
+} // namespace paimon
diff --git a/src/paimon/core/mergetree/spill_reader_writer_test.cpp b/src/paimon/core/mergetree/spill_reader_writer_test.cpp
new file mode 100644
index 0000000..e363389
--- /dev/null
+++ b/src/paimon/core/mergetree/spill_reader_writer_test.cpp
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "arrow/api.h"
+#include "arrow/ipc/json_simple.h"
+#include "gtest/gtest.h"
+#include "paimon/common/table/special_fields.h"
+#include "paimon/common/types/data_field.h"
+#include "paimon/core/disk/io_manager.h"
+#include "paimon/core/mergetree/spill_channel_manager.h"
+#include "paimon/core/mergetree/spill_reader.h"
+#include "paimon/core/mergetree/spill_writer.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+class SpillReaderWriterTest : public ::testing::TestWithParam<std::string> {
+ public:
+    // The test parameter is the compression codec name handed to SpillWriter::Create().
+    void SetUp() override {
+        read_pool_ = GetDefaultPool();
+        write_pool_ = GetDefaultPool();
+        test_dir_ = UniqueTestDirectory::Create();
+        file_system_ = test_dir_->GetFileSystem();
+
+        // Spill channels are enumerated under the per-test directory.
+        io_manager_ = std::make_unique<IOManager>(test_dir_->Str(), test_dir_->GetFileSystem());
+        ASSERT_OK_AND_ASSIGN(channel_enumerator_, io_manager_->CreateChannelEnumerator());
+        spill_channel_manager_ = std::make_shared<SpillChannelManager>(file_system_, 128);
+
+        // The key is column f0; the value row holds both f0 and f1.
+        key_fields_ = {DataField(0, arrow::field("f0", arrow::utf8()))};
+        value_fields_ = {DataField(0, arrow::field("f0", arrow::utf8())),
+                         DataField(1, arrow::field("f1", arrow::int32()))};
+        key_schema_ = DataField::ConvertDataFieldsToArrowSchema(key_fields_);
+        value_schema_ = DataField::ConvertDataFieldsToArrowSchema(value_fields_);
+
+        // Spilled rows are laid out as [_SEQUENCE_NUMBER, _VALUE_KIND, value fields...];
+        // JSON test rows are parsed as a struct of exactly that layout.
+        write_schema_ = SpecialFields::CompleteSequenceAndValueKindField(value_schema_);
+        write_type_ = arrow::struct_(write_schema_->fields());
+    }
+
+    // Builds a record batch in write_schema_ layout from a JSON array of rows.
+    std::shared_ptr<arrow::RecordBatch> CreateRecordBatch(const std::string& json_data,
+                                                          int64_t num_rows) const {
+        std::shared_ptr<arrow::Array> parsed =
+            arrow::ipc::internal::json::ArrayFromJSON(write_type_, json_data).ValueOrDie();
+        const auto rows = std::dynamic_pointer_cast<arrow::StructArray>(parsed);
+        return arrow::RecordBatch::Make(write_schema_, num_rows, rows->fields());
+    }
+
+    // Opens a writer that spills with the codec under test at compression level 1.
+    Result<std::unique_ptr<SpillWriter>> CreateSpillWriter() const {
+        return SpillWriter::Create(file_system_, write_schema_, channel_enumerator_,
+                                   spill_channel_manager_, /*compression=*/GetParam(),
+                                   /*compression_level=*/1, /*use_threads=*/false, write_pool_);
+    }
+
+    // Writes every batch into one fresh spill file, closes it and returns its channel id.
+    FileIOChannel::ID WriteSpillFile(
+        const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches) {
+        EXPECT_OK_AND_ASSIGN(auto spill_writer, CreateSpillWriter());
+        for (const auto& record_batch : batches) {
+            EXPECT_OK(spill_writer->WriteBatch(record_batch));
+        }
+        EXPECT_OK(spill_writer->Close());
+        return spill_writer->GetChannelId();
+    }
+
+    // Opens a reader over a spill file using the fixture's key and value schemas.
+    Result<std::unique_ptr<SpillReader>> CreateSpillReader(
+        const FileIOChannel::ID& channel_id) const {
+        return SpillReader::Create(file_system_, key_schema_, value_schema_,
+                                   /*use_threads=*/false, channel_id, read_pool_);
+    }
+
+ protected:
+    std::shared_ptr<MemoryPool> read_pool_;
+    std::shared_ptr<MemoryPool> write_pool_;
+    std::shared_ptr<FileSystem> file_system_;
+    std::unique_ptr<UniqueTestDirectory> test_dir_;
+    std::unique_ptr<IOManager> io_manager_;
+    std::shared_ptr<FileIOChannel::Enumerator> channel_enumerator_;
+    std::shared_ptr<SpillChannelManager> spill_channel_manager_;
+
+    std::vector<DataField> value_fields_;
+    std::vector<DataField> key_fields_;
+    std::shared_ptr<arrow::Schema> write_schema_;
+    std::shared_ptr<arrow::DataType> write_type_;
+    std::shared_ptr<arrow::Schema> key_schema_;
+    std::shared_ptr<arrow::Schema> value_schema_;
+};
+
+// Writes two spill files through separate writers (one holding a single batch, the other
+// holding two batches) and reads each back, checking that batch boundaries, row counts and
+// key order survive the round trip. Also checks that GetFileSize() grows while writing.
+TEST_P(SpillReaderWriterTest, TestWriteBatch) {
+    FileIOChannel::ID channel_id_1;
+    FileIOChannel::ID channel_id_2;
+
+    // First writer: a single two-row batch.
+    {
+        ASSERT_OK_AND_ASSIGN(auto writer, CreateSpillWriter());
+
+        auto batch = CreateRecordBatch(R"([
+ [0, 1, "Alice", 10],
+ [1, 1, "Bob", 20]
+ ])",
+                                       2);
+        ASSERT_OK(writer->WriteBatch(batch));
+        // The file already reports a non-zero size before Close().
+        ASSERT_OK_AND_ASSIGN(int64_t file_size, writer->GetFileSize());
+        ASSERT_GT(file_size, 0);
+        ASSERT_OK(writer->Close());
+        channel_id_1 = writer->GetChannelId();
+        ASSERT_GT(write_pool_->MaxMemoryUsage(), 0);
+    }
+    // Second writer: two batches (2 + 3 rows); the reported size must grow between writes.
+    {
+        ASSERT_OK_AND_ASSIGN(auto writer, CreateSpillWriter());
+
+        auto batch_a = CreateRecordBatch(R"([
+ [2, 1, "Carol", 30],
+ [3, 1, "Dave", 40]
+ ])",
+                                         2);
+        auto batch_b = CreateRecordBatch(R"([
+ [4, 1, "Eve", 50],
+ [5, 1, "Frank", 60],
+ [6, 1, "Grace", 70]
+ ])",
+                                         3);
+        ASSERT_OK(writer->WriteBatch(batch_a));
+        ASSERT_OK_AND_ASSIGN(int64_t size_before, writer->GetFileSize());
+        ASSERT_OK(writer->WriteBatch(batch_b));
+        ASSERT_OK_AND_ASSIGN(int64_t size_after, writer->GetFileSize());
+        ASSERT_GT(size_after, size_before);
+        ASSERT_OK(writer->Close());
+        channel_id_2 = writer->GetChannelId();
+    }
+    // Read back the first writer's data: exactly one batch with two rows, in write order.
+    {
+        ASSERT_OK_AND_ASSIGN(auto reader, CreateSpillReader(channel_id_1));
+
+        std::vector<std::string_view> expected_keys = {"Alice", "Bob"};
+        int total_rows = 0;
+        int batch_count = 0;
+        // NextBatch() returns nullptr once the file is exhausted.
+        while (true) {
+            ASSERT_OK_AND_ASSIGN(auto iter, reader->NextBatch());
+            if (iter == nullptr) {
+                break;
+            }
+            batch_count++;
+            while (true) {
+                ASSERT_OK_AND_ASSIGN(bool has_next, iter->HasNext());
+                if (!has_next) {
+                    break;
+                }
+                ASSERT_OK_AND_ASSIGN(auto kv, iter->Next());
+                ASSERT_EQ(kv.key->GetStringView(0), expected_keys[total_rows]);
+                total_rows++;
+            }
+        }
+        ASSERT_EQ(batch_count, 1);
+        ASSERT_EQ(total_rows, 2);
+        reader->Close();
+        // NOTE(review): read_pool_ and write_pool_ are both obtained from GetDefaultPool(); if
+        // that returns one shared pool, this check is already satisfied by the writes above
+        // and does not show that the reader allocated from read_pool_.
+        ASSERT_GT(read_pool_->MaxMemoryUsage(), 0);
+    }
+    // Read back the second writer's data: each WriteBatch() call comes back as its own batch.
+    {
+        ASSERT_OK_AND_ASSIGN(auto reader, CreateSpillReader(channel_id_2));
+
+        std::vector<std::string_view> expected_keys = {"Carol", "Dave", "Eve", "Frank", "Grace"};
+        int total_rows = 0;
+        int batch_count = 0;
+        while (true) {
+            ASSERT_OK_AND_ASSIGN(auto iter, reader->NextBatch());
+            if (iter == nullptr) {
+                break;
+            }
+            batch_count++;
+            while (true) {
+                ASSERT_OK_AND_ASSIGN(bool has_next, iter->HasNext());
+                if (!has_next) {
+                    break;
+                }
+                ASSERT_OK_AND_ASSIGN(auto kv, iter->Next());
+                ASSERT_EQ(kv.key->GetStringView(0), expected_keys[total_rows]);
+                total_rows++;
+            }
+        }
+        ASSERT_EQ(batch_count, 2);
+        ASSERT_EQ(total_rows, 5);
+        reader->Close();
+    }
+}
+
+// Round-trips two batches with mixed row kinds and checks every decoded field (key column,
+// value columns, sequence number, value kind). Then checks that a file holding only a
+// zero-row batch yields no rows.
+TEST_P(SpillReaderWriterTest, TestReadBatch) {
+    {
+        auto batch1 = CreateRecordBatch(R"([[0, 1, "Alice", 10], [1, 1, "Bob", 20]])", 2);
+        auto batch2 =
+            CreateRecordBatch(R"([[2, 1, "Carol", 30], [3, 2, "Dave", 40], [4, 3, "Eve", 50]])", 3);
+
+        auto channel_id = WriteSpillFile({batch1, batch2});
+        ASSERT_OK_AND_ASSIGN(auto reader, CreateSpillReader(channel_id));
+
+        // Expected per-row values, in write order across both batches.
+        std::vector<std::string_view> expected_keys = {"Alice", "Bob", "Carol", "Dave", "Eve"};
+        std::vector<int32_t> expected_values = {10, 20, 30, 40, 50};
+        std::vector<int64_t> expected_seqs = {0, 1, 2, 3, 4};
+        std::vector<int8_t> expected_kinds = {1, 1, 1, 2, 3};
+
+        int total_rows = 0;
+        int batch_count = 0;
+        while (true) {
+            ASSERT_OK_AND_ASSIGN(auto iter, reader->NextBatch());
+            if (iter == nullptr) break;
+            batch_count++;
+            while (true) {
+                ASSERT_OK_AND_ASSIGN(bool has_next, iter->HasNext());
+                if (!has_next) {
+                    break;
+                }
+                ASSERT_OK_AND_ASSIGN(auto kv, iter->Next());
+                // f0 is both the key column and the first value column, so both sides are
+                // compared against the same expected strings.
+                ASSERT_EQ(kv.key->GetStringView(0), expected_keys[total_rows]);
+                ASSERT_EQ(kv.value->GetStringView(0), expected_keys[total_rows]);
+                ASSERT_EQ(kv.value->GetInt(1), expected_values[total_rows]);
+                ASSERT_EQ(kv.sequence_number, expected_seqs[total_rows]);
+                ASSERT_EQ(kv.value_kind->ToByteValue(), expected_kinds[total_rows]);
+                total_rows++;
+            }
+        }
+        ASSERT_EQ(batch_count, 2);
+        ASSERT_EQ(total_rows, 5);
+        reader->Close();
+    }
+    {
+        // Edge case: a spill file containing a single zero-row batch.
+        auto empty_batch =
+            arrow::RecordBatch::Make(write_schema_, 0,
+                                     {arrow::MakeEmptyArray(arrow::int64()).ValueOrDie(),
+                                      arrow::MakeEmptyArray(arrow::int8()).ValueOrDie(),
+                                      arrow::MakeEmptyArray(arrow::utf8()).ValueOrDie(),
+                                      arrow::MakeEmptyArray(arrow::int32()).ValueOrDie()});
+
+        auto channel_id = WriteSpillFile({empty_batch});
+        ASSERT_OK_AND_ASSIGN(auto reader, CreateSpillReader(channel_id));
+
+        // Either "no batch at all" or "an empty batch" is accepted here.
+        ASSERT_OK_AND_ASSIGN(auto iter, reader->NextBatch());
+        if (iter != nullptr) {
+            ASSERT_OK_AND_ASSIGN(bool has_next, iter->HasNext());
+            ASSERT_FALSE(has_next);
+        }
+        reader->Close();
+    }
+}
+
+// Closing a writer twice must succeed both times, and the data written before the first
+// Close() must still be readable afterwards.
+TEST_P(SpillReaderWriterTest, TestWriterCloseIdempotency) {
+    ASSERT_OK_AND_ASSIGN(auto writer, CreateSpillWriter());
+    auto batch = CreateRecordBatch(R"([[0, 1, "Alice", 10]])", 1);
+    ASSERT_OK(writer->WriteBatch(batch));
+    ASSERT_OK(writer->Close());
+    // The second close must be a no-op that still reports success.
+    ASSERT_OK(writer->Close());
+
+    auto channel_id = writer->GetChannelId();
+    ASSERT_OK_AND_ASSIGN(auto reader, CreateSpillReader(channel_id));
+
+    // Read and verify the data is intact, following the HasNext()/Next() protocol.
+    ASSERT_OK_AND_ASSIGN(auto iter, reader->NextBatch());
+    ASSERT_NE(iter, nullptr);
+    ASSERT_OK_AND_ASSIGN(bool has_first, iter->HasNext());
+    ASSERT_TRUE(has_first);
+    ASSERT_OK_AND_ASSIGN(auto kv, iter->Next());
+    ASSERT_EQ(kv.key->GetStringView(0), "Alice");
+    ASSERT_OK_AND_ASSIGN(bool has_next, iter->HasNext());
+    ASSERT_FALSE(has_next);
+    // The single written batch must be the only one in the file; a double Close() must not
+    // have appended anything.
+    ASSERT_OK_AND_ASSIGN(auto next_iter, reader->NextBatch());
+    ASSERT_EQ(next_iter, nullptr);
+    reader->Close();
+}
+
+// A reader whose key or value schema names a column absent from the spill file must fail
+// with a descriptive error. Creating the reader succeeds; the failure surfaces on the first
+// NextBatch() call.
+TEST_P(SpillReaderWriterTest, TestReaderSchemaMismatchErrors) {
+    // Both sub-cases read the same file, written once with the fixture's schemas.
+    const auto channel_id =
+        WriteSpillFile({CreateRecordBatch(R"([[0, 1, "Alice", 10], [1, 1, "Bob", 20]])", 2)});
+
+    // Key schema refers to a column that was never spilled.
+    {
+        const auto missing_key_schema =
+            arrow::schema({arrow::field("nonexistent_key", arrow::utf8())});
+        ASSERT_OK_AND_ASSIGN(auto spill_reader,
+                             SpillReader::Create(file_system_, missing_key_schema, value_schema_,
+                                                 /*use_threads=*/false, channel_id, read_pool_));
+        ASSERT_NOK_WITH_MSG(spill_reader->NextBatch(),
+                            "cannot find key field nonexistent_key in spill file");
+    }
+
+    // Value schema keeps a valid f0 but adds a column that was never spilled.
+    {
+        const auto missing_value_schema = arrow::schema(
+            {arrow::field("f0", arrow::utf8()), arrow::field("nonexistent_value", arrow::int32())});
+        ASSERT_OK_AND_ASSIGN(auto spill_reader,
+                             SpillReader::Create(file_system_, key_schema_, missing_value_schema,
+                                                 /*use_threads=*/false, channel_id, read_pool_));
+        ASSERT_NOK_WITH_MSG(spill_reader->NextBatch(),
+                            "cannot find value field nonexistent_value in spill file");
+    }
+}
+
+INSTANTIATE_TEST_SUITE_P(CompressionTypes, SpillReaderWriterTest,
+ ::testing::Values("zstd", "none", "uncompressed",
"lz4"));
+
+} // namespace paimon::test
diff --git a/src/paimon/core/mergetree/spill_writer.cpp b/src/paimon/core/mergetree/spill_writer.cpp
new file mode 100644
index 0000000..3e9b917
--- /dev/null
+++ b/src/paimon/core/mergetree/spill_writer.cpp
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/core/mergetree/spill_writer.h"
+
+#include "paimon/common/utils/arrow/arrow_output_stream_adapter.h"
+#include "paimon/common/utils/arrow/arrow_utils.h"
+#include "paimon/common/utils/arrow/mem_utils.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/common/utils/scope_guard.h"
+#include "paimon/core/mergetree/spill_channel_manager.h"
+
+namespace paimon {
+
+SpillWriter::SpillWriter(const std::shared_ptr<FileSystem>& fs,
+ const std::shared_ptr<arrow::Schema>& schema,
+ const std::shared_ptr<FileIOChannel::Enumerator>&
channel_enumerator,
+ const std::shared_ptr<SpillChannelManager>&
spill_channel_manager,
+ const std::string& compression, int32_t
compression_level,
+ bool use_threads, const std::shared_ptr<MemoryPool>&
pool)
+ : fs_(fs),
+ schema_(schema),
+ channel_enumerator_(channel_enumerator),
+ spill_channel_manager_(spill_channel_manager),
+ compression_(compression),
+ compression_level_(compression_level),
+ use_threads_(use_threads),
+ arrow_pool_(GetArrowPool(pool)) {}
+
+// Factory: constructs the writer and opens its spill file. A writer is only returned once
+// Open() has succeeded, i.e. the file exists and its channel is registered.
+Result<std::unique_ptr<SpillWriter>> SpillWriter::Create(
+    const std::shared_ptr<FileSystem>& fs, const std::shared_ptr<arrow::Schema>& schema,
+    const std::shared_ptr<FileIOChannel::Enumerator>& channel_enumerator,
+    const std::shared_ptr<SpillChannelManager>& spill_channel_manager,
+    const std::string& compression, int32_t compression_level, bool use_threads,
+    const std::shared_ptr<MemoryPool>& pool) {
+    // The constructor is private, so std::make_unique cannot be used here.
+    auto spill_writer = std::unique_ptr<SpillWriter>(
+        new SpillWriter(fs, schema, channel_enumerator, spill_channel_manager, compression,
+                        compression_level, use_threads, pool));
+    PAIMON_RETURN_NOT_OK(spill_writer->Open());
+    return spill_writer;
+}
+
+// Picks the next spill channel, resolves the compression codec, creates the spill file and
+// the Arrow IPC file writer on top of it, and finally registers the channel with the
+// SpillChannelManager. On any failure, partially created state is rolled back.
+Status SpillWriter::Open() {
+    channel_id_ = channel_enumerator_->Next();
+    auto ipc_write_options = arrow::ipc::IpcWriteOptions::Defaults();
+    ipc_write_options.memory_pool = arrow_pool_.get();
+    ipc_write_options.use_threads = use_threads_;
+    // Rollback on failure. The spill file is deleted only if this writer actually created
+    // it (out_stream_ is assigned only when fs_->Create() succeeds). If codec resolution or
+    // Create() itself failed (e.g. because a file already exists at this path and
+    // overwrite=false), whatever lives at the path is not ours and must not be removed.
+    auto cleanup_guard = ScopeGuard([&]() {
+        arrow_writer_.reset();
+        arrow_output_stream_adapter_.reset();
+        if (out_stream_) {
+            [[maybe_unused]] auto close_status = out_stream_->Close();
+            out_stream_.reset();
+            [[maybe_unused]] auto delete_status = fs_->Delete(channel_id_.GetPath());
+        }
+    });
+    PAIMON_ASSIGN_OR_RAISE(arrow::Compression::type arrow_compression,
+                           ArrowUtils::GetCompressionType(compression_));
+    // Codecs that do not support levels are created with the codec default instead of the
+    // caller-supplied level.
+    if (!arrow::util::Codec::SupportsCompressionLevel(arrow_compression)) {
+        compression_level_ = arrow::util::Codec::UseDefaultCompressionLevel();
+    }
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(
+        ipc_write_options.codec, arrow::util::Codec::Create(arrow_compression, compression_level_));
+    PAIMON_ASSIGN_OR_RAISE(out_stream_, fs_->Create(channel_id_.GetPath(), /*overwrite=*/false));
+    arrow_output_stream_adapter_ = std::make_shared<ArrowOutputStreamAdapter>(out_stream_);
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(
+        arrow_writer_,
+        arrow::ipc::MakeFileWriter(arrow_output_stream_adapter_, schema_, ipc_write_options));
+    // Register only once everything succeeded, so failed opens never leave a tracked channel.
+    spill_channel_manager_->AddChannel(channel_id_);
+    cleanup_guard.Release();
+    return Status::OK();
+}
+
+// Appends one record batch to the spill file.
+// Returns Invalid instead of crashing when the writer is already closed or `batch` is null.
+Status SpillWriter::WriteBatch(const std::shared_ptr<arrow::RecordBatch>& batch) {
+    if (closed_) {
+        return Status::Invalid("spill writer is already closed");
+    }
+    // `*batch` below would be undefined behavior for a null pointer.
+    if (batch == nullptr) {
+        return Status::Invalid("cannot write a null record batch to spill file");
+    }
+    PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow_writer_->WriteRecordBatch(*batch));
+    return Status::OK();
+}
+
+// Finishes the Arrow IPC file and closes the underlying output stream; the file stays on disk.
+// Idempotent: once Close() has run (successfully or not), further calls return OK without
+// touching the streams again.
+Status SpillWriter::Close() {
+    if (closed_) {
+        return Status::OK();
+    }
+    // Mark closed up front. Even if finalizing fails, the writer must not be reused and a
+    // retried Close() must not close the Arrow writer or the stream a second time.
+    closed_ = true;
+    // Finish the IPC writer first, but close the output stream regardless of its outcome
+    // so that an Arrow-side failure does not leak the open file. The first error wins.
+    auto finish_arrow_writer = [this]() -> Status {
+        if (arrow_writer_) {
+            PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow_writer_->Close());
+        }
+        return Status::OK();
+    };
+    Status writer_status = finish_arrow_writer();
+    Status stream_status = Status::OK();
+    if (out_stream_) {
+        stream_status = out_stream_->Close();
+    }
+    PAIMON_RETURN_NOT_OK(writer_status);
+    return stream_status;
+}
+
+// Size of the spill file in bytes. While the writer is open this is the output stream's
+// current position; after Close() (or without a stream) it is the length reported by the
+// file system.
+Result<int64_t> SpillWriter::GetFileSize() const {
+    const auto& spill_path = channel_id_.GetPath();
+    if (spill_path.empty()) {
+        return Status::Invalid("spill writer has no channel id");
+    }
+    const bool still_writing = !closed_ && arrow_output_stream_adapter_ != nullptr;
+    if (!still_writing) {
+        PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<FileStatus> status_of_file,
+                               fs_->GetFileStatus(spill_path));
+        return static_cast<int64_t>(status_of_file->GetLen());
+    }
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(int64_t written_bytes, arrow_output_stream_adapter_->Tell());
+    return written_bytes;
+}
+
+// Identifier (path) of this writer's spill file. It is drawn from the channel enumerator in
+// Open() and not modified afterwards.
+const FileIOChannel::ID& SpillWriter::GetChannelId() const {
+    return this->channel_id_;
+}
+
+} // namespace paimon
diff --git a/src/paimon/core/mergetree/spill_writer.h b/src/paimon/core/mergetree/spill_writer.h
new file mode 100644
index 0000000..6676dc2
--- /dev/null
+++ b/src/paimon/core/mergetree/spill_writer.h
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include "arrow/ipc/api.h"
+#include "paimon/core/disk/file_io_channel.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+
+namespace arrow {
+class RecordBatch;
+class Schema;
+} // namespace arrow
+
+namespace paimon {
+
+class ArrowOutputStreamAdapter;
+class MemoryPool;
+class SpillChannelManager;
+
+/// Writes record batches into a single spill file in Arrow IPC file format.
+///
+/// Instances are obtained through Create(), which creates the file and registers its
+/// channel with the SpillChannelManager. Callers append batches with WriteBatch() and then
+/// call Close(). Close() does not delete the file, so it can be read back afterwards
+/// (the tests do this with SpillReader).
+class SpillWriter {
+ public:
+    /// Creates a writer and opens a new spill file for it.
+    ///
+    /// @param fs file system on which the spill file is created.
+    /// @param schema Arrow schema given to the IPC file writer.
+    /// @param channel_enumerator source of the spill file's channel id (and thus its path).
+    /// @param spill_channel_manager receives the channel once the file is open.
+    /// @param compression codec name resolved via ArrowUtils::GetCompressionType
+    ///        (the tests use "zstd", "lz4", "none" and "uncompressed").
+    /// @param compression_level codec level; replaced by the codec default when the codec
+    ///        does not support compression levels.
+    /// @param use_threads forwarded to the Arrow IPC write options.
+    /// @param pool memory pool from which the Arrow pool for IPC writing is derived.
+    /// @return the open writer, or the error from resolving the codec or creating the file.
+    static Result<std::unique_ptr<SpillWriter>> Create(
+        const std::shared_ptr<FileSystem>& fs, const std::shared_ptr<arrow::Schema>& schema,
+        const std::shared_ptr<FileIOChannel::Enumerator>& channel_enumerator,
+        const std::shared_ptr<SpillChannelManager>& spill_channel_manager,
+        const std::string& compression, int32_t compression_level, bool use_threads,
+        const std::shared_ptr<MemoryPool>& pool);
+
+    SpillWriter(const SpillWriter&) = delete;
+    SpillWriter& operator=(const SpillWriter&) = delete;
+
+    /// Appends one record batch to the spill file.
+    /// NOTE(review): the batch presumably has to match the schema given to Create(); confirm.
+    Status WriteBatch(const std::shared_ptr<arrow::RecordBatch>& batch);
+    /// Finishes the Arrow IPC file and closes the output stream. The file itself is kept.
+    /// Calling Close() again after it has succeeded is a no-op that returns OK.
+    Status Close();
+    /// While open: current position of the output stream. After Close(): file length as
+    /// reported by the file system.
+    Result<int64_t> GetFileSize() const;
+    /// Channel id of the spill file chosen when the writer was opened.
+    const FileIOChannel::ID& GetChannelId() const;
+
+ private:
+    SpillWriter(const std::shared_ptr<FileSystem>& fs, const std::shared_ptr<arrow::Schema>& schema,
+                const std::shared_ptr<FileIOChannel::Enumerator>& channel_enumerator,
+                const std::shared_ptr<SpillChannelManager>& spill_channel_manager,
+                const std::string& compression, int32_t compression_level, bool use_threads,
+                const std::shared_ptr<MemoryPool>& pool);
+
+    /// Chooses the channel, creates the file and the IPC writer, then registers the channel.
+    /// Partially created state is cleaned up on failure.
+    Status Open();
+
+    std::shared_ptr<FileSystem> fs_;
+    std::shared_ptr<arrow::Schema> schema_;
+    std::shared_ptr<FileIOChannel::Enumerator> channel_enumerator_;
+    std::shared_ptr<SpillChannelManager> spill_channel_manager_;
+    std::string compression_;
+    // May be replaced by the codec's default level in Open().
+    int32_t compression_level_;
+    bool use_threads_;
+    std::shared_ptr<OutputStream> out_stream_;
+    std::shared_ptr<ArrowOutputStreamAdapter> arrow_output_stream_adapter_;
+    // Declared before arrow_writer_ so that the IPC writer, whose options point at this pool,
+    // is destroyed first (members are destroyed in reverse declaration order).
+    std::unique_ptr<arrow::MemoryPool> arrow_pool_;
+    std::shared_ptr<arrow::ipc::RecordBatchWriter> arrow_writer_;
+    FileIOChannel::ID channel_id_;
+    // Set by Close(); makes repeated Close() calls no-ops.
+    bool closed_ = false;
+};
+
+} // namespace paimon