[
https://issues.apache.org/jira/browse/FLINK-20463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17243021#comment-17243021
]
MengYao edited comment on FLINK-20463 at 12/3/20, 9:02 AM:
-----------------------------------------------------------
*I have a problem similar to yours: I defined a Kafka dynamic table in SQL-Client, but because some elements in the Kafka topic are incorrectly formatted, SQL-Client throws an exception. Can we add a configuration item to ignore these error records?*
version = 1.11.2
module = Table & SQL
My Steps:
*{color:#00875a}// 1. Enter the command line{color}*
$FLINK_HOME/bin/sql-client.sh embedded
*{color:#00875a}// 2. Create the Kafka dynamic table{color}*
{color:#57d9a3}Flink SQL>{color} CREATE TABLE kfk_test01 (
    order_id BIGINT,       -- order ID
    original_price DOUBLE, -- amount actually paid
    ctime BIGINT,          -- creation time
    ts AS TO_TIMESTAMP(FROM_UNIXTIME(ctime, 'yyyy-MM-dd HH:mm:ss')), -- use the value of the ctime field as the timestamp ts
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND -- define a watermark with a 5-second delay on the ts field
) WITH (
    'connector' = 'kafka',
    'topic' = 'test01',
    'properties.bootstrap.servers' = 'node1:9092',
    'properties.group.id' = 'testGroup',
    'format' = 'json',
    'scan.startup.mode' = 'earliest-offset'
);
*{color:#00875a}// 3. Execute a query statement{color}*
{color:#57d9a3}Flink SQL>{color} select * from kfk_test01;
*{color:#00875a}// 4. A malformed element causes the query to fail (the element content is NULL or empty){color}*
!image-2020-12-03-17-04-01-463.png!
*{color:#00875a}// 5. Could a general configuration item be added, similar to MapReduce's skip-bad-records feature, that skips bad records for both the json and csv formats? A sketch of the intended semantics follows below.{color}*
For example: {color:#de350b}skip.fail.records=0{color} (0, -1, >0)
{color:#de350b}0: the default value; bad records must not be skipped{color}
{color:#de350b}-1: all bad records may be skipped{color}
{color:#de350b}any number > 0: the maximum acceptable number of bad records{color}
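To make the proposal concrete, here is a minimal sketch of the intended counting semantics as a user-side wrapper around Flink's DeserializationSchema interface. This is illustrative only: the class name SkipFailRecordsDeserializationSchema is hypothetical, no such option exists in Flink today, and a real implementation would have to be wired into the json/csv format factories.
{code:java}
import java.io.IOException;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

/**
 * Hypothetical wrapper illustrating the proposed skip.fail.records semantics:
 * 0 fails on the first bad record, -1 skips all bad records, and any n > 0
 * tolerates at most n bad records before failing. The counter is per parallel
 * subtask, not global (the same granularity MapReduce uses per task attempt).
 */
public class SkipFailRecordsDeserializationSchema<T> implements DeserializationSchema<T> {

    private final DeserializationSchema<T> inner;
    private final long maxSkippedRecords; // 0, -1, or > 0
    private long skippedRecords;

    public SkipFailRecordsDeserializationSchema(
            DeserializationSchema<T> inner, long maxSkippedRecords) {
        this.inner = inner;
        this.maxSkippedRecords = maxSkippedRecords;
    }

    @Override
    public void open(InitializationContext context) throws Exception {
        inner.open(context);
    }

    @Override
    public T deserialize(byte[] message) throws IOException {
        try {
            return inner.deserialize(message);
        } catch (Exception e) {
            if (maxSkippedRecords == 0
                    || (maxSkippedRecords > 0 && ++skippedRecords > maxSkippedRecords)) {
                throw new IOException("skip.fail.records limit exceeded", e);
            }
            // Returning null tells the source to drop this record.
            return null;
        }
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return inner.isEndOfStream(nextElement);
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return inner.getProducedType();
    }
}
{code}
The null-means-drop convention follows the existing DeserializationSchema contract (the default collector-based deserialize discards null results), so a wrapper like this would compose with any format's schema.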
> flink-1.11.2 -sql cannot ignore exception record
> ------------------------------------------------
>
> Key: FLINK-20463
> URL: https://issues.apache.org/jira/browse/FLINK-20463
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.11.2
> Environment: <flink.version>1.11.2</flink.version>
> <scala.binary.version>2.11</scala.binary.version>
> Reporter: 谢波
> Priority: Major
> Attachments: image-2020-12-03-17-04-01-463.png
>
>
> Can Flink SQL provide an option to ignore exception records?
> I have a table that maps Kafka data in JSON format.
> When the exception data below is parsed, an exception is thrown, even though
> the data is valid JSON; it is just not a valid record.
> {color:#FF0000}exception data: {"SHEET":[""]}{color}
> {color:#FF0000}my table:{color}
> CREATE TABLE offline
> (
>     SHEET ROW (
>         HEADER MAP < STRING, STRING >,
>         ITEM ROW (
>             AMOUNT STRING,
>             COST STRING,
>             GOODSID STRING,
>             SALEVALUE STRING,
>             SAP_RTMATNR STRING,
>             SAP_RTPLU STRING,
>             SERIALID STRING,
>             SHEETID STRING
>         ) ARRAY,
>         ITEM5 MAP < STRING, STRING > ARRAY,
>         ITEM1 MAP < STRING, STRING > ARRAY,
>         TENDER MAP < STRING, STRING > ARRAY
>     ) ARRAY
> )
> WITH (
>     'connector' = 'kafka',
>     'properties.bootstrap.servers' = 'xxx:9092',
>     'properties.group.id' = 'realtime.sales.offline.group',
>     'topic' = 'bms133',
>     'format' = 'json',
>     {color:#FF0000}'json.ignore-parse-errors' = 'true',{color}
>     'scan.startup.mode' = 'earliest-offset'
> );
> {color:#FF0000}exception:{color}
> Caused by: java.lang.NullPointerException
>     at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:116)
>     at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:50)
>     at org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.copyGenericArray(ArrayDataSerializer.java:129)
>     at org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.copy(ArrayDataSerializer.java:90)
>     at org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.copy(ArrayDataSerializer.java:51)
>     at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:156)
>     at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:123)
>     at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:50)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
>     at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
>     at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
>     at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
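> The stack trace suggests why 'json.ignore-parse-errors' does not help here: {"SHEET":[""]} parses as valid JSON, but the "" element presumably maps to a null nested row inside the array, which only blows up later when RowDataSerializer copies the produced RowData. As an illustration of a possible user-side guard, here is a minimal pre-filter sketch, assuming Jackson is on the classpath; the SheetRecordValidator class is hypothetical and not part of Flink:
> {code:java}
> import com.fasterxml.jackson.databind.JsonNode;
> import com.fasterxml.jackson.databind.ObjectMapper;
>
> /**
>  * Hypothetical pre-check for the offline table's payload: every element of
>  * "SHEET" must be a JSON object, because the table maps it to ROW(...).
>  */
> public final class SheetRecordValidator {
>
>     private static final ObjectMapper MAPPER = new ObjectMapper();
>
>     public static boolean isValid(String json) {
>         try {
>             JsonNode sheet = MAPPER.readTree(json).get("SHEET");
>             if (sheet == null || !sheet.isArray()) {
>                 return false;
>             }
>             for (JsonNode element : sheet) {
>                 // {"SHEET":[""]} is rejected here: "" is a text node, not an
>                 // object that can populate the declared ROW fields.
>                 if (!element.isObject()) {
>                     return false;
>                 }
>             }
>             return true;
>         } catch (Exception e) {
>             return false; // not parseable JSON at all
>         }
>     }
> }
> {code}
> Records failing such a check could be filtered out in a DataStream job before the data ever reaches the SQL table; this does not fix the serializer NPE itself, which is what this issue is about.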