afedulov commented on code in PR #19286:
URL: https://github.com/apache/flink/pull/19286#discussion_r859663940


##########
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java:
##########
@@ -81,19 +82,34 @@ private CsvRowDataDeserializationSchema(
     @Internal
     public static class Builder {
 
-        private final RowType rowType;
+        private final RowType rowResultType;
         private final TypeInformation<RowData> resultTypeInfo;
         private CsvSchema csvSchema;
         private boolean ignoreParseErrors;
 
+        /**
+         * Creates a CSV deserialization schema for the given {@link 
TypeInformation} with optional
+         * parameters.
+         */
+        public Builder(
+                RowType rowReadType,
+                RowType rowResultType,

Review Comment:
   Good point, the JavaDoc of the fields was originally not there so I missed 
it.
   The naming comes from the original signature:
   ```
   public Builder(RowType rowType, TypeInformation<RowData> resultTypeInfo) 
   ```
   The ideas is to underline using which CsvSchema the data is going to be read 
from the file and what is the expected output of the result. Projection seems a 
bit a too specific term used in the SQL optimization world. I can see this 
being used outside of this scope (simple filtering for whatever other reasons 
or expansion of the nested fields according to a "wider" row), depending on the 
converter used.
   I've added the the missing javadoc.
   



##########
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java:
##########
@@ -81,19 +82,34 @@ private CsvRowDataDeserializationSchema(
     @Internal
     public static class Builder {
 
-        private final RowType rowType;
+        private final RowType rowResultType;
         private final TypeInformation<RowData> resultTypeInfo;
         private CsvSchema csvSchema;
         private boolean ignoreParseErrors;
 
+        /**
+         * Creates a CSV deserialization schema for the given {@link 
TypeInformation} with optional
+         * parameters.
+         */
+        public Builder(
+                RowType rowReadType,
+                RowType rowResultType,
+                TypeInformation<RowData> resultTypeInfo) {
+            Preconditions.checkNotNull(rowReadType, "RowType must not be 
null.");
+            Preconditions.checkNotNull(rowResultType, "RowType must not be 
null.");
+            Preconditions.checkNotNull(resultTypeInfo, "Result type 
information must not be null.");
+            this.rowResultType = rowResultType;
+            this.resultTypeInfo = resultTypeInfo;
+            this.csvSchema = CsvRowSchemaConverter.convert(rowReadType);

Review Comment:
   We rely on field names for the conversion:
   
https://github.com/apache/flink/blob/c31452bab1fb3b9a02ab616e4c5fe5e87346dfb4/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvToRowDataConverters.java#L78-L84



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to