Copilot commented on code in PR #67:
URL: https://github.com/apache/paimon-cpp/pull/67#discussion_r3385005548


##########
src/paimon/core/mergetree/spill_writer.h:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include "arrow/ipc/api.h"
+#include "paimon/core/disk/file_io_channel.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+
+namespace arrow {
+class RecordBatch;
+class Schema;
+}  // namespace arrow
+
+namespace paimon {
+
+class ArrowOutputStreamAdapter;
+class MemoryPool;
+class SpillChannelManager;
+
+class SpillWriter {
+ public:
+    static Result<std::unique_ptr<SpillWriter>> Create(
+        const std::shared_ptr<FileSystem>& fs, const 
std::shared_ptr<arrow::Schema>& schema,
+        const std::shared_ptr<FileIOChannel::Enumerator>& channel_enumerator,
+        const std::shared_ptr<SpillChannelManager>& spill_channel_manager,
+        const std::string& compression, int32_t compression_level, bool 
use_threads,
+        const std::shared_ptr<MemoryPool>& pool);
+
+    SpillWriter(const SpillWriter&) = delete;
+    SpillWriter& operator=(const SpillWriter&) = delete;
+
+    Status WriteBatch(const std::shared_ptr<arrow::RecordBatch>& batch);
+    Status Close();
+    Result<int64_t> GetFileSize() const;
+    const FileIOChannel::ID& GetChannelId() const;
+
+ private:
+    SpillWriter(const std::shared_ptr<FileSystem>& fs, const 
std::shared_ptr<arrow::Schema>& schema,
+                const std::shared_ptr<FileIOChannel::Enumerator>& 
channel_enumerator,
+                const std::shared_ptr<SpillChannelManager>& 
spill_channel_manager,
+                const std::string& compression, int32_t compression_level, 
bool use_threads,
+                const std::shared_ptr<MemoryPool>& pool);
+
+    Status Open();
+
+    std::shared_ptr<FileSystem> fs_;
+    std::shared_ptr<arrow::Schema> schema_;
+    std::shared_ptr<FileIOChannel::Enumerator> channel_enumerator_;
+    std::shared_ptr<SpillChannelManager> spill_channel_manager_;
+    std::string compression_;
+    int32_t compression_level_;
+    bool use_threads_;
+    std::shared_ptr<OutputStream> out_stream_;
+    std::shared_ptr<ArrowOutputStreamAdapter> arrow_output_stream_adapter_;
+    std::unique_ptr<arrow::MemoryPool> arrow_pool_;
+    std::shared_ptr<arrow::ipc::RecordBatchWriter> arrow_writer_;

Review Comment:
   `arrow_pool_` is declared as `std::unique_ptr<arrow::MemoryPool>`, but the 
implementation initializes it with `GetArrowPool(pool)` (see 
`spill_writer.cpp:43`). In the same PR, `SpillReader` stores the result of 
`GetArrowPool(pool)` in a `std::shared_ptr<arrow::MemoryPool>`, strongly 
suggesting `GetArrowPool` returns a `shared_ptr`, which would make 
`SpillWriter` fail to compile (no implicit conversion from `shared_ptr` to 
`unique_ptr`). Make `SpillWriter::arrow_pool_` use the same ownership type as 
`GetArrowPool` (likely `std::shared_ptr<arrow::MemoryPool>`), or change 
`GetArrowPool` usage to match the declared type.



##########
src/paimon/core/mergetree/spill_writer.cpp:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/core/mergetree/spill_writer.h"
+
+#include "paimon/common/utils/arrow/arrow_output_stream_adapter.h"
+#include "paimon/common/utils/arrow/arrow_utils.h"
+#include "paimon/common/utils/arrow/mem_utils.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/common/utils/scope_guard.h"
+#include "paimon/core/mergetree/spill_channel_manager.h"
+
+namespace paimon {
+
+SpillWriter::SpillWriter(const std::shared_ptr<FileSystem>& fs,
+                         const std::shared_ptr<arrow::Schema>& schema,
+                         const std::shared_ptr<FileIOChannel::Enumerator>& 
channel_enumerator,
+                         const std::shared_ptr<SpillChannelManager>& 
spill_channel_manager,
+                         const std::string& compression, int32_t 
compression_level,
+                         bool use_threads, const std::shared_ptr<MemoryPool>& 
pool)
+    : fs_(fs),
+      schema_(schema),
+      channel_enumerator_(channel_enumerator),
+      spill_channel_manager_(spill_channel_manager),
+      compression_(compression),
+      compression_level_(compression_level),
+      use_threads_(use_threads),
+      arrow_pool_(GetArrowPool(pool)) {}
+
+Result<std::unique_ptr<SpillWriter>> SpillWriter::Create(
+    const std::shared_ptr<FileSystem>& fs, const 
std::shared_ptr<arrow::Schema>& schema,
+    const std::shared_ptr<FileIOChannel::Enumerator>& channel_enumerator,
+    const std::shared_ptr<SpillChannelManager>& spill_channel_manager,
+    const std::string& compression, int32_t compression_level, bool 
use_threads,
+    const std::shared_ptr<MemoryPool>& pool) {
+    std::unique_ptr<SpillWriter> writer(new SpillWriter(fs, schema, 
channel_enumerator,
+                                                        spill_channel_manager, 
compression,
+                                                        compression_level, 
use_threads, pool));
+    PAIMON_RETURN_NOT_OK(writer->Open());
+    return writer;
+}
+
+Status SpillWriter::Open() {
+    channel_id_ = channel_enumerator_->Next();
+    auto ipc_write_options = arrow::ipc::IpcWriteOptions::Defaults();
+    ipc_write_options.memory_pool = arrow_pool_.get();
+    ipc_write_options.use_threads = use_threads_;
+    auto cleanup_guard = ScopeGuard([&]() {
+        arrow_writer_.reset();
+        arrow_output_stream_adapter_.reset();
+        if (out_stream_) {
+            [[maybe_unused]] auto status = out_stream_->Close();
+            out_stream_.reset();
+        }
+        if (!channel_id_.GetPath().empty()) {
+            [[maybe_unused]] auto status = fs_->Delete(channel_id_.GetPath());
+        }
+    });
+    PAIMON_ASSIGN_OR_RAISE(arrow::Compression::type arrow_compression,
+                           ArrowUtils::GetCompressionType(compression_));
+    if (!arrow::util::Codec::SupportsCompressionLevel(arrow_compression)) {
+        compression_level_ = arrow::util::Codec::UseDefaultCompressionLevel();
+    }
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(
+        ipc_write_options.codec, arrow::util::Codec::Create(arrow_compression, 
compression_level_));
+    PAIMON_ASSIGN_OR_RAISE(out_stream_, fs_->Create(channel_id_.GetPath(), 
/*overwrite=*/false));
+    arrow_output_stream_adapter_ = 
std::make_shared<ArrowOutputStreamAdapter>(out_stream_);
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(
+        arrow_writer_,
+        arrow::ipc::MakeFileWriter(arrow_output_stream_adapter_, schema_, 
ipc_write_options));
+    spill_channel_manager_->AddChannel(channel_id_);
+    cleanup_guard.Release();
+    return Status::OK();
+}
+
+Status SpillWriter::WriteBatch(const std::shared_ptr<arrow::RecordBatch>& 
batch) {
+    PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow_writer_->WriteRecordBatch(*batch));
+    return Status::OK();
+}

Review Comment:
   `WriteBatch` unconditionally dereferences `arrow_writer_` and doesn't 
enforce the `closed_` state. After `Close()`, `arrow_writer_` is not reset, and 
the underlying stream is closed; calling `WriteBatch` could either crash (if 
`arrow_writer_` is null) or return an Arrow error in a hard-to-diagnose way. 
Add a fast-fail guard (mandatory) that returns `Status::Invalid(...)` when 
`closed_` is true or when `arrow_writer_` is null, so API misuse is detected 
deterministically.



##########
src/paimon/core/mergetree/spill_reader_writer_test.cpp:
##########
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "arrow/api.h"
+#include "arrow/ipc/json_simple.h"
+#include "gtest/gtest.h"
+#include "paimon/common/table/special_fields.h"
+#include "paimon/common/types/data_field.h"
+#include "paimon/core/disk/io_manager.h"
+#include "paimon/core/mergetree/spill_channel_manager.h"
+#include "paimon/core/mergetree/spill_reader.h"
+#include "paimon/core/mergetree/spill_writer.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+class SpillReaderWriterTest : public ::testing::TestWithParam<std::string> {
+ public:
+    void SetUp() override {
+        read_pool_ = GetDefaultPool();
+        write_pool_ = GetDefaultPool();
+        test_dir_ = UniqueTestDirectory::Create();
+        file_system_ = test_dir_->GetFileSystem();
+
+        io_manager_ = std::make_unique<IOManager>(test_dir_->Str(), 
test_dir_->GetFileSystem());
+        ASSERT_OK_AND_ASSIGN(channel_enumerator_, 
io_manager_->CreateChannelEnumerator());
+        spill_channel_manager_ = 
std::make_shared<SpillChannelManager>(file_system_, 128);
+
+        // Build write schema: [_SEQUENCE_NUMBER, _VALUE_KIND, key fields..., 
value fields...]
+        value_fields_ = {DataField(0, arrow::field("f0", arrow::utf8())),
+                         DataField(1, arrow::field("f1", arrow::int32()))};
+        key_fields_ = {DataField(0, arrow::field("f0", arrow::utf8()))};
+
+        key_schema_ = DataField::ConvertDataFieldsToArrowSchema(key_fields_);
+        value_schema_ = 
DataField::ConvertDataFieldsToArrowSchema(value_fields_);
+        write_schema_ = 
SpecialFields::CompleteSequenceAndValueKindField(value_schema_);
+        write_type_ = arrow::struct_(write_schema_->fields());
+    }
+
+    std::shared_ptr<arrow::RecordBatch> CreateRecordBatch(const std::string& 
json_data,
+                                                          int64_t num_rows) 
const {
+        auto array = arrow::ipc::internal::json::ArrayFromJSON(write_type_, 
json_data).ValueOrDie();

Review Comment:
   Using `ValueOrDie()` in tests aborts the process and can hide useful failure 
context from gtest output. Prefer asserting the `Result` with an 
`ASSERT_OK_AND_ASSIGN`-style helper (or equivalent) so failures are reported as 
test assertions rather than hard termination.



##########
src/paimon/core/mergetree/in_memory_sort_buffer.cpp:
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/core/mergetree/in_memory_sort_buffer.h"
+
+#include <cassert>
+#include <utility>
+
+#include "arrow/api.h"
+#include "arrow/array/array_binary.h"
+#include "arrow/array/array_nested.h"
+#include "arrow/c/bridge.h"
+#include "arrow/c/helpers.h"
+#include "arrow/util/checked_cast.h"
+#include "fmt/format.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/core/io/key_value_in_memory_record_reader.h"
+#include "paimon/core/io/key_value_record_reader.h"
+#include "paimon/data/decimal.h"
+
+namespace paimon {
+
+InMemorySortBuffer::InMemorySortBuffer(int64_t last_sequence_number,
+                                       const std::shared_ptr<arrow::DataType>& 
value_type,
+                                       const std::vector<std::string>& 
trimmed_primary_keys,
+                                       const std::vector<std::string>& 
user_defined_sequence_fields,
+                                       bool sequence_fields_ascending,
+                                       const 
std::shared_ptr<FieldsComparator>& key_comparator,
+                                       uint64_t write_buffer_size,
+                                       const std::shared_ptr<MemoryPool>& pool)
+    : pool_(pool),
+      value_type_(value_type),
+      trimmed_primary_keys_(trimmed_primary_keys),
+      user_defined_sequence_fields_(user_defined_sequence_fields),
+      sequence_fields_ascending_(sequence_fields_ascending),
+      key_comparator_(key_comparator),
+      write_buffer_size_(write_buffer_size),
+      next_sequence_number_(last_sequence_number + 1) {}
+
+void InMemorySortBuffer::Clear() {
+    buffered_batches_.clear();
+    current_memory_in_bytes_ = 0;
+    total_row_count_ = 0;
+}
+
+uint64_t InMemorySortBuffer::GetMemorySize() const {
+    return current_memory_in_bytes_;
+}
+
+Result<bool> InMemorySortBuffer::FlushMemory() {
+    return false;
+}
+
+Result<bool> InMemorySortBuffer::Write(std::unique_ptr<RecordBatch>&& 
moved_batch) {
+    if (ArrowArrayIsReleased(moved_batch->GetData())) {
+        return Status::Invalid("invalid batch: data is released");
+    }
+    std::unique_ptr<RecordBatch> batch = std::move(moved_batch);
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Array> 
arrow_array,
+                                      arrow::ImportArray(batch->GetData(), 
value_type_));
+    auto value_struct_array = 
std::dynamic_pointer_cast<arrow::StructArray>(arrow_array);
+    if (value_struct_array == nullptr) {
+        return Status::Invalid("invalid RecordBatch: cannot cast to 
StructArray");
+    }
+    PAIMON_ASSIGN_OR_RAISE(int64_t memory_in_bytes, 
EstimateMemoryUse(value_struct_array));
+    current_memory_in_bytes_ += static_cast<uint64_t>(memory_in_bytes);
+
+    BufferedWriteBatch buffered_batch;
+    buffered_batch.first_sequence_number = next_sequence_number_;
+    buffered_batch.struct_array = std::move(value_struct_array);
+    buffered_batch.row_kinds = batch->GetRowKind();
+    next_sequence_number_ += buffered_batch.struct_array->length();
+    total_row_count_ += buffered_batch.struct_array->length();
+    buffered_batches_.push_back(std::move(buffered_batch));
+    if (total_row_count_ > 0) {
+        estimated_memory_use_for_each_row_ = current_memory_in_bytes_ / 
total_row_count_;
+    }
+    return current_memory_in_bytes_ < write_buffer_size_;
+}
+
+Result<std::vector<std::unique_ptr<KeyValueRecordReader>>> 
InMemorySortBuffer::CreateReaders() {
+    std::vector<std::unique_ptr<KeyValueRecordReader>> readers;
+    if (buffered_batches_.empty()) {
+        return readers;
+    }
+
+    readers.reserve(buffered_batches_.size());
+    for (auto& buffered_batch : buffered_batches_) {
+        auto in_memory_reader = std::make_unique<KeyValueInMemoryRecordReader>(
+            buffered_batch.first_sequence_number, buffered_batch.struct_array,
+            buffered_batch.row_kinds, trimmed_primary_keys_, 
user_defined_sequence_fields_,
+            sequence_fields_ascending_, key_comparator_, pool_);
+        readers.push_back(std::move(in_memory_reader));
+    }
+    return readers;
+}
+
+bool InMemorySortBuffer::HasData() const {
+    return !buffered_batches_.empty();
+}
+
+uint64_t InMemorySortBuffer::GetEstimateMemoryUseForEachRow() const {
+    return estimated_memory_use_for_each_row_;
+}
+
+// TODO(jinli.zjw): Consider making the memory estimation more accurate.
+// https://github.com/alibaba/paimon-cpp/pull/206#discussion_r3021325389
+Result<int64_t> InMemorySortBuffer::EstimateMemoryUse(const 
std::shared_ptr<arrow::Array>& array) {
+    arrow::Type::type type = array->type()->id();
+    int64_t null_bits_size_in_bytes = (array->length() + 7) / 8;
+    switch (type) {
+        case arrow::Type::type::BOOL:
+            return null_bits_size_in_bytes + array->length() * sizeof(bool);
+        case arrow::Type::type::INT8:
+            return null_bits_size_in_bytes + array->length() * sizeof(int8_t);
+        case arrow::Type::type::INT16:
+            return null_bits_size_in_bytes + array->length() * sizeof(int16_t);
+        case arrow::Type::type::INT32:
+            return null_bits_size_in_bytes + array->length() * sizeof(int32_t);
+        case arrow::Type::type::DATE32:
+            return null_bits_size_in_bytes + array->length() * sizeof(int32_t);
+        case arrow::Type::type::INT64:
+            return null_bits_size_in_bytes + array->length() * sizeof(int64_t);
+        case arrow::Type::type::FLOAT:
+            return null_bits_size_in_bytes + array->length() * sizeof(float);
+        case arrow::Type::type::DOUBLE:
+            return null_bits_size_in_bytes + array->length() * sizeof(double);
+        case arrow::Type::type::TIMESTAMP:
+            return null_bits_size_in_bytes + array->length() * sizeof(int64_t);
+        case arrow::Type::type::DECIMAL:
+            return null_bits_size_in_bytes + array->length() * 
sizeof(Decimal::int128_t);
+        case arrow::Type::type::STRING:
+        case arrow::Type::type::BINARY: {
+            auto binary_array =
+                arrow::internal::checked_cast<const 
arrow::BinaryArray*>(array.get());
+            assert(binary_array);
+            int64_t value_length = binary_array->total_values_length();
+            int64_t offset_length = array->length() * sizeof(int32_t);
+            return null_bits_size_in_bytes + value_length + offset_length;
+        }

Review Comment:
   The memory estimator undercounts variable-length arrays: the offsets buffer 
is `(length + 1) * sizeof(int32_t)`, not `length * sizeof(int32_t)`. This 
underestimation directly feeds `current_memory_in_bytes_` and the 
`write_buffer_size_` gate, so it can cause the in-memory buffer to exceed its 
intended limit under realistic workloads. Update the estimation to account for 
the full offsets buffer size (and consider also covering offsets for LIST/MAP 
similarly, or summing `array->data()->buffers[i]->size()` where available) to 
avoid systematic underestimation.



##########
src/paimon/core/mergetree/spill_channel_manager.h:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <unordered_set>
+
+#include "paimon/core/disk/file_io_channel.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/status.h"
+
+namespace paimon {
+
+class SpillChannelManager {
+ public:
+    SpillChannelManager(const std::shared_ptr<FileSystem>& fs, size_t 
initial_capacity) : fs_(fs) {
+        channels_.reserve(initial_capacity);
+    }
+
+    void AddChannel(const FileIOChannel::ID& channel_id) {
+        channels_.emplace(channel_id);
+    }
+
+    Status DeleteChannel(const FileIOChannel::ID& channel_id) {
+        PAIMON_RETURN_NOT_OK(fs_->Delete(channel_id.GetPath()));
+        channels_.erase(channel_id);
+        return Status::OK();
+    }
+
+    void Reset() {
+        for (const auto& channel : channels_) {
+            [[maybe_unused]] auto status = fs_->Delete(channel.GetPath());
+        }
+        channels_.clear();
+    }

Review Comment:
   `Reset()` silently ignores deletion failures. Operationally this can leave 
spill files behind while also clearing `channels_`, making subsequent cleanup 
impossible and hiding disk-usage growth. Prefer returning a `Status` from 
`Reset()` and aggregate/return the first error (or a combined error), or at 
minimum keep failed deletions in `channels_` so the caller can retry cleanup.



##########
src/paimon/core/mergetree/sort_buffer.h:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include "paimon/record_batch.h"
+#include "paimon/result.h"
+
+namespace paimon {
+class KeyValueRecordReader;
+
+/// SortBuffer is the interface for managing buffered records with sorting 
capability.
+/// It abstracts the in-memory and external sort buffer implementations.
+class SortBuffer {
+ public:
+    virtual ~SortBuffer() = default;
+
+    /// Reset the buffer, releasing all in-memory batches and on-disk spill 
files.
+    virtual void Clear() = 0;
+
+    /// Return the current memory usage in bytes.
+    virtual uint64_t GetMemorySize() const = 0;
+
+    /// Spill in-memory data to disk if supported by this implementation.
+    /// @return true for FlushMemory success with the buffer can accept more 
data afterwards.
+    ///         false for FlushMemory success with there is no more quota for 
next flush. (on-disk)
+    ///         false for the FlushMemory operation is not supported. 
(in-memory only)
+    ///         Status::Invalid for flush failure.
+    virtual Result<bool> FlushMemory() = 0;

Review Comment:
   The `Result<bool>` return contract is ambiguous: `false` is documented for 
both 'no more quota' and 'not supported', which makes it impossible for callers 
to distinguish those cases without RTTI/side channels. Since this is a new 
interface, consider using `Status::NotSupported` for the 'in-memory only' case 
(or return an enum), while reserving `false` for the 'no more quota' condition.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to