This is an automated email from the ASF dual-hosted git repository.

russellspitzer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 86e9ebbffd Core: Support parallel execution when scanning entries in ManifestGroup (#15426)
86e9ebbffd is described below

commit 86e9ebbffde79ae270c3d57d621e509f9142b27c
Author: Hongyue/Steve Zhang <[email protected]>
AuthorDate: Fri Feb 27 09:43:53 2026 -0800

    Core: Support parallel execution when scanning entries in ManifestGroup (#15426)
---
 .../main/java/org/apache/iceberg/FindFiles.java    | 24 +++++++++---------
 .../java/org/apache/iceberg/ManifestGroup.java     | 29 ++++++++++++++++++++++
 .../java/org/apache/iceberg/TestFindFiles.java     | 29 +++++++++++++++-------
 3 files changed, 61 insertions(+), 21 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/FindFiles.java b/core/src/main/java/org/apache/iceberg/FindFiles.java
index e21f3972ce..9d6f84ad0c 100644
--- a/core/src/main/java/org/apache/iceberg/FindFiles.java
+++ b/core/src/main/java/org/apache/iceberg/FindFiles.java
@@ -208,18 +208,18 @@ public class FindFiles {
       }
 
       // when snapshot is not null
-      CloseableIterable<ManifestEntry<DataFile>> entries =
-          new ManifestGroup(ops.io(), snapshot.dataManifests(ops.io()))
-              .specsById(ops.current().specsById())
-              .filterData(rowFilter)
-              .filterFiles(fileFilter)
-              .filterPartitions(partitionFilter)
-              .ignoreDeleted()
-              .caseSensitive(caseSensitive)
-              .planWith(executorService)
-              .entries();
-
-      return CloseableIterable.transform(entries, entry -> entry.file().copy(includeColumnStats));
+      return new ManifestGroup(ops.io(), snapshot.dataManifests(ops.io()))
+          .specsById(ops.current().specsById())
+          .filterData(rowFilter)
+          .filterFiles(fileFilter)
+          .filterPartitions(partitionFilter)
+          .ignoreDeleted()
+          .caseSensitive(caseSensitive)
+          .planWith(executorService)
+          .entries(
+              entries ->
+                  CloseableIterable.transform(
+                      entries, entry -> entry.file().copy(includeColumnStats)));
     }
   }
 }
diff --git a/core/src/main/java/org/apache/iceberg/ManifestGroup.java b/core/src/main/java/org/apache/iceberg/ManifestGroup.java
index e1202d6ee2..39b66d558b 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestGroup.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestGroup.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.function.BiFunction;
+import java.util.function.Function;
 import java.util.function.Predicate;
 import org.apache.iceberg.expressions.Evaluator;
 import org.apache.iceberg.expressions.Expression;
@@ -231,6 +232,34 @@ class ManifestGroup {
     return CloseableIterable.concat(entries((manifest, entries) -> entries));
   }
 
+  /**
+   * Returns a transformed iterable over manifest entries in the set of manifests.
+   *
+   * <p>The provided function is applied to the entries of each manifest to produce the output
+   * elements. When parallel execution is enabled via {@link #planWith(ExecutorService)}, manifests
+   * are scanned in parallel and the function is invoked in worker threads.
+   *
+   * <p>Since the underlying {@link ManifestReader} reuses entry objects during iteration, the
+   * provided function must make defensive copies of any entry data it needs to retain beyond the
+   * current iteration step.
+   *
+   * @param entryTransform a function that transforms each manifest's entries into the desired
+   *     output
+   * @param <T> the output element type
+   * @return a {@link CloseableIterable} of transformed elements
+   */
+  public <T> CloseableIterable<T> entries(
+      Function<CloseableIterable<ManifestEntry<DataFile>>, CloseableIterable<T>> entryTransform) {
+    Iterable<CloseableIterable<T>> iterables =
+        entries((manifest, entries) -> entryTransform.apply(entries));
+
+    if (executorService != null) {
+      return new ParallelIterable<>(iterables, executorService);
+    } else {
+      return CloseableIterable.concat(iterables);
+    }
+  }
+
   /**
    * Returns an iterable for groups of data files in the set of manifests.
    *
diff --git a/core/src/test/java/org/apache/iceberg/TestFindFiles.java b/core/src/test/java/org/apache/iceberg/TestFindFiles.java
index 14746601e7..9a473dd41b 100644
--- a/core/src/test/java/org/apache/iceberg/TestFindFiles.java
+++ b/core/src/test/java/org/apache/iceberg/TestFindFiles.java
@@ -24,12 +24,14 @@ import java.util.Arrays;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.Conversions;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.ParallelIterable;
 import org.junit.jupiter.api.TestTemplate;
 import org.junit.jupiter.api.extension.ExtendWith;
 
@@ -209,15 +211,20 @@ public class TestFindFiles extends TestBase {
 
   @TestTemplate
   public void testPlanWith() {
-    table
-        .newAppend()
-        .appendFile(FILE_A)
-        .appendFile(FILE_B)
-        .appendFile(FILE_C)
-        .appendFile(FILE_D)
-        .commit();
-
-    ExecutorService executorService = Executors.newFixedThreadPool(2);
+    // use separate commits to create multiple manifests for parallel scanning
+    table.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+    table.newAppend().appendFile(FILE_C).appendFile(FILE_D).commit();
+
+    AtomicInteger planThreadsIndex = new AtomicInteger(0);
+    ExecutorService executorService =
+        Executors.newFixedThreadPool(
+            2,
+            runnable -> {
+              Thread thread = new Thread(runnable);
+              thread.setName("plan-" + planThreadsIndex.getAndIncrement());
+              thread.setDaemon(true);
+              return thread;
+            });
     try {
       Iterable<DataFile> files =
           FindFiles.in(table)
@@ -225,7 +232,11 @@ public class TestFindFiles extends TestBase {
               .withMetadataMatching(Expressions.startsWith("file_path", "/path/to/data"))
               .collect();
 
+      assertThat(files).isInstanceOf(ParallelIterable.class);
       assertThat(pathSet(files)).isEqualTo(pathSet(FILE_A, FILE_B, FILE_C, FILE_D));
+      assertThat(planThreadsIndex.get())
+          .as("Thread should be created in provided pool")
+          .isGreaterThan(0);
     } finally {
       executorService.shutdown();
     }

Reply via email to