SreeramGarlapati commented on a change in pull request #2660:
URL: https://github.com/apache/iceberg/pull/2660#discussion_r650082097



##########
File path: 
spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MicroBatches;
+import org.apache.iceberg.MicroBatches.MicroBatch;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.source.SparkBatchScan.ReadTask;
+import org.apache.iceberg.spark.source.SparkBatchScan.ReaderFactory;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.connector.read.InputPartition;
+import org.apache.spark.sql.connector.read.PartitionReaderFactory;
+import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
+import org.apache.spark.sql.connector.read.streaming.Offset;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK;
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE_DEFAULT;
+
+public class SparkMicroBatchStream implements MicroBatchStream {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SparkMicroBatchStream.class);
+
+  private final JavaSparkContext sparkContext;
+  private final Table table;
+  private final boolean caseSensitive;
+  private final Schema expectedSchema;
+  private final Long splitSize;
+  private final Integer splitLookback;
+  private final Long splitOpenFileCost;
+  private final boolean localityPreferred;
+  private final InitialOffsetStore initialOffsetStore;
+  private final StreamingOffset initialOffset;
+
+  SparkMicroBatchStream(JavaSparkContext sparkContext,
+                        Table table, boolean caseSensitive, Schema 
expectedSchema,
+                        CaseInsensitiveStringMap options, String 
checkpointLocation) {
+    this.sparkContext = sparkContext;
+    this.table = table;
+    this.caseSensitive = caseSensitive;
+    this.expectedSchema = expectedSchema;
+    this.localityPreferred = Spark3Util.isLocalityEnabled(table.io(), 
table.location(), options);
+    this.splitSize = Optional.ofNullable(Spark3Util.propertyAsLong(options, 
SparkReadOptions.SPLIT_SIZE, null))
+        .orElseGet(() -> PropertyUtil.propertyAsLong(table.properties(), 
SPLIT_SIZE, SPLIT_SIZE_DEFAULT));
+    this.splitLookback = Optional.ofNullable(Spark3Util.propertyAsInt(options, 
SparkReadOptions.LOOKBACK, null))
+        .orElseGet(() -> PropertyUtil.propertyAsInt(table.properties(), 
SPLIT_LOOKBACK, SPLIT_LOOKBACK_DEFAULT));
+    this.splitOpenFileCost = Optional.ofNullable(
+        Spark3Util.propertyAsLong(options, SparkReadOptions.FILE_OPEN_COST, 
null))
+        .orElseGet(() -> PropertyUtil.propertyAsLong(table.properties(), 
SPLIT_OPEN_FILE_COST,
+            SPLIT_OPEN_FILE_COST_DEFAULT));
+    this.initialOffsetStore = InitialOffsetStore.getInstance(table.io(), 
checkpointLocation);
+    this.initialOffset = getOrWriteInitialOffset(table, initialOffsetStore);
+  }
+
+  @Override
+  public Offset latestOffset() {
+    table.refresh();
+    Snapshot latestSnapshot = table.currentSnapshot();
+    if (latestSnapshot == null) {
+      return StreamingOffset.START_OFFSET;
+    }
+
+    String addedFilesValue = 
latestSnapshot.summary().get(SnapshotSummary.ADDED_FILES_PROP);
+    long addedFiles = addedFilesValue == null ?
+        Iterables.size(latestSnapshot.addedFiles()) :
+        Long.parseLong(addedFilesValue);
+
+    // a readStream on an Iceberg table can be started from 2 types of 
snapshots
+    // 1. a valid starting Snapshot:
+    //      when this valid starting Snapshot is the initialOffset - then, 
scanAllFiles must be set to true;
+    //      for all StreamingOffsets following this - scanAllFiles must be set 
to false
+    // 2. START_OFFSET:
+    //      if the stream started on the table from START_OFFSET - it implies 
- that all the subsequent Snapshots added
+    //      will have all files as net New manifests & hence scanAllFiles can 
be false.
+    boolean scanAllFiles = !StreamingOffset.START_OFFSET.equals(initialOffset) 
&&
+        latestSnapshot.snapshotId() == initialOffset.snapshotId();
+    return new StreamingOffset(
+        latestSnapshot.snapshotId(),
+        addedFiles,
+        scanAllFiles);
+  }
+
+  @Override
+  public InputPartition[] planInputPartitions(Offset start, Offset end) {
+    if (end.equals(StreamingOffset.START_OFFSET)) {
+      return new InputPartition[0];
+    }
+
+    // broadcast the table metadata as input partitions will be sent to 
executors
+    Broadcast<Table> tableBroadcast = 
sparkContext.broadcast(SerializableTable.copyOf(table));
+    String expectedSchemaString = SchemaParser.toJson(expectedSchema);
+
+    Preconditions.checkState(
+        end instanceof StreamingOffset,
+        "The end offset passed to planInputPartitions() is not an instance of 
StreamingOffset.");
+
+    Preconditions.checkState(
+        start instanceof StreamingOffset,
+        "The start offset passed to planInputPartitions() is not an instance 
of StreamingOffset.");
+
+    StreamingOffset endOffset = (StreamingOffset) end;
+    StreamingOffset startOffset = (StreamingOffset) start;
+
+    List<FileScanTask> fileScanTasks = getFileScanTasks(startOffset, 
endOffset);
+
+    CloseableIterable<FileScanTask> splitTasks = TableScanUtil.splitFiles(
+        CloseableIterable.withNoopClose(fileScanTasks),
+        splitSize);
+    List<CombinedScanTask> combinedScanTasks = Lists.newArrayList(
+        TableScanUtil.planTasks(splitTasks, splitSize, splitLookback, 
splitOpenFileCost));
+    InputPartition[] readTasks = new InputPartition[combinedScanTasks.size()];
+
+    for (int i = 0; i < combinedScanTasks.size(); i++) {
+      readTasks[i] = new ReadTask(
+          combinedScanTasks.get(i), tableBroadcast, expectedSchemaString,
+          caseSensitive, localityPreferred);
+    }
+
+    return readTasks;
+  }
+
+  @Override
+  public PartitionReaderFactory createReaderFactory() {
+    int batchSizeValueToDisableColumnarReads = 0;
+    return new ReaderFactory(batchSizeValueToDisableColumnarReads);
+  }
+
+  @Override
+  public Offset initialOffset() {
+    return initialOffset;
+  }
+
+  @Override
+  public Offset deserializeOffset(String json) {
+    return StreamingOffset.fromJson(json);
+  }
+
+  @Override
+  public void commit(Offset end) {
+  }
+
+  @Override
+  public void stop() {
+  }
+
+  private static StreamingOffset getOrWriteInitialOffset(Table table, 
InitialOffsetStore initialOffsetStore) {
+    if (initialOffsetStore.isOffsetStoreInitialized()) {
+      return initialOffsetStore.getInitialOffset();
+    }
+
+    table.refresh();
+    StreamingOffset offset = table.currentSnapshot() == null ?
+        StreamingOffset.START_OFFSET :
+        new StreamingOffset(SnapshotUtil.oldestSnapshot(table).snapshotId(), 
0, true);
+    initialOffsetStore.addInitialOffset(offset);
+
+    return offset;
+  }
+
+  private List<FileScanTask> getFileScanTasks(StreamingOffset startOffset, 
StreamingOffset endOffset) {

Review comment:
       Renamed to `calculate`.
   Naming this one was a bit of a head-scratcher :) `calculate` / `resolve` / `plan` all fit 
the bill, but `calculate` felt good enough to give a gist of what's going on in this method.

##########
File path: 
spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
##########
@@ -62,6 +63,7 @@
   private static final Logger LOG = 
LoggerFactory.getLogger(SparkBatchScan.class);
 
   private final JavaSparkContext sparkContext;
+  private final SparkSession spark;

Review comment:
       fixed this.

##########
File path: 
spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
##########
@@ -108,6 +111,12 @@ public Batch toBatch() {
     return this;
   }
 
+  @Override
+  public MicroBatchStream toMicroBatchStream(String checkpointLocation) {

Review comment:
       Yes, I guess that is the answer to my question here too: 
https://github.com/apache/iceberg/pull/2611#discussion_r635010805

##########
File path: 
spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
##########
@@ -213,10 +222,10 @@ public String description() {
     return String.format("%s [filters=%s]", table, filters);
   }
 
-  private static class ReaderFactory implements PartitionReaderFactory {
+  public static class ReaderFactory implements PartitionReaderFactory {

Review comment:
       @rdblue - addressed the visibility feedback.
   
   The SparkBatchScan class could use a bit of refactoring - I will follow this up 
with a refactor-only PR. The rationale:
   * changes to the codebase stay easily auditable
   * mixing a refactor with an actual code change makes it very hard for any 
reviewer to make sense of the diff.
   
   With that, I am resolving this comment. Please feel free to reopen if you 
feel we must refactor in the current PR.

##########
File path: 
spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
##########
@@ -213,10 +222,10 @@ public String description() {
     return String.format("%s [filters=%s]", table, filters);
   }
 
-  private static class ReaderFactory implements PartitionReaderFactory {
+  public static class ReaderFactory implements PartitionReaderFactory {

Review comment:
       @rdblue - addressed the visibility feedback.
   
   The SparkBatchScan class could use a bit of refactoring - I will follow this up 
with a refactor-only PR and address this suggestion there as well. The rationale for 
separating this out into refactor-only PRs:
   * changes to the codebase stay easily auditable
   * cognitive overload on reviewers: mixing a refactor with an actual code change 
makes it very hard for any reviewer to make sense of the diff.
   
   With that, I am resolving this comment. Please feel free to reopen if you 
feel a strong need to refactor in the current PR.

##########
File path: 
spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
##########
@@ -108,6 +109,12 @@ public Batch toBatch() {
     return this;
   }
 
+  @Override
+  public MicroBatchStream toMicroBatchStream(String checkpointLocation) {
+    return new SparkMicroBatchStream(

Review comment:
       Good point @aokolnychyi.
   Stats estimation is something I felt will evolve differently for micro-batch 
streaming vs. batch reads: batch reads rely only on the latest snapshot, whereas 
the data returned by micro-batch streaming spans multiple snapshots (see the 
sketch below). Given the cognitive overload of the current PR and the sheer 
number of things to drive consensus on, I want to take that up in a different 
PR. Added it to the **Whats Next** section of this PR; once the PR concludes I 
will file those as issues and track them. Please let me know if that makes sense.
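   Purely as an illustration of that difference (not part of this PR), a hedged 
sketch of how stats could be aggregated across the snapshots covered by a 
micro-batch. The helper name, the walk over `Snapshot.parentId()`, and the use of 
`SnapshotSummary.ADDED_FILE_SIZE_PROP` / `ADDED_RECORDS_PROP` with `PropertyUtil` 
defaults are my assumptions, not code from this PR; it assumes the surrounding 
`SparkMicroBatchStream` fields plus `java.util.OptionalLong` and 
`org.apache.spark.sql.connector.read.Statistics` imports.
   
   ```java
   // Hypothetical helper: batch stats look at one snapshot, but a streaming
   // micro-batch spans every snapshot between the start and end offsets, so
   // the summaries of all of them would need to be added up.
   private Statistics estimateStreamingStats(StreamingOffset startOffset, StreamingOffset endOffset) {
     long sizeInBytes = 0L;
     long rowCount = 0L;
   
     Long snapshotId = endOffset.snapshotId();
     while (snapshotId != null && snapshotId != startOffset.snapshotId()) {
       Snapshot snapshot = table.snapshot(snapshotId);
       // summary entries are optional strings, so default to 0 when absent
       sizeInBytes += PropertyUtil.propertyAsLong(snapshot.summary(), SnapshotSummary.ADDED_FILE_SIZE_PROP, 0L);
       rowCount += PropertyUtil.propertyAsLong(snapshot.summary(), SnapshotSummary.ADDED_RECORDS_PROP, 0L);
       snapshotId = snapshot.parentId();
     }
   
     long totalSize = sizeInBytes;
     long totalRows = rowCount;
     return new Statistics() {
       @Override
       public OptionalLong sizeInBytes() {
         return OptionalLong.of(totalSize);
       }
   
       @Override
       public OptionalLong numRows() {
         return OptionalLong.of(totalRows);
       }
     };
   }
   ```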

##########
File path: 
spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
##########
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MicroBatches;
+import org.apache.iceberg.MicroBatches.MicroBatch;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.source.SparkBatchScan.ReadTask;
+import org.apache.iceberg.spark.source.SparkBatchScan.ReaderFactory;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.connector.read.InputPartition;
+import org.apache.spark.sql.connector.read.PartitionReaderFactory;
+import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
+import org.apache.spark.sql.connector.read.streaming.Offset;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK;
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE_DEFAULT;
+
+public class SparkMicroBatchStream implements MicroBatchStream {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SparkMicroBatchStream.class);

Review comment:
       Given that
   1. this is **Streaming** - I envision most use cases will be high-traffic 
tables, which could result in chatty logs
   2. Spark already logs the batches returned, checkpointed, etc.
   
   I wasn't adding logs unless I felt a strong need (an example of what I had 
in mind is sketched below).
   
   Can you please point me to specific places where you feel logging would be helpful?
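   If we do add logging, a minimal sketch of what I would reach for first: a guarded 
debug-level log in `planInputPartitions()`, so high-traffic streams stay quiet unless 
debug logging is explicitly enabled. The exact message and placement are just an 
assumption:
   
   ```java
   // hypothetical example only: log the planned batch at debug level
   if (LOG.isDebugEnabled()) {
     LOG.debug("Planned {} combined scan tasks for offsets [{}, {}]",
         combinedScanTasks.size(), startOffset, endOffset);
   }
   ```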

##########
File path: 
spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
##########
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MicroBatches;
+import org.apache.iceberg.MicroBatches.MicroBatch;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.source.SparkBatchScan.ReadTask;
+import org.apache.iceberg.spark.source.SparkBatchScan.ReaderFactory;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.connector.read.InputPartition;
+import org.apache.spark.sql.connector.read.PartitionReaderFactory;
+import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
+import org.apache.spark.sql.connector.read.streaming.Offset;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK;
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE_DEFAULT;
+
+public class SparkMicroBatchStream implements MicroBatchStream {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SparkMicroBatchStream.class);
+
+  private final JavaSparkContext sparkContext;
+  private final Table table;
+  private final boolean caseSensitive;
+  private final Schema expectedSchema;
+  private final Long splitSize;
+  private final Integer splitLookback;
+  private final Long splitOpenFileCost;
+  private final boolean localityPreferred;
+  private final InitialOffsetStore initialOffsetStore;

Review comment:
       fixed this.

##########
File path: 
spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
##########
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MicroBatches;
+import org.apache.iceberg.MicroBatches.MicroBatch;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.source.SparkBatchScan.ReadTask;
+import org.apache.iceberg.spark.source.SparkBatchScan.ReaderFactory;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.connector.read.InputPartition;
+import org.apache.spark.sql.connector.read.PartitionReaderFactory;
+import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
+import org.apache.spark.sql.connector.read.streaming.Offset;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK;
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE_DEFAULT;
+
+public class SparkMicroBatchStream implements MicroBatchStream {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SparkMicroBatchStream.class);
+
+  private final JavaSparkContext sparkContext;
+  private final Table table;
+  private final boolean caseSensitive;
+  private final Schema expectedSchema;
+  private final Long splitSize;
+  private final Integer splitLookback;
+  private final Long splitOpenFileCost;
+  private final boolean localityPreferred;
+  private final InitialOffsetStore initialOffsetStore;
+  private final StreamingOffset initialOffset;
+
+  SparkMicroBatchStream(JavaSparkContext sparkContext,
+                        Table table, boolean caseSensitive, Schema 
expectedSchema,
+                        CaseInsensitiveStringMap options, String 
checkpointLocation) {
+    this.sparkContext = sparkContext;
+    this.table = table;
+    this.caseSensitive = caseSensitive;
+    this.expectedSchema = expectedSchema;
+    this.localityPreferred = Spark3Util.isLocalityEnabled(table.io(), 
table.location(), options);
+    this.splitSize = Optional.ofNullable(Spark3Util.propertyAsLong(options, 
SparkReadOptions.SPLIT_SIZE, null))

Review comment:
       nice, adopted that formatting.

##########
File path: 
spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
##########
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MicroBatches;
+import org.apache.iceberg.MicroBatches.MicroBatch;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.source.SparkBatchScan.ReadTask;
+import org.apache.iceberg.spark.source.SparkBatchScan.ReaderFactory;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.connector.read.InputPartition;
+import org.apache.spark.sql.connector.read.PartitionReaderFactory;
+import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
+import org.apache.spark.sql.connector.read.streaming.Offset;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK;
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE_DEFAULT;
+
+public class SparkMicroBatchStream implements MicroBatchStream {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SparkMicroBatchStream.class);
+
+  private final JavaSparkContext sparkContext;
+  private final Table table;
+  private final boolean caseSensitive;
+  private final Schema expectedSchema;
+  private final Long splitSize;
+  private final Integer splitLookback;
+  private final Long splitOpenFileCost;
+  private final boolean localityPreferred;
+  private final InitialOffsetStore initialOffsetStore;
+  private final StreamingOffset initialOffset;
+
+  SparkMicroBatchStream(JavaSparkContext sparkContext,
+                        Table table, boolean caseSensitive, Schema 
expectedSchema,
+                        CaseInsensitiveStringMap options, String 
checkpointLocation) {
+    this.sparkContext = sparkContext;
+    this.table = table;
+    this.caseSensitive = caseSensitive;
+    this.expectedSchema = expectedSchema;
+    this.localityPreferred = Spark3Util.isLocalityEnabled(table.io(), 
table.location(), options);
+    this.splitSize = Optional.ofNullable(Spark3Util.propertyAsLong(options, 
SparkReadOptions.SPLIT_SIZE, null))
+        .orElseGet(() -> PropertyUtil.propertyAsLong(table.properties(), 
SPLIT_SIZE, SPLIT_SIZE_DEFAULT));
+    this.splitLookback = Optional.ofNullable(Spark3Util.propertyAsInt(options, 
SparkReadOptions.LOOKBACK, null))
+        .orElseGet(() -> PropertyUtil.propertyAsInt(table.properties(), 
SPLIT_LOOKBACK, SPLIT_LOOKBACK_DEFAULT));
+    this.splitOpenFileCost = Optional.ofNullable(
+        Spark3Util.propertyAsLong(options, SparkReadOptions.FILE_OPEN_COST, 
null))
+        .orElseGet(() -> PropertyUtil.propertyAsLong(table.properties(), 
SPLIT_OPEN_FILE_COST,
+            SPLIT_OPEN_FILE_COST_DEFAULT));
+    this.initialOffsetStore = InitialOffsetStore.getInstance(table, 
checkpointLocation);
+    this.initialOffset = getOrWriteInitialOffset(initialOffsetStore);
+  }
+
+  @Override
+  public Offset latestOffset() {
+    table.refresh();
+    Snapshot latestSnapshot = table.currentSnapshot();
+    if (latestSnapshot == null) {
+      return StreamingOffset.START_OFFSET;
+    }
+
+    // a readStream on an Iceberg table can be started from 2 types of 
snapshots
+    // 1. a valid starting Snapshot:
+    //      when this valid starting Snapshot is the initialOffset - then, 
scanAllFiles must be set to true;
+    //      for all StreamingOffsets following this - scanAllFiles must be set 
to false
+    // 2. START_OFFSET:
+    //      if the stream started on the table from START_OFFSET - it implies 
- that all the subsequent Snapshots added
+    //      will have all files as net New manifests & hence scanAllFiles can 
be false.
+    boolean scanAllFiles = !StreamingOffset.START_OFFSET.equals(initialOffset) 
&&
+        latestSnapshot.snapshotId() == initialOffset.snapshotId();
+
+    String positionValue = scanAllFiles ?
+        latestSnapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP) :
+        latestSnapshot.summary().get(SnapshotSummary.ADDED_FILES_PROP);
+
+    return new StreamingOffset(
+        latestSnapshot.snapshotId(), positionValue != null ? 
Long.parseLong(positionValue) : 0, scanAllFiles);
+  }
+
+  @Override
+  public InputPartition[] planInputPartitions(Offset start, Offset end) {
+    if (end.equals(StreamingOffset.START_OFFSET)) {
+      return new InputPartition[0];
+    }
+
+    // broadcast the table metadata as input partitions will be sent to 
executors
+    Broadcast<Table> tableBroadcast = 
sparkContext.broadcast(SerializableTable.copyOf(table));
+    String expectedSchemaString = SchemaParser.toJson(expectedSchema);
+
+    Preconditions.checkState(
+        end instanceof StreamingOffset,
+        "The end offset passed to planInputPartitions() is not an instance of 
StreamingOffset.");
+
+    Preconditions.checkState(
+        start instanceof StreamingOffset,
+        "The start offset passed to planInputPartitions() is not an instance 
of StreamingOffset.");
+
+    StreamingOffset endOffset = (StreamingOffset) end;
+    StreamingOffset startOffset = (StreamingOffset) start;
+
+    List<FileScanTask> fileScanTasks = calculateFileScanTasks(startOffset, 
endOffset);
+
+    CloseableIterable<FileScanTask> splitTasks = TableScanUtil.splitFiles(
+        CloseableIterable.withNoopClose(fileScanTasks),
+        splitSize);
+    List<CombinedScanTask> combinedScanTasks = Lists.newArrayList(
+        TableScanUtil.planTasks(splitTasks, splitSize, splitLookback, 
splitOpenFileCost));
+    InputPartition[] readTasks = new InputPartition[combinedScanTasks.size()];
+
+    for (int i = 0; i < combinedScanTasks.size(); i++) {
+      readTasks[i] = new ReadTask(
+          combinedScanTasks.get(i), tableBroadcast, expectedSchemaString,
+          caseSensitive, localityPreferred);
+    }
+
+    return readTasks;
+  }
+
+  @Override
+  public PartitionReaderFactory createReaderFactory() {
+    return new ReaderFactory(0);
+  }
+
+  @Override
+  public Offset initialOffset() {
+    return initialOffset;
+  }
+
+  @Override
+  public Offset deserializeOffset(String json) {
+    return StreamingOffset.fromJson(json);
+  }
+
+  @Override
+  public void commit(Offset end) {
+  }
+
+  @Override
+  public void stop() {
+  }
+
+  private static StreamingOffset getOrWriteInitialOffset(InitialOffsetStore 
initialOffsetStore) {
+    if (initialOffsetStore.isOffsetStoreInitialized()) {
+      return initialOffsetStore.getInitialOffset();
+    }
+
+    return initialOffsetStore.addInitialOffset();
+  }
+
+  private List<FileScanTask> calculateFileScanTasks(StreamingOffset 
startOffset, StreamingOffset endOffset) {
+    List<FileScanTask> fileScanTasks = new ArrayList<>();
+    MicroBatch latestMicroBatch = null;
+    StreamingOffset batchStartOffset = 
StreamingOffset.START_OFFSET.equals(startOffset) ?
+        new StreamingOffset(SnapshotUtil.oldestSnapshot(table).snapshotId(), 
0, false) :
+        startOffset;
+
+    do {
+      final StreamingOffset currentOffset =
+          latestMicroBatch != null && latestMicroBatch.lastIndexOfSnapshot() ?
+          getNextAvailableSnapshot(latestMicroBatch.snapshotId()) :
+              batchStartOffset;
+
+      latestMicroBatch = 
MicroBatches.from(table.snapshot(currentOffset.snapshotId()), table.io())
+          .caseSensitive(caseSensitive)
+          .specsById(table.specs())
+          .generate(currentOffset.position(), Long.MAX_VALUE, 
currentOffset.shouldScanAllFiles());
+
+      fileScanTasks.addAll(latestMicroBatch.tasks());
+    } while (latestMicroBatch.snapshotId() != endOffset.snapshotId());
+
+    return fileScanTasks;
+  }
+
+  private StreamingOffset getNextAvailableSnapshot(long snapshotId) {
+    Snapshot previousSnapshot = table.snapshot(snapshotId);
+    Snapshot pointer = table.currentSnapshot();
+    while (pointer != null && previousSnapshot.snapshotId() != 
pointer.parentId()) {
+      
Preconditions.checkState(pointer.operation().equals(DataOperations.APPEND),
+          "Encountered Snapshot DataOperation other than APPEND.");
+
+      pointer = table.snapshot(pointer.parentId());
+    }
+
+    Preconditions.checkState(pointer != null,
+        "Cannot read data from snapshot which has already expired: %s", 
snapshotId);
+
+    return new StreamingOffset(pointer.snapshotId(), 0L, false);
+  }
+
+  interface InitialOffsetStore {

Review comment:
       This `interface` really serves one purpose: surfacing the specific 
methods/operations/expectations of the InitialOffsetStore. The underlying 
implementation class could have several methods of its own, but this interface 
is what the rest of our code will use. The rough shape implied by the call 
sites is sketched below.
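   For reference, this is the shape the call sites in this diff imply; the return 
types are my assumption where the hunks don't show them, and the concrete class 
that reads/writes the offset file under the checkpoint location stays hidden 
behind the interface:
   
   ```java
   // Callers only see these operations plus a getInstance(Table, String) factory;
   // how the offset is persisted at the checkpoint location is an implementation detail.
   interface InitialOffsetStore {
     boolean isOffsetStoreInitialized();
   
     StreamingOffset getInitialOffset();
   
     StreamingOffset addInitialOffset();
   }
   ```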

##########
File path: 
spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MicroBatches;
+import org.apache.iceberg.MicroBatches.MicroBatch;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.source.SparkBatchScan.ReadTask;
+import org.apache.iceberg.spark.source.SparkBatchScan.ReaderFactory;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.connector.read.InputPartition;
+import org.apache.spark.sql.connector.read.PartitionReaderFactory;
+import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
+import org.apache.spark.sql.connector.read.streaming.Offset;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK;
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE_DEFAULT;
+
+public class SparkMicroBatchStream implements MicroBatchStream {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SparkMicroBatchStream.class);
+
+  private final JavaSparkContext sparkContext;
+  private final Table table;
+  private final boolean caseSensitive;
+  private final Schema expectedSchema;
+  private final Long splitSize;
+  private final Integer splitLookback;
+  private final Long splitOpenFileCost;
+  private final boolean localityPreferred;
+  private final StreamingOffset initialOffset;
+
+  SparkMicroBatchStream(JavaSparkContext sparkContext,
+                        Table table, boolean caseSensitive, Schema 
expectedSchema,
+                        CaseInsensitiveStringMap options, String 
checkpointLocation) {
+    this.sparkContext = sparkContext;
+    this.table = table;
+    this.caseSensitive = caseSensitive;
+    this.expectedSchema = expectedSchema;
+    this.localityPreferred = Spark3Util.isLocalityEnabled(table.io(), 
table.location(), options);
+
+    long tableSplitSize = PropertyUtil.propertyAsLong(table.properties(), 
SPLIT_SIZE, SPLIT_SIZE_DEFAULT);
+    this.splitSize = Spark3Util.propertyAsLong(options, 
SparkReadOptions.SPLIT_SIZE, tableSplitSize);
+
+    int tableSplitLookback = PropertyUtil.propertyAsInt(table.properties(), 
SPLIT_LOOKBACK, SPLIT_LOOKBACK_DEFAULT);
+    this.splitLookback = Spark3Util.propertyAsInt(options, 
SparkReadOptions.LOOKBACK, tableSplitLookback);
+
+    long tableSplitOpenFileCost = PropertyUtil.propertyAsLong(
+        table.properties(), SPLIT_OPEN_FILE_COST, 
SPLIT_OPEN_FILE_COST_DEFAULT);
+    this.splitOpenFileCost = Spark3Util.propertyAsLong(options, 
SPLIT_OPEN_FILE_COST, tableSplitOpenFileCost);
+
+    InitialOffsetStore initialOffsetStore = 
InitialOffsetStore.getInstance(table, checkpointLocation);
+    this.initialOffset = getOrWriteInitialOffset(initialOffsetStore);
+  }
+
+  @Override
+  public Offset latestOffset() {
+    table.refresh();
+    Snapshot latestSnapshot = table.currentSnapshot();
+    if (latestSnapshot == null) {
+      return StreamingOffset.START_OFFSET;
+    }
+
+    // a readStream on an Iceberg table can be started from 2 types of 
snapshots
+    // 1. a valid starting Snapshot:
+    //      when this valid starting Snapshot is the initialOffset - then, 
scanAllFiles must be set to true;
+    //      for all StreamingOffsets following this - scanAllFiles must be set 
to false
+    // 2. START_OFFSET:
+    //      if the stream started on the table from START_OFFSET - it implies 
- that all the subsequent Snapshots added
+    //      will have all files as net New manifests & hence scanAllFiles can 
be false.
+    boolean scanAllFiles = !StreamingOffset.START_OFFSET.equals(initialOffset) 
&&
+        latestSnapshot.snapshotId() == initialOffset.snapshotId();
+
+    String positionValue = scanAllFiles ?
+        latestSnapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP) :
+        latestSnapshot.summary().get(SnapshotSummary.ADDED_FILES_PROP);
+
+    return new StreamingOffset(
+        latestSnapshot.snapshotId(), positionValue != null ? 
Long.parseLong(positionValue) : 0, scanAllFiles);
+  }
+
+  @Override
+  public InputPartition[] planInputPartitions(Offset start, Offset end) {
+    if (end.equals(StreamingOffset.START_OFFSET)) {
+      return new InputPartition[0];
+    }
+
+    // broadcast the table metadata as input partitions will be sent to 
executors
+    Broadcast<Table> tableBroadcast = 
sparkContext.broadcast(SerializableTable.copyOf(table));
+    String expectedSchemaString = SchemaParser.toJson(expectedSchema);
+
+    Preconditions.checkState(
+        end instanceof StreamingOffset,
+        "The end offset passed to planInputPartitions() is not an instance of 
StreamingOffset.");
+
+    Preconditions.checkState(
+        start instanceof StreamingOffset,
+        "The start offset passed to planInputPartitions() is not an instance 
of StreamingOffset.");
+
+    StreamingOffset endOffset = (StreamingOffset) end;
+    StreamingOffset startOffset = (StreamingOffset) start;
+
+    List<FileScanTask> fileScanTasks = calculateFileScanTasks(startOffset, 
endOffset);
+
+    CloseableIterable<FileScanTask> splitTasks = TableScanUtil.splitFiles(
+        CloseableIterable.withNoopClose(fileScanTasks),
+        splitSize);
+    List<CombinedScanTask> combinedScanTasks = Lists.newArrayList(
+        TableScanUtil.planTasks(splitTasks, splitSize, splitLookback, 
splitOpenFileCost));
+    InputPartition[] readTasks = new InputPartition[combinedScanTasks.size()];
+
+    for (int i = 0; i < combinedScanTasks.size(); i++) {
+      readTasks[i] = new ReadTask(
+          combinedScanTasks.get(i), tableBroadcast, expectedSchemaString,
+          caseSensitive, localityPreferred);
+    }
+
+    return readTasks;
+  }
+
+  @Override
+  public PartitionReaderFactory createReaderFactory() {
+    return new ReaderFactory(0);
+  }
+
+  @Override
+  public Offset initialOffset() {
+    return initialOffset;
+  }
+
+  @Override
+  public Offset deserializeOffset(String json) {
+    return StreamingOffset.fromJson(json);
+  }
+
+  @Override
+  public void commit(Offset end) {

Review comment:
       Yes, that is correct.

##########
File path: 
spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
##########
@@ -108,6 +111,12 @@ public Batch toBatch() {
     return this;
   }
 
+  @Override
+  public MicroBatchStream toMicroBatchStream(String checkpointLocation) {

Review comment:
       Added this to my **Next Steps** list in the PR description. Once the PR 
concludes, I will create issues for these.

##########
File path: 
spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MicroBatches;
+import org.apache.iceberg.MicroBatches.MicroBatch;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.source.SparkBatchScan.ReadTask;
+import org.apache.iceberg.spark.source.SparkBatchScan.ReaderFactory;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.connector.read.InputPartition;
+import org.apache.spark.sql.connector.read.PartitionReaderFactory;
+import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
+import org.apache.spark.sql.connector.read.streaming.Offset;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK;
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE_DEFAULT;
+
+public class SparkMicroBatchStream implements MicroBatchStream {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SparkMicroBatchStream.class);
+
+  private final JavaSparkContext sparkContext;
+  private final Table table;
+  private final boolean caseSensitive;
+  private final Schema expectedSchema;
+  private final Long splitSize;
+  private final Integer splitLookback;
+  private final Long splitOpenFileCost;
+  private final boolean localityPreferred;
+  private final StreamingOffset initialOffset;
+
+  SparkMicroBatchStream(JavaSparkContext sparkContext,
+                        Table table, boolean caseSensitive, Schema 
expectedSchema,
+                        CaseInsensitiveStringMap options, String 
checkpointLocation) {
+    this.sparkContext = sparkContext;
+    this.table = table;
+    this.caseSensitive = caseSensitive;
+    this.expectedSchema = expectedSchema;
+    this.localityPreferred = Spark3Util.isLocalityEnabled(table.io(), 
table.location(), options);
+
+    long tableSplitSize = PropertyUtil.propertyAsLong(table.properties(), 
SPLIT_SIZE, SPLIT_SIZE_DEFAULT);
+    this.splitSize = Spark3Util.propertyAsLong(options, 
SparkReadOptions.SPLIT_SIZE, tableSplitSize);
+
+    int tableSplitLookback = PropertyUtil.propertyAsInt(table.properties(), 
SPLIT_LOOKBACK, SPLIT_LOOKBACK_DEFAULT);
+    this.splitLookback = Spark3Util.propertyAsInt(options, 
SparkReadOptions.LOOKBACK, tableSplitLookback);
+
+    long tableSplitOpenFileCost = PropertyUtil.propertyAsLong(
+        table.properties(), SPLIT_OPEN_FILE_COST, 
SPLIT_OPEN_FILE_COST_DEFAULT);
+    this.splitOpenFileCost = Spark3Util.propertyAsLong(options, 
SPLIT_OPEN_FILE_COST, tableSplitOpenFileCost);
+
+    InitialOffsetStore initialOffsetStore = 
InitialOffsetStore.getInstance(table, checkpointLocation);
+    this.initialOffset = getOrWriteInitialOffset(initialOffsetStore);
+  }
+
+  @Override
+  public Offset latestOffset() {

Review comment:
       Yes, this is a limitation of Spark's current `MicroBatchStream` interface: 
it only asks the streaming source for two values, `latest` and `initial`, and 
builds a micro-batch out of them. Let me explore the limit API a little more - 
but yes, this is a very good call-out; we should leverage the limit API, if it 
solves this problem, as soon as we switch to 3.1. A rough sketch of what that 
could look like is below.
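   Purely illustrative and not part of this PR: if `SparkMicroBatchStream` also 
implemented `org.apache.spark.sql.connector.read.streaming.SupportsAdmissionControl`, 
Spark would pass a `ReadLimit` into `latestOffset` and we could cap how far ahead the 
end offset moves. The wiring below is an assumption about how that might look, not a 
working rate limiter:
   
   ```java
   // hedged sketch: admission-control hooks a future change could implement
   @Override
   public ReadLimit getDefaultReadLimit() {
     // "all available" mirrors today's behavior; a real implementation would likely
     // derive a max-rows style limit from the read options instead.
     return ReadLimit.allAvailable();
   }
   
   @Override
   public Offset latestOffset(Offset startOffset, ReadLimit limit) {
     // a real implementation would walk snapshots from startOffset and stop once the
     // limit is hit; delegating to the unbounded version keeps this sketch honest
     // about what the current PR actually does.
     return latestOffset();
   }
   ```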

##########
File path: 
spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MicroBatches;
+import org.apache.iceberg.MicroBatches.MicroBatch;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.source.SparkBatchScan.ReadTask;
+import org.apache.iceberg.spark.source.SparkBatchScan.ReaderFactory;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.connector.read.InputPartition;
+import org.apache.spark.sql.connector.read.PartitionReaderFactory;
+import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
+import org.apache.spark.sql.connector.read.streaming.Offset;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK;
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE_DEFAULT;
+
+public class SparkMicroBatchStream implements MicroBatchStream {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SparkMicroBatchStream.class);
+
+  private final JavaSparkContext sparkContext;
+  private final Table table;
+  private final boolean caseSensitive;
+  private final Schema expectedSchema;
+  private final Long splitSize;
+  private final Integer splitLookback;
+  private final Long splitOpenFileCost;
+  private final boolean localityPreferred;
+  private final StreamingOffset initialOffset;
+
+  SparkMicroBatchStream(JavaSparkContext sparkContext,
+                        Table table, boolean caseSensitive, Schema 
expectedSchema,
+                        CaseInsensitiveStringMap options, String 
checkpointLocation) {
+    this.sparkContext = sparkContext;
+    this.table = table;
+    this.caseSensitive = caseSensitive;
+    this.expectedSchema = expectedSchema;
+    this.localityPreferred = Spark3Util.isLocalityEnabled(table.io(), 
table.location(), options);
+
+    long tableSplitSize = PropertyUtil.propertyAsLong(table.properties(), 
SPLIT_SIZE, SPLIT_SIZE_DEFAULT);
+    this.splitSize = Spark3Util.propertyAsLong(options, 
SparkReadOptions.SPLIT_SIZE, tableSplitSize);
+
+    int tableSplitLookback = PropertyUtil.propertyAsInt(table.properties(), 
SPLIT_LOOKBACK, SPLIT_LOOKBACK_DEFAULT);
+    this.splitLookback = Spark3Util.propertyAsInt(options, 
SparkReadOptions.LOOKBACK, tableSplitLookback);
+
+    long tableSplitOpenFileCost = PropertyUtil.propertyAsLong(
+        table.properties(), SPLIT_OPEN_FILE_COST, 
SPLIT_OPEN_FILE_COST_DEFAULT);
+    this.splitOpenFileCost = Spark3Util.propertyAsLong(options, 
SPLIT_OPEN_FILE_COST, tableSplitOpenFileCost);
+
+    InitialOffsetStore initialOffsetStore = 
InitialOffsetStore.getInstance(table, checkpointLocation);
+    this.initialOffset = getOrWriteInitialOffset(initialOffsetStore);
+  }
+
+  @Override
+  public Offset latestOffset() {
+    table.refresh();
+    Snapshot latestSnapshot = table.currentSnapshot();
+    if (latestSnapshot == null) {
+      return StreamingOffset.START_OFFSET;
+    }
+
+    // a readStream on an Iceberg table can be started from 2 types of snapshots
+    // 1. a valid starting Snapshot:
+    //      when this valid starting Snapshot is the initialOffset - then, scanAllFiles must be set to true;
+    //      for all StreamingOffsets following this - scanAllFiles must be set to false
+    // 2. START_OFFSET:
+    //      if the stream started on the table from START_OFFSET - it implies - that all the subsequent Snapshots added
+    //      will have all files as net New manifests & hence scanAllFiles can be false.
+    boolean scanAllFiles = !StreamingOffset.START_OFFSET.equals(initialOffset) &&
+        latestSnapshot.snapshotId() == initialOffset.snapshotId();
+
+    String positionValue = scanAllFiles ?
+        latestSnapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP) :
+        latestSnapshot.summary().get(SnapshotSummary.ADDED_FILES_PROP);
+
+    return new StreamingOffset(
+        latestSnapshot.snapshotId(), positionValue != null ? Long.parseLong(positionValue) : 0, scanAllFiles);

Review comment:
       This is currently unused, but per my debugging, snapshots of type `Delete`/`Replace` don't set `ADDED_FILES_PROP`, so the position defaults to 0, i.e., there are no files to read.
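
       A rough sketch of the behaviour described above, in case it helps; `addedFilesPosition` is a hypothetical helper, not something this PR adds:

```java
// Hypothetical helper illustrating the fallback described above:
// non-append (Delete/Replace) snapshots may not carry ADDED_FILES_PROP,
// so the streamed position effectively stays 0 for them.
private static long addedFilesPosition(Snapshot snapshot) {
  if (!DataOperations.APPEND.equals(snapshot.operation())) {
    return 0L;  // delete/replace snapshots contribute no net-new data files to stream
  }
  String added = snapshot.summary().get(SnapshotSummary.ADDED_FILES_PROP);
  return added != null ? Long.parseLong(added) : 0L;
}
```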

##########
File path: 
spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MicroBatches;
+import org.apache.iceberg.MicroBatches.MicroBatch;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.source.SparkBatchScan.ReadTask;
+import org.apache.iceberg.spark.source.SparkBatchScan.ReaderFactory;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.connector.read.InputPartition;
+import org.apache.spark.sql.connector.read.PartitionReaderFactory;
+import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
+import org.apache.spark.sql.connector.read.streaming.Offset;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK;
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE_DEFAULT;
+
+public class SparkMicroBatchStream implements MicroBatchStream {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SparkMicroBatchStream.class);
+
+  private final JavaSparkContext sparkContext;
+  private final Table table;
+  private final boolean caseSensitive;
+  private final Schema expectedSchema;
+  private final Long splitSize;
+  private final Integer splitLookback;
+  private final Long splitOpenFileCost;
+  private final boolean localityPreferred;
+  private final StreamingOffset initialOffset;
+
+  SparkMicroBatchStream(JavaSparkContext sparkContext,
+                        Table table, boolean caseSensitive, Schema 
expectedSchema,
+                        CaseInsensitiveStringMap options, String 
checkpointLocation) {
+    this.sparkContext = sparkContext;
+    this.table = table;
+    this.caseSensitive = caseSensitive;
+    this.expectedSchema = expectedSchema;
+    this.localityPreferred = Spark3Util.isLocalityEnabled(table.io(), 
table.location(), options);
+
+    long tableSplitSize = PropertyUtil.propertyAsLong(table.properties(), 
SPLIT_SIZE, SPLIT_SIZE_DEFAULT);
+    this.splitSize = Spark3Util.propertyAsLong(options, 
SparkReadOptions.SPLIT_SIZE, tableSplitSize);
+
+    int tableSplitLookback = PropertyUtil.propertyAsInt(table.properties(), 
SPLIT_LOOKBACK, SPLIT_LOOKBACK_DEFAULT);
+    this.splitLookback = Spark3Util.propertyAsInt(options, 
SparkReadOptions.LOOKBACK, tableSplitLookback);
+
+    long tableSplitOpenFileCost = PropertyUtil.propertyAsLong(
+        table.properties(), SPLIT_OPEN_FILE_COST, 
SPLIT_OPEN_FILE_COST_DEFAULT);
+    this.splitOpenFileCost = Spark3Util.propertyAsLong(options, 
SPLIT_OPEN_FILE_COST, tableSplitOpenFileCost);
+
+    InitialOffsetStore initialOffsetStore = 
InitialOffsetStore.getInstance(table, checkpointLocation);
+    this.initialOffset = getOrWriteInitialOffset(initialOffsetStore);
+  }
+
+  @Override
+  public Offset latestOffset() {

Review comment:
       This is one of the primary reasons I listed `rate limiting` as an item to address in the **Next Steps** section of the PR description. I will handle this special case, since I expect many users will run into it.
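
       For reference, the shape I have in mind is roughly the following; `maxFilesPerMicroBatch` is a hypothetical read option, and the actual name and wiring would come with the rate-limiting change, not this PR:

```java
// Sketch only: cap how far the end offset advances into the latest snapshot,
// instead of always consuming every newly added file in a single trigger.
private StreamingOffset cappedOffset(Snapshot latestSnapshot, long availableFiles,
                                     long alreadyConsumedPosition, long maxFilesPerMicroBatch,
                                     boolean scanAllFiles) {
  long remaining = Math.max(availableFiles - alreadyConsumedPosition, 0L);
  long advanceBy = Math.min(remaining, maxFilesPerMicroBatch);
  return new StreamingOffset(latestSnapshot.snapshotId(), alreadyConsumedPosition + advanceBy, scanAllFiles);
}
```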

##########
File path: 
spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
##########
@@ -108,6 +109,12 @@ public Batch toBatch() {
     return this;
   }
 
+  @Override
+  public MicroBatchStream toMicroBatchStream(String checkpointLocation) {
+    return new SparkMicroBatchStream(

Review comment:
       I am resolving the comments where I felt we were able to reach reasonable consensus, or that are tracked in the PR's **What next** section, so that **the ones needing more discussion stand out**. Please reopen this thread or start a new comment if you feel more discussion is needed here.

##########
File path: 
spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MicroBatches;
+import org.apache.iceberg.MicroBatches.MicroBatch;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.source.SparkBatchScan.ReadTask;
+import org.apache.iceberg.spark.source.SparkBatchScan.ReaderFactory;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.connector.read.InputPartition;
+import org.apache.spark.sql.connector.read.PartitionReaderFactory;
+import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
+import org.apache.spark.sql.connector.read.streaming.Offset;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK;
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE_DEFAULT;
+
+public class SparkMicroBatchStream implements MicroBatchStream {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SparkMicroBatchStream.class);
+
+  private final JavaSparkContext sparkContext;
+  private final Table table;
+  private final boolean caseSensitive;
+  private final Schema expectedSchema;
+  private final Long splitSize;
+  private final Integer splitLookback;
+  private final Long splitOpenFileCost;
+  private final boolean localityPreferred;
+  private final StreamingOffset initialOffset;
+
+  SparkMicroBatchStream(JavaSparkContext sparkContext,
+                        Table table, boolean caseSensitive, Schema 
expectedSchema,
+                        CaseInsensitiveStringMap options, String 
checkpointLocation) {
+    this.sparkContext = sparkContext;
+    this.table = table;
+    this.caseSensitive = caseSensitive;
+    this.expectedSchema = expectedSchema;
+    this.localityPreferred = Spark3Util.isLocalityEnabled(table.io(), 
table.location(), options);
+
+    long tableSplitSize = PropertyUtil.propertyAsLong(table.properties(), 
SPLIT_SIZE, SPLIT_SIZE_DEFAULT);
+    this.splitSize = Spark3Util.propertyAsLong(options, 
SparkReadOptions.SPLIT_SIZE, tableSplitSize);
+
+    int tableSplitLookback = PropertyUtil.propertyAsInt(table.properties(), 
SPLIT_LOOKBACK, SPLIT_LOOKBACK_DEFAULT);
+    this.splitLookback = Spark3Util.propertyAsInt(options, 
SparkReadOptions.LOOKBACK, tableSplitLookback);
+
+    long tableSplitOpenFileCost = PropertyUtil.propertyAsLong(
+        table.properties(), SPLIT_OPEN_FILE_COST, 
SPLIT_OPEN_FILE_COST_DEFAULT);
+    this.splitOpenFileCost = Spark3Util.propertyAsLong(options, 
SPLIT_OPEN_FILE_COST, tableSplitOpenFileCost);
+
+    InitialOffsetStore initialOffsetStore = 
InitialOffsetStore.getInstance(table, checkpointLocation);
+    this.initialOffset = getOrWriteInitialOffset(initialOffsetStore);
+  }
+
+  @Override
+  public Offset latestOffset() {
+    table.refresh();
+    Snapshot latestSnapshot = table.currentSnapshot();
+    if (latestSnapshot == null) {
+      return StreamingOffset.START_OFFSET;
+    }
+
+    // a readStream on an Iceberg table can be started from 2 types of 
snapshots
+    // 1. a valid starting Snapshot:
+    //      when this valid starting Snapshot is the initialOffset - then, 
scanAllFiles must be set to true;
+    //      for all StreamingOffsets following this - scanAllFiles must be set 
to false
+    // 2. START_OFFSET:
+    //      if the stream started on the table from START_OFFSET - it implies 
- that all the subsequent Snapshots added
+    //      will have all files as net New manifests & hence scanAllFiles can 
be false.
+    boolean scanAllFiles = !StreamingOffset.START_OFFSET.equals(initialOffset) 
&&
+        latestSnapshot.snapshotId() == initialOffset.snapshotId();
+
+    String positionValue = scanAllFiles ?

Review comment:
        @aokolnychyi - is the concern here that the value may be `NOT PRESENT` in `Snapshot.summary()`, or that the value is present but could be wrong?
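
        If the worry is the "not present" case, one defensive option would be to recount instead of silently defaulting to 0. Sketch only, assuming `Snapshot#addedFiles()` is available in the Iceberg version we build against and using the relocated Guava `Iterables`:

```java
// Sketch: fall back to counting the snapshot's added files when the summary
// property is missing, rather than treating it as "no files to read".
private static long addedFilesPosition(Snapshot snapshot) {
  String value = snapshot.summary().get(SnapshotSummary.ADDED_FILES_PROP);
  if (value != null) {
    return Long.parseLong(value);
  }
  return Iterables.size(snapshot.addedFiles());
}
```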

##########
File path: 
spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MicroBatches;
+import org.apache.iceberg.MicroBatches.MicroBatch;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.source.SparkBatchScan.ReadTask;
+import org.apache.iceberg.spark.source.SparkBatchScan.ReaderFactory;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.connector.read.InputPartition;
+import org.apache.spark.sql.connector.read.PartitionReaderFactory;
+import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
+import org.apache.spark.sql.connector.read.streaming.Offset;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK;
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE_DEFAULT;
+
+public class SparkMicroBatchStream implements MicroBatchStream {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SparkMicroBatchStream.class);
+
+  private final JavaSparkContext sparkContext;
+  private final Table table;
+  private final boolean caseSensitive;
+  private final Schema expectedSchema;
+  private final Long splitSize;
+  private final Integer splitLookback;
+  private final Long splitOpenFileCost;
+  private final boolean localityPreferred;
+  private final StreamingOffset initialOffset;
+
+  SparkMicroBatchStream(JavaSparkContext sparkContext,
+                        Table table, boolean caseSensitive, Schema 
expectedSchema,
+                        CaseInsensitiveStringMap options, String 
checkpointLocation) {
+    this.sparkContext = sparkContext;
+    this.table = table;
+    this.caseSensitive = caseSensitive;
+    this.expectedSchema = expectedSchema;
+    this.localityPreferred = Spark3Util.isLocalityEnabled(table.io(), 
table.location(), options);
+
+    long tableSplitSize = PropertyUtil.propertyAsLong(table.properties(), 
SPLIT_SIZE, SPLIT_SIZE_DEFAULT);
+    this.splitSize = Spark3Util.propertyAsLong(options, 
SparkReadOptions.SPLIT_SIZE, tableSplitSize);
+
+    int tableSplitLookback = PropertyUtil.propertyAsInt(table.properties(), 
SPLIT_LOOKBACK, SPLIT_LOOKBACK_DEFAULT);
+    this.splitLookback = Spark3Util.propertyAsInt(options, 
SparkReadOptions.LOOKBACK, tableSplitLookback);
+
+    long tableSplitOpenFileCost = PropertyUtil.propertyAsLong(
+        table.properties(), SPLIT_OPEN_FILE_COST, 
SPLIT_OPEN_FILE_COST_DEFAULT);
+    this.splitOpenFileCost = Spark3Util.propertyAsLong(options, 
SPLIT_OPEN_FILE_COST, tableSplitOpenFileCost);
+
+    InitialOffsetStore initialOffsetStore = 
InitialOffsetStore.getInstance(table, checkpointLocation);
+    this.initialOffset = getOrWriteInitialOffset(initialOffsetStore);
+  }
+
+  @Override
+  public Offset latestOffset() {
+    table.refresh();
+    Snapshot latestSnapshot = table.currentSnapshot();
+    if (latestSnapshot == null) {
+      return StreamingOffset.START_OFFSET;
+    }
+
+    // a readStream on an Iceberg table can be started from 2 types of 
snapshots
+    // 1. a valid starting Snapshot:
+    //      when this valid starting Snapshot is the initialOffset - then, 
scanAllFiles must be set to true;
+    //      for all StreamingOffsets following this - scanAllFiles must be set 
to false
+    // 2. START_OFFSET:
+    //      if the stream started on the table from START_OFFSET - it implies 
- that all the subsequent Snapshots added
+    //      will have all files as net New manifests & hence scanAllFiles can 
be false.
+    boolean scanAllFiles = !StreamingOffset.START_OFFSET.equals(initialOffset) 
&&
+        latestSnapshot.snapshotId() == initialOffset.snapshotId();
+
+    String positionValue = scanAllFiles ?
+        latestSnapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP) :
+        latestSnapshot.summary().get(SnapshotSummary.ADDED_FILES_PROP);
+
+    return new StreamingOffset(
+        latestSnapshot.snapshotId(), positionValue != null ? 
Long.parseLong(positionValue) : 0, scanAllFiles);

Review comment:
       I am resolving the comments where I felt we were able to reach reasonable consensus, or that are tracked in the PR's **What next** section, so that **the ones needing more discussion stand out**. Please reopen this thread or start a new comment if you feel more discussion is needed here.

##########
File path: 
spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MicroBatches;
+import org.apache.iceberg.MicroBatches.MicroBatch;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.source.SparkBatchScan.ReadTask;
+import org.apache.iceberg.spark.source.SparkBatchScan.ReaderFactory;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.connector.read.InputPartition;
+import org.apache.spark.sql.connector.read.PartitionReaderFactory;
+import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
+import org.apache.spark.sql.connector.read.streaming.Offset;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK;
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE_DEFAULT;
+
+public class SparkMicroBatchStream implements MicroBatchStream {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SparkMicroBatchStream.class);
+
+  private final JavaSparkContext sparkContext;
+  private final Table table;
+  private final boolean caseSensitive;
+  private final Schema expectedSchema;
+  private final Long splitSize;
+  private final Integer splitLookback;
+  private final Long splitOpenFileCost;
+  private final boolean localityPreferred;
+  private final StreamingOffset initialOffset;
+
+  SparkMicroBatchStream(JavaSparkContext sparkContext,
+                        Table table, boolean caseSensitive, Schema 
expectedSchema,
+                        CaseInsensitiveStringMap options, String 
checkpointLocation) {
+    this.sparkContext = sparkContext;
+    this.table = table;
+    this.caseSensitive = caseSensitive;
+    this.expectedSchema = expectedSchema;
+    this.localityPreferred = Spark3Util.isLocalityEnabled(table.io(), 
table.location(), options);
+
+    long tableSplitSize = PropertyUtil.propertyAsLong(table.properties(), 
SPLIT_SIZE, SPLIT_SIZE_DEFAULT);
+    this.splitSize = Spark3Util.propertyAsLong(options, 
SparkReadOptions.SPLIT_SIZE, tableSplitSize);
+
+    int tableSplitLookback = PropertyUtil.propertyAsInt(table.properties(), 
SPLIT_LOOKBACK, SPLIT_LOOKBACK_DEFAULT);
+    this.splitLookback = Spark3Util.propertyAsInt(options, 
SparkReadOptions.LOOKBACK, tableSplitLookback);
+
+    long tableSplitOpenFileCost = PropertyUtil.propertyAsLong(
+        table.properties(), SPLIT_OPEN_FILE_COST, 
SPLIT_OPEN_FILE_COST_DEFAULT);
+    this.splitOpenFileCost = Spark3Util.propertyAsLong(options, 
SPLIT_OPEN_FILE_COST, tableSplitOpenFileCost);
+
+    InitialOffsetStore initialOffsetStore = 
InitialOffsetStore.getInstance(table, checkpointLocation);
+    this.initialOffset = getOrWriteInitialOffset(initialOffsetStore);
+  }
+
+  @Override
+  public Offset latestOffset() {
+    table.refresh();
+    Snapshot latestSnapshot = table.currentSnapshot();
+    if (latestSnapshot == null) {
+      return StreamingOffset.START_OFFSET;
+    }
+
+    // a readStream on an Iceberg table can be started from 2 types of 
snapshots
+    // 1. a valid starting Snapshot:
+    //      when this valid starting Snapshot is the initialOffset - then, 
scanAllFiles must be set to true;
+    //      for all StreamingOffsets following this - scanAllFiles must be set 
to false
+    // 2. START_OFFSET:
+    //      if the stream started on the table from START_OFFSET - it implies 
- that all the subsequent Snapshots added
+    //      will have all files as net New manifests & hence scanAllFiles can 
be false.
+    boolean scanAllFiles = !StreamingOffset.START_OFFSET.equals(initialOffset) 
&&
+        latestSnapshot.snapshotId() == initialOffset.snapshotId();
+
+    String positionValue = scanAllFiles ?
+        latestSnapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP) :
+        latestSnapshot.summary().get(SnapshotSummary.ADDED_FILES_PROP);
+
+    return new StreamingOffset(
+        latestSnapshot.snapshotId(), positionValue != null ? 
Long.parseLong(positionValue) : 0, scanAllFiles);
+  }
+
+  @Override
+  public InputPartition[] planInputPartitions(Offset start, Offset end) {
+    if (end.equals(StreamingOffset.START_OFFSET)) {
+      return new InputPartition[0];
+    }
+
+    // broadcast the table metadata as input partitions will be sent to executors
+    Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table));
+    String expectedSchemaString = SchemaParser.toJson(expectedSchema);
+
+    Preconditions.checkState(
+        end instanceof StreamingOffset,
+        "The end offset passed to planInputPartitions() is not an instance of StreamingOffset.");
+
+    Preconditions.checkState(
+        start instanceof StreamingOffset,
+        "The start offset passed to planInputPartitions() is not an instance of StreamingOffset.");
+
+    StreamingOffset endOffset = (StreamingOffset) end;
+    StreamingOffset startOffset = (StreamingOffset) start;
+
+    List<FileScanTask> fileScanTasks = calculateFileScanTasks(startOffset, endOffset);
+
+    CloseableIterable<FileScanTask> splitTasks = TableScanUtil.splitFiles(
+        CloseableIterable.withNoopClose(fileScanTasks),
+        splitSize);
+    List<CombinedScanTask> combinedScanTasks = Lists.newArrayList(
+        TableScanUtil.planTasks(splitTasks, splitSize, splitLookback, splitOpenFileCost));
+    InputPartition[] readTasks = new InputPartition[combinedScanTasks.size()];
+
+    for (int i = 0; i < combinedScanTasks.size(); i++) {
+      readTasks[i] = new ReadTask(
+          combinedScanTasks.get(i), tableBroadcast, expectedSchemaString,
+          caseSensitive, localityPreferred);
+    }
+
+    return readTasks;
+  }
+
+  @Override
+  public PartitionReaderFactory createReaderFactory() {
+    return new ReaderFactory(0);
+  }
+
+  @Override
+  public Offset initialOffset() {
+    return initialOffset;
+  }
+
+  @Override
+  public Offset deserializeOffset(String json) {
+    return StreamingOffset.fromJson(json);
+  }
+
+  @Override
+  public void commit(Offset end) {

Review comment:
       I am resolving the comments where I felt we were able to reach reasonable consensus, or that are tracked in the PR's **What next** section, so that **the ones needing more discussion stand out**. Please reopen this thread or start a new comment if you feel more discussion is needed here.

##########
File path: 
spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
##########
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MicroBatches;
+import org.apache.iceberg.MicroBatches.MicroBatch;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.source.SparkBatchScan.ReadTask;
+import org.apache.iceberg.spark.source.SparkBatchScan.ReaderFactory;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.connector.read.InputPartition;
+import org.apache.spark.sql.connector.read.PartitionReaderFactory;
+import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
+import org.apache.spark.sql.connector.read.streaming.Offset;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK;
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE_DEFAULT;
+
+public class SparkMicroBatchStream implements MicroBatchStream {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SparkMicroBatchStream.class);
+
+  private final JavaSparkContext sparkContext;
+  private final Table table;
+  private final boolean caseSensitive;
+  private final Schema expectedSchema;
+  private final Long splitSize;
+  private final Integer splitLookback;
+  private final Long splitOpenFileCost;
+  private final boolean localityPreferred;
+  private final InitialOffsetStore initialOffsetStore;
+  private final StreamingOffset initialOffset;
+
+  SparkMicroBatchStream(JavaSparkContext sparkContext,
+                        Table table, boolean caseSensitive, Schema expectedSchema,
+                        CaseInsensitiveStringMap options, String checkpointLocation) {
+    this.sparkContext = sparkContext;
+    this.table = table;
+    this.caseSensitive = caseSensitive;
+    this.expectedSchema = expectedSchema;
+    this.localityPreferred = Spark3Util.isLocalityEnabled(table.io(), table.location(), options);
+    this.splitSize = Optional.ofNullable(Spark3Util.propertyAsLong(options, SparkReadOptions.SPLIT_SIZE, null))
+        .orElseGet(() -> PropertyUtil.propertyAsLong(table.properties(), SPLIT_SIZE, SPLIT_SIZE_DEFAULT));
+    this.splitLookback = Optional.ofNullable(Spark3Util.propertyAsInt(options, SparkReadOptions.LOOKBACK, null))
+        .orElseGet(() -> PropertyUtil.propertyAsInt(table.properties(), SPLIT_LOOKBACK, SPLIT_LOOKBACK_DEFAULT));
+    this.splitOpenFileCost = Optional.ofNullable(
+        Spark3Util.propertyAsLong(options, SparkReadOptions.FILE_OPEN_COST, null))
+        .orElseGet(() -> PropertyUtil.propertyAsLong(table.properties(), SPLIT_OPEN_FILE_COST,
+            SPLIT_OPEN_FILE_COST_DEFAULT));
+    this.initialOffsetStore = InitialOffsetStore.getInstance(table, checkpointLocation);
+    this.initialOffset = getOrWriteInitialOffset(initialOffsetStore);
+  }
+
+  @Override
+  public Offset latestOffset() {
+    table.refresh();
+    Snapshot latestSnapshot = table.currentSnapshot();
+    if (latestSnapshot == null) {
+      return StreamingOffset.START_OFFSET;
+    }
+
+    // a readStream on an Iceberg table can be started from 2 types of 
snapshots
+    // 1. a valid starting Snapshot:
+    //      when this valid starting Snapshot is the initialOffset - then, 
scanAllFiles must be set to true;
+    //      for all StreamingOffsets following this - scanAllFiles must be set 
to false
+    // 2. START_OFFSET:
+    //      if the stream started on the table from START_OFFSET - it implies 
- that all the subsequent Snapshots added
+    //      will have all files as net New manifests & hence scanAllFiles can 
be false.
+    boolean scanAllFiles = !StreamingOffset.START_OFFSET.equals(initialOffset) 
&&
+        latestSnapshot.snapshotId() == initialOffset.snapshotId();
+
+    String positionValue = scanAllFiles ?
+        latestSnapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP) :
+        latestSnapshot.summary().get(SnapshotSummary.ADDED_FILES_PROP);
+
+    return new StreamingOffset(
+        latestSnapshot.snapshotId(), positionValue != null ? 
Long.parseLong(positionValue) : 0, scanAllFiles);
+  }
+
+  @Override
+  public InputPartition[] planInputPartitions(Offset start, Offset end) {
+    if (end.equals(StreamingOffset.START_OFFSET)) {
+      return new InputPartition[0];
+    }
+
+    // broadcast the table metadata as input partitions will be sent to 
executors
+    Broadcast<Table> tableBroadcast = 
sparkContext.broadcast(SerializableTable.copyOf(table));
+    String expectedSchemaString = SchemaParser.toJson(expectedSchema);
+
+    Preconditions.checkState(
+        end instanceof StreamingOffset,
+        "The end offset passed to planInputPartitions() is not an instance of 
StreamingOffset.");
+
+    Preconditions.checkState(
+        start instanceof StreamingOffset,
+        "The start offset passed to planInputPartitions() is not an instance 
of StreamingOffset.");
+
+    StreamingOffset endOffset = (StreamingOffset) end;
+    StreamingOffset startOffset = (StreamingOffset) start;
+
+    List<FileScanTask> fileScanTasks = calculateFileScanTasks(startOffset, 
endOffset);
+
+    CloseableIterable<FileScanTask> splitTasks = TableScanUtil.splitFiles(
+        CloseableIterable.withNoopClose(fileScanTasks),
+        splitSize);
+    List<CombinedScanTask> combinedScanTasks = Lists.newArrayList(
+        TableScanUtil.planTasks(splitTasks, splitSize, splitLookback, 
splitOpenFileCost));
+    InputPartition[] readTasks = new InputPartition[combinedScanTasks.size()];
+
+    for (int i = 0; i < combinedScanTasks.size(); i++) {
+      readTasks[i] = new ReadTask(
+          combinedScanTasks.get(i), tableBroadcast, expectedSchemaString,
+          caseSensitive, localityPreferred);
+    }
+
+    return readTasks;
+  }
+
+  @Override
+  public PartitionReaderFactory createReaderFactory() {
+    return new ReaderFactory(0);
+  }
+
+  @Override
+  public Offset initialOffset() {
+    return initialOffset;
+  }
+
+  @Override
+  public Offset deserializeOffset(String json) {
+    return StreamingOffset.fromJson(json);
+  }
+
+  @Override
+  public void commit(Offset end) {
+  }
+
+  @Override
+  public void stop() {
+  }
+
+  private static StreamingOffset getOrWriteInitialOffset(InitialOffsetStore initialOffsetStore) {
+    if (initialOffsetStore.isOffsetStoreInitialized()) {
+      return initialOffsetStore.getInitialOffset();
+    }
+
+    return initialOffsetStore.addInitialOffset();
+  }
+
+  private List<FileScanTask> calculateFileScanTasks(StreamingOffset startOffset, StreamingOffset endOffset) {
+    List<FileScanTask> fileScanTasks = new ArrayList<>();
+    MicroBatch latestMicroBatch = null;
+    StreamingOffset batchStartOffset = StreamingOffset.START_OFFSET.equals(startOffset) ?
+        new StreamingOffset(SnapshotUtil.oldestSnapshot(table).snapshotId(), 0, false) :
+        startOffset;
+
+    do {
+      final StreamingOffset currentOffset =
+          latestMicroBatch != null && latestMicroBatch.lastIndexOfSnapshot() ?
+          getNextAvailableSnapshot(latestMicroBatch.snapshotId()) :
+              batchStartOffset;
+
+      latestMicroBatch = MicroBatches.from(table.snapshot(currentOffset.snapshotId()), table.io())
+          .caseSensitive(caseSensitive)
+          .specsById(table.specs())
+          .generate(currentOffset.position(), Long.MAX_VALUE, currentOffset.shouldScanAllFiles());
+
+      fileScanTasks.addAll(latestMicroBatch.tasks());
+    } while (latestMicroBatch.snapshotId() != endOffset.snapshotId());
+
+    return fileScanTasks;
+  }
+
+  private StreamingOffset getNextAvailableSnapshot(long snapshotId) {
+    Snapshot previousSnapshot = table.snapshot(snapshotId);
+    Snapshot pointer = table.currentSnapshot();
+    while (pointer != null && previousSnapshot.snapshotId() != pointer.parentId()) {
+      Preconditions.checkState(pointer.operation().equals(DataOperations.APPEND),
+          "Encountered Snapshot DataOperation other than APPEND.");
+
+      pointer = table.snapshot(pointer.parentId());
+    }
+
+    Preconditions.checkState(pointer != null,
+        "Cannot read data from snapshot which has already expired: %s", snapshotId);
+
+    return new StreamingOffset(pointer.snapshotId(), 0L, false);
+  }
+
+  interface InitialOffsetStore {

Review comment:
       Also, I am not fully certain `getOrWriteInitialOffset` will survive as this class matures to support more options. So, conceptually, I wanted to keep this separation: a tangible `InitialOffsetStore` that handles storing and checking the `initialOffset`. Does this make sense?
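
       For context, the shape of the abstraction as used above looks roughly like this; the method names are the ones already called in the PR, while the exact signatures are only a sketch:

```java
// Sketch of the InitialOffsetStore separation being discussed. In the PR an
// implementation is obtained via a static getInstance(Table, String checkpointLocation)
// factory; the instance then owns reading and writing the initial offset under the
// stream's checkpoint location.
interface InitialOffsetStore {
  boolean isOffsetStoreInitialized();
  StreamingOffset getInitialOffset();
  StreamingOffset addInitialOffset();
}
```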

##########
File path: 
spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MicroBatches;
+import org.apache.iceberg.MicroBatches.MicroBatch;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.source.SparkBatchScan.ReadTask;
+import org.apache.iceberg.spark.source.SparkBatchScan.ReaderFactory;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.connector.read.InputPartition;
+import org.apache.spark.sql.connector.read.PartitionReaderFactory;
+import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
+import org.apache.spark.sql.connector.read.streaming.Offset;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK;
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE_DEFAULT;
+
+public class SparkMicroBatchStream implements MicroBatchStream {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SparkMicroBatchStream.class);
+
+  private final JavaSparkContext sparkContext;
+  private final Table table;
+  private final boolean caseSensitive;
+  private final Schema expectedSchema;
+  private final Long splitSize;
+  private final Integer splitLookback;
+  private final Long splitOpenFileCost;
+  private final boolean localityPreferred;
+  private final StreamingOffset initialOffset;
+
+  SparkMicroBatchStream(JavaSparkContext sparkContext,
+                        Table table, boolean caseSensitive, Schema 
expectedSchema,
+                        CaseInsensitiveStringMap options, String 
checkpointLocation) {
+    this.sparkContext = sparkContext;
+    this.table = table;
+    this.caseSensitive = caseSensitive;
+    this.expectedSchema = expectedSchema;
+    this.localityPreferred = Spark3Util.isLocalityEnabled(table.io(), 
table.location(), options);
+
+    long tableSplitSize = PropertyUtil.propertyAsLong(table.properties(), 
SPLIT_SIZE, SPLIT_SIZE_DEFAULT);
+    this.splitSize = Spark3Util.propertyAsLong(options, 
SparkReadOptions.SPLIT_SIZE, tableSplitSize);
+
+    int tableSplitLookback = PropertyUtil.propertyAsInt(table.properties(), 
SPLIT_LOOKBACK, SPLIT_LOOKBACK_DEFAULT);
+    this.splitLookback = Spark3Util.propertyAsInt(options, 
SparkReadOptions.LOOKBACK, tableSplitLookback);
+
+    long tableSplitOpenFileCost = PropertyUtil.propertyAsLong(
+        table.properties(), SPLIT_OPEN_FILE_COST, 
SPLIT_OPEN_FILE_COST_DEFAULT);
+    this.splitOpenFileCost = Spark3Util.propertyAsLong(options, 
SPLIT_OPEN_FILE_COST, tableSplitOpenFileCost);
+
+    InitialOffsetStore initialOffsetStore = 
InitialOffsetStore.getInstance(table, checkpointLocation);
+    this.initialOffset = getOrWriteInitialOffset(initialOffsetStore);
+  }
+
+  @Override
+  public Offset latestOffset() {
+    table.refresh();
+    Snapshot latestSnapshot = table.currentSnapshot();
+    if (latestSnapshot == null) {
+      return StreamingOffset.START_OFFSET;
+    }
+
+    // a readStream on an Iceberg table can be started from 2 types of 
snapshots
+    // 1. a valid starting Snapshot:
+    //      when this valid starting Snapshot is the initialOffset - then, 
scanAllFiles must be set to true;
+    //      for all StreamingOffsets following this - scanAllFiles must be set 
to false
+    // 2. START_OFFSET:
+    //      if the stream started on the table from START_OFFSET - it implies 
- that all the subsequent Snapshots added
+    //      will have all files as net New manifests & hence scanAllFiles can 
be false.
+    boolean scanAllFiles = !StreamingOffset.START_OFFSET.equals(initialOffset) 
&&
+        latestSnapshot.snapshotId() == initialOffset.snapshotId();
+
+    String positionValue = scanAllFiles ?

Review comment:
        @aokolnychyi - is the concern here that the value may be `NOT PRESENT` in `Snapshot.summary()`, or that the value is present but could be wrong? Or both :)

##########
File path: 
spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MicroBatches;
+import org.apache.iceberg.MicroBatches.MicroBatch;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.source.SparkBatchScan.ReadTask;
+import org.apache.iceberg.spark.source.SparkBatchScan.ReaderFactory;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.connector.read.InputPartition;
+import org.apache.spark.sql.connector.read.PartitionReaderFactory;
+import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
+import org.apache.spark.sql.connector.read.streaming.Offset;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK;
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE_DEFAULT;
+
+public class SparkMicroBatchStream implements MicroBatchStream {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SparkMicroBatchStream.class);
+
+  private final JavaSparkContext sparkContext;
+  private final Table table;
+  private final boolean caseSensitive;
+  private final Schema expectedSchema;
+  private final Long splitSize;
+  private final Integer splitLookback;
+  private final Long splitOpenFileCost;
+  private final boolean localityPreferred;
+  private final StreamingOffset initialOffset;
+
+  SparkMicroBatchStream(JavaSparkContext sparkContext,
+                        Table table, boolean caseSensitive, Schema 
expectedSchema,
+                        CaseInsensitiveStringMap options, String 
checkpointLocation) {
+    this.sparkContext = sparkContext;
+    this.table = table;
+    this.caseSensitive = caseSensitive;
+    this.expectedSchema = expectedSchema;
+    this.localityPreferred = Spark3Util.isLocalityEnabled(table.io(), 
table.location(), options);
+
+    long tableSplitSize = PropertyUtil.propertyAsLong(table.properties(), 
SPLIT_SIZE, SPLIT_SIZE_DEFAULT);
+    this.splitSize = Spark3Util.propertyAsLong(options, 
SparkReadOptions.SPLIT_SIZE, tableSplitSize);
+
+    int tableSplitLookback = PropertyUtil.propertyAsInt(table.properties(), 
SPLIT_LOOKBACK, SPLIT_LOOKBACK_DEFAULT);
+    this.splitLookback = Spark3Util.propertyAsInt(options, 
SparkReadOptions.LOOKBACK, tableSplitLookback);
+
+    long tableSplitOpenFileCost = PropertyUtil.propertyAsLong(
+        table.properties(), SPLIT_OPEN_FILE_COST, 
SPLIT_OPEN_FILE_COST_DEFAULT);
+    this.splitOpenFileCost = Spark3Util.propertyAsLong(options, 
SPLIT_OPEN_FILE_COST, tableSplitOpenFileCost);
+
+    InitialOffsetStore initialOffsetStore = 
InitialOffsetStore.getInstance(table, checkpointLocation);
+    this.initialOffset = getOrWriteInitialOffset(initialOffsetStore);
+  }
+
+  @Override
+  public Offset latestOffset() {

Review comment:
       I am resolving the comments where I felt we were able to reach reasonable consensus, or that are tracked in the PR's **What next** section, so that **the ones needing more discussion stand out**. Please reopen this thread or start a new comment if you feel more discussion is needed here.

##########
File path: 
spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
##########
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MicroBatches;
+import org.apache.iceberg.MicroBatches.MicroBatch;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.source.SparkBatchScan.ReadTask;
+import org.apache.iceberg.spark.source.SparkBatchScan.ReaderFactory;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.connector.read.InputPartition;
+import org.apache.spark.sql.connector.read.PartitionReaderFactory;
+import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
+import org.apache.spark.sql.connector.read.streaming.Offset;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK;
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE_DEFAULT;
+
+public class SparkMicroBatchStream implements MicroBatchStream {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SparkMicroBatchStream.class);
+
+  private final JavaSparkContext sparkContext;
+  private final Table table;
+  private final boolean caseSensitive;
+  private final Schema expectedSchema;
+  private final Long splitSize;
+  private final Integer splitLookback;
+  private final Long splitOpenFileCost;
+  private final boolean localityPreferred;
+  private final InitialOffsetStore initialOffsetStore;
+  private final StreamingOffset initialOffset;
+
+  SparkMicroBatchStream(JavaSparkContext sparkContext,
+                        Table table, boolean caseSensitive, Schema expectedSchema,
+                        CaseInsensitiveStringMap options, String checkpointLocation) {
+    this.sparkContext = sparkContext;
+    this.table = table;
+    this.caseSensitive = caseSensitive;
+    this.expectedSchema = expectedSchema;
+    this.localityPreferred = Spark3Util.isLocalityEnabled(table.io(), table.location(), options);
+    this.splitSize = Optional.ofNullable(Spark3Util.propertyAsLong(options, SparkReadOptions.SPLIT_SIZE, null))
+        .orElseGet(() -> PropertyUtil.propertyAsLong(table.properties(), SPLIT_SIZE, SPLIT_SIZE_DEFAULT));
+    this.splitLookback = Optional.ofNullable(Spark3Util.propertyAsInt(options, SparkReadOptions.LOOKBACK, null))
+        .orElseGet(() -> PropertyUtil.propertyAsInt(table.properties(), SPLIT_LOOKBACK, SPLIT_LOOKBACK_DEFAULT));
+    this.splitOpenFileCost = Optional.ofNullable(
+        Spark3Util.propertyAsLong(options, SparkReadOptions.FILE_OPEN_COST, null))
+        .orElseGet(() -> PropertyUtil.propertyAsLong(table.properties(), SPLIT_OPEN_FILE_COST,
+            SPLIT_OPEN_FILE_COST_DEFAULT));
+    this.initialOffsetStore = InitialOffsetStore.getInstance(table, checkpointLocation);
+    this.initialOffset = getOrWriteInitialOffset(initialOffsetStore);
+  }
+
+  @Override
+  public Offset latestOffset() {
+    table.refresh();
+    Snapshot latestSnapshot = table.currentSnapshot();
+    if (latestSnapshot == null) {
+      return StreamingOffset.START_OFFSET;
+    }
+
+    // a readStream on an Iceberg table can be started from 2 types of snapshots
+    // 1. a valid starting Snapshot:
+    //      when this valid starting Snapshot is the initialOffset - then, scanAllFiles must be set to true;
+    //      for all StreamingOffsets following this - scanAllFiles must be set to false
+    // 2. START_OFFSET:
+    //      if the stream started on the table from START_OFFSET - it implies - that all the subsequent Snapshots added
+    //      will have all files as net New manifests & hence scanAllFiles can be false.
+    boolean scanAllFiles = !StreamingOffset.START_OFFSET.equals(initialOffset) &&
+        latestSnapshot.snapshotId() == initialOffset.snapshotId();
+
+    String positionValue = scanAllFiles ?
+        latestSnapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP) :
+        latestSnapshot.summary().get(SnapshotSummary.ADDED_FILES_PROP);
+
+    return new StreamingOffset(
+        latestSnapshot.snapshotId(), positionValue != null ? Long.parseLong(positionValue) : 0, scanAllFiles);
+  }
+
+  @Override
+  public InputPartition[] planInputPartitions(Offset start, Offset end) {
+    if (end.equals(StreamingOffset.START_OFFSET)) {
+      return new InputPartition[0];
+    }
+
+    // broadcast the table metadata as input partitions will be sent to executors
+    Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table));
+    String expectedSchemaString = SchemaParser.toJson(expectedSchema);
+
+    Preconditions.checkState(
+        end instanceof StreamingOffset,
+        "The end offset passed to planInputPartitions() is not an instance of StreamingOffset.");
+
+    Preconditions.checkState(
+        start instanceof StreamingOffset,
+        "The start offset passed to planInputPartitions() is not an instance of StreamingOffset.");
+
+    StreamingOffset endOffset = (StreamingOffset) end;
+    StreamingOffset startOffset = (StreamingOffset) start;
+
+    List<FileScanTask> fileScanTasks = calculateFileScanTasks(startOffset, endOffset);
+
+    CloseableIterable<FileScanTask> splitTasks = TableScanUtil.splitFiles(
+        CloseableIterable.withNoopClose(fileScanTasks),
+        splitSize);
+    List<CombinedScanTask> combinedScanTasks = Lists.newArrayList(
+        TableScanUtil.planTasks(splitTasks, splitSize, splitLookback, splitOpenFileCost));
+    InputPartition[] readTasks = new InputPartition[combinedScanTasks.size()];
+
+    for (int i = 0; i < combinedScanTasks.size(); i++) {
+      readTasks[i] = new ReadTask(
+          combinedScanTasks.get(i), tableBroadcast, expectedSchemaString,
+          caseSensitive, localityPreferred);
+    }
+
+    return readTasks;
+  }
+
+  @Override
+  public PartitionReaderFactory createReaderFactory() {
+    return new ReaderFactory(0);
+  }
+
+  @Override
+  public Offset initialOffset() {
+    return initialOffset;
+  }
+
+  @Override
+  public Offset deserializeOffset(String json) {
+    return StreamingOffset.fromJson(json);
+  }
+
+  @Override
+  public void commit(Offset end) {
+  }
+
+  @Override
+  public void stop() {
+  }
+
+  private static StreamingOffset getOrWriteInitialOffset(InitialOffsetStore initialOffsetStore) {
+    if (initialOffsetStore.isOffsetStoreInitialized()) {
+      return initialOffsetStore.getInitialOffset();
+    }
+
+    return initialOffsetStore.addInitialOffset();
+  }
+
+  private List<FileScanTask> calculateFileScanTasks(StreamingOffset startOffset, StreamingOffset endOffset) {
+    List<FileScanTask> fileScanTasks = new ArrayList<>();
+    MicroBatch latestMicroBatch = null;
+    StreamingOffset batchStartOffset = StreamingOffset.START_OFFSET.equals(startOffset) ?
+        new StreamingOffset(SnapshotUtil.oldestSnapshot(table).snapshotId(), 0, false) :
+        startOffset;
+
+    do {
+      final StreamingOffset currentOffset =
+          latestMicroBatch != null && latestMicroBatch.lastIndexOfSnapshot() ?
+          getNextAvailableSnapshot(latestMicroBatch.snapshotId()) :
+              batchStartOffset;
+
+      latestMicroBatch = MicroBatches.from(table.snapshot(currentOffset.snapshotId()), table.io())
+          .caseSensitive(caseSensitive)
+          .specsById(table.specs())
+          .generate(currentOffset.position(), Long.MAX_VALUE, currentOffset.shouldScanAllFiles());
+
+      fileScanTasks.addAll(latestMicroBatch.tasks());
+    } while (latestMicroBatch.snapshotId() != endOffset.snapshotId());
+
+    return fileScanTasks;
+  }
+
+  private StreamingOffset getNextAvailableSnapshot(long snapshotId) {
+    Snapshot previousSnapshot = table.snapshot(snapshotId);
+    Snapshot pointer = table.currentSnapshot();
+    while (pointer != null && previousSnapshot.snapshotId() != pointer.parentId()) {
+      Preconditions.checkState(pointer.operation().equals(DataOperations.APPEND),
+          "Encountered Snapshot DataOperation other than APPEND.");
+
+      pointer = table.snapshot(pointer.parentId());
+    }
+
+    Preconditions.checkState(pointer != null,
+        "Cannot read data from snapshot which has already expired: %s", snapshotId);
+
+    return new StreamingOffset(pointer.snapshotId(), 0L, false);
+  }
+
+  interface InitialOffsetStore {

Review comment:
       Also, I am not fully certain that `getOrWriteInitialOffset` will survive the next few iterations as this class matures to support more options. So, conceptually, I wanted to keep this separation: there is a tangible `InitialOffsetStore` that is responsible for storing the `initialOffset` and checking whether it already exists. Does this make sense?
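
   To make the separation concrete, here is a minimal sketch of the shape I have in mind for the store; the method names are just the ones the class body above already calls, and the concrete file-backed implementation in the PR may differ:

   ```java
   // Sketch only: the interface as consumed by the constructor and getOrWriteInitialOffset().
   // A static factory like InitialOffsetStore.getInstance(table, checkpointLocation) is assumed
   // to hand back an implementation that persists the offset under the checkpoint location.
   interface InitialOffsetStore {
     // true once an initial offset has already been persisted for this checkpoint
     boolean isOffsetStoreInitialized();

     // read back the previously persisted initial offset
     StreamingOffset getInitialOffset();

     // derive the initial offset from the table's current state, persist it, and return it
     StreamingOffset addInitialOffset();
   }
   ```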

##########
File path: 
spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
##########
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MicroBatches;
+import org.apache.iceberg.MicroBatches.MicroBatch;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.source.SparkBatchScan.ReadTask;
+import org.apache.iceberg.spark.source.SparkBatchScan.ReaderFactory;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.connector.read.InputPartition;
+import org.apache.spark.sql.connector.read.PartitionReaderFactory;
+import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
+import org.apache.spark.sql.connector.read.streaming.Offset;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK;
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE_DEFAULT;
+
+public class SparkMicroBatchStream implements MicroBatchStream {
+  private static final Logger LOG = LoggerFactory.getLogger(SparkMicroBatchStream.class);

Review comment:
       Hmm, this is probably why `SparkBatchScan` also has literally only one debug log. Let me remove this unused variable for now.

##########
File path: 
spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
##########
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MicroBatches;
+import org.apache.iceberg.MicroBatches.MicroBatch;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.source.SparkBatchScan.ReadTask;
+import org.apache.iceberg.spark.source.SparkBatchScan.ReaderFactory;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.connector.read.InputPartition;
+import org.apache.spark.sql.connector.read.PartitionReaderFactory;
+import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
+import org.apache.spark.sql.connector.read.streaming.Offset;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK;
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE_DEFAULT;
+
+public class SparkMicroBatchStream implements MicroBatchStream {
+  private static final Logger LOG = LoggerFactory.getLogger(SparkMicroBatchStream.class);

Review comment:
       I did a scan of the file and couldn't really find a useful place to log: Spark already logs around these interface methods once they are invoked, which is probably why the other batch source files don't have a single info log either. I removed this unused variable for now. Please feel free to point me at specific places, or reopen this as you see fit.
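
   If a log line does turn out to be useful, the most natural spot I could find would be right after task planning in `planInputPartitions`, along these lines (illustrative only, not in the PR; it assumes the local `readTasks`, `startOffset` and `endOffset` variables of that method):

   ```java
   // hypothetical debug log once the read tasks for the micro-batch have been assembled
   LOG.debug("Planned {} input partition(s) for offsets [{}, {}]", readTasks.length, startOffset, endOffset);
   ```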

##########
File path: 
spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MicroBatches;
+import org.apache.iceberg.MicroBatches.MicroBatch;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.source.SparkBatchScan.ReadTask;
+import org.apache.iceberg.spark.source.SparkBatchScan.ReaderFactory;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.connector.read.InputPartition;
+import org.apache.spark.sql.connector.read.PartitionReaderFactory;
+import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
+import org.apache.spark.sql.connector.read.streaming.Offset;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK;
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE_DEFAULT;
+
+public class SparkMicroBatchStream implements MicroBatchStream {
+  private static final Logger LOG = LoggerFactory.getLogger(SparkMicroBatchStream.class);
+
+  private final JavaSparkContext sparkContext;
+  private final Table table;
+  private final boolean caseSensitive;
+  private final Schema expectedSchema;
+  private final Long splitSize;
+  private final Integer splitLookback;
+  private final Long splitOpenFileCost;
+  private final boolean localityPreferred;
+  private final StreamingOffset initialOffset;
+
+  SparkMicroBatchStream(JavaSparkContext sparkContext,
+                        Table table, boolean caseSensitive, Schema expectedSchema,
+                        CaseInsensitiveStringMap options, String checkpointLocation) {
+    this.sparkContext = sparkContext;
+    this.table = table;
+    this.caseSensitive = caseSensitive;
+    this.expectedSchema = expectedSchema;
+    this.localityPreferred = Spark3Util.isLocalityEnabled(table.io(), table.location(), options);
+
+    long tableSplitSize = PropertyUtil.propertyAsLong(table.properties(), SPLIT_SIZE, SPLIT_SIZE_DEFAULT);
+    this.splitSize = Spark3Util.propertyAsLong(options, SparkReadOptions.SPLIT_SIZE, tableSplitSize);
+
+    int tableSplitLookback = PropertyUtil.propertyAsInt(table.properties(), SPLIT_LOOKBACK, SPLIT_LOOKBACK_DEFAULT);
+    this.splitLookback = Spark3Util.propertyAsInt(options, SparkReadOptions.LOOKBACK, tableSplitLookback);
+
+    long tableSplitOpenFileCost = PropertyUtil.propertyAsLong(
+        table.properties(), SPLIT_OPEN_FILE_COST, SPLIT_OPEN_FILE_COST_DEFAULT);
+    this.splitOpenFileCost = Spark3Util.propertyAsLong(options, SPLIT_OPEN_FILE_COST, tableSplitOpenFileCost);
+
+    InitialOffsetStore initialOffsetStore = InitialOffsetStore.getInstance(table, checkpointLocation);
+    this.initialOffset = getOrWriteInitialOffset(initialOffsetStore);
+  }
+
+  @Override
+  public Offset latestOffset() {
+    table.refresh();
+    Snapshot latestSnapshot = table.currentSnapshot();
+    if (latestSnapshot == null) {
+      return StreamingOffset.START_OFFSET;
+    }
+
+    // a readStream on an Iceberg table can be started from 2 types of snapshots
+    // 1. a valid starting Snapshot:
+    //      when this valid starting Snapshot is the initialOffset - then, scanAllFiles must be set to true;
+    //      for all StreamingOffsets following this - scanAllFiles must be set to false
+    // 2. START_OFFSET:
+    //      if the stream started on the table from START_OFFSET - it implies - that all the subsequent Snapshots added
+    //      will have all files as net New manifests & hence scanAllFiles can be false.
+    boolean scanAllFiles = !StreamingOffset.START_OFFSET.equals(initialOffset) &&
+        latestSnapshot.snapshotId() == initialOffset.snapshotId();
+
+    String positionValue = scanAllFiles ?

Review comment:
       Since @aokolnychyi expressed a concern about the reliability of the snapshot summary props, I switched the code to use the actual manifest stats (which are the actual **source of truth** for this data).
   @RussellSpitzer, I will add this potential optimization to the **What Next** list, which I will translate into an issue once this PR makes it in.
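
   For clarity, a hedged sketch of what "use the actual manifest stats" could look like for the end-position calculation. It relies on the class's `table` field, standard Iceberg calls (`Snapshot.addedFiles()`, `TableScan.useSnapshot()`, `planFiles()`) plus a `java.io.UncheckedIOException` import, and the updated code in the PR may well compute this differently:

   ```java
   // Sketch only: derive the offset position from manifest-backed metadata
   // instead of the snapshot summary map.
   private long endPosition(Snapshot snapshot, boolean scanAllFiles) {
     if (!scanAllFiles) {
       // count only the files appended by this snapshot, read from its manifests
       return Iterables.size(snapshot.addedFiles());
     }

     // scan-all case: count every live data file by planning a scan pinned to the snapshot
     try (CloseableIterable<FileScanTask> tasks =
         table.newScan().useSnapshot(snapshot.snapshotId()).planFiles()) {
       return Iterables.size(tasks);
     } catch (IOException e) {
       throw new UncheckedIOException("Failed to plan files for snapshot " + snapshot.snapshotId(), e);
     }
   }
   ```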

##########
File path: 
spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
##########
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MicroBatches;
+import org.apache.iceberg.MicroBatches.MicroBatch;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.source.SparkBatchScan.ReadTask;
+import org.apache.iceberg.spark.source.SparkBatchScan.ReaderFactory;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.connector.read.InputPartition;
+import org.apache.spark.sql.connector.read.PartitionReaderFactory;
+import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
+import org.apache.spark.sql.connector.read.streaming.Offset;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK;
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE_DEFAULT;
+
+public class SparkMicroBatchStream implements MicroBatchStream {
+  private static final Logger LOG = LoggerFactory.getLogger(SparkMicroBatchStream.class);
+
+  private final JavaSparkContext sparkContext;
+  private final Table table;
+  private final boolean caseSensitive;
+  private final Schema expectedSchema;
+  private final Long splitSize;
+  private final Integer splitLookback;
+  private final Long splitOpenFileCost;
+  private final boolean localityPreferred;
+  private final InitialOffsetStore initialOffsetStore;
+  private final StreamingOffset initialOffset;
+
+  SparkMicroBatchStream(JavaSparkContext sparkContext,
+                        Table table, boolean caseSensitive, Schema expectedSchema,
+                        CaseInsensitiveStringMap options, String checkpointLocation) {
+    this.sparkContext = sparkContext;
+    this.table = table;
+    this.caseSensitive = caseSensitive;
+    this.expectedSchema = expectedSchema;
+    this.localityPreferred = Spark3Util.isLocalityEnabled(table.io(), table.location(), options);
+    this.splitSize = Optional.ofNullable(Spark3Util.propertyAsLong(options, SparkReadOptions.SPLIT_SIZE, null))
+        .orElseGet(() -> PropertyUtil.propertyAsLong(table.properties(), SPLIT_SIZE, SPLIT_SIZE_DEFAULT));
+    this.splitLookback = Optional.ofNullable(Spark3Util.propertyAsInt(options, SparkReadOptions.LOOKBACK, null))
+        .orElseGet(() -> PropertyUtil.propertyAsInt(table.properties(), SPLIT_LOOKBACK, SPLIT_LOOKBACK_DEFAULT));
+    this.splitOpenFileCost = Optional.ofNullable(
+        Spark3Util.propertyAsLong(options, SparkReadOptions.FILE_OPEN_COST, null))
+        .orElseGet(() -> PropertyUtil.propertyAsLong(table.properties(), SPLIT_OPEN_FILE_COST,
+            SPLIT_OPEN_FILE_COST_DEFAULT));
+    this.initialOffsetStore = InitialOffsetStore.getInstance(table, checkpointLocation);
+    this.initialOffset = getOrWriteInitialOffset(initialOffsetStore);
+  }
+
+  @Override
+  public Offset latestOffset() {
+    table.refresh();
+    Snapshot latestSnapshot = table.currentSnapshot();
+    if (latestSnapshot == null) {
+      return StreamingOffset.START_OFFSET;
+    }
+
+    // a readStream on an Iceberg table can be started from 2 types of snapshots
+    // 1. a valid starting Snapshot:
+    //      when this valid starting Snapshot is the initialOffset - then, scanAllFiles must be set to true;
+    //      for all StreamingOffsets following this - scanAllFiles must be set to false
+    // 2. START_OFFSET:
+    //      if the stream started on the table from START_OFFSET - it implies - that all the subsequent Snapshots added
+    //      will have all files as net New manifests & hence scanAllFiles can be false.
+    boolean scanAllFiles = !StreamingOffset.START_OFFSET.equals(initialOffset) &&
+        latestSnapshot.snapshotId() == initialOffset.snapshotId();
+
+    String positionValue = scanAllFiles ?
+        latestSnapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP) :
+        latestSnapshot.summary().get(SnapshotSummary.ADDED_FILES_PROP);
+
+    return new StreamingOffset(
+        latestSnapshot.snapshotId(), positionValue != null ? Long.parseLong(positionValue) : 0, scanAllFiles);
+  }
+
+  @Override
+  public InputPartition[] planInputPartitions(Offset start, Offset end) {
+    if (end.equals(StreamingOffset.START_OFFSET)) {
+      return new InputPartition[0];
+    }
+
+    // broadcast the table metadata as input partitions will be sent to executors
+    Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table));
+    String expectedSchemaString = SchemaParser.toJson(expectedSchema);
+
+    Preconditions.checkState(
+        end instanceof StreamingOffset,
+        "The end offset passed to planInputPartitions() is not an instance of StreamingOffset.");
+
+    Preconditions.checkState(
+        start instanceof StreamingOffset,
+        "The start offset passed to planInputPartitions() is not an instance of StreamingOffset.");
+
+    StreamingOffset endOffset = (StreamingOffset) end;
+    StreamingOffset startOffset = (StreamingOffset) start;
+
+    List<FileScanTask> fileScanTasks = calculateFileScanTasks(startOffset, endOffset);
+
+    CloseableIterable<FileScanTask> splitTasks = TableScanUtil.splitFiles(
+        CloseableIterable.withNoopClose(fileScanTasks),
+        splitSize);
+    List<CombinedScanTask> combinedScanTasks = Lists.newArrayList(
+        TableScanUtil.planTasks(splitTasks, splitSize, splitLookback, splitOpenFileCost));
+    InputPartition[] readTasks = new InputPartition[combinedScanTasks.size()];
+
+    for (int i = 0; i < combinedScanTasks.size(); i++) {
+      readTasks[i] = new ReadTask(
+          combinedScanTasks.get(i), tableBroadcast, expectedSchemaString,
+          caseSensitive, localityPreferred);
+    }
+
+    return readTasks;
+  }
+
+  @Override
+  public PartitionReaderFactory createReaderFactory() {
+    return new ReaderFactory(0);
+  }
+
+  @Override
+  public Offset initialOffset() {
+    return initialOffset;
+  }
+
+  @Override
+  public Offset deserializeOffset(String json) {
+    return StreamingOffset.fromJson(json);
+  }
+
+  @Override
+  public void commit(Offset end) {
+  }
+
+  @Override
+  public void stop() {
+  }
+
+  private static StreamingOffset getOrWriteInitialOffset(InitialOffsetStore initialOffsetStore) {
+    if (initialOffsetStore.isOffsetStoreInitialized()) {
+      return initialOffsetStore.getInitialOffset();
+    }
+
+    return initialOffsetStore.addInitialOffset();
+  }
+
+  private List<FileScanTask> calculateFileScanTasks(StreamingOffset startOffset, StreamingOffset endOffset) {
+    List<FileScanTask> fileScanTasks = new ArrayList<>();
+    MicroBatch latestMicroBatch = null;
+    StreamingOffset batchStartOffset = StreamingOffset.START_OFFSET.equals(startOffset) ?
+        new StreamingOffset(SnapshotUtil.oldestSnapshot(table).snapshotId(), 0, false) :
+        startOffset;
+
+    do {
+      final StreamingOffset currentOffset =
+          latestMicroBatch != null && latestMicroBatch.lastIndexOfSnapshot() ?
+          getNextAvailableSnapshot(latestMicroBatch.snapshotId()) :
+              batchStartOffset;
+
+      latestMicroBatch = MicroBatches.from(table.snapshot(currentOffset.snapshotId()), table.io())
+          .caseSensitive(caseSensitive)
+          .specsById(table.specs())
+          .generate(currentOffset.position(), Long.MAX_VALUE, currentOffset.shouldScanAllFiles());
+
+      fileScanTasks.addAll(latestMicroBatch.tasks());
+    } while (latestMicroBatch.snapshotId() != endOffset.snapshotId());
+
+    return fileScanTasks;
+  }
+
+  private StreamingOffset getNextAvailableSnapshot(long snapshotId) {
+    Snapshot previousSnapshot = table.snapshot(snapshotId);
+    Snapshot pointer = table.currentSnapshot();
+    while (pointer != null && previousSnapshot.snapshotId() != pointer.parentId()) {
+      Preconditions.checkState(pointer.operation().equals(DataOperations.APPEND),
+          "Encountered Snapshot DataOperation other than APPEND.");
+
+      pointer = table.snapshot(pointer.parentId());
+    }
+
+    Preconditions.checkState(pointer != null,
+        "Cannot read data from snapshot which has already expired: %s", snapshotId);
+
+    return new StreamingOffset(pointer.snapshotId(), 0L, false);
+  }
+
+  interface InitialOffsetStore {

Review comment:
       Resolving this, since it is a refactoring / code-readability question and I have given a reasonably detailed explanation above.
   Please definitely reopen, or ping me on Slack, if you feel strongly that we should not do this.






