XComp commented on code in PR #21149:
URL: https://github.com/apache/flink/pull/21149#discussion_r1102396477
##########
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetVectorizedInputFormat.java:
##########
@@ -123,7 +122,8 @@ public ParquetReader createReader(final Configuration config, final SplitT split
FilterCompat.Filter filter = getFilter(hadoopConfig.conf());
List<BlockMetaData> blocks = filterRowGroups(filter,
footer.getBlocks(), fileSchema);
- MessageType requestedSchema = clipParquetSchema(fileSchema);
+ HashSet<Integer> unknownFieldsIndices = new HashSet<>();
Review Comment:
```suggestion
Set<Integer> unknownFieldsIndices = new HashSet<>();
```
nit: Generally, it's good practice to declare variables against interfaces rather than concrete classes.
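For illustration, a minimal example of the pattern (hypothetical names, not code from this PR):
```java
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

class InterfaceDeclarationExample {

    // Depending only on the Set interface means callers may pass any
    // implementation without this method having to change.
    static void markUnknown(Set<Integer> indices) {
        indices.add(42);
    }

    public static void main(String[] args) {
        markUnknown(new HashSet<>()); // concrete type chosen only at allocation
        markUnknown(new TreeSet<>()); // swapping implementations is a one-line change
    }
}
```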
##########
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetVectorizedInputFormat.java:
##########
@@ -309,6 +311,8 @@ private class ParquetReader implements BulkFormat.Reader<T> {
private final MessageType requestedSchema;
+ private final HashSet<Integer> unknownFieldsIndices;
Review Comment:
```suggestion
private final Set<Integer> unknownFieldsIndices;
```
nit: again, interfaces should be preferred.
##########
flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetColumnarRowInputFormatTest.java:
##########
@@ -294,6 +294,80 @@ void testProjectionReadUnknownField(int rowGroupSize) throws IOException {
});
}
+ @ParameterizedTest
+ @MethodSource("parameters")
+ void testProjectionReadUnknownFieldAcrossFiles(int rowGroupSize) throws IOException {
+ int number = 1000;
+ List<RowData> records = new ArrayList<>(number);
+ for (int i = 0; i < number; i++) {
+ Integer v = i;
+ records.add(newRow(v));
+ }
+ // create parquet files for both legacy and new
+ // f7 and f99 don't exist in legacy parquet file
+ // f99 doesn't exist in new parquet file
+ // create legacy parquet file
+ RowType legacyParquetType =
+ new RowType(
+ IntStream.range(0, 6) // assume f7 not exist in legacy parquet file.
Review Comment:
```suggestion
IntStream.range(0, 6) // assume f7 and following columns do not exist in legacy parquet file.
```
Am I missing something here, or is the comment not reflecting the statement? :thinking: Don't we only create a `RowType` for the first 6 columns? :thinking:
##########
flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetColumnarRowInputFormatTest.java:
##########
@@ -294,6 +294,80 @@ void testProjectionReadUnknownField(int rowGroupSize) throws IOException {
});
}
+ @ParameterizedTest
+ @MethodSource("parameters")
+ void testProjectionReadUnknownFieldAcrossFiles(int rowGroupSize) throws IOException {
+ int number = 1000;
Review Comment:
What's the intention behind having such a large test data set? Is it because we're iterating over multiple row group sizes that are provided through the `parameters()` test data method source? If the `1000` was selected because of that, it would be worth creating a static final field `MAX_ROW_GROUP_SIZE` and using it here as well. That would make it clearer why the value `1000` was selected.
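A rough sketch of what I have in mind (the constant name and the contents of `parameters()` are assumptions on my side, not the actual test code):
```java
// Hypothetical sketch: the record count is tied to the largest row group
// size produced by the parameters() method source.
private static final int MAX_ROW_GROUP_SIZE = 1000;

private static Stream<Integer> parameters() {
    // ensure at least one run where a single row group spans all records
    return Stream.of(10, 100, MAX_ROW_GROUP_SIZE);
}

@ParameterizedTest
@MethodSource("parameters")
void testProjectionReadUnknownFieldAcrossFiles(int rowGroupSize) throws IOException {
    final List<RowData> records =
            IntStream.range(0, MAX_ROW_GROUP_SIZE)
                    .mapToObj(this::newRow)
                    .collect(Collectors.toList());
    // ... rest of the test unchanged
}
```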
##########
flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetColumnarRowInputFormatTest.java:
##########
@@ -294,6 +294,80 @@ void testProjectionReadUnknownField(int rowGroupSize) throws IOException {
});
}
+ @ParameterizedTest
+ @MethodSource("parameters")
Review Comment:
Out of curiosity: what do we gain by covering so many values of `rowGroupSize`? Isn't the test the same for a group size of 10 and 20, for instance, which would mean we're not gaining anything from the additional runs? Or am I missing something here?
##########
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetVectorizedInputFormat.java:
##########
@@ -170,7 +171,8 @@ public boolean isSplittable() {
}
/** Clips `parquetSchema` according to `fieldNames`. */
- private MessageType clipParquetSchema(GroupType parquetSchema) {
+ private MessageType clipParquetSchema(
+ GroupType parquetSchema, Set<Integer> unknownFieldsIndices) {
Review Comment:
```suggestion
GroupType parquetSchema, Collection<Integer> unknownFieldsIndices) {
```
nit: We're only using `add` in this method. Therefore, we could make the
method signature even more relaxed.
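For context, the call site from the first hunk shouldn't need to change either, since a `Set` satisfies a `Collection` parameter; roughly (sketch based on the diff above, not the exact PR code):
```java
Set<Integer> unknownFieldsIndices = new HashSet<>();
// Set<Integer> is a subtype of Collection<Integer>, so the relaxed
// signature accepts it unchanged.
MessageType requestedSchema = clipParquetSchema(fileSchema, unknownFieldsIndices);
```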
##########
flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetColumnarRowInputFormatTest.java:
##########
@@ -294,6 +294,80 @@ void testProjectionReadUnknownField(int rowGroupSize) throws IOException {
});
}
+ @ParameterizedTest
+ @MethodSource("parameters")
+ void testProjectionReadUnknownFieldAcrossFiles(int rowGroupSize) throws IOException {
+ int number = 1000;
+ List<RowData> records = new ArrayList<>(number);
+ for (int i = 0; i < number; i++) {
+ Integer v = i;
+ records.add(newRow(v));
+ }
Review Comment:
```suggestion
final List<RowData> records =
        IntStream.range(0, 1000).mapToObj(this::newRow).collect(Collectors.toList());
```
nit: your proposal works fine. I just got puzzled by the `Integer v = i` line, which is obsolete. It made me think about how to shorten the overall block. ...`newRow` could even be made `static`.
##########
flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetColumnarRowInputFormatTest.java:
##########
@@ -294,6 +294,80 @@ void testProjectionReadUnknownField(int rowGroupSize) throws IOException {
});
}
+ @ParameterizedTest
+ @MethodSource("parameters")
+ void testProjectionReadUnknownFieldAcrossFiles(int rowGroupSize) throws IOException {
+ int number = 1000;
+ List<RowData> records = new ArrayList<>(number);
+ for (int i = 0; i < number; i++) {
+ Integer v = i;
+ records.add(newRow(v));
+ }
+ // create parquet files for both legacy and new
+ // f7 and f99 don't exist in legacy parquet file
+ // f99 doesn't exist in new parquet file
+ // create legacy parquet file
+ RowType legacyParquetType =
+ new RowType(
+ IntStream.range(0, 6) // assume f7 not exist in legacy parquet file.
+ .mapToObj(i -> ROW_TYPE.getFields().get(i))
+ .collect(Collectors.toList()));
+ Path legacyParquetPath =
+ createTempParquetFile(
+ new File(folder, "legacy"), legacyParquetType, records, rowGroupSize);
+ // create new parquet file
+ Path newParquetPath = createTempParquetFile(new File(folder, "new"), records, rowGroupSize);
+
+ // test reader
+ String[] readerColumnNames = new String[] {"f7", "f2", "f4", "f99"};
+ LogicalType[] fieldTypes =
+ new LogicalType[] {
+ new DoubleType(), new TinyIntType(), new IntType(), new VarCharType()
+ };
Review Comment:
```suggestion
final LogicalType[] fieldTypes = Arrays.stream(readerColumnNames)
.map(ROW_TYPE::getFieldIndex)
.map(ROW_TYPE::getTypeAt)
.toArray(LogicalType[]::new);
```
Could we use that instead? This would make the test more robust against test
data changes (i.e. the data types are dynamically derived from the static
`ROW_TYPE` instead of hard-coded). WDYT?
##########
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetVectorizedInputFormat.java:
##########
@@ -335,10 +339,12 @@ private class ParquetReader implements BulkFormat.Reader<T> {
private ParquetReader(
ParquetFileReader reader,
MessageType requestedSchema,
+ HashSet<Integer> unknownFieldsIndices,
Review Comment:
```suggestion
Set<Integer> unknownFieldsIndices,
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]