[ 
https://issues.apache.org/jira/browse/FLINK-23595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

loyi updated FLINK-23595:
-------------------------
    Description: 
I am migrating an old flink-stream system to flink-sql, but an exception occurs when 
deserializing JSON that contains *non-numeric (NaN, Infinity)* numbers.

 
{noformat}
Exception stack:

Caused by: java.io.IOException: Failed to deserialize JSON 
'{"uniqueId":"6974697215254697525","monitorKey":"live-log-list","indicatorMap":{"record-retry-rate":NaN,"request-latency-max":-Infinity,"request-latency-avg":NaN,"buffer-available-bytes":3.3554432E7,"waiting-threads":0.0,"record-error-rate":NaN},"tagMap":{"_aggregate":"RAW","host":"live-log-list-001"},"periodInMs":0,"dataTime":1627903774962,"receiveTime":1627903774965}'.
        at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:149)
 ~[?:?]
        at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:81)
 ~[?:?]
        at 
org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:58)
 ~[?:?]
        at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
 ~[?:?]
        at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
 ~[?:?]
        at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
 ~[?:?]
        at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) 
~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
Caused by: 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException: 
Non-standard token 'NaN': enable JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS 
to allow
 at [Source: UNKNOWN; line: 1, column: 310]
        at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2337)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:710)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2669)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextFieldName(UTF8StreamJsonParser.java:1094)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.BaseNodeDeserializer.deserializeObject(JsonNodeDeserializer.java:268)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.BaseNodeDeserializer.deserializeObject(JsonNodeDeserializer.java:277)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:69)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:16)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:322)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4635)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:3056)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:142)
 ~[?:?]
        at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:81)
 ~[?:?]
        at 
org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:58)
 ~[?:?]
        at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
 ~[?:?]
        at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
 ~[?:?]
        at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
 ~[?:?]
        at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) 
~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
 {noformat}
 

Suggestion:

I notice that *JsonRowDataDeserializationSchema* uses *Jackson* as its default 
implementation, and Jackson does not enable parsing of non-numeric numbers by default. 
For backward compatibility, we could add an option `allow-non-numeric-numbers` to 
enable this feature.
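
For illustration, here is a minimal sketch (using the plain, unshaded Jackson API rather 
than Flink's shaded copy) of how the feature could be switched on when such an option is 
set. The option name and the surrounding wiring are assumptions, not the final design:

{noformat}
// Sketch only: enables Jackson's non-numeric-number support, roughly what a
// hypothetical 'allow-non-numeric-numbers' format option could do internally.
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.ObjectMapper;

public class NonNumericNumbersSketch {
    public static void main(String[] args) throws Exception {
        ObjectMapper objectMapper = new ObjectMapper();

        boolean allowNonNumericNumbers = true; // would be read from the format option
        if (allowNonNumericNumbers) {
            // Lets the parser accept the non-standard tokens NaN, Infinity and -Infinity.
            objectMapper.enable(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS);
        }

        // The same kind of payload that currently fails in JsonRowDeserializationSchema.
        String json = "{\"record-retry-rate\":NaN,\"request-latency-max\":-Infinity}";
        System.out.println(objectMapper.readTree(json)); // parses instead of throwing
    }
}
{noformat}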

 

 


> Allow JSON format deserialize non-numeric numbers
> -------------------------------------------------
>
>                 Key: FLINK-23595
>                 URL: https://issues.apache.org/jira/browse/FLINK-23595
>             Project: Flink
>          Issue Type: Improvement
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>    Affects Versions: 1.13.1
>            Reporter: loyi
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
