[ 
https://issues.apache.org/jira/browse/BEAM-11460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anant Damle updated BEAM-11460:
-------------------------------
    Description: 
Data engineers often do not know the schema of a Parquet file when writing a 
pipeline, or different files may carry different schemas. Reading Parquet 
files using ParquetIO requires providing an (equivalent) Avro schema, but it 
is frequently not possible to know the schema of the Parquet files in 
advance.

On the other hand, 
[AvroIO|https://beam.apache.org/releases/javadoc/2.26.0/org/apache/beam/sdk/io/AvroIO.html]
 supports reading files with an unknown schema by providing a parse function: 
{{*#parseGenericRecords(SerializableFunction<GenericRecord,T>)*}}

Supporting this functionality in ParquetIO is simple and requires minimal 
changes to the ParquetIO surface.

 

Pipeline p = ...;

PCollection<String> filepatterns = p.apply(...);

PCollection<Foo> records =
    filepatterns
        .apply(FileIO.matchAll())
        .apply(FileIO.readMatches())
        .apply(ParquetIO.parseGenericRecords(
            new SerializableFunction<GenericRecord, Foo>() {
              @Override
              public Foo apply(GenericRecord record) {
                // If needed, access the schema of the record using
                // record.getSchema()
                return ...;
              }
            }));
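
The idea behind the parse function can be sketched without Beam on the 
classpath. The snippet below is only an illustration under stated 
assumptions: SelfDescribingRecord, parseAll and describe are hypothetical 
stand-ins (for GenericRecord, for the proposed read transform, and for a 
user-supplied parse function respectively), not Beam or Avro APIs. It shows 
that the parse function can discover the fields of each record at read time, 
so records with different schemas pass through the same code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-ins for illustration only; none of these are Beam or Avro classes.
final class UnknownSchemaParseSketch {

    /** Stand-in for a self-describing record such as Avro's GenericRecord. */
    static final class SelfDescribingRecord {
        private final Map<String, Object> fields;

        SelfDescribingRecord(Map<String, Object> fields) {
            // LinkedHashMap keeps the field order of the input map.
            this.fields = new LinkedHashMap<>(fields);
        }

        /** Field names, playing the role of record.getSchema(). */
        List<String> schema() {
            return new ArrayList<>(fields.keySet());
        }

        Object get(String name) {
            return fields.get(name);
        }
    }

    /** Stand-in for a reader that applies parseFn to every record, whatever its schema. */
    static <T> List<T> parseAll(
            List<SelfDescribingRecord> records, Function<SelfDescribingRecord, T> parseFn) {
        List<T> out = new ArrayList<>();
        for (SelfDescribingRecord record : records) {
            out.add(parseFn.apply(record));
        }
        return out;
    }

    /** Example parse function: renders a record using only the schema it carries. */
    static String describe(SelfDescribingRecord record) {
        StringBuilder sb = new StringBuilder();
        for (String name : record.schema()) {
            if (sb.length() > 0) {
                sb.append(',');
            }
            sb.append(name).append('=').append(record.get(name));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Two records with different schemas, as if read from two different files.
        Map<String, Object> first = new LinkedHashMap<>();
        first.put("id", 1);
        first.put("name", "x");
        Map<String, Object> second = new LinkedHashMap<>();
        second.put("ts", 42L);

        List<String> parsed = parseAll(
                Arrays.asList(new SelfDescribingRecord(first), new SelfDescribingRecord(second)),
                UnknownSchemaParseSketch::describe);
        System.out.println(parsed);
    }
}
```

In the proposed ParquetIO change the role of parseAll would be played by the 
read transform itself, and the parse function would receive a real 
GenericRecord whose schema comes from the Parquet file being read.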



> Support reading Parquet files with unknown schema
> -------------------------------------------------
>
>                 Key: BEAM-11460
>                 URL: https://issues.apache.org/jira/browse/BEAM-11460
>             Project: Beam
>          Issue Type: New Feature
>          Components: io-java-parquet
>            Reporter: Anant Damle
>            Priority: P1
>              Labels: Parquet
>             Fix For: 2.27.0
>
>   Original Estimate: 336h
>          Time Spent: 10m
>  Remaining Estimate: 335h 50m
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
