afedulov commented on code in PR #19286:
URL: https://github.com/apache/flink/pull/19286#discussion_r859663940
##########
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java:
##########
@@ -81,19 +82,34 @@ private CsvRowDataDeserializationSchema(
@Internal
public static class Builder {
- private final RowType rowType;
+ private final RowType rowResultType;
private final TypeInformation<RowData> resultTypeInfo;
private CsvSchema csvSchema;
private boolean ignoreParseErrors;
+ /**
+ * Creates a CSV deserialization schema for the given {@link TypeInformation} with optional
+ * parameters.
+ */
+ public Builder(
+ RowType rowReadType,
+ RowType rowResultType,
Review Comment:
Good point, the JavaDoc for the fields was originally not there, so I missed it.
The naming comes from the original signature:
```
public Builder(RowType rowType, TypeInformation<RowData> resultTypeInfo)
```
The idea is to underline which CsvSchema the data is going to be read with from the file, and what the expected output of the result is. "Projection" seems a bit too specific, as the term is mostly used in the SQL optimization world. I can see this being used outside of that scope (simple filtering for whatever other reasons, or expansion of nested fields according to a "wider" row), depending on the converter used.
I've added the missing JavaDoc.
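To illustrate the distinction, here is a minimal, hypothetical sketch. `SimpleRowType` and this `Builder` are simplified stand-ins invented for the example, not the actual Flink classes: the "read type" describes the columns parsed from the file, while the "result type" describes the row shape the caller expects back.

```java
import java.util.Arrays;
import java.util.List;

public class ReadVsResultType {

    // Simplified stand-in for RowType: just an ordered list of field names.
    static class SimpleRowType {
        final List<String> fieldNames;

        SimpleRowType(String... names) {
            this.fieldNames = Arrays.asList(names);
        }
    }

    // A builder that distinguishes the type the data is *read* with
    // from the type the caller expects as the *result*.
    static class Builder {
        final SimpleRowType readType;
        final SimpleRowType resultType;

        Builder(SimpleRowType readType, SimpleRowType resultType) {
            if (readType == null || resultType == null) {
                throw new NullPointerException("RowType must not be null.");
            }
            this.readType = readType;
            this.resultType = resultType;
        }
    }

    public static void main(String[] args) {
        // Read all three CSV columns, but only emit two of them in the result.
        SimpleRowType readType = new SimpleRowType("id", "name", "age");
        SimpleRowType resultType = new SimpleRowType("id", "age");
        Builder b = new Builder(readType, resultType);
        System.out.println(b.readType.fieldNames);   // [id, name, age]
        System.out.println(b.resultType.fieldNames); // [id, age]
    }
}
```

The same separation covers more than SQL-style projection: the result type may also drop fields for filtering or expand nested fields relative to the read type, depending on the converter.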
##########
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java:
##########
@@ -81,19 +82,34 @@ private CsvRowDataDeserializationSchema(
@Internal
public static class Builder {
- private final RowType rowType;
+ private final RowType rowResultType;
private final TypeInformation<RowData> resultTypeInfo;
private CsvSchema csvSchema;
private boolean ignoreParseErrors;
+ /**
+ * Creates a CSV deserialization schema for the given {@link TypeInformation} with optional
+ * parameters.
+ */
+ public Builder(
+ RowType rowReadType,
+ RowType rowResultType,
+ TypeInformation<RowData> resultTypeInfo) {
+ Preconditions.checkNotNull(rowReadType, "RowType must not be null.");
+ Preconditions.checkNotNull(rowResultType, "RowType must not be null.");
+ Preconditions.checkNotNull(resultTypeInfo, "Result type information must not be null.");
+ this.rowResultType = rowResultType;
+ this.resultTypeInfo = resultTypeInfo;
+ this.csvSchema = CsvRowSchemaConverter.convert(rowReadType);
Review Comment:
We rely on field names for the conversion:
https://github.com/apache/flink/blob/c31452bab1fb3b9a02ab616e4c5fe5e87346dfb4/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvToRowDataConverters.java#L78-L84
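A minimal sketch of what name-based conversion implies (hypothetical code invented for illustration, not the actual converter): each field of the result row is looked up in the parsed CSV record by its name rather than by position, so the read schema and the result type must agree on field names.

```java
import java.util.HashMap;
import java.util.Map;

public class FieldNameLookup {

    public static void main(String[] args) {
        // A parsed CSV record keyed by column name, as a name-indexed
        // read schema would produce it.
        Map<String, String> csvRecord = new HashMap<>();
        csvRecord.put("id", "42");
        csvRecord.put("name", "flink");

        // The result row only requests the "name" field; the lookup
        // succeeds because read schema and result type share field names.
        String[] resultFields = {"name"};
        for (String field : resultFields) {
            System.out.println(field + " -> " + csvRecord.get(field));
        }
    }
}
```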
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]