AHeise commented on code in PR #19286:
URL: https://github.com/apache/flink/pull/19286#discussion_r869023333


##########
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java:
##########
@@ -106,7 +110,79 @@ public Builder(
             Preconditions.checkNotNull(resultTypeInfo, "Result type information must not be null.");
             this.rowResultType = rowResultType;
             this.resultTypeInfo = resultTypeInfo;
-            this.csvSchema = CsvRowSchemaConverter.convert(rowReadType);
+            final RowType optimizedReadType = optimizeCsvRead(rowReadType, rowResultType);
+            this.csvSchema = CsvRowSchemaConverter.convert(optimizedReadType);
+        }
+
+        /**
+         * Optimizes CSV reads. If the field is skipped from the results, it does not make sense to
+         * parse it as a specific data type. For such fields, the VARCHAR data type is enforced.
+         * This causes the corresponding CSV column to be interpreted as a STRING and spares
+         * unnecessary parsing processing.
+         */
+        private RowType optimizeCsvRead(RowType rowReadType, RowType rowResultType) {
+            final List<String> rowReadTypeFields = rowReadType.getFieldNames();
+            final List<String> rowResultTypeFields = rowResultType.getFieldNames();
+
+            if (rowResultTypeFields.size() < rowReadTypeFields.size()) {
+                // Some fields are filtered from the results.
+                final List<RowType.RowField> fields = rowReadType.getFields();
+
+                final Collection<String> filteredFieldsNames =
+                        getFilteredFieldsNames(rowReadTypeFields, rowResultTypeFields);
+
+                final Set<Integer> filteredFieldsIndices =
+                        filteredFieldsNames.stream()
+                                .map(rowReadType::getFieldIndex)
+                                .collect(Collectors.toSet());
+
+                // This type corresponds to the CsvSchema.ColumnType.STRING in the converter
+                final VarCharType varCharType = new VarCharType();
+                final List<RowType.RowField> optimizedRowFields =
+                        fields.stream()
+                                .map(
+                                        field -> {
+                                            int fieldIndex =
+                                                    rowReadType.getFieldIndex(field.getName());
+                                            if (filteredFieldsIndices.contains(fieldIndex)) {
+                                                return new RowType.RowField(
+                                                        field.getName(),
+                                                        varCharType,
+                                                        field.getDescription().orElse(null));
+                                            } else {
+                                                return field;
+                                            }
+                                        })
+                                .collect(Collectors.toList());
+                return new RowType(rowReadType.isNullable(), optimizedRowFields);
+            } else {
+                return rowReadType;
+            }
+        }
+
+        /**
+         * Detects fields that need to be filtered from the produced rows (for instance, due to a
+         * projection pushdown).
+         */
+        private static Collection<String> getFilteredFieldsNames(
+                List<String> rowReadTypeFields, List<String> rowResultTypeFields) {
+
+            final Collection<String> diff = new ArrayList<>();
+
+            int i = 0, j = 0;
+            while (i < rowReadTypeFields.size()) {
+                final String rowReadFieldName = rowReadTypeFields.get(i);
+                final String rowResultFieldName = rowResultTypeFields.get(j);
+                if (rowReadFieldName.equals(rowResultFieldName)) {
+                    i++;
+                    j++;
+                } else {
+                    // This field is filtered from the results
+                    diff.add(rowReadTypeFields.get(i));
+                    i++;
+                }
+            }
+            return diff;
         }

Review Comment:
   Instead of reverse-engineering the projection from the two row types, why
   not pass the Projection to the constructor?
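
A minimal sketch of what working from an explicit projection could look like, assuming the projection reaches the builder as an array of top-level field indices into the read type (in Flink this might come from something like `Projection#toTopLevelIndexes()`; that wiring, the class name `ProjectionSketch`, and the helper `skippedIndices` are assumptions for illustration, not part of the PR). With the indices in hand, the skipped fields fall out directly, without walking the two field-name lists in lockstep:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ProjectionSketch {

    /**
     * Returns the indices of read fields that are NOT part of the projection.
     * Hypothetical helper: fieldCount is the arity of the read type and
     * projectedIndices are the top-level indices kept in the result.
     */
    static List<Integer> skippedIndices(int fieldCount, int[] projectedIndices) {
        final Set<Integer> projected = new HashSet<>();
        for (int index : projectedIndices) {
            if (index < 0 || index >= fieldCount) {
                throw new IllegalArgumentException("Projected index out of range: " + index);
            }
            projected.add(index);
        }
        // Every read field whose index is not projected can be read as a plain string.
        final List<Integer> skipped = new ArrayList<>();
        for (int i = 0; i < fieldCount; i++) {
            if (!projected.contains(i)) {
                skipped.add(i);
            }
        }
        return skipped;
    }

    public static void main(String[] args) {
        // The read type has 4 fields; the query keeps fields 0 and 2.
        System.out.println(skippedIndices(4, new int[] {0, 2}));
    }
}
```

Working from indices also avoids relying on the result fields appearing in the same relative order as the read fields, which a name-based diff has to assume.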



##########
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java:
##########
@@ -106,7 +110,79 @@ public Builder(
             Preconditions.checkNotNull(resultTypeInfo, "Result type information must not be null.");
             this.rowResultType = rowResultType;
             this.resultTypeInfo = resultTypeInfo;
-            this.csvSchema = CsvRowSchemaConverter.convert(rowReadType);
+            final RowType optimizedReadType = optimizeCsvRead(rowReadType, rowResultType);
+            this.csvSchema = CsvRowSchemaConverter.convert(optimizedReadType);

Review Comment:
   Another, and IMHO easier, option would be to modify the csvSchema directly.
   
   ```
   builder = csvSchema.rebuild()
   for each field not in projection:
     set builder column type to string
   csvSchema = builder.build()
   ```
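
The pseudocode above can be sketched in plain Java as follows. To stay self-contained, `Column` and `ColumnType` here are simplified stand-ins, not Jackson's `CsvSchema` classes, and `relaxSkippedColumns` is a hypothetical name; in the actual code the same loop would presumably run against the builder returned by `csvSchema.rebuild()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class SchemaRewriteSketch {

    /** Stand-in for a CSV column type; not Jackson's CsvSchema.ColumnType. */
    enum ColumnType { STRING, NUMBER, BOOLEAN }

    /** Stand-in for a schema column: a name plus the type used for parsing. */
    static final class Column {
        final String name;
        final ColumnType type;

        Column(String name, ColumnType type) {
            this.name = name;
            this.type = type;
        }
    }

    /**
     * Rebuilds the column list, forcing every column that is not in the
     * projection to STRING so it is never parsed into a richer type.
     */
    static List<Column> relaxSkippedColumns(List<Column> schema, Set<String> projectedNames) {
        final List<Column> rebuilt = new ArrayList<>(schema.size());
        for (Column column : schema) {
            if (projectedNames.contains(column.name)) {
                rebuilt.add(column); // projected: keep the declared type
            } else {
                rebuilt.add(new Column(column.name, ColumnType.STRING));
            }
        }
        return rebuilt;
    }

    public static void main(String[] args) {
        List<Column> schema = List.of(
                new Column("id", ColumnType.NUMBER),
                new Column("active", ColumnType.BOOLEAN),
                new Column("name", ColumnType.STRING));
        // Only "id" is projected; the other columns fall back to STRING.
        for (Column column : relaxSkippedColumns(schema, Set.of("id"))) {
            System.out.println(column.name + " -> " + column.type);
        }
    }
}
```

Column order and names are preserved, so the positional mapping between CSV columns and schema entries stays intact; only the parse type of the skipped columns changes.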


