This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 6852278d6c Core: `streaming-skip-overwrite-snapshots` only skips 
(#8980)
6852278d6c is described below

commit 6852278d6cf01bec2998be954d7bd6f6cc37cc94
Author: cccs-jc <[email protected]>
AuthorDate: Fri Jan 26 15:30:27 2024 -0500

    Core: `streaming-skip-overwrite-snapshots` only skips (#8980)
---
 .../spark/source/SparkMicroBatchStream.java        |  30 +++++-
 .../spark/source/TestStructuredStreamingRead3.java | 109 ++++++++++++++++++++-
 2 files changed, 137 insertions(+), 2 deletions(-)

diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
index 4019fedcbb..3ffd9904bb 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
@@ -392,8 +392,15 @@ public class SparkMicroBatchStream implements 
MicroBatchStream, SupportsAdmissio
 
       // if everything was OK and we consumed complete snapshot then move to 
next snapshot
       if (shouldContinueReading) {
+        Snapshot nextValid = nextValidSnapshot(curSnapshot);
+        if (nextValid == null) {
+          // a null nextValid implies all the remaining snapshots should be skipped.
+          shouldContinueReading = false;
+          break;
+        }
+        // we found the next available snapshot, continue from there.
+        curSnapshot = nextValid;
         startPosOfSnapOffset = -1;
-        curSnapshot = SnapshotUtil.snapshotAfter(table, 
curSnapshot.snapshotId());
         // if anyhow we are moving to next snapshot we should only scan 
addedFiles
         scanAllFiles = false;
       }
@@ -406,6 +413,27 @@ public class SparkMicroBatchStream implements 
MicroBatchStream, SupportsAdmissio
     return latestStreamingOffset.equals(startingOffset) ? null : 
latestStreamingOffset;
   }
 
+  /**
+   * Get the next snapshot skipping over rewrite and delete snapshots.
+   *
+   * @param curSnapshot the current snapshot
+   * @return the next valid snapshot (not a rewrite or delete snapshot), 
returns null if all
+   *     remaining snapshots should be skipped.
+   */
+  private Snapshot nextValidSnapshot(Snapshot curSnapshot) {
+    Snapshot nextSnapshot = SnapshotUtil.snapshotAfter(table, 
curSnapshot.snapshotId());
+    // skip over rewrite and delete snapshots
+    while (!shouldProcess(nextSnapshot)) {
+      LOG.debug("Skipping snapshot: {} of table {}", 
nextSnapshot.snapshotId(), table.name());
+      // if the skipped snapshot is the table's current snapshot, nothing is left to read
+      if (nextSnapshot.snapshotId() == table.currentSnapshot().snapshotId()) {
+        return null;
+      }
+      nextSnapshot = SnapshotUtil.snapshotAfter(table, 
nextSnapshot.snapshotId());
+    }
+    return nextSnapshot;
+  }
+
   private long addedFilesCount(Snapshot snapshot) {
     long addedFilesCount =
         PropertyUtil.propertyAsLong(snapshot.summary(), 
SnapshotSummary.ADDED_FILES_PROP, -1);
diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
index 7fe5951ef1..22e7df0f4e 100644
--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
+++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
@@ -31,11 +31,14 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.IntStream;
 import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataOperations;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.RewriteFiles;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableOperations;
@@ -118,7 +121,8 @@ public final class TestStructuredStreamingRead3 extends 
CatalogTestBase {
         "CREATE TABLE %s "
             + "(id INT, data STRING) "
             + "USING iceberg "
-            + "PARTITIONED BY (bucket(3, id))",
+            + "PARTITIONED BY (bucket(3, id)) "
+            + "TBLPROPERTIES ('commit.manifest.min-count-to-merge'='3', 
'commit.manifest-merge.enabled'='true')",
         tableName);
     this.table = validationCatalog.loadTable(tableIdent);
     microBatches.set(0);
@@ -494,6 +498,86 @@ public final class TestStructuredStreamingRead3 extends 
CatalogTestBase {
         .hasMessageStartingWith("Cannot process overwrite snapshot");
   }
 
+  @TestTemplate
+  public void testReadStreamWithSnapshotTypeRewriteDataFilesIgnoresReplace() 
throws Exception {
+    // fill table with some data
+    List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
+    appendDataAsMultipleSnapshots(expected);
+
+    makeRewriteDataFiles();
+
+    assertThat(
+            microBatchCount(
+                
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1")))
+        .isEqualTo(6);
+  }
+
+  @TestTemplate
+  public void 
testReadStreamWithSnapshotTypeRewriteDataFilesIgnoresReplaceMaxRows()
+      throws Exception {
+    // fill table with some data
+    List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
+    appendDataAsMultipleSnapshots(expected);
+
+    makeRewriteDataFiles();
+
+    assertThat(
+            microBatchCount(
+                
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "4")))
+        .isEqualTo(2);
+  }
+
+  @TestTemplate
+  public void 
testReadStreamWithSnapshotTypeRewriteDataFilesIgnoresReplaceMaxFilesAndRows()
+      throws Exception {
+    // fill table with some data
+    List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
+    appendDataAsMultipleSnapshots(expected);
+
+    makeRewriteDataFiles();
+
+    assertThat(
+            microBatchCount(
+                ImmutableMap.of(
+                    SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH,
+                    "4",
+                    SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH,
+                    "1")))
+        .isEqualTo(6);
+  }
+
+  @TestTemplate
+  public void testReadStreamWithSnapshotType2RewriteDataFilesIgnoresReplace() 
throws Exception {
+    // fill table with some data
+    List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
+    appendDataAsMultipleSnapshots(expected);
+
+    makeRewriteDataFiles();
+    makeRewriteDataFiles();
+
+    assertThat(
+            microBatchCount(
+                
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1")))
+        .isEqualTo(6);
+  }
+
+  @TestTemplate
+  public void 
testReadStreamWithSnapshotTypeRewriteDataFilesIgnoresReplaceFollowedByAppend()
+      throws Exception {
+    // fill table with some data
+    List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
+    appendDataAsMultipleSnapshots(expected);
+
+    makeRewriteDataFiles();
+
+    appendDataAsMultipleSnapshots(expected);
+
+    assertThat(
+            microBatchCount(
+                
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1")))
+        .isEqualTo(12);
+  }
+
   @TestTemplate
   public void testReadStreamWithSnapshotTypeReplaceIgnoresReplace() throws 
Exception {
     // fill table with some data
@@ -574,6 +658,29 @@ public final class TestStructuredStreamingRead3 extends 
CatalogTestBase {
         
.containsExactlyInAnyOrderElementsOf(Iterables.concat(dataAcrossSnapshots));
   }
 
+  /**
+   * We are testing that all the files in a rewrite snapshot are skipped. 
Create a rewrite data files
+   * snapshot using existing files.
+   */
+  public void makeRewriteDataFiles() {
+    table.refresh();
+
+    // we are testing that all the files in a rewrite snapshot are skipped
+    // create a rewrite data files snapshot using existing files
+    RewriteFiles rewrite = table.newRewrite();
+    Iterable<Snapshot> it = table.snapshots();
+    for (Snapshot snapshot : it) {
+      if (snapshot.operation().equals(DataOperations.APPEND)) {
+        Iterable<DataFile> datafiles = snapshot.addedDataFiles(table.io());
+        for (DataFile datafile : datafiles) {
+          rewrite.addFile(datafile);
+          rewrite.deleteFile(datafile);
+        }
+      }
+    }
+    rewrite.commit();
+  }
+
   /**
    * appends each list as a Snapshot on the iceberg table at the given 
location. accepts a list of
    * lists - each list representing data per snapshot.

Reply via email to