This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new a7a09d43b1 Core: Avoid generating huge manifests during commits (#6335)
a7a09d43b1 is described below
commit a7a09d43b10e4584faf5598c40bc8fc424c402e1
Author: Xianyang Liu <[email protected]>
AuthorDate: Sat Jul 29 02:06:31 2023 +0800
Core: Avoid generating huge manifests during commits (#6335)
Co-authored-by: xianyangliu <[email protected]>
---
.../main/java/org/apache/iceberg/FastAppend.java | 47 +++--
.../apache/iceberg/MergingSnapshotProducer.java | 44 +++--
.../org/apache/iceberg/RollingManifestWriter.java | 150 +++++++++++++++
.../java/org/apache/iceberg/SnapshotProducer.java | 15 ++
.../org/apache/iceberg/TestManifestWriter.java | 201 +++++++++++++++++++++
5 files changed, 419 insertions(+), 38 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/FastAppend.java b/core/src/main/java/org/apache/iceberg/FastAppend.java
index 5e5e512841..3079757392 100644
--- a/core/src/main/java/org/apache/iceberg/FastAppend.java
+++ b/core/src/main/java/org/apache/iceberg/FastAppend.java
@@ -49,7 +49,7 @@ class FastAppend extends SnapshotProducer<AppendFiles>
implements AppendFiles {
private final List<DataFile> newFiles = Lists.newArrayList();
private final List<ManifestFile> appendManifests = Lists.newArrayList();
private final List<ManifestFile> rewrittenAppendManifests =
Lists.newArrayList();
- private ManifestFile newManifest = null;
+ private List<ManifestFile> newManifests = null;
private boolean hasNewFiles = false;
FastAppend(String tableName, TableOperations ops) {
@@ -143,12 +143,12 @@ class FastAppend extends SnapshotProducer<AppendFiles>
implements AppendFiles {
@Override
public List<ManifestFile> apply(TableMetadata base, Snapshot snapshot) {
- List<ManifestFile> newManifests = Lists.newArrayList();
+ List<ManifestFile> manifests = Lists.newArrayList();
try {
- ManifestFile manifest = writeManifest();
- if (manifest != null) {
- newManifests.add(manifest);
+ List<ManifestFile> newWrittenManifests = writeNewManifests();
+ if (newWrittenManifests != null) {
+ manifests.addAll(newWrittenManifests);
}
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to write manifest");
@@ -158,13 +158,13 @@ class FastAppend extends SnapshotProducer<AppendFiles>
implements AppendFiles {
Iterables.transform(
Iterables.concat(appendManifests, rewrittenAppendManifests),
manifest ->
GenericManifestFile.copyOf(manifest).withSnapshotId(snapshotId()).build());
- Iterables.addAll(newManifests, appendManifestsWithMetadata);
+ Iterables.addAll(manifests, appendManifestsWithMetadata);
if (snapshot != null) {
- newManifests.addAll(snapshot.allManifests(ops.io()));
+ manifests.addAll(snapshot.allManifests(ops.io()));
}
- return newManifests;
+ return manifests;
}
@Override
@@ -178,8 +178,17 @@ class FastAppend extends SnapshotProducer<AppendFiles>
implements AppendFiles {
@Override
protected void cleanUncommitted(Set<ManifestFile> committed) {
- if (newManifest != null && !committed.contains(newManifest)) {
- deleteFile(newManifest.path());
+ if (newManifests != null) {
+ List<ManifestFile> committedNewManifests = Lists.newArrayList();
+ for (ManifestFile manifest : newManifests) {
+ if (committed.contains(manifest)) {
+ committedNewManifests.add(manifest);
+ } else {
+ deleteFile(manifest.path());
+ }
+ }
+
+ this.newManifests = committedNewManifests;
}
// clean up only rewrittenAppendManifests as they are always owned by the table
@@ -191,24 +200,24 @@ class FastAppend extends SnapshotProducer<AppendFiles>
implements AppendFiles {
}
}
- private ManifestFile writeManifest() throws IOException {
- if (hasNewFiles && newManifest != null) {
- deleteFile(newManifest.path());
- newManifest = null;
+ private List<ManifestFile> writeNewManifests() throws IOException {
+ if (hasNewFiles && newManifests != null) {
+ newManifests.forEach(file -> deleteFile(file.path()));
+ newManifests = null;
}
- if (newManifest == null && newFiles.size() > 0) {
- ManifestWriter<DataFile> writer = newManifestWriter(spec);
+ if (newManifests == null && newFiles.size() > 0) {
+ RollingManifestWriter<DataFile> writer = newRollingManifestWriter(spec);
try {
- writer.addAll(newFiles);
+ newFiles.forEach(writer::add);
} finally {
writer.close();
}
- this.newManifest = writer.toManifestFile();
+ this.newManifests = writer.toManifestFiles();
hasNewFiles = false;
}
- return newManifest;
+ return newManifests;
}
}
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index 50270a05f3..cb9a361ab2 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -94,7 +94,7 @@ abstract class MergingSnapshotProducer<ThisT> extends
SnapshotProducer<ThisT> {
private PartitionSpec dataSpec;
// cache new data manifests after writing
- private ManifestFile cachedNewDataManifest = null;
+ private List<ManifestFile> cachedNewDataManifests = null;
private boolean hasNewDataFiles = false;
// cache new manifests for delete files
@@ -907,9 +907,17 @@ abstract class MergingSnapshotProducer<ThisT> extends
SnapshotProducer<ThisT> {
}
private void cleanUncommittedAppends(Set<ManifestFile> committed) {
- if (cachedNewDataManifest != null && !committed.contains(cachedNewDataManifest)) {
- deleteFile(cachedNewDataManifest.path());
- this.cachedNewDataManifest = null;
+ if (cachedNewDataManifests != null) {
+ List<ManifestFile> committedNewDataManifests = Lists.newArrayList();
+ for (ManifestFile manifest : cachedNewDataManifests) {
+ if (committed.contains(manifest)) {
+ committedNewDataManifests.add(manifest);
+ } else {
+ deleteFile(manifest.path());
+ }
+ }
+
+ this.cachedNewDataManifests = committedNewDataManifests;
}
ListIterator<ManifestFile> deleteManifestsIterator =
cachedNewDeleteManifests.listIterator();
@@ -952,10 +960,8 @@ abstract class MergingSnapshotProducer<ThisT> extends
SnapshotProducer<ThisT> {
private Iterable<ManifestFile> prepareNewDataManifests() {
Iterable<ManifestFile> newManifests;
if (newDataFiles.size() > 0) {
- ManifestFile newManifest = newDataFilesAsManifest();
- newManifests =
- Iterables.concat(
- ImmutableList.of(newManifest), appendManifests, rewrittenAppendManifests);
+ List<ManifestFile> dataFileManifests = newDataFilesAsManifests();
+ newManifests = Iterables.concat(dataFileManifests, appendManifests, rewrittenAppendManifests);
} else {
newManifests = Iterables.concat(appendManifests,
rewrittenAppendManifests);
}
@@ -965,18 +971,18 @@ abstract class MergingSnapshotProducer<ThisT> extends
SnapshotProducer<ThisT> {
manifest ->
GenericManifestFile.copyOf(manifest).withSnapshotId(snapshotId()).build());
}
- private ManifestFile newDataFilesAsManifest() {
- if (hasNewDataFiles && cachedNewDataManifest != null) {
- deleteFile(cachedNewDataManifest.path());
- cachedNewDataManifest = null;
+ private List<ManifestFile> newDataFilesAsManifests() {
+ if (hasNewDataFiles && cachedNewDataManifests != null) {
+ cachedNewDataManifests.forEach(file -> deleteFile(file.path()));
+ cachedNewDataManifests = null;
}
- if (cachedNewDataManifest == null) {
+ if (cachedNewDataManifests == null) {
try {
- ManifestWriter<DataFile> writer = newManifestWriter(dataSpec());
+ RollingManifestWriter<DataFile> writer =
newRollingManifestWriter(dataSpec());
try {
if (newDataFilesDataSequenceNumber == null) {
- writer.addAll(newDataFiles);
+ newDataFiles.forEach(writer::add);
} else {
newDataFiles.forEach(f -> writer.add(f,
newDataFilesDataSequenceNumber));
}
@@ -984,14 +990,14 @@ abstract class MergingSnapshotProducer<ThisT> extends
SnapshotProducer<ThisT> {
writer.close();
}
- this.cachedNewDataManifest = writer.toManifestFile();
+ this.cachedNewDataManifests = writer.toManifestFiles();
this.hasNewDataFiles = false;
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to close manifest writer");
}
}
- return cachedNewDataManifest;
+ return cachedNewDataManifests;
}
private Iterable<ManifestFile> prepareDeleteManifests() {
@@ -1017,7 +1023,7 @@ abstract class MergingSnapshotProducer<ThisT> extends
SnapshotProducer<ThisT> {
(specId, deleteFiles) -> {
PartitionSpec spec = ops.current().spec(specId);
try {
- ManifestWriter<DeleteFile> writer = newDeleteManifestWriter(spec);
+ RollingManifestWriter<DeleteFile> writer = newRollingDeleteManifestWriter(spec);
try {
deleteFiles.forEach(
df -> {
@@ -1030,7 +1036,7 @@ abstract class MergingSnapshotProducer<ThisT> extends
SnapshotProducer<ThisT> {
} finally {
writer.close();
}
- cachedNewDeleteManifests.add(writer.toManifestFile());
+ cachedNewDeleteManifests.addAll(writer.toManifestFiles());
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to close manifest writer");
}
diff --git a/core/src/main/java/org/apache/iceberg/RollingManifestWriter.java b/core/src/main/java/org/apache/iceberg/RollingManifestWriter.java
new file mode 100644
index 0000000000..5480415eee
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/RollingManifestWriter.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import java.util.function.Supplier;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/** As opposed to {@link ManifestWriter}, a rolling writer could produce multiple manifest files. */
+public class RollingManifestWriter<F extends ContentFile<F>> implements Closeable {
+ private static final int ROWS_DIVISOR = 250;
+
+ private final Supplier<ManifestWriter<F>> manifestWriterSupplier;
+ private final long targetFileSizeInBytes;
+ private final List<ManifestFile> manifestFiles;
+
+ private long currentFileRows = 0;
+ private ManifestWriter<F> currentWriter = null;
+
+ private boolean closed = false;
+
+ public RollingManifestWriter(
+ Supplier<ManifestWriter<F>> manifestWriterSupplier, long targetFileSizeInBytes) {
+ this.manifestWriterSupplier = manifestWriterSupplier;
+ this.targetFileSizeInBytes = targetFileSizeInBytes;
+ this.manifestFiles = Lists.newArrayList();
+ }
+
+ /**
+ * Add an added entry for a file.
+ *
+ * <p>The entry's snapshot ID will be this manifest's snapshot ID. The data and file sequence
+ * numbers will be assigned at commit.
+ *
+ * @param addedFile a data file
+ */
+ public void add(F addedFile) {
+ currentWriter().add(addedFile);
+ currentFileRows++;
+ }
+
+ /**
+ * Add an added entry for a file with a specific sequence number.
+ *
+ * <p>The entry's snapshot ID will be this manifest's snapshot ID. The entry's data sequence
+ * number will be the provided data sequence number. The entry's file sequence number will be
+ * assigned at commit.
+ *
+ * @param addedFile a data file
+ * @param dataSequenceNumber a data sequence number for the file
+ */
+ public void add(F addedFile, long dataSequenceNumber) {
+ currentWriter().add(addedFile, dataSequenceNumber);
+ currentFileRows++;
+ }
+
+ /**
+ * Add an existing entry for a file.
+ *
+ * <p>The original data and file sequence numbers, snapshot ID, which were assigned at commit,
+ * must be preserved when adding an existing entry.
+ *
+ * @param existingFile a file
+ * @param fileSnapshotId snapshot ID when the data file was added to the table
+ * @param dataSequenceNumber a data sequence number of the file (assigned when the file was added)
+ * @param fileSequenceNumber a file sequence number (assigned when the file was added)
+ */
+ public void existing(
+ F existingFile, long fileSnapshotId, long dataSequenceNumber, Long fileSequenceNumber) {
+ currentWriter().existing(existingFile, fileSnapshotId, dataSequenceNumber, fileSequenceNumber);
+ currentFileRows++;
+ }
+
+ /**
+ * Add a delete entry for a file.
+ *
+ * <p>The entry's snapshot ID will be this manifest's snapshot ID. However, the original data and
+ * file sequence numbers of the file must be preserved when the file is marked as deleted.
+ *
+ * @param deletedFile a file
+ * @param dataSequenceNumber a data sequence number of the file (assigned when the file was added)
+ * @param fileSequenceNumber a file sequence number (assigned when the file was added)
+ */
+ public void delete(F deletedFile, long dataSequenceNumber, Long fileSequenceNumber) {
+ currentWriter().delete(deletedFile, dataSequenceNumber, fileSequenceNumber);
+ currentFileRows++;
+ }
+
+ private ManifestWriter<F> currentWriter() {
+ if (currentWriter == null) {
+ this.currentWriter = manifestWriterSupplier.get();
+ } else if (shouldRollToNewFile()) {
+ closeCurrentWriter();
+ this.currentWriter = manifestWriterSupplier.get();
+ }
+
+ return currentWriter;
+ }
+
+ private boolean shouldRollToNewFile() {
+ return currentFileRows % ROWS_DIVISOR == 0 && currentWriter.length() >= targetFileSizeInBytes;
+ }
+
+ private void closeCurrentWriter() {
+ if (currentWriter != null) {
+ try {
+ currentWriter.close();
+ ManifestFile currentFile = currentWriter.toManifestFile();
+ manifestFiles.add(currentFile);
+ this.currentWriter = null;
+ this.currentFileRows = 0;
+ } catch (IOException e) {
+ throw new UncheckedIOException("Failed to close current writer", e);
+ }
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (!closed) {
+ closeCurrentWriter();
+ this.closed = true;
+ }
+ }
+
+ public List<ManifestFile> toManifestFiles() {
+ Preconditions.checkState(closed, "Cannot get ManifestFile list from unclosed writer");
+ return manifestFiles;
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
index 226388a2b0..5a6a01ea06 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -26,6 +26,8 @@ import static
org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES;
import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES_DEFAULT;
import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS;
import static
org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;
+import static org.apache.iceberg.TableProperties.MANIFEST_TARGET_SIZE_BYTES;
+import static
org.apache.iceberg.TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
@@ -85,6 +87,7 @@ abstract class SnapshotProducer<ThisT> implements
SnapshotUpdate<ThisT> {
private final AtomicInteger manifestCount = new AtomicInteger(0);
private final AtomicInteger attempt = new AtomicInteger(0);
private final List<String> manifestLists = Lists.newArrayList();
+ private final long targetManifestSizeBytes;
private MetricsReporter reporter = LoggingMetricsReporter.instance();
private volatile Long snapshotId = null;
private TableMetadata base;
@@ -107,6 +110,9 @@ abstract class SnapshotProducer<ThisT> implements
SnapshotUpdate<ThisT> {
}
return addMetadata(ops, file);
});
+ this.targetManifestSizeBytes =
+ ops.current()
+ .propertyAsLong(MANIFEST_TARGET_SIZE_BYTES,
MANIFEST_TARGET_SIZE_BYTES_DEFAULT);
}
protected abstract ThisT self();
@@ -494,6 +500,15 @@ abstract class SnapshotProducer<ThisT> implements
SnapshotUpdate<ThisT> {
ops.current().formatVersion(), spec, newManifestOutput(),
snapshotId());
}
+ protected RollingManifestWriter<DataFile>
newRollingManifestWriter(PartitionSpec spec) {
+ return new RollingManifestWriter<>(() -> newManifestWriter(spec),
targetManifestSizeBytes);
+ }
+
+ protected RollingManifestWriter<DeleteFile>
newRollingDeleteManifestWriter(PartitionSpec spec) {
+ return new RollingManifestWriter<>(
+ () -> newDeleteManifestWriter(spec), targetManifestSizeBytes);
+ }
+
protected ManifestReader<DataFile> newManifestReader(ManifestFile manifest) {
return ManifestFiles.read(manifest, ops.io(), ops.current().specsById());
}
diff --git a/core/src/test/java/org/apache/iceberg/TestManifestWriter.java b/core/src/test/java/org/apache/iceberg/TestManifestWriter.java
index 245ad1b817..17a41f418a 100644
--- a/core/src/test/java/org/apache/iceberg/TestManifestWriter.java
+++ b/core/src/test/java/org/apache/iceberg/TestManifestWriter.java
@@ -20,12 +20,15 @@ package org.apache.iceberg;
import java.io.File;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.util.List;
import java.util.UUID;
import org.apache.iceberg.ManifestEntry.Status;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assertions;
+import org.assertj.core.api.Assumptions;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
@@ -43,6 +46,9 @@ public class TestManifestWriter extends TableTestBase {
super(formatVersion);
}
+ private static final int FILE_SIZE_CHECK_ROWS_DIVISOR = 250;
+ private static final long SMALL_FILE_SIZE = 10L;
+
@Test
public void testManifestStats() throws IOException {
ManifestFile manifest =
@@ -218,6 +224,166 @@ public class TestManifestWriter extends TableTestBase {
statuses(Status.EXISTING, Status.EXISTING));
}
+ @Test
+ public void testRollingManifestWriterNoRecords() throws IOException {
+ RollingManifestWriter<DataFile> writer =
newRollingWriteManifest(SMALL_FILE_SIZE);
+
+ writer.close();
+ Assertions.assertThat(writer.toManifestFiles()).isEmpty();
+
+ writer.close();
+ Assertions.assertThat(writer.toManifestFiles()).isEmpty();
+ }
+
+ @Test
+ public void testRollingDeleteManifestWriterNoRecords() throws IOException {
+ Assumptions.assumeThat(formatVersion).isGreaterThan(1);
+ RollingManifestWriter<DeleteFile> writer =
newRollingWriteDeleteManifest(SMALL_FILE_SIZE);
+
+ writer.close();
+ Assertions.assertThat(writer.toManifestFiles()).isEmpty();
+
+ writer.close();
+ Assertions.assertThat(writer.toManifestFiles()).isEmpty();
+ }
+
+ @Test
+ public void testRollingManifestWriterSplitFiles() throws IOException {
+ RollingManifestWriter<DataFile> writer =
newRollingWriteManifest(SMALL_FILE_SIZE);
+
+ int[] addedFileCounts = new int[3];
+ int[] existingFileCounts = new int[3];
+ int[] deletedFileCounts = new int[3];
+ long[] addedRowCounts = new long[3];
+ long[] existingRowCounts = new long[3];
+ long[] deletedRowCounts = new long[3];
+
+ for (int i = 0; i < FILE_SIZE_CHECK_ROWS_DIVISOR * 3; i++) {
+ int type = i % 3;
+ int fileIndex = i / FILE_SIZE_CHECK_ROWS_DIVISOR;
+ if (type == 0) {
+ writer.add(newFile(i));
+ addedFileCounts[fileIndex] += 1;
+ addedRowCounts[fileIndex] += i;
+ } else if (type == 1) {
+ writer.existing(newFile(i), 1, 1, null);
+ existingFileCounts[fileIndex] += 1;
+ existingRowCounts[fileIndex] += i;
+ } else {
+ writer.delete(newFile(i), 1, null);
+ deletedFileCounts[fileIndex] += 1;
+ deletedRowCounts[fileIndex] += i;
+ }
+ }
+
+ writer.close();
+ List<ManifestFile> manifestFiles = writer.toManifestFiles();
+ Assertions.assertThat(manifestFiles.size()).isEqualTo(3);
+
+ checkManifests(
+ manifestFiles,
+ addedFileCounts,
+ existingFileCounts,
+ deletedFileCounts,
+ addedRowCounts,
+ existingRowCounts,
+ deletedRowCounts);
+
+ writer.close();
+ manifestFiles = writer.toManifestFiles();
+ Assertions.assertThat(manifestFiles.size()).isEqualTo(3);
+
+ checkManifests(
+ manifestFiles,
+ addedFileCounts,
+ existingFileCounts,
+ deletedFileCounts,
+ addedRowCounts,
+ existingRowCounts,
+ deletedRowCounts);
+ }
+
+ @Test
+ public void testRollingDeleteManifestWriterSplitFiles() throws IOException {
+ Assumptions.assumeThat(formatVersion).isGreaterThan(1);
+ RollingManifestWriter<DeleteFile> writer =
newRollingWriteDeleteManifest(SMALL_FILE_SIZE);
+
+ int[] addedFileCounts = new int[3];
+ int[] existingFileCounts = new int[3];
+ int[] deletedFileCounts = new int[3];
+ long[] addedRowCounts = new long[3];
+ long[] existingRowCounts = new long[3];
+ long[] deletedRowCounts = new long[3];
+ for (int i = 0; i < 3 * FILE_SIZE_CHECK_ROWS_DIVISOR; i++) {
+ int type = i % 3;
+ int fileIndex = i / FILE_SIZE_CHECK_ROWS_DIVISOR;
+ if (type == 0) {
+ writer.add(newPosDeleteFile(i));
+ addedFileCounts[fileIndex] += 1;
+ addedRowCounts[fileIndex] += i;
+ } else if (type == 1) {
+ writer.existing(newPosDeleteFile(i), 1, 1, null);
+ existingFileCounts[fileIndex] += 1;
+ existingRowCounts[fileIndex] += i;
+ } else {
+ writer.delete(newPosDeleteFile(i), 1, null);
+ deletedFileCounts[fileIndex] += 1;
+ deletedRowCounts[fileIndex] += i;
+ }
+ }
+
+ writer.close();
+ List<ManifestFile> manifestFiles = writer.toManifestFiles();
+ Assertions.assertThat(manifestFiles.size()).isEqualTo(3);
+
+ checkManifests(
+ manifestFiles,
+ addedFileCounts,
+ existingFileCounts,
+ deletedFileCounts,
+ addedRowCounts,
+ existingRowCounts,
+ deletedRowCounts);
+
+ writer.close();
+ manifestFiles = writer.toManifestFiles();
+ Assertions.assertThat(manifestFiles.size()).isEqualTo(3);
+
+ checkManifests(
+ manifestFiles,
+ addedFileCounts,
+ existingFileCounts,
+ deletedFileCounts,
+ addedRowCounts,
+ existingRowCounts,
+ deletedRowCounts);
+ }
+
+ private void checkManifests(
+ List<ManifestFile> manifests,
+ int[] addedFileCounts,
+ int[] existingFileCounts,
+ int[] deletedFileCounts,
+ long[] addedRowCounts,
+ long[] existingRowCounts,
+ long[] deletedRowCounts) {
+ for (int i = 0; i < manifests.size(); i++) {
+ ManifestFile manifest = manifests.get(i);
+
+ Assertions.assertThat(manifest.hasAddedFiles()).isTrue();
+ Assertions.assertThat(manifest.addedFilesCount()).isEqualTo(addedFileCounts[i]);
+ Assertions.assertThat(manifest.addedRowsCount()).isEqualTo(addedRowCounts[i]);
+
+ Assertions.assertThat(manifest.hasExistingFiles()).isTrue();
+ Assertions.assertThat(manifest.existingFilesCount()).isEqualTo(existingFileCounts[i]);
+ Assertions.assertThat(manifest.existingRowsCount()).isEqualTo(existingRowCounts[i]);
+
+ Assertions.assertThat(manifest.hasDeletedFiles()).isTrue();
+ Assertions.assertThat(manifest.deletedFilesCount()).isEqualTo(deletedFileCounts[i]);
+ Assertions.assertThat(manifest.deletedRowsCount()).isEqualTo(deletedRowCounts[i]);
+ }
+ }
+
private DataFile newFile(long recordCount) {
return newFile(recordCount, null);
}
@@ -234,4 +400,39 @@ public class TestManifestWriter extends TableTestBase {
}
return builder.build();
}
+
+ private DeleteFile newPosDeleteFile(long recordCount) {
+ return FileMetadata.deleteFileBuilder(SPEC)
+ .ofPositionDeletes()
+ .withPath("/path/to/delete-" + UUID.randomUUID() + ".parquet")
+ .withFileSizeInBytes(10)
+ .withRecordCount(recordCount)
+ .build();
+ }
+
+ private RollingManifestWriter<DataFile> newRollingWriteManifest(long targetFileSize) {
+ return new RollingManifestWriter<>(
+ () -> {
+ OutputFile newManifestFile = newManifestFile();
+ return ManifestFiles.write(formatVersion, SPEC, newManifestFile, null);
+ },
+ targetFileSize);
+ }
+
+ private RollingManifestWriter<DeleteFile> newRollingWriteDeleteManifest(long targetFileSize) {
+ return new RollingManifestWriter<>(
+ () -> {
+ OutputFile newManifestFile = newManifestFile();
+ return ManifestFiles.writeDeleteManifest(formatVersion, SPEC, newManifestFile, null);
+ },
+ targetFileSize);
+ }
+
+ private OutputFile newManifestFile() {
+ try {
+ return Files.localOutput(FileFormat.AVRO.addExtension(temp.newFile().toString()));
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
}