wgtmac commented on code in PR #652:
URL: https://github.com/apache/iceberg-cpp/pull/652#discussion_r3231852664


##########
src/iceberg/manifest/manifest_filter_manager.cc:
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/manifest/manifest_filter_manager.h"
+
+#include <string>
+#include <unordered_set>
+#include <vector>
+
+#include "iceberg/expression/inclusive_metrics_evaluator.h"
+#include "iceberg/expression/manifest_evaluator.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/manifest/manifest_list.h"
+#include "iceberg/manifest/manifest_reader.h"
+#include "iceberg/result.h"
+#include "iceberg/snapshot.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+ManifestFilterManager::ManifestFilterManager(ManifestContent content,
+                                             std::shared_ptr<FileIO> file_io)
+    : manifest_content_(content), file_io_(std::move(file_io)) {}
+
+void ManifestFilterManager::DeleteByRowFilter(std::shared_ptr<Expression> expr,
+                                              bool case_sensitive) {
+  delete_exprs_.push_back({.expr = std::move(expr), .case_sensitive = 
case_sensitive});
+}
+
+void ManifestFilterManager::DeleteFile(std::string_view path) {
+  std::string p(path);
+  delete_paths_.insert(p);
+  pending_paths_.insert(std::move(p));
+}
+
+void ManifestFilterManager::DropPartition(int32_t spec_id, PartitionValues 
partition) {
+  drop_partitions_.add(spec_id, std::move(partition));
+}
+
+void ManifestFilterManager::FailMissingDeletePaths() {
+  fail_missing_delete_paths_ = true;
+}
+
+bool ManifestFilterManager::DeletesFiles() const {
+  return !delete_exprs_.empty() || !delete_paths_.empty() || 
!drop_partitions_.empty();
+}
+
+bool ManifestFilterManager::CanContainDroppedFiles() const {
+  return !delete_paths_.empty();
+}
+
+bool ManifestFilterManager::CanContainDroppedPartitions(const ManifestFile& 
manifest) {
+  if (drop_partitions_.empty()) return false;
+  // Only manifests whose partition spec matches a registered drop can contain
+  // entries for that partition.  PartitionKey is pair<spec_id, values>.
+  int32_t spec_id = manifest.partition_spec_id;
+  for (const auto& key : drop_partitions_) {
+    if (key.first == spec_id) return true;
+  }
+  return false;
+}
+
+bool ManifestFilterManager::CanContainExpressionDeletes(const ManifestFile& 
manifest,
+                                                        const TableMetadata& 
metadata) {
+  if (delete_exprs_.empty()) return false;
+  int32_t spec_id = manifest.partition_spec_id;
+  for (const auto& delete_expr : delete_exprs_) {
+    auto* evaluator_ptr =
+        GetManifestEvaluator(metadata, spec_id, delete_expr).value_or(nullptr);
+    if (evaluator_ptr == nullptr) return true;  // conservative on error
+    auto result = evaluator_ptr->Evaluate(manifest);
+    if (!result.has_value() || result.value()) return true;
+  }
+  return false;
+}
+
+bool ManifestFilterManager::CanContainDeletedFiles(const ManifestFile& 
manifest,
+                                                   const TableMetadata& 
metadata) {
+  // A manifest with no live files cannot contain files to delete.
+  bool has_live = (manifest.added_files_count.value_or(0) > 0) ||
+                  (manifest.existing_files_count.value_or(0) > 0);
+  if (!has_live) return false;
+
+  return CanContainDroppedFiles() || CanContainExpressionDeletes(manifest, 
metadata) ||
+         CanContainDroppedPartitions(manifest);
+}
+
+Result<ManifestEvaluator*> ManifestFilterManager::GetManifestEvaluator(
+    const TableMetadata& metadata, int32_t spec_id, const DeleteExpr& de) {
+  auto& vec = manifest_evaluator_cache_[spec_id];
+  size_t idx = &de - delete_exprs_.data();
+  if (idx >= vec.size()) {
+    vec.resize(delete_exprs_.size());
+  }
+  if (!vec[idx]) {
+    ICEBERG_ASSIGN_OR_RAISE(auto spec, metadata.PartitionSpecById(spec_id));
+    ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema());
+    ICEBERG_ASSIGN_OR_RAISE(vec[idx], ManifestEvaluator::MakeRowFilter(
+                                          de.expr, spec, *schema, 
de.case_sensitive));
+  }
+  return vec[idx].get();
+}
+
+Result<InclusiveMetricsEvaluator*> ManifestFilterManager::GetMetricsEvaluator(
+    const TableMetadata& metadata, int32_t spec_id, const DeleteExpr& de) {
+  auto& vec = metrics_evaluator_cache_[spec_id];
+  size_t idx = &de - delete_exprs_.data();
+  if (idx >= vec.size()) {
+    vec.resize(delete_exprs_.size());
+  }
+  if (!vec[idx]) {
+    ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema());
+    ICEBERG_ASSIGN_OR_RAISE(
+        vec[idx], InclusiveMetricsEvaluator::Make(de.expr, *schema, 
de.case_sensitive));
+  }
+  return vec[idx].get();
+}
+
+Result<bool> ManifestFilterManager::ShouldDelete(const ManifestEntry& entry,
+                                                 const TableMetadata& metadata,
+                                                 int32_t manifest_spec_id) {
+  if (!entry.data_file) return false;
+  const DataFile& file = *entry.data_file;
+
+  // Path-based check
+  if (delete_paths_.count(file.file_path)) {
+    pending_paths_.erase(file.file_path);

Review Comment:
   [P2] This consumes the required-delete tracking while filtering, which makes 
`FailMissingDeletePaths` non-idempotent across retries. Java keeps 
`deletePaths` immutable and `validateRequiredDeletes` recomputes the deleted 
file set from the filtered manifests on every call, so a retry against a 
changed base can still fail if the required path is no longer present. Here, 
once the first filtering pass erases the path from `pending_paths_`, a later 
`FilterManifests` call on the same manager can miss the validation failure 
because the path stays consumed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to