stevenzwu commented on a change in pull request #2305:
URL: https://github.com/apache/iceberg/pull/2305#discussion_r740413976
##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java
##########
@@ -41,18 +42,47 @@
private final FileScanTaskReader<T> fileScanTaskReader;
private final InputFilesDecryptor inputFilesDecryptor;
- private Iterator<FileScanTask> tasks;
+ private final CombinedScanTask combinedTask;
+ private final Position position;
+
+ private Iterator<FileScanTask> fileTasksIterator;
private CloseableIterator<T> currentIterator;
public DataIterator(FileScanTaskReader<T> fileScanTaskReader,
CombinedScanTask task,
FileIO io, EncryptionManager encryption) {
this.fileScanTaskReader = fileScanTaskReader;
this.inputFilesDecryptor = new InputFilesDecryptor(task, io, encryption);
- this.tasks = task.files().iterator();
+ this.combinedTask = task;
+ // fileOffset starts at -1 because we started
+ // from an empty iterator that is not from the split files.
+ this.position = new Position(-1, 0L);
Review comment:
I wouldn't say that the `seek` capability is FLIP-27 specific. If we think of
`DataIterator` as reading a list of files/splits from a `CombinedScanTask`, it is
like a file API, where `seek` is pretty common. It is needed to achieve
exactly-once processing semantics. E.g., if we were to implement exactly-once
semantics for the current streaming source, I would imagine we would need this as
well.
Thanks a lot for the `SeekableDataIterator`. I feel that leaving these two
empty abstract methods in the base `DataIterator` is a little weird:
```
protected void advanceRecord()
protected void advanceTask()
```
Overall, I still think adding a `seek` capability to `DataIterator` is natural
(for file-like read APIs).
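To make the file-like analogy concrete, here is a minimal sketch of what I have in mind. It is not the code in this PR: `SeekableListIterator` is a hypothetical stand-in that uses plain in-memory lists in place of `FileScanTask`s, and its `fileOffset`/`recordOffset` fields only mirror the idea behind `Position`. The assumption is that a checkpoint stores those two numbers and that a restored reader calls `seek` with them before reading.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical sketch: iterates records across several "files" and can seek to a saved position. */
class SeekableListIterator<T> implements Iterator<T> {
  private final List<List<T>> files;      // stand-in for the file tasks of one combined task
  private int fileOffset = -1;            // -1 means no file has been opened yet
  private long recordOffset = 0L;         // records already consumed from the current file
  private Iterator<T> current = Collections.emptyIterator();

  SeekableListIterator(List<List<T>> files) {
    this.files = files;
  }

  /** Repositions the iterator so that the next record is the one after the saved position. */
  void seek(int startingFileOffset, long startingRecordOffset) {
    if (startingFileOffset < 0 || startingFileOffset >= files.size()) {
      throw new IllegalArgumentException("Invalid file offset: " + startingFileOffset);
    }
    fileOffset = startingFileOffset;
    recordOffset = 0L;
    current = files.get(fileOffset).iterator();
    // Skip the records that were already emitted before the checkpoint.
    for (long i = 0; i < startingRecordOffset; i++) {
      if (!current.hasNext()) {
        throw new IllegalArgumentException("Invalid record offset: " + startingRecordOffset);
      }
      current.next();
      recordOffset++;
    }
  }

  @Override
  public boolean hasNext() {
    // Move on to the next file (skipping empty ones) once the current one is exhausted.
    while (!current.hasNext() && fileOffset + 1 < files.size()) {
      fileOffset++;
      recordOffset = 0L;
      current = files.get(fileOffset).iterator();
    }
    return current.hasNext();
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    recordOffset++;
    return current.next();
  }

  int fileOffset() {
    return fileOffset;
  }

  long recordOffset() {
    return recordOffset;
  }
}
```

With this shape, a reader that fails after emitting some records can be rebuilt over the same split and repositioned with `seek(fileOffset, recordOffset)`, so no record is emitted twice or skipped. Nothing in the sketch depends on FLIP-27 types, which is why I think the capability fits in the base iterator.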
##########
File path: flink/build.gradle
##########
@@ -28,6 +28,7 @@ project(':iceberg-flink') {
implementation project(':iceberg-parquet')
implementation project(':iceberg-hive-metastore')
+ compileOnly "org.apache.flink:flink-connector-base"
Review comment:
@openinx Maybe we can follow up on the other [comment
discussion](https://github.com/apache/iceberg/pull/3354/files#r740053509) here.
With the SplitEnumerator API change, it looks like I need to put the FLIP-27 source
in the `v1.13` folder. What should we do with future versions (like 1.14)? Do
we copy the FLIP-27 source code from `v1.13` to the `v1.14` folder?