libenchao commented on a change in pull request #11119: [FLINK-15396][json]
Support to ignore parse errors for JSON format
URL: https://github.com/apache/flink/pull/11119#discussion_r387438612
##########
File path:
flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
##########
@@ -351,76 +379,187 @@ private DeserializationRuntimeConverter
createFallbackConverter(Class<?> valueTy
}
}
+ private boolean convertToBoolean(ObjectMapper mapper, JsonNode
jsonNode) {
+ try {
+ return jsonNode.asBoolean();
+ } catch (Throwable t) {
+ throw new ParseErrorException("Unable to deserialize
boolean.", t);
+ }
+ }
+
+ private String convertToString(ObjectMapper mapper, JsonNode jsonNode) {
+ try {
+ return jsonNode.asText();
+ } catch (Throwable t) {
+ throw new ParseErrorException("Unable to deserialize
string.", t);
+ }
+ }
+
+ private int convertToInt(ObjectMapper mapper, JsonNode jsonNode) {
+ try {
+ return jsonNode.asInt();
+ } catch (Throwable t) {
+ throw new ParseErrorException("Unable to deserialize
int.", t);
+ }
+ }
+
+ private long convertToLong(ObjectMapper mapper, JsonNode jsonNode) {
+ try {
+ return jsonNode.asLong();
+ } catch (Throwable t) {
+ throw new ParseErrorException("Unable to deserialize
long.", t);
+ }
+ }
+
+ private double convertToDouble(ObjectMapper mapper, JsonNode jsonNode) {
+ try {
+ return jsonNode.asDouble();
+ } catch (Throwable t) {
+ throw new ParseErrorException("Unable to deserialize
double.", t);
+ }
+ }
+
+ private float convertToFloat(ObjectMapper mapper, JsonNode jsonNode) {
+ try {
+ return Float.parseFloat(jsonNode.asText().trim());
+ } catch (Throwable t) {
+ throw new ParseErrorException("Unable to deserialize
float.", t);
+ }
+ }
+
+ private short convertToShot(ObjectMapper mapper, JsonNode jsonNode) {
+ try {
+ return Short.parseShort(jsonNode.asText().trim());
+ } catch (Throwable t) {
+ throw new ParseErrorException("Unable to deserialize
short.", t);
+ }
+ }
+
+ private byte convertToByte(ObjectMapper mapper, JsonNode jsonNode) {
+ try {
+ return Byte.parseByte(jsonNode.asText().trim());
+ } catch (Throwable t) {
+ throw new ParseErrorException("Unable to deserialize
byte.", t);
+ }
+ }
+
+ private BigDecimal convertToBigDecimal(ObjectMapper mapper, JsonNode
jsonNode) {
+ try {
+ return jsonNode.decimalValue();
+ } catch (Throwable t) {
+ throw new ParseErrorException("Unable to deserialize
BigDecimal.", t);
+ }
+ }
+
+ private BigInteger convertToBigInt(ObjectMapper mapper, JsonNode
jsonNode) {
+ try {
+ return jsonNode.bigIntegerValue();
+ } catch (Throwable t) {
+ throw new ParseErrorException("Unable to deserialize
BigInteger.", t);
+ }
+ }
+
private LocalDate convertToLocalDate(ObjectMapper mapper, JsonNode
jsonNode) {
- return
ISO_LOCAL_DATE.parse(jsonNode.asText()).query(TemporalQueries.localDate());
+ try {
+ return
ISO_LOCAL_DATE.parse(jsonNode.asText()).query(TemporalQueries.localDate());
+ } catch (Throwable t) {
+ throw new ParseErrorException("Unable to deserialize
local date.", t);
+ }
}
private Date convertToDate(ObjectMapper mapper, JsonNode jsonNode) {
- return Date.valueOf(convertToLocalDate(mapper, jsonNode));
+ try {
+ return Date.valueOf(convertToLocalDate(mapper,
jsonNode));
+ } catch (Throwable t) {
+ throw new ParseErrorException("Unable to deserialize
date.", t);
+ }
}
private LocalDateTime convertToLocalDateTime(ObjectMapper mapper,
JsonNode jsonNode) {
// according to RFC 3339 every date-time must have a timezone;
// until we have full timezone support, we only support UTC;
// users can parse their time as string as a workaround
- TemporalAccessor parsedTimestamp =
RFC3339_TIMESTAMP_FORMAT.parse(jsonNode.asText());
+ try {
+ TemporalAccessor parsedTimestamp =
RFC3339_TIMESTAMP_FORMAT.parse(jsonNode.asText());
- ZoneOffset zoneOffset =
parsedTimestamp.query(TemporalQueries.offset());
+ ZoneOffset zoneOffset =
parsedTimestamp.query(TemporalQueries.offset());
- if (zoneOffset != null && zoneOffset.getTotalSeconds() != 0) {
- throw new IllegalStateException(
- "Invalid timestamp format. Only a timestamp in
UTC timezone is supported yet. " +
- "Format: yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
- }
+ if (zoneOffset != null && zoneOffset.getTotalSeconds()
!= 0) {
+ throw new IllegalStateException(
+ "Invalid timestamp format. Only a
timestamp in UTC timezone is supported yet. " +
+ "Format:
yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
+ }
- LocalTime localTime =
parsedTimestamp.query(TemporalQueries.localTime());
- LocalDate localDate =
parsedTimestamp.query(TemporalQueries.localDate());
+ LocalTime localTime =
parsedTimestamp.query(TemporalQueries.localTime());
+ LocalDate localDate =
parsedTimestamp.query(TemporalQueries.localDate());
- return LocalDateTime.of(localDate, localTime);
+ return LocalDateTime.of(localDate, localTime);
+ } catch (Throwable t) {
+ throw new ParseErrorException("Unable to deserialize
local date time.", t);
+ }
}
private Timestamp convertToTimestamp(ObjectMapper mapper, JsonNode
jsonNode) {
- return Timestamp.valueOf(convertToLocalDateTime(mapper,
jsonNode));
+ try {
+ return Timestamp.valueOf(convertToLocalDateTime(mapper,
jsonNode));
+ } catch (Throwable t) {
+ throw new ParseErrorException("Unable to deserialize
timestamp.", t);
+ }
}
private LocalTime convertToLocalTime(ObjectMapper mapper, JsonNode
jsonNode) {
// according to RFC 3339 every full-time must have a timezone;
// until we have full timezone support, we only support UTC;
// users can parse their time as string as a workaround
- TemporalAccessor parsedTime =
RFC3339_TIME_FORMAT.parse(jsonNode.asText());
- ZoneOffset zoneOffset =
parsedTime.query(TemporalQueries.offset());
- LocalTime localTime =
parsedTime.query(TemporalQueries.localTime());
+ try {
+ TemporalAccessor parsedTime =
RFC3339_TIME_FORMAT.parse(jsonNode.asText());
- if (zoneOffset != null && zoneOffset.getTotalSeconds() != 0 ||
localTime.getNano() != 0) {
- throw new IllegalStateException(
- "Invalid time format. Only a time in UTC
timezone without milliseconds is supported yet.");
- }
+ ZoneOffset zoneOffset =
parsedTime.query(TemporalQueries.offset());
+ LocalTime localTime =
parsedTime.query(TemporalQueries.localTime());
- return localTime;
+ if (zoneOffset != null && zoneOffset.getTotalSeconds()
!= 0 || localTime.getNano() != 0) {
+ throw new IllegalStateException(
+ "Invalid time format. Only a time in
UTC timezone without milliseconds is supported yet.");
+ }
+
+ return localTime;
+ } catch (Throwable t) {
+ throw new ParseErrorException("Unable to deserialize
local time.", t);
+ }
}
private Time convertToTime(ObjectMapper mapper, JsonNode jsonNode) {
- return Time.valueOf(convertToLocalTime(mapper, jsonNode));
+ try {
+ return Time.valueOf(convertToLocalTime(mapper,
jsonNode));
+ } catch (Throwable t) {
+ throw new ParseErrorException("Unable to deserialize
time.", t);
+ }
}
private DeserializationRuntimeConverter assembleRowConverter(
String[] fieldNames,
List<DeserializationRuntimeConverter> fieldConverters) {
- return (mapper, jsonNode) -> {
- ObjectNode node = (ObjectNode) jsonNode;
-
- int arity = fieldNames.length;
- Row row = new Row(arity);
- for (int i = 0; i < arity; i++) {
- String fieldName = fieldNames[i];
- JsonNode field = node.get(fieldName);
- Object convertField = convertField(mapper,
fieldConverters.get(i), fieldName, field);
- row.setField(i, convertField);
+ try {
+ return (mapper, jsonNode) -> {
+ ObjectNode node = (ObjectNode) jsonNode;
+ int arity = fieldNames.length;
+ Row row = new Row(arity);
+ for (int i = 0; i < arity; i++) {
+ String fieldName = fieldNames[i];
+ JsonNode field = node.get(fieldName);
+ Object convertField =
convertField(mapper, fieldConverters.get(i), fieldName, field);
+ row.setField(i, convertField);
+ }
+
+ return row;
+ };
+ } catch (Throwable t) {
Review comment:
If the exception comes from an inner field, it will be wrapped twice, so the error message will be duplicated.
Alternatively, could we just check whether jsonNode is a valid ObjectNode and throw an exception if it is
not, instead of wrapping the entire conversion in a try block?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services