HuaHuaY commented on code in PR #687:
URL: https://github.com/apache/iceberg-cpp/pull/687#discussion_r3322984290


##########
src/iceberg/delete_file_index.cc:
##########
@@ -528,107 +531,159 @@ DeleteFileIndex::Builder& DeleteFileIndex::Builder::IgnoreResiduals() {
   return *this;
 }
 
+DeleteFileIndex::Builder& DeleteFileIndex::Builder::PlanWith(OptionalExecutor 
executor) {
+  executor_ = executor;
+  return *this;
+}
+
 Result<std::vector<ManifestEntry>> DeleteFileIndex::Builder::LoadDeleteFiles() 
{
-  // Build expression caches per spec ID
-  std::unordered_map<int32_t, std::shared_ptr<Expression>> part_expr_cache;
+  // TODO(zehua): Replace with a thread-safe LRU cache.
+  std::shared_mutex projected_expr_cache_mutex;
+  std::unordered_map<int32_t, std::shared_ptr<Expression>> 
projected_expr_cache;
+  std::shared_mutex eval_cache_mutex;
   std::unordered_map<int32_t, std::unique_ptr<ManifestEvaluator>> eval_cache;
 
   auto data_filter = ignore_residuals_ ? True::Instance() : data_filter_;
 
-  // Filter and read manifests into manifest entries
-  std::vector<ManifestEntry> files;
-  for (const auto& manifest : delete_manifests_) {
-    if (manifest.content != ManifestContent::kDeletes) {
-      continue;
+  auto and_filters =
+      [](std::shared_ptr<Expression> left,
+         std::shared_ptr<Expression> right) -> 
Result<std::shared_ptr<Expression>> {
+    if (left && right) {
+      return And::MakeFolded(std::move(left), std::move(right));
     }
-    if (!manifest.has_added_files() && !manifest.has_existing_files()) {
-      continue;
+    if (right) {
+      return right;
+    }
+    return left;
+  };
+
+  auto get_projected_expr = [&](int32_t spec_id,
+                                const std::shared_ptr<PartitionSpec>& spec)
+      -> Result<std::shared_ptr<Expression>> {
+    if (!data_filter_) {
+      return std::shared_ptr<Expression>();
     }
 
-    const int32_t spec_id = manifest.partition_spec_id;
-    auto spec_iter = specs_by_id_.find(spec_id);
-    ICEBERG_CHECK(spec_iter != specs_by_id_.cend(),
-                  "Partition spec ID {} not found when loading delete files", 
spec_id);
+    {
+      std::shared_lock lock(projected_expr_cache_mutex);
+      auto iter = projected_expr_cache.find(spec_id);
+      if (iter != projected_expr_cache.end()) {
+        return iter->second;
+      }
+    }
 
-    const auto& spec = spec_iter->second;
+    std::lock_guard lock(projected_expr_cache_mutex);
+    auto iter = projected_expr_cache.find(spec_id);
+    if (iter != projected_expr_cache.end()) {
+      return iter->second;
+    }
 
-    // Get or compute projected partition expression
-    if (!part_expr_cache.contains(spec_id) && data_filter_) {
-      auto projector = Projections::Inclusive(*spec, *schema_, 
case_sensitive_);
-      ICEBERG_ASSIGN_OR_RAISE(auto projected, 
projector->Project(data_filter_));
-      part_expr_cache[spec_id] = std::move(projected);
+    auto projector = Projections::Inclusive(*spec, *schema_, case_sensitive_);
+    ICEBERG_ASSIGN_OR_RAISE(auto projected, projector->Project(data_filter_));
+    auto [inserted_iter, _] = projected_expr_cache.emplace(spec_id, 
std::move(projected));
+    return inserted_iter->second;
+  };
+
+  auto get_manifest_evaluator =
+      [&](int32_t spec_id, const std::shared_ptr<PartitionSpec>& spec,
+          const std::shared_ptr<Expression>& filter) -> 
Result<ManifestEvaluator*> {
+    if (!filter) {
+      return nullptr;
     }
 
-    // Get or create manifest evaluator
-    if (!eval_cache.contains(spec_id)) {
-      auto filter = partition_filter_;
-      if (auto it = part_expr_cache.find(spec_id); it != 
part_expr_cache.cend()) {
-        if (filter) {
-          ICEBERG_ASSIGN_OR_RAISE(filter, And::Make(filter, it->second));
-        } else {
-          filter = it->second;
-        }
-      }
-      if (filter) {
-        ICEBERG_ASSIGN_OR_RAISE(auto evaluator,
-                                ManifestEvaluator::MakePartitionFilter(
-                                    std::move(filter), spec, *schema_, 
case_sensitive_));
-        eval_cache[spec_id] = std::move(evaluator);
+    {
+      std::shared_lock lock(eval_cache_mutex);
+      auto iter = eval_cache.find(spec_id);
+      if (iter != eval_cache.end()) {
+        return iter->second.get();
       }
     }
 
-    // Evaluate manifest against filter
-    if (auto it = eval_cache.find(spec_id); it != eval_cache.end()) {
-      ICEBERG_ASSIGN_OR_RAISE(auto should_match, 
it->second->Evaluate(manifest));
-      if (!should_match) {
-        continue;  // Manifest doesn't match filter
-      }
+    std::lock_guard lock(eval_cache_mutex);
+    auto iter = eval_cache.find(spec_id);
+    if (iter != eval_cache.end()) {
+      return iter->second.get();
     }
 
-    // Read manifest entries
-    ICEBERG_ASSIGN_OR_RAISE(auto reader,
-                            ManifestReader::Make(manifest, io_, schema_, 
spec));
-
-    auto partition_filter = partition_filter_;
-    if (auto it = part_expr_cache.find(spec_id); it != part_expr_cache.cend()) 
{
-      if (partition_filter) {
-        ICEBERG_ASSIGN_OR_RAISE(partition_filter,
-                                And::Make(partition_filter, it->second));
-      } else {
-        partition_filter = it->second;
+    ICEBERG_ASSIGN_OR_RAISE(auto evaluator, 
ManifestEvaluator::MakePartitionFilter(
+                                                filter, spec, *schema_, 
case_sensitive_));
+    auto [inserted_iter, _] = eval_cache.emplace(spec_id, 
std::move(evaluator));
+    return inserted_iter->second.get();
+  };
+
+  std::vector<std::vector<ManifestEntry>> 
manifest_results(delete_manifests_.size());
+  auto read_tasks = TaskGroup().SetExecutor(executor_);
+  for (auto&& [manifest, manifest_result] :
+       std::views::zip(delete_manifests_, manifest_results)) {
+    read_tasks.Submit([&]() -> Status {

Review Comment:
   ditto



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to