wgtmac commented on code in PR #652:
URL: https://github.com/apache/iceberg-cpp/pull/652#discussion_r3231668230


##########
src/iceberg/manifest/manifest_filter_manager.cc:
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/manifest/manifest_filter_manager.h"
+
+#include <string>
+#include <unordered_set>
+#include <vector>
+
+#include "iceberg/expression/inclusive_metrics_evaluator.h"
+#include "iceberg/expression/manifest_evaluator.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/manifest/manifest_list.h"
+#include "iceberg/manifest/manifest_reader.h"
+#include "iceberg/result.h"
+#include "iceberg/snapshot.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+ManifestFilterManager::ManifestFilterManager(ManifestContent content,
+                                             std::shared_ptr<FileIO> file_io)
+    : manifest_content_(content), file_io_(std::move(file_io)) {}
+
+void ManifestFilterManager::DeleteByRowFilter(std::shared_ptr<Expression> expr,
+                                              bool case_sensitive) {
+  delete_exprs_.push_back({.expr = std::move(expr), .case_sensitive = 
case_sensitive});
+}
+
+void ManifestFilterManager::DeleteFile(std::string_view path) {
+  std::string p(path);
+  delete_paths_.insert(p);
+  pending_paths_.insert(std::move(p));
+}
+
+void ManifestFilterManager::DropPartition(int32_t spec_id, PartitionValues 
partition) {
+  drop_partitions_.add(spec_id, std::move(partition));
+}
+
+void ManifestFilterManager::FailMissingDeletePaths() {
+  fail_missing_delete_paths_ = true;
+}
+
+bool ManifestFilterManager::DeletesFiles() const {
+  return !delete_exprs_.empty() || !delete_paths_.empty() || 
!drop_partitions_.empty();
+}
+
+bool ManifestFilterManager::CanContainDroppedFiles() const {
+  return !delete_paths_.empty();
+}
+
+bool ManifestFilterManager::CanContainDroppedPartitions(const ManifestFile& 
manifest) {
+  if (drop_partitions_.empty()) return false;
+  // Only manifests whose partition spec matches a registered drop can contain
+  // entries for that partition.  PartitionKey is pair<spec_id, values>.
+  int32_t spec_id = manifest.partition_spec_id;
+  for (const auto& key : drop_partitions_) {
+    if (key.first == spec_id) return true;
+  }
+  return false;
+}
+
+bool ManifestFilterManager::CanContainExpressionDeletes(const ManifestFile& 
manifest,
+                                                        const TableMetadata& 
metadata) {
+  if (delete_exprs_.empty()) return false;
+  int32_t spec_id = manifest.partition_spec_id;
+  for (const auto& delete_expr : delete_exprs_) {
+    auto* evaluator_ptr =
+        GetManifestEvaluator(metadata, spec_id, delete_expr).value_or(nullptr);
+    if (evaluator_ptr == nullptr) return true;  // conservative on error
+    auto result = evaluator_ptr->Evaluate(manifest);
+    if (!result.has_value() || result.value()) return true;
+  }
+  return false;
+}
+
+bool ManifestFilterManager::CanContainDeletedFiles(const ManifestFile& 
manifest,
+                                                   const TableMetadata& 
metadata) {
+  // A manifest with no live files cannot contain files to delete.
+  bool has_live = (manifest.added_files_count.value_or(0) > 0) ||
+                  (manifest.existing_files_count.value_or(0) > 0);
+  if (!has_live) return false;
+
+  return CanContainDroppedFiles() || CanContainExpressionDeletes(manifest, 
metadata) ||
+         CanContainDroppedPartitions(manifest);
+}
+
+Result<ManifestEvaluator*> ManifestFilterManager::GetManifestEvaluator(
+    const TableMetadata& metadata, int32_t spec_id, const DeleteExpr& de) {
+  auto& vec = manifest_evaluator_cache_[spec_id];
+  size_t idx = &de - delete_exprs_.data();
+  if (idx >= vec.size()) {
+    vec.resize(delete_exprs_.size());
+  }
+  if (!vec[idx]) {
+    ICEBERG_ASSIGN_OR_RAISE(auto spec, metadata.PartitionSpecById(spec_id));
+    ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema());
+    ICEBERG_ASSIGN_OR_RAISE(vec[idx], ManifestEvaluator::MakeRowFilter(
+                                          de.expr, spec, *schema, 
de.case_sensitive));
+  }
+  return vec[idx].get();
+}
+
+Result<InclusiveMetricsEvaluator*> ManifestFilterManager::GetMetricsEvaluator(
+    const TableMetadata& metadata, int32_t spec_id, const DeleteExpr& de) {
+  auto& vec = metrics_evaluator_cache_[spec_id];
+  size_t idx = &de - delete_exprs_.data();
+  if (idx >= vec.size()) {
+    vec.resize(delete_exprs_.size());
+  }
+  if (!vec[idx]) {
+    ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema());
+    ICEBERG_ASSIGN_OR_RAISE(
+        vec[idx], InclusiveMetricsEvaluator::Make(de.expr, *schema, 
de.case_sensitive));
+  }
+  return vec[idx].get();
+}
+
+Result<bool> ManifestFilterManager::ShouldDelete(const ManifestEntry& entry,
+                                                 const TableMetadata& metadata,
+                                                 int32_t manifest_spec_id) {
+  if (!entry.data_file) return false;
+  const DataFile& file = *entry.data_file;
+
+  // Path-based check
+  if (delete_paths_.count(file.file_path)) {
+    pending_paths_.erase(file.file_path);
+    return true;
+  }
+
+  // Partition-drop check
+  int32_t spec_id = file.partition_spec_id.value_or(manifest_spec_id);
+  if (drop_partitions_.contains(spec_id, file.partition)) {
+    return true;
+  }
+
+  // Expression-based check (inclusive metrics)
+  for (const auto& de : delete_exprs_) {
+    ICEBERG_ASSIGN_OR_RAISE(auto* eval, GetMetricsEvaluator(metadata, spec_id, 
de));
+    ICEBERG_ASSIGN_OR_RAISE(auto matches, eval->Evaluate(file));
+    if (matches) return true;

Review Comment:
   This row-filter delete path is broader than Java's behavior. Java's 
ManifestFilterManager first computes a partition residual and then uses both 
inclusive and strict metrics: files are deleted only when rowsMustMatch is 
true, and data files where only some rows may match raise `Cannot delete file 
where some, but not all, rows match filter`. Here, an inclusive `might match` 
result is enough to mark the whole file as DELETED, so a file whose bounds 
straddle the predicate would be incorrectly removed instead of rejected.



##########
src/iceberg/manifest/manifest_filter_manager.cc:
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/manifest/manifest_filter_manager.h"
+
+#include <string>
+#include <unordered_set>
+#include <vector>
+
+#include "iceberg/expression/inclusive_metrics_evaluator.h"
+#include "iceberg/expression/manifest_evaluator.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/manifest/manifest_list.h"
+#include "iceberg/manifest/manifest_reader.h"
+#include "iceberg/result.h"
+#include "iceberg/snapshot.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+ManifestFilterManager::ManifestFilterManager(ManifestContent content,
+                                             std::shared_ptr<FileIO> file_io)
+    : manifest_content_(content), file_io_(std::move(file_io)) {}
+
+void ManifestFilterManager::DeleteByRowFilter(std::shared_ptr<Expression> expr,
+                                              bool case_sensitive) {
+  delete_exprs_.push_back({.expr = std::move(expr), .case_sensitive = 
case_sensitive});
+}
+
+void ManifestFilterManager::DeleteFile(std::string_view path) {
+  std::string p(path);
+  delete_paths_.insert(p);
+  pending_paths_.insert(std::move(p));
+}
+
+void ManifestFilterManager::DropPartition(int32_t spec_id, PartitionValues 
partition) {
+  drop_partitions_.add(spec_id, std::move(partition));
+}
+
+void ManifestFilterManager::FailMissingDeletePaths() {
+  fail_missing_delete_paths_ = true;
+}
+
+bool ManifestFilterManager::DeletesFiles() const {
+  return !delete_exprs_.empty() || !delete_paths_.empty() || 
!drop_partitions_.empty();
+}
+
+bool ManifestFilterManager::CanContainDroppedFiles() const {
+  return !delete_paths_.empty();
+}
+
+bool ManifestFilterManager::CanContainDroppedPartitions(const ManifestFile& 
manifest) {
+  if (drop_partitions_.empty()) return false;
+  // Only manifests whose partition spec matches a registered drop can contain
+  // entries for that partition.  PartitionKey is pair<spec_id, values>.
+  int32_t spec_id = manifest.partition_spec_id;
+  for (const auto& key : drop_partitions_) {
+    if (key.first == spec_id) return true;
+  }
+  return false;
+}
+
+bool ManifestFilterManager::CanContainExpressionDeletes(const ManifestFile& 
manifest,
+                                                        const TableMetadata& 
metadata) {
+  if (delete_exprs_.empty()) return false;
+  int32_t spec_id = manifest.partition_spec_id;
+  for (const auto& delete_expr : delete_exprs_) {
+    auto* evaluator_ptr =
+        GetManifestEvaluator(metadata, spec_id, delete_expr).value_or(nullptr);
+    if (evaluator_ptr == nullptr) return true;  // conservative on error
+    auto result = evaluator_ptr->Evaluate(manifest);
+    if (!result.has_value() || result.value()) return true;
+  }
+  return false;
+}
+
+bool ManifestFilterManager::CanContainDeletedFiles(const ManifestFile& 
manifest,
+                                                   const TableMetadata& 
metadata) {
+  // A manifest with no live files cannot contain files to delete.
+  bool has_live = (manifest.added_files_count.value_or(0) > 0) ||

Review Comment:
   This treats missing manifest counts as zero live files. In Iceberg/Java, 
null `addedFilesCount` or `existingFilesCount` means the count is unknown and 
`ManifestFile.hasAddedFiles/hasExistingFiles` returns true. Skipping such 
manifests can miss deletes for older or compatible metadata where counts are 
not populated.



##########
src/iceberg/manifest/manifest_filter_manager.cc:
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/manifest/manifest_filter_manager.h"
+
+#include <string>
+#include <unordered_set>
+#include <vector>
+
+#include "iceberg/expression/inclusive_metrics_evaluator.h"
+#include "iceberg/expression/manifest_evaluator.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/manifest/manifest_list.h"
+#include "iceberg/manifest/manifest_reader.h"
+#include "iceberg/result.h"
+#include "iceberg/snapshot.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+ManifestFilterManager::ManifestFilterManager(ManifestContent content,
+                                             std::shared_ptr<FileIO> file_io)
+    : manifest_content_(content), file_io_(std::move(file_io)) {}
+
+void ManifestFilterManager::DeleteByRowFilter(std::shared_ptr<Expression> expr,
+                                              bool case_sensitive) {
+  delete_exprs_.push_back({.expr = std::move(expr), .case_sensitive = 
case_sensitive});
+}
+
+void ManifestFilterManager::DeleteFile(std::string_view path) {
+  std::string p(path);
+  delete_paths_.insert(p);
+  pending_paths_.insert(std::move(p));
+}
+
+void ManifestFilterManager::DropPartition(int32_t spec_id, PartitionValues 
partition) {
+  drop_partitions_.add(spec_id, std::move(partition));
+}
+
+void ManifestFilterManager::FailMissingDeletePaths() {
+  fail_missing_delete_paths_ = true;
+}
+
+bool ManifestFilterManager::DeletesFiles() const {
+  return !delete_exprs_.empty() || !delete_paths_.empty() || 
!drop_partitions_.empty();
+}
+
+bool ManifestFilterManager::CanContainDroppedFiles() const {
+  return !delete_paths_.empty();
+}
+
+bool ManifestFilterManager::CanContainDroppedPartitions(const ManifestFile& 
manifest) {
+  if (drop_partitions_.empty()) return false;
+  // Only manifests whose partition spec matches a registered drop can contain
+  // entries for that partition.  PartitionKey is pair<spec_id, values>.
+  int32_t spec_id = manifest.partition_spec_id;
+  for (const auto& key : drop_partitions_) {
+    if (key.first == spec_id) return true;
+  }
+  return false;
+}
+
+bool ManifestFilterManager::CanContainExpressionDeletes(const ManifestFile& 
manifest,
+                                                        const TableMetadata& 
metadata) {
+  if (delete_exprs_.empty()) return false;
+  int32_t spec_id = manifest.partition_spec_id;
+  for (const auto& delete_expr : delete_exprs_) {
+    auto* evaluator_ptr =
+        GetManifestEvaluator(metadata, spec_id, delete_expr).value_or(nullptr);
+    if (evaluator_ptr == nullptr) return true;  // conservative on error
+    auto result = evaluator_ptr->Evaluate(manifest);
+    if (!result.has_value() || result.value()) return true;
+  }
+  return false;
+}
+
+bool ManifestFilterManager::CanContainDeletedFiles(const ManifestFile& 
manifest,
+                                                   const TableMetadata& 
metadata) {
+  // A manifest with no live files cannot contain files to delete.
+  bool has_live = (manifest.added_files_count.value_or(0) > 0) ||
+                  (manifest.existing_files_count.value_or(0) > 0);
+  if (!has_live) return false;
+
+  return CanContainDroppedFiles() || CanContainExpressionDeletes(manifest, 
metadata) ||
+         CanContainDroppedPartitions(manifest);
+}
+
+Result<ManifestEvaluator*> ManifestFilterManager::GetManifestEvaluator(
+    const TableMetadata& metadata, int32_t spec_id, const DeleteExpr& de) {
+  auto& vec = manifest_evaluator_cache_[spec_id];
+  size_t idx = &de - delete_exprs_.data();
+  if (idx >= vec.size()) {
+    vec.resize(delete_exprs_.size());
+  }
+  if (!vec[idx]) {
+    ICEBERG_ASSIGN_OR_RAISE(auto spec, metadata.PartitionSpecById(spec_id));
+    ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema());
+    ICEBERG_ASSIGN_OR_RAISE(vec[idx], ManifestEvaluator::MakeRowFilter(
+                                          de.expr, spec, *schema, 
de.case_sensitive));
+  }
+  return vec[idx].get();
+}
+
+Result<InclusiveMetricsEvaluator*> ManifestFilterManager::GetMetricsEvaluator(
+    const TableMetadata& metadata, int32_t spec_id, const DeleteExpr& de) {
+  auto& vec = metrics_evaluator_cache_[spec_id];
+  size_t idx = &de - delete_exprs_.data();
+  if (idx >= vec.size()) {
+    vec.resize(delete_exprs_.size());
+  }
+  if (!vec[idx]) {
+    ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema());
+    ICEBERG_ASSIGN_OR_RAISE(
+        vec[idx], InclusiveMetricsEvaluator::Make(de.expr, *schema, 
de.case_sensitive));
+  }
+  return vec[idx].get();
+}
+
+Result<bool> ManifestFilterManager::ShouldDelete(const ManifestEntry& entry,
+                                                 const TableMetadata& metadata,
+                                                 int32_t manifest_spec_id) {
+  if (!entry.data_file) return false;
+  const DataFile& file = *entry.data_file;
+
+  // Path-based check
+  if (delete_paths_.count(file.file_path)) {
+    pending_paths_.erase(file.file_path);
+    return true;
+  }
+
+  // Partition-drop check
+  int32_t spec_id = file.partition_spec_id.value_or(manifest_spec_id);
+  if (drop_partitions_.contains(spec_id, file.partition)) {
+    return true;
+  }
+
+  // Expression-based check (inclusive metrics)
+  for (const auto& de : delete_exprs_) {
+    ICEBERG_ASSIGN_OR_RAISE(auto* eval, GetMetricsEvaluator(metadata, spec_id, 
de));
+    ICEBERG_ASSIGN_OR_RAISE(auto matches, eval->Evaluate(file));
+    if (matches) return true;
+  }
+
+  return false;
+}
+
+Result<std::vector<ManifestFile>> ManifestFilterManager::FilterManifests(
+    const TableMetadata& metadata, const std::shared_ptr<Snapshot>& 
base_snapshot,
+    const ManifestWriterFactory& writer_factory) {
+  // No base snapshot → nothing to filter
+  if (!base_snapshot) return std::vector<ManifestFile>{};

Review Comment:
   Java still validates required deletes when the manifest list is null or 
empty: `filterManifests` calls `validateRequiredDeletes()` before returning. 
With `FailMissingDeletePaths()` set, this early return silently succeeds for a 
missing path on an empty/no-base snapshot, while Java reports `Missing required 
files to delete`.



##########
src/iceberg/manifest/manifest_merge_manager.cc:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/manifest/manifest_merge_manager.h"
+
+#include <algorithm>
+#include <map>
+#include <utility>
+#include <vector>
+
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/manifest/manifest_reader.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+ManifestMergeManager::ManifestMergeManager(int64_t target_size_bytes,
+                                           int32_t min_count_to_merge, bool 
merge_enabled)
+    : target_size_bytes_(target_size_bytes),
+      min_count_to_merge_(min_count_to_merge),
+      merge_enabled_(merge_enabled) {}
+
+Result<std::vector<ManifestFile>> ManifestMergeManager::MergeManifests(
+    const std::vector<ManifestFile>& existing_manifests,
+    const std::vector<ManifestFile>& new_manifests, int64_t snapshot_id,
+    const TableMetadata& metadata, std::shared_ptr<FileIO> file_io,
+    const ManifestWriterFactory& writer_factory) {
+  // Combine new then existing (new-first ordering is preserved in output)
+  std::vector<ManifestFile> all;
+  all.reserve(new_manifests.size() + existing_manifests.size());
+  all.insert(all.end(), new_manifests.begin(), new_manifests.end());
+  all.insert(all.end(), existing_manifests.begin(), existing_manifests.end());
+
+  if (!merge_enabled_ || std::cmp_less(all.size(), min_count_to_merge_)) {

Review Comment:
   This global `min_count_to_merge_` check does not match Java. Java only 
applies `minCountToMerge` to the bin containing the first/newest manifest; 
older bins that do not contain `first` can still be merged even when the total 
input count is below the threshold. Returning all manifests here can skip 
merges Java would perform.



##########
src/iceberg/manifest/manifest_merge_manager.cc:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/manifest/manifest_merge_manager.h"
+
+#include <algorithm>
+#include <map>
+#include <utility>
+#include <vector>
+
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/manifest/manifest_reader.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+ManifestMergeManager::ManifestMergeManager(int64_t target_size_bytes,
+                                           int32_t min_count_to_merge, bool 
merge_enabled)
+    : target_size_bytes_(target_size_bytes),
+      min_count_to_merge_(min_count_to_merge),
+      merge_enabled_(merge_enabled) {}
+
+Result<std::vector<ManifestFile>> ManifestMergeManager::MergeManifests(
+    const std::vector<ManifestFile>& existing_manifests,
+    const std::vector<ManifestFile>& new_manifests, int64_t snapshot_id,
+    const TableMetadata& metadata, std::shared_ptr<FileIO> file_io,
+    const ManifestWriterFactory& writer_factory) {
+  // Combine new then existing (new-first ordering is preserved in output)
+  std::vector<ManifestFile> all;
+  all.reserve(new_manifests.size() + existing_manifests.size());
+  all.insert(all.end(), new_manifests.begin(), new_manifests.end());
+  all.insert(all.end(), existing_manifests.begin(), existing_manifests.end());
+
+  if (!merge_enabled_ || std::cmp_less(all.size(), min_count_to_merge_)) {
+    return all;
+  }
+
+  // The first (newest) manifest governs the per-bin minCountToMerge check.
+  const ManifestFile& first = all[0];
+
+  // Group manifests by partition_spec_id — never merge across specs
+  std::map<int32_t, std::vector<ManifestFile>> by_spec;

Review Comment:
   Java groups manifests by partition spec using a reverse-order TreeMap. This 
`std::map` iterates specs in ascending order, so the merged manifest list order 
can differ from Java. For v3 tables this ordering is observable because 
manifest-list writing assigns first-row IDs in output order.



##########
src/iceberg/manifest/manifest_merge_manager.cc:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/manifest/manifest_merge_manager.h"
+
+#include <algorithm>
+#include <map>
+#include <utility>
+#include <vector>
+
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/manifest/manifest_reader.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+ManifestMergeManager::ManifestMergeManager(int64_t target_size_bytes,
+                                           int32_t min_count_to_merge, bool 
merge_enabled)
+    : target_size_bytes_(target_size_bytes),
+      min_count_to_merge_(min_count_to_merge),
+      merge_enabled_(merge_enabled) {}
+
+Result<std::vector<ManifestFile>> ManifestMergeManager::MergeManifests(
+    const std::vector<ManifestFile>& existing_manifests,
+    const std::vector<ManifestFile>& new_manifests, int64_t snapshot_id,
+    const TableMetadata& metadata, std::shared_ptr<FileIO> file_io,
+    const ManifestWriterFactory& writer_factory) {
+  // Combine new then existing (new-first ordering is preserved in output)
+  std::vector<ManifestFile> all;
+  all.reserve(new_manifests.size() + existing_manifests.size());
+  all.insert(all.end(), new_manifests.begin(), new_manifests.end());
+  all.insert(all.end(), existing_manifests.begin(), existing_manifests.end());
+
+  if (!merge_enabled_ || std::cmp_less(all.size(), min_count_to_merge_)) {
+    return all;
+  }
+
+  // The first (newest) manifest governs the per-bin minCountToMerge check.
+  const ManifestFile& first = all[0];
+
+  // Group manifests by partition_spec_id — never merge across specs
+  std::map<int32_t, std::vector<ManifestFile>> by_spec;
+  for (const auto& m : all) {
+    by_spec[m.partition_spec_id].push_back(m);
+  }
+
+  std::vector<ManifestFile> result;
+  result.reserve(all.size());
+  for (auto& [spec_id, group] : by_spec) {
+    ICEBERG_ASSIGN_OR_RAISE(auto merged, MergeGroup(group, first, snapshot_id, 
metadata,
+                                                    file_io, writer_factory));
+    result.insert(result.end(), std::make_move_iterator(merged.begin()),
+                  std::make_move_iterator(merged.end()));
+  }
+  return result;
+}
+
+Result<std::vector<ManifestFile>> ManifestMergeManager::MergeGroup(
+    const std::vector<ManifestFile>& group, const ManifestFile& first,
+    int64_t snapshot_id, const TableMetadata& metadata, 
std::shared_ptr<FileIO> file_io,
+    const ManifestWriterFactory& writer_factory) {
+  // Collect bins using greedy first-fit packing.
+  std::vector<std::vector<ManifestFile>> bins;
+  std::vector<ManifestFile> current_bin;
+  int64_t bin_size = 0;
+
+  std::vector<ManifestFile> result;
+
+  for (const auto& manifest : group) {
+    if (manifest.manifest_length >= target_size_bytes_) {

Review Comment:
   Java uses `ListPacker.packEnd(group, ManifestFile::length)` with lookback 1 
specifically to preserve manifest order and leave the under-filled bin at the 
beginning. This forward hand-rolled packing keeps the current small bin open 
across an oversized manifest, so a sequence like [small, oversized, small] can 
merge the two small manifests across the oversized one and reorder output 
relative to Java.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to