kbendick commented on a change in pull request #3039:
URL: https://github.com/apache/iceberg/pull/3039#discussion_r697713667
##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
##########
@@ -106,7 +107,9 @@
     this.splitOpenFileCost = Spark3Util.propertyAsLong(
         options, SparkReadOptions.FILE_OPEN_COST, tableSplitOpenFileCost);
-    InitialOffsetStore initialOffsetStore = new InitialOffsetStore(table, checkpointLocation);
+    this.fromTimestamp = Spark3Util.propertyAsLong(options, SparkReadOptions.STREAM_FROM_TIMESTAMP, -1L);
+
+    InitialOffsetStore initialOffsetStore = new InitialOffsetStore(table, checkpointLocation, this.fromTimestamp);
Review comment:
Nit: I believe it would be more consistent to drop the usage of `this`
in `this.fromTimestamp`. We don't seem to use `this` in other locations in this
file or elsewhere, but there may be a good reason for it here. 🙂
##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
##########
@@ -224,11 +227,13 @@ private boolean shouldProcess(Snapshot snapshot) {
private final Table table;
private final FileIO io;
private final String initialOffsetLocation;
+ private final long fromTimestamp;
Review comment:
Nit: Does the naming of this field generate any warnings about shadowing
the field of the same name in the enclosing class, e.g. from Error Prone at
compile time?
##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
##########
@@ -53,4 +53,7 @@ private SparkReadOptions() {
   // skip snapshots of type delete while reading stream out of iceberg table
   public static final String STREAMING_SKIP_DELETE_SNAPSHOTS = "streaming-skip-delete-snapshots";
+
+  // Timestamp in milliseconds; start a stream from the snapshot that occurs after this timestamp
+  public static final String STREAM_FROM_TIMESTAMP = "stream-from-timestamp";
Review comment:
A few nits / questions here:
1) Are there any concerns with the user's timezone or anything? I'm still
wrapping my head around all of the code for streaming as a source with Spark, so
forgive me if this is covered already. But I'm assuming the raw timestamp is
supposed to be millis from the epoch in UTC; or can users configure their
setup so that table snapshots have non-UTC timestamps? I've always used UTC, so
this might not actually be a concern.
2) Looking at the `oldestSnapshot` code, it seems like it's grabbing the
first snapshot it can find that is older than the timestamp passed in. Is the
phrase `from the snapshot that occurs after this timestamp` correct, or possibly
a little ambiguous? I might just need more coffee, but I would have expected
`after this timestamp` to mean we grab the first snapshot that occurred later
than this value.
3) This might be something to handle in another PR, but should we consider
allowing timestamps to be specified as something other than strings of millis?
Like timestamp-formatted strings, not just millis, or even a
`NOW() - INTERVAL '5 days'` kind of thing, which would likely need to be added
as a Spark resolution rule.
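To make (3) a bit more concrete, here is a rough sketch of what accepting either form could look like. `StreamTimestampParser.parseMillis` is a hypothetical helper, not anything in this PR, and a `NOW() - INTERVAL` form would still need Spark-side resolution on top of this:

```java
import java.time.Instant;
import java.time.format.DateTimeParseException;

public class StreamTimestampParser {
  // Hypothetical helper: accept either raw epoch millis ("1630000000000")
  // or an ISO-8601 instant ("2021-08-26T00:00:00Z") for the
  // stream-from-timestamp read option.
  public static long parseMillis(String value) {
    try {
      // First try the current behavior: a long of millis since the epoch.
      return Long.parseLong(value);
    } catch (NumberFormatException notMillis) {
      try {
        // Fall back to an ISO-8601 instant, which is unambiguous about UTC.
        return Instant.parse(value).toEpochMilli();
      } catch (DateTimeParseException notInstant) {
        throw new IllegalArgumentException("Cannot parse timestamp: " + value, notInstant);
      }
    }
  }
}
```

One nice side effect: an ISO-8601 instant carries its own zone offset, which would sidestep the timezone question in (1).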
##########
File path: core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
##########
@@ -65,12 +65,16 @@ public static boolean ancestorOf(Table table, long
snapshotId, long ancestorSnap
}
/**
- * Traverses the history of the table's current snapshot and finds the
oldest Snapshot.
- * @return null if there is no current snapshot in the table, else the
oldest Snapshot.
+ * Traverses the history of the table's current snapshot and finds the
oldest Snapshot after the timestamp.
+ * @return null if there is no current snapshot in the table, else the
oldest Snapshot after the timestamp.
Review comment:
Nit: Could you document in the Javadoc that the timestamp here is
expected in milliseconds?
Also, I'm a bit unclear if this is grabbing the snapshot that occurs just
prior to this timestamp or just after. Possibly I'm just expecting it to grab
the one just prior to (or exactly at) the given timestamp, or maybe I just need
more coffee 😝
##########
File path: spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
##########
@@ -165,6 +165,32 @@ public void testReadStreamOnIcebergTableWithMultipleSnapshots() throws Exception
     Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
   }
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testReadingStreamFromTimestamp() throws Exception {
+    List<SimpleRecord> dataBeforeTimestamp = Lists.newArrayList(
+        new SimpleRecord(-2, "minustwo"),
+        new SimpleRecord(-1, "minusone"),
+        new SimpleRecord(0, "zero"));
+    appendData(dataBeforeTimestamp, tableIdentifier, "parquet");
+
+    table.refresh();
+    long streamStartTimestamp = table.currentSnapshot().timestampMillis() + 1;
+
+    List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
+    appendDataAsMultipleSnapshots(expected, tableIdentifier);
+
+    table.refresh();
+
+    Dataset<Row> df = spark.readStream()
+        .format("iceberg")
+        .option(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(streamStartTimestamp))
+        .load(tableIdentifier);
+    List<SimpleRecord> actual = processAvailable(df);
+
+    Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
+  }
Review comment:
Non-blocking question:
Could we possibly add a test that specifies a timestamp that isn't just one
millisecond above the current snapshot's timestamp? I see there are then
multiple append-based snapshots, but it would be nice to have a test that uses
a timestamp that isn't so clean (though if that leads to a flaky test then
let's not introduce it unless there's a way to make it not flaky).
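For what it's worth, one way to pick a "less clean" cutoff without flakiness might be to take the midpoint between two snapshots' commit times and bail out when they are too close together. This is just an illustrative helper, not anything in the PR:

```java
public class MidpointCutoff {
  // Illustrative helper for the suggested test: pick a cutoff strictly
  // between two snapshot commit times, so the stream should deterministically
  // start at the later snapshot. Returns -1 when the two timestamps are equal
  // or adjacent, since the midpoint would then coincide with a snapshot's own
  // commit time and make the test flaky.
  public static long between(long earlierMillis, long laterMillis) {
    long mid = earlierMillis + (laterMillis - earlierMillis) / 2;
    if (mid <= earlierMillis || mid >= laterMillis) {
      return -1;
    }
    return mid;
  }
}
```

A test could then skip (or re-append with a delay) whenever `between` returns -1, instead of relying on `currentSnapshot().timestampMillis() + 1`.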
##########
File path: core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
##########
@@ -65,12 +65,16 @@ public static boolean ancestorOf(Table table, long
snapshotId, long ancestorSnap
}
/**
- * Traverses the history of the table's current snapshot and finds the
oldest Snapshot.
- * @return null if there is no current snapshot in the table, else the
oldest Snapshot.
+ * Traverses the history of the table's current snapshot and finds the
oldest Snapshot after the timestamp.
+ * @return null if there is no current snapshot in the table, else the
oldest Snapshot after the timestamp.
*/
- public static Snapshot oldestSnapshot(Table table) {
+ public static Snapshot oldestSnapshot(Table table, long timestamp) {
Snapshot current = table.currentSnapshot();
- while (current != null && current.parentId() != null) {
+ if (current == null || current.timestampMillis() < timestamp) {
+ return null;
+ }
+
+ while (current.parentId() != null &&
table.snapshot(current.parentId()).timestampMillis() > timestamp) {
Review comment:
What would happen if there was a snapshot exactly at this timestamp? Do
we want to allow users to specify an exact starting timestamp (assuming they
know the exact timestamp of the snapshot they're looking for)?
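To illustrate the boundary question, here is a simplified standalone model of the walk above. It is not the actual `SnapshotUtil` code: `Table`/`Snapshot` are replaced by an array of parent-chain commit times, with the last element as the current snapshot. It shows an asymmetry under the strict `>` comparison: a *parent* committed exactly at the timestamp is skipped over, while a *current* snapshot exactly at the timestamp passes the initial `< timestamp` check and is returned.

```java
public class OldestSnapshotBoundary {
  // Simplified model of the loop in oldestSnapshot(Table, long):
  // timestamps[i] is the commit time of snapshot i, where i+1 is the child
  // of i and the last element is the current snapshot. Returns the index the
  // walk stops at, or -1 when there is no snapshot at-or-after the cutoff.
  public static int oldestAfter(long[] timestamps, long fromTimestamp) {
    int current = timestamps.length - 1;
    // Mirrors: current == null || current.timestampMillis() < timestamp
    if (current < 0 || timestamps[current] < fromTimestamp) {
      return -1;
    }
    // Mirrors: walk to the parent only while the parent is STRICTLY newer
    // than the cutoff, so a parent committed exactly at the cutoff is skipped.
    while (current > 0 && timestamps[current - 1] > fromTimestamp) {
      current--;
    }
    return current;
  }
}
```

With commit times `{100, 200, 300}` and a cutoff of 200, the walk returns index 2 (the snapshot at 300), skipping the snapshot committed exactly at 200; yet with `{100, 200}` and the same cutoff, the current snapshot at exactly 200 is returned. If exact-match starts should be allowed consistently, the loop comparison would need to be `>=`.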
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]