kbendick commented on a change in pull request #3039:
URL: https://github.com/apache/iceberg/pull/3039#discussion_r703961666



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
##########
@@ -106,7 +107,9 @@
     this.splitOpenFileCost = Spark3Util.propertyAsLong(
         options, SparkReadOptions.FILE_OPEN_COST, tableSplitOpenFileCost);
 
-    InitialOffsetStore initialOffsetStore = new InitialOffsetStore(table, checkpointLocation);
+    this.fromTimestamp = Spark3Util.propertyAsLong(options, SparkReadOptions.STREAM_FROM_TIMESTAMP, -1L);
+
+    InitialOffsetStore initialOffsetStore = new InitialOffsetStore(table, checkpointLocation, this.fromTimestamp);

Review comment:
       Yes. On line 112, where the field is only read, I don't think `this` 
should be used. On line 110, where it is assigned, it's necessary.
   
   I don't personally care much, but dropping it on reads is consistent with 
everything else I see in the library.
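
A minimal sketch of the convention as I read the comment above: `this.` when assigning to a field, the bare name when reading it. `OffsetReader` and its members are hypothetical names for illustration only and are not part of the PR.

```java
// Hypothetical class, not from the PR; it only illustrates the `this.` convention.
final class OffsetReader {
  private final long fromTimestamp;
  private final String description;

  OffsetReader(long startMillis) {
    // Assignment to a field: keep the `this.` prefix.
    this.fromTimestamp = startMillis;
    // Read of a field: use the bare name, no `this.`.
    this.description = "from=" + fromTimestamp;
  }

  long fromTimestamp() {
    return fromTimestamp;
  }

  String description() {
    return description;
  }
}
```

Applied to the hunk above, that would mean passing `fromTimestamp` rather than `this.fromTimestamp` to the `InitialOffsetStore` constructor.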

##########
File path: core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
##########
@@ -65,12 +65,16 @@ public static boolean ancestorOf(Table table, long snapshotId, long ancestorSnap
   }
 
   /**
-   * Traverses the history of the table's current snapshot and finds the oldest Snapshot.
-   * @return null if there is no current snapshot in the table, else the oldest Snapshot.
+   * Traverses the history of the table's current snapshot and finds the oldest Snapshot after or equal to the timestamp in milliseconds.
+   * @return null if there is no current snapshot in the table, else the oldest Snapshot after or equal to the timestamp in milliseconds.
    */
-  public static Snapshot oldestSnapshot(Table table) {
+  public static Snapshot oldestSnapshot(Table table, long timestampMillis) {

Review comment:
       Once we confirm this definitely has the same semantics as 
`as-of-timestamp`, can we just rename this to `snapshotAsOfTimestamp`?
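
To make the behaviour under discussion concrete, here is a rough sketch of what the new Javadoc describes: walk back from the current snapshot through its ancestors and keep the oldest one whose timestamp is at or after the given value. `Snap` and `SnapshotWalk` are hypothetical stand-ins, not Iceberg's `Snapshot` or `SnapshotUtil`, and the sketch assumes timestamps do not increase as you move from a snapshot to its parent.

```java
import java.util.Map;

// Hypothetical stand-in for a table snapshot; not Iceberg's Snapshot interface.
final class Snap {
  final long id;
  final Long parentId; // null for the first snapshot in the lineage
  final long timestampMillis;

  Snap(long id, Long parentId, long timestampMillis) {
    this.id = id;
    this.parentId = parentId;
    this.timestampMillis = timestampMillis;
  }
}

// Hypothetical helper; the name and signature are assumptions for illustration.
final class SnapshotWalk {
  private SnapshotWalk() {
  }

  // Returns the oldest ancestor of the current snapshot (inclusive) whose
  // timestamp is at or after timestampMillis, or null if there is none.
  static Snap oldestAtOrAfter(Map<Long, Snap> snapshotsById, Long currentId, long timestampMillis) {
    Snap oldest = null;
    Long id = currentId;
    while (id != null) {
      Snap snap = snapshotsById.get(id);
      // Stop at a missing ancestor or at the first snapshot older than the cutoff.
      if (snap == null || snap.timestampMillis < timestampMillis) {
        break;
      }
      oldest = snap;
      id = snap.parentId;
    }
    return oldest;
  }
}
```

With three snapshots at 100, 200, and 300 ms, a cutoff of 150 yields the snapshot at 200, and a cutoff later than the current snapshot yields null.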

##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
##########
@@ -224,11 +227,13 @@ private boolean shouldProcess(Snapshot snapshot) {
     private final Table table;
     private final FileIO io;
     private final String initialOffsetLocation;
+    private final long fromTimestamp;

Review comment:
       Nah that's perfect. You'd see it there. I checked it out and didn't see 
it either, so you're good. 👍 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


