StephanEwen opened a new pull request #13344: URL: https://github.com/apache/flink/pull/13344
## What is the purpose of the change

This PR mainly introduces a **new File Data Source** based on the *FLIP-27 source API*, under `flink-connectors/flink-connector-files`. These changes are in commit 2f758f14b79e8879921b534db215c1e10c9571b1.

*(Note that this does not go into the existing `flink-connectors/flink-connector-filesystem` project, because having this in a new module is a cleaner separation, and the new API needs fewer dependencies.)*

To allow the File Source API to also efficiently support formats like Parquet and ORC, some changes were made to the *Split Reader API*, mainly changing the structure through which fetched records are handed over between the Split Reader and the main execution thread. This is in commit 887280ca02887942a898fe194cbcbcfb6ef626ef, linked in the reviewing pointers below.

To verify that this Source API is viable, this PR includes some input format implementations, like a `TextLineFormat` (reading String lines), and stubs for a CSV reader and a vectorized ORC reader.

## Pointers for Reviewing - More efficient Record Handover in Split Reader API

There are mainly two relevant parts, as outlined below; the rest is straightforward adjustments.

The core change is in the [RecordsWithSplitIds](https://github.com/apache/flink/commit/887280ca02887942a898fe194cbcbcfb6ef626ef#diff-7fe478d4170bc2b285251c6cbf0a4580) class.

These changes also lead to a changed (I believe somewhat simpler) implementation of [SourceReaderBase.pollNext(ReaderOutput)](https://github.com/apache/flink/commit/887280ca02887942a898fe194cbcbcfb6ef626ef#diff-3f2f17977a25abfc1cf8e8d60e6c2ca3). An illustrative sketch of the handover style follows after the reviewing pointers below.

## Pointers for Reviewing - File Source API

The main attention should probably go to the core API interfaces and classes, because these will be harder to change in the future:

- `FileSource` as the main entry point in the API
- The reader formats:
  - `StreamFormat` (easy to implement, but without full control over object reuse, etc.; used for Text, CSV, ...)
  - `BulkFormat` (directly reads batches; most efficient, but more involved to implement; used for ORC, Parquet, ...)
- The interfaces that define file discovery and assignment: `FileEnumerator` and `FileSplitAssigner`

A good way to get a feeling for the API is to look at the [FileSourceTextLinesITCase](https://github.com/StephanEwen/flink/blob/merge_file_source/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java).

To check whether the abstraction for *reader formats* (the file readers / decoders) is intuitive and efficient, I'd recommend looking at these examples:

- [TextLineFormat](https://github.com/apache/flink/commit/2f758f14b79e8879921b534db215c1e10c9571b1#diff-95ed21a09986ce6fe44662154832b181), making use of a `StreamFormat`
- *CSV Reader* (4908d0466df5efd8d0e09e8ec76fd64be041454b), making use of a `StreamFormat`
- *ORC Vectorized Readers* (f6126b919214a98a11179d1d6444af742ffe257a), both for the case of returning a `VectorizedColumnBatch` as one record, and for the case of returning `Rows`

It is worth noting that the `BulkFormat.Reader` is fairly close to the `SplitReader` itself (fetching a batch of records). This is on purpose, to support high-efficiency readers. The main difference is that the `SplitReader` is aware of splits and the `BulkFormat.Reader` is not. A stripped-down sketch of the `StreamFormat` reader contract also follows below.
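To make the record handover change concrete, here is a simplified, illustrative sketch of an iterator-style batch handover and how the main thread might drain it. The names `RecordBatchSketch`, `BatchDrainExample`, and `emitAll` are hypothetical; the authoritative version is the `RecordsWithSplitIds` class linked above.

```java
import java.util.Set;
import java.util.function.Consumer;

/**
 * Illustrative sketch (not the actual Flink class) of an iterator-style
 * handover batch: records are pulled split by split, rather than passing
 * per-split record collections from the fetcher thread to the main thread.
 */
interface RecordBatchSketch<E> {

    /** Advances to the next split in this batch; null when no splits remain. */
    String nextSplit();

    /** Next record from the current split; null when its records in this batch are exhausted. */
    E nextRecordFromSplit();

    /** IDs of the splits that were fully consumed within this batch. */
    Set<String> finishedSplits();
}

/** How a reader's poll loop might drain one batch (illustrative only). */
final class BatchDrainExample {

    static <E> void emitAll(RecordBatchSketch<E> batch, Consumer<E> output) {
        String splitId; // identifies which split the following records belong to
        while ((splitId = batch.nextSplit()) != null) {
            E record;
            while ((record = batch.nextRecordFromSplit()) != null) {
                output.accept(record); // hand the record to downstream processing
            }
        }
    }
}
```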
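And to illustrate why `StreamFormat` is described as easy to implement: conceptually, a format only needs to supply a reader that returns one record per call and `null` at end of input. The class below is a hypothetical, stripped-down example of that contract, not the actual interface (the real `StreamFormat` additionally deals with splitting, checkpointed restore, and type information; see the `TextLineFormat` link above for the real implementation).

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical sketch of the record-at-a-time reader contract behind
 * StreamFormat: one record per read() call, null at end of input.
 */
final class TextLineReaderSketch implements AutoCloseable {

    private final BufferedReader reader;

    TextLineReaderSketch(InputStream stream) {
        this.reader = new BufferedReader(
                new InputStreamReader(stream, StandardCharsets.UTF_8));
    }

    /** Returns the next line, or null once the stream is exhausted. */
    String read() throws IOException {
        return reader.readLine();
    }

    @Override
    public void close() throws IOException {
        reader.close();
    }
}
```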
## Testing this change

This change can be tested by creating a `DataStream` API program that reads files, following this pattern:

*Batch*
```java
FileSource<String> source = FileSource
        .forRecordStreamFormat(new TextLineFormat(), new Path("file:///some/sample/file"))
        .build();

DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
```

*Streaming*
```java
FileSource<String> source = FileSource
        .forRecordStreamFormat(new TextLineFormat(), new Path("file:///some/sample/file"))
        .monitorContinuously(Duration.ofSeconds(5))
        .build();

DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
```

See [FileSourceTextLinesITCase](https://github.com/StephanEwen/flink/blob/merge_file_source/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java) for an end-to-end example.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): **no**
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **yes**
- The serializers: **no**
- The runtime per-record code paths (performance sensitive): **yes**
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: **no**
- The S3 file system connector: **no**

## Documentation

- Does this pull request introduce a new feature? **yes**
- If yes, how is the feature documented? **not documented**

The API is not fully cross-reviewed yet and might change. Docs will be added as soon as the File Source API is approved.
