[
https://issues.apache.org/jira/browse/FLINK-17575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17104722#comment-17104722
]
Maxim Malykhin commented on FLINK-17575:
----------------------------------------
I didn't find a way to tell the Kafka client to skip a corrupt record; in
Kafka Streams this is handled on the streams side. Since Flink does the
reading, the only solution was to roll back to Kafka client version 0.10.2.2,
which does not raise this exception, so Flink does not crash.
{code:java}
DataStream<GeoRecord> stream = env.addSource(
        new FlinkKafkaConsumer010<>(
                new ArrayList<>(topics.keySet()),
                new DeSerPairOfGeoRecords(topics),
                properties
        )
        .setStartFromGroupOffsets()
        .setCommitOffsetsOnCheckpoints(true)
);
{code}
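For context, the underlying failure in the stack trace below is a batch-level CRC mismatch ("stored crc = 995400728, computed crc = 4153305836"), detected inside the Kafka client before Flink's deserializer ever sees the bytes, which is why it cannot be skipped from the Flink side. A minimal sketch of that kind of integrity check using plain java.util.zip.CRC32 (the real client validates record batches with CRC32C over specific batch fields; this is only an illustration, and the class and method names are made up):

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

public class CrcCheckDemo {
    // Recompute a checksum over the payload and compare it with the stored
    // value, analogous to how the consumer validates a fetched batch.
    public static boolean isValid(byte[] payload, long storedCrc) {
        CRC32 crc = new CRC32();
        crc.update(payload, 0, payload.length);
        return crc.getValue() == storedCrc;
    }

    public static void main(String[] args) {
        byte[] record = "geo-record".getBytes(StandardCharsets.UTF_8);

        // Compute the checksum that would be stored alongside the batch.
        CRC32 crc = new CRC32();
        crc.update(record, 0, record.length);
        long stored = crc.getValue();

        System.out.println(isValid(record, stored)); // prints true: intact batch
        record[0] ^= 0xFF;                           // flip bits: corruption
        System.out.println(isValid(record, stored)); // prints false: CRC mismatch
    }
}
```

When the computed value differs from the stored one, the client raises the KafkaException seen below rather than handing the batch to the application, so no deserialization-level workaround can intercept it.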
> KafkaException: Received exception when fetching the next record from XXX-14.
> If needed, please seek past the record to continue consumption.
> ---------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-17575
> URL: https://issues.apache.org/jira/browse/FLINK-17575
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.10.0
> Reporter: Maxim Malykhin
> Priority: Major
>
> The stream does not catch the exception for a corrupt record in Kafka; the
> exception terminates the consumer thread.
> {code:java}
> org.apache.kafka.common.KafkaException: Received exception when fetching the next record from prod_netscout_ug_s1mme-14. If needed, please seek past the record to continue consumption.
> 	at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1522)
> 	at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1377)
> 	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:677)
> 	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:632)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1290)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1248)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1173)
> 	at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:253)
> Caused by: org.apache.kafka.common.KafkaException: Record batch for partition prod_netscout_ug_s1mme-14 at offset 22405513471 is invalid, cause: Record is corrupt (stored crc = 995400728, computed crc = 4153305836)
> 	at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.maybeEnsureValid(Fetcher.java:1435)
> 	at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.nextFetchedRecord(Fetcher.java:1479)
> 	at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1536)
> 	... 7 more
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)