slinkydeveloper commented on a change in pull request #17598:
URL: https://github.com/apache/flink/pull/17598#discussion_r770301625
##########
File path:
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatFactory.java
##########
@@ -125,128 +70,88 @@ public String factoryIdentifier() {
@Override
public Set<ConfigOption<?>> optionalOptions() {
- Set<ConfigOption<?>> options = new HashSet<>();
- options.add(FIELD_DELIMITER);
- options.add(DISABLE_QUOTE_CHARACTER);
- options.add(QUOTE_CHARACTER);
- options.add(ALLOW_COMMENTS);
- options.add(IGNORE_PARSE_ERRORS);
- options.add(ARRAY_ELEMENT_DELIMITER);
- options.add(ESCAPE_CHARACTER);
- options.add(NULL_LITERAL);
- return options;
- }
-
- // ------------------------------------------------------------------------
- // Validation
- // ------------------------------------------------------------------------
-
- static void validateFormatOptions(ReadableConfig tableOptions) {
- final boolean hasQuoteCharacter =
tableOptions.getOptional(QUOTE_CHARACTER).isPresent();
- final boolean isDisabledQuoteCharacter =
tableOptions.get(DISABLE_QUOTE_CHARACTER);
- if (isDisabledQuoteCharacter && hasQuoteCharacter) {
- throw new ValidationException(
- "Format cannot define a quote character and disabled quote
character at the same time.");
- }
- // Validate the option value must be a single char.
- validateCharacterVal(tableOptions, FIELD_DELIMITER, true);
- validateCharacterVal(tableOptions, ARRAY_ELEMENT_DELIMITER);
- validateCharacterVal(tableOptions, QUOTE_CHARACTER);
- validateCharacterVal(tableOptions, ESCAPE_CHARACTER);
+ return CsvCommons.optionalOptions();
}
- /** Validates the option {@code option} value must be a Character. */
- private static void validateCharacterVal(
- ReadableConfig tableOptions, ConfigOption<String> option) {
- validateCharacterVal(tableOptions, option, false);
- }
+ @Override
+ public BulkDecodingFormat<RowData> createDecodingFormat(
+ DynamicTableFactory.Context context, ReadableConfig formatOptions)
{
- /**
- * Validates the option {@code option} value must be a Character.
- *
- * @param tableOptions the table options
- * @param option the config option
- * @param unescape whether to unescape the option value
- */
- private static void validateCharacterVal(
- ReadableConfig tableOptions, ConfigOption<String> option, boolean
unescape) {
- if (tableOptions.getOptional(option).isPresent()) {
- final String value =
- unescape
- ?
StringEscapeUtils.unescapeJava(tableOptions.get(option))
- : tableOptions.get(option);
- if (value.length() != 1) {
- throw new ValidationException(
- String.format(
- "Option '%s.%s' must be a string with single
character, but was: %s",
- IDENTIFIER, option.key(),
tableOptions.get(option)));
- }
- }
+ return new CsvBulkDecodingFormat(formatOptions);
}
- // ------------------------------------------------------------------------
- // Utilities
- // ------------------------------------------------------------------------
+ private static class CsvBulkDecodingFormat
+ implements BulkDecodingFormat<RowData>,
+ ProjectableDecodingFormat<BulkFormat<RowData,
FileSourceSplit>> {
- private static void configureDeserializationSchema(
- ReadableConfig formatOptions,
CsvRowDataDeserializationSchema.Builder schemaBuilder) {
- formatOptions
- .getOptional(FIELD_DELIMITER)
- .map(delimiter ->
StringEscapeUtils.unescapeJava(delimiter).charAt(0))
- .ifPresent(schemaBuilder::setFieldDelimiter);
+ private final ReadableConfig formatOptions;
- if (formatOptions.get(DISABLE_QUOTE_CHARACTER)) {
- schemaBuilder.disableQuoteCharacter();
- } else {
- formatOptions
- .getOptional(QUOTE_CHARACTER)
- .map(quote -> quote.charAt(0))
- .ifPresent(schemaBuilder::setQuoteCharacter);
+ public CsvBulkDecodingFormat(ReadableConfig formatOptions) {
+ this.formatOptions = formatOptions;
}
-
formatOptions.getOptional(ALLOW_COMMENTS).ifPresent(schemaBuilder::setAllowComments);
-
- formatOptions
- .getOptional(IGNORE_PARSE_ERRORS)
- .ifPresent(schemaBuilder::setIgnoreParseErrors);
-
- formatOptions
- .getOptional(ARRAY_ELEMENT_DELIMITER)
- .ifPresent(schemaBuilder::setArrayElementDelimiter);
-
- formatOptions
- .getOptional(ESCAPE_CHARACTER)
- .map(escape -> escape.charAt(0))
- .ifPresent(schemaBuilder::setEscapeCharacter);
+ @Override
+ // TODO: is it possible to avoid the cast with a reasonable effort?
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ public BulkFormat<RowData, FileSourceSplit> createRuntimeDecoder(
+ Context context, DataType physicalDataType, int[][]
projections) {
+
+ final DataType projectedDataDype =
Projection.of(projections).project(physicalDataType);
+ final RowType rowTypeProjected = (RowType)
projectedDataDype.getLogicalType();
+
+ final RowType rowType = (RowType)
physicalDataType.getLogicalType();
Review comment:
No, because `RowType` is a `LogicalType`, not a `DataType`. Naming this variable
`physicalRowType` makes it clear that it is the physical row type, not the projected one.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]