rdblue commented on a change in pull request #3039:
URL: https://github.com/apache/iceberg/pull/3039#discussion_r753832515



##########
File path: core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
##########
@@ -101,6 +101,41 @@ public static Snapshot oldestAncestor(Table table) {
     return ancestorsOf(start, lookup);
   }
 
+  /**
+   * Traverses the history of the table's current snapshot
+   * and finds the oldest ancestor snapshot after or equal to the timestamp in milliseconds.
+   * @return null if there is no current snapshot in the table,
+   * else the oldest ancestor snapshot after or equal to the timestamp in milliseconds.
+   */
+  public static Snapshot oldestAncestorAfter(Table table, Long timestampMillis) {
+    Snapshot current = table.currentSnapshot();
+    long timestamp = timestampMillis == null ? -1L : timestampMillis;
+    if (current == null || current.timestampMillis() < timestamp) {
+      return null;
+    }
+
+    for (Snapshot snapshot : currentAncestors(table)) {
+      if (snapshot.timestampMillis() < timestamp) {
+        break;
+      }
+
+      current = snapshot;
+    }
+
+    return current;

Review comment:
       I don't think this logic is correct. In the version that I wrote, the 
method will throw an `IllegalStateException` if it cannot determine whether the 
ancestor was actually the first snapshot after the given timestamp. It is fine 
if the behavior you want is to ignore that case in practice, but the utility 
methods in `SnapshotUtil` should be more careful. What you should do instead is 
call `oldestAncestor` if this fails to find one and you want to default to the 
oldest ancestor.
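
To make the distinction concrete, here is a minimal sketch of the stricter behavior described above. It is not the actual `SnapshotUtil` code: `Snap` is a hypothetical stand-in for `Snapshot`, the ancestor list is assumed to be ordered newest first, and `oldestAncestorAfterOrOldest` is an illustrative name for the caller-side fallback.

```java
import java.util.Arrays;
import java.util.List;

final class AncestorSketch {
  // Hypothetical stand-in for a snapshot: id, parent id (null for the
  // table's first snapshot), and commit time in milliseconds.
  static final class Snap {
    final long id;
    final Long parentId;
    final long timestampMillis;

    Snap(long id, Long parentId, long timestampMillis) {
      this.id = id;
      this.parentId = parentId;
      this.timestampMillis = timestampMillis;
    }
  }

  // Strict utility. `ancestors` is assumed to be ordered newest first.
  // Returns null when there are no snapshots or the newest one is already
  // older than the timestamp; throws when the known history ends before
  // the answer can be confirmed.
  static Snap oldestAncestorAfter(List<Snap> ancestors, long timestampMillis) {
    Snap last = null;
    for (Snap snap : ancestors) {
      if (snap.timestampMillis < timestampMillis) {
        // snap is older than the timestamp, so the previous one is the answer
        return last;
      }
      last = snap;
    }

    if (last == null || last.parentId == null) {
      // either there is no history, or we reached the table's first snapshot
      return last;
    }

    // The oldest known ancestor still has a parent that is not available
    // (for example, it was expired), so an older match cannot be ruled out.
    throw new IllegalStateException(
        "Cannot determine the first snapshot at or after " + timestampMillis
            + ": history ends at snapshot " + last.id);
  }

  // Caller-side fallback: default to the oldest known ancestor on failure.
  static Snap oldestAncestorAfterOrOldest(List<Snap> ancestors, long timestampMillis) {
    try {
      return oldestAncestorAfter(ancestors, timestampMillis);
    } catch (IllegalStateException e) {
      // the exception is only thrown for a non-empty list
      return ancestors.get(ancestors.size() - 1);
    }
  }

  public static void main(String[] args) {
    Snap s1 = new Snap(1L, null, 100L);
    Snap s2 = new Snap(2L, 1L, 200L);
    Snap s3 = new Snap(3L, 2L, 300L);

    // Full history: the oldest ancestor at or after t=150 can be confirmed.
    System.out.println(oldestAncestorAfter(Arrays.asList(s3, s2, s1), 150L).id);

    // Truncated history: the strict method would throw, the fallback does not.
    System.out.println(oldestAncestorAfterOrOldest(Arrays.asList(s3, s2), 50L).id);
  }
}
```

Keeping the fallback in the caller means the utility never silently returns a snapshot that may not be the first one after the timestamp.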

##########
File path: spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
##########
@@ -62,6 +62,7 @@
 
 import static org.apache.iceberg.expressions.Expressions.ref;
 import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.junit.jupiter.api.Assertions.assertTrue;

Review comment:
       Can you revert unnecessary changes?

##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
##########
@@ -211,4 +211,10 @@ public boolean handleTimestampWithoutZone() {
         .defaultValue(SparkSQLProperties.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE_DEFAULT)
         .parse();
   }
+
+  public Long fromTimestamp() {

Review comment:
       I think this needs a better name. It isn't clear what 
`options.fromTimestamp()` means in other contexts.

##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
##########
@@ -246,7 +269,7 @@ private void writeOffset(StreamingOffset offset, OutputFile file) {
         writer.flush();
       } catch (IOException ioException) {
         throw new UncheckedIOException(
-            String.format("Failed writing offset to: %s", initialOffsetLocation), ioException);
+                String.format("Failed writing offset to: %s", initialOffsetLocation), ioException);

Review comment:
       Please fix formatting. Looks like your settings are incorrect for 
continuation indents and that is introducing a lot of unnecessary changes, as 
well as incorrect style.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


