Copilot commented on code in PR #72: URL: https://github.com/apache/paimon-cpp/pull/72#discussion_r3385262472
########## src/paimon/core/mergetree/external_sort_buffer.cpp: ########## @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/mergetree/external_sort_buffer.h" + +#include <algorithm> +#include <cassert> +#include <utility> + +#include "arrow/api.h" +#include "arrow/c/bridge.h" +#include "arrow/compute/api.h" +#include "paimon/common/table/special_fields.h" +#include "paimon/common/utils/arrow/status_utils.h" +#include "paimon/common/utils/fields_comparator.h" +#include "paimon/common/utils/scope_guard.h" +#include "paimon/core/disk/io_manager.h" +#include "paimon/core/io/async_key_value_producer_and_consumer.h" +#include "paimon/core/io/key_value_in_memory_record_reader.h" +#include "paimon/core/io/key_value_meta_projection_consumer.h" +#include "paimon/core/io/key_value_record_reader.h" +#include "paimon/core/io/row_to_arrow_array_converter.h" +#include "paimon/core/mergetree/compact/sort_merge_reader_with_min_heap.h" +#include "paimon/core/mergetree/spill_channel_manager.h" +#include "paimon/core/mergetree/spill_reader.h" +#include "paimon/core/mergetree/spill_writer.h" + +namespace paimon { + +Result<std::unique_ptr<ExternalSortBuffer>> ExternalSortBuffer::Create( + 
std::unique_ptr<InMemorySortBuffer>&& in_memory_buffer, + const std::shared_ptr<arrow::Schema>& value_schema, + const std::vector<std::string>& trimmed_primary_keys, + const std::shared_ptr<FieldsComparator>& key_comparator, + const std::shared_ptr<FieldsComparator>& user_defined_seq_comparator, + const CoreOptions& options, const std::shared_ptr<IOManager>& io_manager, + bool enable_multi_thread_spill, const std::shared_ptr<MemoryPool>& pool) { + if (options.GetLocalSortMaxNumFileHandles() < kSpillMinFanIn) { + return Status::Invalid(fmt::format( + "invalid '{}': {}, must be at least {}", Options::LOCAL_SORT_MAX_NUM_FILE_HANDLES, + options.GetLocalSortMaxNumFileHandles(), kSpillMinFanIn)); + } + arrow::FieldVector key_fields; + key_fields.reserve(trimmed_primary_keys.size()); + for (const auto& primary_key : trimmed_primary_keys) { + auto key_field = value_schema->GetFieldByName(primary_key); + assert(key_field != nullptr); + key_fields.push_back(key_field); + } + auto key_schema = arrow::schema(key_fields); + + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<FileIOChannel::Enumerator> spill_channel_enumerator, + io_manager->CreateChannelEnumerator()); + return std::unique_ptr<ExternalSortBuffer>( + new ExternalSortBuffer(std::move(in_memory_buffer), key_schema, value_schema, + key_comparator, user_defined_seq_comparator, options, + spill_channel_enumerator, enable_multi_thread_spill, pool)); +} + +ExternalSortBuffer::ExternalSortBuffer( + std::unique_ptr<InMemorySortBuffer>&& in_memory_buffer, + const std::shared_ptr<arrow::Schema>& key_schema, + const std::shared_ptr<arrow::Schema>& value_schema, + const std::shared_ptr<FieldsComparator>& key_comparator, + const std::shared_ptr<FieldsComparator>& user_defined_seq_comparator, + const CoreOptions& options, + const std::shared_ptr<FileIOChannel::Enumerator>& spill_channel_enumerator, + bool enable_multi_thread_spill, const std::shared_ptr<MemoryPool>& pool) + : in_memory_buffer_(std::move(in_memory_buffer)), + pool_(pool), 
+ key_schema_(key_schema), + value_schema_(value_schema), + key_comparator_(key_comparator), + user_defined_seq_comparator_(user_defined_seq_comparator), + write_schema_(SpecialFields::CompleteSequenceAndValueKindField(value_schema)), + options_(options), + max_fan_in_(options.GetLocalSortMaxNumFileHandles()), + enable_multi_thread_spill_(enable_multi_thread_spill), + spill_channel_manager_( + std::make_shared<SpillChannelManager>(options_.GetFileSystem(), max_fan_in_)), + spill_merger_(std::make_unique<SpillFileMerger>(max_fan_in_)), + spill_channel_enumerator_(spill_channel_enumerator), + actual_max_fan_in_(max_fan_in_), + spill_batch_size_(options_.GetWriteBatchSize()) {} + +ExternalSortBuffer::~ExternalSortBuffer() { + DoClear(); +} + +bool ExternalSortBuffer::HasSpilledData() const { + return !spill_channel_manager_->GetChannels().empty(); +} + +void ExternalSortBuffer::DoClear() { + in_memory_buffer_->Clear(); + + spill_channel_manager_->Reset(); + total_spill_disk_bytes_ = 0; + spill_merger_->Clear(); +} + +void ExternalSortBuffer::Clear() { + DoClear(); +} + +uint64_t ExternalSortBuffer::GetMemorySize() const { + return in_memory_buffer_->GetMemorySize(); +} + +void ExternalSortBuffer::UpdateSpillParameters() { + int64_t estimated_row_size = in_memory_buffer_->GetEstimateMemoryUseForEachRow(); + if (estimated_row_size <= 0) { + return; + } + + const int32_t max_batch_size = options_.GetWriteBatchSize(); + const int32_t min_batch_size = std::min(kSpillMinBatchSize, max_batch_size); + const int64_t merge_budget = options_.GetWriteBufferSize(); + const int64_t max_memory_use_per_handle = merge_budget / max_fan_in_; + + spill_batch_size_ = max_memory_use_per_handle / estimated_row_size; + spill_batch_size_ = std::clamp(spill_batch_size_, min_batch_size, max_batch_size); + + actual_max_fan_in_ = merge_budget / (spill_batch_size_ * estimated_row_size); + actual_max_fan_in_ = std::clamp(actual_max_fan_in_, kSpillMinFanIn, max_fan_in_); + + // Re-derive 
spill_batch_size_ from the clamped actual_max_fan_in_ to stay within merge_budget. + spill_batch_size_ = merge_budget / (actual_max_fan_in_ * estimated_row_size); + spill_batch_size_ = std::clamp(spill_batch_size_, 1, max_batch_size); + + spill_merger_->SetMaxFanIn(actual_max_fan_in_); +} + +Result<bool> ExternalSortBuffer::FlushMemory() { + if (!in_memory_buffer_->HasData()) { + return true; + } + + UpdateSpillParameters(); + PAIMON_ASSIGN_OR_RAISE(std::vector<std::unique_ptr<KeyValueRecordReader>> memory_buffer_readers, + in_memory_buffer_->CreateReaders()); + PAIMON_RETURN_NOT_OK(SpillMemoryBuffer(std::move(memory_buffer_readers))); + in_memory_buffer_->Clear(); + return total_spill_disk_bytes_ < options_.GetWriteBufferSpillMaxDiskSize(); +} + +Result<bool> ExternalSortBuffer::Write(std::unique_ptr<RecordBatch>&& batch) { + PAIMON_ASSIGN_OR_RAISE(bool has_remaining_memory, in_memory_buffer_->Write(std::move(batch))); + if (has_remaining_memory) { + return true; + } + return FlushMemory(); +} + +Result<std::vector<std::unique_ptr<KeyValueRecordReader>>> ExternalSortBuffer::CreateReaders() { + PAIMON_ASSIGN_OR_RAISE(std::vector<std::unique_ptr<KeyValueRecordReader>> memory_readers, + in_memory_buffer_->CreateReaders()); + if (!HasSpilledData()) { + return memory_readers; + } + + int32_t max_spill_files = actual_max_fan_in_ - 1; + PAIMON_RETURN_NOT_OK( + spill_merger_->RunFinalMergeIfNeeded(max_spill_files, CreateSpillFileMergeFn())); + PAIMON_ASSIGN_OR_RAISE(std::vector<std::unique_ptr<KeyValueRecordReader>> readers, + CreateSpillReaders(spill_merger_->GetAllFiles())); + readers.insert(readers.end(), std::make_move_iterator(memory_readers.begin()), + std::make_move_iterator(memory_readers.end())); + return readers; +} + +bool ExternalSortBuffer::HasData() const { + return in_memory_buffer_->HasData() || HasSpilledData(); +} + +Result<std::vector<std::unique_ptr<KeyValueRecordReader>>> ExternalSortBuffer::CreateSpillReaders( + const std::vector<FileChannelInfo>& 
files) const { + std::vector<std::unique_ptr<KeyValueRecordReader>> readers; + readers.reserve(files.size()); + for (const auto& file : files) { + PAIMON_ASSIGN_OR_RAISE( + std::unique_ptr<SpillReader> reader, + SpillReader::Create(options_.GetFileSystem(), key_schema_, value_schema_, + enable_multi_thread_spill_, file.channel_id, pool_)); + readers.push_back(std::move(reader)); + } + return readers; +} + +Result<FileChannelInfo> ExternalSortBuffer::SpillToDisk( + std::vector<std::unique_ptr<KeyValueRecordReader>>&& readers, int32_t write_batch_size) { + const auto& spill_compress_options = options_.GetSpillCompressOptions(); + PAIMON_ASSIGN_OR_RAISE( + std::unique_ptr<SpillWriter> spill_writer, + SpillWriter::Create(options_.GetFileSystem(), write_schema_, spill_channel_enumerator_, + spill_channel_manager_, spill_compress_options.compress, + spill_compress_options.zstd_level, enable_multi_thread_spill_, pool_)); + auto cleanup_guard = ScopeGuard([&]() { + [[maybe_unused]] auto status = + spill_channel_manager_->DeleteChannel(spill_writer->GetChannelId()); + }); + + auto sorted_reader = std::make_unique<SortMergeReaderWithMinHeap>( + std::move(readers), key_comparator_, user_defined_seq_comparator_, + /*merge_function_wrapper=*/nullptr); + auto create_consumer = [target_schema = write_schema_, pool = pool_]() + -> Result<std::unique_ptr<RowToArrowArrayConverter<KeyValue, KeyValueBatch>>> { + return KeyValueMetaProjectionConsumer::Create(target_schema, pool); + }; + auto async_key_value_producer_consumer = + std::make_unique<AsyncKeyValueProducerAndConsumer<KeyValue, KeyValueBatch>>( + std::move(sorted_reader), create_consumer, write_batch_size, + /*projection_thread_num=*/1, pool_); + auto close_guard = ScopeGuard([&]() { async_key_value_producer_consumer->Close(); }); + + while (true) { + PAIMON_ASSIGN_OR_RAISE(KeyValueBatch key_value_batch, + async_key_value_producer_consumer->NextBatch()); + if (key_value_batch.batch == nullptr) { + break; + } + 
PAIMON_ASSIGN_OR_RAISE_FROM_ARROW( + std::shared_ptr<arrow::RecordBatch> record_batch, + arrow::ImportRecordBatch(key_value_batch.batch.get(), write_schema_)); + PAIMON_RETURN_NOT_OK(spill_writer->WriteBatch(record_batch)); + } + + PAIMON_RETURN_NOT_OK(spill_writer->Close()); + PAIMON_ASSIGN_OR_RAISE(int64_t spilled_file_size, spill_writer->GetFileSize()); + cleanup_guard.Release(); + return FileChannelInfo{spill_writer->GetChannelId(), spilled_file_size}; +} + +Status ExternalSortBuffer::SpillMemoryBuffer( + std::vector<std::unique_ptr<KeyValueRecordReader>>&& readers) { + PAIMON_ASSIGN_OR_RAISE(FileChannelInfo file_info, + SpillToDisk(std::move(readers), spill_batch_size_)); + total_spill_disk_bytes_ += file_info.file_size; + spill_merger_->AddFile(file_info); + return spill_merger_->RunMergeIfNeeded(CreateSpillFileMergeFn()); +} + +SpillFileMerger::MergeFn ExternalSortBuffer::CreateSpillFileMergeFn() { + return [this](const std::vector<FileChannelInfo>& files) -> Result<FileChannelInfo> { + return MergeAndReplaceFiles(files); + }; +} + +Result<FileChannelInfo> ExternalSortBuffer::MergeAndReplaceFiles( + const std::vector<FileChannelInfo>& files) { + PAIMON_ASSIGN_OR_RAISE(std::vector<std::unique_ptr<KeyValueRecordReader>> readers, + CreateSpillReaders(files)); + PAIMON_ASSIGN_OR_RAISE(FileChannelInfo output, + SpillToDisk(std::move(readers), spill_batch_size_)); + total_spill_disk_bytes_ += output.file_size; + + for (const auto& file : files) { + [[maybe_unused]] auto status = spill_channel_manager_->DeleteChannel(file.channel_id); + total_spill_disk_bytes_ -= file.file_size; + } + return output; +} Review Comment: Disk-usage accounting becomes incorrect if `DeleteChannel(...)` fails: `total_spill_disk_bytes_` is decremented regardless of deletion success, which can cause later quota checks (e.g., `FlushMemory()` return value) to be wrong. 
This should either (a) return an error when deletion fails, or (b) only subtract `file.file_size` on successful deletion and otherwise keep the bytes (and ideally log/propagate the failure). ########## src/paimon/core/mergetree/external_sort_buffer.cpp: ########## @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "paimon/core/mergetree/external_sort_buffer.h" + +#include <algorithm> +#include <cassert> +#include <utility> + +#include "arrow/api.h" +#include "arrow/c/bridge.h" +#include "arrow/compute/api.h" +#include "paimon/common/table/special_fields.h" +#include "paimon/common/utils/arrow/status_utils.h" +#include "paimon/common/utils/fields_comparator.h" +#include "paimon/common/utils/scope_guard.h" +#include "paimon/core/disk/io_manager.h" +#include "paimon/core/io/async_key_value_producer_and_consumer.h" +#include "paimon/core/io/key_value_in_memory_record_reader.h" +#include "paimon/core/io/key_value_meta_projection_consumer.h" +#include "paimon/core/io/key_value_record_reader.h" +#include "paimon/core/io/row_to_arrow_array_converter.h" +#include "paimon/core/mergetree/compact/sort_merge_reader_with_min_heap.h" +#include "paimon/core/mergetree/spill_channel_manager.h" +#include "paimon/core/mergetree/spill_reader.h" +#include "paimon/core/mergetree/spill_writer.h" + +namespace paimon { + +Result<std::unique_ptr<ExternalSortBuffer>> ExternalSortBuffer::Create( + std::unique_ptr<InMemorySortBuffer>&& in_memory_buffer, + const std::shared_ptr<arrow::Schema>& value_schema, + const std::vector<std::string>& trimmed_primary_keys, + const std::shared_ptr<FieldsComparator>& key_comparator, + const std::shared_ptr<FieldsComparator>& user_defined_seq_comparator, + const CoreOptions& options, const std::shared_ptr<IOManager>& io_manager, + bool enable_multi_thread_spill, const std::shared_ptr<MemoryPool>& pool) { + if (options.GetLocalSortMaxNumFileHandles() < kSpillMinFanIn) { + return Status::Invalid(fmt::format( + "invalid '{}': {}, must be at least {}", Options::LOCAL_SORT_MAX_NUM_FILE_HANDLES, + options.GetLocalSortMaxNumFileHandles(), kSpillMinFanIn)); + } + arrow::FieldVector key_fields; + key_fields.reserve(trimmed_primary_keys.size()); + for (const auto& primary_key : trimmed_primary_keys) { + auto key_field = value_schema->GetFieldByName(primary_key); 
+ assert(key_field != nullptr); + key_fields.push_back(key_field); + } Review Comment: Using `assert(key_field != nullptr)` for validating that each primary key exists in `value_schema` can turn into undefined behavior in release builds (asserts compiled out), potentially passing null fields into `arrow::schema(...)`. Prefer returning a `Status::Invalid(...)` identifying the missing primary key name. ########## src/paimon/core/mergetree/spill_file_merger.cpp: ########## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/mergetree/spill_file_merger.h" + +#include <algorithm> +#include <cassert> + +namespace paimon { + +SpillFileMerger::SpillFileMerger(int32_t max_fan_in) : max_fan_in_(max_fan_in) { + assert(max_fan_in >= 2); +} + +void SpillFileMerger::SetMaxFanIn(int32_t max_fan_in) { + assert(max_fan_in >= 2); + max_fan_in_ = max_fan_in; +} Review Comment: `assert(max_fan_in >= 2)` is not a runtime guard in release builds; invalid values can slip through and later cause incorrect behavior (including divisions by `max_fan_in_` elsewhere). 
Since this is production code (not test-only), prefer enforcing the invariant with a runtime check (e.g., clamp to 2, throw, or return an error—whichever matches existing project conventions). ########## src/paimon/core/mergetree/spill_file_merger.cpp: ########## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "paimon/core/mergetree/spill_file_merger.h" + +#include <algorithm> +#include <cassert> + +namespace paimon { + +SpillFileMerger::SpillFileMerger(int32_t max_fan_in) : max_fan_in_(max_fan_in) { + assert(max_fan_in >= 2); +} + +void SpillFileMerger::SetMaxFanIn(int32_t max_fan_in) { + assert(max_fan_in >= 2); + max_fan_in_ = max_fan_in; +} + +void SpillFileMerger::Clear() { + levels_.clear(); +} + +void SpillFileMerger::AddFile(const FileChannelInfo& file_info) { + EnsureLevel(0); + levels_[0].push_back(file_info); +} + +Status SpillFileMerger::RunMergeIfNeeded(const MergeFn& merge_fn) { + while (NeedMerge()) { + auto task = PickMergeTask(); + PAIMON_ASSIGN_OR_RAISE(FileChannelInfo output, merge_fn(task.input_files)); + ApplyMergeResult(task, output); + } + return Status::OK(); +} + +Status SpillFileMerger::RunFinalMergeIfNeeded(int32_t target_file_count, const MergeFn& merge_fn) { + while (GetTotalFileCount() > target_file_count) { + auto task = PickFinalMergeBatch(target_file_count); + PAIMON_ASSIGN_OR_RAISE(FileChannelInfo output, merge_fn(task.input_files)); + ApplyMergeResult(task, output); + } + return Status::OK(); +} + +bool SpillFileMerger::NeedMerge() const { + for (const auto& level : levels_) { + if (static_cast<int32_t>(level.size()) >= max_fan_in_) { + return true; + } + } + return false; +} + +void SpillFileMerger::ApplyMergeResult(const MergeTask& task, const FileChannelInfo& output) { + for (const auto& file : task.input_files) { + RemoveFile(file.channel_id); + } + EnsureLevel(task.target_level); + levels_[task.target_level].push_back(output); +} + +SpillFileMerger::MergeTask SpillFileMerger::PickMergeTask() const { + for (int32_t i = 0; i < static_cast<int32_t>(levels_.size()); ++i) { + if (static_cast<int32_t>(levels_[i].size()) >= max_fan_in_) { + MergeTask task; + task.target_level = i + 1; + task.input_files.assign(levels_[i].begin(), levels_[i].begin() + max_fan_in_); + return task; + } + } + assert(false && "PickMergeTask 
called but no pending merge"); + return {}; +} + +SpillFileMerger::MergeTask SpillFileMerger::PickFinalMergeBatch(int32_t target_file_count) const { + int32_t total = GetTotalFileCount(); + assert(total > target_file_count); + + // Collect all files with their levels, sort by size ascending. + struct LeveledFile { + int32_t level; + FileChannelInfo entry; + }; + std::vector<LeveledFile> all_files; + for (int32_t level_idx = 0; level_idx < static_cast<int32_t>(levels_.size()); ++level_idx) { + for (const auto& file : levels_[level_idx]) { + all_files.push_back({level_idx, file}); + } + } + std::sort(all_files.begin(), all_files.end(), + [](const LeveledFile& lhs, const LeveledFile& rhs) { + return lhs.entry.file_size < rhs.entry.file_size; + }); + + // Merge `files_to_merge` (alias: n) files into 1 eliminates (n-1) files. + // Need to eliminate (total - target_file_count), so n = total - target_file_count + 1. + // Bounded by max_fan_in_ (max merge width per round). + int32_t files_to_merge = std::min(total - target_file_count + 1, max_fan_in_); + + MergeTask task; + int32_t max_level = 0; + for (int32_t i = 0; i < files_to_merge; ++i) { + max_level = std::max(max_level, all_files[i].level); + task.input_files.push_back(all_files[i].entry); + } + task.target_level = max_level + 1; + return task; +} + +std::vector<FileChannelInfo> SpillFileMerger::GetAllFiles() const { + std::vector<FileChannelInfo> result; + for (const auto& level : levels_) { + for (const auto& file : level) { + result.push_back(file); + } + } + return result; +} + +int32_t SpillFileMerger::GetTotalFileCount() const { + int32_t total = 0; + for (const auto& level : levels_) { + total += static_cast<int32_t>(level.size()); + } + return total; +} + +void SpillFileMerger::EnsureLevel(int32_t level) { + while (static_cast<int32_t>(levels_.size()) <= level) { + levels_.emplace_back(); + } +} + +void SpillFileMerger::RemoveFile(const FileIOChannel::ID& channel_id) { + for (auto& level : levels_) { + for 
(auto it = level.begin(); it != level.end(); ++it) { + if (it->channel_id == channel_id) { + level.erase(it); + return; + } + } + } +} Review Comment: `RemoveFile` is O(total_files) per removal due to nested linear scans. During merges this can become noticeably expensive as file counts grow. Consider storing per-file indices (e.g., an `unordered_map<ID, std::pair<int32_t, size_t>>` mapping each channel ID to its `(level_idx, pos)`) to make removals near O(1). Update the index whenever entries are moved or erased, since erasing from a level shifts the positions of all later entries. A per-level `unordered_map<ID, iterator>` only works if levels are stored in a node-based container such as `std::list`. Levels are random-access containers here (see `levels_[i].begin() + max_fan_in_`), and `erase` on those can invalidate iterators to other elements. ########## src/paimon/core/mergetree/external_sort_buffer.cpp: ########## @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +#include "paimon/core/mergetree/external_sort_buffer.h" + +#include <algorithm> +#include <cassert> +#include <utility> + +#include "arrow/api.h" +#include "arrow/c/bridge.h" +#include "arrow/compute/api.h" +#include "paimon/common/table/special_fields.h" +#include "paimon/common/utils/arrow/status_utils.h" +#include "paimon/common/utils/fields_comparator.h" +#include "paimon/common/utils/scope_guard.h" +#include "paimon/core/disk/io_manager.h" +#include "paimon/core/io/async_key_value_producer_and_consumer.h" +#include "paimon/core/io/key_value_in_memory_record_reader.h" +#include "paimon/core/io/key_value_meta_projection_consumer.h" +#include "paimon/core/io/key_value_record_reader.h" +#include "paimon/core/io/row_to_arrow_array_converter.h" +#include "paimon/core/mergetree/compact/sort_merge_reader_with_min_heap.h" +#include "paimon/core/mergetree/spill_channel_manager.h" +#include "paimon/core/mergetree/spill_reader.h" +#include "paimon/core/mergetree/spill_writer.h" + +namespace paimon { + +Result<std::unique_ptr<ExternalSortBuffer>> ExternalSortBuffer::Create( + std::unique_ptr<InMemorySortBuffer>&& in_memory_buffer, + const std::shared_ptr<arrow::Schema>& value_schema, + const std::vector<std::string>& trimmed_primary_keys, + const std::shared_ptr<FieldsComparator>& key_comparator, + const std::shared_ptr<FieldsComparator>& user_defined_seq_comparator, + const CoreOptions& options, const std::shared_ptr<IOManager>& io_manager, + bool enable_multi_thread_spill, const std::shared_ptr<MemoryPool>& pool) { + if (options.GetLocalSortMaxNumFileHandles() < kSpillMinFanIn) { + return Status::Invalid(fmt::format( + "invalid '{}': {}, must be at least {}", Options::LOCAL_SORT_MAX_NUM_FILE_HANDLES, + options.GetLocalSortMaxNumFileHandles(), kSpillMinFanIn)); + } + arrow::FieldVector key_fields; + key_fields.reserve(trimmed_primary_keys.size()); + for (const auto& primary_key : trimmed_primary_keys) { + auto key_field = value_schema->GetFieldByName(primary_key); 
+ assert(key_field != nullptr); + key_fields.push_back(key_field); + } + auto key_schema = arrow::schema(key_fields); + + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<FileIOChannel::Enumerator> spill_channel_enumerator, + io_manager->CreateChannelEnumerator()); + return std::unique_ptr<ExternalSortBuffer>( + new ExternalSortBuffer(std::move(in_memory_buffer), key_schema, value_schema, + key_comparator, user_defined_seq_comparator, options, + spill_channel_enumerator, enable_multi_thread_spill, pool)); +} + +ExternalSortBuffer::ExternalSortBuffer( + std::unique_ptr<InMemorySortBuffer>&& in_memory_buffer, + const std::shared_ptr<arrow::Schema>& key_schema, + const std::shared_ptr<arrow::Schema>& value_schema, + const std::shared_ptr<FieldsComparator>& key_comparator, + const std::shared_ptr<FieldsComparator>& user_defined_seq_comparator, + const CoreOptions& options, + const std::shared_ptr<FileIOChannel::Enumerator>& spill_channel_enumerator, + bool enable_multi_thread_spill, const std::shared_ptr<MemoryPool>& pool) + : in_memory_buffer_(std::move(in_memory_buffer)), + pool_(pool), + key_schema_(key_schema), + value_schema_(value_schema), + key_comparator_(key_comparator), + user_defined_seq_comparator_(user_defined_seq_comparator), + write_schema_(SpecialFields::CompleteSequenceAndValueKindField(value_schema)), + options_(options), + max_fan_in_(options.GetLocalSortMaxNumFileHandles()), + enable_multi_thread_spill_(enable_multi_thread_spill), + spill_channel_manager_( + std::make_shared<SpillChannelManager>(options_.GetFileSystem(), max_fan_in_)), + spill_merger_(std::make_unique<SpillFileMerger>(max_fan_in_)), + spill_channel_enumerator_(spill_channel_enumerator), + actual_max_fan_in_(max_fan_in_), + spill_batch_size_(options_.GetWriteBatchSize()) {} + +ExternalSortBuffer::~ExternalSortBuffer() { + DoClear(); +} + +bool ExternalSortBuffer::HasSpilledData() const { + return !spill_channel_manager_->GetChannels().empty(); +} + +void ExternalSortBuffer::DoClear() { + 
in_memory_buffer_->Clear(); + + spill_channel_manager_->Reset(); + total_spill_disk_bytes_ = 0; + spill_merger_->Clear(); +} + +void ExternalSortBuffer::Clear() { + DoClear(); +} + +uint64_t ExternalSortBuffer::GetMemorySize() const { + return in_memory_buffer_->GetMemorySize(); +} + +void ExternalSortBuffer::UpdateSpillParameters() { + int64_t estimated_row_size = in_memory_buffer_->GetEstimateMemoryUseForEachRow(); + if (estimated_row_size <= 0) { + return; + } + + const int32_t max_batch_size = options_.GetWriteBatchSize(); + const int32_t min_batch_size = std::min(kSpillMinBatchSize, max_batch_size); + const int64_t merge_budget = options_.GetWriteBufferSize(); + const int64_t max_memory_use_per_handle = merge_budget / max_fan_in_; + + spill_batch_size_ = max_memory_use_per_handle / estimated_row_size; + spill_batch_size_ = std::clamp(spill_batch_size_, min_batch_size, max_batch_size); Review Comment: `spill_batch_size_` is an `int32_t`, but the intermediate calculation is effectively `int64_t / int64_t`. If `merge_budget` is large enough, the computed value can exceed `int32_t` and overflow/truncate before `std::clamp` is applied. Use an `int64_t` temporary, clamp in `int64_t`, then cast to `int32_t` after bounds are enforced. Two later assignments in the same function have the same narrowing-before-clamp problem and should be fixed the same way: `actual_max_fan_in_ = merge_budget / (spill_batch_size_ * estimated_row_size)` (whose value is later passed to `SetMaxFanIn(int32_t)`), and the re-derived `spill_batch_size_ = merge_budget / (actual_max_fan_in_ * estimated_row_size)`. ########## src/paimon/core/mergetree/write_buffer_test.cpp: ########## @@ -0,0 +1,775 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/mergetree/write_buffer.h" + +#include <cstdint> +#include <memory> +#include <vector> + +#include "arrow/api.h" +#include "arrow/c/abi.h" +#include "arrow/c/bridge.h" +#include "arrow/ipc/json_simple.h" +#include "gtest/gtest.h" +#include "paimon/common/types/data_field.h" +#include "paimon/common/utils/fields_comparator.h" +#include "paimon/core/core_options.h" +#include "paimon/core/disk/io_manager.h" +#include "paimon/core/io/key_value_record_reader.h" +#include "paimon/core/mergetree/compact/deduplicate_merge_function.h" +#include "paimon/core/mergetree/compact/reducer_merge_function_wrapper.h" +#include "paimon/fs/file_system.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/testing/utils/test_helper.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon { +template <typename T> +class MergeFunctionWrapper; +} // namespace paimon + +namespace paimon::test { +struct ReaderResult { + std::vector<int64_t> sequence_numbers; + std::vector<int8_t> row_kind_values; +}; + +class WriteBufferTest : public ::testing::Test { + public: + void SetUp() override { + pool_ = GetDefaultPool(); + tmp_dir_ = UniqueTestDirectory::Create(); + ASSERT_TRUE(tmp_dir_); + io_manager_ = std::make_shared<IOManager>(tmp_dir_->Str(), tmp_dir_->GetFileSystem()); + value_fields_ = {DataField(0, arrow::field("f0", arrow::utf8())), + DataField(1, arrow::field("f1", arrow::int32())), + DataField(2, arrow::field("f2", arrow::int32())), + DataField(3, arrow::field("f3", arrow::float64()))}; + value_schema_ = 
DataField::ConvertDataFieldsToArrowSchema(value_fields_); + value_type_ = DataField::ConvertDataFieldsToArrowStructType(value_fields_); + primary_keys_ = {"f0"}; + ASSERT_OK_AND_ASSIGN(key_comparator_, + FieldsComparator::Create({value_fields_[0]}, + /*is_ascending_order=*/true)); + + auto merge_function = std::make_unique<DeduplicateMergeFunction>(/*ignore_delete=*/false); + merge_function_wrapper_ = + std::make_shared<ReducerMergeFunctionWrapper>(std::move(merge_function)); + } + + std::unique_ptr<WriteBuffer> CreateWriteBuffer(int64_t last_sequence_number, + const CoreOptions& options) const { + EXPECT_OK_AND_ASSIGN( + auto write_buffer, + WriteBuffer::Create(last_sequence_number, value_schema_, primary_keys_, + /*user_defined_sequence_fields=*/{}, key_comparator_, + /*user_defined_seq_comparator=*/nullptr, merge_function_wrapper_, + options, io_manager_, /*enable_multi_thread_spill=*/false, pool_)); + return write_buffer; + } + + std::unique_ptr<RecordBatch> CreateBatch( + const std::shared_ptr<arrow::Array>& array, + const std::vector<RecordBatch::RowKind>& row_kinds) const { + ::ArrowArray c_array; + EXPECT_TRUE(arrow::ExportArray(*array, &c_array).ok()); + RecordBatchBuilder batch_builder(&c_array); + batch_builder.SetRowKinds(row_kinds); + EXPECT_OK_AND_ASSIGN(std::unique_ptr<RecordBatch> batch, batch_builder.Finish()); + return batch; + } + + Result<int64_t> GetOnlySpillFileSize() const { + PAIMON_ASSIGN_OR_RAISE(std::string spill_dir, io_manager_->GetSpillDir()); + std::vector<std::unique_ptr<FileStatus>> spill_files; + PAIMON_RETURN_NOT_OK(tmp_dir_->GetFileSystem()->ListFileStatus(spill_dir, &spill_files)); + if (spill_files.size() != 1 || spill_files[0]->IsDir()) { + return Status::Invalid("expected exactly one spill file"); + } + return static_cast<int64_t>(spill_files[0]->GetLen()); + } + + Result<ReaderResult> ReadReaderResult(KeyValueRecordReader* reader) const { + PAIMON_ASSIGN_OR_RAISE(auto iterator, reader->NextBatch()); + + ReaderResult result; 
+ while (true) { + PAIMON_ASSIGN_OR_RAISE(bool has_next, iterator->HasNext()); + if (!has_next) { + break; + } + PAIMON_ASSIGN_OR_RAISE(KeyValue key_value, iterator->Next()); + result.sequence_numbers.push_back(key_value.sequence_number); + result.row_kind_values.push_back(key_value.value_kind->ToByteValue()); + } + return result; + } Review Comment: This helper only reads a single `NextBatch()` iterator. `KeyValueRecordReader` appears to be batch-oriented (see `CollectRows` in `sort_buffer_test.cpp`, which loops `NextBatch()` until it returns `nullptr`), so this test helper can silently miss rows if the reader yields multiple batches. Update it to repeatedly call `NextBatch()` until exhausted, and accumulate results across all iterators. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
