JingsongLi commented on a change in pull request #11962:
URL: https://github.com/apache/flink/pull/11962#discussion_r417999723
##########
File path: flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java
##########
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.LocalTime;
+import java.util.Arrays;
+import java.util.Objects;
+
+/**
+ * Deserialization schema from CSV to Flink Table & SQL internal data structures.
+ *
+ * <p>Deserializes a <code>byte[]</code> message as a {@link JsonNode} and
+ * converts it to {@link RowData}.
+ *
+ * <p>Failures during deserialization are forwarded as wrapped {@link IOException}s.
+ */
+@Internal
+public final class CsvRowDataDeserializationSchema implements DeserializationSchema<RowData> {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Type information describing the result type. */
+    private final TypeInformation<RowData> resultTypeInfo;
+
+    /** Runtime instance that performs the actual work. */
+    private final DeserializationRuntimeConverter runtimeConverter;
+
+    /** Schema describing the input CSV data. */
+    private final CsvSchema csvSchema;
+
+    /** Object reader used to read rows. It is configured by {@link CsvSchema}. */
+    private final ObjectReader objectReader;
+
+    /** Flag indicating whether to ignore invalid fields/rows (default: throw an exception). */
+    private final boolean ignoreParseErrors;
+
+    private CsvRowDataDeserializationSchema(
+            RowType rowType,
+            TypeInformation<RowData> resultTypeInfo,
+            CsvSchema csvSchema,
+            boolean ignoreParseErrors) {
+        this.resultTypeInfo = resultTypeInfo;
+        this.runtimeConverter = createRowConverter(rowType, true);
+        this.csvSchema = CsvRowSchemaConverter.convert(rowType);
+        this.objectReader = new CsvMapper().readerFor(JsonNode.class).with(csvSchema);
+        this.ignoreParseErrors = ignoreParseErrors;
+    }
+
+    /**
+     * A builder for creating a {@link CsvRowDataDeserializationSchema}.
+     */
+    @Internal
+    public static class Builder {
+
+        private final RowType rowType;
+        private final TypeInformation<RowData> resultTypeInfo;
+        private CsvSchema csvSchema;
+        private boolean ignoreParseErrors;
+
+        /**
+         * Creates a CSV deserialization schema for the given {@link TypeInformation} with
+         * optional parameters.
+         */
+        public Builder(RowType rowType, TypeInformation<RowData> resultTypeInfo) {
+            Preconditions.checkNotNull(rowType, "RowType must not be null.");
+            Preconditions.checkNotNull(resultTypeInfo, "Result type information must not be null.");
+            this.rowType = rowType;
+            this.resultTypeInfo = resultTypeInfo;
+            this.csvSchema = CsvRowSchemaConverter.convert(rowType);
+        }
+
+        public Builder setFieldDelimiter(char delimiter) {
+            this.csvSchema = this.csvSchema.rebuild().setColumnSeparator(delimiter).build();
+            return this;
+        }
+
+        public Builder setAllowComments(boolean allowComments) {
+            this.csvSchema = this.csvSchema.rebuild().setAllowComments(allowComments).build();
+            return this;
+        }
+
+        public Builder setArrayElementDelimiter(String delimiter) {
+            Preconditions.checkNotNull(delimiter, "Array element delimiter must not be null.");
+            this.csvSchema = this.csvSchema.rebuild().setArrayElementSeparator(delimiter).build();
+            return this;
+        }
+
+        public Builder setQuoteCharacter(char c) {
+            this.csvSchema = this.csvSchema.rebuild().setQuoteChar(c).build();
+            return this;
+        }
+
+        public Builder setEscapeCharacter(char c) {
+            this.csvSchema = this.csvSchema.rebuild().setEscapeChar(c).build();
+            return this;
+        }
+
+        public Builder setNullLiteral(String nullLiteral) {
+            Preconditions.checkNotNull(nullLiteral, "Null literal must not be null.");
+            this.csvSchema = this.csvSchema.rebuild().setNullValue(nullLiteral).build();
+            return this;
+        }
+
+        public Builder setIgnoreParseErrors(boolean ignoreParseErrors) {
+            this.ignoreParseErrors = ignoreParseErrors;
+            return this;
+        }
+
+        public CsvRowDataDeserializationSchema build() {
+            return new CsvRowDataDeserializationSchema(
+                rowType,
+                resultTypeInfo,
+                csvSchema,
+                ignoreParseErrors);
+        }
+    }
+
+    @Override
+    public RowData deserialize(byte[] message) throws IOException {
+        try {
+            final JsonNode root = objectReader.readValue(message);
+            return (RowData) runtimeConverter.convert(root);
+        } catch (Throwable t) {
+            if (ignoreParseErrors) {
+                return null;
+            }
+            throw new IOException("Failed to deserialize CSV row '" + new String(message) + "'.", t);
+        }
+    }
+
+    @Override
+    public boolean isEndOfStream(RowData nextElement) {
+        return false;
+    }
+
+    @Override
+    public TypeInformation<RowData> getProducedType() {
+        return resultTypeInfo;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || o.getClass() != this.getClass()) {
+            return false;
+        }
+        final CsvRowDataDeserializationSchema that = (CsvRowDataDeserializationSchema) o;
+        final CsvSchema otherSchema = that.csvSchema;
+
+        return resultTypeInfo.equals(that.resultTypeInfo) &&
+            ignoreParseErrors == that.ignoreParseErrors &&
+            csvSchema.getColumnSeparator() == otherSchema.getColumnSeparator() &&
+            csvSchema.allowsComments() == otherSchema.allowsComments() &&
+            csvSchema.getArrayElementSeparator().equals(otherSchema.getArrayElementSeparator()) &&
+            csvSchema.getQuoteChar() == otherSchema.getQuoteChar() &&
+            csvSchema.getEscapeChar() == otherSchema.getEscapeChar() &&
+            Arrays.equals(csvSchema.getNullValue(), otherSchema.getNullValue());
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+            resultTypeInfo,
+            ignoreParseErrors,
+            csvSchema.getColumnSeparator(),
+            csvSchema.allowsComments(),
+            csvSchema.getArrayElementSeparator(),
+            csvSchema.getQuoteChar(),
+            csvSchema.getEscapeChar(),
+            csvSchema.getNullValue());
+    }
+
+    // -------------------------------------------------------------------------------------
+    // Runtime Converters
+    // -------------------------------------------------------------------------------------
+
+    /**
+     * Runtime converter that converts {@link JsonNode}s into objects of Flink Table & SQL
+     * internal data structures.
+     */
+    @FunctionalInterface
+    private interface DeserializationRuntimeConverter extends Serializable {
+        Object convert(JsonNode jsonNode);
+    }
+
+    private DeserializationRuntimeConverter createRowConverter(RowType rowType, boolean isTopLevel) {
+        final DeserializationRuntimeConverter[] fieldConverters = rowType.getFields().stream()
+            .map(RowType.RowField::getType)
+            .map(this::createNullableConverter)
+            .toArray(DeserializationRuntimeConverter[]::new);
+        final String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);
+        final int arity = fieldNames.length;
+
+        return jsonNode -> {
+            int nodeSize = jsonNode.size();
+
+            validateArity(arity, nodeSize, ignoreParseErrors);
+
+            GenericRowData row = new GenericRowData(arity);
+            for (int i = 0; i < arity; i++) {
+                JsonNode field;
+                // Jackson only supports mapping by name in the first level
+                if (isTopLevel) {
+                    field = jsonNode.get(fieldNames[i]);
+                } else {
+                    field = jsonNode.get(i);
+                }
+                if (field == null) {
+                    row.setField(i, null);
+                } else {
+                    row.setField(i, fieldConverters[i].convert(field));
+                }
+            }
+            return row;
+        };
+    }
+
+    /**
+     * Creates a runtime converter which is null safe.
+     */
+    private DeserializationRuntimeConverter createNullableConverter(LogicalType type) {
+        final DeserializationRuntimeConverter converter = createConverter(type);
+        return jsonNode -> {
+            if (jsonNode == null || jsonNode.isNull()) {
+                return null;
+            }
+            try {
+                return converter.convert(jsonNode);
+            } catch (Throwable t) {
+                if (!ignoreParseErrors) {
+                    throw t;
+                }
+                return null;
+            }
+        };
+    }
+
+    /**
+     * Creates a runtime converter which assumes the input object is not null.
+     */
+    private DeserializationRuntimeConverter createConverter(LogicalType type) {
+        switch (type.getTypeRoot()) {
+            case NULL:
+                return jsonNode -> null;
+            case BOOLEAN:
+                return this::convertToBoolean;
+            case TINYINT:
+                return jsonNode -> Byte.parseByte(jsonNode.asText().trim());
+            case SMALLINT:
+                return jsonNode -> Short.parseShort(jsonNode.asText().trim());
+            case INTEGER:
+            case INTERVAL_YEAR_MONTH:
+                return this::convertToInt;
+            case BIGINT:
+            case INTERVAL_DAY_TIME:
+                return this::convertToLong;
+            case DATE:
+                return this::convertToDate;
+            case TIME_WITHOUT_TIME_ZONE:
+                return this::convertToTime;
+            case TIMESTAMP_WITH_TIME_ZONE:
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                return this::convertToTimestamp;
+            case FLOAT:
+                return this::convertToFloat;
+            case DOUBLE:
+                return this::convertToDouble;
+            case CHAR:
+            case VARCHAR:
+                return this::convertToString;
+            case BINARY:
+            case VARBINARY:
+                return this::convertToBytes;
+            case DECIMAL:
+                return createDecimalConverter((DecimalType) type);
+            case ARRAY:
+                return createArrayConverter((ArrayType) type);
+            case ROW:
+                return createRowConverter((RowType) type, false);
+            case MAP:
+            case MULTISET:
+            case RAW:
+            default:
+                throw new UnsupportedOperationException("Unsupported type: " + type);
+        }
+    }
+
+    private boolean convertToBoolean(JsonNode jsonNode) {
+        if (jsonNode.isBoolean()) {

Review comment:
   Just use `jsonNode.isBoolean() ? jsonNode.asBoolean() : Boolean.parseBoolean(jsonNode.asText().trim())`? Same below.
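For concreteness, a minimal sketch of the refactor this comment suggests, applied to `convertToBoolean` and, per "Same below", to one of the numeric helpers; the bodies are taken from the diff above, only collapsed into a single conditional expression:

    private boolean convertToBoolean(JsonNode jsonNode) {
        // Fast path for genuine boolean nodes, with text parsing as the fallback.
        return jsonNode.isBoolean()
            ? jsonNode.asBoolean()
            : Boolean.parseBoolean(jsonNode.asText().trim());
    }

    private int convertToInt(JsonNode jsonNode) {
        // "Same below": the other numeric helpers would follow the same pattern.
        return jsonNode.canConvertToInt()
            ? jsonNode.asInt()
            : Integer.parseInt(jsonNode.asText().trim());
    }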
##########
File path: flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java
##########
@@ -0,0 +1,458 @@
+    private boolean convertToBoolean(JsonNode jsonNode) {
+        if (jsonNode.isBoolean()) {
+            // avoid redundant toString and parseBoolean, for better performance

Review comment:
   Why can't we use `node.asBoolean()` directly?
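Presumably the `isBoolean()` guard exists because `JsonNode.asBoolean()` and `Boolean.parseBoolean` disagree on text values. A small standalone check, using plain Jackson rather than the shaded classes; the printed results reflect my assumption about Jackson's `TextNode` behavior, not something verified against this PR:

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.node.TextNode;

    public class AsBooleanCheck {
        public static void main(String[] args) {
            JsonNode node = TextNode.valueOf("TRUE");
            // asBoolean() matches only the lowercase literals "true"/"false"
            // and otherwise falls back to its default of false (assumed).
            System.out.println(node.asBoolean());                           // false
            // Boolean.parseBoolean is case-insensitive, so trimmed CSV text
            // such as "TRUE" still parses as expected.
            System.out.println(Boolean.parseBoolean(node.asText().trim())); // true
        }
    }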
##########
File path: flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataSerializationSchema.java
##########
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ContainerNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+import java.util.Arrays;
+import java.util.Objects;
+
+import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;
+import static java.time.format.DateTimeFormatter.ISO_LOCAL_TIME;
+
+/**
+ * Serialization schema that serializes an object of Flink Table & SQL internal data structures
+ * into CSV bytes.
+ *
+ * <p>Serializes the input row into a {@link JsonNode} and
+ * converts it into <code>byte[]</code>.
+ *
+ * <p>Result <code>byte[]</code> messages can be deserialized using {@link CsvRowDataDeserializationSchema}.
+ */
+@PublicEvolving
+public final class CsvRowDataSerializationSchema implements SerializationSchema<RowData> {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Logical row type describing the input CSV data. */
+    private final RowType rowType;
+
+    /** Runtime instance that performs the actual work. */
+    private final SerializationRuntimeConverter runtimeConverter;
+
+    /** CsvMapper used to write {@link JsonNode} into bytes. */
+    private final CsvMapper csvMapper;
+
+    /** Schema describing the input CSV data. */
+    private final CsvSchema csvSchema;
+
+    /** Object writer used to write rows. It is configured by {@link CsvSchema}. */
+    private final ObjectWriter objectWriter;
+
+    /** Reusable object node. */
+    private transient ObjectNode root;
+
+    private CsvRowDataSerializationSchema(
+            RowType rowType,
+            CsvSchema csvSchema) {
+        this.rowType = rowType;
+        this.runtimeConverter = createRowConverter(rowType, true);
+        this.csvMapper = new CsvMapper();
+        this.csvSchema = csvSchema;
+        this.objectWriter = csvMapper.writer(csvSchema);
+    }
+
+    /**
+     * A builder for creating a {@link CsvRowDataSerializationSchema}.
+     */
+    @PublicEvolving
+    public static class Builder {
+
+        private final RowType rowType;
+        private CsvSchema csvSchema;
+
+        /**
+         * Creates a {@link CsvRowDataSerializationSchema} expecting the given {@link RowType}.
+         *
+         * @param rowType logical row type used to create schema.
+         */
+        public Builder(RowType rowType) {
+            Preconditions.checkNotNull(rowType, "Row type must not be null.");
+
+            this.rowType = rowType;
+            this.csvSchema = CsvRowSchemaConverter.convert(rowType);
+        }
+
+        public Builder setFieldDelimiter(char c) {
+            this.csvSchema = this.csvSchema.rebuild().setColumnSeparator(c).build();
+            return this;
+        }
+
+        public Builder setLineDelimiter(String delimiter) {
+            Preconditions.checkNotNull(delimiter, "Delimiter must not be null.");
+            if (!delimiter.equals("\n") && !delimiter.equals("\r") && !delimiter.equals("\r\n") && !delimiter.equals("")) {
+                throw new IllegalArgumentException(
+                    "Unsupported new line delimiter. Only \\n, \\r, \\r\\n, or empty string are supported.");
+            }
+            this.csvSchema = this.csvSchema.rebuild().setLineSeparator(delimiter).build();
+            return this;
+        }
+
+        public Builder setArrayElementDelimiter(String delimiter) {
+            Preconditions.checkNotNull(delimiter, "Delimiter must not be null.");
+            this.csvSchema = this.csvSchema.rebuild().setArrayElementSeparator(delimiter).build();
+            return this;
+        }
+
+        public Builder disableQuoteCharacter() {
+            this.csvSchema = this.csvSchema.rebuild().disableQuoteChar().build();
+            return this;
+        }
+
+        public Builder setQuoteCharacter(char c) {
+            this.csvSchema = this.csvSchema.rebuild().setQuoteChar(c).build();
+            return this;
+        }
+
+        public Builder setEscapeCharacter(char c) {
+            this.csvSchema = this.csvSchema.rebuild().setEscapeChar(c).build();
+            return this;
+        }
+
+        public Builder setNullLiteral(String s) {
+            this.csvSchema = this.csvSchema.rebuild().setNullValue(s).build();
+            return this;
+        }
+
+        public CsvRowDataSerializationSchema build() {
+            return new CsvRowDataSerializationSchema(
+                rowType,
+                csvSchema);
+        }
+    }
+
+    @Override
+    public byte[] serialize(RowData row) {
+        if (root == null) {
+            root = csvMapper.createObjectNode();
+        }
+        try {
+            runtimeConverter.convert(csvMapper, root, row);
+            return objectWriter.writeValueAsBytes(root);
+        } catch (Throwable t) {
+            throw new RuntimeException("Could not serialize row '" + row + "'.", t);
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || o.getClass() != this.getClass()) {
+            return false;
+        }
+        if (this == o) {
+            return true;
+        }
+        final CsvRowDataSerializationSchema that = (CsvRowDataSerializationSchema) o;
+        final CsvSchema otherSchema = that.csvSchema;
+
+        return rowType.equals(that.rowType) &&
+            csvSchema.getColumnSeparator() == otherSchema.getColumnSeparator() &&
+            Arrays.equals(csvSchema.getLineSeparator(), otherSchema.getLineSeparator()) &&
+            csvSchema.getArrayElementSeparator().equals(otherSchema.getArrayElementSeparator()) &&
+            csvSchema.getQuoteChar() == otherSchema.getQuoteChar() &&
+            csvSchema.getEscapeChar() == otherSchema.getEscapeChar() &&
+            Arrays.equals(csvSchema.getNullValue(), otherSchema.getNullValue());
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+            rowType,
+            csvSchema.getColumnSeparator(),
+            csvSchema.getLineSeparator(),
+            csvSchema.getArrayElementSeparator(),
+            csvSchema.getQuoteChar(),
+            csvSchema.getEscapeChar(),
+            csvSchema.getNullValue());
+    }
+
+    // --------------------------------------------------------------------------------
+    // Runtime Converters
+    // --------------------------------------------------------------------------------
+
+    /**
+     * Runtime converter that converts objects of Flink Table & SQL internal data structures
+     * to corresponding {@link JsonNode}s.
+     */
+    private interface SerializationRuntimeConverter extends Serializable {

Review comment:
   I think you should use something like `ParquetRowDataWriter.FieldWriter`.
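If I read the suggestion right, the point is a positional, per-field writer in the spirit of `ParquetRowDataWriter.FieldWriter`, rather than a generic object converter. A rough sketch of that shape; the interface name and signature here are illustrative assumptions, not the actual Parquet writer API:

    // Illustrative adaptation: each field gets a writer bound to its position,
    // reading through the typed RowData accessors so no per-field boxing or
    // generic Object conversion is needed.
    private interface FieldWriter extends Serializable {
        void write(ObjectNode csvNode, String fieldName, RowData row, int pos);
    }

    private FieldWriter createBigintWriter() {
        return (csvNode, fieldName, row, pos) -> {
            if (row.isNullAt(pos)) {
                csvNode.putNull(fieldName);
            } else {
                csvNode.put(fieldName, row.getLong(pos)); // primitive access
            }
        };
    }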
##########
File path: flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java
##########
@@ -0,0 +1,458 @@
+    private boolean convertToBoolean(JsonNode jsonNode) {
+        if (jsonNode.isBoolean()) {
+            // avoid redundant toString and parseBoolean, for better performance
+            return jsonNode.asBoolean();
+        } else {
+            return Boolean.parseBoolean(jsonNode.asText().trim());
+        }
+    }
+
+    private int convertToInt(JsonNode jsonNode) {
+        if (jsonNode.canConvertToInt()) {
+            // avoid redundant toString and parseInt, for better performance
+            return jsonNode.asInt();
+        } else {
+            return Integer.parseInt(jsonNode.asText().trim());
+        }
+    }
+
+    private long convertToLong(JsonNode jsonNode) {
+        if (jsonNode.canConvertToLong()) {
+            // avoid redundant toString and parseLong, for better performance
+            return jsonNode.asLong();
+        } else {
+            return Long.parseLong(jsonNode.asText().trim());
+        }
+    }
+
+    private double convertToDouble(JsonNode jsonNode) {
+        if (jsonNode.isDouble()) {
+            // avoid redundant toString and parseDouble, for better performance
+            return jsonNode.asDouble();
+        } else {
+            return Double.parseDouble(jsonNode.asText().trim());
+        }
+    }
+
+    private float convertToFloat(JsonNode jsonNode) {
+        if (jsonNode.isDouble()) {
+            // avoid redundant toString and parseDouble, for better performance
+            return (float) jsonNode.asDouble();
+        } else {
+            return Float.parseFloat(jsonNode.asText().trim());
+        }
+    }
+
+    private int convertToDate(JsonNode jsonNode) {
+        // csv currently uses Date.valueOf() to parse the date string
+        return (int) Date.valueOf(jsonNode.asText()).toLocalDate().toEpochDay();
+    }
+
+    private int convertToTime(JsonNode jsonNode) {
+        // csv currently uses Time.valueOf() to parse the time string
+        LocalTime localTime = Time.valueOf(jsonNode.asText()).toLocalTime();
+        // get number of milliseconds of the day
+        return localTime.toSecondOfDay() * 1000;

Review comment:
   We lose millisecond precision here; use `toNanoOfDay`? BTW, add tests for this.
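A minimal sketch of what that fix might look like while keeping the int millisecond-of-day result; switching from `Time.valueOf` to `LocalTime.parse` is my assumption about the intended change, since `java.sql.Time` itself only carries second precision:

    private int convertToTime(JsonNode jsonNode) {
        // LocalTime.parse retains fractional seconds, e.g. "12:34:56.789".
        LocalTime localTime = LocalTime.parse(jsonNode.asText().trim());
        // Millisecond-of-day, derived from nanos so millis are not dropped.
        return (int) (localTime.toNanoOfDay() / 1_000_000);
    }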
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]