wuchong commented on a change in pull request #11944:
URL: https://github.com/apache/flink/pull/11944#discussion_r417375958



##########
File path: 
flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
##########
@@ -0,0 +1,492 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.TextNode;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneOffset;
+import java.time.temporal.TemporalAccessor;
+import java.time.temporal.TemporalQueries;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import static java.lang.String.format;
+import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;
+import static 
org.apache.flink.formats.json.TimeFormats.RFC3339_TIMESTAMP_FORMAT;
+import static org.apache.flink.formats.json.TimeFormats.RFC3339_TIME_FORMAT;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Deserialization schema from JSON to Flink Table/SQL internal data structure 
{@link RowData}.
+ *
+ * <p>Deserializes a <code>byte[]</code> message as a JSON object and reads
+ * the specified fields.
+ *
+ * <p>Failures during deserialization are forwarded as wrapped IOExceptions.
+ */
+@Internal
+public class JsonRowDataDeserializationSchema implements 
DeserializationSchema<RowData> {
+       private static final long serialVersionUID = 8576854315236033439L;
+
+       /** Flag indicating whether to fail if a field is missing. */
+       private final boolean failOnMissingField;
+
+       /** Flag indicating whether to ignore invalid fields/rows (default: 
throw an exception). */
+       private final boolean ignoreParseErrors;
+
+       /** TypeInformation of the produced {@link RowData}. **/
+       private final TypeInformation<RowData> resultTypeInfo;
+
+       /**
+        * Runtime converter that converts {@link JsonNode}s into
+        * objects of Flink SQL internal data structures. **/
+       private final DeserializationRuntimeConverter runtimeConverter;
+
+       /** Object mapper for parsing the JSON. */
+       private final ObjectMapper objectMapper = new ObjectMapper();
+
+       /**
+        * Creates a builder for {@link JsonRowDataDeserializationSchema}.
+        */
+       public static Builder builder() {
+               return new Builder();
+       }
+
+       private JsonRowDataDeserializationSchema(
+                       RowType rowType,
+                       TypeInformation<RowData> resultTypeInfo,
+                       boolean failOnMissingField,
+                       boolean ignoreParseErrors) {
+               if (ignoreParseErrors && failOnMissingField) {
+                       throw new IllegalArgumentException(
+                               "JSON format doesn't support failOnMissingField 
and ignoreParseErrors are both enabled.");
+               }
+               this.resultTypeInfo = checkNotNull(resultTypeInfo);
+               this.failOnMissingField = failOnMissingField;
+               this.ignoreParseErrors = ignoreParseErrors;
+               this.runtimeConverter = 
createRowConverter(checkNotNull(rowType));
+       }
+
+       @Override
+       public RowData deserialize(byte[] message) throws IOException {
+               try {
+                       final JsonNode root = objectMapper.readTree(message);
+                       return (RowData) runtimeConverter.convert(root);
+               } catch (Throwable t) {
+                       if (ignoreParseErrors) {
+                               return null;
+                       }
+                       throw new IOException(format("Failed to deserialize 
JSON '%s'.", new String(message)), t);
+               }
+       }
+
+       @Override
+       public boolean isEndOfStream(RowData nextElement) {
+               return false;
+       }
+
+       @Override
+       public TypeInformation<RowData> getProducedType() {
+               return resultTypeInfo;
+       }
+
+       // 
-------------------------------------------------------------------------------------
+       // Builder
+       // 
-------------------------------------------------------------------------------------
+
+       /**
+        * Builder for {@link JsonRowDataDeserializationSchema}.
+        */
+       public static class Builder {
+
+               private RowType rowType;
+               private TypeInformation<RowData> resultTypeInfo;
+               private boolean failOnMissingField = false;
+               private boolean ignoreParseErrors = false;
+
+               /**
+                * Configures with the {@link RowType} schema information.
+                */
+               public Builder schema(RowType rowType) {
+                       this.rowType = rowType;
+                       return this;
+               }
+
+               /**
+                * Configures the {@link TypeInformation} of the produced 
{@link RowData}.
+                */
+               public Builder resultTypeInfo(TypeInformation<RowData> 
resultTypeInfo) {
+                       this.resultTypeInfo = resultTypeInfo;
+                       return this;
+               }
+
+               /**
+                * Configures to fail if a JSON field is missing.
+                *
+                * <p>By default, a missing field is ignored and the field is 
set to null.
+                */
+               public Builder failOnMissingField() {
+                       this.failOnMissingField = true;
+                       return this;
+               }
+
+               /**
+                * Configures to ignore JSON parse errors instead of failing.
+                *
+                * <p>By default, an exception will be thrown when parsing json 
fails.
+                */
+               public Builder ignoreParseErrors() {
+                       this.ignoreParseErrors = true;
+                       return this;
+               }
+
+               /**
+                * Creates the instance of {@link 
JsonRowDataDeserializationSchema}.
+                */
+               public JsonRowDataDeserializationSchema build() {
+                       return new JsonRowDataDeserializationSchema(
+                               rowType,
+                               resultTypeInfo,
+                               failOnMissingField,
+                               ignoreParseErrors);
+               }
+       }
+
+       // 
-------------------------------------------------------------------------------------
+       // Runtime Converters
+       // 
-------------------------------------------------------------------------------------
+
+       /**
+        * Runtime converter that converts {@link JsonNode}s into objects of 
Flink Table & SQL
+        * internal data structures.
+        */
+       @FunctionalInterface
+       private interface DeserializationRuntimeConverter extends Serializable {
+               Object convert(JsonNode jsonNode);
+       }
+
+       /**
+        * Creates a runtime converter which is null safe.
+        */
+       private DeserializationRuntimeConverter createConverter(LogicalType 
type) {
+               return wrapIntoNullableConverter(createNotNullConverter(type));
+       }
+
+       /**
+        * Creates a runtime converter which assumes the input object is not null.
+        */
+       private DeserializationRuntimeConverter 
createNotNullConverter(LogicalType type) {
+               switch (type.getTypeRoot()) {
+                       case NULL:
+                               return jsonNode -> null;
+                       case BOOLEAN:
+                               return this::convertToBoolean;
+                       case TINYINT:
+                               return jsonNode -> 
Byte.parseByte(jsonNode.asText().trim());
+                       case SMALLINT:
+                               return jsonNode -> 
Short.parseShort(jsonNode.asText().trim());
+                       case INTEGER:
+                       case INTERVAL_YEAR_MONTH:
+                               return this::convertToInt;
+                       case BIGINT:
+                       case INTERVAL_DAY_TIME:
+                               return this::convertToLong;
+                       case DATE:
+                               return this::convertToDate;
+                       case TIME_WITHOUT_TIME_ZONE:
+                               return this::convertToTime;
+                       case TIMESTAMP_WITH_TIME_ZONE:
+                       case TIMESTAMP_WITHOUT_TIME_ZONE:
+                               return this::convertToTimestamp;
+                       case FLOAT:
+                               return this::convertToFloat;
+                       case DOUBLE:
+                               return this::convertToDouble;
+                       case CHAR:
+                       case VARCHAR:
+                               return this::convertToString;
+                       case BINARY:
+                       case VARBINARY:
+                               return this::convertToBytes;
+                       case DECIMAL:
+                               return createDecimalConverter((DecimalType) 
type);
+                       case ARRAY:
+                               return createArrayConverter((ArrayType) type);
+                       case MAP:
+                       case MULTISET:
+                               return createMapConverter((MapType) type);
+                       case ROW:
+                               return createRowConverter((RowType) type);
+                       case RAW:
+                       default:
+                               throw new 
UnsupportedOperationException("Unsupported type: " + type);
+               }
+       }
+
+       private boolean convertToBoolean(JsonNode jsonNode) {
+               if (jsonNode.isBoolean()) {
+                       // avoid redundant toString and parseBoolean, for 
better performance
+                       return jsonNode.asBoolean();
+               } else {
+                       return Boolean.parseBoolean(jsonNode.asText().trim());
+               }
+       }
+
+       private int convertToInt(JsonNode jsonNode) {
+               if (jsonNode.canConvertToInt()) {
+                       // avoid redundant toString and parseInt, for better 
performance
+                       return jsonNode.asInt();
+               } else {
+                       return Integer.parseInt(jsonNode.asText().trim());
+               }
+       }
+
+       private long convertToLong(JsonNode jsonNode) {
+               if (jsonNode.canConvertToLong()) {
+                       // avoid redundant toString and parseLong, for better 
performance
+                       return jsonNode.asLong();
+               } else {
+                       return Long.parseLong(jsonNode.asText().trim());
+               }
+       }
+
+       private double convertToDouble(JsonNode jsonNode) {
+               if (jsonNode.isDouble()) {
+                       // avoid redundant toString and parseDouble, for better 
performance
+                       return jsonNode.asDouble();
+               } else {
+                       return Double.parseDouble(jsonNode.asText().trim());
+               }
+       }
+
+       private float convertToFloat(JsonNode jsonNode) {
+               if (jsonNode.isDouble()) {
+                       // avoid redundant toString and parseDouble, for better 
performance
+                       return (float) jsonNode.asDouble();
+               } else {
+                       return Float.parseFloat(jsonNode.asText().trim());
+               }
+       }
+
+       private int convertToDate(JsonNode jsonNode) {
+               LocalDate date = 
ISO_LOCAL_DATE.parse(jsonNode.asText()).query(TemporalQueries.localDate());
+               return (int) date.toEpochDay();
+       }
+
+       private int convertToTime(JsonNode jsonNode) {
+               // according to RFC 3339 every full-time must have a timezone;
+               // until we have full timezone support, we only support UTC;
+               // users can parse their time as string as a workaround
+               TemporalAccessor parsedTime = 
RFC3339_TIME_FORMAT.parse(jsonNode.asText());
+
+               ZoneOffset zoneOffset = 
parsedTime.query(TemporalQueries.offset());
+               LocalTime localTime = 
parsedTime.query(TemporalQueries.localTime());
+
+               if (zoneOffset != null && zoneOffset.getTotalSeconds() != 0 || 
localTime.getNano() != 0) {
+                       throw new JsonParseException(
+                               "Invalid time format. Only a time in UTC 
timezone without milliseconds is supported yet.");
+               }
+
+               // get number of milliseconds of the day
+               return localTime.toSecondOfDay() * 1000;
+       }
+
+       private TimestampData convertToTimestamp(JsonNode jsonNode) {
+               // according to RFC 3339 every date-time must have a timezone;
+               // until we have full timezone support, we only support UTC;
+               // users can parse their time as string as a workaround
+               TemporalAccessor parsedTimestamp = 
RFC3339_TIMESTAMP_FORMAT.parse(jsonNode.asText());
+
+               ZoneOffset zoneOffset = 
parsedTimestamp.query(TemporalQueries.offset());
+
+               if (zoneOffset != null && zoneOffset.getTotalSeconds() != 0) {
+                       throw new JsonParseException(
+                               "Invalid timestamp format. Only a timestamp in 
UTC timezone is supported yet. " +
+                                       "Format: yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
+               }
+
+               LocalTime localTime = 
parsedTimestamp.query(TemporalQueries.localTime());
+               LocalDate localDate = 
parsedTimestamp.query(TemporalQueries.localDate());
+
+               return 
TimestampData.fromLocalDateTime(LocalDateTime.of(localDate, localTime));
+       }
+
+       private StringData convertToString(JsonNode jsonNode) {
+               return StringData.fromString(jsonNode.asText());
+       }
+
+       private byte[] convertToBytes(JsonNode jsonNode) {
+               try {
+                       return jsonNode.binaryValue();
+               } catch (IOException e) {
+                       throw new JsonParseException("Unable to deserialize 
byte array.", e);
+               }
+       }
+
+       private DeserializationRuntimeConverter 
createDecimalConverter(DecimalType decimalType) {
+               final int precision = decimalType.getPrecision();
+               final int scale = decimalType.getScale();
+               return jsonNode -> {
+                       BigDecimal bigDecimal;
+                       if (jsonNode.isBigDecimal()) {
+                               bigDecimal = jsonNode.decimalValue();
+                       } else {
+                               bigDecimal = new BigDecimal(jsonNode.asText());
+                       }
+                       return DecimalData.fromBigDecimal(bigDecimal, 
precision, scale);
+               };
+       }
+
+       private DeserializationRuntimeConverter createArrayConverter(ArrayType 
arrayType) {
+               DeserializationRuntimeConverter elementConverter = 
createConverter(arrayType.getElementType());
+               final Class<?> elementClass = 
LogicalTypeUtils.toInternalConversionClass(arrayType.getElementType());
+               return jsonNode -> {
+                       final ArrayNode node = (ArrayNode) jsonNode;
+                       final Object[] array = (Object[]) 
Array.newInstance(elementClass, node.size());
+                       for (int i = 0; i < node.size(); i++) {
+                               final JsonNode innerNode = node.get(i);
+                               array[i] = elementConverter.convert(innerNode);
+                       }
+                       return new GenericArrayData(array);
+               };
+       }
+
+       private DeserializationRuntimeConverter createMapConverter(MapType 
mapType) {
+               LogicalType keyType = mapType.getKeyType();
+               if (!LogicalTypeChecks.hasFamily(keyType, 
LogicalTypeFamily.CHARACTER_STRING)) {
+                       throw new UnsupportedOperationException(
+                               "JSON format doesn't support non-string as key 
type of map. " +
+                               "The map type is: " + 
mapType.asSummaryString());
+               }
+               final DeserializationRuntimeConverter keyConverter = 
createConverter(keyType);
+               final DeserializationRuntimeConverter valueConverter = 
createConverter(mapType.getValueType());
+
+               return jsonNode -> {
+                       Iterator<Map.Entry<String, JsonNode>> fields = 
jsonNode.fields();
+                       Map<Object, Object> result = new HashMap<>();
+                       while (fields.hasNext()) {
+                               Map.Entry<String, JsonNode> entry = 
fields.next();
+                               Object key = 
keyConverter.convert(TextNode.valueOf(entry.getKey()));
+                               Object value = 
valueConverter.convert(entry.getValue());
+                               result.put(key, value);

Review comment:
       Java `HashMap` and SQL MAP allow null keys and null values. I will add a 
test for this in `testSerDeMultiRowsWithNullValues`.

##########
File path: 
flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
##########
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.util.Arrays;
+
+import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;
+import static 
org.apache.flink.formats.json.TimeFormats.RFC3339_TIMESTAMP_FORMAT;
+import static org.apache.flink.formats.json.TimeFormats.RFC3339_TIME_FORMAT;
+
+/**
+ * Serialization schema that serializes an object of Flink internal data structure into JSON bytes.
+ *
+ * <p>Serializes the input Flink object into a JSON string and
+ * converts it into <code>byte[]</code>.
+ *
+ * <p>Result <code>byte[]</code> messages can be deserialized using {@link 
JsonRowDataDeserializationSchema}.
+ */
+@Internal
+public class JsonRowDataSerializationSchema implements 
SerializationSchema<RowData> {
+       private static final long serialVersionUID = 1L;

Review comment:
       I learned this from the [Flink Code Style](https://flink.apache.org/contributing/code-style-and-quality-java.html), which says:
   
   > **The Serial Version UID for new classes should start at 1** and should 
generally be bumped on every incompatible change to the class according to the 
Java serialization compatibility definition (i.e: changing the type of a field, 
or moving the position of a class in the class hierarchy)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to