This is an automated email from the ASF dual-hosted git repository.

bryanck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new b6de7acdb2 Spark 4.1: Refactor SparkMicroBatchStream to SyncPlanner (#15298)
b6de7acdb2 is described below

commit b6de7acdb23704ef5165fb674d6814060f60e754
Author: Ruijing Li <[email protected]>
AuthorDate: Sat Feb 14 18:48:51 2026 -0800

    Spark 4.1: Refactor SparkMicroBatchStream to SyncPlanner (#15298)
---
 .../spark/source/BaseSparkMicroBatchPlanner.java   | 137 ++++++++
 .../iceberg/spark/source/MicroBatchUtils.java      |  67 ++++
 .../spark/source/SparkMicroBatchPlanner.java       |  47 +++
 .../spark/source/SparkMicroBatchStream.java        | 347 ++-------------------
 .../spark/source/SyncSparkMicroBatchPlanner.java   | 240 ++++++++++++++
 .../spark/source/TestMicroBatchPlanningUtils.java  | 100 ++++++
 6 files changed, 619 insertions(+), 319 deletions(-)

diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkMicroBatchPlanner.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkMicroBatchPlanner.java
new file mode 100644
index 0000000000..70c0129484
--- /dev/null
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkMicroBatchPlanner.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Locale;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.SparkReadConf;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.spark.sql.connector.read.streaming.CompositeReadLimit;
+import org.apache.spark.sql.connector.read.streaming.ReadLimit;
+import org.apache.spark.sql.connector.read.streaming.ReadMaxFiles;
+import org.apache.spark.sql.connector.read.streaming.ReadMaxRows;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+abstract class BaseSparkMicroBatchPlanner implements SparkMicroBatchPlanner {
+  private static final Logger LOG = 
LoggerFactory.getLogger(BaseSparkMicroBatchPlanner.class);
+  private final Table table;
+  private final SparkReadConf readConf;
+
+  BaseSparkMicroBatchPlanner(Table table, SparkReadConf readConf) {
+    this.table = table;
+    this.readConf = readConf;
+  }
+
+  protected Table table() {
+    return table;
+  }
+
+  protected SparkReadConf readConf() {
+    return readConf;
+  }
+
+  protected boolean shouldProcess(Snapshot snapshot) {
+    String op = snapshot.operation();
+    switch (op) {
+      case DataOperations.APPEND:
+        return true;
+      case DataOperations.REPLACE:
+        return false;
+      case DataOperations.DELETE:
+        Preconditions.checkState(
+            readConf.streamingSkipDeleteSnapshots(),
+            "Cannot process delete snapshot: %s, to ignore deletes, set 
%s=true",
+            snapshot.snapshotId(),
+            SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS);
+        return false;
+      case DataOperations.OVERWRITE:
+        Preconditions.checkState(
+            readConf.streamingSkipOverwriteSnapshots(),
+            "Cannot process overwrite snapshot: %s, to ignore overwrites, set 
%s=true",
+            snapshot.snapshotId(),
+            SparkReadOptions.STREAMING_SKIP_OVERWRITE_SNAPSHOTS);
+        return false;
+      default:
+        throw new IllegalStateException(
+            String.format(
+                "Cannot process unknown snapshot operation: %s (snapshot id 
%s)",
+                op.toLowerCase(Locale.ROOT), snapshot.snapshotId()));
+    }
+  }
+
+  /**
+   * Get the next snapshot to process, skipping over replace, delete and overwrite snapshots.
+   *
+   * @param curSnapshot the current snapshot
+   * @return the next snapshot that should be processed, or null if all remaining snapshots
+   *     should be skipped
+   */
+  protected Snapshot nextValidSnapshot(Snapshot curSnapshot) {
+    Snapshot nextSnapshot = SnapshotUtil.snapshotAfter(table, 
curSnapshot.snapshotId());
+
+    // skip over rewrite and delete snapshots
+    while (!shouldProcess(nextSnapshot)) {
+      LOG.debug("Skipping snapshot: {}", nextSnapshot);
+      // if the skipped snapshot is the table's current snapshot, return null; this
+      // avoids snapshotAfter throwing an exception since there are no more snapshots to process
+      if (nextSnapshot.snapshotId() == table.currentSnapshot().snapshotId()) {
+        return null;
+      }
+      nextSnapshot = SnapshotUtil.snapshotAfter(table, 
nextSnapshot.snapshotId());
+    }
+    return nextSnapshot;
+  }
+
+  static class UnpackedLimits {
+    private long maxRows = Integer.MAX_VALUE;
+    private long maxFiles = Integer.MAX_VALUE;
+
+    UnpackedLimits(ReadLimit limit) {
+      if (limit instanceof CompositeReadLimit) {
+        ReadLimit[] compositeLimits = ((CompositeReadLimit) 
limit).getReadLimits();
+        for (ReadLimit individualLimit : compositeLimits) {
+          if (individualLimit instanceof ReadMaxRows) {
+            ReadMaxRows readMaxRows = (ReadMaxRows) individualLimit;
+            this.maxRows = Math.min(this.maxRows, readMaxRows.maxRows());
+          } else if (individualLimit instanceof ReadMaxFiles) {
+            ReadMaxFiles readMaxFiles = (ReadMaxFiles) individualLimit;
+            this.maxFiles = Math.min(this.maxFiles, readMaxFiles.maxFiles());
+          }
+        }
+      } else if (limit instanceof ReadMaxRows) {
+        this.maxRows = ((ReadMaxRows) limit).maxRows();
+      } else if (limit instanceof ReadMaxFiles) {
+        this.maxFiles = ((ReadMaxFiles) limit).maxFiles();
+      }
+    }
+
+    public long getMaxRows() {
+      return maxRows;
+    }
+
+    public long getMaxFiles() {
+      return maxFiles;
+    }
+  }
+}
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/MicroBatchUtils.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/MicroBatchUtils.java
new file mode 100644
index 0000000000..360e0e8fb6
--- /dev/null
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/MicroBatchUtils.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+
+class MicroBatchUtils {
+
+  private MicroBatchUtils() {}
+
+  static StreamingOffset determineStartingOffset(Table table, long 
fromTimestamp) {
+    if (table.currentSnapshot() == null) {
+      return StreamingOffset.START_OFFSET;
+    }
+
+    if (fromTimestamp == Long.MIN_VALUE) {
+      // fromTimestamp defaults to Long.MIN_VALUE, so start from the oldest snapshot directly;
+      // this avoids looping to find the first snapshot
+      return new 
StreamingOffset(SnapshotUtil.oldestAncestor(table).snapshotId(), 0, false);
+    }
+
+    if (table.currentSnapshot().timestampMillis() < fromTimestamp) {
+      return StreamingOffset.START_OFFSET;
+    }
+
+    try {
+      Snapshot snapshot = SnapshotUtil.oldestAncestorAfter(table, 
fromTimestamp);
+      if (snapshot != null) {
+        return new StreamingOffset(snapshot.snapshotId(), 0, false);
+      } else {
+        return StreamingOffset.START_OFFSET;
+      }
+    } catch (IllegalStateException e) {
+      // could not determine the first snapshot after the timestamp. use the 
oldest ancestor instead
+      return new 
StreamingOffset(SnapshotUtil.oldestAncestor(table).snapshotId(), 0, false);
+    }
+  }
+
+  static long addedFilesCount(Table table, Snapshot snapshot) {
+    long addedFilesCount =
+        PropertyUtil.propertyAsLong(snapshot.summary(), 
SnapshotSummary.ADDED_FILES_PROP, -1);
+    return addedFilesCount == -1
+        ? Iterables.size(snapshot.addedDataFiles(table.io()))
+        : addedFilesCount;
+  }
+}
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchPlanner.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchPlanner.java
new file mode 100644
index 0000000000..1986ddac5d
--- /dev/null
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchPlanner.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.List;
+import org.apache.iceberg.FileScanTask;
+import org.apache.spark.sql.connector.read.streaming.ReadLimit;
+
+interface SparkMicroBatchPlanner {
+  /**
+   * Return the {@link FileScanTask}s for data added between the start and end 
offsets.
+   *
+   * @param startOffset the offset to start planning from
+   * @param endOffset the offset to plan up to
+   * @return file scan tasks for data in the offset range
+   */
+  List<FileScanTask> planFiles(StreamingOffset startOffset, StreamingOffset 
endOffset);
+
+  /**
+   * Return the latest offset the stream can advance to from {@code 
startOffset}, respecting the
+   * given {@link ReadLimit}.
+   *
+   * @param startOffset the current offset of the stream
+   * @param limit the read limit bounding how far ahead to advance
+   * @return the latest available offset, or {@code null} if no new data is 
available
+   */
+  StreamingOffset latestOffset(StreamingOffset startOffset, ReadLimit limit);
+
+  /** Stop the planner and release any resources. */
+  void stop();
+}
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
index 4c1b4b65a8..036a205c97 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
@@ -26,44 +26,29 @@ import java.io.OutputStreamWriter;
 import java.io.UncheckedIOException;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
-import java.util.Locale;
 import org.apache.iceberg.CombinedScanTask;
-import org.apache.iceberg.DataOperations;
 import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.ManifestFile;
-import org.apache.iceberg.MicroBatches;
-import org.apache.iceberg.MicroBatches.MicroBatch;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SchemaParser;
 import org.apache.iceberg.Snapshot;
-import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.relocated.com.google.common.base.Joiner;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.SparkReadConf;
-import org.apache.iceberg.spark.SparkReadOptions;
 import org.apache.iceberg.types.Types;
-import org.apache.iceberg.util.Pair;
-import org.apache.iceberg.util.PropertyUtil;
-import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.iceberg.util.TableScanUtil;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.sql.connector.read.InputPartition;
 import org.apache.spark.sql.connector.read.PartitionReaderFactory;
-import org.apache.spark.sql.connector.read.streaming.CompositeReadLimit;
 import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
 import org.apache.spark.sql.connector.read.streaming.Offset;
 import org.apache.spark.sql.connector.read.streaming.ReadLimit;
-import org.apache.spark.sql.connector.read.streaming.ReadMaxFiles;
-import org.apache.spark.sql.connector.read.streaming.ReadMaxRows;
 import 
org.apache.spark.sql.connector.read.streaming.SupportsTriggerAvailableNow;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -74,6 +59,7 @@ public class SparkMicroBatchStream implements 
MicroBatchStream, SupportsTriggerA
   private static final Types.StructType EMPTY_GROUPING_KEY_TYPE = 
Types.StructType.of();
 
   private final Table table;
+  private final SparkReadConf readConf;
   private final boolean caseSensitive;
   private final String expectedSchema;
   private final Broadcast<Table> tableBroadcast;
@@ -82,12 +68,11 @@ public class SparkMicroBatchStream implements 
MicroBatchStream, SupportsTriggerA
   private final long splitOpenFileCost;
   private final boolean localityPreferred;
   private final StreamingOffset initialOffset;
-  private final boolean skipDelete;
-  private final boolean skipOverwrite;
   private final long fromTimestamp;
   private final int maxFilesPerMicroBatch;
   private final int maxRecordsPerMicroBatch;
   private final boolean cacheDeleteFilesOnExecutors;
+  private SparkMicroBatchPlanner planner;
   private StreamingOffset lastOffsetForTriggerAvailableNow;
 
   SparkMicroBatchStream(
@@ -97,6 +82,7 @@ public class SparkMicroBatchStream implements 
MicroBatchStream, SupportsTriggerA
       Schema expectedSchema,
       String checkpointLocation) {
     this.table = table;
+    this.readConf = readConf;
     this.caseSensitive = readConf.caseSensitive();
     this.expectedSchema = SchemaParser.toJson(expectedSchema);
     this.localityPreferred = readConf.localityEnabled();
@@ -112,9 +98,6 @@ public class SparkMicroBatchStream implements 
MicroBatchStream, SupportsTriggerA
     InitialOffsetStore initialOffsetStore =
         new InitialOffsetStore(table, checkpointLocation, fromTimestamp);
     this.initialOffset = initialOffsetStore.initialOffset();
-
-    this.skipDelete = readConf.streamingSkipDeleteSnapshots();
-    this.skipOverwrite = readConf.streamingSkipOverwriteSnapshots();
   }
 
   @Override
@@ -130,7 +113,8 @@ public class SparkMicroBatchStream implements 
MicroBatchStream, SupportsTriggerA
 
     Snapshot latestSnapshot = table.currentSnapshot();
 
-    return new StreamingOffset(latestSnapshot.snapshotId(), 
addedFilesCount(latestSnapshot), false);
+    return new StreamingOffset(
+        latestSnapshot.snapshotId(), MicroBatchUtils.addedFilesCount(table, 
latestSnapshot), false);
   }
 
   @Override
@@ -149,7 +133,12 @@ public class SparkMicroBatchStream implements 
MicroBatchStream, SupportsTriggerA
     StreamingOffset endOffset = (StreamingOffset) end;
     StreamingOffset startOffset = (StreamingOffset) start;
 
-    List<FileScanTask> fileScanTasks = planFiles(startOffset, endOffset);
+    // Initialize planner if not already done (for resume scenarios)
+    if (planner == null) {
+      initializePlanner(startOffset, endOffset);
+    }
+
+    List<FileScanTask> fileScanTasks = planner.planFiles(startOffset, 
endOffset);
 
     CloseableIterable<FileScanTask> splitTasks =
         
TableScanUtil.splitFiles(CloseableIterable.withNoopClose(fileScanTasks), 
splitSize);
@@ -198,164 +187,18 @@ public class SparkMicroBatchStream implements 
MicroBatchStream, SupportsTriggerA
   public void commit(Offset end) {}
 
   @Override
-  public void stop() {}
-
-  private List<FileScanTask> planFiles(StreamingOffset startOffset, 
StreamingOffset endOffset) {
-    List<FileScanTask> fileScanTasks = Lists.newArrayList();
-    StreamingOffset batchStartOffset =
-        StreamingOffset.START_OFFSET.equals(startOffset)
-            ? determineStartingOffset(table, fromTimestamp)
-            : startOffset;
-
-    StreamingOffset currentOffset = null;
-
-    // [(startOffset : startFileIndex), (endOffset : endFileIndex) )
-    do {
-      long endFileIndex;
-      if (currentOffset == null) {
-        currentOffset = batchStartOffset;
-      } else {
-        Snapshot snapshotAfter = SnapshotUtil.snapshotAfter(table, 
currentOffset.snapshotId());
-        // it may happen that we need to read this snapshot partially in case 
it's equal to
-        // endOffset.
-        if (currentOffset.snapshotId() != endOffset.snapshotId()) {
-          currentOffset = new StreamingOffset(snapshotAfter.snapshotId(), 0L, 
false);
-        } else {
-          currentOffset = endOffset;
-        }
-      }
-
-      Snapshot snapshot = table.snapshot(currentOffset.snapshotId());
-
-      validateCurrentSnapshotExists(snapshot, currentOffset);
-
-      if (!shouldProcess(snapshot)) {
-        LOG.debug("Skipping snapshot: {} of table {}", 
currentOffset.snapshotId(), table.name());
-        continue;
-      }
-
-      Snapshot currentSnapshot = table.snapshot(currentOffset.snapshotId());
-      if (currentOffset.snapshotId() == endOffset.snapshotId()) {
-        endFileIndex = endOffset.position();
-      } else {
-        endFileIndex = addedFilesCount(currentSnapshot);
-      }
-
-      MicroBatch latestMicroBatch =
-          MicroBatches.from(currentSnapshot, table.io())
-              .caseSensitive(caseSensitive)
-              .specsById(table.specs())
-              .generate(
-                  currentOffset.position(),
-                  endFileIndex,
-                  Long.MAX_VALUE,
-                  currentOffset.shouldScanAllFiles());
-
-      fileScanTasks.addAll(latestMicroBatch.tasks());
-    } while (currentOffset.snapshotId() != endOffset.snapshotId());
-
-    return fileScanTasks;
-  }
-
-  private boolean shouldProcess(Snapshot snapshot) {
-    String op = snapshot.operation();
-    switch (op) {
-      case DataOperations.APPEND:
-        return true;
-      case DataOperations.REPLACE:
-        return false;
-      case DataOperations.DELETE:
-        Preconditions.checkState(
-            skipDelete,
-            "Cannot process delete snapshot: %s, to ignore deletes, set 
%s=true",
-            snapshot.snapshotId(),
-            SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS);
-        return false;
-      case DataOperations.OVERWRITE:
-        Preconditions.checkState(
-            skipOverwrite,
-            "Cannot process overwrite snapshot: %s, to ignore overwrites, set 
%s=true",
-            snapshot.snapshotId(),
-            SparkReadOptions.STREAMING_SKIP_OVERWRITE_SNAPSHOTS);
-        return false;
-      default:
-        throw new IllegalStateException(
-            String.format(
-                "Cannot process unknown snapshot operation: %s (snapshot id 
%s)",
-                op.toLowerCase(Locale.ROOT), snapshot.snapshotId()));
+  public void stop() {
+    if (planner != null) {
+      planner.stop();
     }
   }
 
-  private static StreamingOffset determineStartingOffset(Table table, Long 
fromTimestamp) {
-    if (table.currentSnapshot() == null) {
-      return StreamingOffset.START_OFFSET;
-    }
-
-    if (fromTimestamp == null) {
-      // match existing behavior and start from the oldest snapshot
-      return new 
StreamingOffset(SnapshotUtil.oldestAncestor(table).snapshotId(), 0, false);
-    }
-
-    if (table.currentSnapshot().timestampMillis() < fromTimestamp) {
-      return StreamingOffset.START_OFFSET;
-    }
-
-    try {
-      Snapshot snapshot = SnapshotUtil.oldestAncestorAfter(table, 
fromTimestamp);
-      if (snapshot != null) {
-        return new StreamingOffset(snapshot.snapshotId(), 0, false);
-      } else {
-        return StreamingOffset.START_OFFSET;
-      }
-    } catch (IllegalStateException e) {
-      // could not determine the first snapshot after the timestamp. use the 
oldest ancestor instead
-      return new 
StreamingOffset(SnapshotUtil.oldestAncestor(table).snapshotId(), 0, false);
-    }
-  }
-
-  private static int getMaxFiles(ReadLimit readLimit) {
-    if (readLimit instanceof ReadMaxFiles) {
-      return ((ReadMaxFiles) readLimit).maxFiles();
-    }
-
-    if (readLimit instanceof CompositeReadLimit) {
-      // We do not expect a CompositeReadLimit to contain a nested 
CompositeReadLimit.
-      // In fact, it should only be a composite of two or more of ReadMinRows, 
ReadMaxRows and
-      // ReadMaxFiles, with no more than one of each.
-      ReadLimit[] limits = ((CompositeReadLimit) readLimit).getReadLimits();
-      for (ReadLimit limit : limits) {
-        if (limit instanceof ReadMaxFiles) {
-          return ((ReadMaxFiles) limit).maxFiles();
-        }
-      }
-    }
-
-    // there is no ReadMaxFiles, so return the default
-    return Integer.MAX_VALUE;
-  }
-
-  private static int getMaxRows(ReadLimit readLimit) {
-    if (readLimit instanceof ReadMaxRows) {
-      long maxRows = ((ReadMaxRows) readLimit).maxRows();
-      return Math.toIntExact(maxRows);
-    }
-
-    if (readLimit instanceof CompositeReadLimit) {
-      ReadLimit[] limits = ((CompositeReadLimit) readLimit).getReadLimits();
-      for (ReadLimit limit : limits) {
-        if (limit instanceof ReadMaxRows) {
-          long maxRows = ((ReadMaxRows) limit).maxRows();
-          return Math.toIntExact(maxRows);
-        }
-      }
-    }
-
-    // there is no ReadMaxRows, so return the default
-    return Integer.MAX_VALUE;
+  private void initializePlanner(StreamingOffset startOffset, StreamingOffset 
endOffset) {
+    this.planner =
+        new SyncSparkMicroBatchPlanner(table, readConf, 
lastOffsetForTriggerAvailableNow);
   }
 
   @Override
-  @SuppressWarnings("checkstyle:CyclomaticComplexity")
   public Offset latestOffset(Offset startOffset, ReadLimit limit) {
     // calculate end offset get snapshotId from the startOffset
     Preconditions.checkArgument(
@@ -363,152 +206,12 @@ public class SparkMicroBatchStream implements 
MicroBatchStream, SupportsTriggerA
         "Invalid start offset: %s is not a StreamingOffset",
         startOffset);
 
-    table.refresh();
-    if (table.currentSnapshot() == null) {
-      return StreamingOffset.START_OFFSET;
-    }
-
-    if (table.currentSnapshot().timestampMillis() < fromTimestamp) {
-      return StreamingOffset.START_OFFSET;
-    }
-
-    // end offset can expand to multiple snapshots
-    StreamingOffset startingOffset = (StreamingOffset) startOffset;
-
-    if (startOffset.equals(StreamingOffset.START_OFFSET)) {
-      startingOffset = determineStartingOffset(table, fromTimestamp);
-    }
-
-    Snapshot curSnapshot = table.snapshot(startingOffset.snapshotId());
-    validateCurrentSnapshotExists(curSnapshot, startingOffset);
-
-    // Use the pre-computed snapshotId when Trigger.AvailableNow is enabled.
-    long latestSnapshotId =
-        lastOffsetForTriggerAvailableNow != null
-            ? lastOffsetForTriggerAvailableNow.snapshotId()
-            : table.currentSnapshot().snapshotId();
-
-    int startPosOfSnapOffset = (int) startingOffset.position();
-
-    boolean scanAllFiles = startingOffset.shouldScanAllFiles();
-
-    boolean shouldContinueReading = true;
-    int curFilesAdded = 0;
-    long curRecordCount = 0;
-    int curPos = 0;
-
-    // Note : we produce nextOffset with pos as non-inclusive
-    while (shouldContinueReading) {
-      // generate manifest index for the curSnapshot
-      List<Pair<ManifestFile, Integer>> indexedManifests =
-          MicroBatches.skippedManifestIndexesFromSnapshot(
-              table.io(), curSnapshot, startPosOfSnapOffset, scanAllFiles);
-      // this is under assumption we will be able to add at-least 1 file in 
the new offset
-      for (int idx = 0; idx < indexedManifests.size() && 
shouldContinueReading; idx++) {
-        // be rest assured curPos >= startFileIndex
-        curPos = indexedManifests.get(idx).second();
-        try (CloseableIterable<FileScanTask> taskIterable =
-                MicroBatches.openManifestFile(
-                    table.io(),
-                    table.specs(),
-                    caseSensitive,
-                    curSnapshot,
-                    indexedManifests.get(idx).first(),
-                    scanAllFiles);
-            CloseableIterator<FileScanTask> taskIter = 
taskIterable.iterator()) {
-          while (taskIter.hasNext()) {
-            FileScanTask task = taskIter.next();
-            if (curPos >= startPosOfSnapOffset) {
-              if ((curFilesAdded + 1) > getMaxFiles(limit)) {
-                // On including the file it might happen that we might exceed, 
the configured
-                // soft limit on the number of records, since this is a soft 
limit its acceptable.
-                shouldContinueReading = false;
-                break;
-              }
-
-              curFilesAdded += 1;
-              curRecordCount += task.file().recordCount();
-
-              if (curRecordCount >= getMaxRows(limit)) {
-                // we included the file, so increment the number of files
-                // read in the current snapshot.
-                ++curPos;
-                shouldContinueReading = false;
-                break;
-              }
-            }
-            ++curPos;
-          }
-        } catch (IOException ioe) {
-          LOG.warn("Failed to close task iterable", ioe);
-        }
-      }
-      // if the currentSnapShot was also the latestSnapshot then break
-      if (curSnapshot.snapshotId() == latestSnapshotId) {
-        break;
-      }
-
-      // if everything was OK and we consumed complete snapshot then move to 
next snapshot
-      if (shouldContinueReading) {
-        Snapshot nextValid = nextValidSnapshot(curSnapshot);
-        if (nextValid == null) {
-          // nextValid implies all the remaining snapshots should be skipped.
-          break;
-        }
-        // we found the next available snapshot, continue from there.
-        curSnapshot = nextValid;
-        startPosOfSnapOffset = -1;
-        // if anyhow we are moving to next snapshot we should only scan 
addedFiles
-        scanAllFiles = false;
-      }
+    // Initialize planner if not already done
+    if (planner == null) {
+      initializePlanner((StreamingOffset) startOffset, null);
     }
 
-    StreamingOffset latestStreamingOffset =
-        new StreamingOffset(curSnapshot.snapshotId(), curPos, scanAllFiles);
-
-    // if no new data arrived, then return null.
-    return latestStreamingOffset.equals(startingOffset) ? null : 
latestStreamingOffset;
-  }
-
-  /**
-   * Get the next snapshot skiping over rewrite and delete snapshots.
-   *
-   * @param curSnapshot the current snapshot
-   * @return the next valid snapshot (not a rewrite or delete snapshot), 
returns null if all
-   *     remaining snapshots should be skipped.
-   */
-  private Snapshot nextValidSnapshot(Snapshot curSnapshot) {
-    Snapshot nextSnapshot = SnapshotUtil.snapshotAfter(table, 
curSnapshot.snapshotId());
-    // skip over rewrite and delete snapshots
-    while (!shouldProcess(nextSnapshot)) {
-      LOG.debug("Skipping snapshot: {} of table {}", 
nextSnapshot.snapshotId(), table.name());
-      // if the currentSnapShot was also the mostRecentSnapshot then break
-      if (nextSnapshot.snapshotId() == table.currentSnapshot().snapshotId()) {
-        return null;
-      }
-      nextSnapshot = SnapshotUtil.snapshotAfter(table, 
nextSnapshot.snapshotId());
-    }
-    return nextSnapshot;
-  }
-
-  private long addedFilesCount(Snapshot snapshot) {
-    long addedFilesCount =
-        PropertyUtil.propertyAsLong(snapshot.summary(), 
SnapshotSummary.ADDED_FILES_PROP, -1);
-    // If snapshotSummary doesn't have SnapshotSummary.ADDED_FILES_PROP,
-    // iterate through addedFiles iterator to find addedFilesCount.
-    return addedFilesCount == -1
-        ? Iterables.size(snapshot.addedDataFiles(table.io()))
-        : addedFilesCount;
-  }
-
-  private void validateCurrentSnapshotExists(Snapshot snapshot, 
StreamingOffset currentOffset) {
-    if (snapshot == null) {
-      throw new IllegalStateException(
-          String.format(
-              Locale.ROOT,
-              "Cannot load current offset at snapshot %d, the snapshot was 
expired or removed",
-              currentOffset.snapshotId()));
-    }
+    return planner.latestOffset((StreamingOffset) startOffset, limit);
   }
 
   @Override
@@ -536,6 +239,12 @@ public class SparkMicroBatchStream implements 
MicroBatchStream, SupportsTriggerA
         (StreamingOffset) latestOffset(initialOffset, 
ReadLimit.allAvailable());
 
     LOG.info("lastOffset for Trigger.AvailableNow is {}", 
lastOffsetForTriggerAvailableNow.json());
+
+    // Reset planner so it gets recreated with the cap on next call
+    if (planner != null) {
+      planner.stop();
+      planner = null;
+    }
   }
 
   private static class InitialOffsetStore {
@@ -558,7 +267,7 @@ public class SparkMicroBatchStream implements 
MicroBatchStream, SupportsTriggerA
       }
 
       table.refresh();
-      StreamingOffset offset = determineStartingOffset(table, fromTimestamp);
+      StreamingOffset offset = MicroBatchUtils.determineStartingOffset(table, 
fromTimestamp);
 
       OutputFile outputFile = io.newOutputFile(initialOffsetLocation);
       writeOffset(offset, outputFile);
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SyncSparkMicroBatchPlanner.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SyncSparkMicroBatchPlanner.java
new file mode 100644
index 0000000000..fe662e2b83
--- /dev/null
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SyncSparkMicroBatchPlanner.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Locale;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.MicroBatches;
+import org.apache.iceberg.MicroBatches.MicroBatch;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkReadConf;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.spark.sql.connector.read.streaming.ReadLimit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class SyncSparkMicroBatchPlanner extends BaseSparkMicroBatchPlanner {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SyncSparkMicroBatchPlanner.class);
+
+  private final boolean caseSensitive;
+  private final long fromTimestamp;
+  private final StreamingOffset lastOffsetForTriggerAvailableNow;
+
+  SyncSparkMicroBatchPlanner(
+      Table table, SparkReadConf readConf, StreamingOffset 
lastOffsetForTriggerAvailableNow) {
+    super(table, readConf);
+    this.caseSensitive = readConf().caseSensitive();
+    this.fromTimestamp = readConf().streamFromTimestamp();
+    this.lastOffsetForTriggerAvailableNow = lastOffsetForTriggerAvailableNow;
+  }
+
+  @Override
+  public List<FileScanTask> planFiles(StreamingOffset startOffset, 
StreamingOffset endOffset) {
+    List<FileScanTask> fileScanTasks = Lists.newArrayList();
+    StreamingOffset batchStartOffset =
+        StreamingOffset.START_OFFSET.equals(startOffset)
+            ? MicroBatchUtils.determineStartingOffset(table(), fromTimestamp)
+            : startOffset;
+
+    StreamingOffset currentOffset = null;
+
+    // [(startOffset : startFileIndex), (endOffset : endFileIndex) )
+    do {
+      long endFileIndex;
+      if (currentOffset == null) {
+        currentOffset = batchStartOffset;
+      } else {
+        Snapshot snapshotAfter = SnapshotUtil.snapshotAfter(table(), 
currentOffset.snapshotId());
+        // it may happen that we need to read this snapshot partially in case 
it's equal to
+        // endOffset.
+        if (currentOffset.snapshotId() != endOffset.snapshotId()) {
+          currentOffset = new StreamingOffset(snapshotAfter.snapshotId(), 0L, 
false);
+        } else {
+          currentOffset = endOffset;
+        }
+      }
+
+      Snapshot snapshot = table().snapshot(currentOffset.snapshotId());
+
+      validateCurrentSnapshotExists(snapshot, currentOffset);
+
+      if (!shouldProcess(snapshot)) {
+        LOG.debug("Skipping snapshot: {} of table {}", 
currentOffset.snapshotId(), table().name());
+        continue;
+      }
+
+      Snapshot currentSnapshot = table().snapshot(currentOffset.snapshotId());
+      if (currentOffset.snapshotId() == endOffset.snapshotId()) {
+        endFileIndex = endOffset.position();
+      } else {
+        endFileIndex = MicroBatchUtils.addedFilesCount(table(), 
currentSnapshot);
+      }
+
+      MicroBatch latestMicroBatch =
+          MicroBatches.from(currentSnapshot, table().io())
+              .caseSensitive(caseSensitive)
+              .specsById(table().specs())
+              .generate(
+                  currentOffset.position(),
+                  endFileIndex,
+                  Long.MAX_VALUE,
+                  currentOffset.shouldScanAllFiles());
+
+      fileScanTasks.addAll(latestMicroBatch.tasks());
+    } while (currentOffset.snapshotId() != endOffset.snapshotId());
+
+    return fileScanTasks;
+  }
+
+  @Override
+  @SuppressWarnings("checkstyle:CyclomaticComplexity")
+  public StreamingOffset latestOffset(StreamingOffset startOffset, ReadLimit 
limit) {
+    table().refresh();
+    if (table().currentSnapshot() == null) {
+      return StreamingOffset.START_OFFSET;
+    }
+
+    if (table().currentSnapshot().timestampMillis() < fromTimestamp) {
+      return StreamingOffset.START_OFFSET;
+    }
+
+    // end offset can expand to multiple snapshots
+    StreamingOffset startingOffset = startOffset;
+
+    if (startOffset.equals(StreamingOffset.START_OFFSET)) {
+      startingOffset = MicroBatchUtils.determineStartingOffset(table(), 
fromTimestamp);
+    }
+
+    Snapshot curSnapshot = table().snapshot(startingOffset.snapshotId());
+    validateCurrentSnapshotExists(curSnapshot, startingOffset);
+
+    // Use the pre-computed snapshotId when Trigger.AvailableNow is enabled.
+    long latestSnapshotId =
+        lastOffsetForTriggerAvailableNow != null
+            ? lastOffsetForTriggerAvailableNow.snapshotId()
+            : table().currentSnapshot().snapshotId();
+
+    int startPosOfSnapOffset = (int) startingOffset.position();
+
+    boolean scanAllFiles = startingOffset.shouldScanAllFiles();
+
+    boolean shouldContinueReading = true;
+    int curFilesAdded = 0;
+    long curRecordCount = 0;
+    int curPos = 0;
+
+    // Extract limits once to avoid repeated calls in tight loop
+    UnpackedLimits unpackedLimits = new UnpackedLimits(limit);
+    long maxFiles = unpackedLimits.getMaxFiles();
+    long maxRows = unpackedLimits.getMaxRows();
+
+    // Note: the position in the nextOffset we produce is non-inclusive
+    while (shouldContinueReading) {
+      // generate manifest index for the curSnapshot
+      List<Pair<ManifestFile, Integer>> indexedManifests =
+          MicroBatches.skippedManifestIndexesFromSnapshot(
+              table().io(), curSnapshot, startPosOfSnapOffset, scanAllFiles);
+      // this assumes we will be able to add at least 1 file in 
the new offset
+      for (int idx = 0; idx < indexedManifests.size() && 
shouldContinueReading; idx++) {
+        // be rest assured curPos >= startFileIndex
+        curPos = indexedManifests.get(idx).second();
+        try (CloseableIterable<FileScanTask> taskIterable =
+                MicroBatches.openManifestFile(
+                    table().io(),
+                    table().specs(),
+                    caseSensitive,
+                    curSnapshot,
+                    indexedManifests.get(idx).first(),
+                    scanAllFiles);
+            CloseableIterator<FileScanTask> taskIter = 
taskIterable.iterator()) {
+          while (taskIter.hasNext()) {
+            FileScanTask task = taskIter.next();
+            if (curPos >= startPosOfSnapOffset) {
+              if ((curFilesAdded + 1) > maxFiles) {
+                // On including the file, it might happen that we exceed 
the configured
+                // soft limit on the number of records; since this is a soft 
limit, it's acceptable.
+                shouldContinueReading = false;
+                break;
+              }
+
+              curFilesAdded += 1;
+              curRecordCount += task.file().recordCount();
+
+              if (curRecordCount >= maxRows) {
+                // we included the file, so increment the number of files
+                // read in the current snapshot.
+                ++curPos;
+                shouldContinueReading = false;
+                break;
+              }
+            }
+            ++curPos;
+          }
+        } catch (IOException ioe) {
+          LOG.warn("Failed to close task iterable", ioe);
+        }
+      }
+      // if curSnapshot is also the latest snapshot, then break
+      if (curSnapshot.snapshotId() == latestSnapshotId) {
+        break;
+      }
+
+      // if everything was OK and we consumed the complete snapshot, then move to 
the next snapshot
+      if (shouldContinueReading) {
+        Snapshot nextValid = nextValidSnapshot(curSnapshot);
+        if (nextValid == null) {
+          // nextValid == null implies all the remaining snapshots should be skipped.
+          break;
+        }
+        // we found the next available snapshot, continue from there.
+        curSnapshot = nextValid;
+        startPosOfSnapOffset = -1;
+        // whenever we move to the next snapshot we should only scan 
addedFiles
+        scanAllFiles = false;
+      }
+    }
+
+    StreamingOffset latestStreamingOffset =
+        new StreamingOffset(curSnapshot.snapshotId(), curPos, scanAllFiles);
+
+    // if no new data arrived, then return null.
+    return latestStreamingOffset.equals(startingOffset) ? null : 
latestStreamingOffset;
+  }
+
+  @Override
+  public void stop() {}
+
+  private void validateCurrentSnapshotExists(Snapshot snapshot, 
StreamingOffset currentOffset) {
+    if (snapshot == null) {
+      throw new IllegalStateException(
+          String.format(
+              Locale.ROOT,
+              "Cannot load current offset at snapshot %d, the snapshot was 
expired or removed",
+              currentOffset.snapshotId()));
+    }
+  }
+}
diff --git 
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestMicroBatchPlanningUtils.java
 
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestMicroBatchPlanningUtils.java
new file mode 100644
index 0000000000..a9ce340fd4
--- /dev/null
+++ 
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestMicroBatchPlanningUtils.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.spark.CatalogTestBase;
+import org.apache.spark.sql.connector.read.streaming.ReadLimit;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestMicroBatchPlanningUtils extends CatalogTestBase {
+
+  private Table table;
+
+  @BeforeEach
+  public void setupTable() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+    sql(
+        "CREATE TABLE %s "
+            + "(id INT, data STRING) "
+            + "USING iceberg "
+            + "PARTITIONED BY (bucket(3, id))",
+        tableName);
+    this.table = validationCatalog.loadTable(tableIdent);
+  }
+
+  @AfterEach
+  public void dropTable() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @TestTemplate
+  public void testUnpackedLimitsCompositeChoosesMinimum() {
+    ReadLimit[] limits =
+        new ReadLimit[] {
+          ReadLimit.maxRows(10), ReadLimit.maxRows(4), ReadLimit.maxFiles(8), 
ReadLimit.maxFiles(2)
+        };
+
+    ReadLimit composite = ReadLimit.compositeLimit(limits);
+
+    BaseSparkMicroBatchPlanner.UnpackedLimits unpacked =
+        new BaseSparkMicroBatchPlanner.UnpackedLimits(composite);
+
+    assertThat(unpacked.getMaxRows()).isEqualTo(4);
+    assertThat(unpacked.getMaxFiles()).isEqualTo(2);
+  }
+
+  @TestTemplate
+  public void testDetermineStartingOffsetWithTimestampBetweenSnapshots() {
+    sql("INSERT INTO %s VALUES (1, 'one')", tableName);
+    table.refresh();
+    long snapshot1Time = table.currentSnapshot().timestampMillis();
+
+    sql("INSERT INTO %s VALUES (2, 'two')", tableName);
+    table.refresh();
+    long snapshot2Id = table.currentSnapshot().snapshotId();
+
+    StreamingOffset offset = MicroBatchUtils.determineStartingOffset(table, 
snapshot1Time + 1);
+
+    assertThat(offset.snapshotId()).isEqualTo(snapshot2Id);
+    assertThat(offset.position()).isEqualTo(0L);
+    assertThat(offset.shouldScanAllFiles()).isFalse();
+  }
+
+  @TestTemplate
+  public void testAddedFilesCountUsesSummaryWhenPresent() {
+    sql("INSERT INTO %s VALUES (1, 'one')", tableName);
+    table.refresh();
+
+    long expectedAddedFiles =
+        
Long.parseLong(table.currentSnapshot().summary().get(SnapshotSummary.ADDED_FILES_PROP));
+
+    long actual = MicroBatchUtils.addedFilesCount(table, 
table.currentSnapshot());
+
+    assertThat(actual).isEqualTo(expectedAddedFiles);
+  }
+}


Reply via email to