libenchao commented on a change in pull request #11119: [FLINK-15396][json] 
Support to ignore parse errors for JSON format
URL: https://github.com/apache/flink/pull/11119#discussion_r387438612
 
 

 ##########
 File path: 
flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
 ##########
 @@ -351,76 +379,187 @@ private DeserializationRuntimeConverter 
createFallbackConverter(Class<?> valueTy
                }
        }
 
+       private boolean convertToBoolean(ObjectMapper mapper, JsonNode 
jsonNode) {
+               try {
+                       return jsonNode.asBoolean();
+               } catch (Throwable t) {
+                       throw new ParseErrorException("Unable to deserialize 
boolean.", t);
+               }
+       }
+
+       private String convertToString(ObjectMapper mapper, JsonNode jsonNode) {
+               try {
+                       return jsonNode.asText();
+               } catch (Throwable t) {
+                       throw new ParseErrorException("Unable to deserialize 
string.", t);
+               }
+       }
+
+       private int convertToInt(ObjectMapper mapper, JsonNode jsonNode) {
+               try {
+                       return jsonNode.asInt();
+               } catch (Throwable t) {
+                       throw new ParseErrorException("Unable to deserialize 
int.", t);
+               }
+       }
+
+       private long convertToLong(ObjectMapper mapper, JsonNode jsonNode) {
+               try {
+                       return jsonNode.asLong();
+               } catch (Throwable t) {
+                       throw new ParseErrorException("Unable to deserialize 
long.", t);
+               }
+       }
+
+       private double convertToDouble(ObjectMapper mapper, JsonNode jsonNode) {
+               try {
+                       return jsonNode.asDouble();
+               } catch (Throwable t) {
+                       throw new ParseErrorException("Unable to deserialize 
double.", t);
+               }
+       }
+
+       private float convertToFloat(ObjectMapper mapper, JsonNode jsonNode) {
+               try {
+                       return Float.parseFloat(jsonNode.asText().trim());
+               } catch (Throwable t) {
+                       throw new ParseErrorException("Unable to deserialize 
float.", t);
+               }
+       }
+
+       private short convertToShot(ObjectMapper mapper, JsonNode jsonNode) {
+               try {
+                       return Short.parseShort(jsonNode.asText().trim());
+               } catch (Throwable t) {
+                       throw new ParseErrorException("Unable to deserialize 
short.", t);
+               }
+       }
+
+       private byte convertToByte(ObjectMapper mapper, JsonNode jsonNode) {
+               try {
+                       return Byte.parseByte(jsonNode.asText().trim());
+               } catch (Throwable t) {
+                       throw new ParseErrorException("Unable to deserialize 
byte.", t);
+               }
+       }
+
+       private BigDecimal convertToBigDecimal(ObjectMapper mapper, JsonNode 
jsonNode) {
+               try {
+                       return jsonNode.decimalValue();
+               } catch (Throwable t) {
+                       throw new ParseErrorException("Unable to deserialize 
BigDecimal.", t);
+               }
+       }
+
+       private BigInteger convertToBigInt(ObjectMapper mapper, JsonNode 
jsonNode) {
+               try {
+                       return jsonNode.bigIntegerValue();
+               } catch (Throwable t) {
+                       throw new ParseErrorException("Unable to deserialize 
BigInteger.", t);
+               }
+       }
+
        private LocalDate convertToLocalDate(ObjectMapper mapper, JsonNode 
jsonNode) {
-               return 
ISO_LOCAL_DATE.parse(jsonNode.asText()).query(TemporalQueries.localDate());
+               try {
+                       return 
ISO_LOCAL_DATE.parse(jsonNode.asText()).query(TemporalQueries.localDate());
+               } catch (Throwable t) {
+                       throw new ParseErrorException("Unable to deserialize 
local date.", t);
+               }
        }
 
        private Date convertToDate(ObjectMapper mapper, JsonNode jsonNode) {
-               return Date.valueOf(convertToLocalDate(mapper, jsonNode));
+               try {
+                       return Date.valueOf(convertToLocalDate(mapper, 
jsonNode));
+               } catch (Throwable t) {
+                       throw new ParseErrorException("Unable to deserialize 
date.", t);
+               }
        }
 
        private LocalDateTime convertToLocalDateTime(ObjectMapper mapper, 
JsonNode jsonNode) {
                // according to RFC 3339 every date-time must have a timezone;
                // until we have full timezone support, we only support UTC;
                // users can parse their time as string as a workaround
-               TemporalAccessor parsedTimestamp = 
RFC3339_TIMESTAMP_FORMAT.parse(jsonNode.asText());
+               try {
+                       TemporalAccessor parsedTimestamp = 
RFC3339_TIMESTAMP_FORMAT.parse(jsonNode.asText());
 
-               ZoneOffset zoneOffset = 
parsedTimestamp.query(TemporalQueries.offset());
+                       ZoneOffset zoneOffset = 
parsedTimestamp.query(TemporalQueries.offset());
 
-               if (zoneOffset != null && zoneOffset.getTotalSeconds() != 0) {
-                       throw new IllegalStateException(
-                               "Invalid timestamp format. Only a timestamp in 
UTC timezone is supported yet. " +
-                                       "Format: yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
-               }
+                       if (zoneOffset != null && zoneOffset.getTotalSeconds() 
!= 0) {
+                               throw new IllegalStateException(
+                                       "Invalid timestamp format. Only a 
timestamp in UTC timezone is supported yet. " +
+                                               "Format: 
yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
+                       }
 
-               LocalTime localTime = 
parsedTimestamp.query(TemporalQueries.localTime());
-               LocalDate localDate = 
parsedTimestamp.query(TemporalQueries.localDate());
+                       LocalTime localTime = 
parsedTimestamp.query(TemporalQueries.localTime());
+                       LocalDate localDate = 
parsedTimestamp.query(TemporalQueries.localDate());
 
-               return LocalDateTime.of(localDate, localTime);
+                       return LocalDateTime.of(localDate, localTime);
+               } catch (Throwable t) {
+                       throw new ParseErrorException("Unable to deserialize 
local date time.", t);
+               }
        }
 
        private Timestamp convertToTimestamp(ObjectMapper mapper, JsonNode 
jsonNode) {
-               return Timestamp.valueOf(convertToLocalDateTime(mapper, 
jsonNode));
+               try {
+                       return Timestamp.valueOf(convertToLocalDateTime(mapper, 
jsonNode));
+               } catch (Throwable t) {
+                       throw new ParseErrorException("Unable to deserialize 
timestamp.", t);
+               }
        }
 
        private LocalTime convertToLocalTime(ObjectMapper mapper, JsonNode 
jsonNode) {
                // according to RFC 3339 every full-time must have a timezone;
                // until we have full timezone support, we only support UTC;
                // users can parse their time as string as a workaround
-               TemporalAccessor parsedTime = 
RFC3339_TIME_FORMAT.parse(jsonNode.asText());
 
-               ZoneOffset zoneOffset = 
parsedTime.query(TemporalQueries.offset());
-               LocalTime localTime = 
parsedTime.query(TemporalQueries.localTime());
+               try {
+                       TemporalAccessor parsedTime = 
RFC3339_TIME_FORMAT.parse(jsonNode.asText());
 
-               if (zoneOffset != null && zoneOffset.getTotalSeconds() != 0 || 
localTime.getNano() != 0) {
-                       throw new IllegalStateException(
-                               "Invalid time format. Only a time in UTC 
timezone without milliseconds is supported yet.");
-               }
+                       ZoneOffset zoneOffset = 
parsedTime.query(TemporalQueries.offset());
+                       LocalTime localTime = 
parsedTime.query(TemporalQueries.localTime());
 
-               return localTime;
+                       if (zoneOffset != null && zoneOffset.getTotalSeconds() 
!= 0 || localTime.getNano() != 0) {
+                               throw new IllegalStateException(
+                                       "Invalid time format. Only a time in 
UTC timezone without milliseconds is supported yet.");
+                       }
+
+                       return localTime;
+               } catch (Throwable t) {
+                       throw new ParseErrorException("Unable to deserialize 
local time.", t);
+               }
        }
 
        private Time convertToTime(ObjectMapper mapper, JsonNode jsonNode) {
-               return Time.valueOf(convertToLocalTime(mapper, jsonNode));
+               try {
+                       return Time.valueOf(convertToLocalTime(mapper, 
jsonNode));
+               } catch (Throwable t) {
+                       throw new ParseErrorException("Unable to deserialize 
time.", t);
+               }
        }
 
        private DeserializationRuntimeConverter assembleRowConverter(
                String[] fieldNames,
                List<DeserializationRuntimeConverter> fieldConverters) {
-               return (mapper, jsonNode) -> {
-                       ObjectNode node = (ObjectNode) jsonNode;
-
-                       int arity = fieldNames.length;
-                       Row row = new Row(arity);
-                       for (int i = 0; i < arity; i++) {
-                               String fieldName = fieldNames[i];
-                               JsonNode field = node.get(fieldName);
-                               Object convertField = convertField(mapper, 
fieldConverters.get(i), fieldName, field);
-                               row.setField(i, convertField);
+               try {
+                       return (mapper, jsonNode) -> {
+                               ObjectNode node = (ObjectNode) jsonNode;
+                               int arity = fieldNames.length;
+                               Row row = new Row(arity);
+                               for (int i = 0; i < arity; i++) {
+                                       String fieldName = fieldNames[i];
+                                       JsonNode field = node.get(fieldName);
+                                       Object convertField = 
convertField(mapper, fieldConverters.get(i), fieldName, field);
+                                       row.setField(i, convertField);
+                               }
+
+                               return row;
+                       };
+               } catch (Throwable t) {
 
 Review comment:
  If the exception comes from an inner field, it will be wrapped again here, so the error gets duplicated.
  Could we instead just check that jsonNode is a valid ObjectNode and throw an exception if it is not,
rather than wrapping the entire conversion process in a try/catch?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to