[ https://issues.apache.org/jira/browse/FLINK-25416?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jing Ge updated FLINK-25416:
----------------------------
    Description: 
*Background information*

The current AvroParquet implementation, AvroParquetRecordFormat, uses the high-level 
API ParquetReader, which does not provide offset information; as a result, the 
restoreReader logic has significant room for improvement.
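
For context, a minimal, hedged sketch of how the high-level reader is typically used (the path and class names are placeholders for this illustration, assuming parquet-avro's builder API). It only exposes {{read()}}, so there is no offset to checkpoint or seek to, which is the restoreReader limitation described above:
{code:java}
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public class HighLevelAvroParquetReadSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Placeholder path, for illustration only.
        HadoopInputFile inputFile =
                HadoopInputFile.fromPath(new Path("/tmp/example.parquet"), conf);

        // The high-level reader hides row groups, pages and file offsets completely.
        try (ParquetReader<GenericRecord> reader =
                AvroParquetReader.<GenericRecord>builder(inputFile)
                        .withDataModel(GenericData.get())
                        .build()) {
            GenericRecord record;
            while ((record = reader.read()) != null) {
                // No position can be queried here, so restoring a reader position
                // means re-reading and skipping records.
                System.out.println(record);
            }
        }
    }
}
{code}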

Besides AvroParquetRecordFormat, there is another Parquet format implementation, 
ParquetVectorizedInputFormat, which is tightly coupled with the Table API.

It would be better to provide a unified Parquet BulkFormat with one 
implementation that supports both the Table API and the DataStream API.

 

*Some thoughts*

Use the low-level API {{ParquetFileReader}} directly with {{BulkFormat}}, as 
{{ParquetVectorizedInputFormat}} does, instead of with {{StreamFormat}}, for the 
following reasons:
 * The read logic is built into the internal low-level class 
{{InternalParquetRecordReader}}, which has package-private visibility in the 
parquet-hadoop lib and internally uses another low-level class, 
{{ParquetFileReader}}. This makes a {{StreamFormat}} implementation very 
complicated. The design idea of {{StreamFormat}} is to simplify the 
implementation; the two do not seem to work well together.

 * {{ParquetFileReader}} reads data in batch mode, i.e. {{PageReadStore pages = reader.readNextFilteredRowGroup();}}. If we build this logic into a StreamFormat ({{AvroParquetRecordFormat}} in this case), {{AvroParquetRecordFormat}} has to take over the role of {{InternalParquetRecordReader}}, including but not limited to (see the sketch after this list):
 ## reading the {{PageReadStore}} in batch mode,
 ## managing the {{PageReadStore}}, i.e. reading the next page once all records in the current page have been consumed, and caching it,
 ## managing the read index within the current {{PageReadStore}}, because StreamFormat has its own setting for the read size, etc.
All of this turns {{AvroParquetRecordFormat}} into a {{BulkFormat}} rather than a {{StreamFormat}}.

 * {{StreamFormat}} can only be used via {{StreamFormatAdapter}}, which means 
that everything we do with the low-level APIs of the parquet-hadoop lib must 
not conflict with the built-in logic provided by {{StreamFormatAdapter}}.
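
To make the mismatch concrete, here is a hedged sketch (the class and field names are assumptions for this illustration, not existing Flink or Parquet code) of what a {{StreamFormat.Reader}} built directly on the low-level row-group API would have to do: pull whole row groups, cache the per-row-group {{RecordReader}}, and manage the read index itself, i.e. exactly the batching work that {{BulkFormat}} is designed to own:
{code:java}
import java.io.IOException;

import org.apache.flink.connector.file.src.reader.StreamFormat;

import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.io.MessageColumnIO;
import org.apache.parquet.io.RecordReader;
import org.apache.parquet.io.api.RecordMaterializer;

/** Hypothetical sketch: a StreamFormat.Reader forced to manage row-group batches itself. */
class RowGroupBackedStreamReader<T> implements StreamFormat.Reader<T> {

    private final ParquetFileReader fileReader;
    private final MessageColumnIO columnIO;
    private final RecordMaterializer<T> materializer;

    // State that conceptually belongs to a BulkFormat reader:
    private RecordReader<T> currentRecordReader; // reader over the cached row group
    private long remainingInRowGroup;            // read index within the current row group

    RowGroupBackedStreamReader(
            ParquetFileReader fileReader,
            MessageColumnIO columnIO,
            RecordMaterializer<T> materializer) {
        this.fileReader = fileReader;
        this.columnIO = columnIO;
        this.materializer = materializer;
    }

    @Override
    public T read() throws IOException {
        // 1. read the PageReadStore in batch mode, 2. cache it, 3. manage the read index.
        if (remainingInRowGroup == 0) {
            PageReadStore pages = fileReader.readNextRowGroup();
            if (pages == null) {
                return null; // end of input
            }
            currentRecordReader = columnIO.getRecordReader(pages, materializer);
            remainingInRowGroup = pages.getRowCount();
        }
        remainingInRowGroup--;
        return currentRecordReader.read();
    }

    @Override
    public void close() throws IOException {
        fileReader.close();
    }
}
{code}
Even then, the {{StreamFormatAdapter}}'s own batching around such a reader would still apply, which is the conflict described above.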

We can now see that if we build this logic into a {{StreamFormat}} 
implementation, i.e. {{AvroParquetRecordFormat}}, all of the convenient built-in 
logic provided by the {{StreamFormatAdapter}} turns into an obstacle. It would 
also violate the single responsibility principle, since 
{{AvroParquetRecordFormat}} would take over some responsibilities of 
{{BulkFormat}}. These might be the reasons why {{ParquetVectorizedInputFormat}} 
implements {{BulkFormat}} instead of {{StreamFormat}}.

In order to build a unified Parquet implementation for both the Table API and 
the DataStream API, it makes more sense to build this code into a 
{{BulkFormat}} implementation class. Since the output data types differ 
({{RowData}} vs. Avro records), extra converter logic should be introduced into 
the architecture design. Depending on how complicated the issue turns out to be 
and how big an impact it has on the current code base, a new FLIP might be 
required.
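
As a rough illustration of that direction (not a finished design), the following skeleton sketches a single {{BulkFormat}} implementation parameterized with a converter, so that the same row-group reading logic could produce either {{RowData}} for the Table API or Avro records for the DataStream API. The {{ParquetRecordConverter}} interface and all names here are assumptions made up for this sketch, and the actual reading logic is omitted:
{code:java}
import java.io.IOException;
import java.io.Serializable;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.reader.BulkFormat;

/** Hypothetical converter from the decoded Parquet record to the API-specific output type. */
interface ParquetRecordConverter<T> extends Serializable {
    T convert(Object parquetRecord);

    TypeInformation<T> producedType();
}

/** Hypothetical unified Parquet BulkFormat; the row-group reading logic itself is omitted. */
class UnifiedParquetBulkFormat<T> implements BulkFormat<T, FileSourceSplit> {

    private final ParquetRecordConverter<T> converter;

    UnifiedParquetBulkFormat(ParquetRecordConverter<T> converter) {
        this.converter = converter;
    }

    @Override
    public BulkFormat.Reader<T> createReader(Configuration config, FileSourceSplit split)
            throws IOException {
        // Would open a ParquetFileReader for the split and start at its first row group.
        throw new UnsupportedOperationException("sketch only");
    }

    @Override
    public BulkFormat.Reader<T> restoreReader(Configuration config, FileSourceSplit split)
            throws IOException {
        // Would skip already-consumed row groups/records based on the split's checkpointed position.
        throw new UnsupportedOperationException("sketch only");
    }

    @Override
    public boolean isSplittable() {
        // Row groups are natural split boundaries for Parquet files.
        return true;
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return converter.producedType();
    }
}
{code}
Concrete converters, e.g. one producing {{RowData}} and one producing Avro {{GenericRecord}}, would then plug into the same format.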

The following code sketch was suggested by Arvid Heise for the next, optimized 
AvroParquetReader:
{code:java}
// Injected
GenericData model = GenericData.get();
org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();

// Low level reader - fetch metadata
// (placeholder in the original sketch: the reader would be opened from the input file,
//  e.g. via ParquetFileReader.open(...))
ParquetFileReader reader = null;
MessageType fileSchema = reader.getFileMetaData().getSchema();
Map<String, String> metaData = reader.getFileMetaData().getKeyValueMetaData();

// init Avro specific things
AvroReadSupport<T> readSupport = new AvroReadSupport<>(model);
ReadSupport.ReadContext readContext =
        readSupport.init(
                new InitContext(
                        conf,
                        metaData.entrySet().stream()
                                .collect(Collectors.toMap(
                                        Map.Entry::getKey,
                                        e -> Collections.singleton(e.getValue()))),
                        fileSchema));
RecordMaterializer<T> recordMaterializer =
        readSupport.prepareForRead(conf, metaData, fileSchema, readContext);
MessageType requestedSchema = readContext.getRequestedSchema();

// prepare record reader
ColumnIOFactory columnIOFactory = new ColumnIOFactory(reader.getFileMetaData().getCreatedBy());
MessageColumnIO columnIO = columnIOFactory.getColumnIO(requestedSchema, fileSchema, true);

// for recovery: skip the row groups that were already fully consumed
while (...) {
    reader.skipNextRowGroup();
}

// for reading: iterate row groups, then records within each row group
PageReadStore pages;
for (int block = 0; (pages = reader.readNextRowGroup()) != null; block++) {
    RecordReader<T> recordReader = columnIO.getRecordReader(pages, recordMaterializer);
    for (long i = 0; i < pages.getRowCount(); i++) {
        T record = recordReader.read();
        // emit record (pseudocode in the original sketch)
    }
} {code}
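
The recovery loop above is left open in the sketch. For illustration only, a hedged guess at its shape (the helper, its parameters, and the position encoding are assumptions, not part of the suggested code): fully consumed row groups can be skipped cheaply with {{skipNextRowGroup()}}, and the remainder of the current row group is replayed by reading and discarding records, roughly matching a checkpointed position of "offset plus records after the offset":
{code:java}
import java.io.IOException;

import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.io.MessageColumnIO;
import org.apache.parquet.io.RecordReader;
import org.apache.parquet.io.api.RecordMaterializer;

/** Hypothetical recovery helper; names and position encoding are assumptions. */
final class ParquetRecoverySketch {

    /**
     * Fast-forwards the low-level reader to a restored position expressed as
     * "row groups fully consumed" plus "records already emitted from the current row group",
     * returning a RecordReader positioned at the next unread record (or null at EOF).
     * A real implementation would also return the remaining record count of the row group,
     * i.e. pages.getRowCount() - recordsToSkip.
     */
    static <T> RecordReader<T> restorePosition(
            ParquetFileReader reader,
            MessageColumnIO columnIO,
            RecordMaterializer<T> materializer,
            long consumedRowGroups,
            long recordsToSkip)
            throws IOException {

        // Skip row groups that were fully read before the checkpoint without decoding them.
        for (long i = 0; i < consumedRowGroups; i++) {
            reader.skipNextRowGroup();
        }

        // Re-open the current row group and discard the records that were already emitted.
        PageReadStore pages = reader.readNextRowGroup();
        if (pages == null) {
            return null; // the restored position was at the end of the file
        }
        RecordReader<T> recordReader = columnIO.getRecordReader(pages, materializer);
        for (long i = 0; i < recordsToSkip; i++) {
            recordReader.read();
        }
        return recordReader;
    }

    private ParquetRecoverySketch() {}
}
{code}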

*Required features*
 - [Support complex types|https://lists.apache.org/thread/4n5s85tw1ffxwn5p26hdf6dfd6xql2by]
 - [Support projection|https://stackoverflow.com/questions/36852162/how-to-read-specific-fields-from-avro-parquet-file-in-java/36871563#36871563] (see the sketch below)
 - Support mixed data models, e.g. reading with ReflectData from a Parquet file created with GenericData (see the sketch below).
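
For the last two items, a hedged sketch of the parquet-avro mechanisms they would likely build on: projection push-down via {{AvroReadSupport.setRequestedProjection}} and a {{ReflectData}} data model for reading into a POJO regardless of the data model used at write time. The {{User}} class, the schema, and the path are assumptions for this illustration:
{code:java}
import org.apache.avro.Schema;
import org.apache.avro.reflect.ReflectData;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public class ProjectionAndReflectDataSketch {

    /** Hypothetical POJO covering a subset of the columns stored in the file. */
    public static class User {
        public String name;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Projection: only the columns of the requested schema are decoded.
        Schema projection = ReflectData.get().getSchema(User.class);
        AvroReadSupport.setRequestedProjection(conf, projection);

        // Placeholder path, for illustration only.
        HadoopInputFile inputFile =
                HadoopInputFile.fromPath(new Path("/tmp/users.parquet"), conf);

        // Mixed data model: the file may have been written with GenericData, but is read
        // back with ReflectData into the User POJO (assuming compatible record/field names).
        try (ParquetReader<User> reader =
                AvroParquetReader.<User>builder(inputFile)
                        .withDataModel(ReflectData.get())
                        .withConf(conf)
                        .build()) {
            User user;
            while ((user = reader.read()) != null) {
                System.out.println(user.name);
            }
        }
    }
}
{code}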



> Build unified Parquet BulkFormat for both Table API and DataStream API
> ----------------------------------------------------------------------
>
>                 Key: FLINK-25416
>                 URL: https://issues.apache.org/jira/browse/FLINK-25416
>             Project: Flink
>          Issue Type: New Feature
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>            Reporter: Jing Ge
>            Assignee: Jing Ge
>            Priority: Major
>              Labels: Umbrella
>



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
