This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 6852278d6c Core: `streaming-skip-overwrite-snapshots` only skips
(#8980)
6852278d6c is described below
commit 6852278d6cf01bec2998be954d7bd6f6cc37cc94
Author: cccs-jc <[email protected]>
AuthorDate: Fri Jan 26 15:30:27 2024 -0500
Core: `streaming-skip-overwrite-snapshots` only skips (#8980)
---
.../spark/source/SparkMicroBatchStream.java | 30 +++++-
.../spark/source/TestStructuredStreamingRead3.java | 109 ++++++++++++++++++++-
2 files changed, 137 insertions(+), 2 deletions(-)
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
index 4019fedcbb..3ffd9904bb 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
@@ -392,8 +392,15 @@ public class SparkMicroBatchStream implements
MicroBatchStream, SupportsAdmissio
// if everything was OK and we consumed complete snapshot then move to
next snapshot
if (shouldContinueReading) {
+ Snapshot nextValid = nextValidSnapshot(curSnapshot);
+ if (nextValid == null) {
+ // a null nextValid implies all the remaining snapshots should be skipped.
+ shouldContinueReading = false;
+ break;
+ }
+ // we found the next available snapshot, continue from there.
+ curSnapshot = nextValid;
startPosOfSnapOffset = -1;
- curSnapshot = SnapshotUtil.snapshotAfter(table,
curSnapshot.snapshotId());
// if anyhow we are moving to next snapshot we should only scan
addedFiles
scanAllFiles = false;
}
@@ -406,6 +413,27 @@ public class SparkMicroBatchStream implements
MicroBatchStream, SupportsAdmissio
return latestStreamingOffset.equals(startingOffset) ? null :
latestStreamingOffset;
}
+ /**
+ * Get the next snapshot, skipping over snapshots that {@code shouldProcess} rejects.
+ *
+ * @param curSnapshot the current snapshot
+ * @return the next valid snapshot (not a rewrite or delete snapshot),
returns null if all
+ * remaining snapshots should be skipped.
+ */
+ private Snapshot nextValidSnapshot(Snapshot curSnapshot) {
+ Snapshot nextSnapshot = SnapshotUtil.snapshotAfter(table,
curSnapshot.snapshotId());
+ // keep advancing while the candidate snapshot should not be processed
+ while (!shouldProcess(nextSnapshot)) {
+ LOG.debug("Skipping snapshot: {} of table {}",
nextSnapshot.snapshotId(), table.name());
+ // the skipped snapshot is the table's current snapshot: nothing is left, return null
+ if (nextSnapshot.snapshotId() == table.currentSnapshot().snapshotId()) {
+ return null;
+ }
+ nextSnapshot = SnapshotUtil.snapshotAfter(table,
nextSnapshot.snapshotId());
+ }
+ return nextSnapshot;
+ }
+
private long addedFilesCount(Snapshot snapshot) {
long addedFilesCount =
PropertyUtil.propertyAsLong(snapshot.summary(),
SnapshotSummary.ADDED_FILES_PROP, -1);
diff --git
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
index 7fe5951ef1..22e7df0f4e 100644
---
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
+++
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
@@ -31,11 +31,14 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataOperations;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.Files;
import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
@@ -118,7 +121,8 @@ public final class TestStructuredStreamingRead3 extends
CatalogTestBase {
"CREATE TABLE %s "
+ "(id INT, data STRING) "
+ "USING iceberg "
- + "PARTITIONED BY (bucket(3, id))",
+ + "PARTITIONED BY (bucket(3, id)) "
+ + "TBLPROPERTIES ('commit.manifest.min-count-to-merge'='3',
'commit.manifest-merge.enabled'='true')",
tableName);
this.table = validationCatalog.loadTable(tableIdent);
microBatches.set(0);
@@ -494,6 +498,86 @@ public final class TestStructuredStreamingRead3 extends
CatalogTestBase {
.hasMessageStartingWith("Cannot process overwrite snapshot");
}
+ @TestTemplate
+ public void testReadStreamWithSnapshotTypeRewriteDataFilesIgnoresReplace()
throws Exception {
+ // append the test data as multiple snapshots, then commit one rewrite snapshot on top
+ List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
+ appendDataAsMultipleSnapshots(expected);
+
+ makeRewriteDataFiles();
+
+ assertThat(
+ microBatchCount(
+
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1")))
+ .isEqualTo(6);
+ }
+
+ @TestTemplate
+ public void
testReadStreamWithSnapshotTypeRewriteDataFilesIgnoresReplaceMaxRows()
+ throws Exception {
+ // append the test data as multiple snapshots, then commit one rewrite snapshot on top
+ List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
+ appendDataAsMultipleSnapshots(expected);
+
+ makeRewriteDataFiles();
+
+ assertThat(
+ microBatchCount(
+
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "4")))
+ .isEqualTo(2);
+ }
+
+ @TestTemplate
+ public void
testReadStreamWithSnapshotTypeRewriteDataFilesIgnoresReplaceMaxFilesAndRows()
+ throws Exception {
+ // append the test data as multiple snapshots, then commit one rewrite snapshot on top
+ List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
+ appendDataAsMultipleSnapshots(expected);
+
+ makeRewriteDataFiles();
+
+ assertThat(
+ microBatchCount(
+ ImmutableMap.of(
+ SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH,
+ "4",
+ SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH,
+ "1")))
+ .isEqualTo(6);
+ }
+
+ @TestTemplate
+ public void testReadStreamWithSnapshotType2RewriteDataFilesIgnoresReplace()
throws Exception {
+ // append the test data as multiple snapshots, then commit two rewrite snapshots on top
+ List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
+ appendDataAsMultipleSnapshots(expected);
+
+ makeRewriteDataFiles();
+ makeRewriteDataFiles();
+
+ assertThat(
+ microBatchCount(
+
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1")))
+ .isEqualTo(6);
+ }
+
+ @TestTemplate
+ public void
testReadStreamWithSnapshotTypeRewriteDataFilesIgnoresReplaceFollowedByAppend()
+ throws Exception {
+ // append the test data, commit one rewrite snapshot, then append the same data again
+ List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
+ appendDataAsMultipleSnapshots(expected);
+
+ makeRewriteDataFiles();
+
+ appendDataAsMultipleSnapshots(expected);
+
+ assertThat(
+ microBatchCount(
+
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1")))
+ .isEqualTo(12);
+ }
+
@TestTemplate
public void testReadStreamWithSnapshotTypeReplaceIgnoresReplace() throws
Exception {
// fill table with some data
@@ -574,6 +658,29 @@ public final class TestStructuredStreamingRead3 extends
CatalogTestBase {
.containsExactlyInAnyOrderElementsOf(Iterables.concat(dataAcrossSnapshots));
}
+ /**
+ * We are testing that all the files in a rewrite snapshot are skipped.
Create a rewrite data files
+ * snapshot using existing files.
+ */
+ public void makeRewriteDataFiles() {
+ table.refresh();
+
+ // for every data file added by an append snapshot, register it in the rewrite as both
+ // an added file and a deleted file, then commit the rewrite as a new snapshot
+ RewriteFiles rewrite = table.newRewrite();
+ Iterable<Snapshot> it = table.snapshots();
+ for (Snapshot snapshot : it) {
+ if (snapshot.operation().equals(DataOperations.APPEND)) {
+ Iterable<DataFile> datafiles = snapshot.addedDataFiles(table.io());
+ for (DataFile datafile : datafiles) {
+ rewrite.addFile(datafile);
+ rewrite.deleteFile(datafile);
+ }
+ }
+ }
+ rewrite.commit();
+ }
+
/**
* appends each list as a Snapshot on the iceberg table at the given
location. accepts a list of
* lists - each list representing data per snapshot.