This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit c6cd5b68058bfab658482ce4ca5774153fb79828 Author: fengli <[email protected]> AuthorDate: Wed Jul 19 12:24:51 2023 +0800 [FLINK-32610][json] Introduce JsonParserToRowDataConverter based JsonParserToRowDataConverter deserialization schema which supports projection push-down of nested fields --- ...java => AbstractJsonDeserializationSchema.java} | 58 +--- .../flink/formats/json/JsonFormatFactory.java | 50 ++- .../flink/formats/json/JsonFormatOptions.java | 7 + .../JsonParserRowDataDeserializationSchema.java | 100 ++++++ .../json/JsonRowDataDeserializationSchema.java | 79 +---- .../json/JsonRowDataSerializationSchema.java | 2 +- .../flink/formats/json/JsonFormatFactoryTest.java | 4 +- .../json/JsonParserRowDataDeSerSchemaTest.java | 346 +++++++++++++++++++++ .../formats/json/JsonRowDataSerDeSchemaTest.java | 183 ++++++----- 9 files changed, 611 insertions(+), 218 deletions(-) diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/AbstractJsonDeserializationSchema.java similarity index 70% copy from flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java copy to flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/AbstractJsonDeserializationSchema.java index 9a57bac203b..aa62e0d5f87 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/AbstractJsonDeserializationSchema.java @@ -18,7 +18,6 @@ package org.apache.flink.formats.json; -import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.formats.common.TimestampFormat; @@ -30,52 +29,40 @@ import org.apache.flink.util.jackson.JacksonMapperFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.JsonReadFeature; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; -import javax.annotation.Nullable; - -import java.io.IOException; import java.util.Objects; -import static java.lang.String.format; import static org.apache.flink.util.Preconditions.checkNotNull; /** - * Deserialization schema from JSON to Flink Table/SQL internal data structure {@link RowData}. - * - * <p>Deserializes a <code>byte[]</code> message as a JSON object and reads the specified fields. + * Deserialization schema from JSON to Flink Table/SQL internal data structure {@link RowData}. This + * is the abstract base class which has different implementation. * * <p>Failures during deserialization are forwarded as wrapped IOExceptions. */ -@Internal -public class JsonRowDataDeserializationSchema implements DeserializationSchema<RowData> { +public abstract class AbstractJsonDeserializationSchema implements DeserializationSchema<RowData> { + private static final long serialVersionUID = 1L; /** Flag indicating whether to fail if a field is missing. */ - private final boolean failOnMissingField; + protected final boolean failOnMissingField; /** Flag indicating whether to ignore invalid fields/rows (default: throw an exception). */ - private final boolean ignoreParseErrors; + protected final boolean ignoreParseErrors; /** TypeInformation of the produced {@link RowData}. */ private final TypeInformation<RowData> resultTypeInfo; - /** - * Runtime converter that converts {@link JsonNode}s into objects of Flink SQL internal data - * structures. - */ - private final JsonToRowDataConverters.JsonToRowDataConverter runtimeConverter; - /** Object mapper for parsing the JSON. */ - private transient ObjectMapper objectMapper; + protected transient ObjectMapper objectMapper; /** Timestamp format specification which is used to parse timestamp. */ private final TimestampFormat timestampFormat; private final boolean hasDecimalType; - public JsonRowDataDeserializationSchema( + public AbstractJsonDeserializationSchema( RowType rowType, TypeInformation<RowData> resultTypeInfo, boolean failOnMissingField, @@ -88,9 +75,6 @@ public class JsonRowDataDeserializationSchema implements DeserializationSchema<R this.resultTypeInfo = checkNotNull(resultTypeInfo); this.failOnMissingField = failOnMissingField; this.ignoreParseErrors = ignoreParseErrors; - this.runtimeConverter = - new JsonToRowDataConverters(failOnMissingField, ignoreParseErrors, timestampFormat) - .createConverter(checkNotNull(rowType)); this.timestampFormat = timestampFormat; this.hasDecimalType = LogicalTypeChecks.hasNested(rowType, t -> t instanceof DecimalType); } @@ -107,30 +91,6 @@ public class JsonRowDataDeserializationSchema implements DeserializationSchema<R } } - @Override - public RowData deserialize(@Nullable byte[] message) throws IOException { - if (message == null) { - return null; - } - try { - return convertToRowData(deserializeToJsonNode(message)); - } catch (Throwable t) { - if (ignoreParseErrors) { - return null; - } - throw new IOException( - format("Failed to deserialize JSON '%s'.", new String(message)), t); - } - } - - public JsonNode deserializeToJsonNode(byte[] message) throws IOException { - return objectMapper.readTree(message); - } - - public RowData convertToRowData(JsonNode message) { - return (RowData) runtimeConverter.convert(message); - } - @Override public boolean isEndOfStream(RowData nextElement) { return false; @@ -149,7 +109,7 @@ public class JsonRowDataDeserializationSchema implements DeserializationSchema<R if (o == null || getClass() != o.getClass()) { return false; } - JsonRowDataDeserializationSchema that = (JsonRowDataDeserializationSchema) o; + AbstractJsonDeserializationSchema that = (AbstractJsonDeserializationSchema) o; return failOnMissingField == that.failOnMissingField && ignoreParseErrors == that.ignoreParseErrors && resultTypeInfo.equals(that.resultTypeInfo) diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java index 74d8c5310a0..562b99e6bc8 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java @@ -44,6 +44,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.Set; +import static org.apache.flink.formats.json.JsonFormatOptions.DECODE_JSON_PARSER_ENABLED; import static org.apache.flink.formats.json.JsonFormatOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER; import static org.apache.flink.formats.json.JsonFormatOptions.FAIL_ON_MISSING_FIELD; import static org.apache.flink.formats.json.JsonFormatOptions.IGNORE_PARSE_ERRORS; @@ -68,6 +69,7 @@ public class JsonFormatFactory implements DeserializationFormatFactory, Serializ final boolean failOnMissingField = formatOptions.get(FAIL_ON_MISSING_FIELD); final boolean ignoreParseErrors = formatOptions.get(IGNORE_PARSE_ERRORS); + final boolean jsonParserEnabled = formatOptions.get(DECODE_JSON_PARSER_ENABLED); TimestampFormat timestampOption = JsonFormatOptionsUtil.getTimestampFormat(formatOptions); return new ProjectableDecodingFormat<DeserializationSchema<RowData>>() { @@ -81,21 +83,57 @@ public class JsonFormatFactory implements DeserializationFormatFactory, Serializ final RowType rowType = (RowType) producedDataType.getLogicalType(); final TypeInformation<RowData> rowDataTypeInfo = context.createTypeInformation(producedDataType); - return new JsonRowDataDeserializationSchema( - rowType, - rowDataTypeInfo, - failOnMissingField, - ignoreParseErrors, - timestampOption); + if (jsonParserEnabled) { + return new JsonParserRowDataDeserializationSchema( + rowType, + rowDataTypeInfo, + failOnMissingField, + ignoreParseErrors, + timestampOption, + toProjectedNames( + (RowType) physicalDataType.getLogicalType(), projections)); + } else { + return new JsonRowDataDeserializationSchema( + rowType, + rowDataTypeInfo, + failOnMissingField, + ignoreParseErrors, + timestampOption); + } } @Override public ChangelogMode getChangelogMode() { return ChangelogMode.insertOnly(); } + + @Override + public boolean supportsNestedProjection() { + return jsonParserEnabled; + } }; } + private String[][] toProjectedNames(RowType type, int[][] projectedFields) { + String[][] projectedNames = new String[projectedFields.length][]; + for (int i = 0; i < projectedNames.length; i++) { + int[] fieldIndices = projectedFields[i]; + String[] fieldNames = new String[fieldIndices.length]; + projectedNames[i] = fieldNames; + + // convert fieldIndices to fieldNames + RowType currentType = type; + for (int j = 0; j < fieldIndices.length; j++) { + int index = fieldIndices[j]; + fieldNames[j] = currentType.getFieldNames().get(index); + if (j != fieldIndices.length - 1) { + currentType = (RowType) currentType.getTypeAt(index); + } + } + } + return projectedNames; + } + @Override public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat( DynamicTableFactory.Context context, ReadableConfig formatOptions) { diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatOptions.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatOptions.java index 74d567ab9bb..5c9e61068ac 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatOptions.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatOptions.java @@ -73,6 +73,13 @@ public class JsonFormatOptions { .withDescription( "Optional flag to specify whether to encode all decimals as plain numbers instead of possible scientific notations, false by default."); + public static final ConfigOption<Boolean> DECODE_JSON_PARSER_ENABLED = + ConfigOptions.key("decode.json-parser.enabled") + .booleanType() + .defaultValue(true) + .withDescription( + "Optional flag to specify whether to use the Jackson JsonParser to decode json with better performance, true by default."); + // -------------------------------------------------------------------------------------------- // Enums // -------------------------------------------------------------------------------------------- diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParserRowDataDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParserRowDataDeserializationSchema.java new file mode 100644 index 00000000000..22df48f2ac2 --- /dev/null +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParserRowDataDeserializationSchema.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.json; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.formats.common.TimestampFormat; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonToken; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException; + +import javax.annotation.Nullable; + +import java.io.IOException; + +import static java.lang.String.format; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Tool class used to convert fields from {@link JsonParser} to {@link RowData} which has a higher + * parsing efficiency. + */ +@Internal +public class JsonParserRowDataDeserializationSchema extends AbstractJsonDeserializationSchema { + + /** + * Runtime converter that converts {@link JsonParser}s into objects of Flink SQL internal data + * structures. + */ + private final JsonParserToRowDataConverters.JsonParserToRowDataConverter runtimeConverter; + + public JsonParserRowDataDeserializationSchema( + RowType rowType, + TypeInformation<RowData> resultTypeInfo, + boolean failOnMissingField, + boolean ignoreParseErrors, + TimestampFormat timestampFormat) { + this(rowType, resultTypeInfo, failOnMissingField, ignoreParseErrors, timestampFormat, null); + } + + public JsonParserRowDataDeserializationSchema( + RowType rowType, + TypeInformation<RowData> resultTypeInfo, + boolean failOnMissingField, + boolean ignoreParseErrors, + TimestampFormat timestampFormat, + @Nullable String[][] projectedFields) { + super(rowType, resultTypeInfo, failOnMissingField, ignoreParseErrors, timestampFormat); + this.runtimeConverter = + new JsonParserToRowDataConverters( + failOnMissingField, ignoreParseErrors, timestampFormat) + .createConverter(projectedFields, checkNotNull(rowType)); + } + + @Override + public RowData deserialize(byte[] message) throws IOException { + // return null when there is no token + if (message == null || message.length == 0) { + return null; + } + try (JsonParser root = objectMapper.getFactory().createParser(message)) { + /* First: must point to a token; if not pointing to one, advance. + * This occurs before first read from JsonParser, as well as + * after clearing of current token. + */ + if (root.currentToken() == null) { + root.nextToken(); + } + if (root.currentToken() != JsonToken.START_OBJECT) { + throw JsonMappingException.from(root, "No content to map due to end-of-input"); + } + return (RowData) runtimeConverter.convert(root); + } catch (Throwable t) { + if (ignoreParseErrors) { + return null; + } + throw new IOException( + format("Failed to deserialize JSON '%s'.", new String(message)), t); + } + } +} diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java index 9a57bac203b..5a3fe22b308 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java @@ -19,24 +19,16 @@ package org.apache.flink.formats.json; import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.formats.common.TimestampFormat; import org.apache.flink.table.data.RowData; -import org.apache.flink.table.types.logical.DecimalType; import org.apache.flink.table.types.logical.RowType; -import org.apache.flink.table.types.logical.utils.LogicalTypeChecks; -import org.apache.flink.util.jackson.JacksonMapperFactory; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.JsonReadFeature; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import javax.annotation.Nullable; import java.io.IOException; -import java.util.Objects; import static java.lang.String.format; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -49,62 +41,25 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * <p>Failures during deserialization are forwarded as wrapped IOExceptions. */ @Internal -public class JsonRowDataDeserializationSchema implements DeserializationSchema<RowData> { +public class JsonRowDataDeserializationSchema extends AbstractJsonDeserializationSchema { private static final long serialVersionUID = 1L; - /** Flag indicating whether to fail if a field is missing. */ - private final boolean failOnMissingField; - - /** Flag indicating whether to ignore invalid fields/rows (default: throw an exception). */ - private final boolean ignoreParseErrors; - - /** TypeInformation of the produced {@link RowData}. */ - private final TypeInformation<RowData> resultTypeInfo; - /** * Runtime converter that converts {@link JsonNode}s into objects of Flink SQL internal data * structures. */ private final JsonToRowDataConverters.JsonToRowDataConverter runtimeConverter; - /** Object mapper for parsing the JSON. */ - private transient ObjectMapper objectMapper; - - /** Timestamp format specification which is used to parse timestamp. */ - private final TimestampFormat timestampFormat; - - private final boolean hasDecimalType; - public JsonRowDataDeserializationSchema( RowType rowType, TypeInformation<RowData> resultTypeInfo, boolean failOnMissingField, boolean ignoreParseErrors, TimestampFormat timestampFormat) { - if (ignoreParseErrors && failOnMissingField) { - throw new IllegalArgumentException( - "JSON format doesn't support failOnMissingField and ignoreParseErrors are both enabled."); - } - this.resultTypeInfo = checkNotNull(resultTypeInfo); - this.failOnMissingField = failOnMissingField; - this.ignoreParseErrors = ignoreParseErrors; + super(rowType, resultTypeInfo, failOnMissingField, ignoreParseErrors, timestampFormat); this.runtimeConverter = new JsonToRowDataConverters(failOnMissingField, ignoreParseErrors, timestampFormat) .createConverter(checkNotNull(rowType)); - this.timestampFormat = timestampFormat; - this.hasDecimalType = LogicalTypeChecks.hasNested(rowType, t -> t instanceof DecimalType); - } - - @Override - public void open(InitializationContext context) throws Exception { - objectMapper = - JacksonMapperFactory.createObjectMapper() - .configure( - JsonReadFeature.ALLOW_UNESCAPED_CONTROL_CHARS.mappedFeature(), - true); - if (hasDecimalType) { - objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS); - } } @Override @@ -130,34 +85,4 @@ public class JsonRowDataDeserializationSchema implements DeserializationSchema<R public RowData convertToRowData(JsonNode message) { return (RowData) runtimeConverter.convert(message); } - - @Override - public boolean isEndOfStream(RowData nextElement) { - return false; - } - - @Override - public TypeInformation<RowData> getProducedType() { - return resultTypeInfo; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - JsonRowDataDeserializationSchema that = (JsonRowDataDeserializationSchema) o; - return failOnMissingField == that.failOnMissingField - && ignoreParseErrors == that.ignoreParseErrors - && resultTypeInfo.equals(that.resultTypeInfo) - && timestampFormat.equals(that.timestampFormat); - } - - @Override - public int hashCode() { - return Objects.hash(failOnMissingField, ignoreParseErrors, resultTypeInfo, timestampFormat); - } } diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java index c8b7f73b64d..376d0d568a3 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java @@ -38,7 +38,7 @@ import java.util.Objects; * <p>Serializes the input Flink object into a JSON string and converts it into <code>byte[]</code>. * * <p>Result <code>byte[]</code> messages can be deserialized using {@link - * JsonRowDataDeserializationSchema}. + * JsonRowDataDeserializationSchema} or {@link JsonParserRowDataDeserializationSchema}. */ @Internal public class JsonRowDataSerializationSchema implements SerializationSchema<RowData> { diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFormatFactoryTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFormatFactoryTest.java index 7c1c553a6db..3559e2b2c87 100644 --- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFormatFactoryTest.java +++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFormatFactoryTest.java @@ -151,8 +151,8 @@ class JsonFormatFactoryTest { } private void testSchemaDeserializationSchema(Map<String, String> options) { - final JsonRowDataDeserializationSchema expectedDeser = - new JsonRowDataDeserializationSchema( + final JsonParserRowDataDeserializationSchema expectedDeser = + new JsonParserRowDataDeserializationSchema( PHYSICAL_TYPE, InternalTypeInfo.of(PHYSICAL_TYPE), false, diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonParserRowDataDeSerSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonParserRowDataDeSerSchemaTest.java new file mode 100644 index 00000000000..0d7509b2fb2 --- /dev/null +++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonParserRowDataDeSerSchemaTest.java @@ -0,0 +1,346 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.json; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.formats.common.TimestampFormat; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.types.Row; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; + +import org.junit.jupiter.api.Test; + +import java.math.BigDecimal; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; + +import static org.apache.flink.connector.testutils.formats.SchemaTestUtils.open; +import static org.apache.flink.formats.json.JsonRowDataSerDeSchemaTest.convertToExternal; +import static org.apache.flink.table.api.DataTypes.BOOLEAN; +import static org.apache.flink.table.api.DataTypes.FIELD; +import static org.apache.flink.table.api.DataTypes.INT; +import static org.apache.flink.table.api.DataTypes.MAP; +import static org.apache.flink.table.api.DataTypes.ROW; +import static org.apache.flink.table.api.DataTypes.SMALLINT; +import static org.apache.flink.table.api.DataTypes.STRING; +import static org.apache.flink.table.api.DataTypes.TINYINT; +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link JsonParserRowDataDeserializationSchema}. */ +public class JsonParserRowDataDeSerSchemaTest { + + /** + * Tests parsing partial fields in actual json data, sometimes we can skip some complex field + * parsing, e.g., field with Row/Array type. + */ + @Test + public void testParsePartialJson() throws Exception { + byte tinyint = 'c'; + short smallint = 128; + int intValue = 45536; + float floatValue = 33.333F; + long bigint = 1238123899121L; + String name = "asdlkjasjkdla998y1122"; + byte[] bytes = new byte[1024]; + ThreadLocalRandom.current().nextBytes(bytes); + BigDecimal decimal = new BigDecimal("123.456789"); + + Map<String, Long> map = new HashMap<>(); + map.put("flink", 123L); + + Map<String, Map<String, Integer>> nestedMap = new HashMap<>(); + Map<String, Integer> innerMap = new HashMap<>(); + innerMap.put("key", 234); + nestedMap.put("inner_map", innerMap); + + ObjectMapper objectMapper = new ObjectMapper(); + ArrayNode doubleNode = objectMapper.createArrayNode().add(1.1D).add(2.2D).add(3.3D); + + // Root + ObjectNode root = objectMapper.createObjectNode(); + root.put("bool", true); + root.put("tinyint", tinyint); + root.put("smallint", smallint); + root.put("int", intValue); + root.put("bigint", bigint); + root.put("float", floatValue); + root.put("name", name); + root.put("bytes", bytes); + root.put("decimal", decimal); + root.set("doubles", doubleNode); + root.put("date", "1990-10-14"); + root.put("time", "12:12:43"); + root.put("timestamp3", "1990-10-14T12:12:43.123"); + root.put("timestamp9", "1990-10-14T12:12:43.123456789"); + root.putObject("map").put("flink", 123); + root.putObject("map2map").putObject("inner_map").put("key", 234); + + byte[] serializedJson = objectMapper.writeValueAsBytes(root); + + DataType dataType = + ROW( + FIELD("bool", BOOLEAN()), + FIELD("tinyint", TINYINT()), + FIELD("smallint", SMALLINT()), + FIELD("int", INT()), + FIELD("map2map", MAP(STRING(), MAP(STRING(), INT())))); + RowType schema = (RowType) dataType.getLogicalType(); + TypeInformation<RowData> resultTypeInfo = InternalTypeInfo.of(schema); + + DeserializationSchema<RowData> deserializationSchema = + new JsonParserRowDataDeserializationSchema( + schema, resultTypeInfo, false, false, TimestampFormat.ISO_8601); + open(deserializationSchema); + + Row expected = new Row(5); + expected.setField(0, true); + expected.setField(1, tinyint); + expected.setField(2, smallint); + expected.setField(3, intValue); + expected.setField(4, nestedMap); + + RowData rowData = deserializationSchema.deserialize(serializedJson); + Row actual = convertToExternal(rowData, dataType); + assertThat(actual).isEqualTo(expected); + } + + @Test + public void testProjected() throws Exception { + ObjectMapper objectMapper = new ObjectMapper(); + ObjectNode root = objectMapper.createObjectNode(); + root.put("f0", 0); + root.put("f1", 1); + root.put("f2", 2); + root.put("f3", 3); + root.put("f4", 4); + innerTestProjected(objectMapper.writeValueAsBytes(root), GenericRowData.of(1, 4, 0, 2)); + } + + @Test + public void testProjectedNullable() throws Exception { + ObjectMapper objectMapper = new ObjectMapper(); + ObjectNode root = objectMapper.createObjectNode(); + root.put("f0", 0); + root.put("f1", 1); + root.putNull("f2"); + root.put("f3", 3); + root.putNull("f4"); + innerTestProjected( + objectMapper.writeValueAsBytes(root), GenericRowData.of(1, null, 0, null)); + } + + private void innerTestProjected(byte[] serializedJson, GenericRowData expected) + throws Exception { + DataType dataType = + ROW(FIELD("f1", INT()), FIELD("f4", INT()), FIELD("f0", INT()), FIELD("f2", INT())); + RowType schema = (RowType) dataType.getLogicalType(); + TypeInformation<RowData> resultTypeInfo = InternalTypeInfo.of(schema); + + JsonParserRowDataDeserializationSchema deserializationSchema = + new JsonParserRowDataDeserializationSchema( + schema, + resultTypeInfo, + false, + false, + TimestampFormat.ISO_8601, + new String[][] { + new String[] {"f1"}, + new String[] {"f4"}, + new String[] {"f0"}, + new String[] {"f2"} + }); + open(deserializationSchema); + + RowData rowData = deserializationSchema.deserialize(serializedJson); + assertThat(rowData).isEqualTo(expected); + } + + private byte[] prepareNestedRow() throws JsonProcessingException { + ObjectMapper objectMapper = new ObjectMapper(); + + // Root + ObjectNode root = objectMapper.createObjectNode(); + root.put("f0", 0); + root.put("f1", 1); + root.put("f2", 2); + ObjectNode f3 = root.putObject("f3"); + f3.put("f0", 30); + f3.put("f1", 31); + f3.put("f2", 32); + ObjectNode f4 = root.putObject("f4"); + f4.put("f0", 40); + f4.put("f1", 41); + + ObjectNode f5 = root.putObject("f5"); + f5.put("f0", 50); + ObjectNode f51 = f5.putObject("f1"); + f51.put("f0", 510); + f51.put("f1", 511); + + return objectMapper.writeValueAsBytes(root); + } + + private byte[] prepareNullableNestedRow() throws JsonProcessingException { + ObjectMapper objectMapper = new ObjectMapper(); + + // Root + ObjectNode root = objectMapper.createObjectNode(); + root.put("f0", 0); + root.put("f1", 1); + root.put("f2", 2); + ObjectNode f3 = root.putObject("f3"); + f3.put("f0", 30); + f3.put("f1", 31); + f3.put("f2", 32); + ObjectNode f4 = root.putObject("f4"); + f4.putNull("f0"); + f4.put("f1", 41); + + ObjectNode f5 = root.putObject("f5"); + f5.put("f0", 50); + f5.putNull("f1"); + + return objectMapper.writeValueAsBytes(root); + } + + @Test + public void testProjectNestedField() throws Exception { + GenericRowData expected = GenericRowData.of(1, 31, 32, 510, 0, GenericRowData.of(40, 41)); + innerTestProjectNestedField(prepareNestedRow(), expected); + } + + @Test + public void testProjectNestedFieldNullable() throws Exception { + GenericRowData expected = + GenericRowData.of(1, 31, 32, null, 0, GenericRowData.of(null, 41)); + innerTestProjectNestedField(prepareNullableNestedRow(), expected); + } + + private void innerTestProjectNestedField(byte[] serializedJson, GenericRowData expected) + throws Exception { + DataType dataType = + ROW( + FIELD("f1", INT()), + FIELD("f3_1", INT()), + FIELD("f3_2", INT()), + FIELD("f5_1_0", INT()), + FIELD("f0", INT()), + FIELD("f4", ROW(FIELD("f0", INT()), FIELD("f1", INT())))); + RowType schema = (RowType) dataType.getLogicalType(); + TypeInformation<RowData> resultTypeInfo = InternalTypeInfo.of(schema); + + JsonParserRowDataDeserializationSchema deserializationSchema = + new JsonParserRowDataDeserializationSchema( + schema, + resultTypeInfo, + false, + false, + TimestampFormat.ISO_8601, + new String[][] { + new String[] {"f1"}, + new String[] {"f3", "f1"}, + new String[] {"f3", "f2"}, + new String[] {"f5", "f1", "f0"}, + new String[] {"f0"}, + new String[] {"f4"} + }); + open(deserializationSchema); + + RowData rowData = deserializationSchema.deserialize(serializedJson); + assertThat(rowData).isEqualTo(expected); + } + + @Test + public void testProjectBothRowAndNestedField() throws Exception { + GenericRowData expected = + GenericRowData.of( + 1, + 32, + GenericRowData.of(40, 41), + 40, + 511, + GenericRowData.of(510, 511), + GenericRowData.of(50, GenericRowData.of(510, 511))); + innerTestProjectBothRowAndNestedField(prepareNestedRow(), expected); + } + + @Test + public void testProjectBothRowAndNestedFieldNullable() throws Exception { + GenericRowData expected = + GenericRowData.of( + 1, + 32, + GenericRowData.of(null, 41), + null, + null, + null, + GenericRowData.of(50, null)); + innerTestProjectBothRowAndNestedField(prepareNullableNestedRow(), expected); + } + + private void innerTestProjectBothRowAndNestedField( + byte[] serializedJson, GenericRowData expected) throws Exception { + DataType dataType = + ROW( + FIELD("f1", INT()), + FIELD("f3_2", INT()), + FIELD("f4", ROW(FIELD("f0", INT()), FIELD("f1", INT()))), + FIELD("f4_0", INT()), + FIELD("f5_1_1", INT()), + FIELD("f5_1", ROW(FIELD("f0", INT()), FIELD("f1", INT()))), + FIELD( + "f5", + ROW( + FIELD("f0", INT()), + FIELD("f1", ROW(FIELD("f0", INT()), FIELD("f1", INT())))))); + RowType schema = (RowType) dataType.getLogicalType(); + TypeInformation<RowData> resultTypeInfo = InternalTypeInfo.of(schema); + + JsonParserRowDataDeserializationSchema deserializationSchema = + new JsonParserRowDataDeserializationSchema( + schema, + resultTypeInfo, + false, + false, + TimestampFormat.ISO_8601, + new String[][] { + new String[] {"f1"}, + new String[] {"f3", "f2"}, + new String[] {"f4"}, + new String[] {"f4", "f0"}, + new String[] {"f5", "f1", "f1"}, + new String[] {"f5", "f1"}, + new String[] {"f5"} + }); + open(deserializationSchema); + + RowData rowData = deserializationSchema.deserialize(serializedJson); + assertThat(rowData).isEqualTo(expected); + } +} diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java index 883c3f09134..ced449e0936 100644 --- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java +++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java @@ -18,7 +18,7 @@ package org.apache.flink.formats.json; -import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.connector.testutils.formats.DummyInitializationContext; import org.apache.flink.core.testutils.FlinkAssertions; import org.apache.flink.formats.common.TimestampFormat; @@ -30,6 +30,9 @@ import org.apache.flink.table.data.util.DataFormatConverters; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameter; +import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; import org.apache.flink.types.Row; import org.apache.flink.util.jackson.JacksonMapperFactory; @@ -38,6 +41,8 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.Arra import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; import java.math.BigDecimal; import java.sql.Timestamp; @@ -47,6 +52,7 @@ import java.time.LocalDateTime; import java.time.LocalTime; import java.time.ZoneOffset; import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; @@ -80,13 +86,22 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; /** - * Tests for {@link JsonRowDataDeserializationSchema} and {@link JsonRowDataSerializationSchema}. + * Tests for {@link JsonRowDataDeserializationSchema}, {@link + * JsonParserRowDataDeserializationSchema} and {@link JsonRowDataSerializationSchema}. */ -class JsonRowDataSerDeSchemaTest { +@ExtendWith(ParameterizedTestExtension.class) +public class JsonRowDataSerDeSchemaTest { private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper(); - @Test + @Parameter public boolean isJsonParser; + + @Parameters(name = "isJsonParser={0}") + public static Collection<Boolean> parameters() throws Exception { + return Arrays.asList(true, false); + } + + @TestTemplate void testSerDe() throws Exception { byte tinyint = 'c'; short smallint = 128; @@ -164,11 +179,10 @@ class JsonRowDataSerDeSchemaTest { FIELD("multiSet", MULTISET(STRING())), FIELD("map2map", MAP(STRING(), MAP(STRING(), INT())))); RowType schema = (RowType) dataType.getLogicalType(); - TypeInformation<RowData> resultTypeInfo = InternalTypeInfo.of(schema); - JsonRowDataDeserializationSchema deserializationSchema = - new JsonRowDataDeserializationSchema( - schema, resultTypeInfo, false, false, TimestampFormat.ISO_8601); + DeserializationSchema<RowData> deserializationSchema = + createDeserializationSchema( + isJsonParser, schema, false, false, TimestampFormat.ISO_8601); open(deserializationSchema); Row expected = new Row(18); @@ -213,7 +227,7 @@ class JsonRowDataSerDeSchemaTest { * Tests the deserialization slow path, e.g. convert into string and use {@link * Double#parseDouble(String)}. */ - @Test + @TestTemplate void testSlowDeserialization() throws Exception { Random random = new Random(); boolean bool = random.nextBoolean(); @@ -244,13 +258,9 @@ class JsonRowDataSerDeSchemaTest { FIELD("float2", FLOAT())); RowType rowType = (RowType) dataType.getLogicalType(); - JsonRowDataDeserializationSchema deserializationSchema = - new JsonRowDataDeserializationSchema( - rowType, - InternalTypeInfo.of(rowType), - false, - false, - TimestampFormat.ISO_8601); + DeserializationSchema<RowData> deserializationSchema = + createDeserializationSchema( + isJsonParser, rowType, false, false, TimestampFormat.ISO_8601); open(deserializationSchema); Row expected = new Row(7); @@ -267,7 +277,7 @@ class JsonRowDataSerDeSchemaTest { assertThat(actual).isEqualTo(expected); } - @Test + @TestTemplate void testSerDeMultiRows() throws Exception { RowType rowType = (RowType) @@ -280,13 +290,9 @@ class JsonRowDataSerDeSchemaTest { FIELD("f6", ROW(FIELD("f1", STRING()), FIELD("f2", INT())))) .getLogicalType(); - JsonRowDataDeserializationSchema deserializationSchema = - new JsonRowDataDeserializationSchema( - rowType, - InternalTypeInfo.of(rowType), - false, - false, - TimestampFormat.ISO_8601); + DeserializationSchema<RowData> deserializationSchema = + createDeserializationSchema( + isJsonParser, rowType, false, false, TimestampFormat.ISO_8601); open(deserializationSchema); JsonRowDataSerializationSchema serializationSchema = new JsonRowDataSerializationSchema( @@ -338,7 +344,7 @@ class JsonRowDataSerDeSchemaTest { } } - @Test + @TestTemplate void testSerDeMultiRowsWithNullValues() throws Exception { String[] jsons = new String[] { @@ -365,13 +371,9 @@ class JsonRowDataSerDeSchemaTest { FIELD("metrics", MAP(STRING(), DOUBLE()))) .getLogicalType(); - JsonRowDataDeserializationSchema deserializationSchema = - new JsonRowDataDeserializationSchema( - rowType, - InternalTypeInfo.of(rowType), - false, - true, - TimestampFormat.ISO_8601); + DeserializationSchema<RowData> deserializationSchema = + createDeserializationSchema( + isJsonParser, rowType, false, true, TimestampFormat.ISO_8601); open(deserializationSchema); JsonRowDataSerializationSchema serializationSchema = new JsonRowDataSerializationSchema( @@ -390,33 +392,33 @@ class JsonRowDataSerDeSchemaTest { } } - @Test + @TestTemplate void testDeserializationNullRow() throws Exception { DataType dataType = ROW(FIELD("name", STRING())); RowType schema = (RowType) dataType.getLogicalType(); - JsonRowDataDeserializationSchema deserializationSchema = - new JsonRowDataDeserializationSchema( - schema, InternalTypeInfo.of(schema), true, false, TimestampFormat.ISO_8601); + DeserializationSchema<RowData> deserializationSchema = + createDeserializationSchema( + isJsonParser, schema, true, false, TimestampFormat.ISO_8601); open(deserializationSchema); assertThat(deserializationSchema.deserialize(null)).isNull(); } - @Test + @TestTemplate void testDeserializationMissingNode() throws Exception { DataType dataType = ROW(FIELD("name", STRING())); RowType schema = (RowType) dataType.getLogicalType(); - JsonRowDataDeserializationSchema deserializationSchema = - new JsonRowDataDeserializationSchema( - schema, InternalTypeInfo.of(schema), true, false, TimestampFormat.ISO_8601); + DeserializationSchema<RowData> deserializationSchema = + createDeserializationSchema( + isJsonParser, schema, true, false, TimestampFormat.ISO_8601); open(deserializationSchema); RowData rowData = deserializationSchema.deserialize("".getBytes()); assertThat(rowData).isNull(); } - @Test + @TestTemplate void testDeserializationMissingField() throws Exception { // Root ObjectNode root = OBJECT_MAPPER.createObjectNode(); @@ -427,13 +429,9 @@ class JsonRowDataSerDeSchemaTest { RowType schema = (RowType) dataType.getLogicalType(); // pass on missing field - JsonRowDataDeserializationSchema deserializationSchema = - new JsonRowDataDeserializationSchema( - schema, - InternalTypeInfo.of(schema), - false, - false, - TimestampFormat.ISO_8601); + DeserializationSchema<RowData> deserializationSchema = + createDeserializationSchema( + isJsonParser, schema, false, false, TimestampFormat.ISO_8601); open(deserializationSchema); Row expected = new Row(1); @@ -442,20 +440,20 @@ class JsonRowDataSerDeSchemaTest { // fail on missing field deserializationSchema = - new JsonRowDataDeserializationSchema( - schema, InternalTypeInfo.of(schema), true, false, TimestampFormat.ISO_8601); + createDeserializationSchema( + isJsonParser, schema, true, false, TimestampFormat.ISO_8601); open(deserializationSchema); String errorMessage = "Failed to deserialize JSON '{\"id\":123123123}'."; - JsonRowDataDeserializationSchema finalDeserializationSchema = deserializationSchema; + DeserializationSchema<RowData> finalDeserializationSchema = deserializationSchema; assertThatThrownBy(() -> finalDeserializationSchema.deserialize(serializedJson)) .hasMessage(errorMessage); // ignore on parse error deserializationSchema = - new JsonRowDataDeserializationSchema( - schema, InternalTypeInfo.of(schema), false, true, TimestampFormat.ISO_8601); + createDeserializationSchema( + isJsonParser, schema, false, true, TimestampFormat.ISO_8601); open(deserializationSchema); actual = convertToExternal(deserializationSchema.deserialize(serializedJson), dataType); assertThat(actual).isEqualTo(expected); @@ -473,7 +471,7 @@ class JsonRowDataSerDeSchemaTest { .hasMessage(errorMessage); } - @Test + @TestTemplate void testSerDeSQLTimestampFormat() throws Exception { RowType rowType = (RowType) @@ -488,9 +486,9 @@ class JsonRowDataSerDeSchemaTest { TIMESTAMP_WITH_LOCAL_TIME_ZONE(9))) .getLogicalType(); - JsonRowDataDeserializationSchema deserializationSchema = - new JsonRowDataDeserializationSchema( - rowType, InternalTypeInfo.of(rowType), false, false, TimestampFormat.SQL); + DeserializationSchema<RowData> deserializationSchema = + createDeserializationSchema( + isJsonParser, rowType, false, false, TimestampFormat.SQL); open(deserializationSchema); JsonRowDataSerializationSchema serializationSchema = new JsonRowDataSerializationSchema( @@ -582,7 +580,7 @@ class JsonRowDataSerDeSchemaTest { assertThat(new String(actual3)).isEqualTo(expectResult3); } - @Test + @TestTemplate void testSerializationDecimalEncode() throws Exception { RowType schema = (RowType) @@ -592,11 +590,9 @@ class JsonRowDataSerDeSchemaTest { FIELD("decimal3", DECIMAL(11, 9))) .getLogicalType(); - TypeInformation<RowData> resultTypeInfo = InternalTypeInfo.of(schema); - - JsonRowDataDeserializationSchema deserializer = - new JsonRowDataDeserializationSchema( - schema, resultTypeInfo, false, false, TimestampFormat.ISO_8601); + DeserializationSchema<RowData> deserializer = + createDeserializationSchema( + isJsonParser, schema, false, false, TimestampFormat.ISO_8601); deserializer.open(new DummyInitializationContext()); JsonRowDataSerializationSchema plainDecimalSerializer = @@ -630,7 +626,7 @@ class JsonRowDataSerDeSchemaTest { assertThat(scientificDecimalResult).isEqualTo(scientificDecimalJson); } - @Test + @TestTemplate void testJsonParse() throws Exception { for (TestSpec spec : testData) { testIgnoreParseErrors(spec); @@ -660,13 +656,13 @@ class JsonRowDataSerDeSchemaTest { .satisfies(anyCauseMatches(RuntimeException.class, errorMessage)); } - @Test + @TestTemplate void testDeserializationWithTypesMismatch() { RowType rowType = (RowType) ROW(FIELD("f0", STRING()), FIELD("f1", INT())).getLogicalType(); String json = "{\"f0\":\"abc\", \"f1\": \"abc\"}"; - JsonRowDataDeserializationSchema deserializationSchema = - new JsonRowDataDeserializationSchema( - rowType, InternalTypeInfo.of(rowType), false, false, TimestampFormat.SQL); + DeserializationSchema<RowData> deserializationSchema = + createDeserializationSchema( + isJsonParser, rowType, false, false, TimestampFormat.SQL); open(deserializationSchema); String errorMessage = "Fail to deserialize at field: f1."; @@ -676,13 +672,9 @@ class JsonRowDataSerDeSchemaTest { private void testIgnoreParseErrors(TestSpec spec) throws Exception { // the parsing field should be null and no exception is thrown - JsonRowDataDeserializationSchema ignoreErrorsSchema = - new JsonRowDataDeserializationSchema( - spec.rowType, - InternalTypeInfo.of(spec.rowType), - false, - true, - spec.timestampFormat); + DeserializationSchema<RowData> ignoreErrorsSchema = + createDeserializationSchema( + isJsonParser, spec.rowType, false, true, spec.timestampFormat); ignoreErrorsSchema.open(new DummyInitializationContext()); Row expected; @@ -700,13 +692,9 @@ class JsonRowDataSerDeSchemaTest { private void testParseErrors(TestSpec spec) { // expect exception if parse error is not ignored - JsonRowDataDeserializationSchema failingSchema = - new JsonRowDataDeserializationSchema( - spec.rowType, - InternalTypeInfo.of(spec.rowType), - false, - false, - spec.timestampFormat); + DeserializationSchema<RowData> failingSchema = + createDeserializationSchema( + isJsonParser, spec.rowType, false, false, spec.timestampFormat); open(failingSchema); assertThatThrownBy(() -> failingSchema.deserialize(spec.json.getBytes())) @@ -724,6 +712,12 @@ class JsonRowDataSerDeSchemaTest { TestSpec.json("{\"id\":\"abc\"}") .rowType(ROW(FIELD("id", INT()))) .expectErrorMessage("Failed to deserialize JSON '{\"id\":\"abc\"}'."), + TestSpec.json("{\"id\":11211111111.013}") + .rowType(ROW(FIELD("id", INT()))) + .expect(null), + TestSpec.json("{\"id\":112.013}") + .rowType(ROW(FIELD("id", INT()))) + .expect(Row.of(112)), TestSpec.json("{\"id\":112.013}") .rowType(ROW(FIELD("id", BIGINT()))) .expect(Row.of(112L)), @@ -830,10 +824,33 @@ class JsonRowDataSerDeSchemaTest { } @SuppressWarnings("unchecked") - private static Row convertToExternal(RowData rowData, DataType dataType) { + static Row convertToExternal(RowData rowData, DataType dataType) { return (Row) DataFormatConverters.getConverterForDataType(dataType).toExternal(rowData); } + private DeserializationSchema<RowData> createDeserializationSchema( + boolean isJsonParser, + RowType rowType, + boolean failOnMissingField, + boolean ignoreParseErrors, + TimestampFormat timestampFormat) { + if (isJsonParser) { + return new JsonParserRowDataDeserializationSchema( + rowType, + InternalTypeInfo.of(rowType), + failOnMissingField, + ignoreParseErrors, + timestampFormat); + } else { + return new JsonRowDataDeserializationSchema( + rowType, + InternalTypeInfo.of(rowType), + failOnMissingField, + ignoreParseErrors, + timestampFormat); + } + } + private static class TestSpec { private final String json; private RowType rowType;
