This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c6cd5b68058bfab658482ce4ca5774153fb79828
Author: fengli <[email protected]>
AuthorDate: Wed Jul 19 12:24:51 2023 +0800

    [FLINK-32610][json] Introduce JsonParserToRowDataConverter based 
JsonParserToRowDataConverter deserialization schema which supports projection 
push-down of nested fields
---
 ...java => AbstractJsonDeserializationSchema.java} |  58 +---
 .../flink/formats/json/JsonFormatFactory.java      |  50 ++-
 .../flink/formats/json/JsonFormatOptions.java      |   7 +
 .../JsonParserRowDataDeserializationSchema.java    | 100 ++++++
 .../json/JsonRowDataDeserializationSchema.java     |  79 +----
 .../json/JsonRowDataSerializationSchema.java       |   2 +-
 .../flink/formats/json/JsonFormatFactoryTest.java  |   4 +-
 .../json/JsonParserRowDataDeSerSchemaTest.java     | 346 +++++++++++++++++++++
 .../formats/json/JsonRowDataSerDeSchemaTest.java   | 183 ++++++-----
 9 files changed, 611 insertions(+), 218 deletions(-)

diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/AbstractJsonDeserializationSchema.java
similarity index 70%
copy from 
flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
copy to 
flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/AbstractJsonDeserializationSchema.java
index 9a57bac203b..aa62e0d5f87 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/AbstractJsonDeserializationSchema.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.formats.json;
 
-import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.formats.common.TimestampFormat;
@@ -30,52 +29,40 @@ import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.JsonReadFeature;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
-import javax.annotation.Nullable;
-
-import java.io.IOException;
 import java.util.Objects;
 
-import static java.lang.String.format;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * Deserialization schema from JSON to Flink Table/SQL internal data structure 
{@link RowData}.
- *
- * <p>Deserializes a <code>byte[]</code> message as a JSON object and reads 
the specified fields.
+ * Deserialization schema from JSON to Flink Table/SQL internal data structure 
{@link RowData}. This
+ * is the abstract base class, which has different implementations.
  *
  * <p>Failures during deserialization are forwarded as wrapped IOExceptions.
  */
-@Internal
-public class JsonRowDataDeserializationSchema implements 
DeserializationSchema<RowData> {
+public abstract class AbstractJsonDeserializationSchema implements 
DeserializationSchema<RowData> {
+
     private static final long serialVersionUID = 1L;
 
     /** Flag indicating whether to fail if a field is missing. */
-    private final boolean failOnMissingField;
+    protected final boolean failOnMissingField;
 
     /** Flag indicating whether to ignore invalid fields/rows (default: throw 
an exception). */
-    private final boolean ignoreParseErrors;
+    protected final boolean ignoreParseErrors;
 
     /** TypeInformation of the produced {@link RowData}. */
     private final TypeInformation<RowData> resultTypeInfo;
 
-    /**
-     * Runtime converter that converts {@link JsonNode}s into objects of Flink 
SQL internal data
-     * structures.
-     */
-    private final JsonToRowDataConverters.JsonToRowDataConverter 
runtimeConverter;
-
     /** Object mapper for parsing the JSON. */
-    private transient ObjectMapper objectMapper;
+    protected transient ObjectMapper objectMapper;
 
     /** Timestamp format specification which is used to parse timestamp. */
     private final TimestampFormat timestampFormat;
 
     private final boolean hasDecimalType;
 
-    public JsonRowDataDeserializationSchema(
+    public AbstractJsonDeserializationSchema(
             RowType rowType,
             TypeInformation<RowData> resultTypeInfo,
             boolean failOnMissingField,
@@ -88,9 +75,6 @@ public class JsonRowDataDeserializationSchema implements 
DeserializationSchema<R
         this.resultTypeInfo = checkNotNull(resultTypeInfo);
         this.failOnMissingField = failOnMissingField;
         this.ignoreParseErrors = ignoreParseErrors;
-        this.runtimeConverter =
-                new JsonToRowDataConverters(failOnMissingField, 
ignoreParseErrors, timestampFormat)
-                        .createConverter(checkNotNull(rowType));
         this.timestampFormat = timestampFormat;
         this.hasDecimalType = LogicalTypeChecks.hasNested(rowType, t -> t 
instanceof DecimalType);
     }
@@ -107,30 +91,6 @@ public class JsonRowDataDeserializationSchema implements 
DeserializationSchema<R
         }
     }
 
-    @Override
-    public RowData deserialize(@Nullable byte[] message) throws IOException {
-        if (message == null) {
-            return null;
-        }
-        try {
-            return convertToRowData(deserializeToJsonNode(message));
-        } catch (Throwable t) {
-            if (ignoreParseErrors) {
-                return null;
-            }
-            throw new IOException(
-                    format("Failed to deserialize JSON '%s'.", new 
String(message)), t);
-        }
-    }
-
-    public JsonNode deserializeToJsonNode(byte[] message) throws IOException {
-        return objectMapper.readTree(message);
-    }
-
-    public RowData convertToRowData(JsonNode message) {
-        return (RowData) runtimeConverter.convert(message);
-    }
-
     @Override
     public boolean isEndOfStream(RowData nextElement) {
         return false;
@@ -149,7 +109,7 @@ public class JsonRowDataDeserializationSchema implements 
DeserializationSchema<R
         if (o == null || getClass() != o.getClass()) {
             return false;
         }
-        JsonRowDataDeserializationSchema that = 
(JsonRowDataDeserializationSchema) o;
+        AbstractJsonDeserializationSchema that = 
(AbstractJsonDeserializationSchema) o;
         return failOnMissingField == that.failOnMissingField
                 && ignoreParseErrors == that.ignoreParseErrors
                 && resultTypeInfo.equals(that.resultTypeInfo)
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java
index 74d8c5310a0..562b99e6bc8 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java
@@ -44,6 +44,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 
+import static 
org.apache.flink.formats.json.JsonFormatOptions.DECODE_JSON_PARSER_ENABLED;
 import static 
org.apache.flink.formats.json.JsonFormatOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER;
 import static 
org.apache.flink.formats.json.JsonFormatOptions.FAIL_ON_MISSING_FIELD;
 import static 
org.apache.flink.formats.json.JsonFormatOptions.IGNORE_PARSE_ERRORS;
@@ -68,6 +69,7 @@ public class JsonFormatFactory implements 
DeserializationFormatFactory, Serializ
 
         final boolean failOnMissingField = 
formatOptions.get(FAIL_ON_MISSING_FIELD);
         final boolean ignoreParseErrors = 
formatOptions.get(IGNORE_PARSE_ERRORS);
+        final boolean jsonParserEnabled = 
formatOptions.get(DECODE_JSON_PARSER_ENABLED);
         TimestampFormat timestampOption = 
JsonFormatOptionsUtil.getTimestampFormat(formatOptions);
 
         return new ProjectableDecodingFormat<DeserializationSchema<RowData>>() 
{
@@ -81,21 +83,57 @@ public class JsonFormatFactory implements 
DeserializationFormatFactory, Serializ
                 final RowType rowType = (RowType) 
producedDataType.getLogicalType();
                 final TypeInformation<RowData> rowDataTypeInfo =
                         context.createTypeInformation(producedDataType);
-                return new JsonRowDataDeserializationSchema(
-                        rowType,
-                        rowDataTypeInfo,
-                        failOnMissingField,
-                        ignoreParseErrors,
-                        timestampOption);
+                if (jsonParserEnabled) {
+                    return new JsonParserRowDataDeserializationSchema(
+                            rowType,
+                            rowDataTypeInfo,
+                            failOnMissingField,
+                            ignoreParseErrors,
+                            timestampOption,
+                            toProjectedNames(
+                                    (RowType) 
physicalDataType.getLogicalType(), projections));
+                } else {
+                    return new JsonRowDataDeserializationSchema(
+                            rowType,
+                            rowDataTypeInfo,
+                            failOnMissingField,
+                            ignoreParseErrors,
+                            timestampOption);
+                }
             }
 
             @Override
             public ChangelogMode getChangelogMode() {
                 return ChangelogMode.insertOnly();
             }
+
+            @Override
+            public boolean supportsNestedProjection() {
+                return jsonParserEnabled;
+            }
         };
     }
 
+    private String[][] toProjectedNames(RowType type, int[][] projectedFields) 
{
+        String[][] projectedNames = new String[projectedFields.length][];
+        for (int i = 0; i < projectedNames.length; i++) {
+            int[] fieldIndices = projectedFields[i];
+            String[] fieldNames = new String[fieldIndices.length];
+            projectedNames[i] = fieldNames;
+
+            // convert fieldIndices to fieldNames
+            RowType currentType = type;
+            for (int j = 0; j < fieldIndices.length; j++) {
+                int index = fieldIndices[j];
+                fieldNames[j] = currentType.getFieldNames().get(index);
+                if (j != fieldIndices.length - 1) {
+                    currentType = (RowType) currentType.getTypeAt(index);
+                }
+            }
+        }
+        return projectedNames;
+    }
+
     @Override
     public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(
             DynamicTableFactory.Context context, ReadableConfig formatOptions) 
{
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatOptions.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatOptions.java
index 74d567ab9bb..5c9e61068ac 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatOptions.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatOptions.java
@@ -73,6 +73,13 @@ public class JsonFormatOptions {
                     .withDescription(
                             "Optional flag to specify whether to encode all 
decimals as plain numbers instead of possible scientific notations, false by 
default.");
 
+    public static final ConfigOption<Boolean> DECODE_JSON_PARSER_ENABLED =
+            ConfigOptions.key("decode.json-parser.enabled")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Optional flag to specify whether to use the 
Jackson JsonParser to decode json with better performance, true by default.");
+
     // 
--------------------------------------------------------------------------------------------
     // Enums
     // 
--------------------------------------------------------------------------------------------
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParserRowDataDeserializationSchema.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParserRowDataDeserializationSchema.java
new file mode 100644
index 00000000000..22df48f2ac2
--- /dev/null
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParserRowDataDeserializationSchema.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonToken;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+import static java.lang.String.format;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Deserialization schema that converts fields from {@link JsonParser} to {@link 
RowData}, offering higher
+ * parsing efficiency.
+ */
+@Internal
+public class JsonParserRowDataDeserializationSchema extends 
AbstractJsonDeserializationSchema {
+
+    /**
+     * Runtime converter that converts {@link JsonParser}s into objects of 
Flink SQL internal data
+     * structures.
+     */
+    private final JsonParserToRowDataConverters.JsonParserToRowDataConverter 
runtimeConverter;
+
+    public JsonParserRowDataDeserializationSchema(
+            RowType rowType,
+            TypeInformation<RowData> resultTypeInfo,
+            boolean failOnMissingField,
+            boolean ignoreParseErrors,
+            TimestampFormat timestampFormat) {
+        this(rowType, resultTypeInfo, failOnMissingField, ignoreParseErrors, 
timestampFormat, null);
+    }
+
+    public JsonParserRowDataDeserializationSchema(
+            RowType rowType,
+            TypeInformation<RowData> resultTypeInfo,
+            boolean failOnMissingField,
+            boolean ignoreParseErrors,
+            TimestampFormat timestampFormat,
+            @Nullable String[][] projectedFields) {
+        super(rowType, resultTypeInfo, failOnMissingField, ignoreParseErrors, 
timestampFormat);
+        this.runtimeConverter =
+                new JsonParserToRowDataConverters(
+                                failOnMissingField, ignoreParseErrors, 
timestampFormat)
+                        .createConverter(projectedFields, 
checkNotNull(rowType));
+    }
+
+    @Override
+    public RowData deserialize(byte[] message) throws IOException {
+        // return null when there is no token
+        if (message == null || message.length == 0) {
+            return null;
+        }
+        try (JsonParser root = 
objectMapper.getFactory().createParser(message)) {
+            /* First: must point to a token; if not pointing to one, advance.
+             * This occurs before first read from JsonParser, as well as
+             * after clearing of current token.
+             */
+            if (root.currentToken() == null) {
+                root.nextToken();
+            }
+            if (root.currentToken() != JsonToken.START_OBJECT) {
+                throw JsonMappingException.from(root, "No content to map due 
to end-of-input");
+            }
+            return (RowData) runtimeConverter.convert(root);
+        } catch (Throwable t) {
+            if (ignoreParseErrors) {
+                return null;
+            }
+            throw new IOException(
+                    format("Failed to deserialize JSON '%s'.", new 
String(message)), t);
+        }
+    }
+}
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
index 9a57bac203b..5a3fe22b308 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
@@ -19,24 +19,16 @@
 package org.apache.flink.formats.json;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.formats.common.TimestampFormat;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
-import org.apache.flink.util.jackson.JacksonMapperFactory;
 
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.JsonReadFeature;
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
-import java.util.Objects;
 
 import static java.lang.String.format;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -49,62 +41,25 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * <p>Failures during deserialization are forwarded as wrapped IOExceptions.
  */
 @Internal
-public class JsonRowDataDeserializationSchema implements 
DeserializationSchema<RowData> {
+public class JsonRowDataDeserializationSchema extends 
AbstractJsonDeserializationSchema {
     private static final long serialVersionUID = 1L;
 
-    /** Flag indicating whether to fail if a field is missing. */
-    private final boolean failOnMissingField;
-
-    /** Flag indicating whether to ignore invalid fields/rows (default: throw 
an exception). */
-    private final boolean ignoreParseErrors;
-
-    /** TypeInformation of the produced {@link RowData}. */
-    private final TypeInformation<RowData> resultTypeInfo;
-
     /**
      * Runtime converter that converts {@link JsonNode}s into objects of Flink 
SQL internal data
      * structures.
      */
     private final JsonToRowDataConverters.JsonToRowDataConverter 
runtimeConverter;
 
-    /** Object mapper for parsing the JSON. */
-    private transient ObjectMapper objectMapper;
-
-    /** Timestamp format specification which is used to parse timestamp. */
-    private final TimestampFormat timestampFormat;
-
-    private final boolean hasDecimalType;
-
     public JsonRowDataDeserializationSchema(
             RowType rowType,
             TypeInformation<RowData> resultTypeInfo,
             boolean failOnMissingField,
             boolean ignoreParseErrors,
             TimestampFormat timestampFormat) {
-        if (ignoreParseErrors && failOnMissingField) {
-            throw new IllegalArgumentException(
-                    "JSON format doesn't support failOnMissingField and 
ignoreParseErrors are both enabled.");
-        }
-        this.resultTypeInfo = checkNotNull(resultTypeInfo);
-        this.failOnMissingField = failOnMissingField;
-        this.ignoreParseErrors = ignoreParseErrors;
+        super(rowType, resultTypeInfo, failOnMissingField, ignoreParseErrors, 
timestampFormat);
         this.runtimeConverter =
                 new JsonToRowDataConverters(failOnMissingField, 
ignoreParseErrors, timestampFormat)
                         .createConverter(checkNotNull(rowType));
-        this.timestampFormat = timestampFormat;
-        this.hasDecimalType = LogicalTypeChecks.hasNested(rowType, t -> t 
instanceof DecimalType);
-    }
-
-    @Override
-    public void open(InitializationContext context) throws Exception {
-        objectMapper =
-                JacksonMapperFactory.createObjectMapper()
-                        .configure(
-                                
JsonReadFeature.ALLOW_UNESCAPED_CONTROL_CHARS.mappedFeature(),
-                                true);
-        if (hasDecimalType) {
-            
objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
-        }
     }
 
     @Override
@@ -130,34 +85,4 @@ public class JsonRowDataDeserializationSchema implements 
DeserializationSchema<R
     public RowData convertToRowData(JsonNode message) {
         return (RowData) runtimeConverter.convert(message);
     }
-
-    @Override
-    public boolean isEndOfStream(RowData nextElement) {
-        return false;
-    }
-
-    @Override
-    public TypeInformation<RowData> getProducedType() {
-        return resultTypeInfo;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        JsonRowDataDeserializationSchema that = 
(JsonRowDataDeserializationSchema) o;
-        return failOnMissingField == that.failOnMissingField
-                && ignoreParseErrors == that.ignoreParseErrors
-                && resultTypeInfo.equals(that.resultTypeInfo)
-                && timestampFormat.equals(that.timestampFormat);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(failOnMissingField, ignoreParseErrors, 
resultTypeInfo, timestampFormat);
-    }
 }
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
index c8b7f73b64d..376d0d568a3 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
@@ -38,7 +38,7 @@ import java.util.Objects;
  * <p>Serializes the input Flink object into a JSON string and converts it 
into <code>byte[]</code>.
  *
  * <p>Result <code>byte[]</code> messages can be deserialized using {@link
- * JsonRowDataDeserializationSchema}.
+ * JsonRowDataDeserializationSchema} or {@link 
JsonParserRowDataDeserializationSchema}.
  */
 @Internal
 public class JsonRowDataSerializationSchema implements 
SerializationSchema<RowData> {
diff --git 
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFormatFactoryTest.java
 
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFormatFactoryTest.java
index 7c1c553a6db..3559e2b2c87 100644
--- 
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFormatFactoryTest.java
+++ 
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFormatFactoryTest.java
@@ -151,8 +151,8 @@ class JsonFormatFactoryTest {
     }
 
     private void testSchemaDeserializationSchema(Map<String, String> options) {
-        final JsonRowDataDeserializationSchema expectedDeser =
-                new JsonRowDataDeserializationSchema(
+        final JsonParserRowDataDeserializationSchema expectedDeser =
+                new JsonParserRowDataDeserializationSchema(
                         PHYSICAL_TYPE,
                         InternalTypeInfo.of(PHYSICAL_TYPE),
                         false,
diff --git 
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonParserRowDataDeSerSchemaTest.java
 
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonParserRowDataDeSerSchemaTest.java
new file mode 100644
index 00000000000..0d7509b2fb2
--- /dev/null
+++ 
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonParserRowDataDeSerSchemaTest.java
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.Row;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static 
org.apache.flink.connector.testutils.formats.SchemaTestUtils.open;
+import static 
org.apache.flink.formats.json.JsonRowDataSerDeSchemaTest.convertToExternal;
+import static org.apache.flink.table.api.DataTypes.BOOLEAN;
+import static org.apache.flink.table.api.DataTypes.FIELD;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.MAP;
+import static org.apache.flink.table.api.DataTypes.ROW;
+import static org.apache.flink.table.api.DataTypes.SMALLINT;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.api.DataTypes.TINYINT;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link JsonParserRowDataDeserializationSchema}. */
+public class JsonParserRowDataDeSerSchemaTest {
+
+    /**
+     * Tests parsing partial fields in actual JSON data; sometimes we can skip 
some complex field
+     * parsing, e.g., fields with Row/Array type.
+     */
+    @Test
+    public void testParsePartialJson() throws Exception {
+        byte tinyint = 'c';
+        short smallint = 128;
+        int intValue = 45536;
+        float floatValue = 33.333F;
+        long bigint = 1238123899121L;
+        String name = "asdlkjasjkdla998y1122";
+        byte[] bytes = new byte[1024];
+        ThreadLocalRandom.current().nextBytes(bytes);
+        BigDecimal decimal = new BigDecimal("123.456789");
+
+        Map<String, Long> map = new HashMap<>();
+        map.put("flink", 123L);
+
+        Map<String, Map<String, Integer>> nestedMap = new HashMap<>();
+        Map<String, Integer> innerMap = new HashMap<>();
+        innerMap.put("key", 234);
+        nestedMap.put("inner_map", innerMap);
+
+        ObjectMapper objectMapper = new ObjectMapper();
+        ArrayNode doubleNode = 
objectMapper.createArrayNode().add(1.1D).add(2.2D).add(3.3D);
+
+        // Root
+        ObjectNode root = objectMapper.createObjectNode();
+        root.put("bool", true);
+        root.put("tinyint", tinyint);
+        root.put("smallint", smallint);
+        root.put("int", intValue);
+        root.put("bigint", bigint);
+        root.put("float", floatValue);
+        root.put("name", name);
+        root.put("bytes", bytes);
+        root.put("decimal", decimal);
+        root.set("doubles", doubleNode);
+        root.put("date", "1990-10-14");
+        root.put("time", "12:12:43");
+        root.put("timestamp3", "1990-10-14T12:12:43.123");
+        root.put("timestamp9", "1990-10-14T12:12:43.123456789");
+        root.putObject("map").put("flink", 123);
+        root.putObject("map2map").putObject("inner_map").put("key", 234);
+
+        byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+
+        DataType dataType =
+                ROW(
+                        FIELD("bool", BOOLEAN()),
+                        FIELD("tinyint", TINYINT()),
+                        FIELD("smallint", SMALLINT()),
+                        FIELD("int", INT()),
+                        FIELD("map2map", MAP(STRING(), MAP(STRING(), INT()))));
+        RowType schema = (RowType) dataType.getLogicalType();
+        TypeInformation<RowData> resultTypeInfo = InternalTypeInfo.of(schema);
+
+        DeserializationSchema<RowData> deserializationSchema =
+                new JsonParserRowDataDeserializationSchema(
+                        schema, resultTypeInfo, false, false, 
TimestampFormat.ISO_8601);
+        open(deserializationSchema);
+
+        Row expected = new Row(5);
+        expected.setField(0, true);
+        expected.setField(1, tinyint);
+        expected.setField(2, smallint);
+        expected.setField(3, intValue);
+        expected.setField(4, nestedMap);
+
+        RowData rowData = deserializationSchema.deserialize(serializedJson);
+        Row actual = convertToExternal(rowData, dataType);
+        assertThat(actual).isEqualTo(expected);
+    }
+
+    @Test
+    public void testProjected() throws Exception {
+        ObjectMapper objectMapper = new ObjectMapper();
+        ObjectNode root = objectMapper.createObjectNode();
+        root.put("f0", 0);
+        root.put("f1", 1);
+        root.put("f2", 2);
+        root.put("f3", 3);
+        root.put("f4", 4);
+        innerTestProjected(objectMapper.writeValueAsBytes(root), 
GenericRowData.of(1, 4, 0, 2));
+    }
+
+    @Test
+    public void testProjectedNullable() throws Exception {
+        ObjectMapper objectMapper = new ObjectMapper();
+        ObjectNode root = objectMapper.createObjectNode();
+        root.put("f0", 0);
+        root.put("f1", 1);
+        root.putNull("f2");
+        root.put("f3", 3);
+        root.putNull("f4");
+        innerTestProjected(
+                objectMapper.writeValueAsBytes(root), GenericRowData.of(1, 
null, 0, null));
+    }
+
+    /**
+     * Deserializes {@code serializedJson} with a {@link JsonParserRowDataDeserializationSchema}
+     * that projects the top-level fields f1, f4, f0 and f2, then asserts that the produced row
+     * equals {@code expected}.
+     */
+    private void innerTestProjected(byte[] serializedJson, GenericRowData expected)
+            throws Exception {
+        // One single-element path per schema field, listed in the same order as the schema.
+        String[][] projectedFields = {{"f1"}, {"f4"}, {"f0"}, {"f2"}};
+
+        DataType producedType =
+                ROW(FIELD("f1", INT()), FIELD("f4", INT()), FIELD("f0", INT()), FIELD("f2", INT()));
+        RowType rowType = (RowType) producedType.getLogicalType();
+
+        JsonParserRowDataDeserializationSchema deserializer =
+                new JsonParserRowDataDeserializationSchema(
+                        rowType,
+                        InternalTypeInfo.of(rowType),
+                        false,
+                        false,
+                        TimestampFormat.ISO_8601,
+                        projectedFields);
+        open(deserializer);
+
+        assertThat(deserializer.deserialize(serializedJson)).isEqualTo(expected);
+    }
+
+    /**
+     * Serializes the nested fixture used by the nested-projection tests:
+     *
+     * <pre>{@code
+     * {"f0":0,"f1":1,"f2":2,
+     *  "f3":{"f0":30,"f1":31,"f2":32},
+     *  "f4":{"f0":40,"f1":41},
+     *  "f5":{"f0":50,"f1":{"f0":510,"f1":511}}}
+     * }</pre>
+     */
+    private byte[] prepareNestedRow() throws JsonProcessingException {
+        ObjectMapper mapper = new ObjectMapper();
+        ObjectNode json = mapper.createObjectNode();
+
+        // Top-level scalars.
+        json.put("f0", 0).put("f1", 1).put("f2", 2);
+
+        // One-level nested objects.
+        json.putObject("f3").put("f0", 30).put("f1", 31).put("f2", 32);
+        json.putObject("f4").put("f0", 40).put("f1", 41);
+
+        // f5 nests a second object under its own f1.
+        ObjectNode f5 = json.putObject("f5");
+        f5.put("f0", 50);
+        f5.putObject("f1").put("f0", 510).put("f1", 511);
+
+        return mapper.writeValueAsBytes(json);
+    }
+
+    /**
+     * Serializes the nested fixture with nulls in it:
+     *
+     * <pre>{@code
+     * {"f0":0,"f1":1,"f2":2,
+     *  "f3":{"f0":30,"f1":31,"f2":32},
+     *  "f4":{"f0":null,"f1":41},
+     *  "f5":{"f0":50,"f1":null}}
+     * }</pre>
+     */
+    private byte[] prepareNullableNestedRow() throws JsonProcessingException {
+        ObjectMapper mapper = new ObjectMapper();
+        ObjectNode json = mapper.createObjectNode();
+
+        // Top-level scalars.
+        json.put("f0", 0).put("f1", 1).put("f2", 2);
+
+        json.putObject("f3").put("f0", 30).put("f1", 31).put("f2", 32);
+        // f4.f0 is an explicit JSON null.
+        json.putObject("f4").putNull("f0").put("f1", 41);
+        // f5.f1 is an explicit JSON null instead of a nested object.
+        json.putObject("f5").put("f0", 50).putNull("f1");
+
+        return mapper.writeValueAsBytes(json);
+    }
+
+    /** Runs the nested-field projection against the fixture from {@link #prepareNestedRow()}. */
+    @Test
+    public void testProjectNestedField() throws Exception {
+        GenericRowData f4Row = GenericRowData.of(40, 41);
+        GenericRowData projected = GenericRowData.of(1, 31, 32, 510, 0, f4Row);
+        innerTestProjectNestedField(prepareNestedRow(), projected);
+    }
+
+    /**
+     * Runs the nested-field projection against the fixture from {@link
+     * #prepareNullableNestedRow()}; the columns that reach through the null values are expected
+     * to be null.
+     */
+    @Test
+    public void testProjectNestedFieldNullable() throws Exception {
+        GenericRowData f4Row = GenericRowData.of(null, 41);
+        GenericRowData projected = GenericRowData.of(1, 31, 32, null, 0, f4Row);
+        innerTestProjectNestedField(prepareNullableNestedRow(), projected);
+    }
+
+    /**
+     * Deserializes {@code serializedJson} with a {@link JsonParserRowDataDeserializationSchema}
+     * that projects f1, f3.f1, f3.f2, f5.f1.f0, f0 and the whole f4 row, then asserts that the
+     * produced row equals {@code expected}.
+     */
+    private void innerTestProjectNestedField(byte[] serializedJson, GenericRowData expected)
+            throws Exception {
+        // Each entry is the path from the JSON root to the value of one schema field.
+        String[][] projectedFields = {
+            {"f1"}, {"f3", "f1"}, {"f3", "f2"}, {"f5", "f1", "f0"}, {"f0"}, {"f4"}
+        };
+
+        DataType f4Type = ROW(FIELD("f0", INT()), FIELD("f1", INT()));
+        DataType producedType =
+                ROW(
+                        FIELD("f1", INT()),
+                        FIELD("f3_1", INT()),
+                        FIELD("f3_2", INT()),
+                        FIELD("f5_1_0", INT()),
+                        FIELD("f0", INT()),
+                        FIELD("f4", f4Type));
+        RowType rowType = (RowType) producedType.getLogicalType();
+
+        JsonParserRowDataDeserializationSchema deserializer =
+                new JsonParserRowDataDeserializationSchema(
+                        rowType,
+                        InternalTypeInfo.of(rowType),
+                        false,
+                        false,
+                        TimestampFormat.ISO_8601,
+                        projectedFields);
+        open(deserializer);
+
+        assertThat(deserializer.deserialize(serializedJson)).isEqualTo(expected);
+    }
+
+    /**
+     * Projects whole nested rows together with fields taken from inside those same rows, using
+     * the fixture from {@link #prepareNestedRow()}.
+     */
+    @Test
+    public void testProjectBothRowAndNestedField() throws Exception {
+        GenericRowData f4Row = GenericRowData.of(40, 41);
+        GenericRowData f51Row = GenericRowData.of(510, 511);
+        GenericRowData f5Row = GenericRowData.of(50, GenericRowData.of(510, 511));
+
+        GenericRowData projected = GenericRowData.of(1, 32, f4Row, 40, 511, f51Row, f5Row);
+        innerTestProjectBothRowAndNestedField(prepareNestedRow(), projected);
+    }
+
+    /**
+     * Same projection as {@link #testProjectBothRowAndNestedField()}, but against the fixture
+     * from {@link #prepareNullableNestedRow()}, where f4.f0 and f5.f1 are JSON nulls.
+     */
+    @Test
+    public void testProjectBothRowAndNestedFieldNullable() throws Exception {
+        GenericRowData f4Row = GenericRowData.of(null, 41);
+        GenericRowData f5Row = GenericRowData.of(50, null);
+
+        // f4.f0, f5.f1.f1 and f5.f1 are all expected to be null.
+        GenericRowData projected = GenericRowData.of(1, 32, f4Row, null, null, null, f5Row);
+        innerTestProjectBothRowAndNestedField(prepareNullableNestedRow(), projected);
+    }
+
+    /**
+     * Deserializes {@code serializedJson} with a {@link JsonParserRowDataDeserializationSchema}
+     * that projects f1, f3.f2, f4, f4.f0, f5.f1.f1, f5.f1 and f5, i.e. both complete nested rows
+     * and individual fields inside them, then asserts that the produced row equals {@code
+     * expected}.
+     */
+    private void innerTestProjectBothRowAndNestedField(
+            byte[] serializedJson, GenericRowData expected) throws Exception {
+        // Each entry is the path from the JSON root to the value of one schema field.
+        String[][] projectedFields = {
+            {"f1"}, {"f3", "f2"}, {"f4"}, {"f4", "f0"}, {"f5", "f1", "f1"}, {"f5", "f1"}, {"f5"}
+        };
+
+        // ROW<f0 INT, f1 INT> is the shape of f4 and of f5.f1.
+        DataType intPair = ROW(FIELD("f0", INT()), FIELD("f1", INT()));
+        DataType producedType =
+                ROW(
+                        FIELD("f1", INT()),
+                        FIELD("f3_2", INT()),
+                        FIELD("f4", intPair),
+                        FIELD("f4_0", INT()),
+                        FIELD("f5_1_1", INT()),
+                        FIELD("f5_1", intPair),
+                        FIELD("f5", ROW(FIELD("f0", INT()), FIELD("f1", intPair))));
+        RowType rowType = (RowType) producedType.getLogicalType();
+
+        JsonParserRowDataDeserializationSchema deserializer =
+                new JsonParserRowDataDeserializationSchema(
+                        rowType,
+                        InternalTypeInfo.of(rowType),
+                        false,
+                        false,
+                        TimestampFormat.ISO_8601,
+                        projectedFields);
+        open(deserializer);
+
+        assertThat(deserializer.deserialize(serializedJson)).isEqualTo(expected);
+    }
+}
+}
diff --git 
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
 
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
index 883c3f09134..ced449e0936 100644
--- 
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
+++ 
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.formats.json;
 
-import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.connector.testutils.formats.DummyInitializationContext;
 import org.apache.flink.core.testutils.FlinkAssertions;
 import org.apache.flink.formats.common.TimestampFormat;
@@ -30,6 +30,9 @@ import org.apache.flink.table.data.util.DataFormatConverters;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.jackson.JacksonMapperFactory;
 
@@ -38,6 +41,8 @@ import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.Arra
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
 
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.math.BigDecimal;
 import java.sql.Timestamp;
@@ -47,6 +52,7 @@ import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.time.ZoneOffset;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -80,13 +86,22 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /**
- * Tests for {@link JsonRowDataDeserializationSchema} and {@link 
JsonRowDataSerializationSchema}.
+ * Tests for {@link JsonRowDataDeserializationSchema}, {@link
+ * JsonParserRowDataDeserializationSchema} and {@link 
JsonRowDataSerializationSchema}.
  */
-class JsonRowDataSerDeSchemaTest {
+@ExtendWith(ParameterizedTestExtension.class)
+public class JsonRowDataSerDeSchemaTest {
 
     private static final ObjectMapper OBJECT_MAPPER = 
JacksonMapperFactory.createObjectMapper();
 
-    @Test
+    @Parameter public boolean isJsonParser;
+
+    /** Supplies the values for {@code isJsonParser}: {@code true} first, then {@code false}. */
+    @Parameters(name = "isJsonParser={0}")
+    public static Collection<Boolean> parameters() throws Exception {
+        return Arrays.asList(Boolean.TRUE, Boolean.FALSE);
+    }
+
+    @TestTemplate
     void testSerDe() throws Exception {
         byte tinyint = 'c';
         short smallint = 128;
@@ -164,11 +179,10 @@ class JsonRowDataSerDeSchemaTest {
                         FIELD("multiSet", MULTISET(STRING())),
                         FIELD("map2map", MAP(STRING(), MAP(STRING(), INT()))));
         RowType schema = (RowType) dataType.getLogicalType();
-        TypeInformation<RowData> resultTypeInfo = InternalTypeInfo.of(schema);
 
-        JsonRowDataDeserializationSchema deserializationSchema =
-                new JsonRowDataDeserializationSchema(
-                        schema, resultTypeInfo, false, false, 
TimestampFormat.ISO_8601);
+        DeserializationSchema<RowData> deserializationSchema =
+                createDeserializationSchema(
+                        isJsonParser, schema, false, false, 
TimestampFormat.ISO_8601);
         open(deserializationSchema);
 
         Row expected = new Row(18);
@@ -213,7 +227,7 @@ class JsonRowDataSerDeSchemaTest {
      * Tests the deserialization slow path, e.g. convert into string and use 
{@link
      * Double#parseDouble(String)}.
      */
-    @Test
+    @TestTemplate
     void testSlowDeserialization() throws Exception {
         Random random = new Random();
         boolean bool = random.nextBoolean();
@@ -244,13 +258,9 @@ class JsonRowDataSerDeSchemaTest {
                         FIELD("float2", FLOAT()));
         RowType rowType = (RowType) dataType.getLogicalType();
 
-        JsonRowDataDeserializationSchema deserializationSchema =
-                new JsonRowDataDeserializationSchema(
-                        rowType,
-                        InternalTypeInfo.of(rowType),
-                        false,
-                        false,
-                        TimestampFormat.ISO_8601);
+        DeserializationSchema<RowData> deserializationSchema =
+                createDeserializationSchema(
+                        isJsonParser, rowType, false, false, 
TimestampFormat.ISO_8601);
         open(deserializationSchema);
 
         Row expected = new Row(7);
@@ -267,7 +277,7 @@ class JsonRowDataSerDeSchemaTest {
         assertThat(actual).isEqualTo(expected);
     }
 
-    @Test
+    @TestTemplate
     void testSerDeMultiRows() throws Exception {
         RowType rowType =
                 (RowType)
@@ -280,13 +290,9 @@ class JsonRowDataSerDeSchemaTest {
                                         FIELD("f6", ROW(FIELD("f1", STRING()), 
FIELD("f2", INT()))))
                                 .getLogicalType();
 
-        JsonRowDataDeserializationSchema deserializationSchema =
-                new JsonRowDataDeserializationSchema(
-                        rowType,
-                        InternalTypeInfo.of(rowType),
-                        false,
-                        false,
-                        TimestampFormat.ISO_8601);
+        DeserializationSchema<RowData> deserializationSchema =
+                createDeserializationSchema(
+                        isJsonParser, rowType, false, false, 
TimestampFormat.ISO_8601);
         open(deserializationSchema);
         JsonRowDataSerializationSchema serializationSchema =
                 new JsonRowDataSerializationSchema(
@@ -338,7 +344,7 @@ class JsonRowDataSerDeSchemaTest {
         }
     }
 
-    @Test
+    @TestTemplate
     void testSerDeMultiRowsWithNullValues() throws Exception {
         String[] jsons =
                 new String[] {
@@ -365,13 +371,9 @@ class JsonRowDataSerDeSchemaTest {
                                         FIELD("metrics", MAP(STRING(), 
DOUBLE())))
                                 .getLogicalType();
 
-        JsonRowDataDeserializationSchema deserializationSchema =
-                new JsonRowDataDeserializationSchema(
-                        rowType,
-                        InternalTypeInfo.of(rowType),
-                        false,
-                        true,
-                        TimestampFormat.ISO_8601);
+        DeserializationSchema<RowData> deserializationSchema =
+                createDeserializationSchema(
+                        isJsonParser, rowType, false, true, 
TimestampFormat.ISO_8601);
         open(deserializationSchema);
         JsonRowDataSerializationSchema serializationSchema =
                 new JsonRowDataSerializationSchema(
@@ -390,33 +392,33 @@ class JsonRowDataSerDeSchemaTest {
         }
     }
 
-    @Test
+    @TestTemplate
     void testDeserializationNullRow() throws Exception {
         DataType dataType = ROW(FIELD("name", STRING()));
         RowType schema = (RowType) dataType.getLogicalType();
 
-        JsonRowDataDeserializationSchema deserializationSchema =
-                new JsonRowDataDeserializationSchema(
-                        schema, InternalTypeInfo.of(schema), true, false, 
TimestampFormat.ISO_8601);
+        DeserializationSchema<RowData> deserializationSchema =
+                createDeserializationSchema(
+                        isJsonParser, schema, true, false, 
TimestampFormat.ISO_8601);
         open(deserializationSchema);
 
         assertThat(deserializationSchema.deserialize(null)).isNull();
     }
 
-    @Test
+    @TestTemplate
     void testDeserializationMissingNode() throws Exception {
         DataType dataType = ROW(FIELD("name", STRING()));
         RowType schema = (RowType) dataType.getLogicalType();
 
-        JsonRowDataDeserializationSchema deserializationSchema =
-                new JsonRowDataDeserializationSchema(
-                        schema, InternalTypeInfo.of(schema), true, false, 
TimestampFormat.ISO_8601);
+        DeserializationSchema<RowData> deserializationSchema =
+                createDeserializationSchema(
+                        isJsonParser, schema, true, false, 
TimestampFormat.ISO_8601);
         open(deserializationSchema);
         RowData rowData = deserializationSchema.deserialize("".getBytes());
         assertThat(rowData).isNull();
     }
 
-    @Test
+    @TestTemplate
     void testDeserializationMissingField() throws Exception {
         // Root
         ObjectNode root = OBJECT_MAPPER.createObjectNode();
@@ -427,13 +429,9 @@ class JsonRowDataSerDeSchemaTest {
         RowType schema = (RowType) dataType.getLogicalType();
 
         // pass on missing field
-        JsonRowDataDeserializationSchema deserializationSchema =
-                new JsonRowDataDeserializationSchema(
-                        schema,
-                        InternalTypeInfo.of(schema),
-                        false,
-                        false,
-                        TimestampFormat.ISO_8601);
+        DeserializationSchema<RowData> deserializationSchema =
+                createDeserializationSchema(
+                        isJsonParser, schema, false, false, 
TimestampFormat.ISO_8601);
         open(deserializationSchema);
 
         Row expected = new Row(1);
@@ -442,20 +440,20 @@ class JsonRowDataSerDeSchemaTest {
 
         // fail on missing field
         deserializationSchema =
-                new JsonRowDataDeserializationSchema(
-                        schema, InternalTypeInfo.of(schema), true, false, 
TimestampFormat.ISO_8601);
+                createDeserializationSchema(
+                        isJsonParser, schema, true, false, 
TimestampFormat.ISO_8601);
         open(deserializationSchema);
 
         String errorMessage = "Failed to deserialize JSON 
'{\"id\":123123123}'.";
 
-        JsonRowDataDeserializationSchema finalDeserializationSchema = 
deserializationSchema;
+        DeserializationSchema<RowData> finalDeserializationSchema = 
deserializationSchema;
         assertThatThrownBy(() -> 
finalDeserializationSchema.deserialize(serializedJson))
                 .hasMessage(errorMessage);
 
         // ignore on parse error
         deserializationSchema =
-                new JsonRowDataDeserializationSchema(
-                        schema, InternalTypeInfo.of(schema), false, true, 
TimestampFormat.ISO_8601);
+                createDeserializationSchema(
+                        isJsonParser, schema, false, true, 
TimestampFormat.ISO_8601);
         open(deserializationSchema);
         actual = 
convertToExternal(deserializationSchema.deserialize(serializedJson), dataType);
         assertThat(actual).isEqualTo(expected);
@@ -473,7 +471,7 @@ class JsonRowDataSerDeSchemaTest {
                 .hasMessage(errorMessage);
     }
 
-    @Test
+    @TestTemplate
     void testSerDeSQLTimestampFormat() throws Exception {
         RowType rowType =
                 (RowType)
@@ -488,9 +486,9 @@ class JsonRowDataSerDeSchemaTest {
                                                 
TIMESTAMP_WITH_LOCAL_TIME_ZONE(9)))
                                 .getLogicalType();
 
-        JsonRowDataDeserializationSchema deserializationSchema =
-                new JsonRowDataDeserializationSchema(
-                        rowType, InternalTypeInfo.of(rowType), false, false, 
TimestampFormat.SQL);
+        DeserializationSchema<RowData> deserializationSchema =
+                createDeserializationSchema(
+                        isJsonParser, rowType, false, false, 
TimestampFormat.SQL);
         open(deserializationSchema);
         JsonRowDataSerializationSchema serializationSchema =
                 new JsonRowDataSerializationSchema(
@@ -582,7 +580,7 @@ class JsonRowDataSerDeSchemaTest {
         assertThat(new String(actual3)).isEqualTo(expectResult3);
     }
 
-    @Test
+    @TestTemplate
     void testSerializationDecimalEncode() throws Exception {
         RowType schema =
                 (RowType)
@@ -592,11 +590,9 @@ class JsonRowDataSerDeSchemaTest {
                                         FIELD("decimal3", DECIMAL(11, 9)))
                                 .getLogicalType();
 
-        TypeInformation<RowData> resultTypeInfo = InternalTypeInfo.of(schema);
-
-        JsonRowDataDeserializationSchema deserializer =
-                new JsonRowDataDeserializationSchema(
-                        schema, resultTypeInfo, false, false, 
TimestampFormat.ISO_8601);
+        DeserializationSchema<RowData> deserializer =
+                createDeserializationSchema(
+                        isJsonParser, schema, false, false, 
TimestampFormat.ISO_8601);
         deserializer.open(new DummyInitializationContext());
 
         JsonRowDataSerializationSchema plainDecimalSerializer =
@@ -630,7 +626,7 @@ class JsonRowDataSerDeSchemaTest {
         assertThat(scientificDecimalResult).isEqualTo(scientificDecimalJson);
     }
 
-    @Test
+    @TestTemplate
     void testJsonParse() throws Exception {
         for (TestSpec spec : testData) {
             testIgnoreParseErrors(spec);
@@ -660,13 +656,13 @@ class JsonRowDataSerDeSchemaTest {
                 .satisfies(anyCauseMatches(RuntimeException.class, 
errorMessage));
     }
 
-    @Test
+    @TestTemplate
     void testDeserializationWithTypesMismatch() {
         RowType rowType = (RowType) ROW(FIELD("f0", STRING()), FIELD("f1", 
INT())).getLogicalType();
         String json = "{\"f0\":\"abc\", \"f1\": \"abc\"}";
-        JsonRowDataDeserializationSchema deserializationSchema =
-                new JsonRowDataDeserializationSchema(
-                        rowType, InternalTypeInfo.of(rowType), false, false, 
TimestampFormat.SQL);
+        DeserializationSchema<RowData> deserializationSchema =
+                createDeserializationSchema(
+                        isJsonParser, rowType, false, false, 
TimestampFormat.SQL);
         open(deserializationSchema);
         String errorMessage = "Fail to deserialize at field: f1.";
 
@@ -676,13 +672,9 @@ class JsonRowDataSerDeSchemaTest {
 
     private void testIgnoreParseErrors(TestSpec spec) throws Exception {
         // the parsing field should be null and no exception is thrown
-        JsonRowDataDeserializationSchema ignoreErrorsSchema =
-                new JsonRowDataDeserializationSchema(
-                        spec.rowType,
-                        InternalTypeInfo.of(spec.rowType),
-                        false,
-                        true,
-                        spec.timestampFormat);
+        DeserializationSchema<RowData> ignoreErrorsSchema =
+                createDeserializationSchema(
+                        isJsonParser, spec.rowType, false, true, 
spec.timestampFormat);
         ignoreErrorsSchema.open(new DummyInitializationContext());
 
         Row expected;
@@ -700,13 +692,9 @@ class JsonRowDataSerDeSchemaTest {
 
     private void testParseErrors(TestSpec spec) {
         // expect exception if parse error is not ignored
-        JsonRowDataDeserializationSchema failingSchema =
-                new JsonRowDataDeserializationSchema(
-                        spec.rowType,
-                        InternalTypeInfo.of(spec.rowType),
-                        false,
-                        false,
-                        spec.timestampFormat);
+        DeserializationSchema<RowData> failingSchema =
+                createDeserializationSchema(
+                        isJsonParser, spec.rowType, false, false, 
spec.timestampFormat);
         open(failingSchema);
 
         assertThatThrownBy(() -> 
failingSchema.deserialize(spec.json.getBytes()))
@@ -724,6 +712,12 @@ class JsonRowDataSerDeSchemaTest {
                     TestSpec.json("{\"id\":\"abc\"}")
                             .rowType(ROW(FIELD("id", INT())))
                             .expectErrorMessage("Failed to deserialize JSON 
'{\"id\":\"abc\"}'."),
+                    TestSpec.json("{\"id\":11211111111.013}")
+                            .rowType(ROW(FIELD("id", INT())))
+                            .expect(null),
+                    TestSpec.json("{\"id\":112.013}")
+                            .rowType(ROW(FIELD("id", INT())))
+                            .expect(Row.of(112)),
                     TestSpec.json("{\"id\":112.013}")
                             .rowType(ROW(FIELD("id", BIGINT())))
                             .expect(Row.of(112L)),
@@ -830,10 +824,33 @@ class JsonRowDataSerDeSchemaTest {
     }
 
     @SuppressWarnings("unchecked")
-    private static Row convertToExternal(RowData rowData, DataType dataType) {
+    static Row convertToExternal(RowData rowData, DataType dataType) {
         return (Row) 
DataFormatConverters.getConverterForDataType(dataType).toExternal(rowData);
     }
 
+    /**
+     * Builds the deserialization schema under test.
+     *
+     * @param isJsonParser when {@code true} a {@link JsonParserRowDataDeserializationSchema} is
+     *     returned, otherwise a {@link JsonRowDataDeserializationSchema}
+     * @param rowType row type to produce; its {@link InternalTypeInfo} is passed as the result
+     *     type information
+     * @param failOnMissingField forwarded unchanged to the schema constructor
+     * @param ignoreParseErrors forwarded unchanged to the schema constructor
+     * @param timestampFormat forwarded unchanged to the schema constructor
+     * @return the newly created schema; this method does not open it
+     */
+    private DeserializationSchema<RowData> createDeserializationSchema(
+            boolean isJsonParser,
+            RowType rowType,
+            boolean failOnMissingField,
+            boolean ignoreParseErrors,
+            TimestampFormat timestampFormat) {
+        return isJsonParser
+                ? new JsonParserRowDataDeserializationSchema(
+                        rowType,
+                        InternalTypeInfo.of(rowType),
+                        failOnMissingField,
+                        ignoreParseErrors,
+                        timestampFormat)
+                : new JsonRowDataDeserializationSchema(
+                        rowType,
+                        InternalTypeInfo.of(rowType),
+                        failOnMissingField,
+                        ignoreParseErrors,
+                        timestampFormat);
+    }
+
     private static class TestSpec {
         private final String json;
         private RowType rowType;

Reply via email to