gty404 commented on code in PR #652: URL: https://github.com/apache/iceberg-cpp/pull/652#discussion_r3238856761
########## src/iceberg/manifest/manifest_merge_manager.cc: ########## @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/manifest/manifest_merge_manager.h" + +#include <algorithm> +#include <map> +#include <utility> +#include <vector> + +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/manifest/manifest_reader.h" +#include "iceberg/table_metadata.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +ManifestMergeManager::ManifestMergeManager(int64_t target_size_bytes, + int32_t min_count_to_merge, bool merge_enabled) + : target_size_bytes_(target_size_bytes), + min_count_to_merge_(min_count_to_merge), + merge_enabled_(merge_enabled) {} + +Result<std::vector<ManifestFile>> ManifestMergeManager::MergeManifests( + const std::vector<ManifestFile>& existing_manifests, + const std::vector<ManifestFile>& new_manifests, int64_t snapshot_id, + const TableMetadata& metadata, std::shared_ptr<FileIO> file_io, + const ManifestWriterFactory& writer_factory) { + // Combine new then existing (new-first ordering is preserved in output) + std::vector<ManifestFile> all; + all.reserve(new_manifests.size() + existing_manifests.size()); + all.insert(all.end(), 
new_manifests.begin(), new_manifests.end()); + all.insert(all.end(), existing_manifests.begin(), existing_manifests.end()); + + if (!merge_enabled_ || std::cmp_less(all.size(), min_count_to_merge_)) { + return all; + } + + // The first (newest) manifest governs the per-bin minCountToMerge check. + const ManifestFile& first = all[0]; + + // Group manifests by partition_spec_id — never merge across specs + std::map<int32_t, std::vector<ManifestFile>> by_spec; + for (const auto& m : all) { + by_spec[m.partition_spec_id].push_back(m); + } + + std::vector<ManifestFile> result; + result.reserve(all.size()); + for (auto& [spec_id, group] : by_spec) { + ICEBERG_ASSIGN_OR_RAISE(auto merged, MergeGroup(group, first, snapshot_id, metadata, + file_io, writer_factory)); + result.insert(result.end(), std::make_move_iterator(merged.begin()), + std::make_move_iterator(merged.end())); + } + return result; +} + +Result<std::vector<ManifestFile>> ManifestMergeManager::MergeGroup( + const std::vector<ManifestFile>& group, const ManifestFile& first, + int64_t snapshot_id, const TableMetadata& metadata, std::shared_ptr<FileIO> file_io, + const ManifestWriterFactory& writer_factory) { + // Collect bins using greedy first-fit packing. + std::vector<std::vector<ManifestFile>> bins; + std::vector<ManifestFile> current_bin; + int64_t bin_size = 0; + + std::vector<ManifestFile> result; + + for (const auto& manifest : group) { + if (manifest.manifest_length >= target_size_bytes_) { Review Comment: Fixed. MergeGroup now mirrors ListPacker.packEnd(group, ManifestFile::length) with lookback=1 exactly ########## src/iceberg/manifest/manifest_merge_manager.cc: ########## @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/manifest/manifest_merge_manager.h" + +#include <algorithm> +#include <map> +#include <utility> +#include <vector> + +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/manifest/manifest_reader.h" +#include "iceberg/table_metadata.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +ManifestMergeManager::ManifestMergeManager(int64_t target_size_bytes, + int32_t min_count_to_merge, bool merge_enabled) + : target_size_bytes_(target_size_bytes), + min_count_to_merge_(min_count_to_merge), + merge_enabled_(merge_enabled) {} + +Result<std::vector<ManifestFile>> ManifestMergeManager::MergeManifests( + const std::vector<ManifestFile>& existing_manifests, + const std::vector<ManifestFile>& new_manifests, int64_t snapshot_id, + const TableMetadata& metadata, std::shared_ptr<FileIO> file_io, + const ManifestWriterFactory& writer_factory) { + // Combine new then existing (new-first ordering is preserved in output) + std::vector<ManifestFile> all; + all.reserve(new_manifests.size() + existing_manifests.size()); + all.insert(all.end(), new_manifests.begin(), new_manifests.end()); + all.insert(all.end(), existing_manifests.begin(), existing_manifests.end()); + + if (!merge_enabled_ || std::cmp_less(all.size(), min_count_to_merge_)) { + return all; + } + + // The first (newest) manifest governs the per-bin minCountToMerge check. 
+ const ManifestFile& first = all[0]; + + // Group manifests by partition_spec_id — never merge across specs + std::map<int32_t, std::vector<ManifestFile>> by_spec; Review Comment: The grouping key was already changed to (partition_spec_id, content) in the review-fix commit, so data and delete manifests with the same spec can never land in the same bin. The writer factory call also uses first.content to ensure the output manifest has the correct content type. ########## src/iceberg/manifest/manifest_merge_manager.h: ########## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/manifest/manifest_merge_manager.h +/// Merges small manifests into fewer larger ones according to table properties. + +#include <cstdint> +#include <memory> +#include <vector> + +#include "iceberg/iceberg_export.h" +#include "iceberg/manifest/manifest_filter_manager.h" +#include "iceberg/manifest/manifest_list.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" + +namespace iceberg { + +/// \brief Merges small manifests into larger ones using greedy bin-packing. 
+/// +/// Manifests are grouped by partition_spec_id before merging; manifests with +/// different spec IDs are never merged together. Within a group, manifests are +/// accumulated into bins until a bin would exceed target_size_bytes, at which +/// point the bin is flushed (written) and a new one started. Manifests already +/// larger than target_size_bytes pass through unchanged. +/// +/// \note This class is non-copyable and non-movable. +class ICEBERG_EXPORT ManifestMergeManager { + public: + /// \brief Construct a merge manager with the given configuration. + /// + /// \param target_size_bytes Target output manifest size in bytes + /// \param min_count_to_merge Minimum number of manifests before any merging occurs + /// \param merge_enabled Whether merging is enabled at all + ManifestMergeManager(int64_t target_size_bytes, int32_t min_count_to_merge, + bool merge_enabled); + + ManifestMergeManager(const ManifestMergeManager&) = delete; + ManifestMergeManager& operator=(const ManifestMergeManager&) = delete; + + /// \brief Merge existing and new manifests according to configured thresholds. + /// + /// \param existing_manifests Manifests already in the base snapshot + /// \param new_manifests Newly written manifests to incorporate + /// \param snapshot_id The ID of the snapshot being committed. Used to preserve + /// ADDED/DELETED status for entries written by this snapshot and to suppress + /// stale DELETED tombstones from prior snapshots. + /// \param metadata Table metadata (provides specs and schema for readers) + /// \param file_io File IO used to open existing manifests for reading + /// \param writer_factory Factory to create new ManifestWriter instances + /// \return The merged manifest list, or an error + Result<std::vector<ManifestFile>> MergeManifests( Review Comment: These lifecycle methods are deferred to PR2 (MergingSnapshotUpdate). 
The header \note at lines 67–70 documents this explicitly: "Java's ManifestMergeManager additionally exposes replacedManifestsCount() and cleanUncommitted(committed) for retry / rollback support. In this C++ implementation those responsibilities are handled by the caller (MergingSnapshotUpdate in PR2) via a tracked ManifestWriterFactory wrapper." The filter-side equivalent (cleanUncommitted for rewritten manifests) will live in the same PR2 caller layer. ########## src/iceberg/manifest/manifest_filter_manager.h: ########## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/manifest/manifest_filter_manager.h +/// Filters an existing snapshot's manifest list, marking data files as DELETED +/// or EXISTING based on row-filter expressions, exact path deletes, and partition drops.
+ +#include <functional> +#include <memory> +#include <string> +#include <unordered_map> +#include <unordered_set> +#include <vector> + +#include "iceberg/expression/inclusive_metrics_evaluator.h" +#include "iceberg/expression/manifest_evaluator.h" +#include "iceberg/iceberg_export.h" +#include "iceberg/manifest/manifest_list.h" +#include "iceberg/manifest/manifest_writer.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" +#include "iceberg/util/partition_value_util.h" + +namespace iceberg { + +/// \brief Factory type for creating ManifestWriter instances during filtering/merging. +/// +/// The factory receives the partition spec ID (to look up the spec) and the manifest +/// content type, and returns a new ManifestWriter ready for writing. The caller +/// (i.e. MergingSnapshotUpdate in PR2) captures metadata, FileIO, and snapshot ID +/// inside the lambda. +using ManifestWriterFactory = std::function<Result<std::unique_ptr<ManifestWriter>>( + int32_t spec_id, ManifestContent content)>; + +/// \brief Filters an existing snapshot's manifest list. +/// +/// The manager accumulates delete conditions incrementally, then applies them all +/// at once in a single FilterManifests() call. Manifests that contain no deleted +/// entries are returned unchanged (no I/O). Manifests that do contain deleted +/// entries are rewritten with those entries marked DELETED. +/// +/// The manager is content-agnostic: pass ManifestContent::kData to process data +/// manifests, or ManifestContent::kDeletes to process delete manifests. Review Comment: This PR's ManifestFilterManager only handles path/partition/row-filter deletes. Java's dropDeleteFilesOlderThan(minDataSequenceNumber) and removeDanglingDeletesFor(filesToBeDeleted) / DV cleanup semantics are intentionally out of scope here. These delete-file-specific v2 semantics (minSequenceNumber cleanup, dangling DV detection) are deferred to PR5 (RowDelta), which is where BaseRowDelta.java's delete file management lives in Java.
We can strengthen the note to name PR5 explicitly if that helps track it. ########## src/iceberg/manifest/manifest_filter_manager.h: ########## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/manifest/manifest_filter_manager.h +/// Filters an existing snapshot's manifest list, marking data files as DELETED +/// or EXISTING based on row-filter expressions, exact path deletes, and partition drops. + +#include <functional> +#include <memory> +#include <string> +#include <unordered_map> +#include <unordered_set> +#include <vector> + +#include "iceberg/expression/inclusive_metrics_evaluator.h" +#include "iceberg/expression/manifest_evaluator.h" +#include "iceberg/iceberg_export.h" +#include "iceberg/manifest/manifest_list.h" +#include "iceberg/manifest/manifest_writer.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" +#include "iceberg/util/partition_value_util.h" + +namespace iceberg { + +/// \brief Factory type for creating ManifestWriter instances during filtering/merging. +/// +/// The factory receives the partition spec ID (to look up the spec) and the manifest +/// content type, and returns a new ManifestWriter ready for writing. 
The caller +/// (i.e. MergingSnapshotUpdate in PR2) captures metadata, FileIO, and snapshot ID Review Comment: I broke down the support for MergingSnapshotUpdate into multiple PRs. This is the first PR, and the second is the support for MergingSnapshotUpdate. Next are the commit interfaces, such as overwrite, append, and replace partitions. I will delete this comment when I submit PR2. ########## src/iceberg/manifest/manifest_merge_manager.h: ########## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/manifest/manifest_merge_manager.h +/// Merges small manifests into fewer larger ones according to table properties. + +#include <cstdint> +#include <memory> +#include <vector> + +#include "iceberg/iceberg_export.h" +#include "iceberg/manifest/manifest_filter_manager.h" +#include "iceberg/manifest/manifest_list.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" + +namespace iceberg { + +/// \brief Merges small manifests into larger ones using greedy bin-packing. +/// +/// Manifests are grouped by partition_spec_id before merging; manifests with +/// different spec IDs are never merged together. 
Within a group, manifests are +/// accumulated into bins until a bin would exceed target_size_bytes, at which +/// point the bin is flushed (written) and a new one started. Manifests already +/// larger than target_size_bytes pass through unchanged. +/// +/// \note This class is non-copyable and non-movable. +class ICEBERG_EXPORT ManifestMergeManager { + public: + /// \brief Construct a merge manager with the given configuration. + /// + /// \param target_size_bytes Target output manifest size in bytes + /// \param min_count_to_merge Minimum number of manifests before any merging occurs + /// \param merge_enabled Whether merging is enabled at all + ManifestMergeManager(int64_t target_size_bytes, int32_t min_count_to_merge, + bool merge_enabled); + + ManifestMergeManager(const ManifestMergeManager&) = delete; + ManifestMergeManager& operator=(const ManifestMergeManager&) = delete; + + /// \brief Merge existing and new manifests according to configured thresholds. + /// + /// \param existing_manifests Manifests already in the base snapshot + /// \param new_manifests Newly written manifests to incorporate + /// \param snapshot_id The ID of the snapshot being committed. Used to preserve + /// ADDED/DELETED status for entries written by this snapshot and to suppress + /// stale DELETED tombstones from prior snapshots. + /// \param metadata Table metadata (provides specs and schema for readers) + /// \param file_io File IO used to open existing manifests for reading + /// \param writer_factory Factory to create new ManifestWriter instances + /// \return The merged manifest list, or an error + Result<std::vector<ManifestFile>> MergeManifests( + const std::vector<ManifestFile>& existing_manifests, + const std::vector<ManifestFile>& new_manifests, int64_t snapshot_id, + const TableMetadata& metadata, std::shared_ptr<FileIO> file_io, + const ManifestWriterFactory& writer_factory); + + private: + /// \brief Merge a group of manifests sharing the same spec_id. 
+ /// + /// \param first The overall first (newest) manifest across all groups, used to + /// apply the minCountToMerge threshold on the bin that contains it. Review Comment: Fixed. Changed to min_count_to_merge to match the documented constructor parameter name. ########## src/iceberg/manifest/manifest_merge_manager.h: ########## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/manifest/manifest_merge_manager.h +/// Merges small manifests into fewer larger ones according to table properties. + +#include <cstdint> +#include <memory> +#include <vector> + +#include "iceberg/iceberg_export.h" +#include "iceberg/manifest/manifest_filter_manager.h" +#include "iceberg/manifest/manifest_list.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" + +namespace iceberg { + +/// \brief Merges small manifests into larger ones using greedy bin-packing. +/// +/// Manifests are grouped by partition_spec_id before merging; manifests with +/// different spec IDs are never merged together. Within a group, manifests are +/// accumulated into bins until a bin would exceed target_size_bytes, at which +/// point the bin is flushed (written) and a new one started. 
Manifests already +/// larger than target_size_bytes pass through unchanged. +/// +/// \note This class is non-copyable and non-movable. +class ICEBERG_EXPORT ManifestMergeManager { + public: + /// \brief Construct a merge manager with the given configuration. + /// + /// \param target_size_bytes Target output manifest size in bytes + /// \param min_count_to_merge Minimum number of manifests before any merging occurs + /// \param merge_enabled Whether merging is enabled at all + ManifestMergeManager(int64_t target_size_bytes, int32_t min_count_to_merge, + bool merge_enabled); + + ManifestMergeManager(const ManifestMergeManager&) = delete; + ManifestMergeManager& operator=(const ManifestMergeManager&) = delete; + + /// \brief Merge existing and new manifests according to configured thresholds. + /// + /// \param existing_manifests Manifests already in the base snapshot + /// \param new_manifests Newly written manifests to incorporate + /// \param snapshot_id The ID of the snapshot being committed. Used to preserve + /// ADDED/DELETED status for entries written by this snapshot and to suppress + /// stale DELETED tombstones from prior snapshots. + /// \param metadata Table metadata (provides specs and schema for readers) + /// \param file_io File IO used to open existing manifests for reading + /// \param writer_factory Factory to create new ManifestWriter instances + /// \return The merged manifest list, or an error + Result<std::vector<ManifestFile>> MergeManifests( + const std::vector<ManifestFile>& existing_manifests, + const std::vector<ManifestFile>& new_manifests, int64_t snapshot_id, + const TableMetadata& metadata, std::shared_ptr<FileIO> file_io, + const ManifestWriterFactory& writer_factory); + + private: + /// \brief Merge a group of manifests sharing the same spec_id. + /// + /// \param first The overall first (newest) manifest across all groups, used to + /// apply the minCountToMerge threshold on the bin that contains it. 
+ Result<std::vector<ManifestFile>> MergeGroup( + const std::vector<ManifestFile>& group, const ManifestFile& first, + int64_t snapshot_id, const TableMetadata& metadata, std::shared_ptr<FileIO> file_io, + const ManifestWriterFactory& writer_factory); + + /// \brief Write a merged manifest from all manifests in a bin. + /// + /// Entries are written snapshot-aware: + /// - ADDED from \p snapshot_id → WriteAddedEntry (preserve status) Review Comment: Fixed. Removed the \p markers from the bullet-list body (they only make sense in \param declarations). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
