This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 9245abd  Add RewriteManifests operation (#200)
9245abd is described below

commit 9245abd248fb4e37b6eca39aca8bfab044d815fa
Author: bryanck <[email protected]>
AuthorDate: Sat Jun 8 14:04:14 2019 -0700

    Add RewriteManifests operation (#200)
    
    This operation can select manifests to rewrite and cluster the entries of
    those manifests to make scan planning more efficient.
---
 .../java/org/apache/iceberg/RewriteManifests.java  |  57 ++++
 api/src/main/java/org/apache/iceberg/Table.java    |   8 +
 .../main/java/org/apache/iceberg/Transaction.java  |   7 +
 .../main/java/org/apache/iceberg/BaseTable.java    |   5 +
 .../java/org/apache/iceberg/BaseTransaction.java   |  13 +
 .../java/org/apache/iceberg/ReplaceManifests.java  | 261 ++++++++++++++++
 .../org/apache/iceberg/TestReplaceManifests.java   | 332 +++++++++++++++++++++
 7 files changed, 683 insertions(+)

diff --git a/api/src/main/java/org/apache/iceberg/RewriteManifests.java 
b/api/src/main/java/org/apache/iceberg/RewriteManifests.java
new file mode 100644
index 0000000..1ed1ddf
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/RewriteManifests.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.function.Function;
+import java.util.function.Predicate;
+
+/**
+ * API for rewriting manifests for a table.
+ * <p>
+ * This API accumulates manifest files, produces a new {@link Snapshot} of the table
+ * described only by the manifest files that were added, and commits that snapshot as the
+ * current snapshot.
+ * <p>
+ * When committing, these changes will be applied to the latest table snapshot. Commit conflicts
+ * will be resolved by applying the changes to the new latest snapshot and reattempting the commit.
+ */
+public interface RewriteManifests extends SnapshotUpdate<RewriteManifests> {
+  /**
+   * Groups existing {@link DataFile data files} by a cluster key produced by a function. The cluster
+   * key determines which manifest each data file is written to. All data files with the same
+   * cluster key will be written to the same manifest (unless that manifest grows large and is
+   * split into multiple manifests).
+   *
+   * @param func Function used to cluster data files to manifests.
+   * @return this for method chaining
+   */
+  RewriteManifests clusterBy(Function<DataFile, Object> func);
+
+  /**
+   * Determines which existing {@link ManifestFile manifests} of the table should be rewritten.
+   * Manifests that do not match the predicate are kept as-is. If this method is never called, no
+   * predicate is set and all manifests will be rewritten.
+   *
+   * @param predicate Predicate used to determine which manifests to rewrite. If true then the manifest
+   *                  file will be included for rewrite. If false then the manifest is kept as-is.
+   * @return this for method chaining
+   */
+  RewriteManifests rewriteIf(Predicate<ManifestFile> predicate);
+}
diff --git a/api/src/main/java/org/apache/iceberg/Table.java 
b/api/src/main/java/org/apache/iceberg/Table.java
index d0b5821..e6390cc 100644
--- a/api/src/main/java/org/apache/iceberg/Table.java
+++ b/api/src/main/java/org/apache/iceberg/Table.java
@@ -137,6 +137,14 @@ public interface Table {
   RewriteFiles newRewrite();
 
   /**
+   * Create a new {@link RewriteManifests rewrite manifests API} to replace manifests for this
+   * table and commit.
+   *
+   * @return a new {@link RewriteManifests}
+   */
+  RewriteManifests rewriteManifests();
+
+  /**
    * Create a new {@link OverwriteFiles overwrite API} to overwrite files by a filter expression.
    *
    * @return a new {@link OverwriteFiles}
diff --git a/api/src/main/java/org/apache/iceberg/Transaction.java 
b/api/src/main/java/org/apache/iceberg/Transaction.java
index d2e6320..f5b9f84 100644
--- a/api/src/main/java/org/apache/iceberg/Transaction.java
+++ b/api/src/main/java/org/apache/iceberg/Transaction.java
@@ -85,6 +85,13 @@ public interface Transaction {
   RewriteFiles newRewrite();
 
   /**
+   * Create a new {@link RewriteManifests rewrite manifests API} to replace manifests for this table.
+   *
+   * @return a new {@link RewriteManifests}
+   */
+  RewriteManifests rewriteManifests();
+
+  /**
    * Create a new {@link OverwriteFiles overwrite API} to overwrite files by a filter expression.
    *
    * @return a new {@link OverwriteFiles}
diff --git a/core/src/main/java/org/apache/iceberg/BaseTable.java 
b/core/src/main/java/org/apache/iceberg/BaseTable.java
index 4fb69c9..9e61e1c 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTable.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTable.java
@@ -114,6 +114,11 @@ public class BaseTable implements Table, 
HasTableOperations {
   }
 
   @Override
+  public RewriteManifests rewriteManifests() {
+    return new ReplaceManifests(ops);
+  }
+
+  @Override
   public OverwriteFiles newOverwrite() {
     return new OverwriteData(ops);
   }
diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java 
b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
index b3ec03f..d16d143 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
@@ -143,6 +143,14 @@ class BaseTransaction implements Transaction {
   }
 
   @Override
+  public RewriteManifests rewriteManifests() {
+    checkLastOperationCommitted("RewriteManifests");
+    RewriteManifests rewrite = new ReplaceManifests(transactionOps);
+    updates.add(rewrite);
+    return rewrite;
+  }
+
+  @Override
   public OverwriteFiles newOverwrite() {
     checkLastOperationCommitted("OverwriteFiles");
     OverwriteFiles overwrite = new OverwriteData(transactionOps);
@@ -372,6 +380,11 @@ class BaseTransaction implements Transaction {
     }
 
     @Override
+    public RewriteManifests rewriteManifests() {
+      return BaseTransaction.this.rewriteManifests();
+    }
+
+    @Override
     public OverwriteFiles newOverwrite() {
       return BaseTransaction.this.newOverwrite();
     }
diff --git a/core/src/main/java/org/apache/iceberg/ReplaceManifests.java 
b/core/src/main/java/org/apache/iceberg/ReplaceManifests.java
new file mode 100644
index 0000000..0077070
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/ReplaceManifests.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+
+import static org.apache.iceberg.TableProperties.MANIFEST_TARGET_SIZE_BYTES;
+import static 
org.apache.iceberg.TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT;
+
+
+public class ReplaceManifests extends SnapshotProducer<RewriteManifests> 
implements RewriteManifests {
+  private final TableOperations ops;
+  private final PartitionSpec spec;
+  private final long manifestTargetSizeBytes;
+
+  private final List<ManifestFile> keptManifests = 
Collections.synchronizedList(new ArrayList<>());
+  private final List<ManifestFile> newManifests = 
Collections.synchronizedList(new ArrayList<>());
+  private final Set<ManifestFile> replacedManifests = 
Collections.synchronizedSet(new HashSet<>());
+  private final Map<Object, WriterWrapper> writers = 
Collections.synchronizedMap(new HashMap<>());
+
+  private final AtomicInteger manifestCount = new AtomicInteger(0);
+  private final AtomicLong entryCount = new AtomicLong(0);
+
+  private final Map<String, String> summaryProps = new HashMap<>();
+
+  private Function<DataFile, Object> clusterByFunc;
+  private Predicate<ManifestFile> predicate;
+
+  private static final String REPLACED_CNT = "manifests-replaced";
+  private static final String KEPT_CNT = "manifests-kept";
+  private static final String NEW_CNT = "manifests-created";
+  private static final String ENTRY_CNT = "entries-processed";
+
+  ReplaceManifests(TableOperations ops) {
+    super(ops);
+    this.ops = ops;
+    this.spec = ops.current().spec();
+    this.manifestTargetSizeBytes =
+      ops.current().propertyAsLong(MANIFEST_TARGET_SIZE_BYTES, 
MANIFEST_TARGET_SIZE_BYTES_DEFAULT);
+  }
+
+  @Override
+  protected String operation() {
+    return DataOperations.REPLACE;
+  }
+
+  @Override
+  public RewriteManifests set(String property, String value) {
+    summaryProps.put(property, value);
+    return this;
+  }
+
+  @Override
+  protected Map<String, String> summary() {
+    Map<String, String> result = new HashMap<>();
+    result.putAll(summaryProps);
+    result.put(KEPT_CNT, Integer.toString(keptManifests.size()));
+    result.put(NEW_CNT, Integer.toString(newManifests.size()));
+    result.put(REPLACED_CNT, Integer.toString(replacedManifests.size()));
+    result.put(ENTRY_CNT, Long.toString(entryCount.get()));
+    return result;
+  }
+
+  @Override
+  public ReplaceManifests clusterBy(Function<DataFile, Object> func) {
+    this.clusterByFunc = func;
+    return this;
+  }
+
+  @Override
+  public ReplaceManifests rewriteIf(Predicate<ManifestFile> pred) {
+    this.predicate = pred;
+    return this;
+  }
+
+  @Override
+  public List<ManifestFile> apply(TableMetadata base) {
+    Preconditions.checkNotNull(clusterByFunc, "clusterBy function cannot be 
null");
+
+    List<ManifestFile> currentManifests = base.currentSnapshot().manifests();
+
+    if (requiresRewrite(currentManifests)) {
+      // run the rewrite process
+      performRewrite(currentManifests);
+    } else {
+      // just keep any new manifests that were added since the last apply(), don't rerun the rewrite
+      addExistingFromNewCommit(currentManifests);
+    }
+
+    // put new manifests at the beginning
+    List<ManifestFile> apply = new ArrayList<>();
+    apply.addAll(newManifests);
+    apply.addAll(keptManifests);
+
+    return apply;
+  }
+
+  private boolean requiresRewrite(List<ManifestFile> currentManifests) {
+    if (replacedManifests.size() == 0) {
+      // nothing yet processed so perform a full rewrite
+      return true;
+    }
+    // if any processed manifest is not in the current manifest list, perform a full rewrite
+    Set<ManifestFile> set = Sets.newHashSet(currentManifests);
+    return replacedManifests.stream().anyMatch(manifest -> 
!set.contains(manifest));
+  }
+
+  private void addExistingFromNewCommit(List<ManifestFile> currentManifests) {
+    // keep any existing manifests as-is that were not processed
+    keptManifests.clear();
+    currentManifests.stream()
+      .filter(manifest -> !replacedManifests.contains(manifest))
+      .forEach(manifest -> keptManifests.add(manifest));
+  }
+
+  private void reset() {
+    cleanAll();
+    entryCount.set(0);
+    manifestCount.set(0);
+    keptManifests.clear();
+    replacedManifests.clear();
+    newManifests.clear();
+    writers.clear();
+  }
+
+  private void performRewrite(List<ManifestFile> currentManifests) {
+    reset();
+
+    try {
+      Tasks.foreach(currentManifests)
+          .executeWith(ThreadPools.getWorkerPool())
+          .run(manifest -> {
+            if (predicate != null && !predicate.test(manifest)) {
+              keptManifests.add(manifest);
+            } else {
+              replacedManifests.add(manifest);
+              long entryNum = manifest.addedFilesCount() + 
manifest.existingFilesCount() + manifest.deletedFilesCount();
+              long avgEntryLen = manifest.length() / entryNum;
+
+              try (ManifestReader reader =
+                     
ManifestReader.read(ops.io().newInputFile(manifest.path()), 
ops.current()::spec)) {
+                FilteredManifest filteredManifest = 
reader.select(Arrays.asList("*"));
+                filteredManifest.liveEntries().forEach(
+                    entry -> appendEntry(entry, avgEntryLen, 
clusterByFunc.apply(entry.file()))
+                );
+
+              } catch (IOException x) {
+                throw new RuntimeIOException(x);
+              }
+            }
+          });
+    } finally {
+      
Tasks.foreach(writers.values()).executeWith(ThreadPools.getWorkerPool()).run(writer
 -> writer.close());
+    }
+  }
+
+  private void appendEntry(ManifestEntry entry, long avgEntryLen, Object key) {
+    Preconditions.checkNotNull(entry, "Manifest entry cannot be null");
+    Preconditions.checkNotNull(key, "Key cannot be null");
+
+    WriterWrapper writer = getWriter(key);
+    writer.addEntry(entry, avgEntryLen);
+    entryCount.incrementAndGet();
+  }
+
+  private WriterWrapper getWriter(Object key) {
+    WriterWrapper writer = writers.get(key);
+    if (writer == null) {
+      synchronized (writers) {
+        writer = writers.get(key); // check again after getting lock
+        if (writer == null) {
+          writer = new WriterWrapper();
+          writers.put(key, writer);
+        }
+      }
+    }
+    return writer;
+  }
+
+  @Override
+  protected void cleanUncommitted(Set<ManifestFile> committed) {
+    for (ManifestFile manifest : newManifests) {
+      if (!committed.contains(manifest)) {
+        deleteFile(manifest.path());
+      }
+    }
+  }
+
+  long getManifestTargetSizeBytes() {
+    return manifestTargetSizeBytes;
+  }
+
+  class WriterWrapper {
+    private ManifestWriter writer;
+    private long estimatedSize;
+
+    synchronized void addEntry(ManifestEntry entry, long len) {
+      if (writer == null) {
+        writer = newWriter();
+      } else if (estimatedSize >= getManifestTargetSizeBytes()) {
+        close();
+        writer = newWriter();
+      }
+
+      writer.addExisting(entry);
+      estimatedSize += len;
+    }
+
+    private ManifestWriter newWriter() {
+      estimatedSize = 0;
+      return new ManifestWriter(spec, 
manifestPath(manifestCount.getAndIncrement()), snapshotId());
+    }
+
+    synchronized void close() {
+      if (writer != null) {
+        try {
+          writer.close();
+          newManifests.add(writer.toManifestFile());
+        } catch (IOException x) {
+          throw new RuntimeIOException(x);
+        }
+      }
+    }
+
+  }
+
+}
diff --git a/core/src/test/java/org/apache/iceberg/TestReplaceManifests.java 
b/core/src/test/java/org/apache/iceberg/TestReplaceManifests.java
new file mode 100644
index 0000000..7281916
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestReplaceManifests.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.iceberg.Files.localInput;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+public class TestReplaceManifests extends TableTestBase {
+
+  @Test
+  public void testReplaceManifestsSeparate() {
+    Table table = load();
+    table.newFastAppend()
+      .appendFile(FILE_A)
+      .appendFile(FILE_B)
+      .commit();
+    long appendId = table.currentSnapshot().snapshotId();
+
+    Assert.assertEquals(1, table.currentSnapshot().manifests().size());
+
+    // cluster by path will split the manifest into two
+
+    table.rewriteManifests()
+      .clusterBy(file -> file.path())
+      .commit();
+
+    List<ManifestFile> manifests = table.currentSnapshot().manifests();
+    Assert.assertEquals(2, manifests.size());
+    manifests.sort(Comparator.comparing(ManifestFile::path));
+
+    validateManifestEntries(manifests.get(0),
+                            ids(appendId),
+                            files(FILE_A),
+                            statuses(ManifestEntry.Status.EXISTING));
+    validateManifestEntries(manifests.get(1),
+                            ids(appendId),
+                            files(FILE_B),
+                            statuses(ManifestEntry.Status.EXISTING));
+  }
+
+  @Test
+  public void testReplaceManifestsConsolidate() throws IOException {
+    Table table = load();
+
+    table.newFastAppend()
+      .appendFile(FILE_A)
+      .commit();
+    long appendIdA = table.currentSnapshot().snapshotId();
+    table.newFastAppend()
+      .appendFile(FILE_B)
+      .commit();
+    long appendIdB = table.currentSnapshot().snapshotId();
+
+    Assert.assertEquals(2, table.currentSnapshot().manifests().size());
+
+    // cluster by constant will combine manifests into one
+
+    table.rewriteManifests()
+      .clusterBy(file -> "file")
+      .commit();
+
+    List<ManifestFile> manifests = table.currentSnapshot().manifests();
+    Assert.assertEquals(1, manifests.size());
+
+    // get the file order correct
+    List<DataFile> files;
+    List<Long> ids;
+    try (ManifestReader reader = 
ManifestReader.read(localInput(manifests.get(0).path()))) {
+      if (reader.iterator().next().path().equals(FILE_A.path())) {
+        files = Arrays.asList(FILE_A, FILE_B);
+        ids = Arrays.asList(appendIdA, appendIdB);
+      } else {
+        files = Arrays.asList(FILE_B, FILE_A);
+        ids = Arrays.asList(appendIdB, appendIdA);
+      }
+    }
+
+    validateManifestEntries(manifests.get(0),
+                            ids.iterator(),
+                            files.iterator(),
+                            statuses(ManifestEntry.Status.EXISTING, 
ManifestEntry.Status.EXISTING));
+  }
+
+  @Test
+  public void testReplaceManifestsWithFilter() throws IOException {
+    Table table = load();
+
+    table.newFastAppend()
+      .appendFile(FILE_A)
+      .commit();
+    long appendIdA = table.currentSnapshot().snapshotId();
+
+    table.newFastAppend()
+      .appendFile(FILE_B)
+      .commit();
+    long appendIdB = table.currentSnapshot().snapshotId();
+
+    table.newFastAppend()
+      .appendFile(FILE_C)
+      .commit();
+    long appendIdC = table.currentSnapshot().snapshotId();
+
+    Assert.assertEquals(3, table.currentSnapshot().manifests().size());
+
+    // keep the file A manifest, combine the other two
+
+    table.rewriteManifests()
+      .clusterBy(file -> "file")
+      .rewriteIf(manifest -> {
+        try (ManifestReader reader = 
ManifestReader.read(localInput(manifest.path()))) {
+          return !reader.iterator().next().path().equals(FILE_A.path());
+        } catch (IOException x) {
+          throw new RuntimeIOException(x);
+        }
+      })
+      .commit();
+
+    List<ManifestFile> manifests = table.currentSnapshot().manifests();
+    Assert.assertEquals(2, manifests.size());
+
+    // get the file order correct
+    List<DataFile> files;
+    List<Long> ids;
+    try (ManifestReader reader = 
ManifestReader.read(localInput(manifests.get(0).path()))) {
+      if (reader.iterator().next().path().equals(FILE_B.path())) {
+        files = Arrays.asList(FILE_B, FILE_C);
+        ids = Arrays.asList(appendIdB, appendIdC);
+      } else {
+        files = Arrays.asList(FILE_C, FILE_B);
+        ids = Arrays.asList(appendIdC, appendIdB);
+      }
+    }
+
+    validateManifestEntries(manifests.get(0),
+                            ids.iterator(),
+                            files.iterator(),
+                            statuses(ManifestEntry.Status.EXISTING, 
ManifestEntry.Status.EXISTING));
+    validateManifestEntries(manifests.get(1),
+                            ids(appendIdA),
+                            files(FILE_A),
+                            statuses(ManifestEntry.Status.ADDED));
+  }
+
+  @Test
+  public void testReplaceManifestsMaxSize() {
+    Table table = load();
+    table.newFastAppend()
+      .appendFile(FILE_A)
+      .appendFile(FILE_B)
+      .commit();
+    long appendId = table.currentSnapshot().snapshotId();
+
+    Assert.assertEquals(1, table.currentSnapshot().manifests().size());
+
+    // cluster by constant will combine manifests into one but small target size will create one per entry
+    ReplaceManifests rewriteManifests = spy((ReplaceManifests) 
table.rewriteManifests());
+    when(rewriteManifests.getManifestTargetSizeBytes()).thenReturn(1L);
+    rewriteManifests.clusterBy(file -> "file").commit();
+
+    List<ManifestFile> manifests = table.currentSnapshot().manifests();
+    Assert.assertEquals(2, manifests.size());
+    manifests.sort(Comparator.comparing(ManifestFile::path));
+
+    validateManifestEntries(manifests.get(0),
+                            ids(appendId),
+                            files(FILE_A),
+                            statuses(ManifestEntry.Status.EXISTING));
+    validateManifestEntries(manifests.get(1),
+                            ids(appendId),
+                            files(FILE_B),
+                            statuses(ManifestEntry.Status.EXISTING));
+  }
+
+  @Test
+  public void testConcurrentRewriteManifest() throws IOException {
+    Table table = load();
+    table.newFastAppend()
+      .appendFile(FILE_A)
+      .commit();
+    long appendIdA = table.currentSnapshot().snapshotId();
+    table.newFastAppend()
+      .appendFile(FILE_B)
+      .commit();
+    long appendIdB = table.currentSnapshot().snapshotId();
+
+    // start a rewrite manifests that involves both manifests
+    RewriteManifests rewrite = table.rewriteManifests();
+    rewrite.clusterBy(file -> "file").apply();
+
+    // commit a rewrite manifests that only involves one manifest
+    table.rewriteManifests()
+      .clusterBy(file -> "file")
+      .rewriteIf(manifest -> {
+        try (ManifestReader reader = 
ManifestReader.read(localInput(manifest.path()))) {
+          return !reader.iterator().next().path().equals(FILE_A.path());
+        } catch (IOException x) {
+          throw new RuntimeIOException(x);
+        }
+      })
+      .commit();
+
+    Assert.assertEquals(2, table.currentSnapshot().manifests().size());
+
+    // commit the rewrite manifests in progress - this should perform a full rewrite as the manifest
+    // with file B is no longer part of the snapshot
+    rewrite.commit();
+
+    List<ManifestFile> manifests = table.currentSnapshot().manifests();
+    Assert.assertEquals(1, manifests.size());
+
+    // get the file order correct
+    List<DataFile> files;
+    List<Long> ids;
+    try (ManifestReader reader = 
ManifestReader.read(localInput(manifests.get(0).path()))) {
+      if (reader.iterator().next().path().equals(FILE_A.path())) {
+        files = Arrays.asList(FILE_A, FILE_B);
+        ids = Arrays.asList(appendIdA, appendIdB);
+      } else {
+        files = Arrays.asList(FILE_B, FILE_A);
+        ids = Arrays.asList(appendIdB, appendIdA);
+      }
+    }
+
+    validateManifestEntries(manifests.get(0),
+                            ids.iterator(),
+                            files.iterator(),
+                            statuses(ManifestEntry.Status.EXISTING, 
ManifestEntry.Status.EXISTING));
+  }
+
+  @Test
+  public void testAppendDuringRewriteManifest() {
+    Table table = load();
+    table.newFastAppend()
+      .appendFile(FILE_A)
+      .commit();
+    long appendIdA = table.currentSnapshot().snapshotId();
+
+    // start the rewrite manifests
+    RewriteManifests rewrite = table.rewriteManifests();
+    rewrite.clusterBy(file -> "file").apply();
+
+    // append a file
+    table.newFastAppend()
+      .appendFile(FILE_B)
+      .commit();
+    long appendIdB = table.currentSnapshot().snapshotId();
+
+    Assert.assertEquals(2, table.currentSnapshot().manifests().size());
+
+    // commit the rewrite manifests in progress
+    rewrite.commit();
+
+    // the rewrite should only affect the first manifest, so we will end up with 2 manifests even though
+    // we have a single cluster key; the rewritten one should be the first in the list
+
+    List<ManifestFile> manifests = table.currentSnapshot().manifests();
+    Assert.assertEquals(2, manifests.size());
+
+    validateManifestEntries(manifests.get(0),
+                            ids(appendIdA),
+                            files(FILE_A),
+                            statuses(ManifestEntry.Status.EXISTING));
+    validateManifestEntries(manifests.get(1),
+                            ids(appendIdB),
+                            files(FILE_B),
+                            statuses(ManifestEntry.Status.ADDED));
+  }
+
+  @Test
+  public void testRewriteManifestDuringAppend() {
+    Table table = load();
+    table.newFastAppend()
+      .appendFile(FILE_A)
+      .commit();
+    long appendIdA = table.currentSnapshot().snapshotId();
+
+    // start an append
+    AppendFiles append = table.newFastAppend();
+    append.appendFile(FILE_B).apply();
+
+    // rewrite the manifests - only affects the first
+    table.rewriteManifests()
+      .clusterBy(file -> "file")
+      .commit();
+
+    Assert.assertEquals(1, table.currentSnapshot().manifests().size());
+
+    // commit the append in progress
+    append.commit();
+    long appendIdB = table.currentSnapshot().snapshotId();
+
+    List<ManifestFile> manifests = table.currentSnapshot().manifests();
+    Assert.assertEquals(2, manifests.size());
+
+    // last append should be the first in the list
+
+    validateManifestEntries(manifests.get(0),
+                            ids(appendIdB),
+                            files(FILE_B),
+                            statuses(ManifestEntry.Status.ADDED));
+    validateManifestEntries(manifests.get(1),
+                            ids(appendIdA),
+                            files(FILE_A),
+                            statuses(ManifestEntry.Status.EXISTING));
+  }
+}

Reply via email to