AHeise commented on a change in pull request #15725:
URL: https://github.com/apache/flink/pull/15725#discussion_r647334244



##########
File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
##########
@@ -97,7 +97,15 @@
      */
     protected ParquetInputFormat(Path path, MessageType messageType) {
         super(path);
-        this.expectedFileSchema = checkNotNull(messageType, "messageType");
+        if (messageType == null) {
+            try {
+                this.expectedFileSchema = readParquetSchemaFromFile(path);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        } else {
+            this.expectedFileSchema = messageType;
+        }

Review comment:
       The constructor is not meant to do any I/O operations. I propose to 
simply leave the `expectedFileSchema` at null.
   `#open` would then simply use reader schema = writer schema, which was 
probably you intention.

##########
File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetMapInputFormat.java
##########
@@ -40,10 +40,20 @@
  */
 public class ParquetMapInputFormat extends ParquetInputFormat<Map> {
 
+    /**
+     * @deprecated using this constructor is deprecated as the {@link 
MessageType} can be determined
+     *     from the file and does not need to be provided by the user. Prefer 
using the constructor
+     *     without MessageType parameter.
+     */
+    @Deprecated
     public ParquetMapInputFormat(Path path, MessageType messageType) {
         super(path, messageType);

Review comment:
       Since we do not check `messageType != null` anymore in base class, we 
need to add a `Preconditions#checkNotNull` here.

##########
File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
##########
@@ -160,13 +168,16 @@ public void setFilterPredicate(FilterPredicate 
filterPredicate) {
     public void open(FileInputSplit split) throws IOException {
         // reset the flag when open a new split
         this.skipThisSplit = false;
+        // TODO why is this needed to read the parquet schema from file in 
open() ? Can't we use the
+        //  already extracted schema (extracted in the constructor) ?
         org.apache.hadoop.conf.Configuration configuration =
                 new org.apache.hadoop.conf.Configuration();
         InputFile inputFile =
                 HadoopInputFile.fromPath(
                         new 
org.apache.hadoop.fs.Path(split.getPath().toUri()), configuration);
         ParquetReadOptions options = ParquetReadOptions.builder().build();
         ParquetFileReader fileReader = new ParquetFileReader(inputFile, 
options);
+
         MessageType fileSchema = fileReader.getFileMetaData().getSchema();
         MessageType readSchema = getReadSchema(fileSchema, split.getPath());

Review comment:
       Here we should check if `fieldNames` is null and fill them. If set, call 
`getReadSchema` as is.

##########
File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
##########
@@ -160,13 +168,16 @@ public void setFilterPredicate(FilterPredicate 
filterPredicate) {
     public void open(FileInputSplit split) throws IOException {
         // reset the flag when open a new split
         this.skipThisSplit = false;
+        // TODO why is this needed to read the parquet schema from file in 
open() ? Can't we use the
+        //  already extracted schema (extracted in the constructor) ?
         org.apache.hadoop.conf.Configuration configuration =
                 new org.apache.hadoop.conf.Configuration();
         InputFile inputFile =
                 HadoopInputFile.fromPath(
                         new 
org.apache.hadoop.fs.Path(split.getPath().toUri()), configuration);
         ParquetReadOptions options = ParquetReadOptions.builder().build();
         ParquetFileReader fileReader = new ParquetFileReader(inputFile, 
options);
+
         MessageType fileSchema = fileReader.getFileMetaData().getSchema();

Review comment:
       Here we read the write schema already. No need to add 
`readParquetSchemaFromFile`.

##########
File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
##########
@@ -160,13 +168,16 @@ public void setFilterPredicate(FilterPredicate 
filterPredicate) {
     public void open(FileInputSplit split) throws IOException {
         // reset the flag when open a new split
         this.skipThisSplit = false;
+        // TODO why is this needed to read the parquet schema from file in 
open() ? Can't we use the
+        //  already extracted schema (extracted in the constructor) ?

Review comment:
       See above.

##########
File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetMapInputFormat.java
##########
@@ -40,10 +40,20 @@
  */
 public class ParquetMapInputFormat extends ParquetInputFormat<Map> {
 
+    /**
+     * @deprecated using this constructor is deprecated as the {@link 
MessageType} can be determined
+     *     from the file and does not need to be provided by the user. Prefer 
using the constructor
+     *     without MessageType parameter.
+     */
+    @Deprecated

Review comment:
       Remove comment, see high-level review comment.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to