rdblue commented on a change in pull request #2660:
URL: https://github.com/apache/iceberg/pull/2660#discussion_r655707282



##########
File path: core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
##########
@@ -107,4 +120,19 @@ public static boolean ancestorOf(Table table, long snapshotId, long ancestorSnap
 
     return newFiles;
   }
+
+  /**
+   * traverses through the table's snapshots and finds the {@link Table}'s
+   * Snapshot that is committed after the specified snapshotId
+   * @return null if passed in snapshot expired, else the snapshot after the passed in snapshot
+   */
+  public static Snapshot snapshotAfter(Table table, long snapshotId) {
+    Snapshot previousSnapshot = table.snapshot(snapshotId);
+    Snapshot pointer = table.currentSnapshot();

Review comment:
       Nit: other methods in this file use `current` instead of `pointer`. It would be good to rename.

##########
File path: core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
##########
@@ -107,4 +120,19 @@ public static boolean ancestorOf(Table table, long snapshotId, long ancestorSnap
 
     return newFiles;
   }
+
+  /**
+   * traverses through the table's snapshots and finds the {@link Table}'s
+   * Snapshot that is committed after the specified snapshotId
+   * @return null if passed in snapshot expired, else the snapshot after the passed in snapshot

Review comment:
       I don't think this quite captures what the method is doing because it doesn't mention that this is with respect to the table's current state. How about this?
   
   > Traverses the history of the table's current snapshot and finds the snapshot with the given snapshot id as its parent.
   
   I think this should also document the behavior when:
   * The given snapshot ID is not found in the table (`IllegalArgumentException`?)
   * The given snapshot ID is not an ancestor of the current table state (`IllegalStateException`?)
   
   Nit: We also prefer to use sentence case for documentation.
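   
   As a rough sketch of how the reworded Javadoc and documented exceptions could look (illustrative only, not the PR's final code; the body mirrors the early-exit loop suggested later in this review):
   
   ```java
   /**
    * Traverses the history of the table's current snapshot and finds the snapshot with the given snapshot id as its
    * parent.
    *
    * @return the snapshot that has the given snapshot id as its parent
    * @throws IllegalArgumentException if the given snapshot id is not found in the table
    * @throws IllegalStateException if the given snapshot id is not an ancestor of the current table state
    */
   public static Snapshot snapshotAfter(Table table, long snapshotId) {
     Preconditions.checkArgument(table.snapshot(snapshotId) != null,
         "Cannot find parent snapshot: %s", snapshotId);
   
     Snapshot current = table.currentSnapshot();
     while (current != null && current.parentId() != null) {
       if (current.parentId() == snapshotId) {
         return current;
       }
       current = table.snapshot(current.parentId());
     }
   
     throw new IllegalStateException(
         String.format("Cannot find snapshot after %s: not an ancestor of the current snapshot", snapshotId));
   }
   ```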

##########
File path: core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
##########
@@ -63,6 +63,19 @@ public static boolean ancestorOf(Table table, long snapshotId, long ancestorSnap
     return ancestorIds(table.currentSnapshot(), table::snapshot);
   }
 
+  /**
+   * Find the oldest Snapshot of a {@link Table}.

Review comment:
       I think this should be more clear, similar to my comments on the docs about the method below. Basically, it should state that the "oldest" is determined by traversing the history of the current table state.

##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
##########
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.UncheckedIOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MicroBatches;
+import org.apache.iceberg.MicroBatches.MicroBatch;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.source.SparkBatchScan.ReadTask;
+import org.apache.iceberg.spark.source.SparkBatchScan.ReaderFactory;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.connector.read.InputPartition;
+import org.apache.spark.sql.connector.read.PartitionReaderFactory;
+import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
+import org.apache.spark.sql.connector.read.streaming.Offset;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK;
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE_DEFAULT;
+
+public class SparkMicroBatchStream implements MicroBatchStream {
+  private static final Joiner SLASH = Joiner.on("/");
+
+  private final Table table;
+  private final boolean caseSensitive;
+  private final String expectedSchema;
+  private final Broadcast<Table> tableBroadcast;
+  private final Long splitSize;
+  private final Integer splitLookback;
+  private final Long splitOpenFileCost;
+  private final boolean localityPreferred;
+  private final StreamingOffset initialOffset;
+
+  SparkMicroBatchStream(JavaSparkContext sparkContext, Table table, boolean 
caseSensitive,
+                        Schema expectedSchema, CaseInsensitiveStringMap 
options, String checkpointLocation) {
+    this.table = table;
+    this.caseSensitive = caseSensitive;
+    this.expectedSchema = SchemaParser.toJson(expectedSchema);
+    this.localityPreferred = Spark3Util.isLocalityEnabled(table.io(), 
table.location(), options);
+    this.tableBroadcast = 
sparkContext.broadcast(SerializableTable.copyOf(table));
+
+    long tableSplitSize = PropertyUtil.propertyAsLong(table.properties(), 
SPLIT_SIZE, SPLIT_SIZE_DEFAULT);
+    this.splitSize = Spark3Util.propertyAsLong(options, 
SparkReadOptions.SPLIT_SIZE, tableSplitSize);
+
+    int tableSplitLookback = PropertyUtil.propertyAsInt(table.properties(), 
SPLIT_LOOKBACK, SPLIT_LOOKBACK_DEFAULT);
+    this.splitLookback = Spark3Util.propertyAsInt(options, 
SparkReadOptions.LOOKBACK, tableSplitLookback);
+
+    long tableSplitOpenFileCost = PropertyUtil.propertyAsLong(
+        table.properties(), SPLIT_OPEN_FILE_COST, 
SPLIT_OPEN_FILE_COST_DEFAULT);
+    this.splitOpenFileCost = Spark3Util.propertyAsLong(options, 
SPLIT_OPEN_FILE_COST, tableSplitOpenFileCost);
+
+    InitialOffsetStore initialOffsetStore = new InitialOffsetStore(table, 
checkpointLocation);
+    this.initialOffset = initialOffsetStore.initialOffset();
+  }
+
+  @Override
+  public Offset latestOffset() {
+    table.refresh();
+    Snapshot latestSnapshot = table.currentSnapshot();
+    if (latestSnapshot == null) {
+      return StreamingOffset.START_OFFSET;
+    }
+
+    return new StreamingOffset(latestSnapshot.snapshotId(), 
Iterables.size(latestSnapshot.addedFiles()), false);
+  }
+
+  @Override
+  public InputPartition[] planInputPartitions(Offset start, Offset end) {
+    Preconditions.checkArgument(end instanceof StreamingOffset, "Invalid end 
offset: %s is not a StreamingOffset", end);
+    Preconditions.checkArgument(
+        start instanceof StreamingOffset, "Invalid start offset: %s is not a 
StreamingOffset", start);
+
+    if (end.equals(StreamingOffset.START_OFFSET)) {
+      return new InputPartition[0];
+    }
+
+    StreamingOffset endOffset = (StreamingOffset) end;
+    StreamingOffset startOffset = (StreamingOffset) start;
+
+    List<FileScanTask> fileScanTasks = planFiles(startOffset, endOffset);
+
+    CloseableIterable<FileScanTask> splitTasks = TableScanUtil.splitFiles(
+        CloseableIterable.withNoopClose(fileScanTasks),
+        splitSize);
+    List<CombinedScanTask> combinedScanTasks = Lists.newArrayList(
+        TableScanUtil.planTasks(splitTasks, splitSize, splitLookback, 
splitOpenFileCost));
+    InputPartition[] readTasks = new InputPartition[combinedScanTasks.size()];
+
+    for (int i = 0; i < combinedScanTasks.size(); i++) {
+      readTasks[i] = new ReadTask(
+          combinedScanTasks.get(i), tableBroadcast, expectedSchema,
+          caseSensitive, localityPreferred);
+    }
+
+    return readTasks;
+  }
+
+  @Override
+  public PartitionReaderFactory createReaderFactory() {
+    return new ReaderFactory(0);
+  }
+
+  @Override
+  public Offset initialOffset() {
+    return initialOffset;
+  }
+
+  @Override
+  public Offset deserializeOffset(String json) {
+    return StreamingOffset.fromJson(json);
+  }
+
+  @Override
+  public void commit(Offset end) {
+  }
+
+  @Override
+  public void stop() {
+  }
+
+  private List<FileScanTask> planFiles(StreamingOffset startOffset, 
StreamingOffset endOffset) {
+    List<FileScanTask> fileScanTasks = Lists.newArrayList();
+    MicroBatch latestMicroBatch = null;
+    StreamingOffset batchStartOffset = 
StreamingOffset.START_OFFSET.equals(startOffset) ?
+        new StreamingOffset(SnapshotUtil.oldestSnapshot(table).snapshotId(), 
0, false) :
+        startOffset;
+
+    do {
+      StreamingOffset currentOffset =
+          latestMicroBatch != null && latestMicroBatch.lastIndexOfSnapshot() ?
+              new 
StreamingOffset(snapshotAfter(latestMicroBatch.snapshotId()), 0L, false) :
+              batchStartOffset;
+
+      latestMicroBatch = 
MicroBatches.from(table.snapshot(currentOffset.snapshotId()), table.io())
+          .caseSensitive(caseSensitive)
+          .specsById(table.specs())
+          .generate(currentOffset.position(), Long.MAX_VALUE, 
currentOffset.shouldScanAllFiles());
+
+      fileScanTasks.addAll(latestMicroBatch.tasks());
+    } while (latestMicroBatch.snapshotId() != endOffset.snapshotId());
+
+    return fileScanTasks;
+  }
+
+  private long snapshotAfter(long snapshotId) {
+    Snapshot snapshotAfter = SnapshotUtil.snapshotAfter(table, snapshotId);
+
+    Preconditions.checkState(snapshotAfter != null, "Cannot find next snapshot: as Snapshot %s expired", snapshotId);

Review comment:
       I think this should be moved into the `SnapshotUtil` method.
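   
   As a hypothetical sketch, if `SnapshotUtil.snapshotAfter` did the validation (throwing instead of returning null), the caller here could shrink to a simple delegation:
   
   ```java
   // Assumes SnapshotUtil.snapshotAfter throws when the snapshot is missing or not an ancestor.
   private long snapshotAfter(long snapshotId) {
     return SnapshotUtil.snapshotAfter(table, snapshotId).snapshotId();
   }
   ```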

##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
##########
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MicroBatches;
+import org.apache.iceberg.MicroBatches.MicroBatch;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.source.SparkBatchScan.ReadTask;
+import org.apache.iceberg.spark.source.SparkBatchScan.ReaderFactory;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.connector.read.InputPartition;
+import org.apache.spark.sql.connector.read.PartitionReaderFactory;
+import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
+import org.apache.spark.sql.connector.read.streaming.Offset;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK;
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE_DEFAULT;
+
+public class SparkMicroBatchStream implements MicroBatchStream {
+  private final JavaSparkContext sparkContext;
+  private final Table table;
+  private final boolean caseSensitive;
+  private final Schema expectedSchema;
+  private final Long splitSize;
+  private final Integer splitLookback;
+  private final Long splitOpenFileCost;
+  private final boolean localityPreferred;
+  private final StreamingOffset initialOffset;
+
+  SparkMicroBatchStream(JavaSparkContext sparkContext,
+                        Table table, boolean caseSensitive, Schema 
expectedSchema,
+                        CaseInsensitiveStringMap options, String 
checkpointLocation) {
+    this.sparkContext = sparkContext;
+    this.table = table;
+    this.caseSensitive = caseSensitive;
+    this.expectedSchema = expectedSchema;
+    this.localityPreferred = Spark3Util.isLocalityEnabled(table.io(), 
table.location(), options);
+
+    long tableSplitSize = PropertyUtil.propertyAsLong(table.properties(), 
SPLIT_SIZE, SPLIT_SIZE_DEFAULT);
+    this.splitSize = Spark3Util.propertyAsLong(options, 
SparkReadOptions.SPLIT_SIZE, tableSplitSize);
+
+    int tableSplitLookback = PropertyUtil.propertyAsInt(table.properties(), 
SPLIT_LOOKBACK, SPLIT_LOOKBACK_DEFAULT);
+    this.splitLookback = Spark3Util.propertyAsInt(options, 
SparkReadOptions.LOOKBACK, tableSplitLookback);
+
+    long tableSplitOpenFileCost = PropertyUtil.propertyAsLong(
+        table.properties(), SPLIT_OPEN_FILE_COST, 
SPLIT_OPEN_FILE_COST_DEFAULT);
+    this.splitOpenFileCost = Spark3Util.propertyAsLong(options, 
SPLIT_OPEN_FILE_COST, tableSplitOpenFileCost);
+
+    InitialOffsetStore initialOffsetStore = 
InitialOffsetStore.getInstance(table, checkpointLocation);
+    this.initialOffset = getOrWriteInitialOffset(initialOffsetStore);
+  }
+
+  @Override
+  public Offset latestOffset() {
+    table.refresh();
+    Snapshot latestSnapshot = table.currentSnapshot();
+    if (latestSnapshot == null) {
+      return StreamingOffset.START_OFFSET;
+    }
+
+    // a readStream on an Iceberg table can be started from 2 types of 
snapshots
+    // 1. a valid starting Snapshot:
+    //      when this valid starting Snapshot is the initialOffset - then, 
scanAllFiles must be set to true;
+    //      for all StreamingOffsets following this - scanAllFiles must be set 
to false
+    // 2. START_OFFSET:
+    //      if the stream started on the table from START_OFFSET - it implies 
- that all the subsequent Snapshots added
+    //      will have all files as net New manifests & hence scanAllFiles can 
be false.
+    boolean scanAllFiles = !StreamingOffset.START_OFFSET.equals(initialOffset) 
&&
+        latestSnapshot.snapshotId() == initialOffset.snapshotId();
+
+    long filesNewlyAddedInLatestSnapshot = 
Iterables.size(latestSnapshot.addedFiles());
+    long existingFilesInheritedByLatestSnapshot = 
SnapshotUtil.existingDataFiles(latestSnapshot);
+    long positionValue = scanAllFiles ? existingFilesInheritedByLatestSnapshot 
+ filesNewlyAddedInLatestSnapshot :
+        filesNewlyAddedInLatestSnapshot;
+
+    return new StreamingOffset(latestSnapshot.snapshotId(), positionValue, 
scanAllFiles);
+  }
+
+  @Override
+  public InputPartition[] planInputPartitions(Offset start, Offset end) {
+    if (end.equals(StreamingOffset.START_OFFSET)) {
+      return new InputPartition[0];
+    }
+
+    // broadcast the table metadata as input partitions will be sent to 
executors
+    Broadcast<Table> tableBroadcast = 
sparkContext.broadcast(SerializableTable.copyOf(table));
+    String expectedSchemaString = SchemaParser.toJson(expectedSchema);
+
+    Preconditions.checkState(
+        end instanceof StreamingOffset,
+        "The end offset passed to planInputPartitions() is not an instance of 
StreamingOffset.");
+
+    Preconditions.checkState(
+        start instanceof StreamingOffset,
+        "The start offset passed to planInputPartitions() is not an instance 
of StreamingOffset.");
+
+    StreamingOffset endOffset = (StreamingOffset) end;
+    StreamingOffset startOffset = (StreamingOffset) start;
+
+    List<FileScanTask> fileScanTasks = calculateFileScanTasks(startOffset, 
endOffset);
+
+    CloseableIterable<FileScanTask> splitTasks = TableScanUtil.splitFiles(
+        CloseableIterable.withNoopClose(fileScanTasks),
+        splitSize);
+    List<CombinedScanTask> combinedScanTasks = Lists.newArrayList(
+        TableScanUtil.planTasks(splitTasks, splitSize, splitLookback, 
splitOpenFileCost));
+    InputPartition[] readTasks = new InputPartition[combinedScanTasks.size()];
+
+    for (int i = 0; i < combinedScanTasks.size(); i++) {
+      readTasks[i] = new ReadTask(
+          combinedScanTasks.get(i), tableBroadcast, expectedSchemaString,
+          caseSensitive, localityPreferred);
+    }
+
+    return readTasks;
+  }
+
+  @Override
+  public PartitionReaderFactory createReaderFactory() {
+    return new ReaderFactory(0);
+  }
+
+  @Override
+  public Offset initialOffset() {
+    return initialOffset;
+  }
+
+  @Override
+  public Offset deserializeOffset(String json) {
+    return StreamingOffset.fromJson(json);
+  }
+
+  @Override
+  public void commit(Offset end) {
+  }
+
+  @Override
+  public void stop() {
+  }
+
+  private static StreamingOffset getOrWriteInitialOffset(InitialOffsetStore 
initialOffsetStore) {
+    if (initialOffsetStore.isOffsetStoreInitialized()) {
+      return initialOffsetStore.getInitialOffset();
+    }
+
+    return initialOffsetStore.addInitialOffset();
+  }
+
+  private List<FileScanTask> calculateFileScanTasks(StreamingOffset 
startOffset, StreamingOffset endOffset) {
+    List<FileScanTask> fileScanTasks = new ArrayList<>();
+    MicroBatch latestMicroBatch = null;
+    StreamingOffset batchStartOffset = 
StreamingOffset.START_OFFSET.equals(startOffset) ?
+        new StreamingOffset(SnapshotUtil.oldestSnapshot(table).snapshotId(), 
0, false) :
+        startOffset;
+
+    do {
+      final StreamingOffset currentOffset =
+          latestMicroBatch != null && latestMicroBatch.lastIndexOfSnapshot() ?
+          getNextAvailableSnapshot(latestMicroBatch.snapshotId()) :
+              batchStartOffset;
+
+      latestMicroBatch = 
MicroBatches.from(table.snapshot(currentOffset.snapshotId()), table.io())
+          .caseSensitive(caseSensitive)
+          .specsById(table.specs())
+          .generate(currentOffset.position(), Long.MAX_VALUE, 
currentOffset.shouldScanAllFiles());
+
+      fileScanTasks.addAll(latestMicroBatch.tasks());
+    } while (latestMicroBatch.snapshotId() != endOffset.snapshotId());
+
+    return fileScanTasks;
+  }
+
+  private StreamingOffset getNextAvailableSnapshot(long snapshotId) {
+    Snapshot previousSnapshot = table.snapshot(snapshotId);
+    Snapshot pointer = table.currentSnapshot();
+    while (pointer != null && previousSnapshot.snapshotId() != 
pointer.parentId()) {
+      
Preconditions.checkState(pointer.operation().equals(DataOperations.APPEND),
+          "Encountered Snapshot DataOperation other than APPEND.");
+
+      pointer = table.snapshot(pointer.parentId());
+    }
+
+    Preconditions.checkState(pointer != null,
+        "Cannot read data from snapshot which has already expired: %s", 
snapshotId);
+
+    return new StreamingOffset(pointer.snapshotId(), 0L, false);
+  }
+
+  interface InitialOffsetStore {
+    static InitialOffsetStore getInstance(Table table, String 
checkpointLocation) {
+      return new InitialOffsetStoreImpl(table, checkpointLocation);
+    }
+
+    StreamingOffset addInitialOffset();
+
+    boolean isOffsetStoreInitialized();
+
+    StreamingOffset getInitialOffset();
+  }
+
+  private static class InitialOffsetStoreImpl implements InitialOffsetStore {
+    private final Table table;
+    private final FileIO io;
+    private final String initialOffsetLocation;
+
+    InitialOffsetStoreImpl(Table table, String checkpointLocation) {
+      this.table = table;
+      this.io = table.io();
+      this.initialOffsetLocation = new Path(checkpointLocation, "offsets/0").toString();

Review comment:
       I'm unresolving this so it is more obvious later if we refer back to this PR.

##########
File path: core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
##########
@@ -63,6 +63,19 @@ public static boolean ancestorOf(Table table, long snapshotId, long ancestorSnap
     return ancestorIds(table.currentSnapshot(), table::snapshot);
   }
 
+  /**
+   * Traverses the history of the table's state and finds the oldest Snapshot.

Review comment:
       I think that "table's state" isn't clear enough. How about "history of 
the table's current snapshot" like the one below?

##########
File path: core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
##########
@@ -63,6 +63,19 @@ public static boolean ancestorOf(Table table, long snapshotId, long ancestorSnap
     return ancestorIds(table.currentSnapshot(), table::snapshot);
   }
 
+  /**
+   * Traverses the history of the table's state and finds the oldest Snapshot.
+   * @return null if the table is empty, else the oldest Snapshot.

Review comment:
       What does "empty" mean? I think it means that there is no current 
snapshot, but it would be better to say that.
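   
   A possible wording, with a sketch of one implementation (the body here is illustrative, not necessarily the PR's code):
   
   ```java
   /**
    * Traverses the history of the table's current snapshot and finds the oldest snapshot.
    *
    * @return null if the table has no current snapshot, else the oldest ancestor of the current snapshot
    */
   public static Snapshot oldestSnapshot(Table table) {
     Snapshot current = table.currentSnapshot();
     // walk parent links until there is no parent or the parent snapshot has expired
     while (current != null && current.parentId() != null && table.snapshot(current.parentId()) != null) {
       current = table.snapshot(current.parentId());
     }
     return current;
   }
   ```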

##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/StreamingOffset.java
##########
@@ -62,20 +63,23 @@ static StreamingOffset fromJson(String json) {
 
     try {
       JsonNode node = JsonUtil.mapper().readValue(json, JsonNode.class);
-      // The version of StreamingOffset. The offset was created with a version number
-      // used to validate when deserializing from json string.
-      int version = JsonUtil.getInt(VERSION, node);
-      Preconditions.checkArgument(version == CURR_VERSION,
-              "Cannot parse offset JSON: offset version %s is not supported", version);
+      return fromJsonNode(node);
+    } catch (IOException e) {
+      throw new IllegalArgumentException(String.format("Failed to parse StreamingOffset from JSON string %s", json), e);

Review comment:
       I think this should also be `UncheckedIOException`.
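   
   For reference, a sketch of the method with that change, reusing the surrounding code from the hunk above (`java.io.UncheckedIOException` would need to be imported):
   
   ```java
   static StreamingOffset fromJson(String json) {
     try {
       JsonNode node = JsonUtil.mapper().readValue(json, JsonNode.class);
       return fromJsonNode(node);
     } catch (IOException e) {
       // wrap the IOException as unchecked instead of IllegalArgumentException
       throw new UncheckedIOException(
           String.format("Failed to parse StreamingOffset from JSON string %s", json), e);
     }
   }
   ```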

##########
File path: spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
##########
@@ -0,0 +1,502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.streaming.DataStreamWriter;
+import org.apache.spark.sql.streaming.OutputMode;
+import org.apache.spark.sql.streaming.StreamingQuery;
+import org.apache.spark.sql.streaming.StreamingQueryException;
+import org.apache.spark.sql.streaming.Trigger;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+public final class TestStructuredStreamingRead3 {
+  private static final Configuration CONF = new Configuration();
+  private static final Schema SCHEMA = new Schema(
+          optional(1, "id", Types.IntegerType.get()),
+          optional(2, "data", Types.StringType.get())
+  );
+  private static SparkSession spark = null;
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @Rule
+  public ExpectedException exceptionRule = ExpectedException.none();
+
+  @BeforeClass
+  public static void startSpark() {
+    TestStructuredStreamingRead3.spark = SparkSession.builder()
+            .master("local[2]")
+            .config("spark.sql.shuffle.partitions", 4)
+            .getOrCreate();
+  }
+
+  @AfterClass
+  public static void stopSpark() {
+    SparkSession currentSpark = TestStructuredStreamingRead3.spark;
+    TestStructuredStreamingRead3.spark = null;
+    currentSpark.stop();
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testReadStreamOnIcebergTableWithMultipleSnapshots() throws 
IOException, TimeoutException {
+    File parent = temp.newFolder("parent");
+    File location = new File(parent, "test-table");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id", 
3).build();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+
+    List<List<SimpleRecord>> expected = getTestDataForMultipleSnapshots();
+    appendData(expected, location);
+
+    table.refresh();
+
+    try {
+      Dataset<Row> df = spark.readStream()
+              .format("iceberg")
+              .load(location.toString());
+      List<SimpleRecord> actual = processStreamTillEnd(df);
+
+      Assert.assertEquals(
+          expected.stream().flatMap(List::stream).collect(Collectors.toList()),
+          actual);
+    } finally {
+      for (StreamingQuery query : spark.streams().active()) {
+        query.stop();
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testResumingStreamReadFromCheckpoint() throws IOException, 
TimeoutException, StreamingQueryException {
+    File parent = temp.newFolder("parent");
+    File location = new File(parent, "test-table");
+    File writerCheckpoint = new File(parent, "writer-checkpoint");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id", 
3).build();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+    final String tempView = "microBatchView";
+
+    try {
+      Dataset<Row> df = spark.readStream()
+          .format("iceberg")
+          .load(location.toString());
+      DataStreamWriter<Row> singleBatchWriter = df.writeStream()
+          .trigger(Trigger.Once())
+          .option("checkpointLocation", writerCheckpoint.toString())
+          .foreachBatch((batchDF, batchId) -> {
+            batchDF.createOrReplaceGlobalTempView(tempView);
+          });
+
+      String globalTempView = "global_temp." + tempView;
+
+      List<SimpleRecord> processStreamOnEmptyIcebergTable = 
processMicroBatch(singleBatchWriter, globalTempView);
+      Assert.assertEquals(
+          Collections.emptyList(),
+          processStreamOnEmptyIcebergTable);
+
+      for (List<List<SimpleRecord>> expectedCheckpoint : 
getTestDataForMultipleWritesWithMultipleSnapshots()) {
+        appendData(expectedCheckpoint, location);
+        table.refresh();
+
+        List<SimpleRecord> actualDataInCurrentMicroBatch = 
processMicroBatch(singleBatchWriter, globalTempView);
+        Assert.assertEquals(
+            
expectedCheckpoint.stream().flatMap(List::stream).collect(Collectors.toList()),
+            actualDataInCurrentMicroBatch);
+      }
+    } finally {
+      for (StreamingQuery query : spark.streams().active()) {
+        query.stop();
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testParquetOrcAvroDataInOneTable() throws IOException, 
TimeoutException {
+    File parent = temp.newFolder("parent");
+    File location = new File(parent, "test-table");
+
+    HadoopTables tables = new HadoopTables(CONF);
+
+    // use partition buckets - so that single list<SimpleRecord> can generate 
multiple files per write
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id", 
3).build();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+
+    List<SimpleRecord> parquetFileRecords = Lists.newArrayList(
+        new SimpleRecord(1, "one"),
+        new SimpleRecord(2, "two"),
+        new SimpleRecord(3, "three"));
+
+    List<SimpleRecord> orcFileRecords = Lists.newArrayList(
+        new SimpleRecord(4, "four"),
+        new SimpleRecord(5, "five"));
+
+    List<SimpleRecord> avroFileRecords = Lists.newArrayList(
+        new SimpleRecord(6, "six"),
+        new SimpleRecord(7, "seven"));
+
+    appendData(parquetFileRecords, location, "parquet");
+    appendData(orcFileRecords, location, "orc");
+    appendData(avroFileRecords, location, "avro");
+
+    table.refresh();
+
+
+    try {
+      Dataset<Row> ds = spark.readStream()
+          .format("iceberg")
+          .load(location.toString());
+      List<SimpleRecord> actual = processStreamTillEnd(ds);
+      List<SimpleRecord> expected = Stream.concat(Stream.concat(
+          parquetFileRecords.stream(),
+          orcFileRecords.stream()),
+          avroFileRecords.stream())
+            .collect(Collectors.toList());
+      Assert.assertEquals(expected, actual);
+    } finally {
+      for (StreamingQuery query : spark.streams().active()) {
+        query.stop();
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testReadStreamFromEmptyTable() throws IOException, 
TimeoutException {
+    File parent = temp.newFolder("parent");
+    File location = new File(parent, "test-table");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id", 
3).build();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+
+    table.refresh();
+
+    try {
+      Dataset<Row> df = spark.readStream()
+          .format("iceberg")
+          .load(location.toString());
+
+      List<SimpleRecord> actual = processStreamTillEnd(df);
+      Assert.assertEquals(Collections.emptyList(), actual);
+    } finally {
+      for (StreamingQuery query : spark.streams().active()) {
+        query.stop();
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testReadStreamWithSnapshotTypeOverwriteErrorsOut() throws 
IOException, TimeoutException {
+    File parent = temp.newFolder("parent");
+    File location = new File(parent, "test-table");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id", 
3).build();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+
+    // upgrade table to verison 2 - to facilitate creation of Snapshot of type 
OVERWRITE.
+    TableOperations ops = ((BaseTable) table).operations();
+    TableMetadata meta = ops.current();
+    ops.commit(meta, meta.upgradeToFormatVersion(2));
+
+    // fill table with some data
+    List<List<SimpleRecord>> dataAcrossSnapshots = 
getTestDataForMultipleSnapshots();
+    appendData(dataAcrossSnapshots, location);
+
+    table.refresh();
+
+    Schema deleteRowSchema = table.schema().select("data");
+    Record dataDelete = GenericRecord.create(deleteRowSchema);
+    List<Record> dataDeletes = Lists.newArrayList(
+        dataDelete.copy("data", "one") // id = 1
+    );
+
+    DeleteFile eqDeletes = FileHelpers.writeDeleteFile(
+        table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), 
dataDeletes, deleteRowSchema);
+
+    table.newRowDelta()
+        .addDeletes(eqDeletes)
+        .commit();
+
+    // check pre-condition - that the above Delete file write - actually 
resulted in snapshot of type OVERWRITE
+    Assert.assertEquals(DataOperations.OVERWRITE, 
table.currentSnapshot().operation());
+
+    try {
+      Dataset<Row> df = spark.readStream()
+          .format("iceberg")
+          .load(location.toString());
+      StreamingQuery streamingQuery = df.writeStream()
+          .format("memory")
+          .queryName("testtablewithoverwrites")
+          .outputMode(OutputMode.Append())
+          .start();
+
+      try {
+        streamingQuery.processAllAvailable();
+        Assert.assertTrue(false); // should be unreachable
+      } catch (Exception exception) {
+        Assert.assertTrue(exception instanceof StreamingQueryException);
+        Assert.assertTrue(((StreamingQueryException) exception).cause() 
instanceof IllegalStateException);
+      }
+
+    } finally {
+      for (StreamingQuery query : spark.streams().active()) {
+        query.stop();
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testReadStreamWithSnapshotTypeReplaceErrorsOut() throws 
IOException, TimeoutException {
+    File parent = temp.newFolder("parent");
+    File location = new File(parent, "test-table");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).build();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+
+    // fill table with some data
+    List<List<SimpleRecord>> dataAcrossSnapshots = 
getTestDataForMultipleSnapshots();
+    appendData(dataAcrossSnapshots, location);
+
+    table.refresh();
+
+    // this should create a snapshot with type Replace.
+    table.rewriteManifests()
+        .clusterBy(f -> 1)
+        .commit();
+
+    // check pre-condition
+    Assert.assertEquals(DataOperations.REPLACE, 
table.currentSnapshot().operation());
+
+    try {
+      Dataset<Row> df = spark.readStream()
+          .format("iceberg")
+          .load(location.toString());
+      StreamingQuery streamingQuery = df.writeStream()
+          .format("memory")
+          .queryName("testtablewithreplace")
+          .outputMode(OutputMode.Append())
+          .start();
+
+      try {
+        streamingQuery.processAllAvailable();
+        Assert.assertTrue(false); // should be unreachable
+      } catch (Exception exception) {
+        Assert.assertTrue(exception instanceof StreamingQueryException);
+        Assert.assertTrue(((StreamingQueryException) exception).cause() 
instanceof IllegalStateException);
+      }
+
+    } finally {
+      for (StreamingQuery query : spark.streams().active()) {
+        query.stop();
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testReadStreamWithSnapshotTypeDeleteErrorsOut() throws 
IOException, TimeoutException {
+    File parent = temp.newFolder("parent");
+    File location = new File(parent, "test-table");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = 
PartitionSpec.builderFor(SCHEMA).identity("id").build();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+
+    // fill table with some data
+    List<List<SimpleRecord>> dataAcrossSnapshots = 
getTestDataForMultipleSnapshots();
+    appendData(dataAcrossSnapshots, location);
+
+    table.refresh();
+
+    // this should create a snapshot with type delete.
+    table.newDelete()
+        .deleteFromRowFilter(Expressions.equal("id", 4))
+        .commit();
+
+    // check pre-condition - that the above newDelete operation on table 
resulted in Snapshot of Type DELETE.
+    Assert.assertEquals(DataOperations.DELETE, 
table.currentSnapshot().operation());
+
+    try {
+      Dataset<Row> df = spark.readStream()
+          .format("iceberg")
+          .load(location.toString());
+      StreamingQuery streamingQuery = df.writeStream()
+          .format("memory")
+          .queryName("testtablewithdelete")
+          .outputMode(OutputMode.Append())
+          .start();
+
+      try {
+        streamingQuery.processAllAvailable();
+        Assert.assertTrue(false); // should be unreachable
+      } catch (Exception exception) {
+        Assert.assertTrue(exception instanceof StreamingQueryException);
+        Assert.assertTrue(((StreamingQueryException) exception).cause() 
instanceof IllegalStateException);
+      }
+
+    } finally {
+      for (StreamingQuery query : spark.streams().active()) {
+        query.stop();
+      }
+    }
+  }
+
+  private static List<SimpleRecord> processMicroBatch(DataStreamWriter<Row> 
singleBatchWriter, String viewName)
+      throws TimeoutException, StreamingQueryException {
+    StreamingQuery streamingQuery = singleBatchWriter.start();
+    streamingQuery.awaitTermination();
+
+    return spark.sql(String.format("select * from %s", viewName))
+        .as(Encoders.bean(SimpleRecord.class))
+        .collectAsList();
+  }
+
+  /**
+   * get test data - a list of records per snapshot
+   */
+  private static List<List<SimpleRecord>> getTestDataForMultipleSnapshots() {
+    return Lists.newArrayList(
+        Lists.newArrayList(
+            new SimpleRecord(1, "one"),
+            new SimpleRecord(2, "two"),
+            new SimpleRecord(3, "three")),
+        Lists.newArrayList(
+            new SimpleRecord(4, "four"),
+            new SimpleRecord(5, "five")),
+        Lists.newArrayList(
+            new SimpleRecord(6, "six"),
+            new SimpleRecord(7, "seven"))
+    );
+  }
+
+  /**
+   * appends each list as a Snapshot on the iceberg table at the given 
location.
+   * accepts a list of lists - each list representing data per snapshot.
+   */
+  private static void appendData(List<List<SimpleRecord>> data, File location) 
{
+    // generate multiple snapshots
+    for (List<SimpleRecord> l : data) {
+      appendData(l, location, "parquet");
+    }
+  }
+
+  private static void appendData(List<SimpleRecord> data, File location, 
String fileFormat) {
+    Dataset<Row> df = spark.createDataFrame(data, SimpleRecord.class);
+    df.select("id", "data").write()
+        .format("iceberg")
+        .option("write-format", fileFormat)
+        .mode("append")
+        .save(location.toString());
+  }
+
+  private static List<SimpleRecord> processStreamTillEnd(Dataset<Row> df) 
throws TimeoutException {
+    StreamingQuery streamingQuery = df.writeStream()
+        .format("memory")
+        .queryName("test12")
+        .outputMode(OutputMode.Append())
+        .start();
+    streamingQuery.processAllAvailable();
+    return spark.sql("select * from test12")
+        .as(Encoders.bean(SimpleRecord.class))
+        .collectAsList();
+  }
+
+  /**
+   * gets test data - to be used for multiple write batches
+   * each batch inturn will have multiple snapshots
+   */
+  private static List<List<List<SimpleRecord>>> getTestDataForMultipleWritesWithMultipleSnapshots() {

Review comment:
       Why isn't this a static final variable?
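   
   For example (the nested values here are placeholders, not the PR's actual fixture data), the method could become a constant:
   
   ```java
   private static final List<List<List<SimpleRecord>>> TEST_DATA_MULTIPLE_WRITES_MULTIPLE_SNAPSHOTS =
       Lists.newArrayList(
           Lists.newArrayList(
               Lists.newArrayList(new SimpleRecord(1, "one"), new SimpleRecord(2, "two")),
               Lists.newArrayList(new SimpleRecord(3, "three"))),
           Lists.newArrayList(
               Lists.newArrayList(new SimpleRecord(4, "four")),
               Lists.newArrayList(new SimpleRecord(5, "five"))));
   ```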

##########
File path: spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
##########
@@ -0,0 +1,502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.streaming.DataStreamWriter;
+import org.apache.spark.sql.streaming.OutputMode;
+import org.apache.spark.sql.streaming.StreamingQuery;
+import org.apache.spark.sql.streaming.StreamingQueryException;
+import org.apache.spark.sql.streaming.Trigger;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+public final class TestStructuredStreamingRead3 {
+  private static final Configuration CONF = new Configuration();
+  private static final Schema SCHEMA = new Schema(
+          optional(1, "id", Types.IntegerType.get()),

Review comment:
       Nit: continuation indents are 4 spaces (or 2 indents). Can you fix this file?
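   
   For reference, the schema declaration with 4-space continuation indents would look like:
   
   ```java
   private static final Schema SCHEMA = new Schema(
       optional(1, "id", Types.IntegerType.get()),
       optional(2, "data", Types.StringType.get())
   );
   ```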

##########
File path: core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
##########
@@ -107,4 +121,26 @@ public static boolean ancestorOf(Table table, long snapshotId, long ancestorSnap
 
     return newFiles;
   }
+
+  /**
+   * Traverses the history of the table's current snapshot and finds the snapshot with the given snapshot id as its
+   * parent.
+   * @return null if the passed in snapshot is not present in the table, else the snapshot for which the given snapshot
+   * is the parent
+   * @throws IllegalArgumentException when the given SnapshotId is not found in the table

Review comment:
       Why is `SnapshotId` capitalized?

##########
File path: core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
##########
@@ -107,4 +121,26 @@ public static boolean ancestorOf(Table table, long snapshotId, long ancestorSnap
 
     return newFiles;
   }
+
+  /**
+   * Traverses the history of the table's current snapshot and finds the snapshot with the given snapshot id as its
+   * parent.
+   * @return null if the passed in snapshot is not present in the table, else the snapshot for which the given snapshot
+   * is the parent
+   * @throws IllegalArgumentException when the given SnapshotId is not found in the table
+   * @throws IllegalStateException when the given snapshot ID is not an ancestor of the current table state
+   */
+  public static Snapshot snapshotAfter(Table table, long snapshotId) {
+    Snapshot previousSnapshot = table.snapshot(snapshotId);

Review comment:
       Minor: You may want to rename this `parent` so that it is clear why the loop exits.

##########
File path: core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
##########
@@ -107,4 +121,26 @@ public static boolean ancestorOf(Table table, long snapshotId, long ancestorSnap
 
     return newFiles;
   }
+
+  /**
+   * Traverses the history of the table's current snapshot and finds the snapshot with the given snapshot id as its
+   * parent.
+   * @return null if the passed in snapshot is not present in the table, else the snapshot for which the given snapshot
+   * is the parent
+   * @throws IllegalArgumentException when the given SnapshotId is not found in the table
+   * @throws IllegalStateException when the given snapshot ID is not an ancestor of the current table state
+   */
+  public static Snapshot snapshotAfter(Table table, long snapshotId) {
+    Snapshot previousSnapshot = table.snapshot(snapshotId);
+    Preconditions.checkArgument(previousSnapshot != null,
+        "Invalid snapshotId: %s, snapshot not found in the table.", snapshotId);

Review comment:
       The ID isn't invalid; the problem is that the snapshot is not present. I think it should be `"Cannot find parent snapshot: %s"`

##########
File path: core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
##########
@@ -107,4 +121,26 @@ public static boolean ancestorOf(Table table, long snapshotId, long ancestorSnap
 
     return newFiles;
   }
+
+  /**
+   * Traverses the history of the table's current snapshot and finds the snapshot with the given snapshot id as its
+   * parent.
+   * @return null if the passed in snapshot is not present in the table, else the snapshot for which the given snapshot
+   * is the parent
+   * @throws IllegalArgumentException when the given SnapshotId is not found in the table
+   * @throws IllegalStateException when the given snapshot ID is not an ancestor of the current table state
+   */
+  public static Snapshot snapshotAfter(Table table, long snapshotId) {
+    Snapshot previousSnapshot = table.snapshot(snapshotId);
+    Preconditions.checkArgument(previousSnapshot != null,
+        "Invalid snapshotId: %s, snapshot not found in the table.", snapshotId);
+
+    Snapshot current = table.currentSnapshot();
+    while (previousSnapshot != null && current != null && previousSnapshot.snapshotId() != current.parentId()) {
+      current = table.snapshot(current.parentId());
+    }
+
+    Preconditions.checkState(current != null, "Cannot find next snapshot: as Snapshot %s expired", snapshotId);

Review comment:
       I don't think that this error message is correct. The problem is that traversing the current snapshot's history didn't find the parent snapshot. Since the parent is defined (we know this from the check above) we know that the parent is actually not an ancestor of the current table state. That's a more descriptive error message: `"Cannot find snapshot after %s: not an ancestor of the current snapshot"`

##########
File path: core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
##########
@@ -107,4 +121,26 @@ public static boolean ancestorOf(Table table, long snapshotId, long ancestorSnap
 
     return newFiles;
   }
+
+  /**
+   * Traverses the history of the table's current snapshot and finds the snapshot with the given snapshot id as its
+   * parent.
+   * @return null if the passed in snapshot is not present in the table, else the snapshot for which the given snapshot
+   * is the parent
+   * @throws IllegalArgumentException when the given SnapshotId is not found in the table
+   * @throws IllegalStateException when the given snapshot ID is not an ancestor of the current table state
+   */
+  public static Snapshot snapshotAfter(Table table, long snapshotId) {
+    Snapshot previousSnapshot = table.snapshot(snapshotId);

Review comment:
       Actually, I think it may be a good idea to change the loop to exit early. That would be a bit cleaner. And there's no need to keep the parent around at all, or to check that it is non-null every time through the loop.
   
   ```
   Preconditions.checkArgument(table.snapshot(snapshotId) != null, ...);
   
   Snapshot current = table.currentSnapshot();
   while (current != null) {
     if (current.parentId() == snapshotId) {
       return current;
     }
     current = table.snapshot(current.parentId());
   }
   
   throw new IllegalStateException(...);
   ```

##########
File path: spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
##########
@@ -0,0 +1,502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.streaming.DataStreamWriter;
+import org.apache.spark.sql.streaming.OutputMode;
+import org.apache.spark.sql.streaming.StreamingQuery;
+import org.apache.spark.sql.streaming.StreamingQueryException;
+import org.apache.spark.sql.streaming.Trigger;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+public final class TestStructuredStreamingRead3 {
+  private static final Configuration CONF = new Configuration();
+  private static final Schema SCHEMA = new Schema(
+          optional(1, "id", Types.IntegerType.get()),
+          optional(2, "data", Types.StringType.get())
+  );
+  private static SparkSession spark = null;
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @Rule
+  public ExpectedException exceptionRule = ExpectedException.none();
+
+  @BeforeClass
+  public static void startSpark() {
+    TestStructuredStreamingRead3.spark = SparkSession.builder()
+            .master("local[2]")
+            .config("spark.sql.shuffle.partitions", 4)
+            .getOrCreate();
+  }
+
+  @AfterClass
+  public static void stopSpark() {
+    SparkSession currentSpark = TestStructuredStreamingRead3.spark;
+    TestStructuredStreamingRead3.spark = null;
+    currentSpark.stop();
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testReadStreamOnIcebergTableWithMultipleSnapshots() throws IOException, TimeoutException {

Review comment:
       For tests, you can just throw `Exception` so that these lists don't get too long.
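   
   A signature-only sketch of that simplification (test body elided):
   
   ```java
   @Test
   public void testReadStreamOnIcebergTableWithMultipleSnapshots() throws Exception {
     // ... unchanged test body ...
   }
   ```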

##########
File path: spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
##########
@@ -0,0 +1,502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.streaming.DataStreamWriter;
+import org.apache.spark.sql.streaming.OutputMode;
+import org.apache.spark.sql.streaming.StreamingQuery;
+import org.apache.spark.sql.streaming.StreamingQueryException;
+import org.apache.spark.sql.streaming.Trigger;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+public final class TestStructuredStreamingRead3 {
+  private static final Configuration CONF = new Configuration();
+  private static final Schema SCHEMA = new Schema(
+          optional(1, "id", Types.IntegerType.get()),
+          optional(2, "data", Types.StringType.get())
+  );
+  private static SparkSession spark = null;
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @Rule
+  public ExpectedException exceptionRule = ExpectedException.none();
+
+  @BeforeClass
+  public static void startSpark() {
+    TestStructuredStreamingRead3.spark = SparkSession.builder()
+            .master("local[2]")
+            .config("spark.sql.shuffle.partitions", 4)
+            .getOrCreate();
+  }
+
+  @AfterClass
+  public static void stopSpark() {
+    SparkSession currentSpark = TestStructuredStreamingRead3.spark;
+    TestStructuredStreamingRead3.spark = null;
+    currentSpark.stop();
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testReadStreamOnIcebergTableWithMultipleSnapshots() throws 
IOException, TimeoutException {
+    File parent = temp.newFolder("parent");
+    File location = new File(parent, "test-table");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id", 
3).build();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+
+    List<List<SimpleRecord>> expected = getTestDataForMultipleSnapshots();
+    appendData(expected, location);
+
+    table.refresh();
+
+    try {
+      Dataset<Row> df = spark.readStream()
+              .format("iceberg")
+              .load(location.toString());
+      List<SimpleRecord> actual = processStreamTillEnd(df);

Review comment:
       Looks like `processStreamTillEnd` can't really guarantee an ordering for
the records that are returned, so Spark may do something later that breaks this
test. I think that this is probably a good time to use the new `Assertions`
helper that @nastra recommended so that it is easy to assert that the list that
is returned has a specific size and contains all of the expected records:
   
   ```java
   Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
   ```
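   
   To also pin down the size explicitly, a hedged variant (assuming AssertJ's
   `org.assertj.core.api.Assertions` and the relocated Guava `Iterables` are on
   the test classpath) could be:
   
   ```java
   // Size check plus order-insensitive comparison against the flattened batches.
   Assertions.assertThat(actual)
       .hasSize(expected.stream().mapToInt(List::size).sum())
       .containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
   ```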

##########
File path: 
spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
##########
@@ -0,0 +1,502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.streaming.DataStreamWriter;
+import org.apache.spark.sql.streaming.OutputMode;
+import org.apache.spark.sql.streaming.StreamingQuery;
+import org.apache.spark.sql.streaming.StreamingQueryException;
+import org.apache.spark.sql.streaming.Trigger;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+public final class TestStructuredStreamingRead3 {
+  private static final Configuration CONF = new Configuration();
+  private static final Schema SCHEMA = new Schema(
+          optional(1, "id", Types.IntegerType.get()),
+          optional(2, "data", Types.StringType.get())
+  );
+  private static SparkSession spark = null;
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @Rule
+  public ExpectedException exceptionRule = ExpectedException.none();
+
+  @BeforeClass
+  public static void startSpark() {
+    TestStructuredStreamingRead3.spark = SparkSession.builder()
+            .master("local[2]")
+            .config("spark.sql.shuffle.partitions", 4)
+            .getOrCreate();
+  }
+
+  @AfterClass
+  public static void stopSpark() {
+    SparkSession currentSpark = TestStructuredStreamingRead3.spark;
+    TestStructuredStreamingRead3.spark = null;
+    currentSpark.stop();
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testReadStreamOnIcebergTableWithMultipleSnapshots() throws 
IOException, TimeoutException {
+    File parent = temp.newFolder("parent");
+    File location = new File(parent, "test-table");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id", 
3).build();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+
+    List<List<SimpleRecord>> expected = getTestDataForMultipleSnapshots();
+    appendData(expected, location);
+
+    table.refresh();
+
+    try {
+      Dataset<Row> df = spark.readStream()
+              .format("iceberg")
+              .load(location.toString());
+      List<SimpleRecord> actual = processStreamTillEnd(df);
+
+      Assert.assertEquals(
+          expected.stream().flatMap(List::stream).collect(Collectors.toList()),
+          actual);
+    } finally {
+      for (StreamingQuery query : spark.streams().active()) {

Review comment:
       It looks like this `finally` block should be an `@After` method instead?
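   
   Something like this sketch (assumes `org.junit.After` is imported and the
   shared `spark` session is still running):
   
   ```java
   @After
   public void stopStreams() throws TimeoutException {
     // Stop any streaming queries a test left running so they cannot leak
     // into the next test on the shared SparkSession.
     for (StreamingQuery query : spark.streams().active()) {
       query.stop();
     }
   }
   ```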

##########
File path: 
spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
##########
@@ -0,0 +1,502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.streaming.DataStreamWriter;
+import org.apache.spark.sql.streaming.OutputMode;
+import org.apache.spark.sql.streaming.StreamingQuery;
+import org.apache.spark.sql.streaming.StreamingQueryException;
+import org.apache.spark.sql.streaming.Trigger;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+public final class TestStructuredStreamingRead3 {
+  private static final Configuration CONF = new Configuration();
+  private static final Schema SCHEMA = new Schema(
+          optional(1, "id", Types.IntegerType.get()),
+          optional(2, "data", Types.StringType.get())
+  );
+  private static SparkSession spark = null;
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @Rule
+  public ExpectedException exceptionRule = ExpectedException.none();
+
+  @BeforeClass
+  public static void startSpark() {
+    TestStructuredStreamingRead3.spark = SparkSession.builder()
+            .master("local[2]")
+            .config("spark.sql.shuffle.partitions", 4)
+            .getOrCreate();
+  }
+
+  @AfterClass
+  public static void stopSpark() {
+    SparkSession currentSpark = TestStructuredStreamingRead3.spark;
+    TestStructuredStreamingRead3.spark = null;
+    currentSpark.stop();
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testReadStreamOnIcebergTableWithMultipleSnapshots() throws 
IOException, TimeoutException {
+    File parent = temp.newFolder("parent");
+    File location = new File(parent, "test-table");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id", 
3).build();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+
+    List<List<SimpleRecord>> expected = getTestDataForMultipleSnapshots();
+    appendData(expected, location);
+
+    table.refresh();
+
+    try {
+      Dataset<Row> df = spark.readStream()
+              .format("iceberg")
+              .load(location.toString());
+      List<SimpleRecord> actual = processStreamTillEnd(df);
+
+      Assert.assertEquals(
+          expected.stream().flatMap(List::stream).collect(Collectors.toList()),
+          actual);
+    } finally {
+      for (StreamingQuery query : spark.streams().active()) {
+        query.stop();
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testResumingStreamReadFromCheckpoint() throws IOException, 
TimeoutException, StreamingQueryException {
+    File parent = temp.newFolder("parent");
+    File location = new File(parent, "test-table");
+    File writerCheckpoint = new File(parent, "writer-checkpoint");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id", 
3).build();
+    Table table = tables.create(SCHEMA, spec, location.toString());

Review comment:
       Is it possible to refactor table creation into a `@Before` method and 
drop the table in `@After`?
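   
   A rough sketch of that refactor (field and method names here are
   illustrative, not the PR's actual code):
   
   ```java
   private Table table;
   private String tableLocation;
   
   @Before
   public void setupTable() throws IOException {
     // Create a fresh bucketed table under the TemporaryFolder for each test.
     File location = new File(temp.newFolder("parent"), "test-table");
     tableLocation = location.toString();
     PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id", 3).build();
     table = new HadoopTables(CONF).create(SCHEMA, spec, tableLocation);
   }
   
   @After
   public void dropTable() {
     new HadoopTables(CONF).dropTable(tableLocation);
   }
   ```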

##########
File path: 
spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
##########
@@ -0,0 +1,502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.streaming.DataStreamWriter;
+import org.apache.spark.sql.streaming.OutputMode;
+import org.apache.spark.sql.streaming.StreamingQuery;
+import org.apache.spark.sql.streaming.StreamingQueryException;
+import org.apache.spark.sql.streaming.Trigger;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+public final class TestStructuredStreamingRead3 {
+  private static final Configuration CONF = new Configuration();
+  private static final Schema SCHEMA = new Schema(
+          optional(1, "id", Types.IntegerType.get()),
+          optional(2, "data", Types.StringType.get())
+  );
+  private static SparkSession spark = null;
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @Rule
+  public ExpectedException exceptionRule = ExpectedException.none();
+
+  @BeforeClass
+  public static void startSpark() {
+    TestStructuredStreamingRead3.spark = SparkSession.builder()
+            .master("local[2]")
+            .config("spark.sql.shuffle.partitions", 4)
+            .getOrCreate();
+  }
+
+  @AfterClass
+  public static void stopSpark() {
+    SparkSession currentSpark = TestStructuredStreamingRead3.spark;
+    TestStructuredStreamingRead3.spark = null;
+    currentSpark.stop();
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testReadStreamOnIcebergTableWithMultipleSnapshots() throws 
IOException, TimeoutException {
+    File parent = temp.newFolder("parent");
+    File location = new File(parent, "test-table");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id", 
3).build();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+
+    List<List<SimpleRecord>> expected = getTestDataForMultipleSnapshots();
+    appendData(expected, location);
+
+    table.refresh();
+
+    try {
+      Dataset<Row> df = spark.readStream()
+              .format("iceberg")
+              .load(location.toString());
+      List<SimpleRecord> actual = processStreamTillEnd(df);
+
+      Assert.assertEquals(
+          expected.stream().flatMap(List::stream).collect(Collectors.toList()),
+          actual);
+    } finally {
+      for (StreamingQuery query : spark.streams().active()) {
+        query.stop();
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testResumingStreamReadFromCheckpoint() throws IOException, 
TimeoutException, StreamingQueryException {
+    File parent = temp.newFolder("parent");
+    File location = new File(parent, "test-table");
+    File writerCheckpoint = new File(parent, "writer-checkpoint");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id", 
3).build();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+    final String tempView = "microBatchView";
+
+    try {
+      Dataset<Row> df = spark.readStream()
+          .format("iceberg")
+          .load(location.toString());
+      DataStreamWriter<Row> singleBatchWriter = df.writeStream()
+          .trigger(Trigger.Once())
+          .option("checkpointLocation", writerCheckpoint.toString())
+          .foreachBatch((batchDF, batchId) -> {
+            batchDF.createOrReplaceGlobalTempView(tempView);
+          });
+
+      String globalTempView = "global_temp." + tempView;
+
+      List<SimpleRecord> processStreamOnEmptyIcebergTable = 
processMicroBatch(singleBatchWriter, globalTempView);
+      Assert.assertEquals(
+          Collections.emptyList(),
+          processStreamOnEmptyIcebergTable);
+
+      for (List<List<SimpleRecord>> expectedCheckpoint : 
getTestDataForMultipleWritesWithMultipleSnapshots()) {
+        appendData(expectedCheckpoint, location);
+        table.refresh();
+
+        List<SimpleRecord> actualDataInCurrentMicroBatch = 
processMicroBatch(singleBatchWriter, globalTempView);
+        Assert.assertEquals(
+            
expectedCheckpoint.stream().flatMap(List::stream).collect(Collectors.toList()),
+            actualDataInCurrentMicroBatch);
+      }
+    } finally {
+      for (StreamingQuery query : spark.streams().active()) {
+        query.stop();
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testParquetOrcAvroDataInOneTable() throws IOException, 
TimeoutException {
+    File parent = temp.newFolder("parent");
+    File location = new File(parent, "test-table");
+
+    HadoopTables tables = new HadoopTables(CONF);
+
+    // use partition buckets - so that single list<SimpleRecord> can generate 
multiple files per write
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id", 
3).build();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+
+    List<SimpleRecord> parquetFileRecords = Lists.newArrayList(
+        new SimpleRecord(1, "one"),
+        new SimpleRecord(2, "two"),
+        new SimpleRecord(3, "three"));
+
+    List<SimpleRecord> orcFileRecords = Lists.newArrayList(
+        new SimpleRecord(4, "four"),
+        new SimpleRecord(5, "five"));
+
+    List<SimpleRecord> avroFileRecords = Lists.newArrayList(
+        new SimpleRecord(6, "six"),
+        new SimpleRecord(7, "seven"));
+
+    appendData(parquetFileRecords, location, "parquet");
+    appendData(orcFileRecords, location, "orc");
+    appendData(avroFileRecords, location, "avro");
+
+    table.refresh();
+
+
+    try {
+      Dataset<Row> ds = spark.readStream()
+          .format("iceberg")
+          .load(location.toString());
+      List<SimpleRecord> actual = processStreamTillEnd(ds);
+      List<SimpleRecord> expected = Stream.concat(Stream.concat(

Review comment:
       You may want to consider using `Iterables` instead of streams since that 
will work directly with the lists and is friendly with `Assertions`.
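   
   For example (a sketch, assuming AssertJ is available alongside the relocated
   Guava collections already used in this test):
   
   ```java
   // Concatenate the per-format lists without streams and compare ignoring order.
   Iterable<SimpleRecord> expected =
       Iterables.concat(parquetFileRecords, orcFileRecords, avroFileRecords);
   Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(expected);
   ```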

##########
File path: 
spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
##########
@@ -0,0 +1,502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.streaming.DataStreamWriter;
+import org.apache.spark.sql.streaming.OutputMode;
+import org.apache.spark.sql.streaming.StreamingQuery;
+import org.apache.spark.sql.streaming.StreamingQueryException;
+import org.apache.spark.sql.streaming.Trigger;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+public final class TestStructuredStreamingRead3 {
+  private static final Configuration CONF = new Configuration();
+  private static final Schema SCHEMA = new Schema(
+          optional(1, "id", Types.IntegerType.get()),
+          optional(2, "data", Types.StringType.get())
+  );
+  private static SparkSession spark = null;
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @Rule
+  public ExpectedException exceptionRule = ExpectedException.none();
+
+  @BeforeClass
+  public static void startSpark() {
+    TestStructuredStreamingRead3.spark = SparkSession.builder()
+            .master("local[2]")
+            .config("spark.sql.shuffle.partitions", 4)
+            .getOrCreate();
+  }
+
+  @AfterClass
+  public static void stopSpark() {
+    SparkSession currentSpark = TestStructuredStreamingRead3.spark;
+    TestStructuredStreamingRead3.spark = null;
+    currentSpark.stop();
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testReadStreamOnIcebergTableWithMultipleSnapshots() throws 
IOException, TimeoutException {
+    File parent = temp.newFolder("parent");
+    File location = new File(parent, "test-table");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id", 
3).build();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+
+    List<List<SimpleRecord>> expected = getTestDataForMultipleSnapshots();
+    appendData(expected, location);
+
+    table.refresh();
+
+    try {
+      Dataset<Row> df = spark.readStream()
+              .format("iceberg")
+              .load(location.toString());
+      List<SimpleRecord> actual = processStreamTillEnd(df);
+
+      Assert.assertEquals(
+          expected.stream().flatMap(List::stream).collect(Collectors.toList()),
+          actual);
+    } finally {
+      for (StreamingQuery query : spark.streams().active()) {
+        query.stop();
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testResumingStreamReadFromCheckpoint() throws IOException, 
TimeoutException, StreamingQueryException {
+    File parent = temp.newFolder("parent");
+    File location = new File(parent, "test-table");
+    File writerCheckpoint = new File(parent, "writer-checkpoint");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id", 
3).build();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+    final String tempView = "microBatchView";
+
+    try {
+      Dataset<Row> df = spark.readStream()
+          .format("iceberg")
+          .load(location.toString());
+      DataStreamWriter<Row> singleBatchWriter = df.writeStream()
+          .trigger(Trigger.Once())
+          .option("checkpointLocation", writerCheckpoint.toString())
+          .foreachBatch((batchDF, batchId) -> {
+            batchDF.createOrReplaceGlobalTempView(tempView);
+          });
+
+      String globalTempView = "global_temp." + tempView;
+
+      List<SimpleRecord> processStreamOnEmptyIcebergTable = 
processMicroBatch(singleBatchWriter, globalTempView);
+      Assert.assertEquals(
+          Collections.emptyList(),
+          processStreamOnEmptyIcebergTable);
+
+      for (List<List<SimpleRecord>> expectedCheckpoint : 
getTestDataForMultipleWritesWithMultipleSnapshots()) {
+        appendData(expectedCheckpoint, location);
+        table.refresh();
+
+        List<SimpleRecord> actualDataInCurrentMicroBatch = 
processMicroBatch(singleBatchWriter, globalTempView);
+        Assert.assertEquals(
+            
expectedCheckpoint.stream().flatMap(List::stream).collect(Collectors.toList()),
+            actualDataInCurrentMicroBatch);
+      }
+    } finally {
+      for (StreamingQuery query : spark.streams().active()) {
+        query.stop();
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testParquetOrcAvroDataInOneTable() throws IOException, 
TimeoutException {

Review comment:
       It's okay to have this, but I don't see much of a benefit to testing 
this. File formats are orthogonal to the streaming that is done in this PR.

##########
File path: 
spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
##########
@@ -0,0 +1,502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.streaming.DataStreamWriter;
+import org.apache.spark.sql.streaming.OutputMode;
+import org.apache.spark.sql.streaming.StreamingQuery;
+import org.apache.spark.sql.streaming.StreamingQueryException;
+import org.apache.spark.sql.streaming.Trigger;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+public final class TestStructuredStreamingRead3 {
+  private static final Configuration CONF = new Configuration();
+  private static final Schema SCHEMA = new Schema(
+          optional(1, "id", Types.IntegerType.get()),
+          optional(2, "data", Types.StringType.get())
+  );
+  private static SparkSession spark = null;
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @Rule
+  public ExpectedException exceptionRule = ExpectedException.none();
+
+  @BeforeClass
+  public static void startSpark() {
+    TestStructuredStreamingRead3.spark = SparkSession.builder()
+            .master("local[2]")
+            .config("spark.sql.shuffle.partitions", 4)
+            .getOrCreate();
+  }
+
+  @AfterClass
+  public static void stopSpark() {
+    SparkSession currentSpark = TestStructuredStreamingRead3.spark;
+    TestStructuredStreamingRead3.spark = null;
+    currentSpark.stop();
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testReadStreamOnIcebergTableWithMultipleSnapshots() throws 
IOException, TimeoutException {
+    File parent = temp.newFolder("parent");
+    File location = new File(parent, "test-table");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id", 
3).build();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+
+    List<List<SimpleRecord>> expected = getTestDataForMultipleSnapshots();
+    appendData(expected, location);
+
+    table.refresh();
+
+    try {
+      Dataset<Row> df = spark.readStream()
+              .format("iceberg")
+              .load(location.toString());
+      List<SimpleRecord> actual = processStreamTillEnd(df);
+
+      Assert.assertEquals(
+          expected.stream().flatMap(List::stream).collect(Collectors.toList()),
+          actual);
+    } finally {
+      for (StreamingQuery query : spark.streams().active()) {
+        query.stop();
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testResumingStreamReadFromCheckpoint() throws IOException, 
TimeoutException, StreamingQueryException {
+    File parent = temp.newFolder("parent");
+    File location = new File(parent, "test-table");
+    File writerCheckpoint = new File(parent, "writer-checkpoint");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id", 
3).build();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+    final String tempView = "microBatchView";
+
+    try {
+      Dataset<Row> df = spark.readStream()
+          .format("iceberg")
+          .load(location.toString());
+      DataStreamWriter<Row> singleBatchWriter = df.writeStream()
+          .trigger(Trigger.Once())
+          .option("checkpointLocation", writerCheckpoint.toString())
+          .foreachBatch((batchDF, batchId) -> {
+            batchDF.createOrReplaceGlobalTempView(tempView);
+          });
+
+      String globalTempView = "global_temp." + tempView;
+
+      List<SimpleRecord> processStreamOnEmptyIcebergTable = 
processMicroBatch(singleBatchWriter, globalTempView);
+      Assert.assertEquals(
+          Collections.emptyList(),
+          processStreamOnEmptyIcebergTable);
+
+      for (List<List<SimpleRecord>> expectedCheckpoint : 
getTestDataForMultipleWritesWithMultipleSnapshots()) {
+        appendData(expectedCheckpoint, location);
+        table.refresh();
+
+        List<SimpleRecord> actualDataInCurrentMicroBatch = 
processMicroBatch(singleBatchWriter, globalTempView);
+        Assert.assertEquals(
+            
expectedCheckpoint.stream().flatMap(List::stream).collect(Collectors.toList()),
+            actualDataInCurrentMicroBatch);
+      }
+    } finally {
+      for (StreamingQuery query : spark.streams().active()) {
+        query.stop();
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testParquetOrcAvroDataInOneTable() throws IOException, 
TimeoutException {
+    File parent = temp.newFolder("parent");
+    File location = new File(parent, "test-table");
+
+    HadoopTables tables = new HadoopTables(CONF);
+
+    // use partition buckets - so that single list<SimpleRecord> can generate 
multiple files per write
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id", 
3).build();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+
+    List<SimpleRecord> parquetFileRecords = Lists.newArrayList(
+        new SimpleRecord(1, "one"),
+        new SimpleRecord(2, "two"),
+        new SimpleRecord(3, "three"));
+
+    List<SimpleRecord> orcFileRecords = Lists.newArrayList(
+        new SimpleRecord(4, "four"),
+        new SimpleRecord(5, "five"));
+
+    List<SimpleRecord> avroFileRecords = Lists.newArrayList(
+        new SimpleRecord(6, "six"),
+        new SimpleRecord(7, "seven"));
+
+    appendData(parquetFileRecords, location, "parquet");
+    appendData(orcFileRecords, location, "orc");
+    appendData(avroFileRecords, location, "avro");
+
+    table.refresh();
+
+
+    try {
+      Dataset<Row> ds = spark.readStream()
+          .format("iceberg")
+          .load(location.toString());
+      List<SimpleRecord> actual = processStreamTillEnd(ds);
+      List<SimpleRecord> expected = Stream.concat(Stream.concat(
+          parquetFileRecords.stream(),
+          orcFileRecords.stream()),
+          avroFileRecords.stream())
+            .collect(Collectors.toList());
+      Assert.assertEquals(expected, actual);
+    } finally {
+      for (StreamingQuery query : spark.streams().active()) {
+        query.stop();
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testReadStreamFromEmptyTable() throws IOException, 
TimeoutException {

Review comment:
       Can you add a new test case (or extend this one) to write a single batch 
and make sure that a table that starts with no snapshot works as expected after 
data is added?
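   
   One possible shape for that case (names are illustrative; it reuses the
   existing `appendData`/`processStreamTillEnd` helpers and a `table`/`location`
   created the same way as in the other tests):
   
   ```java
   @Test
   public void testReadStreamOnTableWithNoSnapshotsThenAppend() throws Exception {
     Dataset<Row> df = spark.readStream()
         .format("iceberg")
         .load(location.toString());
   
     // No snapshots yet, so the first pass over the stream returns nothing.
     Assert.assertEquals(Collections.emptyList(), processStreamTillEnd(df));
   
     // Append one batch and re-process; the new snapshot should now be read.
     List<SimpleRecord> batch = Lists.newArrayList(new SimpleRecord(1, "one"));
     appendData(batch, location, "parquet");
     table.refresh();
   
     Assert.assertEquals(batch, processStreamTillEnd(df));
   }
   ```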

##########
File path: 
spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
##########
@@ -0,0 +1,502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.streaming.DataStreamWriter;
+import org.apache.spark.sql.streaming.OutputMode;
+import org.apache.spark.sql.streaming.StreamingQuery;
+import org.apache.spark.sql.streaming.StreamingQueryException;
+import org.apache.spark.sql.streaming.Trigger;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+public final class TestStructuredStreamingRead3 {
+  private static final Configuration CONF = new Configuration();
+  private static final Schema SCHEMA = new Schema(
+          optional(1, "id", Types.IntegerType.get()),
+          optional(2, "data", Types.StringType.get())
+  );
+  private static SparkSession spark = null;
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @Rule
+  public ExpectedException exceptionRule = ExpectedException.none();
+
+  @BeforeClass
+  public static void startSpark() {
+    TestStructuredStreamingRead3.spark = SparkSession.builder()
+            .master("local[2]")
+            .config("spark.sql.shuffle.partitions", 4)
+            .getOrCreate();
+  }
+
+  @AfterClass
+  public static void stopSpark() {
+    SparkSession currentSpark = TestStructuredStreamingRead3.spark;
+    TestStructuredStreamingRead3.spark = null;
+    currentSpark.stop();
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testReadStreamOnIcebergTableWithMultipleSnapshots() throws 
IOException, TimeoutException {
+    File parent = temp.newFolder("parent");
+    File location = new File(parent, "test-table");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id", 
3).build();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+
+    List<List<SimpleRecord>> expected = getTestDataForMultipleSnapshots();
+    appendData(expected, location);
+
+    table.refresh();
+
+    try {
+      Dataset<Row> df = spark.readStream()
+              .format("iceberg")
+              .load(location.toString());
+      List<SimpleRecord> actual = processStreamTillEnd(df);
+
+      Assert.assertEquals(
+          expected.stream().flatMap(List::stream).collect(Collectors.toList()),
+          actual);
+    } finally {
+      for (StreamingQuery query : spark.streams().active()) {
+        query.stop();
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testResumingStreamReadFromCheckpoint() throws IOException, 
TimeoutException, StreamingQueryException {
+    File parent = temp.newFolder("parent");
+    File location = new File(parent, "test-table");
+    File writerCheckpoint = new File(parent, "writer-checkpoint");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id", 
3).build();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+    final String tempView = "microBatchView";
+
+    try {
+      Dataset<Row> df = spark.readStream()
+          .format("iceberg")
+          .load(location.toString());
+      DataStreamWriter<Row> singleBatchWriter = df.writeStream()
+          .trigger(Trigger.Once())
+          .option("checkpointLocation", writerCheckpoint.toString())
+          .foreachBatch((batchDF, batchId) -> {
+            batchDF.createOrReplaceGlobalTempView(tempView);
+          });
+
+      String globalTempView = "global_temp." + tempView;
+
+      List<SimpleRecord> processStreamOnEmptyIcebergTable = 
processMicroBatch(singleBatchWriter, globalTempView);
+      Assert.assertEquals(
+          Collections.emptyList(),
+          processStreamOnEmptyIcebergTable);
+
+      for (List<List<SimpleRecord>> expectedCheckpoint : 
getTestDataForMultipleWritesWithMultipleSnapshots()) {
+        appendData(expectedCheckpoint, location);
+        table.refresh();
+
+        List<SimpleRecord> actualDataInCurrentMicroBatch = 
processMicroBatch(singleBatchWriter, globalTempView);
+        Assert.assertEquals(
+            
expectedCheckpoint.stream().flatMap(List::stream).collect(Collectors.toList()),
+            actualDataInCurrentMicroBatch);
+      }
+    } finally {
+      for (StreamingQuery query : spark.streams().active()) {
+        query.stop();
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testParquetOrcAvroDataInOneTable() throws IOException, 
TimeoutException {
+    File parent = temp.newFolder("parent");
+    File location = new File(parent, "test-table");
+
+    HadoopTables tables = new HadoopTables(CONF);
+
+    // use partition buckets - so that single list<SimpleRecord> can generate 
multiple files per write
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id", 
3).build();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+
+    List<SimpleRecord> parquetFileRecords = Lists.newArrayList(
+        new SimpleRecord(1, "one"),
+        new SimpleRecord(2, "two"),
+        new SimpleRecord(3, "three"));
+
+    List<SimpleRecord> orcFileRecords = Lists.newArrayList(
+        new SimpleRecord(4, "four"),
+        new SimpleRecord(5, "five"));
+
+    List<SimpleRecord> avroFileRecords = Lists.newArrayList(
+        new SimpleRecord(6, "six"),
+        new SimpleRecord(7, "seven"));
+
+    appendData(parquetFileRecords, location, "parquet");
+    appendData(orcFileRecords, location, "orc");
+    appendData(avroFileRecords, location, "avro");
+
+    table.refresh();
+
+
+    try {
+      Dataset<Row> ds = spark.readStream()
+          .format("iceberg")
+          .load(location.toString());
+      List<SimpleRecord> actual = processStreamTillEnd(ds);
+      List<SimpleRecord> expected = Stream.concat(Stream.concat(
+          parquetFileRecords.stream(),
+          orcFileRecords.stream()),
+          avroFileRecords.stream())
+            .collect(Collectors.toList());
+      Assert.assertEquals(expected, actual);
+    } finally {
+      for (StreamingQuery query : spark.streams().active()) {
+        query.stop();
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testReadStreamFromEmptyTable() throws IOException, 
TimeoutException {
+    File parent = temp.newFolder("parent");
+    File location = new File(parent, "test-table");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id", 
3).build();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+
+    table.refresh();
+
+    try {
+      Dataset<Row> df = spark.readStream()
+          .format("iceberg")
+          .load(location.toString());
+
+      List<SimpleRecord> actual = processStreamTillEnd(df);
+      Assert.assertEquals(Collections.emptyList(), actual);
+    } finally {
+      for (StreamingQuery query : spark.streams().active()) {
+        query.stop();
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testReadStreamWithSnapshotTypeOverwriteErrorsOut() throws 
IOException, TimeoutException {
+    File parent = temp.newFolder("parent");
+    File location = new File(parent, "test-table");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id", 
3).build();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+
+    // upgrade table to version 2 - to facilitate creation of Snapshot of type 
OVERWRITE.
+    TableOperations ops = ((BaseTable) table).operations();
+    TableMetadata meta = ops.current();
+    ops.commit(meta, meta.upgradeToFormatVersion(2));
+
+    // fill table with some data
+    List<List<SimpleRecord>> dataAcrossSnapshots = 
getTestDataForMultipleSnapshots();
+    appendData(dataAcrossSnapshots, location);
+
+    table.refresh();
+
+    Schema deleteRowSchema = table.schema().select("data");
+    Record dataDelete = GenericRecord.create(deleteRowSchema);
+    List<Record> dataDeletes = Lists.newArrayList(
+        dataDelete.copy("data", "one") // id = 1
+    );
+
+    DeleteFile eqDeletes = FileHelpers.writeDeleteFile(
+        table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), 
dataDeletes, deleteRowSchema);
+
+    table.newRowDelta()
+        .addDeletes(eqDeletes)
+        .commit();
+
+    // check pre-condition - that the above Delete file write - actually 
resulted in snapshot of type OVERWRITE
+    Assert.assertEquals(DataOperations.OVERWRITE, 
table.currentSnapshot().operation());
+
+    try {
+      Dataset<Row> df = spark.readStream()
+          .format("iceberg")
+          .load(location.toString());
+      StreamingQuery streamingQuery = df.writeStream()
+          .format("memory")
+          .queryName("testtablewithoverwrites")
+          .outputMode(OutputMode.Append())
+          .start();
+
+      try {
+        streamingQuery.processAllAvailable();
+        Assert.assertTrue(false); // should be unreachable
+      } catch (Exception exception) {

Review comment:
       Please use `AssertHelpers` or `Assertions` for this instead of coding 
the try/catch in each case.
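   
   For instance, with AssertJ (a sketch; assumes `org.assertj.core.api.Assertions`
   is on the test classpath):
   
   ```java
   // Expect the streaming query to fail with a StreamingQueryException whose
   // cause is the IllegalStateException raised for the OVERWRITE snapshot.
   Assertions.assertThatThrownBy(streamingQuery::processAllAvailable)
       .isInstanceOf(StreamingQueryException.class)
       .hasCauseInstanceOf(IllegalStateException.class);
   ```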

##########
File path: 
spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
##########
@@ -0,0 +1,502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.streaming.DataStreamWriter;
+import org.apache.spark.sql.streaming.OutputMode;
+import org.apache.spark.sql.streaming.StreamingQuery;
+import org.apache.spark.sql.streaming.StreamingQueryException;
+import org.apache.spark.sql.streaming.Trigger;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+public final class TestStructuredStreamingRead3 {
+  private static final Configuration CONF = new Configuration();
+  private static final Schema SCHEMA = new Schema(
+          optional(1, "id", Types.IntegerType.get()),
+          optional(2, "data", Types.StringType.get())
+  );
+  private static SparkSession spark = null;
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @Rule
+  public ExpectedException exceptionRule = ExpectedException.none();
+
+  @BeforeClass
+  public static void startSpark() {
+    TestStructuredStreamingRead3.spark = SparkSession.builder()
+            .master("local[2]")
+            .config("spark.sql.shuffle.partitions", 4)
+            .getOrCreate();
+  }
+
+  @AfterClass
+  public static void stopSpark() {
+    SparkSession currentSpark = TestStructuredStreamingRead3.spark;
+    TestStructuredStreamingRead3.spark = null;
+    currentSpark.stop();
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testReadStreamOnIcebergTableWithMultipleSnapshots() throws 
IOException, TimeoutException {
+    File parent = temp.newFolder("parent");
+    File location = new File(parent, "test-table");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id", 
3).build();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+
+    List<List<SimpleRecord>> expected = getTestDataForMultipleSnapshots();
+    appendData(expected, location);
+
+    table.refresh();
+
+    try {
+      Dataset<Row> df = spark.readStream()
+              .format("iceberg")
+              .load(location.toString());
+      List<SimpleRecord> actual = processStreamTillEnd(df);
+
+      Assert.assertEquals(
+          expected.stream().flatMap(List::stream).collect(Collectors.toList()),
+          actual);
+    } finally {
+      for (StreamingQuery query : spark.streams().active()) {
+        query.stop();
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testResumingStreamReadFromCheckpoint() throws IOException, 
TimeoutException, StreamingQueryException {
+    File parent = temp.newFolder("parent");
+    File location = new File(parent, "test-table");
+    File writerCheckpoint = new File(parent, "writer-checkpoint");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id", 
3).build();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+    final String tempView = "microBatchView";
+
+    try {
+      Dataset<Row> df = spark.readStream()
+          .format("iceberg")
+          .load(location.toString());
+      DataStreamWriter<Row> singleBatchWriter = df.writeStream()
+          .trigger(Trigger.Once())
+          .option("checkpointLocation", writerCheckpoint.toString())
+          .foreachBatch((batchDF, batchId) -> {
+            batchDF.createOrReplaceGlobalTempView(tempView);
+          });
+
+      String globalTempView = "global_temp." + tempView;
+
+      List<SimpleRecord> processStreamOnEmptyIcebergTable = 
processMicroBatch(singleBatchWriter, globalTempView);
+      Assert.assertEquals(
+          Collections.emptyList(),
+          processStreamOnEmptyIcebergTable);
+
+      for (List<List<SimpleRecord>> expectedCheckpoint : 
getTestDataForMultipleWritesWithMultipleSnapshots()) {
+        appendData(expectedCheckpoint, location);
+        table.refresh();
+
+        List<SimpleRecord> actualDataInCurrentMicroBatch = 
processMicroBatch(singleBatchWriter, globalTempView);
+        Assert.assertEquals(
+            
expectedCheckpoint.stream().flatMap(List::stream).collect(Collectors.toList()),
+            actualDataInCurrentMicroBatch);
+      }
+    } finally {
+      for (StreamingQuery query : spark.streams().active()) {
+        query.stop();
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testParquetOrcAvroDataInOneTable() throws IOException, 
TimeoutException {
+    File parent = temp.newFolder("parent");
+    File location = new File(parent, "test-table");
+
+    HadoopTables tables = new HadoopTables(CONF);
+
+    // use partition buckets - so that single list<SimpleRecord> can generate 
multiple files per write
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id", 
3).build();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+
+    List<SimpleRecord> parquetFileRecords = Lists.newArrayList(
+        new SimpleRecord(1, "one"),
+        new SimpleRecord(2, "two"),
+        new SimpleRecord(3, "three"));
+
+    List<SimpleRecord> orcFileRecords = Lists.newArrayList(
+        new SimpleRecord(4, "four"),
+        new SimpleRecord(5, "five"));
+
+    List<SimpleRecord> avroFileRecords = Lists.newArrayList(
+        new SimpleRecord(6, "six"),
+        new SimpleRecord(7, "seven"));
+
+    appendData(parquetFileRecords, location, "parquet");
+    appendData(orcFileRecords, location, "orc");
+    appendData(avroFileRecords, location, "avro");
+
+    table.refresh();
+
+
+    try {
+      Dataset<Row> ds = spark.readStream()
+          .format("iceberg")
+          .load(location.toString());
+      List<SimpleRecord> actual = processStreamTillEnd(ds);
+      List<SimpleRecord> expected = Stream.concat(Stream.concat(
+          parquetFileRecords.stream(),
+          orcFileRecords.stream()),
+          avroFileRecords.stream())
+            .collect(Collectors.toList());
+      Assert.assertEquals(expected, actual);
+    } finally {
+      for (StreamingQuery query : spark.streams().active()) {
+        query.stop();
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testReadStreamFromEmptyTable() throws IOException, TimeoutException {
+    File parent = temp.newFolder("parent");
+    File location = new File(parent, "test-table");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id", 3).build();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+
+    table.refresh();
+
+    try {
+      Dataset<Row> df = spark.readStream()
+          .format("iceberg")
+          .load(location.toString());
+
+      List<SimpleRecord> actual = processStreamTillEnd(df);
+      Assert.assertEquals(Collections.emptyList(), actual);
+    } finally {
+      for (StreamingQuery query : spark.streams().active()) {
+        query.stop();
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testReadStreamWithSnapshotTypeOverwriteErrorsOut() throws IOException, TimeoutException {
+    File parent = temp.newFolder("parent");
+    File location = new File(parent, "test-table");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id", 3).build();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+
+    // upgrade the table to version 2 to allow creating a snapshot of type OVERWRITE.
+    TableOperations ops = ((BaseTable) table).operations();
+    TableMetadata meta = ops.current();
+    ops.commit(meta, meta.upgradeToFormatVersion(2));
+
+    // fill table with some data
+    List<List<SimpleRecord>> dataAcrossSnapshots = getTestDataForMultipleSnapshots();
+    appendData(dataAcrossSnapshots, location);
+
+    table.refresh();
+
+    Schema deleteRowSchema = table.schema().select("data");
+    Record dataDelete = GenericRecord.create(deleteRowSchema);
+    List<Record> dataDeletes = Lists.newArrayList(
+        dataDelete.copy("data", "one") // id = 1
+    );
+
+    DeleteFile eqDeletes = FileHelpers.writeDeleteFile(
+        table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), dataDeletes, deleteRowSchema);
+
+    table.newRowDelta()
+        .addDeletes(eqDeletes)
+        .commit();
+
+    // check the pre-condition: the delete file write above actually resulted in a snapshot of type OVERWRITE
+    Assert.assertEquals(DataOperations.OVERWRITE, table.currentSnapshot().operation());
+
+    try {
+      Dataset<Row> df = spark.readStream()
+          .format("iceberg")
+          .load(location.toString());
+      StreamingQuery streamingQuery = df.writeStream()
+          .format("memory")
+          .queryName("testtablewithoverwrites")
+          .outputMode(OutputMode.Append())
+          .start();
+
+      try {
+        streamingQuery.processAllAvailable();
+        Assert.assertTrue(false); // should be unreachable
+      } catch (Exception exception) {
+        Assert.assertTrue(exception instanceof StreamingQueryException);
+        Assert.assertTrue(((StreamingQueryException) exception).cause() instanceof IllegalStateException);
+      }
+
+    } finally {
+      for (StreamingQuery query : spark.streams().active()) {
+        query.stop();
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testReadStreamWithSnapshotTypeReplaceErrorsOut() throws IOException, TimeoutException {
+    File parent = temp.newFolder("parent");
+    File location = new File(parent, "test-table");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).build();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+
+    // fill table with some data
+    List<List<SimpleRecord>> dataAcrossSnapshots = getTestDataForMultipleSnapshots();
+    appendData(dataAcrossSnapshots, location);
+
+    table.refresh();
+
+    // this should create a snapshot with type Replace.
+    table.rewriteManifests()
+        .clusterBy(f -> 1)
+        .commit();
+
+    // check pre-condition
+    Assert.assertEquals(DataOperations.REPLACE, table.currentSnapshot().operation());
+
+    try {
+      Dataset<Row> df = spark.readStream()
+          .format("iceberg")
+          .load(location.toString());
+      StreamingQuery streamingQuery = df.writeStream()
+          .format("memory")
+          .queryName("testtablewithreplace")
+          .outputMode(OutputMode.Append())
+          .start();
+
+      try {
+        streamingQuery.processAllAvailable();
+        Assert.assertTrue(false); // should be unreachable
+      } catch (Exception exception) {
+        Assert.assertTrue(exception instanceof StreamingQueryException);
+        Assert.assertTrue(((StreamingQueryException) exception).cause() instanceof IllegalStateException);
+      }
+
+    } finally {
+      for (StreamingQuery query : spark.streams().active()) {
+        query.stop();
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testReadStreamWithSnapshotTypeDeleteErrorsOut() throws IOException, TimeoutException {
+    File parent = temp.newFolder("parent");
+    File location = new File(parent, "test-table");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("id").build();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+
+    // fill table with some data
+    List<List<SimpleRecord>> dataAcrossSnapshots = getTestDataForMultipleSnapshots();
+    appendData(dataAcrossSnapshots, location);
+
+    table.refresh();
+
+    // this should create a snapshot with type delete.
+    table.newDelete()
+        .deleteFromRowFilter(Expressions.equal("id", 4))
+        .commit();
+
+    // check the pre-condition: the newDelete operation above resulted in a snapshot of type DELETE
+    Assert.assertEquals(DataOperations.DELETE, table.currentSnapshot().operation());
+
+    try {
+      Dataset<Row> df = spark.readStream()
+          .format("iceberg")
+          .load(location.toString());
+      StreamingQuery streamingQuery = df.writeStream()
+          .format("memory")
+          .queryName("testtablewithdelete")
+          .outputMode(OutputMode.Append())
+          .start();
+
+      try {
+        streamingQuery.processAllAvailable();
+        Assert.assertTrue(false); // should be unreachable
+      } catch (Exception exception) {
+        Assert.assertTrue(exception instanceof StreamingQueryException);
+        Assert.assertTrue(((StreamingQueryException) exception).cause() instanceof IllegalStateException);
+      }
+
+    } finally {
+      for (StreamingQuery query : spark.streams().active()) {
+        query.stop();
+      }
+    }
+  }
+
+  private static List<SimpleRecord> processMicroBatch(DataStreamWriter<Row> singleBatchWriter, String viewName)
+      throws TimeoutException, StreamingQueryException {
+    StreamingQuery streamingQuery = singleBatchWriter.start();
+    streamingQuery.awaitTermination();
+
+    return spark.sql(String.format("select * from %s", viewName))
+        .as(Encoders.bean(SimpleRecord.class))
+        .collectAsList();
+  }
+
+  /**
+   * Get test data: a list of records per snapshot.
+   */
+  private static List<List<SimpleRecord>> getTestDataForMultipleSnapshots() {
+    return Lists.newArrayList(
+        Lists.newArrayList(
+            new SimpleRecord(1, "one"),
+            new SimpleRecord(2, "two"),
+            new SimpleRecord(3, "three")),
+        Lists.newArrayList(
+            new SimpleRecord(4, "four"),
+            new SimpleRecord(5, "five")),
+        Lists.newArrayList(
+            new SimpleRecord(6, "six"),
+            new SimpleRecord(7, "seven"))
+    );
+  }
+
+  /**
+   * Appends each list as a snapshot to the Iceberg table at the given location.
+   * Accepts a list of lists, each inner list representing the data for one snapshot.
+   */
+  private static void appendData(List<List<SimpleRecord>> data, File location) {
+    // generate multiple snapshots
+    for (List<SimpleRecord> l : data) {
+      appendData(l, location, "parquet");
+    }
+  }
+
+  private static void appendData(List<SimpleRecord> data, File location, String fileFormat) {
+    Dataset<Row> df = spark.createDataFrame(data, SimpleRecord.class);
+    df.select("id", "data").write()
+        .format("iceberg")
+        .option("write-format", fileFormat)
+        .mode("append")
+        .save(location.toString());
+  }
+
+  private static List<SimpleRecord> processStreamTillEnd(Dataset<Row> df) throws TimeoutException {

Review comment:
       Looks like this method name is slightly misleading because it isn't processing and then ending the stream; it is processing all of the currently available data. How about `processAvailable`?
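   
   For example, the renamed helper could look roughly like this. This is only a sketch: it assumes the current implementation drains the stream into a memory sink with `processAllAvailable()`, and the query name below is just a placeholder.
   
   ```java
   // Processes everything that is currently available in the stream into a
   // memory sink and returns it; the stream itself is not ended here.
   private static List<SimpleRecord> processAvailable(Dataset<Row> df) throws TimeoutException {
     StreamingQuery streamingQuery = df.writeStream()
         .format("memory")
         .queryName("processAvailableSink")   // placeholder query name
         .outputMode(OutputMode.Append())
         .start();
     streamingQuery.processAllAvailable();
     return spark.sql("select * from processAvailableSink")
         .as(Encoders.bean(SimpleRecord.class))
         .collectAsList();
   }
   ```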

##########
File path: 
spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
##########
@@ -0,0 +1,502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.streaming.DataStreamWriter;
+import org.apache.spark.sql.streaming.OutputMode;
+import org.apache.spark.sql.streaming.StreamingQuery;
+import org.apache.spark.sql.streaming.StreamingQueryException;
+import org.apache.spark.sql.streaming.Trigger;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+public final class TestStructuredStreamingRead3 {
+  private static final Configuration CONF = new Configuration();
+  private static final Schema SCHEMA = new Schema(
+          optional(1, "id", Types.IntegerType.get()),
+          optional(2, "data", Types.StringType.get())
+  );
+  private static SparkSession spark = null;
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @Rule
+  public ExpectedException exceptionRule = ExpectedException.none();
+
+  @BeforeClass
+  public static void startSpark() {
+    TestStructuredStreamingRead3.spark = SparkSession.builder()
+            .master("local[2]")
+            .config("spark.sql.shuffle.partitions", 4)
+            .getOrCreate();
+  }
+
+  @AfterClass
+  public static void stopSpark() {
+    SparkSession currentSpark = TestStructuredStreamingRead3.spark;
+    TestStructuredStreamingRead3.spark = null;
+    currentSpark.stop();
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testReadStreamOnIcebergTableWithMultipleSnapshots() throws IOException, TimeoutException {
+    File parent = temp.newFolder("parent");
+    File location = new File(parent, "test-table");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id", 3).build();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+
+    List<List<SimpleRecord>> expected = getTestDataForMultipleSnapshots();
+    appendData(expected, location);
+
+    table.refresh();
+
+    try {
+      Dataset<Row> df = spark.readStream()
+              .format("iceberg")
+              .load(location.toString());
+      List<SimpleRecord> actual = processStreamTillEnd(df);
+
+      Assert.assertEquals(
+          expected.stream().flatMap(List::stream).collect(Collectors.toList()),
+          actual);
+    } finally {
+      for (StreamingQuery query : spark.streams().active()) {
+        query.stop();
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testResumingStreamReadFromCheckpoint() throws IOException, TimeoutException, StreamingQueryException {
+    File parent = temp.newFolder("parent");
+    File location = new File(parent, "test-table");
+    File writerCheckpoint = new File(parent, "writer-checkpoint");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id", 3).build();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+    final String tempView = "microBatchView";
+
+    try {
+      Dataset<Row> df = spark.readStream()
+          .format("iceberg")
+          .load(location.toString());
+      DataStreamWriter<Row> singleBatchWriter = df.writeStream()
+          .trigger(Trigger.Once())

Review comment:
       I think that the intent is that `Trigger.Once()` combined with 
`awaitTermination` in `processMicroBatch` will stop the stream, is that 
correct? I think that it would be helpful to have a comment above this 
explaining how this is causing Spark to checkpoint and start new streams for 
each call to `processMicroBatch`.
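   
   For instance, a comment along these lines above the writer would capture it (wording is just a suggestion that restates my understanding of the mechanism):
   
   ```java
   // Trigger.Once() makes each started query process exactly one micro-batch and
   // then stop; awaitTermination() in processMicroBatch waits for that to happen.
   // Because the writer reuses the same checkpointLocation, every call to
   // processMicroBatch starts a fresh query that resumes from the committed
   // offsets, so each micro-batch only sees data appended since the previous one.
   DataStreamWriter<Row> singleBatchWriter = df.writeStream()
       .trigger(Trigger.Once())
       .option("checkpointLocation", writerCheckpoint.toString())
       .foreachBatch((batchDF, batchId) -> {
         batchDF.createOrReplaceGlobalTempView(tempView);
       });
   ```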

##########
File path: 
spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
##########
@@ -0,0 +1,502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.streaming.DataStreamWriter;
+import org.apache.spark.sql.streaming.OutputMode;
+import org.apache.spark.sql.streaming.StreamingQuery;
+import org.apache.spark.sql.streaming.StreamingQueryException;
+import org.apache.spark.sql.streaming.Trigger;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+public final class TestStructuredStreamingRead3 {
+  private static final Configuration CONF = new Configuration();
+  private static final Schema SCHEMA = new Schema(
+          optional(1, "id", Types.IntegerType.get()),
+          optional(2, "data", Types.StringType.get())
+  );
+  private static SparkSession spark = null;
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @Rule
+  public ExpectedException exceptionRule = ExpectedException.none();
+
+  @BeforeClass
+  public static void startSpark() {
+    TestStructuredStreamingRead3.spark = SparkSession.builder()
+            .master("local[2]")
+            .config("spark.sql.shuffle.partitions", 4)
+            .getOrCreate();
+  }
+
+  @AfterClass
+  public static void stopSpark() {
+    SparkSession currentSpark = TestStructuredStreamingRead3.spark;
+    TestStructuredStreamingRead3.spark = null;
+    currentSpark.stop();
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testReadStreamOnIcebergTableWithMultipleSnapshots() throws IOException, TimeoutException {
+    File parent = temp.newFolder("parent");
+    File location = new File(parent, "test-table");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id", 3).build();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+
+    List<List<SimpleRecord>> expected = getTestDataForMultipleSnapshots();
+    appendData(expected, location);
+
+    table.refresh();
+
+    try {
+      Dataset<Row> df = spark.readStream()
+              .format("iceberg")
+              .load(location.toString());
+      List<SimpleRecord> actual = processStreamTillEnd(df);
+
+      Assert.assertEquals(
+          expected.stream().flatMap(List::stream).collect(Collectors.toList()),
+          actual);
+    } finally {
+      for (StreamingQuery query : spark.streams().active()) {
+        query.stop();
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testResumingStreamReadFromCheckpoint() throws IOException, TimeoutException, StreamingQueryException {
+    File parent = temp.newFolder("parent");
+    File location = new File(parent, "test-table");
+    File writerCheckpoint = new File(parent, "writer-checkpoint");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id", 3).build();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+    final String tempView = "microBatchView";
+
+    try {
+      Dataset<Row> df = spark.readStream()
+          .format("iceberg")
+          .load(location.toString());
+      DataStreamWriter<Row> singleBatchWriter = df.writeStream()
+          .trigger(Trigger.Once())
+          .option("checkpointLocation", writerCheckpoint.toString())
+          .foreachBatch((batchDF, batchId) -> {
+            batchDF.createOrReplaceGlobalTempView(tempView);
+          });
+
+      String globalTempView = "global_temp." + tempView;
+
+      List<SimpleRecord> processStreamOnEmptyIcebergTable = processMicroBatch(singleBatchWriter, globalTempView);
+      Assert.assertEquals(
+          Collections.emptyList(),
+          processStreamOnEmptyIcebergTable);
+
+      for (List<List<SimpleRecord>> expectedCheckpoint : getTestDataForMultipleWritesWithMultipleSnapshots()) {
+        appendData(expectedCheckpoint, location);
+        table.refresh();
+
+        List<SimpleRecord> actualDataInCurrentMicroBatch = processMicroBatch(singleBatchWriter, globalTempView);
+        Assert.assertEquals(
+            expectedCheckpoint.stream().flatMap(List::stream).collect(Collectors.toList()),
+            actualDataInCurrentMicroBatch);
+      }
+    } finally {
+      for (StreamingQuery query : spark.streams().active()) {
+        query.stop();
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testParquetOrcAvroDataInOneTable() throws IOException, TimeoutException {
+    File parent = temp.newFolder("parent");
+    File location = new File(parent, "test-table");
+
+    HadoopTables tables = new HadoopTables(CONF);
+
+    // use partition buckets so that a single List<SimpleRecord> can generate multiple files per write
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id", 3).build();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+
+    List<SimpleRecord> parquetFileRecords = Lists.newArrayList(
+        new SimpleRecord(1, "one"),
+        new SimpleRecord(2, "two"),
+        new SimpleRecord(3, "three"));
+
+    List<SimpleRecord> orcFileRecords = Lists.newArrayList(
+        new SimpleRecord(4, "four"),
+        new SimpleRecord(5, "five"));
+
+    List<SimpleRecord> avroFileRecords = Lists.newArrayList(
+        new SimpleRecord(6, "six"),
+        new SimpleRecord(7, "seven"));
+
+    appendData(parquetFileRecords, location, "parquet");
+    appendData(orcFileRecords, location, "orc");
+    appendData(avroFileRecords, location, "avro");
+
+    table.refresh();
+
+
+    try {
+      Dataset<Row> ds = spark.readStream()
+          .format("iceberg")
+          .load(location.toString());
+      List<SimpleRecord> actual = processStreamTillEnd(ds);
+      List<SimpleRecord> expected = Stream.concat(Stream.concat(
+          parquetFileRecords.stream(),
+          orcFileRecords.stream()),
+          avroFileRecords.stream())
+            .collect(Collectors.toList());
+      Assert.assertEquals(expected, actual);
+    } finally {
+      for (StreamingQuery query : spark.streams().active()) {
+        query.stop();
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testReadStreamFromEmptyTable() throws IOException, TimeoutException {

Review comment:
       Nevermind, I think that the checkpoint test covers this.

##########
File path: 
spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
##########
@@ -0,0 +1,502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.streaming.DataStreamWriter;
+import org.apache.spark.sql.streaming.OutputMode;
+import org.apache.spark.sql.streaming.StreamingQuery;
+import org.apache.spark.sql.streaming.StreamingQueryException;
+import org.apache.spark.sql.streaming.Trigger;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+public final class TestStructuredStreamingRead3 {
+  private static final Configuration CONF = new Configuration();
+  private static final Schema SCHEMA = new Schema(
+          optional(1, "id", Types.IntegerType.get()),
+          optional(2, "data", Types.StringType.get())
+  );
+  private static SparkSession spark = null;
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @Rule
+  public ExpectedException exceptionRule = ExpectedException.none();
+
+  @BeforeClass
+  public static void startSpark() {
+    TestStructuredStreamingRead3.spark = SparkSession.builder()
+            .master("local[2]")
+            .config("spark.sql.shuffle.partitions", 4)
+            .getOrCreate();
+  }
+
+  @AfterClass
+  public static void stopSpark() {
+    SparkSession currentSpark = TestStructuredStreamingRead3.spark;
+    TestStructuredStreamingRead3.spark = null;
+    currentSpark.stop();
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testReadStreamOnIcebergTableWithMultipleSnapshots() throws IOException, TimeoutException {
+    File parent = temp.newFolder("parent");
+    File location = new File(parent, "test-table");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id", 3).build();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+
+    List<List<SimpleRecord>> expected = getTestDataForMultipleSnapshots();
+    appendData(expected, location);
+
+    table.refresh();
+
+    try {
+      Dataset<Row> df = spark.readStream()
+              .format("iceberg")
+              .load(location.toString());
+      List<SimpleRecord> actual = processStreamTillEnd(df);
+
+      Assert.assertEquals(
+          expected.stream().flatMap(List::stream).collect(Collectors.toList()),
+          actual);
+    } finally {
+      for (StreamingQuery query : spark.streams().active()) {
+        query.stop();
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testResumingStreamReadFromCheckpoint() throws IOException, TimeoutException, StreamingQueryException {
+    File parent = temp.newFolder("parent");
+    File location = new File(parent, "test-table");
+    File writerCheckpoint = new File(parent, "writer-checkpoint");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id", 3).build();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+    final String tempView = "microBatchView";
+
+    try {
+      Dataset<Row> df = spark.readStream()
+          .format("iceberg")
+          .load(location.toString());
+      DataStreamWriter<Row> singleBatchWriter = df.writeStream()
+          .trigger(Trigger.Once())
+          .option("checkpointLocation", writerCheckpoint.toString())
+          .foreachBatch((batchDF, batchId) -> {
+            batchDF.createOrReplaceGlobalTempView(tempView);
+          });
+
+      String globalTempView = "global_temp." + tempView;
+
+      List<SimpleRecord> processStreamOnEmptyIcebergTable = processMicroBatch(singleBatchWriter, globalTempView);
+      Assert.assertEquals(
+          Collections.emptyList(),
+          processStreamOnEmptyIcebergTable);
+
+      for (List<List<SimpleRecord>> expectedCheckpoint : getTestDataForMultipleWritesWithMultipleSnapshots()) {
+        appendData(expectedCheckpoint, location);
+        table.refresh();
+
+        List<SimpleRecord> actualDataInCurrentMicroBatch = processMicroBatch(singleBatchWriter, globalTempView);
+        Assert.assertEquals(
+            expectedCheckpoint.stream().flatMap(List::stream).collect(Collectors.toList()),
+            actualDataInCurrentMicroBatch);
+      }
+    } finally {
+      for (StreamingQuery query : spark.streams().active()) {
+        query.stop();
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testParquetOrcAvroDataInOneTable() throws IOException, TimeoutException {
+    File parent = temp.newFolder("parent");
+    File location = new File(parent, "test-table");
+
+    HadoopTables tables = new HadoopTables(CONF);
+
+    // use partition buckets so that a single List<SimpleRecord> can generate multiple files per write
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id", 3).build();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+
+    List<SimpleRecord> parquetFileRecords = Lists.newArrayList(
+        new SimpleRecord(1, "one"),
+        new SimpleRecord(2, "two"),
+        new SimpleRecord(3, "three"));
+
+    List<SimpleRecord> orcFileRecords = Lists.newArrayList(
+        new SimpleRecord(4, "four"),
+        new SimpleRecord(5, "five"));
+
+    List<SimpleRecord> avroFileRecords = Lists.newArrayList(
+        new SimpleRecord(6, "six"),
+        new SimpleRecord(7, "seven"));
+
+    appendData(parquetFileRecords, location, "parquet");
+    appendData(orcFileRecords, location, "orc");
+    appendData(avroFileRecords, location, "avro");
+
+    table.refresh();
+
+
+    try {
+      Dataset<Row> ds = spark.readStream()
+          .format("iceberg")
+          .load(location.toString());
+      List<SimpleRecord> actual = processStreamTillEnd(ds);
+      List<SimpleRecord> expected = Stream.concat(Stream.concat(
+          parquetFileRecords.stream(),
+          orcFileRecords.stream()),
+          avroFileRecords.stream())
+            .collect(Collectors.toList());
+      Assert.assertEquals(expected, actual);
+    } finally {
+      for (StreamingQuery query : spark.streams().active()) {
+        query.stop();
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testReadStreamFromEmptyTable() throws IOException, TimeoutException {
+    File parent = temp.newFolder("parent");
+    File location = new File(parent, "test-table");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id", 3).build();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+
+    table.refresh();
+
+    try {
+      Dataset<Row> df = spark.readStream()
+          .format("iceberg")
+          .load(location.toString());
+
+      List<SimpleRecord> actual = processStreamTillEnd(df);
+      Assert.assertEquals(Collections.emptyList(), actual);
+    } finally {
+      for (StreamingQuery query : spark.streams().active()) {
+        query.stop();
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testReadStreamWithSnapshotTypeOverwriteErrorsOut() throws IOException, TimeoutException {
+    File parent = temp.newFolder("parent");
+    File location = new File(parent, "test-table");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id", 3).build();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+
+    // upgrade the table to version 2 to allow creating a snapshot of type OVERWRITE.
+    TableOperations ops = ((BaseTable) table).operations();
+    TableMetadata meta = ops.current();
+    ops.commit(meta, meta.upgradeToFormatVersion(2));
+
+    // fill table with some data
+    List<List<SimpleRecord>> dataAcrossSnapshots = getTestDataForMultipleSnapshots();
+    appendData(dataAcrossSnapshots, location);
+
+    table.refresh();
+
+    Schema deleteRowSchema = table.schema().select("data");
+    Record dataDelete = GenericRecord.create(deleteRowSchema);
+    List<Record> dataDeletes = Lists.newArrayList(

Review comment:
       I think you could just run a `DELETE FROM` or `MERGE INTO` SQL query.
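   
   Something like the sketch below, for example. It assumes the table is reachable through a catalog identifier and that the Iceberg SQL extensions are enabled in the test session; `catalogTableName` is a placeholder, not a variable that exists in this test. With a predicate that does not line up with whole files, the delete should still commit the OVERWRITE snapshot that the precondition assertion checks.
   
   ```java
   // Hypothetical catalog identifier for the test table.
   String catalogTableName = "spark_catalog.default.streaming_overwrite_test";
   
   // Row-level delete through SQL instead of hand-building an equality-delete file.
   spark.sql(String.format("DELETE FROM %s WHERE data = 'one'", catalogTableName));
   table.refresh();
   ```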

##########
File path: 
spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
##########
@@ -0,0 +1,502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.streaming.DataStreamWriter;
+import org.apache.spark.sql.streaming.OutputMode;
+import org.apache.spark.sql.streaming.StreamingQuery;
+import org.apache.spark.sql.streaming.StreamingQueryException;
+import org.apache.spark.sql.streaming.Trigger;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+public final class TestStructuredStreamingRead3 {

Review comment:
       Could you create a variant of these tests based on 
`SparkCatalogTestBase`? In addition to testing tables that are identified by 
location, we should also validate that everything works with tables tracked in 
catalogs.
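   
   A rough outline of what that variant could look like (the class name is a placeholder; this assumes `SparkCatalogTestBase` provides the parameterized catalog constructor plus the `tableName` and `sql(...)` helpers the other Spark 3 suites use, and that the streaming source accepts a table identifier in `load()`):
   
   ```java
   import java.util.Map;
   import java.util.concurrent.TimeoutException;
   import org.apache.spark.sql.Dataset;
   import org.apache.spark.sql.Row;
   import org.junit.After;
   import org.junit.Test;
   
   public class TestStructuredStreamingReadCatalog3 extends SparkCatalogTestBase {
   
     public TestStructuredStreamingReadCatalog3(String catalog, String implementation, Map<String, String> config) {
       super(catalog, implementation, config);
     }
   
     @After
     public void dropTable() {
       sql("DROP TABLE IF EXISTS %s", tableName);
     }
   
     @Test
     public void testReadStreamOnCatalogTable() throws TimeoutException {
       sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
       sql("INSERT INTO %s VALUES (1, 'one'), (2, 'two')", tableName);
   
       // The streaming source is addressed by table identifier rather than by location.
       Dataset<Row> df = spark.readStream()
           .format("iceberg")
           .load(tableName);
   
       // ... then reuse the same micro-batch processing and assertions as the
       // location-based tests ...
     }
   }
   ```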




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
