This is an automated email from the ASF dual-hosted git repository.
russellspitzer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 86e9ebbffd Core: Support parallel execution when scanning entries in ManifestGroup (#15426)
86e9ebbffd is described below
commit 86e9ebbffde79ae270c3d57d621e509f9142b27c
Author: Hongyue/Steve Zhang <[email protected]>
AuthorDate: Fri Feb 27 09:43:53 2026 -0800
Core: Support parallel execution when scanning entries in ManifestGroup (#15426)
---
.../main/java/org/apache/iceberg/FindFiles.java | 24 +++++++++---------
.../java/org/apache/iceberg/ManifestGroup.java | 29 ++++++++++++++++++++++
.../java/org/apache/iceberg/TestFindFiles.java | 29 +++++++++++++++-------
3 files changed, 61 insertions(+), 21 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/FindFiles.java b/core/src/main/java/org/apache/iceberg/FindFiles.java
index e21f3972ce..9d6f84ad0c 100644
--- a/core/src/main/java/org/apache/iceberg/FindFiles.java
+++ b/core/src/main/java/org/apache/iceberg/FindFiles.java
@@ -208,18 +208,18 @@ public class FindFiles {
}
// when snapshot is not null
- CloseableIterable<ManifestEntry<DataFile>> entries =
- new ManifestGroup(ops.io(), snapshot.dataManifests(ops.io()))
- .specsById(ops.current().specsById())
- .filterData(rowFilter)
- .filterFiles(fileFilter)
- .filterPartitions(partitionFilter)
- .ignoreDeleted()
- .caseSensitive(caseSensitive)
- .planWith(executorService)
- .entries();
-
- return CloseableIterable.transform(entries, entry -> entry.file().copy(includeColumnStats));
+ return new ManifestGroup(ops.io(), snapshot.dataManifests(ops.io()))
+ .specsById(ops.current().specsById())
+ .filterData(rowFilter)
+ .filterFiles(fileFilter)
+ .filterPartitions(partitionFilter)
+ .ignoreDeleted()
+ .caseSensitive(caseSensitive)
+ .planWith(executorService)
+ .entries(
+ entries ->
+ CloseableIterable.transform(
+ entries, entry -> entry.file().copy(includeColumnStats)));
}
}
}
diff --git a/core/src/main/java/org/apache/iceberg/ManifestGroup.java b/core/src/main/java/org/apache/iceberg/ManifestGroup.java
index e1202d6ee2..39b66d558b 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestGroup.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestGroup.java
@@ -26,6 +26,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.BiFunction;
+import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.iceberg.expressions.Evaluator;
import org.apache.iceberg.expressions.Expression;
@@ -231,6 +232,34 @@ class ManifestGroup {
return CloseableIterable.concat(entries((manifest, entries) -> entries));
}
+ /**
+ * Returns a transformed iterable over manifest entries in the set of manifests.
+ *
+ * <p>The provided function is applied to the entries of each manifest to produce the output
+ * elements. When parallel execution is enabled via {@link #planWith(ExecutorService)}, manifests
+ * are scanned in parallel and the function is invoked in worker threads.
+ *
+ * <p>Since the underlying {@link ManifestReader} reuses entry objects during iteration, the
+ * provided function must make defensive copies of any entry data it needs to retain beyond the
+ * current iteration step.
+ *
+ * @param entryTransform a function that transforms each manifest's entries into the desired
+ *     output
+ * @param <T> the output element type
+ * @return a {@link CloseableIterable} of transformed elements
+ */
+ public <T> CloseableIterable<T> entries(
+ Function<CloseableIterable<ManifestEntry<DataFile>>, CloseableIterable<T>> entryTransform) {
+ Iterable<CloseableIterable<T>> iterables =
+ entries((manifest, entries) -> entryTransform.apply(entries));
+
+ if (executorService != null) {
+ return new ParallelIterable<>(iterables, executorService);
+ } else {
+ return CloseableIterable.concat(iterables);
+ }
+ }
+
/**
* Returns an iterable for groups of data files in the set of manifests.
*
diff --git a/core/src/test/java/org/apache/iceberg/TestFindFiles.java b/core/src/test/java/org/apache/iceberg/TestFindFiles.java
index 14746601e7..9a473dd41b 100644
--- a/core/src/test/java/org/apache/iceberg/TestFindFiles.java
+++ b/core/src/test/java/org/apache/iceberg/TestFindFiles.java
@@ -24,12 +24,14 @@ import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.ParallelIterable;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -209,15 +211,20 @@ public class TestFindFiles extends TestBase {
@TestTemplate
public void testPlanWith() {
- table
- .newAppend()
- .appendFile(FILE_A)
- .appendFile(FILE_B)
- .appendFile(FILE_C)
- .appendFile(FILE_D)
- .commit();
-
- ExecutorService executorService = Executors.newFixedThreadPool(2);
+ // use separate commits to create multiple manifests for parallel scanning
+ table.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+ table.newAppend().appendFile(FILE_C).appendFile(FILE_D).commit();
+
+ AtomicInteger planThreadsIndex = new AtomicInteger(0);
+ ExecutorService executorService =
+ Executors.newFixedThreadPool(
+ 2,
+ runnable -> {
+ Thread thread = new Thread(runnable);
+ thread.setName("plan-" + planThreadsIndex.getAndIncrement());
+ thread.setDaemon(true);
+ return thread;
+ });
try {
Iterable<DataFile> files =
FindFiles.in(table)
@@ -225,7 +232,11 @@ public class TestFindFiles extends TestBase {
.withMetadataMatching(Expressions.startsWith("file_path", "/path/to/data"))
.collect();
+ assertThat(files).isInstanceOf(ParallelIterable.class);
assertThat(pathSet(files)).isEqualTo(pathSet(FILE_A, FILE_B, FILE_C, FILE_D));
+ assertThat(planThreadsIndex.get())
+ .as("Thread should be created in provided pool")
+ .isGreaterThan(0);
} finally {
executorService.shutdown();
}