This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new a7a09d43b1 Core: Avoid generating huge manifests during commits (#6335)
a7a09d43b1 is described below

commit a7a09d43b10e4584faf5598c40bc8fc424c402e1
Author: Xianyang Liu <[email protected]>
AuthorDate: Sat Jul 29 02:06:31 2023 +0800

    Core: Avoid generating huge manifests during commits (#6335)
    
    Co-authored-by: xianyangliu <[email protected]>
---
 .../main/java/org/apache/iceberg/FastAppend.java   |  47 +++--
 .../apache/iceberg/MergingSnapshotProducer.java    |  44 +++--
 .../org/apache/iceberg/RollingManifestWriter.java  | 150 +++++++++++++++
 .../java/org/apache/iceberg/SnapshotProducer.java  |  15 ++
 .../org/apache/iceberg/TestManifestWriter.java     | 201 +++++++++++++++++++++
 5 files changed, 419 insertions(+), 38 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/FastAppend.java b/core/src/main/java/org/apache/iceberg/FastAppend.java
index 5e5e512841..3079757392 100644
--- a/core/src/main/java/org/apache/iceberg/FastAppend.java
+++ b/core/src/main/java/org/apache/iceberg/FastAppend.java
@@ -49,7 +49,7 @@ class FastAppend extends SnapshotProducer<AppendFiles> implements AppendFiles {
   private final List<DataFile> newFiles = Lists.newArrayList();
   private final List<ManifestFile> appendManifests = Lists.newArrayList();
   private final List<ManifestFile> rewrittenAppendManifests = Lists.newArrayList();
-  private ManifestFile newManifest = null;
+  private List<ManifestFile> newManifests = null;
   private boolean hasNewFiles = false;
 
   FastAppend(String tableName, TableOperations ops) {
@@ -143,12 +143,12 @@ class FastAppend extends SnapshotProducer<AppendFiles> implements AppendFiles {
 
   @Override
   public List<ManifestFile> apply(TableMetadata base, Snapshot snapshot) {
-    List<ManifestFile> newManifests = Lists.newArrayList();
+    List<ManifestFile> manifests = Lists.newArrayList();
 
     try {
-      ManifestFile manifest = writeManifest();
-      if (manifest != null) {
-        newManifests.add(manifest);
+      List<ManifestFile> newWrittenManifests = writeNewManifests();
+      if (newWrittenManifests != null) {
+        manifests.addAll(newWrittenManifests);
       }
     } catch (IOException e) {
       throw new RuntimeIOException(e, "Failed to write manifest");
@@ -158,13 +158,13 @@ class FastAppend extends SnapshotProducer<AppendFiles> implements AppendFiles {
         Iterables.transform(
             Iterables.concat(appendManifests, rewrittenAppendManifests),
             manifest -> GenericManifestFile.copyOf(manifest).withSnapshotId(snapshotId()).build());
-    Iterables.addAll(newManifests, appendManifestsWithMetadata);
+    Iterables.addAll(manifests, appendManifestsWithMetadata);
 
     if (snapshot != null) {
-      newManifests.addAll(snapshot.allManifests(ops.io()));
+      manifests.addAll(snapshot.allManifests(ops.io()));
     }
 
-    return newManifests;
+    return manifests;
   }
 
   @Override
@@ -178,8 +178,19 @@ class FastAppend extends SnapshotProducer<AppendFiles> implements AppendFiles {
 
   @Override
   protected void cleanUncommitted(Set<ManifestFile> committed) {
-    if (newManifest != null && !committed.contains(newManifest)) {
-      deleteFile(newManifest.path());
+    if (newManifests != null) {
+      boolean hasDeletes = false;
+      for (ManifestFile manifest : newManifests) {
+        if (!committed.contains(manifest)) {
+          deleteFile(manifest.path());
+          hasDeletes = true;
+        }
+      }
+
+      // a partial list must not be reused, so let writeNewManifests rewrite all new files
+      if (hasDeletes) {
+        this.newManifests = null;
+      }
     }
 
     // clean up only rewrittenAppendManifests as they are always owned by the table
@@ -191,24 +202,24 @@ class FastAppend extends SnapshotProducer<AppendFiles> implements AppendFiles {
     }
   }
 
-  private ManifestFile writeManifest() throws IOException {
-    if (hasNewFiles && newManifest != null) {
-      deleteFile(newManifest.path());
-      newManifest = null;
+  private List<ManifestFile> writeNewManifests() throws IOException {
+    if (hasNewFiles && newManifests != null) {
+      newManifests.forEach(file -> deleteFile(file.path()));
+      newManifests = null;
     }
 
-    if (newManifest == null && newFiles.size() > 0) {
-      ManifestWriter<DataFile> writer = newManifestWriter(spec);
+    if (newManifests == null && newFiles.size() > 0) {
+      RollingManifestWriter<DataFile> writer = newRollingManifestWriter(spec);
       try {
-        writer.addAll(newFiles);
+        newFiles.forEach(writer::add);
       } finally {
         writer.close();
       }
 
-      this.newManifest = writer.toManifestFile();
+      this.newManifests = writer.toManifestFiles();
       hasNewFiles = false;
     }
 
-    return newManifest;
+    return newManifests;
   }
 }
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index 50270a05f3..cb9a361ab2 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -94,7 +94,7 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
   private PartitionSpec dataSpec;
 
   // cache new data manifests after writing
-  private ManifestFile cachedNewDataManifest = null;
+  private List<ManifestFile> cachedNewDataManifests = null;
   private boolean hasNewDataFiles = false;
 
   // cache new manifests for delete files
@@ -907,9 +907,19 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
   }
 
   private void cleanUncommittedAppends(Set<ManifestFile> committed) {
-    if (cachedNewDataManifest != null && !committed.contains(cachedNewDataManifest)) {
-      deleteFile(cachedNewDataManifest.path());
-      this.cachedNewDataManifest = null;
+    if (cachedNewDataManifests != null) {
+      boolean hasDeletes = false;
+      for (ManifestFile manifest : cachedNewDataManifests) {
+        if (!committed.contains(manifest)) {
+          deleteFile(manifest.path());
+          hasDeletes = true;
+        }
+      }
+
+      // a partial list must not be reused, so let newDataFilesAsManifests rewrite them all
+      if (hasDeletes) {
+        this.cachedNewDataManifests = null;
+      }
     }
 
     ListIterator<ManifestFile> deleteManifestsIterator = cachedNewDeleteManifests.listIterator();
@@ -952,10 +962,8 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
   private Iterable<ManifestFile> prepareNewDataManifests() {
     Iterable<ManifestFile> newManifests;
     if (newDataFiles.size() > 0) {
-      ManifestFile newManifest = newDataFilesAsManifest();
-      newManifests =
-          Iterables.concat(
-              ImmutableList.of(newManifest), appendManifests, rewrittenAppendManifests);
+      List<ManifestFile> dataFileManifests = newDataFilesAsManifests();
+      newManifests = Iterables.concat(dataFileManifests, appendManifests, rewrittenAppendManifests);
     } else {
       newManifests = Iterables.concat(appendManifests, rewrittenAppendManifests);
     }
@@ -965,18 +973,18 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
         manifest -> GenericManifestFile.copyOf(manifest).withSnapshotId(snapshotId()).build());
   }
 
-  private ManifestFile newDataFilesAsManifest() {
-    if (hasNewDataFiles && cachedNewDataManifest != null) {
-      deleteFile(cachedNewDataManifest.path());
-      cachedNewDataManifest = null;
+  private List<ManifestFile> newDataFilesAsManifests() {
+    if (hasNewDataFiles && cachedNewDataManifests != null) {
+      cachedNewDataManifests.forEach(file -> deleteFile(file.path()));
+      cachedNewDataManifests = null;
     }
 
-    if (cachedNewDataManifest == null) {
+    if (cachedNewDataManifests == null) {
       try {
-        ManifestWriter<DataFile> writer = newManifestWriter(dataSpec());
+        RollingManifestWriter<DataFile> writer = newRollingManifestWriter(dataSpec());
         try {
           if (newDataFilesDataSequenceNumber == null) {
-            writer.addAll(newDataFiles);
+            newDataFiles.forEach(writer::add);
           } else {
             newDataFiles.forEach(f -> writer.add(f, newDataFilesDataSequenceNumber));
           }
@@ -984,14 +992,14 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
           writer.close();
         }
 
-        this.cachedNewDataManifest = writer.toManifestFile();
+        this.cachedNewDataManifests = writer.toManifestFiles();
         this.hasNewDataFiles = false;
       } catch (IOException e) {
         throw new RuntimeIOException(e, "Failed to close manifest writer");
       }
     }
 
-    return cachedNewDataManifest;
+    return cachedNewDataManifests;
   }
 
   private Iterable<ManifestFile> prepareDeleteManifests() {
@@ -1017,7 +1025,7 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
           (specId, deleteFiles) -> {
             PartitionSpec spec = ops.current().spec(specId);
             try {
-              ManifestWriter<DeleteFile> writer = newDeleteManifestWriter(spec);
+              RollingManifestWriter<DeleteFile> writer = newRollingDeleteManifestWriter(spec);
               try {
                 deleteFiles.forEach(
                     df -> {
@@ -1030,7 +1038,7 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
               } finally {
                 writer.close();
               }
-              cachedNewDeleteManifests.add(writer.toManifestFile());
+              cachedNewDeleteManifests.addAll(writer.toManifestFiles());
             } catch (IOException e) {
               throw new RuntimeIOException(e, "Failed to close manifest writer");
             }
diff --git a/core/src/main/java/org/apache/iceberg/RollingManifestWriter.java b/core/src/main/java/org/apache/iceberg/RollingManifestWriter.java
new file mode 100644
index 0000000000..5480415eee
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/RollingManifestWriter.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import java.util.function.Supplier;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/**
+ * As opposed to {@link ManifestWriter}, a rolling writer could produce multiple manifest files.
+ *
+ * <p>Entries go to the current manifest until its length reaches the target size; the next entry
+ * then starts a new manifest. The length is only compared against the target every 250 entries,
+ * so a manifest may grow somewhat beyond the target.
+ *
+ * <p>The writer must be closed before {@link #toManifestFiles()} is called, and no entries can be
+ * added once it has been closed.
+ */
+public class RollingManifestWriter<F extends ContentFile<F>> implements Closeable {
+  // number of entries between two checks of the current writer's length against the target
+  private static final int ROWS_DIVISOR = 250;
+
+  private final Supplier<ManifestWriter<F>> manifestWriterSupplier;
+  private final long targetFileSizeInBytes;
+  private final List<ManifestFile> manifestFiles;
+
+  private long currentFileRows = 0;
+  private ManifestWriter<F> currentWriter = null;
+
+  private boolean closed = false;
+
+  /**
+   * Creates a rolling manifest writer.
+   *
+   * @param manifestWriterSupplier supplies a new {@link ManifestWriter} each time a manifest is
+   *     started
+   * @param targetFileSizeInBytes length at which the current manifest is closed and a new one is
+   *     started
+   */
+  public RollingManifestWriter(
+      Supplier<ManifestWriter<F>> manifestWriterSupplier, long targetFileSizeInBytes) {
+    this.manifestWriterSupplier = manifestWriterSupplier;
+    this.targetFileSizeInBytes = targetFileSizeInBytes;
+    this.manifestFiles = Lists.newArrayList();
+  }
+
+  /**
+   * Add an added entry for a file.
+   *
+   * <p>The entry's snapshot ID will be this manifest's snapshot ID. The data and file sequence
+   * numbers will be assigned at commit.
+   *
+   * @param addedFile a data file
+   */
+  public void add(F addedFile) {
+    currentWriter().add(addedFile);
+    currentFileRows++;
+  }
+
+  /**
+   * Add an added entry for a file with a specific sequence number.
+   *
+   * <p>The entry's snapshot ID will be this manifest's snapshot ID. The entry's data sequence
+   * number will be the provided data sequence number. The entry's file sequence number will be
+   * assigned at commit.
+   *
+   * @param addedFile a data file
+   * @param dataSequenceNumber a data sequence number for the file
+   */
+  public void add(F addedFile, long dataSequenceNumber) {
+    currentWriter().add(addedFile, dataSequenceNumber);
+    currentFileRows++;
+  }
+
+  /**
+   * Add an existing entry for a file.
+   *
+   * <p>The original data and file sequence numbers, snapshot ID, which were assigned at commit,
+   * must be preserved when adding an existing entry.
+   *
+   * @param existingFile a file
+   * @param fileSnapshotId snapshot ID when the data file was added to the table
+   * @param dataSequenceNumber a data sequence number of the file (assigned when the file was added)
+   * @param fileSequenceNumber a file sequence number (assigned when the file was added)
+   */
+  public void existing(
+      F existingFile, long fileSnapshotId, long dataSequenceNumber, Long fileSequenceNumber) {
+    currentWriter().existing(existingFile, fileSnapshotId, dataSequenceNumber, fileSequenceNumber);
+    currentFileRows++;
+  }
+
+  /**
+   * Add a delete entry for a file.
+   *
+   * <p>The entry's snapshot ID will be this manifest's snapshot ID. However, the original data and
+   * file sequence numbers of the file must be preserved when the file is marked as deleted.
+   *
+   * @param deletedFile a file
+   * @param dataSequenceNumber a data sequence number of the file (assigned when the file was added)
+   * @param fileSequenceNumber a file sequence number (assigned when the file was added)
+   */
+  public void delete(F deletedFile, long dataSequenceNumber, Long fileSequenceNumber) {
+    currentWriter().delete(deletedFile, dataSequenceNumber, fileSequenceNumber);
+    currentFileRows++;
+  }
+
+  /**
+   * Returns the writer for the next entry, starting a new manifest if the current one is full.
+   *
+   * @throws IllegalStateException if this writer has already been closed
+   */
+  private ManifestWriter<F> currentWriter() {
+    // adding after close() would create a manifest that toManifestFiles() never returns
+    Preconditions.checkState(!closed, "Cannot add entries to a closed writer");
+    if (currentWriter == null) {
+      this.currentWriter = manifestWriterSupplier.get();
+    } else if (shouldRollToNewFile()) {
+      closeCurrentWriter();
+      this.currentWriter = manifestWriterSupplier.get();
+    }
+
+    return currentWriter;
+  }
+
+  private boolean shouldRollToNewFile() {
+    return currentFileRows % ROWS_DIVISOR == 0 && currentWriter.length() >= targetFileSizeInBytes;
+  }
+
+  /** Closes the current manifest, if any, and adds it to the list of produced manifests. */
+  private void closeCurrentWriter() {
+    if (currentWriter != null) {
+      try {
+        currentWriter.close();
+        ManifestFile currentFile = currentWriter.toManifestFile();
+        manifestFiles.add(currentFile);
+        this.currentWriter = null;
+        this.currentFileRows = 0;
+      } catch (IOException e) {
+        throw new UncheckedIOException("Failed to close current writer", e);
+      }
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (!closed) {
+      closeCurrentWriter();
+      this.closed = true;
+    }
+  }
+
+  /**
+   * Returns the manifests produced by this writer, in the order they were written.
+   *
+   * @return an immutable list of manifests; empty if no entries were added
+   * @throws IllegalStateException if the writer has not been closed
+   */
+  public List<ManifestFile> toManifestFiles() {
+    Preconditions.checkState(closed, "Cannot get ManifestFile list from unclosed writer");
+    return ImmutableList.copyOf(manifestFiles);
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
index 226388a2b0..5a6a01ea06 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -26,6 +26,8 @@ import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES;
 import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES_DEFAULT;
 import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS;
 import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;
+import static org.apache.iceberg.TableProperties.MANIFEST_TARGET_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT;
 
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.LoadingCache;
@@ -85,6 +87,7 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
   private final AtomicInteger manifestCount = new AtomicInteger(0);
   private final AtomicInteger attempt = new AtomicInteger(0);
   private final List<String> manifestLists = Lists.newArrayList();
+  private final long targetManifestSizeBytes;
   private MetricsReporter reporter = LoggingMetricsReporter.instance();
   private volatile Long snapshotId = null;
   private TableMetadata base;
@@ -107,6 +110,9 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
                   }
                   return addMetadata(ops, file);
                 });
+    this.targetManifestSizeBytes =
+        ops.current()
+            .propertyAsLong(MANIFEST_TARGET_SIZE_BYTES, MANIFEST_TARGET_SIZE_BYTES_DEFAULT);
   }
 
   protected abstract ThisT self();
@@ -494,6 +500,23 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
         ops.current().formatVersion(), spec, newManifestOutput(), snapshotId());
   }
 
+  /**
+   * Creates a data manifest writer that rolls over to a new file once the current manifest
+   * reaches the size set by {@link TableProperties#MANIFEST_TARGET_SIZE_BYTES}.
+   */
+  protected RollingManifestWriter<DataFile> newRollingManifestWriter(PartitionSpec spec) {
+    return new RollingManifestWriter<>(() -> newManifestWriter(spec), targetManifestSizeBytes);
+  }
+
+  /**
+   * Creates a delete manifest writer that rolls over to a new file once the current manifest
+   * reaches the size set by {@link TableProperties#MANIFEST_TARGET_SIZE_BYTES}.
+   */
+  protected RollingManifestWriter<DeleteFile> newRollingDeleteManifestWriter(PartitionSpec spec) {
+    return new RollingManifestWriter<>(
+        () -> newDeleteManifestWriter(spec), targetManifestSizeBytes);
+  }
+
   protected ManifestReader<DataFile> newManifestReader(ManifestFile manifest) {
     return ManifestFiles.read(manifest, ops.io(), ops.current().specsById());
   }
diff --git a/core/src/test/java/org/apache/iceberg/TestManifestWriter.java b/core/src/test/java/org/apache/iceberg/TestManifestWriter.java
index 245ad1b817..17a41f418a 100644
--- a/core/src/test/java/org/apache/iceberg/TestManifestWriter.java
+++ b/core/src/test/java/org/apache/iceberg/TestManifestWriter.java
@@ -20,12 +20,15 @@ package org.apache.iceberg;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.List;
 import java.util.UUID;
 import org.apache.iceberg.ManifestEntry.Status;
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.types.Conversions;
 import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assertions;
+import org.assertj.core.api.Assumptions;
 import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.Test;
@@ -43,6 +46,10 @@ public class TestManifestWriter extends TableTestBase {
     super(formatVersion);
   }
 
+  // must match RollingManifestWriter.ROWS_DIVISOR: the writer only checks its size every N entries
+  private static final int FILE_SIZE_CHECK_ROWS_DIVISOR = 250;
+  private static final long SMALL_FILE_SIZE = 10L;
+
   @Test
   public void testManifestStats() throws IOException {
     ManifestFile manifest =
@@ -218,6 +225,166 @@ public class TestManifestWriter extends TableTestBase {
         statuses(Status.EXISTING, Status.EXISTING));
   }
 
+  @Test
+  public void testRollingManifestWriterNoRecords() throws IOException {
+    RollingManifestWriter<DataFile> writer = newRollingWriteManifest(SMALL_FILE_SIZE);
+
+    writer.close();
+    Assertions.assertThat(writer.toManifestFiles()).isEmpty();
+
+    writer.close();
+    Assertions.assertThat(writer.toManifestFiles()).isEmpty();
+  }
+
+  @Test
+  public void testRollingDeleteManifestWriterNoRecords() throws IOException {
+    Assumptions.assumeThat(formatVersion).isGreaterThan(1);
+    RollingManifestWriter<DeleteFile> writer = newRollingWriteDeleteManifest(SMALL_FILE_SIZE);
+
+    writer.close();
+    Assertions.assertThat(writer.toManifestFiles()).isEmpty();
+
+    writer.close();
+    Assertions.assertThat(writer.toManifestFiles()).isEmpty();
+  }
+
+  @Test
+  public void testRollingManifestWriterSplitFiles() throws IOException {
+    RollingManifestWriter<DataFile> writer = newRollingWriteManifest(SMALL_FILE_SIZE);
+
+    int[] addedFileCounts = new int[3];
+    int[] existingFileCounts = new int[3];
+    int[] deletedFileCounts = new int[3];
+    long[] addedRowCounts = new long[3];
+    long[] existingRowCounts = new long[3];
+    long[] deletedRowCounts = new long[3];
+
+    for (int i = 0; i < FILE_SIZE_CHECK_ROWS_DIVISOR * 3; i++) {
+      int type = i % 3;
+      int fileIndex = i / FILE_SIZE_CHECK_ROWS_DIVISOR;
+      if (type == 0) {
+        writer.add(newFile(i));
+        addedFileCounts[fileIndex] += 1;
+        addedRowCounts[fileIndex] += i;
+      } else if (type == 1) {
+        writer.existing(newFile(i), 1, 1, null);
+        existingFileCounts[fileIndex] += 1;
+        existingRowCounts[fileIndex] += i;
+      } else {
+        writer.delete(newFile(i), 1, null);
+        deletedFileCounts[fileIndex] += 1;
+        deletedRowCounts[fileIndex] += i;
+      }
+    }
+
+    writer.close();
+    List<ManifestFile> manifestFiles = writer.toManifestFiles();
+    Assertions.assertThat(manifestFiles).hasSize(3);
+
+    checkManifests(
+        manifestFiles,
+        addedFileCounts,
+        existingFileCounts,
+        deletedFileCounts,
+        addedRowCounts,
+        existingRowCounts,
+        deletedRowCounts);
+
+    writer.close();
+    manifestFiles = writer.toManifestFiles();
+    Assertions.assertThat(manifestFiles).hasSize(3);
+
+    checkManifests(
+        manifestFiles,
+        addedFileCounts,
+        existingFileCounts,
+        deletedFileCounts,
+        addedRowCounts,
+        existingRowCounts,
+        deletedRowCounts);
+  }
+
+  @Test
+  public void testRollingDeleteManifestWriterSplitFiles() throws IOException {
+    Assumptions.assumeThat(formatVersion).isGreaterThan(1);
+    RollingManifestWriter<DeleteFile> writer = newRollingWriteDeleteManifest(SMALL_FILE_SIZE);
+
+    int[] addedFileCounts = new int[3];
+    int[] existingFileCounts = new int[3];
+    int[] deletedFileCounts = new int[3];
+    long[] addedRowCounts = new long[3];
+    long[] existingRowCounts = new long[3];
+    long[] deletedRowCounts = new long[3];
+    for (int i = 0; i < 3 * FILE_SIZE_CHECK_ROWS_DIVISOR; i++) {
+      int type = i % 3;
+      int fileIndex = i / FILE_SIZE_CHECK_ROWS_DIVISOR;
+      if (type == 0) {
+        writer.add(newPosDeleteFile(i));
+        addedFileCounts[fileIndex] += 1;
+        addedRowCounts[fileIndex] += i;
+      } else if (type == 1) {
+        writer.existing(newPosDeleteFile(i), 1, 1, null);
+        existingFileCounts[fileIndex] += 1;
+        existingRowCounts[fileIndex] += i;
+      } else {
+        writer.delete(newPosDeleteFile(i), 1, null);
+        deletedFileCounts[fileIndex] += 1;
+        deletedRowCounts[fileIndex] += i;
+      }
+    }
+
+    writer.close();
+    List<ManifestFile> manifestFiles = writer.toManifestFiles();
+    Assertions.assertThat(manifestFiles).hasSize(3);
+
+    checkManifests(
+        manifestFiles,
+        addedFileCounts,
+        existingFileCounts,
+        deletedFileCounts,
+        addedRowCounts,
+        existingRowCounts,
+        deletedRowCounts);
+
+    writer.close();
+    manifestFiles = writer.toManifestFiles();
+    Assertions.assertThat(manifestFiles).hasSize(3);
+
+    checkManifests(
+        manifestFiles,
+        addedFileCounts,
+        existingFileCounts,
+        deletedFileCounts,
+        addedRowCounts,
+        existingRowCounts,
+        deletedRowCounts);
+  }
+
+  private void checkManifests(
+      List<ManifestFile> manifests,
+      int[] addedFileCounts,
+      int[] existingFileCounts,
+      int[] deletedFileCounts,
+      long[] addedRowCounts,
+      long[] existingRowCounts,
+      long[] deletedRowCounts) {
+    for (int i = 0; i < manifests.size(); i++) {
+      ManifestFile manifest = manifests.get(i);
+
+      Assertions.assertThat(manifest.hasAddedFiles()).isTrue();
+      Assertions.assertThat(manifest.addedFilesCount()).isEqualTo(addedFileCounts[i]);
+      Assertions.assertThat(manifest.addedRowsCount()).isEqualTo(addedRowCounts[i]);
+
+      Assertions.assertThat(manifest.hasExistingFiles()).isTrue();
+      Assertions.assertThat(manifest.existingFilesCount()).isEqualTo(existingFileCounts[i]);
+      Assertions.assertThat(manifest.existingRowsCount()).isEqualTo(existingRowCounts[i]);
+
+      Assertions.assertThat(manifest.hasDeletedFiles()).isTrue();
+      Assertions.assertThat(manifest.deletedFilesCount()).isEqualTo(deletedFileCounts[i]);
+      Assertions.assertThat(manifest.deletedRowsCount()).isEqualTo(deletedRowCounts[i]);
+    }
+  }
+
   private DataFile newFile(long recordCount) {
     return newFile(recordCount, null);
   }
@@ -234,4 +401,39 @@ public class TestManifestWriter extends TableTestBase {
     }
     return builder.build();
   }
+
+  private DeleteFile newPosDeleteFile(long recordCount) {
+    return FileMetadata.deleteFileBuilder(SPEC)
+        .ofPositionDeletes()
+        .withPath("/path/to/delete-" + UUID.randomUUID() + ".parquet")
+        .withFileSizeInBytes(10)
+        .withRecordCount(recordCount)
+        .build();
+  }
+
+  private RollingManifestWriter<DataFile> newRollingWriteManifest(long targetFileSize) {
+    return new RollingManifestWriter<>(
+        () -> {
+          OutputFile newManifestFile = newManifestFile();
+          return ManifestFiles.write(formatVersion, SPEC, newManifestFile, null);
+        },
+        targetFileSize);
+  }
+
+  private RollingManifestWriter<DeleteFile> newRollingWriteDeleteManifest(long targetFileSize) {
+    return new RollingManifestWriter<>(
+        () -> {
+          OutputFile newManifestFile = newManifestFile();
+          return ManifestFiles.writeDeleteManifest(formatVersion, SPEC, newManifestFile, null);
+        },
+        targetFileSize);
+  }
+
+  private OutputFile newManifestFile() {
+    try {
+      return Files.localOutput(FileFormat.AVRO.addExtension(temp.newFile().toString()));
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
 }

Reply via email to