XComp commented on code in PR #21149:
URL: https://github.com/apache/flink/pull/21149#discussion_r1102396477


##########
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetVectorizedInputFormat.java:
##########
@@ -123,7 +122,8 @@ public ParquetReader createReader(final Configuration config, final SplitT split
         FilterCompat.Filter filter = getFilter(hadoopConfig.conf());
         List<BlockMetaData> blocks = filterRowGroups(filter, footer.getBlocks(), fileSchema);
 
-        MessageType requestedSchema = clipParquetSchema(fileSchema);
+        HashSet<Integer> unknownFieldsIndices = new HashSet<>();

Review Comment:
   ```suggestion
           Set<Integer> unknownFieldsIndices = new HashSet<>();
   ```
   nit: Generally, it's good practice to use interfaces instead of classes in declarations.
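   
   For illustration, a minimal sketch of the general point (class and variable names here are made up, not from the PR):
   ```java
   import java.util.HashSet;
   import java.util.Set;
   import java.util.TreeSet;
   
   class DeclarationStyle {
       void example() {
           // Declared against the interface: the implementation stays swappable.
           Set<Integer> indices = new HashSet<>();
           indices.add(7);
           // Switching implementations later only touches the right-hand side:
           indices = new TreeSet<>(indices);
       }
   }
   ```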



##########
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetVectorizedInputFormat.java:
##########
@@ -309,6 +311,8 @@ private class ParquetReader implements BulkFormat.Reader<T> {
 
         private final MessageType requestedSchema;
 
+        private final HashSet<Integer> unknownFieldsIndices;

Review Comment:
   ```suggestion
           private final Set<Integer> unknownFieldsIndices;
   ```
   nit: again, interfaces should be preferred.



##########
flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetColumnarRowInputFormatTest.java:
##########
@@ -294,6 +294,80 @@ void testProjectionReadUnknownField(int rowGroupSize) throws IOException {
                 });
     }
 
+    @ParameterizedTest
+    @MethodSource("parameters")
+    void testProjectionReadUnknownFieldAcrossFiles(int rowGroupSize) throws IOException {
+        int number = 1000;
+        List<RowData> records = new ArrayList<>(number);
+        for (int i = 0; i < number; i++) {
+            Integer v = i;
+            records.add(newRow(v));
+        }
+        // create parquet files for both legacy and new
+        // f7 and f99 don’t exist in legacy parquet file
+        // f99 doesn't exist in new parquet file
+        // create legacy parquet file
+        RowType legacyParquetType =
+                new RowType(
+                        IntStream.range(0, 6) // assume f7 not exist in legacy parquet file.

Review Comment:
   ```suggestion
                           IntStream.range(0, 6) // assume f7 and following columns do not exist in legacy parquet file.
   ```
   Am I missing something here, or is the comment not reflecting the statement? :thinking: Don't we only create a `RowType` for the first 6 columns? :thinking:



##########
flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetColumnarRowInputFormatTest.java:
##########
@@ -294,6 +294,80 @@ void testProjectionReadUnknownField(int rowGroupSize) throws IOException {
                 });
     }
 
+    @ParameterizedTest
+    @MethodSource("parameters")
+    void testProjectionReadUnknownFieldAcrossFiles(int rowGroupSize) throws IOException {
+        int number = 1000;

Review Comment:
   What's the intention behind having such a large test data set? Is it because we're iterating over multiple group sizes that are provided through the `parameters()` test data method source? If the `1000` was selected because of that, it would be worth creating a static final field `MAX_ROW_GROUP_SIZE` and using it here as well. That would make it clearer why the value `1000` was selected.
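   
   For illustration, a sketch of what that could look like (the field name is from the proposal above; the Javadoc wording is mine):
   ```java
   /**
    * Largest row-group size supplied by the parameters() method source; using it
    * as the record count makes the relationship between the two values explicit.
    */
   private static final int MAX_ROW_GROUP_SIZE = 1000;
   ```
   The test could then start with `List<RowData> records = new ArrayList<>(MAX_ROW_GROUP_SIZE);`.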



##########
flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetColumnarRowInputFormatTest.java:
##########
@@ -294,6 +294,80 @@ void testProjectionReadUnknownField(int rowGroupSize) throws IOException {
                 });
     }
 
+    @ParameterizedTest
+    @MethodSource("parameters")

Review Comment:
   Out of curiosity: What do we gain by covering so many `rowGroupSize` values? Isn't the test here the same for a groupSize of 10 and 20, for instance, which would mean we're not gaining anything from the additional test runs? Or am I missing something here?
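   
   For context, a hedged guess at the shape of the method source in question (the concrete sizes are assumptions, not taken from the test class):
   ```java
   // Assumed shape of the @MethodSource("parameters") provider (a guess, for
   // discussion): JUnit 5 passes each value to the int rowGroupSize parameter.
   private static Stream<Integer> parameters() {
       return Stream.of(10, 1000);
   }
   ```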



##########
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetVectorizedInputFormat.java:
##########
@@ -170,7 +171,8 @@ public boolean isSplittable() {
     }
 
     /** Clips `parquetSchema` according to `fieldNames`. */
-    private MessageType clipParquetSchema(GroupType parquetSchema) {
+    private MessageType clipParquetSchema(
+            GroupType parquetSchema, Set<Integer> unknownFieldsIndices) {

Review Comment:
   ```suggestion
               GroupType parquetSchema, Collection<Integer> unknownFieldsIndices) {
   ```
   nit: We're only using `add` in this method. Therefore, we could make the method signature even more relaxed.
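   
   A minimal sketch of the point (the helper name is made up and just stands in for the real clipping logic):
   ```java
   // Only Collection#add is required, so the weakest sufficient type can be declared:
   private static void collectUnknownIndices(Collection<Integer> out) {
       out.add(42); // hypothetical placeholder for the real index bookkeeping
   }
   ```
   Callers can still pass in the `Set<Integer>` (or `HashSet`) they already have.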



##########
flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetColumnarRowInputFormatTest.java:
##########
@@ -294,6 +294,80 @@ void testProjectionReadUnknownField(int rowGroupSize) throws IOException {
                 });
     }
 
+    @ParameterizedTest
+    @MethodSource("parameters")
+    void testProjectionReadUnknownFieldAcrossFiles(int rowGroupSize) throws IOException {
+        int number = 1000;
+        List<RowData> records = new ArrayList<>(number);
+        for (int i = 0; i < number; i++) {
+            Integer v = i;
+            records.add(newRow(v));
+        }

Review Comment:
   ```suggestion
           final List<RowData> records =
                   IntStream.range(0, 1000).mapToObj(this::newRow).collect(Collectors.toList());
   ```
   nit: your proposal works fine. I just got puzzled by the `Integer v = i` line, which is obsolete. It made me think about how to shorten the overall block. ... `newRow` could even be made `static`.
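   
   With a static `newRow` (assuming it doesn't touch instance state), the stream variant from the suggestion would use an unbound method reference:
   ```java
   // Sketch only: assumes newRow is made static and keeps its current signature.
   final List<RowData> records =
           IntStream.range(0, 1000)
                   .mapToObj(ParquetColumnarRowInputFormatTest::newRow)
                   .collect(Collectors.toList());
   ```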



##########
flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetColumnarRowInputFormatTest.java:
##########
@@ -294,6 +294,80 @@ void testProjectionReadUnknownField(int rowGroupSize) throws IOException {
                 });
     }
 
+    @ParameterizedTest
+    @MethodSource("parameters")
+    void testProjectionReadUnknownFieldAcrossFiles(int rowGroupSize) throws IOException {
+        int number = 1000;
+        List<RowData> records = new ArrayList<>(number);
+        for (int i = 0; i < number; i++) {
+            Integer v = i;
+            records.add(newRow(v));
+        }
+        // create parquet files for both legacy and new
+        // f7 and f99 don’t exist in legacy parquet file
+        // f99 doesn't exist in new parquet file
+        // create legacy parquet file
+        RowType legacyParquetType =
+                new RowType(
+                        IntStream.range(0, 6) // assume f7 not exist in legacy parquet file.
+                                .mapToObj(i -> ROW_TYPE.getFields().get(i))
+                                .collect(Collectors.toList()));
+        Path legacyParquetPath =
+                createTempParquetFile(
+                        new File(folder, "legacy"), legacyParquetType, records, rowGroupSize);
+        // create new parquet file
+        Path newParquetPath = createTempParquetFile(new File(folder, "new"), records, rowGroupSize);
+
+        // test reader
+        String[] readerColumnNames = new String[] {"f7", "f2", "f4", "f99"};
+        LogicalType[] fieldTypes =
+                new LogicalType[] {
+                    new DoubleType(), new TinyIntType(), new IntType(), new VarCharType()
+                };

Review Comment:
   ```suggestion
           final LogicalType[] fieldTypes = Arrays.stream(readerColumnNames)
                           .map(ROW_TYPE::getFieldIndex)
                           .map(ROW_TYPE::getTypeAt)
                           .toArray(LogicalType[]::new);
   ```
   Could we use that instead? This would make the test more robust against test data changes (i.e., the data types are dynamically derived from the static `ROW_TYPE` instead of hard-coded). WDYT?



##########
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetVectorizedInputFormat.java:
##########
@@ -335,10 +339,12 @@ private class ParquetReader implements BulkFormat.Reader<T> {
         private ParquetReader(
                 ParquetFileReader reader,
                 MessageType requestedSchema,
+                HashSet<Integer> unknownFieldsIndices,

Review Comment:
   ```suggestion
                   Set<Integer> unknownFieldsIndices,
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
