AHeise commented on code in PR #19286:
URL: https://github.com/apache/flink/pull/19286#discussion_r862629212
##########
flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatFactoryTest.java:
##########
@@ -227,6 +228,24 @@ public void testInvalidIgnoreParseError() {
createTableSink(SCHEMA, options);
}
+ @Test
+ public void testProjectionPushdown() throws IOException {
Review Comment:
When testing projections, always include a case that drops an early (leading)
column. Only then do you actually test whether your converter maps the projected fields to the right positions.
```java
@Test
public void testProjectionPushdown() throws IOException {
    final Map<String, String> options = getAllOptions();
    // Project away the leading column "a" so the converter has to remap field indexes.
    final Projection projection =
            Projection.fromFieldNames(PHYSICAL_DATA_TYPE, Arrays.asList("b", "c"));
    final int[][] projectionMatrix = projection.toNestedIndexes();
    DeserializationSchema<RowData> actualDeser =
            createDeserializationSchema(options, projectionMatrix);
    String data = "a1;2;false";
    RowData deserialized = actualDeser.deserialize(data.getBytes());
    GenericRowData expected = GenericRowData.of(2, false);
    // JUnit's assertEquals takes (expected, actual).
    assertEquals(expected, deserialized);
}
```
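To see why dropping a leading column matters, here is a small plain-Java model. It is not Flink code; the class and method names are made up for illustration, and it assumes `;` as the field delimiter, as the test data above suggests. A converter that wrongly ignores the projection and just reads the first N physical fields passes a test that only drops trailing columns, but fails as soon as column `a` is projected away.

```java
import java.util.Arrays;

/** Hypothetical model (not Flink code) of mapping a projection onto physical CSV fields. */
public class ProjectionIndexDemo {

    /** Correct mapping: output position i reads physical field projection[i]. */
    public static String[] project(String[] physicalFields, int[] projection) {
        String[] out = new String[projection.length];
        for (int i = 0; i < projection.length; i++) {
            out[i] = physicalFields[projection[i]];
        }
        return out;
    }

    /** Buggy mapping: ignores the projection and reads the first N physical fields. */
    public static String[] projectBuggy(String[] physicalFields, int[] projection) {
        return Arrays.copyOf(physicalFields, projection.length);
    }

    public static void main(String[] args) {
        String[] fields = "a1;2;false".split(";");

        // Dropping only the trailing column: both mappings agree, so the bug stays hidden.
        int[] keepAB = {0, 1};
        System.out.println(Arrays.equals(project(fields, keepAB), projectBuggy(fields, keepAB))); // true

        // Dropping the leading column "a": only the correct mapping yields [2, false].
        int[] keepBC = {1, 2};
        System.out.println(Arrays.toString(project(fields, keepBC)));      // [2, false]
        System.out.println(Arrays.toString(projectBuggy(fields, keepBC))); // [a1, 2]
    }
}
```

A test projecting `b, c` therefore catches the buggy mapping, while a test projecting `a, b` would not.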
##########
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java:
##########
@@ -65,13 +65,14 @@ public final class CsvRowDataDeserializationSchema implements DeserializationSch
private final boolean ignoreParseErrors;
private CsvRowDataDeserializationSchema(
- RowType rowType,
+ RowType rowResultType,
TypeInformation<RowData> resultTypeInfo,
CsvSchema csvSchema,
boolean ignoreParseErrors) {
this.resultTypeInfo = resultTypeInfo;
this.runtimeConverter =
- new CsvToRowDataConverters(ignoreParseErrors).createRowConverter(rowType, true);
+ new CsvToRowDataConverters(ignoreParseErrors)
+ .createRowConverter(rowResultType, true);
Review Comment:
Creating the runtime converter should happen in the builder. In a builder pattern, the
constructor should be as simple as possible and only assign fields.
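A minimal sketch of the suggested split, using a hypothetical `SimpleCsvDeserializer` rather than Flink's classes: `build()` derives the converter from the builder's settings, and the private constructor only stores it. The delimiter handling and the `ignoreParseErrors` behavior (return `null` for a record with missing fields) are simplified assumptions, not the behavior of `CsvToRowDataConverters`.

```java
import java.util.Objects;
import java.util.function.Function;
import java.util.regex.Pattern;

/** Hypothetical, simplified deserializer (not Flink's API) illustrating a trivial constructor. */
public final class SimpleCsvDeserializer {

    private final Function<String, String[]> converter;

    // Trivial constructor: no derivation logic, only field assignment.
    private SimpleCsvDeserializer(Function<String, String[]> converter) {
        this.converter = converter;
    }

    public String[] deserialize(String line) {
        return converter.apply(line);
    }

    public static final class Builder {
        private final int[] projection;
        private char fieldDelimiter = ',';
        private boolean ignoreParseErrors = false;

        public Builder(int[] projection) {
            this.projection = Objects.requireNonNull(projection, "Projection must not be null.").clone();
        }

        public Builder setFieldDelimiter(char fieldDelimiter) {
            this.fieldDelimiter = fieldDelimiter;
            return this;
        }

        public Builder setIgnoreParseErrors(boolean ignoreParseErrors) {
            this.ignoreParseErrors = ignoreParseErrors;
            return this;
        }

        /** All derived state (here: the converter) is created once all options are known. */
        public SimpleCsvDeserializer build() {
            final String delimiterRegex = Pattern.quote(String.valueOf(fieldDelimiter));
            final int[] proj = projection.clone();
            final boolean lenient = ignoreParseErrors;
            Function<String, String[]> converter = line -> {
                String[] fields = line.split(delimiterRegex, -1);
                String[] out = new String[proj.length];
                for (int i = 0; i < proj.length; i++) {
                    if (proj[i] >= fields.length) {
                        if (lenient) {
                            return null; // skip the malformed record
                        }
                        throw new IllegalArgumentException("Missing field " + proj[i] + " in: " + line);
                    }
                    out[i] = fields[proj[i]];
                }
                return out;
            };
            return new SimpleCsvDeserializer(converter);
        }
    }
}
```

For example, `new SimpleCsvDeserializer.Builder(new int[] {1, 2}).setFieldDelimiter(';').build().deserialize("a1;2;false")` returns `["2", "false"]`.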
##########
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java:
##########
@@ -81,19 +82,34 @@ private CsvRowDataDeserializationSchema(
@Internal
public static class Builder {
- private final RowType rowType;
+ private final RowType rowResultType;
private final TypeInformation<RowData> resultTypeInfo;
private CsvSchema csvSchema;
private boolean ignoreParseErrors;
+ /**
+ * Creates a CSV deserialization schema for the given {@link TypeInformation} with optional
+ * parameters.
+ */
+ public Builder(
+ RowType rowReadType,
+ RowType rowResultType,
+ TypeInformation<RowData> resultTypeInfo) {
+ Preconditions.checkNotNull(rowReadType, "RowType must not be null.");
+ Preconditions.checkNotNull(rowResultType, "RowType must not be null.");
+ Preconditions.checkNotNull(resultTypeInfo, "Result type information must not be null.");
+ this.rowResultType = rowResultType;
+ this.resultTypeInfo = resultTypeInfo;
+ this.csvSchema = CsvRowSchemaConverter.convert(rowReadType);
Review Comment:
We can change the `ColumnType` of the columns in `CsvSchema` to `STRING` for
those columns that are projected out.
Currently, we parse the data of those columns and then discard it. If the ignored
columns were typed as `STRING`, skipping them would be significantly faster, especially for numeric fields.
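The idea can be modeled in plain Java. The names are hypothetical, and the enum only mirrors some constant names of Jackson's `CsvSchema.ColumnType`. Projected-out columns keep their position, so the parser stays aligned with the physical record, but they are re-typed to `STRING` so their text is never converted. In the actual change this would presumably be applied to the `CsvSchema` returned by `CsvRowSchemaConverter.convert(rowReadType)`, for example by rebuilding it via `CsvSchema.rebuild()`; that wiring is an assumption and is not shown here.

```java
import java.util.Arrays;

/** Hypothetical model (not Jackson's or Flink's API) of re-typing projected-out columns. */
public class RetypeProjectedOutColumns {

    /** Mirrors a subset of the constant names in Jackson's CsvSchema.ColumnType. */
    public enum ColumnType { STRING, NUMBER, BOOLEAN }

    /** Returns a copy in which every column not listed in the projection is typed as STRING. */
    public static ColumnType[] retype(ColumnType[] physicalTypes, int[] projection) {
        boolean[] kept = new boolean[physicalTypes.length];
        for (int idx : projection) {
            kept[idx] = true;
        }
        ColumnType[] result = physicalTypes.clone();
        for (int i = 0; i < result.length; i++) {
            if (!kept[i]) {
                // Still read to keep positions aligned, but never converted to a number/boolean.
                result[i] = ColumnType.STRING;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical physical schema: a NUMBER, b NUMBER, c BOOLEAN; only b and c are needed.
        ColumnType[] physical = {ColumnType.NUMBER, ColumnType.NUMBER, ColumnType.BOOLEAN};
        System.out.println(Arrays.toString(retype(physical, new int[] {1, 2})));
        // [STRING, NUMBER, BOOLEAN]
    }
}
```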