wgtmac commented on code in PR #652: URL: https://github.com/apache/iceberg-cpp/pull/652#discussion_r3231851004
########## src/iceberg/manifest/manifest_merge_manager.cc: ########## @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/manifest/manifest_merge_manager.h" + +#include <algorithm> +#include <map> +#include <utility> +#include <vector> + +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/manifest/manifest_reader.h" +#include "iceberg/table_metadata.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +ManifestMergeManager::ManifestMergeManager(int64_t target_size_bytes, + int32_t min_count_to_merge, bool merge_enabled) + : target_size_bytes_(target_size_bytes), + min_count_to_merge_(min_count_to_merge), + merge_enabled_(merge_enabled) {} + +Result<std::vector<ManifestFile>> ManifestMergeManager::MergeManifests( + const std::vector<ManifestFile>& existing_manifests, + const std::vector<ManifestFile>& new_manifests, int64_t snapshot_id, + const TableMetadata& metadata, std::shared_ptr<FileIO> file_io, + const ManifestWriterFactory& writer_factory) { + // Combine new then existing (new-first ordering is preserved in output) + std::vector<ManifestFile> all; + all.reserve(new_manifests.size() + existing_manifests.size()); + all.insert(all.end(), new_manifests.begin(), new_manifests.end()); + all.insert(all.end(), existing_manifests.begin(), existing_manifests.end()); + + if (!merge_enabled_ || std::cmp_less(all.size(), min_count_to_merge_)) { + return all; + } + + // The first (newest) manifest governs the per-bin minCountToMerge check. + const ManifestFile& first = all[0]; Review Comment: Please keep the Java empty-input behavior here before indexing into `all`. Java `ManifestMergeManager.mergeManifests` returns the input immediately when the iterator is empty, and `commit.manifest.min-count-to-merge=0` is a valid/aggressively-used setting in Java tests. With this implementation, an empty data/delete manifest input plus `min_count_to_merge_ <= 0` gets past the size check and reads `all[0]`, which is UB/crash instead of returning an empty manifest list. ########## src/iceberg/manifest/manifest_filter_manager.cc: ########## @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/manifest/manifest_filter_manager.h" + +#include <string> +#include <unordered_set> +#include <vector> + +#include "iceberg/expression/inclusive_metrics_evaluator.h" +#include "iceberg/expression/manifest_evaluator.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/manifest/manifest_list.h" +#include "iceberg/manifest/manifest_reader.h" +#include "iceberg/result.h" +#include "iceberg/snapshot.h" +#include "iceberg/table_metadata.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +ManifestFilterManager::ManifestFilterManager(ManifestContent content, + std::shared_ptr<FileIO> file_io) + : manifest_content_(content), file_io_(std::move(file_io)) {} + +void ManifestFilterManager::DeleteByRowFilter(std::shared_ptr<Expression> expr, + bool case_sensitive) { + delete_exprs_.push_back({.expr = std::move(expr), .case_sensitive = case_sensitive}); +} + +void ManifestFilterManager::DeleteFile(std::string_view path) { + std::string p(path); + delete_paths_.insert(p); + pending_paths_.insert(std::move(p)); +} + +void ManifestFilterManager::DropPartition(int32_t spec_id, PartitionValues partition) { + drop_partitions_.add(spec_id, std::move(partition)); +} + +void ManifestFilterManager::FailMissingDeletePaths() { + fail_missing_delete_paths_ = true; +} + +bool ManifestFilterManager::DeletesFiles() const { + return !delete_exprs_.empty() || !delete_paths_.empty() || !drop_partitions_.empty(); +} + +bool ManifestFilterManager::CanContainDroppedFiles() const { + return !delete_paths_.empty(); +} + +bool ManifestFilterManager::CanContainDroppedPartitions(const ManifestFile& manifest) { + if (drop_partitions_.empty()) return false; + // Only manifests whose partition spec matches a registered drop can contain + // entries for that partition. PartitionKey is pair<spec_id, values>. + int32_t spec_id = manifest.partition_spec_id; + for (const auto& key : drop_partitions_) { + if (key.first == spec_id) return true; + } + return false; +} + +bool ManifestFilterManager::CanContainExpressionDeletes(const ManifestFile& manifest, + const TableMetadata& metadata) { + if (delete_exprs_.empty()) return false; + int32_t spec_id = manifest.partition_spec_id; + for (const auto& delete_expr : delete_exprs_) { + auto* evaluator_ptr = + GetManifestEvaluator(metadata, spec_id, delete_expr).value_or(nullptr); + if (evaluator_ptr == nullptr) return true; // conservative on error + auto result = evaluator_ptr->Evaluate(manifest); + if (!result.has_value() || result.value()) return true; + } + return false; +} + +bool ManifestFilterManager::CanContainDeletedFiles(const ManifestFile& manifest, + const TableMetadata& metadata) { + // A manifest with no live files cannot contain files to delete. + bool has_live = (manifest.added_files_count.value_or(0) > 0) || + (manifest.existing_files_count.value_or(0) > 0); + if (!has_live) return false; + + return CanContainDroppedFiles() || CanContainExpressionDeletes(manifest, metadata) || + CanContainDroppedPartitions(manifest); +} + +Result<ManifestEvaluator*> ManifestFilterManager::GetManifestEvaluator( + const TableMetadata& metadata, int32_t spec_id, const DeleteExpr& de) { + auto& vec = manifest_evaluator_cache_[spec_id]; + size_t idx = &de - delete_exprs_.data(); + if (idx >= vec.size()) { + vec.resize(delete_exprs_.size()); + } + if (!vec[idx]) { + ICEBERG_ASSIGN_OR_RAISE(auto spec, metadata.PartitionSpecById(spec_id)); + ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema()); + ICEBERG_ASSIGN_OR_RAISE(vec[idx], ManifestEvaluator::MakeRowFilter( + de.expr, spec, *schema, de.case_sensitive)); + } + return vec[idx].get(); +} + +Result<InclusiveMetricsEvaluator*> ManifestFilterManager::GetMetricsEvaluator( + const TableMetadata& metadata, int32_t spec_id, const DeleteExpr& de) { + auto& vec = metrics_evaluator_cache_[spec_id]; + size_t idx = &de - delete_exprs_.data(); + if (idx >= vec.size()) { + vec.resize(delete_exprs_.size()); + } + if (!vec[idx]) { + ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema()); + ICEBERG_ASSIGN_OR_RAISE( + vec[idx], InclusiveMetricsEvaluator::Make(de.expr, *schema, de.case_sensitive)); + } + return vec[idx].get(); +} + +Result<bool> ManifestFilterManager::ShouldDelete(const ManifestEntry& entry, + const TableMetadata& metadata, + int32_t manifest_spec_id) { + if (!entry.data_file) return false; + const DataFile& file = *entry.data_file; + + // Path-based check + if (delete_paths_.count(file.file_path)) { + pending_paths_.erase(file.file_path); Review Comment: This consumes the required-delete tracking while filtering, which makes `FailMissingDeletePaths` non-idempotent across retries. Java keeps `deletePaths` immutable and `validateRequiredDeletes` recomputes the deleted file set from the filtered manifests on every call, so a retry against a changed base can still fail if the required path is no longer present. Here, once the first filtering pass erases the path from `pending_paths_`, a later `FilterManifests` call on the same manager can miss the validation failure because the path stays consumed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
