This is an automated email from the ASF dual-hosted git repository.

stevenwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
     new be577eeac6 Core: Maintain passed in ordering of files in Manifest Lists (#13411)
be577eeac6 is described below

commit be577eeac631d77243beb57409e476bf197f79d7
Author: Russell Spitzer <russell.spit...@gmail.com>
AuthorDate: Tue Jul 1 12:44:34 2025 -0500

    Core: Maintain passed in ordering of files in Manifest Lists (#13411)
---
 .../java/org/apache/iceberg/AppendBenchmark.java   |  1 +
 .../java/org/apache/iceberg/SnapshotProducer.java  | 32 ++++++++++++++++++----
 jmh.gradle                                         |  1 +
 3 files changed, 28 insertions(+), 6 deletions(-)

diff --git a/core/src/jmh/java/org/apache/iceberg/AppendBenchmark.java b/core/src/jmh/java/org/apache/iceberg/AppendBenchmark.java
index a444e7ff9c..57a5c2d3a1 100644
--- a/core/src/jmh/java/org/apache/iceberg/AppendBenchmark.java
+++ b/core/src/jmh/java/org/apache/iceberg/AppendBenchmark.java
@@ -86,6 +86,7 @@ public class AppendBenchmark {
   @Setup
   public void setupBenchmark() {
+    dropTable();
     initTable();
     initDataFiles();
   }
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
index 118ae0b328..77cdac8f4a 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -40,12 +40,12 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
-import java.util.Queue;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicReferenceArray;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import org.apache.iceberg.encryption.EncryptedOutputFile;
@@ -69,10 +69,10 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import
org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Queues;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.relocated.com.google.common.math.IntMath;
 import org.apache.iceberg.util.Exceptions;
+import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.iceberg.util.Tasks;
@@ -669,13 +669,33 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
       Collection<F> files, Function<List<F>, List<ManifestFile>> writeFunc) {
     int parallelism = manifestWriterCount(ThreadPools.WORKER_THREAD_POOL_SIZE, files.size());
     List<List<F>> groups = divide(files, parallelism);
-    Queue<ManifestFile> manifests = Queues.newConcurrentLinkedQueue();
-    Tasks.foreach(groups)
+
+    // Create a new list pairing each group with its index
+    List<Pair<Integer, List<F>>> groupsWithIndex = Lists.newArrayList();
+    for (int i = 0; i < groups.size(); i++) {
+      groupsWithIndex.add(Pair.of(i, groups.get(i)));
+    }
+
+    AtomicReferenceArray<List<ManifestFile>> results = new AtomicReferenceArray<>(groups.size());
+
+    Tasks.foreach(groupsWithIndex)
         .stopOnFailure()
         .throwFailureWhenFinished()
         .executeWith(ThreadPools.getWorkerPool())
-        .run(group -> manifests.addAll(writeFunc.apply(group)));
-    return ImmutableList.copyOf(manifests);
+        .run(
+            indexedGroup -> {
+              int index = indexedGroup.first();
+              List<F> group = indexedGroup.second();
+              List<ManifestFile> groupResults = writeFunc.apply(group);
+              results.set(index, groupResults);
+            });
+
+    // Collect results in order
+    ImmutableList.Builder<ManifestFile> builder = ImmutableList.builder();
+    for (int i = 0; i < results.length(); i++) {
+      builder.addAll(results.get(i));
+    }
+    return
builder.build();
   }
   private static <T> List<List<T>> divide(Collection<T> collection, int groupCount) {
diff --git a/jmh.gradle b/jmh.gradle
index 4d6d7207c5..7f3bc0deaf 100644
--- a/jmh.gradle
+++ b/jmh.gradle
@@ -66,6 +66,7 @@ configure(jmhProjects) {
     forceGC = true
     includeTests = true
     humanOutputFile = file(jmhOutputPath)
+    jvmArgs = ['-Xmx32g']
     resultsFile = file(jmhJsonOutputPath)
     resultFormat = 'JSON'
     includes = [jmhIncludeRegex]