AHeise commented on code in PR #19286:
URL: https://github.com/apache/flink/pull/19286#discussion_r862629212


##########
flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatFactoryTest.java:
##########
@@ -227,6 +228,24 @@ public void testInvalidIgnoreParseError() {
         createTableSink(SCHEMA, options);
     }
 
+    @Test
+    public void testProjectionPushdown() throws IOException {

Review Comment:
   When testing projections, it's always good to cover cases where you drop early columns. Only then do you actually test whether your converter is working as expected.
   
   ```java
       @Test
       public void testProjectionPushdown() throws IOException {
           final Map<String, String> options = getAllOptions();

           final Projection projection =
                   Projection.fromFieldNames(PHYSICAL_DATA_TYPE, Arrays.asList("b", "c"));

           final int[][] projectionMatrix = projection.toNestedIndexes();
           DeserializationSchema<RowData> actualDeser =
                   createDeserializationSchema(options, projectionMatrix);

           String data = "a1;2;false";
           RowData deserialized = actualDeser.deserialize(data.getBytes());
           GenericRowData expected = GenericRowData.of(2, false);

           assertEquals(expected, deserialized);
       }
   ```



##########
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java:
##########
@@ -65,13 +65,14 @@ public final class CsvRowDataDeserializationSchema implements DeserializationSch
     private final boolean ignoreParseErrors;
 
     private CsvRowDataDeserializationSchema(
-            RowType rowType,
+            RowType rowResultType,
             TypeInformation<RowData> resultTypeInfo,
             CsvSchema csvSchema,
             boolean ignoreParseErrors) {
         this.resultTypeInfo = resultTypeInfo;
         this.runtimeConverter =
-                new CsvToRowDataConverters(ignoreParseErrors).createRowConverter(rowType, true);
+                new CsvToRowDataConverters(ignoreParseErrors)
+                        .createRowConverter(rowResultType, true);

Review Comment:
   This should be done in the builder; with a builder pattern, the constructor should stay as simple as possible (see the sketch below).
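   
   As a rough sketch of what that could look like (the nested converter type `CsvToRowDataConverters.CsvToRowDataConverter`, the ctor parameter list, and the field names are my assumptions based on the visible diff, not the final code):
   
   ```java
       // Sketch only: Builder#build() creates the runtime converter,
       // so the ctor boils down to plain field assignments.
       public CsvRowDataDeserializationSchema build() {
           final CsvToRowDataConverters.CsvToRowDataConverter runtimeConverter =
                   new CsvToRowDataConverters(ignoreParseErrors)
                           .createRowConverter(rowResultType, true);
           return new CsvRowDataDeserializationSchema(
                   resultTypeInfo, csvSchema, ignoreParseErrors, runtimeConverter);
       }

       private CsvRowDataDeserializationSchema(
               TypeInformation<RowData> resultTypeInfo,
               CsvSchema csvSchema,
               boolean ignoreParseErrors,
               CsvToRowDataConverters.CsvToRowDataConverter runtimeConverter) {
           // no converter construction here, only field assignments
           this.resultTypeInfo = resultTypeInfo;
           this.csvSchema = csvSchema;
           this.ignoreParseErrors = ignoreParseErrors;
           this.runtimeConverter = runtimeConverter;
       }
   ```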



##########
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java:
##########
@@ -81,19 +82,34 @@ private CsvRowDataDeserializationSchema(
     @Internal
     public static class Builder {
 
-        private final RowType rowType;
+        private final RowType rowResultType;
         private final TypeInformation<RowData> resultTypeInfo;
         private CsvSchema csvSchema;
         private boolean ignoreParseErrors;
 
+        /**
+         * Creates a CSV deserialization schema for the given {@link TypeInformation} with optional
+         * parameters.
+         */
+        public Builder(
+                RowType rowReadType,
+                RowType rowResultType,
+                TypeInformation<RowData> resultTypeInfo) {
+            Preconditions.checkNotNull(rowReadType, "RowType must not be null.");
+            Preconditions.checkNotNull(rowResultType, "RowType must not be null.");
+            Preconditions.checkNotNull(resultTypeInfo, "Result type information must not be null.");
+            this.rowResultType = rowResultType;
+            this.resultTypeInfo = resultTypeInfo;
+            this.csvSchema = CsvRowSchemaConverter.convert(rowReadType);

Review Comment:
   We can change the `ColumnType` of the columns in `CsvSchema` to `STRING` for the columns that are projected out.
   Currently, we would parse the data and then discard it. If an ignored column is read as a `STRING` instead, the expensive parsing is skipped, which is significantly faster for numeric fields.
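   
   Roughly along these lines (the helper name is hypothetical, and the exact Jackson `CsvSchema.Builder` calls used here, `rebuild()`, `clearColumns()`, `addColumn(name, type)`, should be double-checked against the Jackson version in use):
   
   ```java
       /**
        * Hypothetical helper: rebuild the Jackson CsvSchema so that columns which are
        * projected out are declared as STRING and therefore never parsed as numbers/booleans.
        */
       private static CsvSchema relaxUnprojectedColumns(CsvSchema schema, Set<String> projectedFieldNames) {
           // keep separator/quote/etc. settings, but redeclare the column list
           CsvSchema.Builder builder = schema.rebuild().clearColumns();
           for (CsvSchema.Column column : schema) {
               CsvSchema.ColumnType type =
                       projectedFieldNames.contains(column.getName())
                               ? column.getType()
                               : CsvSchema.ColumnType.STRING;
               builder.addColumn(column.getName(), type);
           }
           return builder.build();
       }
   ```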


