singhpk234 commented on code in PR #4479:
URL: https://github.com/apache/iceberg/pull/4479#discussion_r843551400
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java:
##########
@@ -251,6 +273,91 @@ private static StreamingOffset determineStartingOffset(Table table, Long fromTim
}
}
+ @Override
+ public Offset latestOffset(Offset startOffset, ReadLimit limit) {
+ // calculate end offset get snapshotId from the startOffset
+ Preconditions.checkArgument(
+ startOffset instanceof StreamingOffset, "Invalid start offset: %s is not a StreamingOffset", startOffset);
+
+ table.refresh();
+ if (table.currentSnapshot() == null) {
+ return StreamingOffset.START_OFFSET;
+ }
+
+ if (table.currentSnapshot().timestampMillis() < fromTimestamp) {
+ return StreamingOffset.START_OFFSET;
+ }
+
+ // end offset can expand to multiple snapshots
+ StreamingOffset startingOffset = (StreamingOffset) startOffset;
+
+ if (startOffset.equals(StreamingOffset.START_OFFSET)) {
+ startingOffset = determineStartingOffset(table, fromTimestamp);
+ }
+
+ Snapshot curSnapshot = table.snapshot(startingOffset.snapshotId());
+ int startPosOfSnapOffset = (int) startingOffset.position();
+
+ boolean isOk = true;
+ int curFilesAdded = 0;
+ int curRecordCount = 0;
+ int curPos = 0;
+
+ // Note : we produce nextOffset with pos as non-inclusive
+ while (isOk) {
+ // start iterating the current Snapshot's added Files
+ // this is under assumption we will be able to add at-least 1 file in the new offset
+ curPos = 0;
+ for (DataFile dataFile : curSnapshot.addedFiles()) {
+ GenericDataFile genericDataFile = (GenericDataFile) dataFile;
+ if (curPos >= startPosOfSnapOffset) {
+ // TODO : use readLimit provided in function param, the readLimits are derived from these 2 properties.
+ if ((curFilesAdded + 1) > maxFilesPerMicroBatch ||
+ (curRecordCount + genericDataFile.recordCount()) > maxRecordsPerMicroBatch) {
+ isOk = false;
+ break;
+ }
+
+ curFilesAdded += 1;
+ curRecordCount += genericDataFile.recordCount();
+ }
+
+ ++curPos;
+ }
+ // if the currentSnapShot was also the mostRecentSnapshot then break
+ if (curSnapshot.snapshotId() == table.currentSnapshot().snapshotId()) {
+ break;
+ }
Review Comment:
I'm thinking of achieving the same via a manifest index, like we did here:
https://github.com/apache/iceberg/blob/56d8f07ce6cf7ac99f439848ebf7c8b86f5046df/core/src/main/java/org/apache/iceberg/MicroBatches.java#L116-L121
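
To make the idea concrete, here is a minimal sketch of what indexing by manifest could look like. It is not the code at the link above and does not use Iceberg's API: the class `ManifestIndexSketch`, its methods, and the use of plain per-manifest file counts are all assumptions made for illustration. The point is that once each manifest's starting position is known, manifests that end before the offset's position can be skipped whole instead of walking every added file with a counter.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; plain integers stand in for manifests.
final class ManifestIndexSketch {

  // For each manifest, record the global position of its first file.
  // filesPerManifest holds an assumed per-manifest file count.
  static List<Integer> indexManifests(List<Integer> filesPerManifest) {
    List<Integer> startIndexes = new ArrayList<>();
    int currentFileIndex = 0;
    for (int fileCount : filesPerManifest) {
      startIndexes.add(currentFileIndex);
      currentFileIndex += fileCount;
    }
    return startIndexes;
  }

  // Return the index of the first manifest that still needs to be read
  // when resuming from startFileIndex. Earlier manifests are skipped
  // without opening them.
  static int firstManifestToRead(List<Integer> startIndexes, long startFileIndex) {
    int first = 0;
    // Advance while the next manifest begins at or before the start position.
    while (first + 1 < startIndexes.size()
        && startIndexes.get(first + 1) <= startFileIndex) {
      first++;
    }
    return first;
  }
}
```

With file counts of 3, 2 and 4, the start positions are 0, 3 and 5, so an offset at position 4 resumes in the second manifest and the first one is never scanned.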
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]