jackye1995 commented on code in PR #4479:
URL: https://github.com/apache/iceberg/pull/4479#discussion_r1120732086
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java:
##########
@@ -282,6 +310,128 @@ private static StreamingOffset
determineStartingOffset(Table table, Long fromTim
}
}
+ @Override
+ @SuppressWarnings("checkstyle:CyclomaticComplexity")
+ public Offset latestOffset(Offset startOffset, ReadLimit limit) {
+ // calculate end offset get snapshotId from the startOffset
+ Preconditions.checkArgument(
+ startOffset instanceof StreamingOffset,
+ "Invalid start offset: %s is not a StreamingOffset",
+ startOffset);
+
+ table.refresh();
+ if (table.currentSnapshot() == null) {
+ return StreamingOffset.START_OFFSET;
+ }
+
+ if (table.currentSnapshot().timestampMillis() < fromTimestamp) {
+ return StreamingOffset.START_OFFSET;
+ }
+
+ // end offset can expand to multiple snapshots
+ StreamingOffset startingOffset = (StreamingOffset) startOffset;
+
+ if (startOffset.equals(StreamingOffset.START_OFFSET)) {
+ startingOffset = determineStartingOffset(table, fromTimestamp);
+ }
+
+ Snapshot curSnapshot = table.snapshot(startingOffset.snapshotId());
+ int startPosOfSnapOffset = (int) startingOffset.position();
+
+ boolean scanAllFiles = startingOffset.shouldScanAllFiles();
+
+ boolean isOk = true;
Review Comment:
nit: can we be more concrete about this naming?
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java:
##########
@@ -282,6 +310,128 @@ private static StreamingOffset
determineStartingOffset(Table table, Long fromTim
}
}
+ @Override
+ @SuppressWarnings("checkstyle:CyclomaticComplexity")
+ public Offset latestOffset(Offset startOffset, ReadLimit limit) {
+ // calculate end offset get snapshotId from the startOffset
+ Preconditions.checkArgument(
+ startOffset instanceof StreamingOffset,
+ "Invalid start offset: %s is not a StreamingOffset",
+ startOffset);
+
+ table.refresh();
+ if (table.currentSnapshot() == null) {
+ return StreamingOffset.START_OFFSET;
+ }
+
+ if (table.currentSnapshot().timestampMillis() < fromTimestamp) {
+ return StreamingOffset.START_OFFSET;
+ }
+
+ // end offset can expand to multiple snapshots
+ StreamingOffset startingOffset = (StreamingOffset) startOffset;
+
+ if (startOffset.equals(StreamingOffset.START_OFFSET)) {
+ startingOffset = determineStartingOffset(table, fromTimestamp);
+ }
+
+ Snapshot curSnapshot = table.snapshot(startingOffset.snapshotId());
+ int startPosOfSnapOffset = (int) startingOffset.position();
+
+ boolean scanAllFiles = startingOffset.shouldScanAllFiles();
+
+ boolean isOk = true;
Review Comment:
nit: can we be more concrete about this naming? What do you mean by ok?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]