RussellSpitzer commented on a change in pull request #3039:
URL: https://github.com/apache/iceberg/pull/3039#discussion_r765848482



##########
File path: 
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
##########
@@ -195,31 +200,47 @@ public void stop() {
 
   private boolean shouldProcess(Snapshot snapshot) {
     String op = snapshot.operation();
-    switch (op) {
-      case DataOperations.APPEND:
-        return true;
-      case DataOperations.REPLACE:
-        return false;
-      case DataOperations.DELETE:
-        Preconditions.checkState(skipDelete,
-            "Cannot process delete snapshot : %s. Set read option %s to allow 
skipping snapshots of type delete",
-            snapshot.snapshotId(), 
SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS);
-        return false;
-      default:
-        throw new IllegalStateException(String.format(
-            "Cannot process %s snapshot: %s", op.toLowerCase(Locale.ROOT), 
snapshot.snapshotId()));
+    Preconditions.checkState(!op.equals(DataOperations.DELETE) || skipDelete,
+        "Cannot process delete snapshot: %s", snapshot.snapshotId());
+    Preconditions.checkState(
+        op.equals(DataOperations.DELETE) || op.equals(DataOperations.APPEND) 
|| op.equals(DataOperations.REPLACE),
+        "Cannot process %s snapshot: %s", op.toLowerCase(Locale.ROOT), 
snapshot.snapshotId());
+    return op.equals(DataOperations.APPEND);
+  }
+
+  private static boolean isStreamEmpty(Table table) {
+    return table.currentSnapshot() == null;
+  }
+
+  private static boolean isStreamNotEmpty(Table table) {
+    return table.currentSnapshot() != null;
+  }
+
+  private static boolean isFutureStartTime(Table table, Long 
streamStartTimeStampMillis) {
+    if (streamStartTimeStampMillis == null) {
+      return false;
     }
+
+    return table.currentSnapshot().timestampMillis() < 
streamStartTimeStampMillis;
+  }
+
+  private static StreamingOffset initialFutureStartOffset(Table table) {
+    Preconditions.checkNotNull(table, "Cannot process future start offset with 
invalid table input.");
+    Snapshot latestSnapshot = table.currentSnapshot();
+    return new StreamingOffset(latestSnapshot.snapshotId(), 
Iterables.size(latestSnapshot.addedFiles()) + 1, false);
   }
 
   private static class InitialOffsetStore {
     private final Table table;
     private final FileIO io;
     private final String initialOffsetLocation;
+    private final Long fromTimestamp;
 
-    InitialOffsetStore(Table table, String checkpointLocation) {
+    InitialOffsetStore(Table table, String checkpointLocation, Long 
fromTimestamp) {
       this.table = table;
       this.io = table.io();
       this.initialOffsetLocation = SLASH.join(checkpointLocation, "offsets/0");
+      this.fromTimestamp = fromTimestamp == null ? -1L : fromTimestamp;

Review comment:
      -1 is a timestamp just before the Unix epoch (1969-12-31T23:59:59.999Z), and while I hope no one ever
backdates snapshots, it may be good to default to a large negative value here instead (e.g. `Long.MIN_VALUE`).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to