This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 58f7ceb45013bca847b39901f62ce746680e4f0c
Author: fengli <[email protected]>
AuthorDate: Wed Jul 19 12:21:00 2023 +0800

    [FLINK-32610][json] Introduce JsonParser based JsonParserToRowDataConverter 
with better performance
---
 .../flink/formats/json/JsonParseException.java     |  32 ++
 .../json/JsonParserToRowDataConverters.java        | 626 +++++++++++++++++++++
 .../formats/json/JsonToRowDataConverters.java      |  13 -
 3 files changed, 658 insertions(+), 13 deletions(-)

diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParseException.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParseException.java
new file mode 100644
index 00000000000..6c75a37cfd8
--- /dev/null
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParseException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+/** Exception which refers to parse errors in converters. */
+public class JsonParseException extends RuntimeException {
+    private static final long serialVersionUID = 1L;
+
+    public JsonParseException(String message) {
+        super(message);
+    }
+
+    public JsonParseException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParserToRowDataConverters.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParserToRowDataConverters.java
new file mode 100644
index 00000000000..6556502f0d4
--- /dev/null
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParserToRowDataConverters.java
@@ -0,0 +1,626 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonToken;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneOffset;
+import java.time.temporal.TemporalAccessor;
+import java.time.temporal.TemporalQueries;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;
+import static 
org.apache.flink.formats.common.TimeFormats.ISO8601_TIMESTAMP_FORMAT;
+import static 
org.apache.flink.formats.common.TimeFormats.ISO8601_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT;
+import static org.apache.flink.formats.common.TimeFormats.SQL_TIMESTAMP_FORMAT;
+import static 
org.apache.flink.formats.common.TimeFormats.SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT;
+import static org.apache.flink.formats.common.TimeFormats.SQL_TIME_FORMAT;
+
+/** Tool class used to convert fields from {@link JsonParser} to {@link 
RowData}. */
+@Internal
+public class JsonParserToRowDataConverters implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Flag indicating whether to fail if a field is missing. */
+    private final boolean failOnMissingField;
+
+    /** Flag indicating whether to ignore invalid fields/rows (default: throw 
an exception). */
+    private final boolean ignoreParseErrors;
+
+    /** Timestamp format specification which is used to parse timestamp. */
+    private final TimestampFormat timestampFormat;
+
+    public JsonParserToRowDataConverters(
+            boolean failOnMissingField,
+            boolean ignoreParseErrors,
+            TimestampFormat timestampFormat) {
+        this.failOnMissingField = failOnMissingField;
+        this.ignoreParseErrors = ignoreParseErrors;
+        this.timestampFormat = timestampFormat;
+    }
+
+    /**
+     * Runtime converter that converts {@link JsonParser}s into objects of 
Flink Table & SQL
+     * internal data structures. Unlike {@link 
JsonToRowDataConverters.JsonToRowDataConverter}, this
+     * interface also supports projection pushdown of nested fields.
+     */
+    @FunctionalInterface
+    public interface JsonParserToRowDataConverter extends Serializable {
+        Object convert(JsonParser jsonParser) throws IOException;
+    }
+
+    /** Creates a runtime nested converter which is null safe. */
+    public JsonParserToRowDataConverter createConverter(
+            String[][] projectedFields, RowType rowType) {
        // If projectedFields is null or doesn't contain nested fields, 
fall back to the original conversion path.
+        if (projectedFields == null
+                || Arrays.stream(projectedFields).allMatch(arr -> arr.length 
== 1)) {
+            return createConverter(rowType);
+        }
+
+        RowNestedConverter rowConverter = new RowNestedConverter();
+        for (int i = 0; i < projectedFields.length; i++) {
+            addFieldConverter(
+                    rowConverter.fieldConverters, projectedFields[i], 0, i, 
rowType.getTypeAt(i));
+        }
+
+        // DO NOT use a lambda here; lambdas cause problems with class shading.
+        return new JsonParserToRowDataConverter() {
+
+            @Override
+            public Object convert(JsonParser jp) throws IOException {
+                GenericRowData row = new 
GenericRowData(rowType.getFieldCount());
+                rowConverter.convert(jp, row);
+                return row;
+            }
+        };
+    }
+
+    /** Creates a runtime converter which is null safe. */
+    private JsonParserToRowDataConverter createConverter(LogicalType type) {
+        return wrapIntoNullableConverter(createNotNullConverter(type));
+    }
+
+    /** Creates a runtime converter which assuming input object is not null. */
+    private JsonParserToRowDataConverter createNotNullConverter(LogicalType 
type) {
+        switch (type.getTypeRoot()) {
+            case NULL:
+                return jsonNode -> null;
+            case BOOLEAN:
+                return this::convertToBoolean;
+            case TINYINT:
+                return this::convertToByte;
+            case SMALLINT:
+                return this::convertToShort;
+            case INTEGER:
+            case INTERVAL_YEAR_MONTH:
+                return this::convertToInt;
+            case BIGINT:
+            case INTERVAL_DAY_TIME:
+                return this::convertToLong;
+            case DATE:
+                return this::convertToDate;
+            case TIME_WITHOUT_TIME_ZONE:
+                return this::convertToTime;
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                return this::convertToTimestamp;
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                return this::convertToTimestampWithLocalZone;
+            case FLOAT:
+                return this::convertToFloat;
+            case DOUBLE:
+                return this::convertToDouble;
+            case CHAR:
+            case VARCHAR:
+                return this::convertToString;
+            case BINARY:
+            case VARBINARY:
+                return JsonParser::getBinaryValue;
+            case DECIMAL:
+                return createDecimalConverter((DecimalType) type);
+            case ARRAY:
+                return createArrayConverter((ArrayType) type);
+            case MAP:
+                MapType mapType = (MapType) type;
+                return createMapConverter(
+                        mapType.asSummaryString(), mapType.getKeyType(), 
mapType.getValueType());
+            case MULTISET:
+                MultisetType multisetType = (MultisetType) type;
+                return createMapConverter(
+                        multisetType.asSummaryString(),
+                        multisetType.getElementType(),
+                        new IntType());
+            case ROW:
+                return createRowConverter((RowType) type);
+            case RAW:
+            default:
+                throw new UnsupportedOperationException("Unsupported type: " + 
type);
+        }
+    }
+
+    private boolean convertToBoolean(JsonParser jp) throws IOException {
+        if (jp.currentToken() == JsonToken.VALUE_TRUE) {
+            return true;
+        } else if (jp.currentToken() == JsonToken.VALUE_FALSE) {
+            return false;
+        } else {
+            return Boolean.parseBoolean(jp.getText().trim());
+        }
+    }
+
+    private byte convertToByte(JsonParser jp) throws IOException {
+        if (jp.currentToken() == JsonToken.VALUE_NUMBER_INT) {
+            // DON'T use jp.getByteValue(), which can return values from -128 to 255
+            // because it treats the input as an unsigned value.
+            int value = jp.getIntValue();
+            if (value < Byte.MIN_VALUE || value > Byte.MAX_VALUE) {
+                throw new JsonParseException(
+                        String.format("Numeric value (%s) out of range of Java 
byte.", value));
+            }
+            return (byte) value;
+        } else {
+            return Byte.parseByte(jp.getText().trim());
+        }
+    }
+
+    private short convertToShort(JsonParser jp) throws IOException {
+        if (jp.currentToken() == JsonToken.VALUE_NUMBER_INT) {
+            return jp.getShortValue();
+        } else {
+            return Short.parseShort(jp.getText().trim());
+        }
+    }
+
+    private int convertToInt(JsonParser jp) throws IOException {
+        if (jp.currentToken() == JsonToken.VALUE_NUMBER_INT
+                || jp.currentToken() == JsonToken.VALUE_NUMBER_FLOAT) {
+            return jp.getIntValue();
+        } else {
+            return Integer.parseInt(jp.getText().trim());
+        }
+    }
+
+    private long convertToLong(JsonParser jp) throws IOException {
+        if (jp.currentToken() == JsonToken.VALUE_NUMBER_INT
+                || jp.currentToken() == JsonToken.VALUE_NUMBER_FLOAT) {
+            return jp.getLongValue();
+        } else {
+            return Long.parseLong(jp.getText().trim());
+        }
+    }
+
+    private double convertToDouble(JsonParser jp) throws IOException {
+        if (jp.currentToken() == JsonToken.VALUE_NUMBER_FLOAT) {
+            return jp.getDoubleValue();
+        } else {
+            return Double.parseDouble(jp.getText().trim());
+        }
+    }
+
+    private float convertToFloat(JsonParser jp) throws IOException {
+        if (jp.currentToken() == JsonToken.VALUE_NUMBER_FLOAT) {
+            return jp.getFloatValue();
+        } else {
+            return Float.parseFloat(jp.getText().trim());
+        }
+    }
+
+    private int convertToDate(JsonParser jp) throws IOException {
+        LocalDate date = 
ISO_LOCAL_DATE.parse(jp.getText()).query(TemporalQueries.localDate());
+        return (int) date.toEpochDay();
+    }
+
+    private int convertToTime(JsonParser jsonNode) throws IOException {
+        TemporalAccessor parsedTime = 
SQL_TIME_FORMAT.parse(jsonNode.getText());
+        LocalTime localTime = parsedTime.query(TemporalQueries.localTime());
+
+        // get the number of milliseconds since midnight
+        return localTime.toSecondOfDay() * 1000;
+    }
+
+    private TimestampData convertToTimestamp(JsonParser jp) throws IOException 
{
+        TemporalAccessor parsedTimestamp;
+        switch (timestampFormat) {
+            case SQL:
+                parsedTimestamp = SQL_TIMESTAMP_FORMAT.parse(jp.getText());
+                break;
+            case ISO_8601:
+                parsedTimestamp = ISO8601_TIMESTAMP_FORMAT.parse(jp.getText());
+                break;
+            default:
+                throw new TableException(
+                        String.format(
+                                "Unsupported timestamp format '%s'. Validator 
should have checked that.",
+                                timestampFormat));
+        }
+        LocalTime localTime = 
parsedTimestamp.query(TemporalQueries.localTime());
+        LocalDate localDate = 
parsedTimestamp.query(TemporalQueries.localDate());
+
+        return TimestampData.fromLocalDateTime(LocalDateTime.of(localDate, 
localTime));
+    }
+
+    private TimestampData convertToTimestampWithLocalZone(JsonParser jp) 
throws IOException {
+        TemporalAccessor parsedTimestampWithLocalZone;
+        switch (timestampFormat) {
+            case SQL:
+                parsedTimestampWithLocalZone =
+                        
SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT.parse(jp.getText());
+                break;
+            case ISO_8601:
+                parsedTimestampWithLocalZone =
+                        
ISO8601_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT.parse(jp.getText());
+                break;
+            default:
+                throw new TableException(
+                        String.format(
+                                "Unsupported timestamp format '%s'. Validator 
should have checked that.",
+                                timestampFormat));
+        }
+        LocalTime localTime = 
parsedTimestampWithLocalZone.query(TemporalQueries.localTime());
+        LocalDate localDate = 
parsedTimestampWithLocalZone.query(TemporalQueries.localDate());
+
+        return TimestampData.fromInstant(
+                LocalDateTime.of(localDate, 
localTime).toInstant(ZoneOffset.UTC));
+    }
+
+    private StringData convertToString(JsonParser jp) throws IOException {
+        if (jp.currentToken() == JsonToken.START_OBJECT
+                || jp.currentToken() == JsonToken.START_ARRAY) {
+            return StringData.fromString(jp.readValueAsTree().toString());
+        } else {
+            return StringData.fromString(jp.getText());
+        }
+    }
+
+    private JsonParserToRowDataConverter createDecimalConverter(DecimalType 
decimalType) {
+        final int precision = decimalType.getPrecision();
+        final int scale = decimalType.getScale();
+        return jp -> {
+            BigDecimal bigDecimal;
+            if (jp.currentToken() == JsonToken.VALUE_STRING) {
+                bigDecimal = new BigDecimal(jp.getText().trim());
+            } else {
+                bigDecimal = jp.getDecimalValue();
+            }
+            return DecimalData.fromBigDecimal(bigDecimal, precision, scale);
+        };
+    }
+
+    private JsonParserToRowDataConverter createArrayConverter(ArrayType 
arrayType) {
+        JsonParserToRowDataConverter elementConverter = 
createConverter(arrayType.getElementType());
+        final Class<?> elementClass =
+                
LogicalTypeUtils.toInternalConversionClass(arrayType.getElementType());
+        return jp -> {
+            if (jp.currentToken() != JsonToken.START_ARRAY) {
+                throw new IllegalStateException("Illegal JSON array data...");
+            }
+            List<Object> result = new ArrayList<>();
+            while (jp.nextToken() != JsonToken.END_ARRAY) {
+                Object convertField = elementConverter.convert(jp);
+                result.add(convertField);
+            }
+            final Object[] array = (Object[]) Array.newInstance(elementClass, 
result.size());
+            return new GenericArrayData(result.toArray(array));
+        };
+    }
+
+    private JsonParserToRowDataConverter createMapConverter(
+            String typeSummary, LogicalType keyType, LogicalType valueType) {
+        if (!keyType.is(LogicalTypeFamily.CHARACTER_STRING)) {
+            throw new UnsupportedOperationException(
+                    "JSON format doesn't support non-string as key type of 
map. "
+                            + "The type is: "
+                            + typeSummary);
+        }
+        final JsonParserToRowDataConverter keyConverter = 
createConverter(keyType);
+        final JsonParserToRowDataConverter valueConverter = 
createConverter(valueType);
+
+        return jp -> {
+            if (jp.currentToken() != JsonToken.START_OBJECT) {
+                throw new IllegalStateException("Illegal JSON map data...");
+            }
+            Map<Object, Object> result = new HashMap<>();
+            while (jp.nextToken() != JsonToken.END_OBJECT) {
+                Object key = keyConverter.convert(jp);
+                jp.nextToken();
+                Object value = valueConverter.convert(jp);
+                result.put(key, value);
+            }
+            return new GenericMapData(result);
+        };
+    }
+
+    public JsonParserToRowDataConverter createRowConverter(RowType rowType) {
+        final JsonParserToRowDataConverter[] fieldConverters =
+                rowType.getFields().stream()
+                        .map(RowType.RowField::getType)
+                        .map(this::createConverter)
+                        .toArray(JsonParserToRowDataConverter[]::new);
+        final String[] fieldNames = rowType.getFieldNames().toArray(new 
String[0]);
+
+        Map<String, Integer> nameIdxMap = new HashMap<>();
+        for (int i = 0; i < rowType.getFieldCount(); i++) {
+            nameIdxMap.put(fieldNames[i], i);
+        }
+
+        return jp -> {
+            if (jp.currentToken() != JsonToken.START_OBJECT) {
+                throw new IllegalStateException("Illegal JSON object data...");
+            }
+            int arity = nameIdxMap.size();
+            GenericRowData row = new GenericRowData(arity);
+            int cnt = 0;
+            jp.nextToken();
+            while (jp.currentToken() != JsonToken.END_OBJECT) {
+                if (cnt >= arity) {
+                    skipToNextField(jp);
+                    continue;
+                }
+                String fieldName = jp.getText();
+                jp.nextToken();
+                Integer idx = nameIdxMap.get(fieldName);
+                if (idx != null) {
+                    try {
+                        Object convertField = fieldConverters[idx].convert(jp);
+                        row.setField(idx, convertField);
+                    } catch (Throwable t) {
+                        throw new JsonParseException(
+                                String.format("Fail to deserialize at field: 
%s.", fieldName));
+                    }
+                    jp.nextToken();
+                    cnt++;
+                } else {
+                    skipToNextField(jp);
+                }
+            }
+            if (cnt < arity && failOnMissingField) {
+                throw new JsonParseException("Some field is missing in the 
JSON data.");
+            }
+            return row;
+        };
+    }
+
+    public static void skipToNextField(JsonParser jp) throws IOException {
+        switch (jp.currentToken()) {
+            case START_OBJECT:
+            case START_ARRAY:
+                int match = 1;
+                JsonToken token;
+                while (match > 0) {
+                    token = jp.nextToken();
+                    if (token == JsonToken.END_ARRAY || token == 
JsonToken.END_OBJECT) {
+                        match--;
+                    } else if (token == JsonToken.START_ARRAY || token == 
JsonToken.START_OBJECT) {
+                        match++;
+                    }
+                }
+                break;
+            default:
+        }
+        jp.nextToken();
+    }
+
+    private JsonParserToRowDataConverter wrapIntoNullableConverter(
+            JsonParserToRowDataConverter converter) {
+        return jp -> {
+            if (jp == null
+                    || jp.currentToken() == null
+                    || jp.getCurrentToken() == JsonToken.VALUE_NULL) {
+                return null;
+            }
+            try {
+                return converter.convert(jp);
+            } catch (Throwable t) {
+                if (!ignoreParseErrors) {
+                    throw t;
+                }
+                return null;
+            }
+        };
+    }
+
+    private void addFieldConverter(
+            Map<String, ProjectedConverter> converters,
+            String[] nestedField,
+            int depth,
+            int outputPos,
+            LogicalType type) {
+        String name = nestedField[depth];
+        if (depth == nestedField.length - 1) {
+            FieldConverter fieldConverter = new FieldConverter(outputPos, 
type);
+            ProjectedConverter converter = converters.get(name);
+            if (converter instanceof RowNestedConverter) {
+                // A RowNestedConverter already exists; fall back to extracting 
the nested fields from the converted row field.
+                for (Map.Entry<Integer, String[]> entry :
+                        ((RowNestedConverter) 
converter).outputPosToPath.entrySet()) {
+                    fieldConverter.addNestedFieldPath(entry.getKey(), 
entry.getValue());
+                }
+            } else if (converter instanceof FieldConverter) {
+                throw new RuntimeException("This is a bug, contains duplicated 
fields.");
+            }
+            converters.put(name, fieldConverter);
+        } else {
+            ProjectedConverter converter =
+                    converters.computeIfAbsent(name, k -> new 
RowNestedConverter());
+            String[] namePath = ArrayUtils.subarray(nestedField, depth + 1, 
nestedField.length);
+            if (converter instanceof FieldConverter) {
+                // This row field has already been converted; just extract the 
nested field from it.
+                ((FieldConverter) converter).addNestedFieldPath(outputPos, 
namePath);
+            } else {
+                RowNestedConverter rowConverter = (RowNestedConverter) 
converter;
+                rowConverter.outputPosToPath.put(outputPos, namePath);
+                addFieldConverter(
+                        rowConverter.fieldConverters, nestedField, depth + 1, 
outputPos, type);
+            }
+        }
+    }
+
+    /**
+     * Runtime converter that converts {@link JsonParser}s into objects of 
Flink Table & SQL
+     * internal data structures.
+     */
+    private abstract static class ProjectedConverter implements Serializable {
+
+        private static final long serialVersionUID = 1L;
+
+        public final void convert(JsonParser jp, GenericRowData outputRow) 
throws IOException {
+            if (jp != null
+                    && jp.currentToken() != null
+                    && jp.getCurrentToken() != JsonToken.VALUE_NULL) {
+                convertNotNull(jp, outputRow);
+            }
+        }
+
+        public abstract void convertNotNull(JsonParser jp, GenericRowData 
outputRow)
+                throws IOException;
+    }
+
+    private class FieldConverter extends ProjectedConverter {
+
+        private static final long serialVersionUID = 1L;
+
+        private final int outputPos;
+
+        private final JsonParserToRowDataConverter converter;
+
+        private final LogicalType type;
+
+        private final Map<Integer, int[]> outputPosToPath = new HashMap<>();
+
+        public FieldConverter(int outputPos, LogicalType type) {
+            this.outputPos = outputPos;
+            this.converter = createConverter(type);
+            this.type = type;
+        }
+
+        @Override
+        public void convertNotNull(JsonParser jp, GenericRowData outputRow) 
throws IOException {
+            Object field = converter.convert(jp);
+            outputRow.setField(outputPos, field);
+            if (field != null && !outputPosToPath.isEmpty()) {
+                outNestedFields(field, outputRow);
+            }
+        }
+
+        private void outNestedFields(Object field, GenericRowData outputRow) {
+            outputLoop:
+            for (Map.Entry<Integer, int[]> entry : outputPosToPath.entrySet()) 
{
+                Object currentField = field;
+                for (int i : entry.getValue()) {
+                    if (currentField == null) {
+                        continue outputLoop;
+                    }
+                    currentField = ((GenericRowData) currentField).getField(i);
+                }
+                outputRow.setField(entry.getKey(), currentField);
+            }
+        }
+
+        public void addNestedFieldPath(int pos, String[] namePath) {
+            int[] path = new int[namePath.length];
+            LogicalType currentType = type;
+            for (int i = 0; i < path.length; i++) {
+                if (!(currentType instanceof RowType)) {
+                    throw new RuntimeException(
+                            "This is a bug, currentType should be row type, 
but is: "
+                                    + currentType);
+                }
+
+                int fieldIndex = ((RowType) 
currentType).getFieldNames().indexOf(namePath[i]);
+                currentType = currentType.getChildren().get(fieldIndex);
+                path[i] = fieldIndex;
+            }
+            outputPosToPath.put(pos, path);
+        }
+    }
+
+    private class RowNestedConverter extends ProjectedConverter {
+
+        private static final long serialVersionUID = 1L;
+
+        private final Map<String, ProjectedConverter> fieldConverters = new 
HashMap<>();
+
+    // Keep the path here so we can fall back to extracting the nested field 
from a converted row field.
+        private final Map<Integer, String[]> outputPosToPath = new HashMap<>();
+
+        @Override
+        public void convertNotNull(JsonParser jp, GenericRowData outputRow) 
throws IOException {
+            if (jp.currentToken() != JsonToken.START_OBJECT) {
+                throw new IllegalStateException("Illegal Json Data...");
+            }
+            int arity = fieldConverters.size();
+            int cnt = 0;
+            jp.nextToken();
+            while (jp.currentToken() != JsonToken.END_OBJECT) {
+                if (cnt >= arity) {
+                    skipToNextField(jp);
+                    continue;
+                }
+                String fieldName = jp.getText();
+                jp.nextToken();
+                ProjectedConverter converter = fieldConverters.get(fieldName);
+                if (converter != null) {
+                    converter.convert(jp, outputRow);
+                    jp.nextToken();
+                    cnt++;
+                } else {
+                    skipToNextField(jp);
+                }
+            }
+            if (cnt < arity && failOnMissingField) {
+                throw new JsonParseException("Some field is missing in the 
Json data.");
+            }
+        }
+    }
+}
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonToRowDataConverters.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonToRowDataConverters.java
index c89b5f58377..5da937e43c7 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonToRowDataConverters.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonToRowDataConverters.java
@@ -386,17 +386,4 @@ public class JsonToRowDataConverters implements 
Serializable {
             }
         };
     }
-
-    /** Exception which refers to parse errors in converters. */
-    private static final class JsonParseException extends RuntimeException {
-        private static final long serialVersionUID = 1L;
-
-        public JsonParseException(String message) {
-            super(message);
-        }
-
-        public JsonParseException(String message, Throwable cause) {
-            super(message, cause);
-        }
-    }
 }

Reply via email to