Stephan Ewen created FLINK-19161:
------------------------------------

             Summary: Port File Sources to FLIP-27 API
                 Key: FLINK-19161
                 URL: https://issues.apache.org/jira/browse/FLINK-19161
             Project: Flink
          Issue Type: Sub-task
          Components: Connectors / FileSystem
            Reporter: Stephan Ewen
            Assignee: Stephan Ewen
             Fix For: 1.12.0


Porting the File sources to the FLIP-27 API means combining
  - the FileInputFormat from the DataSet (batch) API
  - the monitoring file source from the DataStream API.

The two already share the same reader code and part of the enumeration code.

*Structure*

The new File Source will have three components:

  - File enumerators, which discover the files.
  - File split assigners, which decide which reader gets which split.
  - File reader formats, which handle the decoding.
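The three roles can be sketched as separate interfaces. This is an illustrative sketch only; the interface and class names (FileSplit, FileEnumerator, FileSplitAssigner, FileReaderFormat, QueueSplitAssigner) are assumptions, not the final Flink API.

```java
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Optional;
import java.util.Queue;

/** A split: a region of a single file, identified by path, offset, and length. */
final class FileSplit {
    final String path;
    final long offset;
    final long length;

    FileSplit(String path, long offset, long length) {
        this.path = path;
        this.offset = offset;
        this.length = length;
    }
}

/** Discovers files and turns them into splits. */
interface FileEnumerator {
    Collection<FileSplit> enumerateSplits(String basePath);
}

/** Decides which reader gets which split. */
interface FileSplitAssigner {
    void addSplits(Collection<FileSplit> splits);
    Optional<FileSplit> getNext(int readerSubtask);
}

/** Decodes records of type T from a single split. */
interface FileReaderFormat<T> {
    Iterable<T> read(FileSplit split);
}

/** Simplest possible assigner: hands out splits first come, first served,
 *  ignoring data locality. */
final class QueueSplitAssigner implements FileSplitAssigner {
    private final Queue<FileSplit> pending = new ArrayDeque<>();

    @Override
    public void addSplits(Collection<FileSplit> splits) {
        pending.addAll(splits);
    }

    @Override
    public Optional<FileSplit> getNext(int readerSubtask) {
        return Optional.ofNullable(pending.poll());
    }
}
```

Separating the assigner lets different assignment strategies (e.g. locality-aware vs. simple queueing) be plugged in without touching enumeration or decoding.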


The main difference between the bounded (batch) version and the unbounded 
(streaming) version is that the streaming version repeatedly invokes the file 
enumerator to search for new files.

*Checkpointing Enumerators*

The enumerators need to checkpoint the not-yet-assigned splits and, if they 
are in continuous discovery mode (streaming), the paths / timestamps already 
processed.
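A sketch of what that enumerator checkpoint state could hold, per the description above. All names here are illustrative assumptions, and pending splits are represented by their paths for brevity.

```java
import java.util.Collection;
import java.util.Set;

/** Illustrative checkpoint state of a file enumerator. */
final class EnumeratorCheckpoint {
    final Collection<String> pendingSplitPaths;  // discovered but not yet assigned
    final Set<String> processedPaths;            // continuous mode: already enumerated
    final long lastDiscoveryTimestamp;           // continuous mode: timestamp covered so far

    EnumeratorCheckpoint(Collection<String> pendingSplitPaths,
                         Set<String> processedPaths,
                         long lastDiscoveryTimestamp) {
        this.pendingSplitPaths = pendingSplitPaths;
        this.processedPaths = processedPaths;
        this.lastDiscoveryTimestamp = lastDiscoveryTimestamp;
    }

    /** In continuous mode, each listed file must be enumerated exactly once. */
    boolean isNewFile(String path) {
        return !processedPaths.contains(path);
    }
}
```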

*Checkpointing Readers*

The new File Source needs to ensure that every reader can be checkpointed.
Some readers may be able to expose the position in the input file that 
corresponds to the latest emitted record, but many will not be able to do that 
due to
  - storing compressed record batches
  - using buffered decoders where exact position information is not accessible

We therefore suggest exposing a mechanism that combines a seekable file offset 
with a number of records to read and skip after that offset. In the extreme 
cases, formats can work with seekable positions only, or with records-to-skip 
only. Some formats, like Avro, have periodic seek points (sync markers) and can 
count records-to-skip after these markers.
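The combined mechanism can be sketched as follows. The class and method names are assumptions, and the toy reader treats the offset as a record index standing in for a real seekable byte position (such as an Avro sync marker).

```java
import java.util.List;

/** Sketch of a checkpointed reader position: a seekable offset plus a count
 *  of records to read and discard after seeking there. */
final class CheckpointedPosition {
    static final long NO_OFFSET = -1L;  // format cannot seek; replay from the start

    final long offset;         // seekable position, or NO_OFFSET
    final long recordsToSkip;  // records to read and discard after seeking

    CheckpointedPosition(long offset, long recordsToSkip) {
        this.offset = offset;
        this.recordsToSkip = recordsToSkip;
    }
}

/** Toy reader over an in-memory list, where "offset" is a record index
 *  standing in for a byte offset in a real file. */
final class ToyReader {
    static List<String> resumeFrom(List<String> records, CheckpointedPosition pos) {
        int seekTo = pos.offset == CheckpointedPosition.NO_OFFSET ? 0 : (int) pos.offset;
        int skip = (int) pos.recordsToSkip;
        // Seek to the last checkpointed position, then skip the records that
        // were already emitted since that position.
        return records.subList(seekTo + skip, records.size());
    }
}
```

A fully seekable format would checkpoint with recordsToSkip = 0; a non-seekable one would use NO_OFFSET and rely purely on records-to-skip.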

*Efficient and Convenient Readers*

To balance efficiency (vectorized batch reading of ORC / Parquet for vectorized 
query processing) and convenience (plugging a third-party CSV decoder onto a 
stream), we offer three abstractions for record readers:

  - Bulk formats, which run over a file path and return an iterable batch at a 
time _(most efficient)_

  - File record formats, which read files record by record. The source framework 
hands over batches of a pre-defined size from the split reader to the record 
emitter.

  - Stream formats, which decode an input stream and rely on the source 
framework to decide how to batch the record handover _(most convenient)_
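The three abstractions above could look roughly like this. The interface names and signatures are assumptions rather than the final Flink API, and checked I/O error handling is wrapped for brevity.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Iterator;

/** Bulk format: runs over a file path and returns one iterable batch per call.
 *  Most efficient; suited to vectorized ORC / Parquet readers. */
interface BulkFormat<T> {
    Iterator<T> readBatch(String path, long offset, long length);
}

/** File record format: reads the file record by record; the framework batches
 *  the handover from split reader to record emitter. */
interface FileRecordFormat<T> {
    T nextRecord();  // null when the split is exhausted
}

/** Stream format: decodes a plain input stream; the framework decides how to
 *  batch the record handover. Most convenient. */
interface StreamFormat<T> {
    T readRecord(InputStream in);  // null when the stream is exhausted
}

/** Example: a stream format that decodes newline-separated text records. */
final class TextLineFormat implements StreamFormat<String> {
    @Override
    public String readRecord(InputStream in) {
        try {
            StringBuilder sb = new StringBuilder();
            int b;
            while ((b = in.read()) != -1 && b != '\n') {
                sb.append((char) b);
            }
            // Return null only at end of stream with nothing buffered.
            return (b == -1 && sb.length() == 0) ? null : sb.toString();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A third-party decoder only needs to implement the stream-format shape; the framework then owns batching, handover, and checkpointing around it.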



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
