[
https://issues.apache.org/jira/browse/FLINK-25416?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jing Ge updated FLINK-25416:
----------------------------
Labels: Umbrella (was: )
> Build unified Parquet BulkFormat for both Table API and DataStream API
> ----------------------------------------------------------------------
>
> Key: FLINK-25416
> URL: https://issues.apache.org/jira/browse/FLINK-25416
> Project: Flink
> Issue Type: New Feature
> Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
> Reporter: Jing Ge
> Assignee: Jing Ge
> Priority: Major
> Labels: Umbrella
>
> *Background information*
> The current AvroParquet implementation, {{AvroParquetRecordFormat}}, uses the
> high-level API {{ParquetReader}}, which does not expose offset information. As a
> result, the restoreReader logic leaves a lot of room for improvement.
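> For illustration, a simplified sketch (not the actual {{AvroParquetRecordFormat}}
> code; {{inputFile}} and {{recordsToSkip}} are assumed inputs): without an offset
> from the high-level reader, restoring essentially means re-opening the file and
> discarding records until the checkpointed record count is reached.
> {code:java}
> // Simplified illustration of offset-less recovery with the high-level API.
> // inputFile and recordsToSkip are assumed to come from the split / checkpoint.
> ParquetReader<GenericRecord> reader =
>         AvroParquetReader.<GenericRecord>builder(inputFile).build();
> for (long i = 0; i < recordsToSkip; i++) {
>     reader.read(); // already-emitted records are read again and thrown away
> }
> {code}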
> Beyond {{AvroParquetRecordFormat}}, there is another Parquet format
> implementation, {{ParquetVectorizedInputFormat}}, which is tightly coupled with
> the Table API.
> It would be better to provide a unified Parquet {{BulkFormat}} with a single
> implementation that supports both the Table API and the DataStream API.
>
> *Some thoughts*
> Use the low-level API {{ParquetFileReader}} directly with {{BulkFormat}}, as
> {{ParquetVectorizedInputFormat}} does, instead of with {{StreamFormat}}, for the
> following reasons:
> * the read logic is built into the internal low-level class
> {{InternalParquetRecordReader}}, which has package-private visibility in the
> parquet-hadoop lib and itself uses the low-level class {{ParquetFileReader}}.
> This makes a {{StreamFormat}} implementation very complicated. The design idea of
> {{StreamFormat}} is to simplify the implementation, so the two do not work well
> together.
> * {{ParquetFileReader}} reads data in batch mode, i.e. {{PageReadStore pages =
> reader.readNextFilteredRowGroup();}}. If we build this logic into a
> {{StreamFormat}} ({{AvroParquetRecordFormat}} in this case),
> {{AvroParquetRecordFormat}} has to take over the role of
> {{InternalParquetRecordReader}}, including but not limited to:
> ## reading {{PageReadStore}} in batch mode;
> ## managing the {{PageReadStore}}, i.e. reading and caching the next page once
> all records in the current page have been consumed;
> ## managing the read index within the current {{PageReadStore}}, because
> {{StreamFormat}} has its own setting for the read size, etc.
> All of this effectively turns {{AvroParquetRecordFormat}} into a {{BulkFormat}}
> rather than a {{StreamFormat}}.
> * {{StreamFormat}} can only be used via {{StreamFormatAdapter}}, which means
> everything we do with the low-level APIs of the parquet-hadoop lib must not
> conflict with the built-in logic provided by {{StreamFormatAdapter}}.
> We can now see that if we build this logic into a {{StreamFormat}}
> implementation, i.e. {{AvroParquetRecordFormat}}, all the convenient built-in
> logic provided by the {{StreamFormatAdapter}} turns into an obstacle. There is
> also a violation of the single responsibility principle, since
> {{AvroParquetRecordFormat}} would take over some responsibilities of a
> {{BulkFormat}}. These might be the reasons why {{ParquetVectorizedInputFormat}}
> implemented {{BulkFormat}} instead of {{StreamFormat}} (a structural sketch of
> such a {{BulkFormat}} follows).
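> As a structural sketch only (the class name {{AvroParquetBulkFormat}} and its
> constructor are hypothetical, not an agreed design), the low-level reading logic
> from the snippet at the end of this ticket could sit inside a {{BulkFormat}}
> along these lines, keeping all batch/page handling out of {{StreamFormatAdapter}}:
> {code:java}
> import java.io.IOException;
>
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.connector.file.src.FileSourceSplit;
> import org.apache.flink.connector.file.src.reader.BulkFormat;
>
> // Hypothetical skeleton: shows where the row-group/page handling would live.
> public class AvroParquetBulkFormat<T> implements BulkFormat<T, FileSourceSplit> {
>
>     private final TypeInformation<T> producedType;
>
>     public AvroParquetBulkFormat(TypeInformation<T> producedType) {
>         this.producedType = producedType;
>     }
>
>     @Override
>     public Reader<T> createReader(Configuration config, FileSourceSplit split) throws IOException {
>         // Open a ParquetFileReader for split.path() and materialize records
>         // row group by row group (see the low-level sketch at the end of this ticket).
>         throw new UnsupportedOperationException("sketch only");
>     }
>
>     @Override
>     public Reader<T> restoreReader(Configuration config, FileSourceSplit split) throws IOException {
>         // Recovery: skip whole row groups up to the checkpointed position
>         // (split.getReaderPosition()), then skip records within the row group.
>         throw new UnsupportedOperationException("sketch only");
>     }
>
>     @Override
>     public boolean isSplittable() {
>         return false; // row-group based splitting could be added later
>     }
>
>     @Override
>     public TypeInformation<T> getProducedType() {
>         return producedType;
>     }
> }
> {code}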
> In order to build a unified Parquet implementation for both the Table API and
> the DataStream API, it makes more sense to build this code into a {{BulkFormat}}
> implementation class. Since the output data types differ ({{RowData}} vs. Avro
> records), extra converter logic should be introduced into the architecture
> design (see the converter sketch below). Depending on how complicated the issue
> turns out to be and how big its impact on the current code base is, a new FLIP
> might be required.
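> The converter itself could be a small pluggable interface; the following is a
> hypothetical sketch ({{ParquetRecordConverter}} is a placeholder name, not an
> agreed API):
> {code:java}
> import java.io.Serializable;
>
> // Hypothetical converter: the unified BulkFormat reads Parquet pages once and
> // hands each materialized record to a converter, so the same reader can serve
> // RowData for the Table API and Avro records for the DataStream API.
> public interface ParquetRecordConverter<IN, OUT> extends Serializable {
>     OUT convert(IN parquetRecord);
> }
> {code}
> For example (still hypothetical), the DataStream path could use an identity
> converter for Avro {{GenericRecord}}, while the Table path plugs in an
> Avro-to-{{RowData}} conversion.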
> The following code sketch was suggested by Arvid Heise for the next, optimized
> AvroParquetReader:
> {code:java}
> // Injected
> GenericData model = GenericData.get();
> org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
>
> // Low-level reader - fetch metadata
> ParquetFileReader reader = null; // placeholder in the sketch; typically opened via ParquetFileReader.open(...)
> MessageType fileSchema = reader.getFileMetaData().getSchema();
> Map<String, String> metaData = reader.getFileMetaData().getKeyValueMetaData();
>
> // init Avro specific things
> AvroReadSupport<T> readSupport = new AvroReadSupport<>(model);
> ReadSupport.ReadContext readContext =
>         readSupport.init(
>                 new InitContext(
>                         conf,
>                         metaData.entrySet().stream()
>                                 .collect(Collectors.toMap(
>                                         e -> e.getKey(),
>                                         e -> Collections.singleton(e.getValue()))),
>                         fileSchema));
> RecordMaterializer<T> recordMaterializer =
>         readSupport.prepareForRead(conf, metaData, fileSchema, readContext);
> MessageType requestedSchema = readContext.getRequestedSchema();
>
> // prepare record reader
> ColumnIOFactory columnIOFactory = new ColumnIOFactory(reader.getFileMetaData().getCreatedBy());
> MessageColumnIO columnIO = columnIOFactory.getColumnIO(requestedSchema, fileSchema, true);
>
> // for recovery: skip the row groups that were fully consumed before the checkpoint
> while (...) { // condition elided in the original sketch
>     reader.skipNextRowGroup();
> }
>
> // for reading
> PageReadStore pages;
> for (int block = 0; (pages = reader.readNextRowGroup()) != null; block++) {
>     RecordReader<T> recordReader = columnIO.getRecordReader(pages, recordMaterializer);
>     for (int i = 0; i < pages.getRowCount(); i++) {
>         T record = recordReader.read();
>         // emit record
>     }
> }
> {code}
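> Note that in this sketch recovery maps naturally onto a checkpointed position:
> {{skipNextRowGroup()}} advances over row-group metadata without materializing
> records, so a {{BulkFormat}} implementation could, as one option, checkpoint the
> index of the last fully emitted row group plus the number of records already
> emitted within the current one, instead of relying on a byte offset.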
--
This message was sent by Atlassian Jira
(v8.20.1#820001)