rdblue commented on a change in pull request #2752:
URL: https://github.com/apache/iceberg/pull/2752#discussion_r661051560



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
##########
@@ -169,35 +174,64 @@ public void stop() {
 
   private List<FileScanTask> planFiles(StreamingOffset startOffset, StreamingOffset endOffset) {
     List<FileScanTask> fileScanTasks = Lists.newArrayList();
-    MicroBatch latestMicroBatch = null;
     StreamingOffset batchStartOffset = StreamingOffset.START_OFFSET.equals(startOffset) ?
         new StreamingOffset(SnapshotUtil.oldestSnapshot(table).snapshotId(), 0, false) :
         startOffset;
 
+    StreamingOffset currentOffset = null;
+
     do {
-      StreamingOffset currentOffset =
-          latestMicroBatch != null && latestMicroBatch.lastIndexOfSnapshot() ?
-              new StreamingOffset(snapshotAfter(latestMicroBatch.snapshotId()), 0L, false) :
-              batchStartOffset;
+      if (currentOffset == null) {
+        currentOffset = batchStartOffset;
+      } else {
+        Snapshot snapshotAfter = SnapshotUtil.snapshotAfter(table, currentOffset.snapshotId());
+        boolean shouldSkip = shouldSkip(snapshotAfter);
+
+        currentOffset = new StreamingOffset(snapshotAfter.snapshotId(), 0L, false);
+
+        if (shouldSkip) {
+          continue;
+        }
+      }
 
-      latestMicroBatch = MicroBatches.from(table.snapshot(currentOffset.snapshotId()), table.io())
+      MicroBatch latestMicroBatch = MicroBatches.from(table.snapshot(currentOffset.snapshotId()), table.io())
           .caseSensitive(caseSensitive)
           .specsById(table.specs())
           .generate(currentOffset.position(), Long.MAX_VALUE, currentOffset.shouldScanAllFiles());
 
       fileScanTasks.addAll(latestMicroBatch.tasks());
-    } while (latestMicroBatch.snapshotId() != endOffset.snapshotId());
+    } while (currentOffset.snapshotId() != endOffset.snapshotId());
 
     return fileScanTasks;
   }
 
-  private long snapshotAfter(long snapshotId) {
-    Snapshot snapshotAfter = SnapshotUtil.snapshotAfter(table, snapshotId);
+  private boolean shouldSkip(Snapshot snapshot) {
+    if (snapshot.operation().equals(DataOperations.DELETE)) {
+      return shouldSkipDelete(snapshot);

Review comment:
       I think it would be cleaner to embed the `shouldSkip` methods here 
rather than having a method that is a precondition and a static `return true`:
   
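   ```java
     private boolean shouldProcess(Snapshot snapshot) {
       // Snapshot.operation() returns a String, so compare with equals() rather than ==
       String op = snapshot.operation();
       if (DataOperations.DELETE.equals(op)) {
         // delete snapshots can only be skipped, and only when skipDelete is set
         Preconditions.checkState(skipDelete,
             "Cannot process delete snapshot: %s", snapshot.snapshotId());
         return false;
       }
       Preconditions.checkState(DataOperations.APPEND.equals(op) || DataOperations.REPLACE.equals(op),
           "Cannot process %s snapshot: %s", op, snapshot.snapshotId());
       // appends carry new rows; replace snapshots (e.g. compaction) are skipped
       return DataOperations.APPEND.equals(op);
     }
   ```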
   
   I also used `shouldProcess` instead of `shouldSkip` to make it easier to 
read.
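   For illustration, here is a minimal, self-contained sketch of the `shouldProcess` decision and a simplified planning loop that uses it. `Snap`, `snapshotsToPlan`, and the local operation constants are hypothetical stand-ins for Iceberg's `Snapshot` and `DataOperations` (the string values are assumed to match Iceberg's constants). Unlike `planFiles`, which never checks the starting snapshot, this loop checks every snapshot.

```java
import java.util.ArrayList;
import java.util.List;

public class ShouldProcessSketch {

  // Hypothetical stand-ins for the string values of Iceberg's DataOperations constants.
  static final String APPEND = "append";
  static final String REPLACE = "replace";
  static final String DELETE = "delete";

  // Hypothetical stand-in for Iceberg's Snapshot: only the ID and the operation name.
  static final class Snap {
    final long id;
    final String operation;

    Snap(long id, String operation) {
      this.id = id;
      this.operation = operation;
    }
  }

  // true = plan the snapshot's files, false = skip it, exception = the stream cannot continue.
  static boolean shouldProcess(Snap snapshot, boolean skipDelete) {
    String op = snapshot.operation;
    if (DELETE.equals(op)) {
      if (!skipDelete) {
        throw new IllegalStateException("Cannot process delete snapshot: " + snapshot.id);
      }
      return false;
    }
    if (!APPEND.equals(op) && !REPLACE.equals(op)) {
      throw new IllegalStateException("Cannot process " + op + " snapshot: " + snapshot.id);
    }
    // Only appends add new rows; replace snapshots (e.g. compaction) are skipped.
    return APPEND.equals(op);
  }

  // Simplified planning loop: visits snapshots in commit order and keeps the IDs to plan.
  static List<Long> snapshotsToPlan(List<Snap> history, boolean skipDelete) {
    List<Long> planned = new ArrayList<>();
    for (Snap snap : history) {
      if (shouldProcess(snap, skipDelete)) {
        planned.add(snap.id);
      }
    }
    return planned;
  }

  public static void main(String[] args) {
    List<Snap> history = List.of(
        new Snap(1L, APPEND), new Snap(2L, REPLACE), new Snap(3L, DELETE), new Snap(4L, APPEND));
    System.out.println(snapshotsToPlan(history, true)); // prints [1, 4]
    try {
      snapshotsToPlan(history, false);
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage()); // prints Cannot process delete snapshot: 3
    }
  }
}
```

   Returning a boolean for the normal skip/process cases and throwing only for snapshots the stream cannot handle (a delete without `skipDelete`, or any other operation such as an overwrite) keeps every decision in one method, which is the point of the suggestion.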




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


