StephanEwen opened a new pull request #13344:
URL: https://github.com/apache/flink/pull/13344


   ## What is the purpose of the change
   
   This PR mainly introduces a **new File Data Source** based on the *FLIP-27 
source API*, under `flink-connectors/flink-connector-files`. These changes are 
in commit 2f758f14b79e8879921b534db215c1e10c9571b1
   
   *(Note that this does not go into the existing 
`flink-connectors/flink-connector-filesystem` project because having this in a 
new module is a cleaner separation, and the new API needs fewer dependencies)*
   
   To allow the File Source API to also efficiently support formats like 
Parquet and ORC, some changes were made to the *Split Reader API*, mainly 
changing the structure through which fetched records are communicated between 
the Split Reader and the main execution thread. These changes are in commit 
887280ca02887942a898fe194cbcbcfb6ef626ef.
   To verify that this Source API is viable, this PR includes some input format 
implementations, like a `TextLineFormat` (reading String lines), and stubs for 
a CSV reader and a vectorized ORC reader.
   
   ## Pointers for Reviewing - More efficient Record Handover in Split Reader 
API
   
   There are mainly two relevant parts, as outlined below. The rest consists of 
straightforward adjustments.
   
   The core change is in the 
[RecordsWithSplitIds](https://github.com/apache/flink/commit/887280ca02887942a898fe194cbcbcfb6ef626ef#diff-7fe478d4170bc2b285251c6cbf0a4580)
 class.
   
   These changes also lead to a changed (and, I believe, somewhat simpler) 
implementation of 
[SourceReaderBase.pollNext(ReaderOutput)](https://github.com/apache/flink/commit/887280ca02887942a898fe194cbcbcfb6ef626ef#diff-3f2f17977a25abfc1cf8e8d60e6c2ca3)
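   A rough, self-contained sketch of that handover structure (hypothetical 
simplified classes, not the actual Flink code; the real interface is the 
`RecordsWithSplitIds` linked above): the fetcher hands over one batch that may 
span several splits, and the poll loop drains it split by split.

   ```java
   import java.util.ArrayList;
   import java.util.Iterator;
   import java.util.List;
   import java.util.Map;

   // Simplified stand-in for the batch handover described above. The method
   // names mirror those in the linked commit, but this is only a sketch.
   class RecordBatch {
       private final Iterator<Map.Entry<String, List<String>>> splits;
       private Iterator<String> currentSplitRecords;

       RecordBatch(Map<String, List<String>> recordsBySplit) {
           this.splits = recordsBySplit.entrySet().iterator();
       }

       // Moves to the next split in the batch; null means the batch is exhausted.
       String nextSplit() {
           if (!splits.hasNext()) {
               return null;
           }
           Map.Entry<String, List<String>> entry = splits.next();
           currentSplitRecords = entry.getValue().iterator();
           return entry.getKey();
       }

       // Next record from the current split; null means this split is drained.
       String nextRecordFromSplit() {
           return currentSplitRecords.hasNext() ? currentSplitRecords.next() : null;
       }
   }

   public class PollLoopSketch {
       // Drains a batch the way a poll loop would: one split at a time.
       static List<String> drain(RecordBatch batch) {
           List<String> out = new ArrayList<>();
           String split;
           while ((split = batch.nextSplit()) != null) {
               String record;
               while ((record = batch.nextRecordFromSplit()) != null) {
                   out.add(split + ":" + record);
               }
           }
           return out;
       }
   }
   ```

   Handing over records grouped by split, rather than one record with a split 
ID each, is what avoids per-record overhead on the hot path.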
   
   ## Pointers for Reviewing - File Source API
   
   The main attention should probably go to the core API interfaces and 
classes, because these will be harder to change in the future:
     - `FileSource` as the main entry point in the API
     - The reader formats
         - `StreamFormat` (easy to implement, but without full control over 
object reuse, etc.; used for Text, CSV, ...)
         - `BulkFormat` (directly read batches, most efficient but more 
involved to implement, used for ORC, Parquet, ...)
     - The interfaces that define file discovery and assignment: 
`FileEnumerator` and `FileSplitAssigner`
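
   For a feel of the assignment side, here is a hypothetical, much-simplified 
sketch (made-up class and method names, not the actual `FileSplitAssigner` 
contract): discovered splits are queued, readers pull the next remaining split 
on request, and splits from a failed reader can be added back for reassignment.

   ```java
   import java.util.ArrayDeque;
   import java.util.Collection;
   import java.util.Deque;
   import java.util.Optional;

   // Hypothetical, simplified sketch of split assignment. Splits are modeled
   // as plain strings here; the real API uses dedicated split classes.
   public class SimpleSplitAssigner {
       private final Deque<String> remainingSplits;

       public SimpleSplitAssigner(Collection<String> discoveredSplits) {
           this.remainingSplits = new ArrayDeque<>(discoveredSplits);
       }

       // Returns the next split for a requesting reader, or empty when done.
       public Optional<String> getNext() {
           return Optional.ofNullable(remainingSplits.pollFirst());
       }

       // A failed reader returns its splits so they can be reassigned.
       public void addSplitsBack(Collection<String> splits) {
           splits.forEach(remainingSplits::addFirst);
       }
   }
   ```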
   
   A good way to get a feeling for the API is to look at the 
[FileSourceTextLinesITCase](https://github.com/StephanEwen/flink/blob/merge_file_source/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java)
   
   To check whether the abstraction for *reader formats* (the file readers / 
decoders) is intuitive and efficient, I'd recommend looking at these examples:
     - 
[TextLineFormat](https://github.com/apache/flink/commit/2f758f14b79e8879921b534db215c1e10c9571b1#diff-95ed21a09986ce6fe44662154832b181)
 making use of a `StreamFormat`
     - *CSV Reader* (4908d0466df5efd8d0e09e8ec76fd64be041454b), making use of a 
`StreamFormat`
     - *ORC Vectorized Readers* (f6126b919214a98a11179d1d6444af742ffe257a), both 
for the case of returning a `VectorizedColumnBatch` as one record and for the 
case of returning `Rows`
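
   The trade-off between the two format styles can be sketched in a 
self-contained way (hypothetical interfaces, not the actual 
`StreamFormat`/`BulkFormat` signatures): a stream-style reader produces one 
record per call, a bulk-style reader hands back whole batches, and the former 
can always be adapted into the latter at the cost of a per-record call.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical, simplified stand-ins for the two format styles.
   interface StreamStyleReader<T> {
       // One record per call; null signals the end of the split.
       T read();
   }

   interface BulkStyleReader<T> {
       // A whole batch per call; null signals the end of the split.
       List<T> readBatch();
   }

   public class FormatStylesSketch {
       // Adapts a record-at-a-time reader into a batch reader. A native bulk
       // reader (e.g. over columnar batches) skips the per-record calls.
       static <T> BulkStyleReader<T> batching(StreamStyleReader<T> reader, int batchSize) {
           return () -> {
               List<T> batch = new ArrayList<>();
               T record;
               while (batch.size() < batchSize && (record = reader.read()) != null) {
                   batch.add(record);
               }
               return batch.isEmpty() ? null : batch;
           };
       }
   }
   ```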
   
   It is worth noting that the `BulkFormat.Reader` is fairly close to the 
`SplitReader` itself (fetching a batch of records). This is on purpose, to 
support high-efficiency readers. The main difference is that the `SplitReader` 
is aware of splits, while the `BulkFormat.Reader` is not.
   
   ## Testing this change
   
   This change can be tested by creating a `DataStream` API program that reads 
files, following this pattern:
   
   *Batch*
   ```java
   FileSource<String> source = FileSource
       .forRecordStreamFormat(new TextLineFormat(), new 
Path("file:///some/sample/file"))
       .build();
   
   DataStream<String> stream = env.fromSource(source, 
WatermarkStrategy.noWatermarks(), "file-source");
   ```
   
   *Streaming*
   ```java
   FileSource<String> source = FileSource
       .forRecordStreamFormat(new TextLineFormat(), new 
Path("file:///some/sample/file"))
       .monitorContinuously(Duration.ofSeconds(5))
       .build();
   
   DataStream<String> stream = env.fromSource(source, 
WatermarkStrategy.noWatermarks(), "file-source");
   ```
   
   See 
[FileSourceTextLinesITCase](https://github.com/StephanEwen/flink/blob/merge_file_source/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java)
 for an end-to-end example.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): **no**
     - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **yes**
     - The serializers: **no**
     - The runtime per-record code paths (performance sensitive): **yes**
     - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: **no**
     - The S3 file system connector: **no**
   
   ## Documentation
   
     - Does this pull request introduce a new feature? **yes**
     - If yes, how is the feature documented? **not documented**
   
   The API is not fully cross-reviewed yet and might change. Docs will be added 
as soon as the File Source API is approved.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
