[
https://issues.apache.org/jira/browse/FLINK-23595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
loyi updated FLINK-23595:
-------------------------
Description:
I am migrating an old flink-stream system to flink-sql; however, an
exception occurs when deserializing JSON with *non-numeric (NaN, Infinity)* numbers.
{noformat}
Exception stack:
Caused by: java.io.IOException: Failed to deserialize JSON
'{"uniqueId":"6974697215254697525","monitorKey":"live-log-list","indicatorMap":{"record-retry-rate":NaN,"request-latency-max":-Infinity,"request-latency-avg":NaN,"buffer-available-bytes":3.3554432E7,"waiting-threads":0.0,"record-error-rate":NaN},"tagMap":{"_aggregate":"RAW","host":"live-log-list-001"},"periodInMs":0,"dataTime":1627903774962,"receiveTime":1627903774965}'.
at
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:149)
~[?:?]
at
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:81)
~[?:?]
at
org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:58)
~[?:?]
at
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
~[?:?]
at
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
~[?:?]
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
~[?:?]
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
Caused by:
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException:
Non-standard token 'NaN': enable JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS
to allow
at [Source: UNKNOWN; line: 1, column: 310]
at
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2337)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:710)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2669)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextFieldName(UTF8StreamJsonParser.java:1094)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.BaseNodeDeserializer.deserializeObject(JsonNodeDeserializer.java:268)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.BaseNodeDeserializer.deserializeObject(JsonNodeDeserializer.java:277)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:69)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:16)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:322)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4635)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:3056)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:142)
~[?:?]
at
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:81)
~[?:?]
at
org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:58)
~[?:?]
at
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
~[?:?]
at
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
~[?:?]
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
~[?:?]
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
(END)
{noformat}
Suggestion:
I notice that *JsonRowDataDeserializationSchema* uses *Jackson* as its default
implementation, and Jackson doesn't enable parsing of non-numeric numbers by default.
For backward compatibility, we could add an option `allow-non-numeric-numbers` to
enable this feature.
was:
I am migrating an old flink-stream system to flink-sql; however, an
exception occurs when deserializing JSON with *non-numeric (NaN, Infinity)* numbers.
{noformat}
Exception stack:
Caused by: java.io.IOException: Failed to deserialize JSON
'{"uniqueId":"6974697215254697525","monitorKey":"live-log-list","indicatorMap":{"record-retry-rate":NaN,"request-latency-max":-Infinity,"request-latency-avg":NaN,"buffer-available-bytes":3.3554432E7,"waiting-threads":0.0,"record-error-rate":NaN},"tagMap":{"_aggregate":"RAW","host":"live-log-list-001"},"periodInMs":0,"dataTime":1627903774962,"receiveTime":1627903774965}'.
at
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:149)
~[?:?]
at
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:81)
~[?:?]
at
org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:58)
~[?:?]
at
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
~[?:?]
at
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
~[?:?]
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
~[?:?]
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
Caused by:
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException:
Non-standard token 'NaN': enable JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS
to allow
at [Source: UNKNOWN; line: 1, column: 310]
at
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2337)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:710)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2669)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextFieldName(UTF8StreamJsonParser.java:1094)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.BaseNodeDeserializer.deserializeObject(JsonNodeDeserializer.java:268)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.BaseNodeDeserializer.deserializeObject(JsonNodeDeserializer.java:277)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:69)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:16)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:322)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4635)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:3056)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:142)
~[?:?]
at
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:81)
~[?:?]
at
org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:58)
~[?:?]
at
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
~[?:?]
at
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
~[?:?]
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
~[?:?]
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
~[flink-dist_2.11-1.13.1.jar:1.13.1]
(END)
{noformat}
Suggestion:
I notice that *JsonRowDataDeserializationSchema* uses *Jackson* as its default
implementation, and Jackson doesn't enable parsing of non-numeric numbers by default.
For backward compatibility, we could add an option `allow-non-numeric-numbers` to
enable this feature.
> Allow JSON format deserialize non-numeric numbers
> -------------------------------------------------
>
> Key: FLINK-23595
> URL: https://issues.apache.org/jira/browse/FLINK-23595
> Project: Flink
> Issue Type: Improvement
> Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
> Affects Versions: 1.13.1
> Reporter: loyi
> Priority: Major
>
> I am migrating an old flink-stream system to flink-sql; however, an
> exception occurs when deserializing JSON with *non-numeric (NaN, Infinity)* numbers.
>
> {noformat}
> Exception stack:
> Caused by: java.io.IOException: Failed to deserialize JSON
> '{"uniqueId":"6974697215254697525","monitorKey":"live-log-list","indicatorMap":{"record-retry-rate":NaN,"request-latency-max":-Infinity,"request-latency-avg":NaN,"buffer-available-bytes":3.3554432E7,"waiting-threads":0.0,"record-error-rate":NaN},"tagMap":{"_aggregate":"RAW","host":"live-log-list-001"},"periodInMs":0,"dataTime":1627903774962,"receiveTime":1627903774965}'.
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:149)
> ~[?:?]
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:81)
> ~[?:?]
> at
> org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> at
> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:58)
> ~[?:?]
> at
> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
> ~[?:?]
> at
> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
> ~[?:?]
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
> ~[?:?]
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> Caused by:
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException:
> Non-standard token 'NaN': enable
> JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS to allow
> at [Source: UNKNOWN; line: 1, column: 310]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2337)
> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:710)
> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2669)
> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextFieldName(UTF8StreamJsonParser.java:1094)
> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.BaseNodeDeserializer.deserializeObject(JsonNodeDeserializer.java:268)
> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.BaseNodeDeserializer.deserializeObject(JsonNodeDeserializer.java:277)
> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:69)
> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:16)
> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:322)
> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4635)
> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:3056)
> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:142)
> ~[?:?]
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:81)
> ~[?:?]
> at
> org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> at
> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:58)
> ~[?:?]
> at
> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
> ~[?:?]
> at
> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
> ~[?:?]
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
> ~[?:?]
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> (END)
> {noformat}
>
> Suggestion:
> I notice that *JsonRowDataDeserializationSchema* uses *Jackson* as its
> default implementation, and Jackson doesn't enable parsing of non-numeric
> numbers by default. For backward compatibility, we could add an option
> `allow-non-numeric-numbers` to enable this feature.
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)