rdblue commented on a change in pull request #3039:
URL: https://github.com/apache/iceberg/pull/3039#discussion_r753832515
##########
File path: core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
##########
@@ -101,6 +101,41 @@ public static Snapshot oldestAncestor(Table table) {
     return ancestorsOf(start, lookup);
   }
+  /**
+   * Traverses the history of the table's current snapshot
+   * and finds the oldest ancestor snapshot after or equal to the timestamp in milliseconds.
+   * @return null if there is no current snapshot in the table,
+   *         else the oldest ancestor snapshot after or equal to the timestamp in milliseconds.
+   */
+  public static Snapshot oldestAncestorAfter(Table table, Long timestampMillis) {
+    Snapshot current = table.currentSnapshot();
+    long timestamp = timestampMillis == null ? -1L : timestampMillis;
+    if (current == null || current.timestampMillis() < timestamp) {
+      return null;
+    }
+
+    for (Snapshot snapshot : currentAncestors(table)) {
+      if (snapshot.timestampMillis() < timestamp) {
+        break;
+      }
+
+      current = snapshot;
+    }
+
+    return current;
Review comment:
I don't think this logic is correct. In the version that I wrote, the method throws an `IllegalStateException` if it cannot determine whether the ancestor was actually the first snapshot after the given timestamp. It is fine if the behavior you want is to ignore that case in practice, but the utility methods in `SnapshotUtil` should be more careful. Instead, have the caller fall back to `oldestAncestor` when this method fails to find one and you want to default to the oldest ancestor.
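To make the concern concrete, here is a minimal sketch of the stricter variant described above (this is not the PR's code, and the exception message is illustrative). It walks the ancestors from newest to oldest and fails loudly if the walk ends without crossing the timestamp, since older snapshots may have been expired and the result can no longer be proven correct:
```java
// Sketch only: assumes currentAncestors(Table) from this class; message text is made up.
public static Snapshot oldestAncestorAfter(Table table, long timestampMillis) {
  if (table.currentSnapshot() == null) {
    // the table has no snapshots at all
    return null;
  }

  Snapshot lastSeen = null;
  for (Snapshot snapshot : currentAncestors(table)) {
    if (snapshot.timestampMillis() < timestampMillis) {
      // this snapshot is strictly before the timestamp, so the previously
      // visited snapshot (possibly null) is the oldest ancestor at/after it
      return lastSeen;
    }
    lastSeen = snapshot;
  }

  // ran out of known history without crossing the timestamp; older snapshots
  // may have been expired, so lastSeen cannot be proven to be the answer
  throw new IllegalStateException(
      "Cannot determine the oldest ancestor after " + timestampMillis);
}
```
A caller that prefers the lenient behavior can then fall back to `oldestAncestor(table)` explicitly, rather than having the utility silently guess.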
##########
File path: spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
##########
@@ -62,6 +62,7 @@
import static org.apache.iceberg.expressions.Expressions.ref;
import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.junit.jupiter.api.Assertions.assertTrue;
Review comment:
Can you revert unnecessary changes?
##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
##########
@@ -211,4 +211,10 @@ public boolean handleTimestampWithoutZone() {
         .defaultValue(SparkSQLProperties.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE_DEFAULT)
         .parse();
   }
+
+  public Long fromTimestamp() {
Review comment:
I think this needs a better name. It isn't clear what
`options.fromTimestamp()` means in other contexts.
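For illustration only, something that reads unambiguously at the call site might look like the following. The name `streamFromTimestamp` and the `SparkReadOptions.STREAM_FROM_TIMESTAMP` constant are hypothetical suggestions, and this assumes the existing `confParser` chain supports `parseOptional()` for values without a default:
```java
  // Hypothetical rename: reads as "the timestamp to start the stream from"
  public Long streamFromTimestamp() {
    return confParser.longConf()
        .option(SparkReadOptions.STREAM_FROM_TIMESTAMP)
        .parseOptional();
  }
```
At a call site, `options.streamFromTimestamp()` then says what the value is for without extra context.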
##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
##########
@@ -246,7 +269,7 @@ private void writeOffset(StreamingOffset offset, OutputFile file) {
       writer.flush();
     } catch (IOException ioException) {
       throw new UncheckedIOException(
-          String.format("Failed writing offset to: %s", initialOffsetLocation), ioException);
+          String.format("Failed writing offset to: %s", initialOffsetLocation), ioException);
Review comment:
Please fix formatting. Looks like your settings are incorrect for
continuation indents and that is introducing a lot of unnecessary changes, as
well as incorrect style.
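For reference, a self-contained sketch of the expected shape (the class and method names are invented for this example, and it assumes the project's convention of a four-space continuation indent):
```java
import java.io.IOException;
import java.io.UncheckedIOException;

class ContinuationIndentExample {
  // The wrapped argument list is indented four spaces past the statement's
  // base indent, instead of whatever the misconfigured formatter produced.
  static void rethrow(String initialOffsetLocation, IOException ioException) {
    throw new UncheckedIOException(
        String.format("Failed writing offset to: %s", initialOffsetLocation), ioException);
  }
}
```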
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]