This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new d2087a04bd Core: Parallelize manifest writing for many new files 
(#11086)
d2087a04bd is described below

commit d2087a04bd6ba62b13f2540480a18b5edc710e8e
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Thu Sep 12 16:41:05 2024 -0700

    Core: Parallelize manifest writing for many new files (#11086)
---
 .../java/org/apache/iceberg/AppendBenchmark.java   |  13 ++-
 .../main/java/org/apache/iceberg/FastAppend.java   |   9 +-
 .../apache/iceberg/MergingSnapshotProducer.java    |  73 +-----------
 .../java/org/apache/iceberg/SnapshotProducer.java  | 124 +++++++++++++++++++++
 .../src/test/java/org/apache/iceberg/TestBase.java |   5 +
 .../java/org/apache/iceberg/TestFastAppend.java    |  19 ++++
 .../java/org/apache/iceberg/TestMergeAppend.java   |  21 ++++
 .../org/apache/iceberg/TestSnapshotProducer.java   |  77 +++++++++++++
 8 files changed, 261 insertions(+), 80 deletions(-)

diff --git a/core/src/jmh/java/org/apache/iceberg/AppendBenchmark.java 
b/core/src/jmh/java/org/apache/iceberg/AppendBenchmark.java
index a8bafe413c..a444e7ff9c 100644
--- a/core/src/jmh/java/org/apache/iceberg/AppendBenchmark.java
+++ b/core/src/jmh/java/org/apache/iceberg/AppendBenchmark.java
@@ -38,7 +38,6 @@ import org.openjdk.jmh.annotations.TearDown;
 import org.openjdk.jmh.annotations.Threads;
 import org.openjdk.jmh.annotations.Timeout;
 import org.openjdk.jmh.annotations.Warmup;
-import org.openjdk.jmh.infra.Blackhole;
 
 /**
  * A benchmark that evaluates the performance of appending files to the table.
@@ -66,14 +65,20 @@ public class AppendBenchmark {
           required(4, "date_col", Types.DateType.get()),
           required(5, "timestamp_col", Types.TimestampType.withoutZone()),
           required(6, "timestamp_tz_col", Types.TimestampType.withZone()),
-          required(7, "str_col", Types.StringType.get()));
+          required(7, "str_col1", Types.StringType.get()),
+          required(8, "str_col2", Types.StringType.get()),
+          required(9, "str_col3", Types.StringType.get()),
+          required(10, "str_col4", Types.StringType.get()),
+          required(11, "str_col5", Types.StringType.get()),
+          required(12, "str_col6", Types.StringType.get()),
+          required(13, "str_col7", Types.StringType.get()));
   private static final PartitionSpec SPEC = PartitionSpec.unpartitioned();
   private static final HadoopTables TABLES = new HadoopTables();
 
   private Table table;
   private List<DataFile> dataFiles;
 
-  @Param({"500000", "1000000", "2500000"})
+  @Param({"50000", "100000", "500000", "1000000", "2500000"})
   private int numFiles;
 
   @Param({"true", "false"})
@@ -92,7 +97,7 @@ public class AppendBenchmark {
 
   @Benchmark
   @Threads(1)
-  public void appendFiles(Blackhole blackhole) {
+  public void appendFiles() {
     AppendFiles append = fast ? table.newFastAppend() : table.newAppend();
 
     for (DataFile dataFile : dataFiles) {
diff --git a/core/src/main/java/org/apache/iceberg/FastAppend.java 
b/core/src/main/java/org/apache/iceberg/FastAppend.java
index 4976a8081c..1bae2e2fc5 100644
--- a/core/src/main/java/org/apache/iceberg/FastAppend.java
+++ b/core/src/main/java/org/apache/iceberg/FastAppend.java
@@ -215,14 +215,7 @@ class FastAppend extends SnapshotProducer<AppendFiles> 
implements AppendFiles {
     }
 
     if (newManifests == null && !newFiles.isEmpty()) {
-      RollingManifestWriter<DataFile> writer = newRollingManifestWriter(spec);
-      try {
-        newFiles.forEach(writer::add);
-      } finally {
-        writer.close();
-      }
-
-      this.newManifests = writer.toManifestFiles();
+      this.newManifests = writeDataManifests(newFiles, spec);
       hasNewFiles = false;
     }
 
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java 
b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index d1eb5d89da..6a4da2abc9 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -33,7 +33,6 @@ import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.events.CreateSnapshotEvent;
-import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
@@ -972,21 +971,9 @@ abstract class MergingSnapshotProducer<ThisT> extends 
SnapshotProducer<ThisT> {
     if (cachedNewDataManifests.isEmpty()) {
       newDataFilesBySpec.forEach(
           (dataSpec, newDataFiles) -> {
-            try {
-              RollingManifestWriter<DataFile> writer = 
newRollingManifestWriter(dataSpec);
-              try {
-                if (newDataFilesDataSequenceNumber == null) {
-                  newDataFiles.forEach(writer::add);
-                } else {
-                  newDataFiles.forEach(f -> writer.add(f, 
newDataFilesDataSequenceNumber));
-                }
-              } finally {
-                writer.close();
-              }
-              this.cachedNewDataManifests.addAll(writer.toManifestFiles());
-            } catch (IOException e) {
-              throw new RuntimeIOException(e, "Failed to close manifest 
writer");
-            }
+            List<ManifestFile> newDataManifests =
+                writeDataManifests(newDataFiles, 
newDataFilesDataSequenceNumber, dataSpec);
+            cachedNewDataManifests.addAll(newDataManifests);
           });
       this.hasNewDataFiles = false;
     }
@@ -1016,24 +1003,8 @@ abstract class MergingSnapshotProducer<ThisT> extends 
SnapshotProducer<ThisT> {
       newDeleteFilesBySpec.forEach(
           (specId, deleteFiles) -> {
             PartitionSpec spec = ops.current().spec(specId);
-            try {
-              RollingManifestWriter<DeleteFile> writer = 
newRollingDeleteManifestWriter(spec);
-              try {
-                deleteFiles.forEach(
-                    df -> {
-                      if (df.dataSequenceNumber() != null) {
-                        writer.add(df.deleteFile(), df.dataSequenceNumber());
-                      } else {
-                        writer.add(df.deleteFile());
-                      }
-                    });
-              } finally {
-                writer.close();
-              }
-              cachedNewDeleteManifests.addAll(writer.toManifestFiles());
-            } catch (IOException e) {
-              throw new RuntimeIOException(e, "Failed to close manifest 
writer");
-            }
+            List<ManifestFile> newDeleteManifests = 
writeDeleteManifests(deleteFiles, spec);
+            cachedNewDeleteManifests.addAll(newDeleteManifests);
           });
 
       this.hasNewDeleteFiles = false;
@@ -1147,38 +1118,4 @@ abstract class MergingSnapshotProducer<ThisT> extends 
SnapshotProducer<ThisT> {
       return MergingSnapshotProducer.this.newDeleteManifestReader(manifest);
     }
   }
-
-  private static class DeleteFileHolder {
-    private final DeleteFile deleteFile;
-    private final Long dataSequenceNumber;
-
-    /**
-     * Wrap a delete file for commit with a given data sequence number
-     *
-     * @param deleteFile delete file
-     * @param dataSequenceNumber data sequence number to apply
-     */
-    DeleteFileHolder(DeleteFile deleteFile, long dataSequenceNumber) {
-      this.deleteFile = deleteFile;
-      this.dataSequenceNumber = dataSequenceNumber;
-    }
-
-    /**
-     * Wrap a delete file for commit with the latest sequence number
-     *
-     * @param deleteFile delete file
-     */
-    DeleteFileHolder(DeleteFile deleteFile) {
-      this.deleteFile = deleteFile;
-      this.dataSequenceNumber = null;
-    }
-
-    public DeleteFile deleteFile() {
-      return deleteFile;
-    }
-
-    public Long dataSequenceNumber() {
-      return dataSequenceNumber;
-    }
-  }
 }
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java 
b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
index 74997cc898..22f6ac5e0b 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -34,15 +34,18 @@ import static 
org.apache.iceberg.TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.LoadingCache;
 import java.io.IOException;
+import java.math.RoundingMode;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
+import java.util.function.Function;
 import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.encryption.EncryptingFileIO;
 import org.apache.iceberg.events.CreateSnapshotEvent;
@@ -59,10 +62,14 @@ import org.apache.iceberg.metrics.ImmutableCommitReport;
 import org.apache.iceberg.metrics.LoggingMetricsReporter;
 import org.apache.iceberg.metrics.MetricsReporter;
 import org.apache.iceberg.metrics.Timer.Timed;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Queues;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.relocated.com.google.common.math.IntMath;
 import org.apache.iceberg.util.Exceptions;
 import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.iceberg.util.Tasks;
@@ -73,6 +80,7 @@ import org.slf4j.LoggerFactory;
 @SuppressWarnings("UnnecessaryAnonymousClass")
 abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
   private static final Logger LOG = 
LoggerFactory.getLogger(SnapshotProducer.class);
+  static final int MIN_FILE_GROUP_SIZE = 10_000;
   static final Set<ManifestFile> EMPTY_SET = Sets.newHashSet();
 
   /** Default callback used to delete files. */
@@ -554,6 +562,88 @@ abstract class SnapshotProducer<ThisT> implements 
SnapshotUpdate<ThisT> {
     return true;
   }
 
+  protected List<ManifestFile> writeDataManifests(List<DataFile> files, 
PartitionSpec spec) {
+    return writeDataManifests(files, null /* inherit data seq */, spec);
+  }
+
+  protected List<ManifestFile> writeDataManifests(
+      List<DataFile> files, Long dataSeq, PartitionSpec spec) {
+    return writeManifests(files, group -> writeDataFileGroup(group, dataSeq, 
spec));
+  }
+
+  private List<ManifestFile> writeDataFileGroup(
+      List<DataFile> files, Long dataSeq, PartitionSpec spec) {
+    RollingManifestWriter<DataFile> writer = newRollingManifestWriter(spec);
+
+    try (RollingManifestWriter<DataFile> closableWriter = writer) {
+      if (dataSeq != null) {
+        files.forEach(file -> closableWriter.add(file, dataSeq));
+      } else {
+        files.forEach(closableWriter::add);
+      }
+    } catch (IOException e) {
+      throw new RuntimeIOException(e, "Failed to write data manifests");
+    }
+
+    return writer.toManifestFiles();
+  }
+
+  protected List<ManifestFile> writeDeleteManifests(
+      List<DeleteFileHolder> files, PartitionSpec spec) {
+    return writeManifests(files, group -> writeDeleteFileGroup(group, spec));
+  }
+
+  private List<ManifestFile> writeDeleteFileGroup(
+      List<DeleteFileHolder> files, PartitionSpec spec) {
+    RollingManifestWriter<DeleteFile> writer = 
newRollingDeleteManifestWriter(spec);
+
+    try (RollingManifestWriter<DeleteFile> closableWriter = writer) {
+      for (DeleteFileHolder file : files) {
+        if (file.dataSequenceNumber() != null) {
+          closableWriter.add(file.deleteFile(), file.dataSequenceNumber());
+        } else {
+          closableWriter.add(file.deleteFile());
+        }
+      }
+    } catch (IOException e) {
+      throw new RuntimeIOException(e, "Failed to write delete manifests");
+    }
+
+    return writer.toManifestFiles();
+  }
+
+  private static <F> List<ManifestFile> writeManifests(
+      List<F> files, Function<List<F>, List<ManifestFile>> writeFunc) {
+    int parallelism = manifestWriterCount(ThreadPools.WORKER_THREAD_POOL_SIZE, 
files.size());
+    List<List<F>> groups = divide(files, parallelism);
+    Queue<ManifestFile> manifests = Queues.newConcurrentLinkedQueue();
+    Tasks.foreach(groups)
+        .stopOnFailure()
+        .throwFailureWhenFinished()
+        .executeWith(ThreadPools.getWorkerPool())
+        .run(group -> manifests.addAll(writeFunc.apply(group)));
+    return ImmutableList.copyOf(manifests);
+  }
+
+  private static <T> List<List<T>> divide(List<T> list, int groupCount) {
+    int groupSize = IntMath.divide(list.size(), groupCount, 
RoundingMode.CEILING);
+    return Lists.partition(list, groupSize);
+  }
+
+  /**
+   * Calculates how many manifest writers can be used concurrently to 
handle the given number of
+   * files without creating too small manifests.
+   *
+   * @param workerPoolSize the size of the available worker pool
+   * @param fileCount the total number of files to be processed
+   * @return the number of manifest writers that can be used concurrently
+   */
+  @VisibleForTesting
+  static int manifestWriterCount(int workerPoolSize, int fileCount) {
+    int limit = IntMath.divide(fileCount, MIN_FILE_GROUP_SIZE, 
RoundingMode.HALF_UP);
+    return Math.max(1, Math.min(workerPoolSize, limit));
+  }
+
   private static ManifestFile addMetadata(TableOperations ops, ManifestFile 
manifest) {
     try (ManifestReader<DataFile> reader =
         ManifestFiles.read(manifest, ops.io(), ops.current().specsById())) {
@@ -654,4 +744,38 @@ abstract class SnapshotProducer<ThisT> implements 
SnapshotUpdate<ThisT> {
       }
     }
   }
+
+  protected static class DeleteFileHolder {
+    private final DeleteFile deleteFile;
+    private final Long dataSequenceNumber;
+
+    /**
+     * Wrap a delete file for commit with a given data sequence number.
+     *
+     * @param deleteFile delete file
+     * @param dataSequenceNumber data sequence number to apply
+     */
+    DeleteFileHolder(DeleteFile deleteFile, long dataSequenceNumber) {
+      this.deleteFile = deleteFile;
+      this.dataSequenceNumber = dataSequenceNumber;
+    }
+
+    /**
+     * Wrap a delete file for commit with the latest sequence number.
+     *
+     * @param deleteFile delete file
+     */
+    DeleteFileHolder(DeleteFile deleteFile) {
+      this.deleteFile = deleteFile;
+      this.dataSequenceNumber = null;
+    }
+
+    public DeleteFile deleteFile() {
+      return deleteFile;
+    }
+
+    public Long dataSequenceNumber() {
+      return dataSequenceNumber;
+    }
+  }
 }
diff --git a/core/src/test/java/org/apache/iceberg/TestBase.java 
b/core/src/test/java/org/apache/iceberg/TestBase.java
index e03a1efd51..23fabc2a94 100644
--- a/core/src/test/java/org/apache/iceberg/TestBase.java
+++ b/core/src/test/java/org/apache/iceberg/TestBase.java
@@ -27,6 +27,7 @@ import java.io.UncheckedIOException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
@@ -460,6 +461,10 @@ public class TestBase {
   }
 
   void validateTableFiles(Table tbl, DataFile... expectedFiles) {
+    validateTableFiles(tbl, Arrays.asList(expectedFiles));
+  }
+
+  void validateTableFiles(Table tbl, Collection<DataFile> expectedFiles) {
     Set<CharSequence> expectedFilePaths = Sets.newHashSet();
     for (DataFile file : expectedFiles) {
       expectedFilePaths.add(file.path());
diff --git a/core/src/test/java/org/apache/iceberg/TestFastAppend.java 
b/core/src/test/java/org/apache/iceberg/TestFastAppend.java
index 8125c528d9..b2f19fbd5f 100644
--- a/core/src/test/java/org/apache/iceberg/TestFastAppend.java
+++ b/core/src/test/java/org/apache/iceberg/TestFastAppend.java
@@ -42,6 +42,25 @@ public class TestFastAppend extends TestBase {
     return Arrays.asList(1, 2, 3);
   }
 
+  @TestTemplate
+  public void testAddManyFiles() {
+    assertThat(listManifestFiles()).as("Table should start empty").isEmpty();
+
+    List<DataFile> dataFiles = Lists.newArrayList();
+
+    for (int ordinal = 0; ordinal < 2 * SnapshotProducer.MIN_FILE_GROUP_SIZE; 
ordinal++) {
+      StructLike partition = TestHelpers.Row.of(ordinal % 2);
+      DataFile dataFile = FileGenerationUtil.generateDataFile(table, 
partition);
+      dataFiles.add(dataFile);
+    }
+
+    AppendFiles append = table.newAppend();
+    dataFiles.forEach(append::appendFile);
+    append.commit();
+
+    validateTableFiles(table, dataFiles);
+  }
+
   @TestTemplate
   public void appendNullFile() {
     assertThatThrownBy(() -> table.newFastAppend().appendFile(null).commit())
diff --git a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java 
b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
index abfcb31833..e079f63401 100644
--- a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
+++ b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
@@ -33,9 +33,11 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import org.apache.iceberg.ManifestEntry.Status;
+import org.apache.iceberg.TestHelpers.Row;
 import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.Types;
 import org.junit.jupiter.api.TestTemplate;
@@ -62,6 +64,25 @@ public class TestMergeAppend extends TestBase {
         .hasMessage("Invalid data file: null");
   }
 
+  @TestTemplate
+  public void testAddManyFiles() {
+    assertThat(listManifestFiles()).as("Table should start empty").isEmpty();
+
+    List<DataFile> dataFiles = Lists.newArrayList();
+
+    for (int ordinal = 0; ordinal < 2 * SnapshotProducer.MIN_FILE_GROUP_SIZE; 
ordinal++) {
+      StructLike partition = Row.of(ordinal % 2);
+      DataFile dataFile = FileGenerationUtil.generateDataFile(table, 
partition);
+      dataFiles.add(dataFile);
+    }
+
+    AppendFiles append = table.newAppend();
+    dataFiles.forEach(append::appendFile);
+    append.commit();
+
+    validateTableFiles(table, dataFiles);
+  }
+
   @TestTemplate
   public void testEmptyTableAppend() {
     assertThat(listManifestFiles()).isEmpty();
diff --git a/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java 
b/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java
new file mode 100644
index 0000000000..52bffdf185
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.jupiter.api.Test;
+
+public class TestSnapshotProducer {
+
+  @Test
+  public void testManifestFileGroupSize() {
+    assertManifestWriterCount(
+        4 /* worker pool size */,
+        100 /* file count */,
+        1 /* manifest writer count */,
+        "Must use 1 writer if file count is small");
+
+    assertManifestWriterCount(
+        4 /* worker pool size */,
+        SnapshotProducer.MIN_FILE_GROUP_SIZE /* file count */,
+        1 /* manifest writer count */,
+        "Must use 1 writer if file count matches min group size");
+
+    assertManifestWriterCount(
+        4 /* worker pool size */,
+        SnapshotProducer.MIN_FILE_GROUP_SIZE + 1 /* file count */,
+        1 /* manifest writer count */,
+        "Must use 1 writer if file count is slightly above min group size");
+
+    assertManifestWriterCount(
+        4 /* worker pool size */,
+        (int) (1.25 * SnapshotProducer.MIN_FILE_GROUP_SIZE) /* file count */,
+        1 /* manifest writer count */,
+        "Must use 1 writer when file count is < 1.5 * min group size");
+
+    assertManifestWriterCount(
+        4 /* worker pool size */,
+        (int) (1.5 * SnapshotProducer.MIN_FILE_GROUP_SIZE) /* file count */,
+        2 /* manifest writer count */,
+        "Must use 2 writers when file count is >= 1.5 * min group size");
+
+    assertManifestWriterCount(
+        3 /* worker pool size */,
+        100 * SnapshotProducer.MIN_FILE_GROUP_SIZE /* file count */,
+        3 /* manifest writer count */,
+        "Must limit parallelism to worker pool size when file count is large");
+
+    assertManifestWriterCount(
+        32 /* worker pool size */,
+        5 * SnapshotProducer.MIN_FILE_GROUP_SIZE /* file count */,
+        5 /* manifest writer count */,
+        "Must limit parallelism to avoid tiny manifests");
+  }
+
+  private void assertManifestWriterCount(
+      int workerPoolSize, int fileCount, int expectedManifestWriterCount, 
String errMsg) {
+    int writerCount = SnapshotProducer.manifestWriterCount(workerPoolSize, 
fileCount);
+    
assertThat(writerCount).withFailMessage(errMsg).isEqualTo(expectedManifestWriterCount);
+  }
+}

Reply via email to