wuchong commented on a change in pull request #11944:
URL: https://github.com/apache/flink/pull/11944#discussion_r417375958
##########
File path: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
##########
@@ -0,0 +1,492 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.TextNode;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneOffset;
+import java.time.temporal.TemporalAccessor;
+import java.time.temporal.TemporalQueries;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import static java.lang.String.format;
+import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;
+import static org.apache.flink.formats.json.TimeFormats.RFC3339_TIMESTAMP_FORMAT;
+import static org.apache.flink.formats.json.TimeFormats.RFC3339_TIME_FORMAT;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Deserialization schema from JSON to the Flink Table/SQL internal data structure {@link RowData}.
+ *
+ * <p>Deserializes a <code>byte[]</code> message as a JSON object and reads
+ * the specified fields.
+ *
+ * <p>Failures during deserialization are forwarded as wrapped IOExceptions.
+ */
+@Internal
+public class JsonRowDataDeserializationSchema implements DeserializationSchema<RowData> {
+	private static final long serialVersionUID = 8576854315236033439L;
+
+	/** Flag indicating whether to fail if a field is missing. */
+	private final boolean failOnMissingField;
+
+	/** Flag indicating whether to ignore invalid fields/rows (default: throw an exception). */
+	private final boolean ignoreParseErrors;
+
+	/** TypeInformation of the produced {@link RowData}. */
+	private final TypeInformation<RowData> resultTypeInfo;
+
+	/**
+	 * Runtime converter that converts {@link JsonNode}s into
+	 * objects of Flink SQL internal data structures.
+	 */
+	private final DeserializationRuntimeConverter runtimeConverter;
+
+	/** Object mapper for parsing the JSON. */
+	private final ObjectMapper objectMapper = new ObjectMapper();
+
+	/**
+	 * Creates a builder for {@link JsonRowDataDeserializationSchema}.
+	 */
+	public static Builder builder() {
+		return new Builder();
+	}
+
+	private JsonRowDataDeserializationSchema(
+			RowType rowType,
+			TypeInformation<RowData> resultTypeInfo,
+			boolean failOnMissingField,
+			boolean ignoreParseErrors) {
+		if (ignoreParseErrors && failOnMissingField) {
+			throw new IllegalArgumentException(
+				"JSON format doesn't support failOnMissingField and ignoreParseErrors being enabled at the same time.");
+		}
+		this.resultTypeInfo = checkNotNull(resultTypeInfo);
+		this.failOnMissingField = failOnMissingField;
+		this.ignoreParseErrors = ignoreParseErrors;
+		this.runtimeConverter = createRowConverter(checkNotNull(rowType));
+	}
+
+	@Override
+	public RowData deserialize(byte[] message) throws IOException {
+		try {
+			final JsonNode root = objectMapper.readTree(message);
+			return (RowData) runtimeConverter.convert(root);
+		} catch (Throwable t) {
+			if (ignoreParseErrors) {
+				return null;
+			}
+			throw new IOException(format("Failed to deserialize JSON '%s'.", new String(message)), t);
+		}
+	}
+
+	@Override
+	public boolean isEndOfStream(RowData nextElement) {
+		return false;
+	}
+
+	@Override
+	public TypeInformation<RowData> getProducedType() {
+		return resultTypeInfo;
+	}
+
+	// -------------------------------------------------------------------------------------
+	// Builder
+	// -------------------------------------------------------------------------------------
+
+	/**
+	 * Builder for {@link JsonRowDataDeserializationSchema}.
+	 */
+	public static class Builder {
+
+		private RowType rowType;
+		private TypeInformation<RowData> resultTypeInfo;
+		private boolean failOnMissingField = false;
+		private boolean ignoreParseErrors = false;
+
+		/**
+		 * Configures with the {@link RowType} schema information.
+		 */
+		public Builder schema(RowType rowType) {
+			this.rowType = rowType;
+			return this;
+		}
+
+		/**
+		 * Configures the {@link TypeInformation} of the produced {@link RowData}.
+		 */
+		public Builder resultTypeInfo(TypeInformation<RowData> resultTypeInfo) {
+			this.resultTypeInfo = resultTypeInfo;
+			return this;
+		}
+
+		/**
+		 * Configures to fail if a JSON field is missing.
+		 *
+		 * <p>By default, a missing field is ignored and the field is set to null.
+		 */
+		public Builder failOnMissingField() {
+			this.failOnMissingField = true;
+			return this;
+		}
+
+		/**
+		 * Configures to ignore fields and rows with parse errors instead of failing.
+		 *
+		 * <p>By default, an exception will be thrown when parsing JSON fails.
+		 */
+		public Builder ignoreParseErrors() {
+			this.ignoreParseErrors = true;
+			return this;
+		}
+
+		/**
+		 * Creates the instance of {@link JsonRowDataDeserializationSchema}.
+		 */
+		public JsonRowDataDeserializationSchema build() {
+			return new JsonRowDataDeserializationSchema(
+				rowType,
+				resultTypeInfo,
+				failOnMissingField,
+				ignoreParseErrors);
+		}
+	}
+
+	// -------------------------------------------------------------------------------------
+	// Runtime Converters
+	// -------------------------------------------------------------------------------------
+
+	/**
+	 * Runtime converter that converts {@link JsonNode}s into objects of Flink Table & SQL
+	 * internal data structures.
+	 */
+	@FunctionalInterface
+	private interface DeserializationRuntimeConverter extends Serializable {
+		Object convert(JsonNode jsonNode);
+	}
+
+	/**
+	 * Creates a runtime converter which is null safe.
+	 */
+	private DeserializationRuntimeConverter createConverter(LogicalType type) {
+		return wrapIntoNullableConverter(createNotNullConverter(type));
+	}
+
+	/**
+	 * Creates a runtime converter which assumes the input object is not null.
+	 */
+	private DeserializationRuntimeConverter createNotNullConverter(LogicalType type) {
+		switch (type.getTypeRoot()) {
+			case NULL:
+				return jsonNode -> null;
+			case BOOLEAN:
+				return this::convertToBoolean;
+			case TINYINT:
+				return jsonNode -> Byte.parseByte(jsonNode.asText().trim());
+			case SMALLINT:
+				return jsonNode -> Short.parseShort(jsonNode.asText().trim());
+			case INTEGER:
+			case INTERVAL_YEAR_MONTH:
+				return this::convertToInt;
+			case BIGINT:
+			case INTERVAL_DAY_TIME:
+				return this::convertToLong;
+			case DATE:
+				return this::convertToDate;
+			case TIME_WITHOUT_TIME_ZONE:
+				return this::convertToTime;
+			case TIMESTAMP_WITH_TIME_ZONE:
+			case TIMESTAMP_WITHOUT_TIME_ZONE:
+				return this::convertToTimestamp;
+			case FLOAT:
+				return this::convertToFloat;
+			case DOUBLE:
+				return this::convertToDouble;
+			case CHAR:
+			case VARCHAR:
+				return this::convertToString;
+			case BINARY:
+			case VARBINARY:
+				return this::convertToBytes;
+			case DECIMAL:
+				return createDecimalConverter((DecimalType) type);
+			case ARRAY:
+				return createArrayConverter((ArrayType) type);
+			case MAP:
+			case MULTISET:
+				return createMapConverter((MapType) type);
+			case ROW:
+				return createRowConverter((RowType) type);
+			case RAW:
+			default:
+				throw new UnsupportedOperationException("Unsupported type: " + type);
+		}
+	}
+
+	private boolean convertToBoolean(JsonNode jsonNode) {
+		if (jsonNode.isBoolean()) {
+			// avoid redundant toString and parseBoolean, for better performance
+			return jsonNode.asBoolean();
+		} else {
+			return Boolean.parseBoolean(jsonNode.asText().trim());
+		}
+	}
+
+	private int convertToInt(JsonNode jsonNode) {
+		if (jsonNode.canConvertToInt()) {
+			// avoid redundant toString and parseInt, for better performance
+			return jsonNode.asInt();
+		} else {
+			return Integer.parseInt(jsonNode.asText().trim());
+		}
+	}
+
+	private long convertToLong(JsonNode jsonNode) {
+		if (jsonNode.canConvertToLong()) {
+			// avoid redundant toString and parseLong, for better performance
+			return jsonNode.asLong();
+		} else {
+			return Long.parseLong(jsonNode.asText().trim());
+		}
+	}
+
+	private double convertToDouble(JsonNode jsonNode) {
+		if (jsonNode.isDouble()) {
+			// avoid redundant toString and parseDouble, for better performance
+			return jsonNode.asDouble();
+		} else {
+			return Double.parseDouble(jsonNode.asText().trim());
+		}
+	}
+
+	private float convertToFloat(JsonNode jsonNode) {
+		if (jsonNode.isDouble()) {
+			// avoid redundant toString and parseDouble, for better performance
+			return (float) jsonNode.asDouble();
+		} else {
+			return Float.parseFloat(jsonNode.asText().trim());
+		}
+	}
+
+	private int convertToDate(JsonNode jsonNode) {
+		LocalDate date = ISO_LOCAL_DATE.parse(jsonNode.asText()).query(TemporalQueries.localDate());
+		return (int) date.toEpochDay();
+	}
+
+	private int convertToTime(JsonNode jsonNode) {
+		// according to RFC 3339 every full-time must have a timezone;
+		// until we have full timezone support, we only support UTC;
+		// users can parse their time as a string as a workaround
+		TemporalAccessor parsedTime = RFC3339_TIME_FORMAT.parse(jsonNode.asText());
+
+		ZoneOffset zoneOffset = parsedTime.query(TemporalQueries.offset());
+		LocalTime localTime = parsedTime.query(TemporalQueries.localTime());
+
+		if (zoneOffset != null && zoneOffset.getTotalSeconds() != 0 || localTime.getNano() != 0) {
+			throw new JsonParseException(
+				"Invalid time format. Only times in the UTC timezone without milliseconds are currently supported.");
+		}
+
+		// get the number of milliseconds of the day
+		return localTime.toSecondOfDay() * 1000;
+	}
+
+	private TimestampData convertToTimestamp(JsonNode jsonNode) {
+		// according to RFC 3339 every date-time must have a timezone;
+		// until we have full timezone support, we only support UTC;
+		// users can parse their time as a string as a workaround
+		TemporalAccessor parsedTimestamp = RFC3339_TIMESTAMP_FORMAT.parse(jsonNode.asText());
+
+		ZoneOffset zoneOffset = parsedTimestamp.query(TemporalQueries.offset());
+
+		if (zoneOffset != null && zoneOffset.getTotalSeconds() != 0) {
+			throw new JsonParseException(
+				"Invalid timestamp format. Only timestamps in the UTC timezone are currently supported. " +
+					"Format: yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
+		}
+
+		LocalTime localTime = parsedTimestamp.query(TemporalQueries.localTime());
+		LocalDate localDate = parsedTimestamp.query(TemporalQueries.localDate());
+
+		return TimestampData.fromLocalDateTime(LocalDateTime.of(localDate, localTime));
+	}
+
+	private StringData convertToString(JsonNode jsonNode) {
+		return StringData.fromString(jsonNode.asText());
+	}
+
+	private byte[] convertToBytes(JsonNode jsonNode) {
+		try {
+			return jsonNode.binaryValue();
+		} catch (IOException e) {
+			throw new JsonParseException("Unable to deserialize byte array.", e);
+		}
+	}
+
+	private DeserializationRuntimeConverter createDecimalConverter(DecimalType decimalType) {
+		final int precision = decimalType.getPrecision();
+		final int scale = decimalType.getScale();
+		return jsonNode -> {
+			BigDecimal bigDecimal;
+			if (jsonNode.isBigDecimal()) {
+				bigDecimal = jsonNode.decimalValue();
+			} else {
+				bigDecimal = new BigDecimal(jsonNode.asText());
+			}
+			return DecimalData.fromBigDecimal(bigDecimal, precision, scale);
+		};
+	}
+
+	private DeserializationRuntimeConverter createArrayConverter(ArrayType arrayType) {
+		DeserializationRuntimeConverter elementConverter = createConverter(arrayType.getElementType());
+		final Class<?> elementClass = LogicalTypeUtils.toInternalConversionClass(arrayType.getElementType());
+		return jsonNode -> {
+			final ArrayNode node = (ArrayNode) jsonNode;
+			final Object[] array = (Object[]) Array.newInstance(elementClass, node.size());
+			for (int i = 0; i < node.size(); i++) {
+				final JsonNode innerNode = node.get(i);
+				array[i] = elementConverter.convert(innerNode);
+			}
+			return new GenericArrayData(array);
+		};
+	}
+
+	private DeserializationRuntimeConverter createMapConverter(MapType mapType) {
+		LogicalType keyType = mapType.getKeyType();
+		if (!LogicalTypeChecks.hasFamily(keyType, LogicalTypeFamily.CHARACTER_STRING)) {
+			throw new UnsupportedOperationException(
+				"JSON format doesn't support non-string key types for maps. " +
+					"The map type is: " + mapType.asSummaryString());
+		}
+		final DeserializationRuntimeConverter keyConverter = createConverter(keyType);
+		final DeserializationRuntimeConverter valueConverter = createConverter(mapType.getValueType());
+
+		return jsonNode -> {
+			Iterator<Map.Entry<String, JsonNode>> fields = jsonNode.fields();
+			Map<Object, Object> result = new HashMap<>();
+			while (fields.hasNext()) {
+				Map.Entry<String, JsonNode> entry = fields.next();
+				Object key = keyConverter.convert(TextNode.valueOf(entry.getKey()));
+				Object value = valueConverter.convert(entry.getValue());
+				result.put(key, value);

Review comment:
   Java `HashMap` and SQL MAP allow null keys and null values. I will add a test for this in `testSerDeMultiRowsWithNullValues`.
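   For context, a minimal, self-contained sketch of the `HashMap` property this comment relies on (illustrative only, not part of the PR; the class name is hypothetical):

   ```java
   import java.util.HashMap;
   import java.util.Map;

   // java.util.HashMap permits one null key and any number of null values,
   // so the converter above can put them into `result` without special-casing.
   public class NullEntriesSketch {
       public static void main(String[] args) {
           Map<Object, Object> result = new HashMap<>();
           result.put(null, "value-for-null-key"); // a null key is allowed
           result.put("key1", null);               // null values are allowed too
           System.out.println(result.size());      // prints 2
           System.out.println(result.get("key1")); // prints null
       }
   }
   ```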
type of map. " + + "The map type is: " + mapType.asSummaryString()); + } + final DeserializationRuntimeConverter keyConverter = createConverter(keyType); + final DeserializationRuntimeConverter valueConverter = createConverter(mapType.getValueType()); + + return jsonNode -> { + Iterator<Map.Entry<String, JsonNode>> fields = jsonNode.fields(); + Map<Object, Object> result = new HashMap<>(); + while (fields.hasNext()) { + Map.Entry<String, JsonNode> entry = fields.next(); + Object key = keyConverter.convert(TextNode.valueOf(entry.getKey())); + Object value = valueConverter.convert(entry.getValue()); + result.put(key, value); Review comment: Java `HashMap` and SQL MAP allow null keys and null values. I will add a test for this in `testSerDeMultiRowsWithNullValues`. ########## File path: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java ########## @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.json; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeFamily; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.utils.LogicalTypeChecks; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; + +import java.io.Serializable; +import java.math.BigDecimal; +import java.time.LocalDate; +import java.time.LocalTime; +import java.util.Arrays; + +import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE; +import static org.apache.flink.formats.json.TimeFormats.RFC3339_TIMESTAMP_FORMAT; +import static org.apache.flink.formats.json.TimeFormats.RFC3339_TIME_FORMAT; + +/** + * Serialization schema that serializes an object of Flink internal data structure into a JSON bytes. + * + * <p>Serializes the input Flink object into a JSON string and + * converts it into <code>byte[]</code>. 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]
