[ 
https://issues.apache.org/jira/browse/FLINK-20463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

MengYao updated FLINK-20463:
----------------------------
    Attachment:     (was: 无标题111.png)

> flink-1.11.2 -sql cannot ignore exception record
> ------------------------------------------------
>
>                 Key: FLINK-20463
>                 URL: https://issues.apache.org/jira/browse/FLINK-20463
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.11.2
>         Environment: <flink.version>1.11.2</flink.version>
> <scala.binary.version>2.11</scala.binary.version>
>            Reporter: 谢波
>            Priority: Major
>
> can Flink SQL provide an option to ignore exception record?
> I have a table that maps kafka data in json format.
> When parsing the exception data, an exception is thrown, but the data is 
> valid JSON, not a valid record.
> {color:#FF0000}exception data:{"SHEET":[""]}{color}
> {color:#FF0000}my table:{color}
> CREATE TABLE offline
> (
>  SHEET ROW (
>  HEADER MAP < STRING, STRING >,
>  ITEM ROW (
>  AMOUNT STRING,
>  COST STRING,
>  GOODSID STRING,
>  SALEVALUE STRING,
>  SAP_RTMATNR STRING,
>  SAP_RTPLU STRING,
>  SERIALID STRING,
>  SHEETID STRING
>  ) ARRAY,
>  ITEM5 MAP < STRING, STRING > ARRAY,
>  ITEM1 MAP < STRING, STRING > ARRAY,
>  TENDER MAP < STRING, STRING > ARRAY
>  ) ARRAY
> )
> WITH (
>  'connector' = 'kafka',
>  'properties.bootstrap.servers' = 'xxx:9092',
>  'properties.group.id' = 'realtime.sales.offline.group',
>  'topic' = 'bms133',
>  'format' = 'json',
>  {color:#FF0000}'json.ignore-parse-errors' = 'true',{color}
>  'scan.startup.mode' = 'earliest-offset'
> );
> {color:#FF0000}exception:{color}
> Caused by: java.lang.NullPointerExceptionCaused by: 
> java.lang.NullPointerException at 
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:116)
>  at 
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:50)
>  at 
> org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.copyGenericArray(ArrayDataSerializer.java:129)
>  at 
> org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.copy(ArrayDataSerializer.java:90)
>  at 
> org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.copy(ArrayDataSerializer.java:51)
>  at 
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:156)
>  at 
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:123)
>  at 
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:50)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>  at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>  at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>  at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>  at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
>  at 
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
>  at 
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
>  at 
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
>  at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
>  at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>  at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>  at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to