This is an automated email from the ASF dual-hosted git repository.

stevenwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new be577eeac6 Core: Maintain passed in ordering of files in Manifest Lists (#13411)
be577eeac6 is described below

commit be577eeac631d77243beb57409e476bf197f79d7
Author: Russell Spitzer <russell.spit...@gmail.com>
AuthorDate: Tue Jul 1 12:44:34 2025 -0500

    Core: Maintain passed in ordering of files in Manifest Lists (#13411)
---
 .../java/org/apache/iceberg/AppendBenchmark.java   |  1 +
 .../java/org/apache/iceberg/SnapshotProducer.java  | 32 ++++++++++++++++++----
 jmh.gradle                                         |  1 +
 3 files changed, 28 insertions(+), 6 deletions(-)

diff --git a/core/src/jmh/java/org/apache/iceberg/AppendBenchmark.java b/core/src/jmh/java/org/apache/iceberg/AppendBenchmark.java
index a444e7ff9c..57a5c2d3a1 100644
--- a/core/src/jmh/java/org/apache/iceberg/AppendBenchmark.java
+++ b/core/src/jmh/java/org/apache/iceberg/AppendBenchmark.java
@@ -86,6 +86,7 @@ public class AppendBenchmark {
 
   @Setup
   public void setupBenchmark() {
+    dropTable();
     initTable();
     initDataFiles();
   }
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
index 118ae0b328..77cdac8f4a 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -40,12 +40,12 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
-import java.util.Queue;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicReferenceArray;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import org.apache.iceberg.encryption.EncryptedOutputFile;
@@ -69,10 +69,10 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Queues;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.relocated.com.google.common.math.IntMath;
 import org.apache.iceberg.util.Exceptions;
+import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.iceberg.util.Tasks;
@@ -669,13 +669,33 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
       Collection<F> files, Function<List<F>, List<ManifestFile>> writeFunc) {
     int parallelism = manifestWriterCount(ThreadPools.WORKER_THREAD_POOL_SIZE, files.size());
     List<List<F>> groups = divide(files, parallelism);
-    Queue<ManifestFile> manifests = Queues.newConcurrentLinkedQueue();
-    Tasks.foreach(groups)
+
+    // Create a new list pairing each group with its index
+    List<Pair<Integer, List<F>>> groupsWithIndex = Lists.newArrayList();
+    for (int i = 0; i < groups.size(); i++) {
+      groupsWithIndex.add(Pair.of(i, groups.get(i)));
+    }
+
+    AtomicReferenceArray<List<ManifestFile>> results = new AtomicReferenceArray<>(groups.size());
+
+    Tasks.foreach(groupsWithIndex)
         .stopOnFailure()
         .throwFailureWhenFinished()
         .executeWith(ThreadPools.getWorkerPool())
-        .run(group -> manifests.addAll(writeFunc.apply(group)));
-    return ImmutableList.copyOf(manifests);
+        .run(
+            indexedGroup -> {
+              int index = indexedGroup.first();
+              List<F> group = indexedGroup.second();
+              List<ManifestFile> groupResults = writeFunc.apply(group);
+              results.set(index, groupResults);
+            });
+
+    // Collect results in order
+    ImmutableList.Builder<ManifestFile> builder = ImmutableList.builder();
+    for (int i = 0; i < results.length(); i++) {
+      builder.addAll(results.get(i));
+    }
+    return builder.build();
   }
 
   private static <T> List<List<T>> divide(Collection<T> collection, int groupCount) {
diff --git a/jmh.gradle b/jmh.gradle
index 4d6d7207c5..7f3bc0deaf 100644
--- a/jmh.gradle
+++ b/jmh.gradle
@@ -66,6 +66,7 @@ configure(jmhProjects) {
     forceGC = true
     includeTests = true
     humanOutputFile = file(jmhOutputPath)
+    jvmArgs = ['-Xmx32g']
     resultsFile = file(jmhJsonOutputPath)
     resultFormat = 'JSON'
     includes = [jmhIncludeRegex]

Reply via email to