stevenzwu commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r739936992



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java
##########
@@ -41,16 +42,45 @@
   private final FileScanTaskReader<T> fileScanTaskReader;
 
   private final InputFilesDecryptor inputFilesDecryptor;
+  private final CombinedScanTask combinedTask;
+
   private Iterator<FileScanTask> tasks;
   private CloseableIterator<T> currentIterator;
+  private Position position;
 
   public DataIterator(FileScanTaskReader<T> fileScanTaskReader, CombinedScanTask task,
                       FileIO io, EncryptionManager encryption) {
     this.fileScanTaskReader = fileScanTaskReader;
 
     this.inputFilesDecryptor = new InputFilesDecryptor(task, io, encryption);
+    this.combinedTask = task;
+
     this.tasks = task.files().iterator();
     this.currentIterator = CloseableIterator.empty();
+    // fileOffset starts at -1 because we started
+    // from an empty iterator that is not from the split files.
+    this.position = new Position(-1L, 0L);
+  }
+
+  public void seek(Position startingPosition) {
+    // skip files
+    Preconditions.checkArgument(startingPosition.fileOffset() < combinedTask.files().size(),
+        "Checkpointed file offset is %d, while CombinedScanTask has %d files",
+        startingPosition.fileOffset(), combinedTask.files().size());
+    for (long i = 0L; i < startingPosition.fileOffset(); ++i) {

Review comment:
       An `int` would certainly be sufficient. I was using `long` to match the
type in `RecordAndPosition` from the flink-connector-files module. I have
already tried to move away from depending on code in flink-connector-files;
this one looks like a miss, because flink-table-planner-blink_2.12 pulls in
flink-connector-files transitively.
   
   The `long offset` in Flink's `RecordAndPosition` actually means the byte
offset within a file, not a file index. I will define our own
`RecordAndPosition` and change `fileOffset` to the `int` type.
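
   For illustration only, a minimal sketch of what such a class might look
like. The field names, the `set`/`record` methods, and the meaning of
`recordOffset` (records already consumed from the current file) are
assumptions made for this example, not the code that ends up in the PR.

```java
// Hypothetical sketch: an Iceberg-owned holder that pairs a record with its
// position, so nothing depends on flink-connector-files.
final class RecordAndPosition<T> {
  private T record;
  // Index of the current file within the CombinedScanTask; an int is enough
  // because it counts files, not bytes.
  private int fileOffset;
  // Assumed meaning: number of records already consumed from the current file.
  private long recordOffset;

  RecordAndPosition() {
  }

  // Reset the holder to an explicit record and position.
  void set(T newRecord, int newFileOffset, long newRecordOffset) {
    this.record = newRecord;
    this.fileOffset = newFileOffset;
    this.recordOffset = newRecordOffset;
  }

  // Store the next record from the same file and advance the record offset.
  void record(T nextRecord) {
    this.record = nextRecord;
    this.recordOffset++;
  }

  T record() {
    return record;
  }

  int fileOffset() {
    return fileOffset;
  }

  long recordOffset() {
    return recordOffset;
  }
}
```

   With an `int` file offset, the skip loop in `seek` can use a plain
`int` counter as well.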




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


