This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new d2087a04bd Core: Parallelize manifest writing for many new files (#11086)
d2087a04bd is described below
commit d2087a04bd6ba62b13f2540480a18b5edc710e8e
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Thu Sep 12 16:41:05 2024 -0700
Core: Parallelize manifest writing for many new files (#11086)
---
.../java/org/apache/iceberg/AppendBenchmark.java | 13 ++-
.../main/java/org/apache/iceberg/FastAppend.java | 9 +-
.../apache/iceberg/MergingSnapshotProducer.java | 73 +-----------
.../java/org/apache/iceberg/SnapshotProducer.java | 124 +++++++++++++++++++++
.../src/test/java/org/apache/iceberg/TestBase.java | 5 +
.../java/org/apache/iceberg/TestFastAppend.java | 19 ++++
.../java/org/apache/iceberg/TestMergeAppend.java | 21 ++++
.../org/apache/iceberg/TestSnapshotProducer.java | 77 +++++++++++++
8 files changed, 261 insertions(+), 80 deletions(-)
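
For readers skimming the patch: this change replaces the single-threaded rolling manifest writer loops in FastAppend and MergingSnapshotProducer with new SnapshotProducer helpers that split the incoming files into groups and write each group's manifests on the shared worker pool. A minimal, illustrative sketch of that idea follows; it is not the committed code (which uses the internal Tasks/ThreadPools utilities and the writeDataFileGroup/writeDeleteFileGroup helpers shown in the diff below), and writeGroup here is a hypothetical stand-in for those group writers.

import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Function;

// Sketch: divide the files into writerCount groups and write each group's manifests in parallel.
static <F, M> List<M> writeManifestsInParallel(
    List<F> files,
    int writerCount,
    ExecutorService workerPool,
    Function<List<F>, List<M>> writeGroup) throws Exception { // stand-in for the per-group writer
  int groupSize = (files.size() + writerCount - 1) / writerCount; // ceiling division
  Queue<M> manifests = new ConcurrentLinkedQueue<>();
  List<Future<?>> futures = new ArrayList<>();
  for (int start = 0; start < files.size(); start += groupSize) {
    List<F> group = files.subList(start, Math.min(start + groupSize, files.size()));
    futures.add(workerPool.submit(() -> manifests.addAll(writeGroup.apply(group))));
  }
  for (Future<?> future : futures) {
    future.get(); // propagate the first failure, mirroring stopOnFailure/throwFailureWhenFinished
  }
  return new ArrayList<>(manifests);
}
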
diff --git a/core/src/jmh/java/org/apache/iceberg/AppendBenchmark.java b/core/src/jmh/java/org/apache/iceberg/AppendBenchmark.java
index a8bafe413c..a444e7ff9c 100644
--- a/core/src/jmh/java/org/apache/iceberg/AppendBenchmark.java
+++ b/core/src/jmh/java/org/apache/iceberg/AppendBenchmark.java
@@ -38,7 +38,6 @@ import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Timeout;
import org.openjdk.jmh.annotations.Warmup;
-import org.openjdk.jmh.infra.Blackhole;
/**
* A benchmark that evaluates the performance of appending files to the table.
@@ -66,14 +65,20 @@ public class AppendBenchmark {
required(4, "date_col", Types.DateType.get()),
required(5, "timestamp_col", Types.TimestampType.withoutZone()),
required(6, "timestamp_tz_col", Types.TimestampType.withZone()),
- required(7, "str_col", Types.StringType.get()));
+ required(7, "str_col1", Types.StringType.get()),
+ required(8, "str_col2", Types.StringType.get()),
+ required(9, "str_col3", Types.StringType.get()),
+ required(10, "str_col4", Types.StringType.get()),
+ required(11, "str_col5", Types.StringType.get()),
+ required(12, "str_col6", Types.StringType.get()),
+ required(13, "str_col7", Types.StringType.get()));
private static final PartitionSpec SPEC = PartitionSpec.unpartitioned();
private static final HadoopTables TABLES = new HadoopTables();
private Table table;
private List<DataFile> dataFiles;
- @Param({"500000", "1000000", "2500000"})
+ @Param({"50000", "100000", "500000", "1000000", "2500000"})
private int numFiles;
@Param({"true", "false"})
@@ -92,7 +97,7 @@ public class AppendBenchmark {
@Benchmark
@Threads(1)
- public void appendFiles(Blackhole blackhole) {
+ public void appendFiles() {
AppendFiles append = fast ? table.newFastAppend() : table.newAppend();
for (DataFile dataFile : dataFiles) {
diff --git a/core/src/main/java/org/apache/iceberg/FastAppend.java b/core/src/main/java/org/apache/iceberg/FastAppend.java
index 4976a8081c..1bae2e2fc5 100644
--- a/core/src/main/java/org/apache/iceberg/FastAppend.java
+++ b/core/src/main/java/org/apache/iceberg/FastAppend.java
@@ -215,14 +215,7 @@ class FastAppend extends SnapshotProducer<AppendFiles> implements AppendFiles {
}
if (newManifests == null && !newFiles.isEmpty()) {
- RollingManifestWriter<DataFile> writer = newRollingManifestWriter(spec);
- try {
- newFiles.forEach(writer::add);
- } finally {
- writer.close();
- }
-
- this.newManifests = writer.toManifestFiles();
+ this.newManifests = writeDataManifests(newFiles, spec);
hasNewFiles = false;
}
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index d1eb5d89da..6a4da2abc9 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -33,7 +33,6 @@ import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.events.CreateSnapshotEvent;
-import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
@@ -972,21 +971,9 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
if (cachedNewDataManifests.isEmpty()) {
newDataFilesBySpec.forEach(
(dataSpec, newDataFiles) -> {
- try {
- RollingManifestWriter<DataFile> writer = newRollingManifestWriter(dataSpec);
- try {
- if (newDataFilesDataSequenceNumber == null) {
- newDataFiles.forEach(writer::add);
- } else {
- newDataFiles.forEach(f -> writer.add(f, newDataFilesDataSequenceNumber));
- }
- } finally {
- writer.close();
- }
- this.cachedNewDataManifests.addAll(writer.toManifestFiles());
- } catch (IOException e) {
- throw new RuntimeIOException(e, "Failed to close manifest writer");
- }
+ List<ManifestFile> newDataManifests =
+ writeDataManifests(newDataFiles, newDataFilesDataSequenceNumber, dataSpec);
+ cachedNewDataManifests.addAll(newDataManifests);
});
this.hasNewDataFiles = false;
}
@@ -1016,24 +1003,8 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
newDeleteFilesBySpec.forEach(
(specId, deleteFiles) -> {
PartitionSpec spec = ops.current().spec(specId);
- try {
- RollingManifestWriter<DeleteFile> writer = newRollingDeleteManifestWriter(spec);
- try {
- deleteFiles.forEach(
- df -> {
- if (df.dataSequenceNumber() != null) {
- writer.add(df.deleteFile(), df.dataSequenceNumber());
- } else {
- writer.add(df.deleteFile());
- }
- });
- } finally {
- writer.close();
- }
- cachedNewDeleteManifests.addAll(writer.toManifestFiles());
- } catch (IOException e) {
- throw new RuntimeIOException(e, "Failed to close manifest writer");
- }
+ List<ManifestFile> newDeleteManifests = writeDeleteManifests(deleteFiles, spec);
+ cachedNewDeleteManifests.addAll(newDeleteManifests);
});
this.hasNewDeleteFiles = false;
@@ -1147,38 +1118,4 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
return MergingSnapshotProducer.this.newDeleteManifestReader(manifest);
}
}
-
- private static class DeleteFileHolder {
- private final DeleteFile deleteFile;
- private final Long dataSequenceNumber;
-
- /**
- * Wrap a delete file for commit with a given data sequence number
- *
- * @param deleteFile delete file
- * @param dataSequenceNumber data sequence number to apply
- */
- DeleteFileHolder(DeleteFile deleteFile, long dataSequenceNumber) {
- this.deleteFile = deleteFile;
- this.dataSequenceNumber = dataSequenceNumber;
- }
-
- /**
- * Wrap a delete file for commit with the latest sequence number
- *
- * @param deleteFile delete file
- */
- DeleteFileHolder(DeleteFile deleteFile) {
- this.deleteFile = deleteFile;
- this.dataSequenceNumber = null;
- }
-
- public DeleteFile deleteFile() {
- return deleteFile;
- }
-
- public Long dataSequenceNumber() {
- return dataSequenceNumber;
- }
- }
}
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
index 74997cc898..22f6ac5e0b 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -34,15 +34,18 @@ import static org.apache.iceberg.TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import java.io.IOException;
+import java.math.RoundingMode;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
+import java.util.function.Function;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.encryption.EncryptingFileIO;
import org.apache.iceberg.events.CreateSnapshotEvent;
@@ -59,10 +62,14 @@ import org.apache.iceberg.metrics.ImmutableCommitReport;
import org.apache.iceberg.metrics.LoggingMetricsReporter;
import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.metrics.Timer.Timed;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Queues;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.relocated.com.google.common.math.IntMath;
import org.apache.iceberg.util.Exceptions;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.Tasks;
@@ -73,6 +80,7 @@ import org.slf4j.LoggerFactory;
@SuppressWarnings("UnnecessaryAnonymousClass")
abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
private static final Logger LOG = LoggerFactory.getLogger(SnapshotProducer.class);
+ static final int MIN_FILE_GROUP_SIZE = 10_000;
static final Set<ManifestFile> EMPTY_SET = Sets.newHashSet();
/** Default callback used to delete files. */
@@ -554,6 +562,88 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
return true;
}
+ protected List<ManifestFile> writeDataManifests(List<DataFile> files, PartitionSpec spec) {
+ return writeDataManifests(files, null /* inherit data seq */, spec);
+ }
+
+ protected List<ManifestFile> writeDataManifests(
+ List<DataFile> files, Long dataSeq, PartitionSpec spec) {
+ return writeManifests(files, group -> writeDataFileGroup(group, dataSeq, spec));
+ }
+
+ private List<ManifestFile> writeDataFileGroup(
+ List<DataFile> files, Long dataSeq, PartitionSpec spec) {
+ RollingManifestWriter<DataFile> writer = newRollingManifestWriter(spec);
+
+ try (RollingManifestWriter<DataFile> closableWriter = writer) {
+ if (dataSeq != null) {
+ files.forEach(file -> closableWriter.add(file, dataSeq));
+ } else {
+ files.forEach(closableWriter::add);
+ }
+ } catch (IOException e) {
+ throw new RuntimeIOException(e, "Failed to write data manifests");
+ }
+
+ return writer.toManifestFiles();
+ }
+
+ protected List<ManifestFile> writeDeleteManifests(
+ List<DeleteFileHolder> files, PartitionSpec spec) {
+ return writeManifests(files, group -> writeDeleteFileGroup(group, spec));
+ }
+
+ private List<ManifestFile> writeDeleteFileGroup(
+ List<DeleteFileHolder> files, PartitionSpec spec) {
+ RollingManifestWriter<DeleteFile> writer = newRollingDeleteManifestWriter(spec);
+
+ try (RollingManifestWriter<DeleteFile> closableWriter = writer) {
+ for (DeleteFileHolder file : files) {
+ if (file.dataSequenceNumber() != null) {
+ closableWriter.add(file.deleteFile(), file.dataSequenceNumber());
+ } else {
+ closableWriter.add(file.deleteFile());
+ }
+ }
+ } catch (IOException e) {
+ throw new RuntimeIOException(e, "Failed to write delete manifests");
+ }
+
+ return writer.toManifestFiles();
+ }
+
+ private static <F> List<ManifestFile> writeManifests(
+ List<F> files, Function<List<F>, List<ManifestFile>> writeFunc) {
+ int parallelism = manifestWriterCount(ThreadPools.WORKER_THREAD_POOL_SIZE, files.size());
+ List<List<F>> groups = divide(files, parallelism);
+ Queue<ManifestFile> manifests = Queues.newConcurrentLinkedQueue();
+ Tasks.foreach(groups)
+ .stopOnFailure()
+ .throwFailureWhenFinished()
+ .executeWith(ThreadPools.getWorkerPool())
+ .run(group -> manifests.addAll(writeFunc.apply(group)));
+ return ImmutableList.copyOf(manifests);
+ }
+
+ private static <T> List<List<T>> divide(List<T> list, int groupCount) {
+ int groupSize = IntMath.divide(list.size(), groupCount, RoundingMode.CEILING);
+ return Lists.partition(list, groupSize);
+ }
+
+ /**
+ * Calculates how many manifest writers can be used concurrently to handle the given number of
+ * files without creating too small manifests.
+ *
+ * @param workerPoolSize the size of the available worker pool
+ * @param fileCount the total number of files to be processed
+ * @return the number of manifest writers that can be used concurrently
+ */
+ @VisibleForTesting
+ static int manifestWriterCount(int workerPoolSize, int fileCount) {
+ int limit = IntMath.divide(fileCount, MIN_FILE_GROUP_SIZE, RoundingMode.HALF_UP);
+ return Math.max(1, Math.min(workerPoolSize, limit));
+ }
+
private static ManifestFile addMetadata(TableOperations ops, ManifestFile manifest) {
try (ManifestReader<DataFile> reader =
ManifestFiles.read(manifest, ops.io(), ops.current().specsById())) {
@@ -654,4 +744,38 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
}
}
}
+
+ protected static class DeleteFileHolder {
+ private final DeleteFile deleteFile;
+ private final Long dataSequenceNumber;
+
+ /**
+ * Wrap a delete file for commit with a given data sequence number.
+ *
+ * @param deleteFile delete file
+ * @param dataSequenceNumber data sequence number to apply
+ */
+ DeleteFileHolder(DeleteFile deleteFile, long dataSequenceNumber) {
+ this.deleteFile = deleteFile;
+ this.dataSequenceNumber = dataSequenceNumber;
+ }
+
+ /**
+ * Wrap a delete file for commit with the latest sequence number.
+ *
+ * @param deleteFile delete file
+ */
+ DeleteFileHolder(DeleteFile deleteFile) {
+ this.deleteFile = deleteFile;
+ this.dataSequenceNumber = null;
+ }
+
+ public DeleteFile deleteFile() {
+ return deleteFile;
+ }
+
+ public Long dataSequenceNumber() {
+ return dataSequenceNumber;
+ }
+ }
}
diff --git a/core/src/test/java/org/apache/iceberg/TestBase.java b/core/src/test/java/org/apache/iceberg/TestBase.java
index e03a1efd51..23fabc2a94 100644
--- a/core/src/test/java/org/apache/iceberg/TestBase.java
+++ b/core/src/test/java/org/apache/iceberg/TestBase.java
@@ -27,6 +27,7 @@ import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
@@ -460,6 +461,10 @@ public class TestBase {
}
void validateTableFiles(Table tbl, DataFile... expectedFiles) {
+ validateTableFiles(tbl, Arrays.asList(expectedFiles));
+ }
+
+ void validateTableFiles(Table tbl, Collection<DataFile> expectedFiles) {
Set<CharSequence> expectedFilePaths = Sets.newHashSet();
for (DataFile file : expectedFiles) {
expectedFilePaths.add(file.path());
diff --git a/core/src/test/java/org/apache/iceberg/TestFastAppend.java b/core/src/test/java/org/apache/iceberg/TestFastAppend.java
index 8125c528d9..b2f19fbd5f 100644
--- a/core/src/test/java/org/apache/iceberg/TestFastAppend.java
+++ b/core/src/test/java/org/apache/iceberg/TestFastAppend.java
@@ -42,6 +42,25 @@ public class TestFastAppend extends TestBase {
return Arrays.asList(1, 2, 3);
}
+ @TestTemplate
+ public void testAddManyFiles() {
+ assertThat(listManifestFiles()).as("Table should start empty").isEmpty();
+
+ List<DataFile> dataFiles = Lists.newArrayList();
+
+ for (int ordinal = 0; ordinal < 2 * SnapshotProducer.MIN_FILE_GROUP_SIZE; ordinal++) {
+ StructLike partition = TestHelpers.Row.of(ordinal % 2);
+ DataFile dataFile = FileGenerationUtil.generateDataFile(table, partition);
+ dataFiles.add(dataFile);
+ }
+
+ AppendFiles append = table.newAppend();
+ dataFiles.forEach(append::appendFile);
+ append.commit();
+
+ validateTableFiles(table, dataFiles);
+ }
+
@TestTemplate
public void appendNullFile() {
assertThatThrownBy(() -> table.newFastAppend().appendFile(null).commit())
diff --git a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
index abfcb31833..e079f63401 100644
--- a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
+++ b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
@@ -33,9 +33,11 @@ import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.iceberg.ManifestEntry.Status;
+import org.apache.iceberg.TestHelpers.Row;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Types;
import org.junit.jupiter.api.TestTemplate;
@@ -62,6 +64,25 @@ public class TestMergeAppend extends TestBase {
.hasMessage("Invalid data file: null");
}
+ @TestTemplate
+ public void testAddManyFiles() {
+ assertThat(listManifestFiles()).as("Table should start empty").isEmpty();
+
+ List<DataFile> dataFiles = Lists.newArrayList();
+
+ for (int ordinal = 0; ordinal < 2 * SnapshotProducer.MIN_FILE_GROUP_SIZE; ordinal++) {
+ StructLike partition = Row.of(ordinal % 2);
+ DataFile dataFile = FileGenerationUtil.generateDataFile(table, partition);
+ dataFiles.add(dataFile);
+ }
+
+ AppendFiles append = table.newAppend();
+ dataFiles.forEach(append::appendFile);
+ append.commit();
+
+ validateTableFiles(table, dataFiles);
+ }
+
@TestTemplate
public void testEmptyTableAppend() {
assertThat(listManifestFiles()).isEmpty();
diff --git a/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java b/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java
new file mode 100644
index 0000000000..52bffdf185
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.jupiter.api.Test;
+
+public class TestSnapshotProducer {
+
+ @Test
+ public void testManifestFileGroupSize() {
+ assertManifestWriterCount(
+ 4 /* worker pool size */,
+ 100 /* file count */,
+ 1 /* manifest writer count */,
+ "Must use 1 writer if file count is small");
+
+ assertManifestWriterCount(
+ 4 /* worker pool size */,
+ SnapshotProducer.MIN_FILE_GROUP_SIZE /* file count */,
+ 1 /* manifest writer count */,
+ "Must use 1 writer if file count matches min group size");
+
+ assertManifestWriterCount(
+ 4 /* worker pool size */,
+ SnapshotProducer.MIN_FILE_GROUP_SIZE + 1 /* file count */,
+ 1 /* manifest writer count */,
+ "Must use 1 writer if file count is slightly above min group size");
+
+ assertManifestWriterCount(
+ 4 /* worker pool size */,
+ (int) (1.25 * SnapshotProducer.MIN_FILE_GROUP_SIZE) /* file count */,
+ 1 /* manifest writer count */,
+ "Must use 1 writer when file count is < 1.5 * min group size");
+
+ assertManifestWriterCount(
+ 4 /* worker pool size */,
+ (int) (1.5 * SnapshotProducer.MIN_FILE_GROUP_SIZE) /* file count */,
+ 2 /* manifest writer count */,
+ "Must use 2 writers when file count is >= 1.5 * min group size");
+
+ assertManifestWriterCount(
+ 3 /* worker pool size */,
+ 100 * SnapshotProducer.MIN_FILE_GROUP_SIZE /* file count */,
+ 3 /* manifest writer count */,
+ "Must limit parallelism to worker pool size when file count is large");
+
+ assertManifestWriterCount(
+ 32 /* worker pool size */,
+ 5 * SnapshotProducer.MIN_FILE_GROUP_SIZE /* file count */,
+ 5 /* manifest writer count */,
+ "Must limit parallelism to avoid tiny manifests");
+ }
+
+ private void assertManifestWriterCount(
int workerPoolSize, int fileCount, int expectedManifestWriterCount, String errMsg) {
+ int writerCount = SnapshotProducer.manifestWriterCount(workerPoolSize, fileCount);
+ assertThat(writerCount).withFailMessage(errMsg).isEqualTo(expectedManifestWriterCount);
+ }
+}
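
A quick hand check of the writer-count arithmetic the assertions above expect, using MIN_FILE_GROUP_SIZE = 10_000 from SnapshotProducer and IntMath.divide with HALF_UP rounding:

// manifestWriterCount(4, 100)       -> limit = round(100 / 10_000)    = 0   -> max(1, min(4, 0))    = 1
// manifestWriterCount(4, 10_000)    -> limit = round(1.0)             = 1   -> max(1, min(4, 1))    = 1
// manifestWriterCount(4, 12_500)    -> limit = round(1.25)            = 1   -> max(1, min(4, 1))    = 1
// manifestWriterCount(4, 15_000)    -> limit = round(1.5)             = 2   -> max(1, min(4, 2))    = 2
// manifestWriterCount(3, 1_000_000) -> limit = round(100.0)           = 100 -> max(1, min(3, 100))  = 3
// manifestWriterCount(32, 50_000)   -> limit = round(5.0)             = 5   -> max(1, min(32, 5))   = 5
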