namrathamyske commented on code in PR #4479:
URL: https://github.com/apache/iceberg/pull/4479#discussion_r1231281836


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java:
##########
@@ -283,6 +302,128 @@ private static StreamingOffset determineStartingOffset(Table table, Long fromTim
     }
   }
 
+  @Override
+  @SuppressWarnings("checkstyle:CyclomaticComplexity")
+  public Offset latestOffset(Offset startOffset, ReadLimit limit) {
+    // calculate the end offset; get the snapshotId from the startOffset
+    Preconditions.checkArgument(
+        startOffset instanceof StreamingOffset,
+        "Invalid start offset: %s is not a StreamingOffset",
+        startOffset);
+
+    table.refresh();
+    if (table.currentSnapshot() == null) {
+      return StreamingOffset.START_OFFSET;
+    }
+
+    if (table.currentSnapshot().timestampMillis() < fromTimestamp) {
+      return StreamingOffset.START_OFFSET;
+    }
+
+    // end offset can expand to multiple snapshots
+    StreamingOffset startingOffset = (StreamingOffset) startOffset;
+
+    if (startOffset.equals(StreamingOffset.START_OFFSET)) {
+      startingOffset = determineStartingOffset(table, fromTimestamp);
+    }
+
+    Snapshot curSnapshot = table.snapshot(startingOffset.snapshotId());
+    int startPosOfSnapOffset = (int) startingOffset.position();
+
+    boolean scanAllFiles = startingOffset.shouldScanAllFiles();
+
+    boolean shouldContinueReading = true;
+    int curFilesAdded = 0;
+    int curRecordCount = 0;
+    int curPos = 0;
+
+    // Note: we produce nextOffset with a non-inclusive pos
+    while (shouldContinueReading) {
+      // generate manifest index for the curSnapshot
+      List<Pair<ManifestFile, Integer>> indexedManifests =
+          MicroBatches.skippedManifestIndexesFromSnapshot(
+              table.io(), curSnapshot, startPosOfSnapOffset, scanAllFiles);
+      // this assumes we will be able to add at least 1 file in the new offset
+      for (int idx = 0; idx < indexedManifests.size() && shouldContinueReading; idx++) {
+        // curPos >= startFileIndex is guaranteed here
+        curPos = indexedManifests.get(idx).second();
+        try (CloseableIterable<FileScanTask> taskIterable =
+                MicroBatches.openManifestFile(
+                    table.io(),
+                    table.specs(),
+                    caseSensitive,
+                    curSnapshot,
+                    indexedManifests.get(idx).first(),
+                    scanAllFiles);
+            CloseableIterator<FileScanTask> taskIter = taskIterable.iterator()) {
+          while (taskIter.hasNext()) {
+            FileScanTask task = taskIter.next();
+            if (curPos >= startPosOfSnapOffset) {
+              // TODO: use readLimit provided in function param, the readLimits are derived from

Review Comment:
   @singhpk234 is there a reason we don't use the `limit` function parameter here and instead marked this as a TODO? I see that `limit` is not used anywhere in the function.
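   For context, one way the `limit` parameter could gate the loop above is to stop accumulating files once either a max-files or max-rows bound is reached. The sketch below is a hypothetical, plain-Java model of that check (the helper name `withinLimit` is made up; the real code would unpack Spark's `ReadMaxFiles`/`ReadMaxRows` subclasses of `ReadLimit` rather than take raw ints), kept dependency-free so it stands alone:
   
   ```java
   // Hypothetical stand-in for applying a Spark ReadLimit inside the
   // micro-batch scan loop. maxFiles/maxRows mirror ReadMaxFiles/ReadMaxRows;
   // a negative value means "no limit" (i.e. ReadAllAvailable).
   public class MicroBatchLimitSketch {
     static boolean withinLimit(int filesAdded, long recordsAdded, int maxFiles, long maxRows) {
       boolean underFileLimit = maxFiles < 0 || filesAdded < maxFiles;
       boolean underRowLimit = maxRows < 0 || recordsAdded < maxRows;
       // keep reading only while both bounds are unmet
       return underFileLimit && underRowLimit;
     }
   
     public static void main(String[] args) {
       // both counters under their caps: keep reading
       System.out.println(withinLimit(5, 1_000L, 100, 10_000L));
       // file cap reached: stop
       System.out.println(withinLimit(100, 1_000L, 100, 10_000L));
       // row cap reached (file limit disabled): stop
       System.out.println(withinLimit(5, 10_000L, -1, 10_000L));
     }
   }
   ```
   
   In the loop above, `shouldContinueReading` could then be recomputed from `curFilesAdded`/`curRecordCount` after each task instead of being left unbounded.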



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

