[
https://issues.apache.org/jira/browse/FLINK-17575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17104874#comment-17104874
]
Maxim Malykhin commented on FLINK-17575:
----------------------------------------
Kafka Streams has DeserializationExceptionHandler and Spring has
ErrorHandlingDeserializer, but with Flink do we need to rewrite the stream code to
handle this exception, or patch the Flink sources? Your code example requires a
complete change to the processing structure and manual management of offsets, which
contradicts the framework's architecture, in which the developer should not have to
care about offsets and their persistence; otherwise one could simply write plain
Java and not use a framework like Flink.
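For what it's worth, Flink's DeserializationSchema contract does allow a deserializer to return null to indicate that a record should be skipped, so a catch-and-skip policy can live inside the deserializer without touching offsets. The sketch below shows that pattern in plain Java; ErrorTolerantDeserializer is a hypothetical name, not Flink API, and note that it only covers deserialization failures, not the fetch-level CRC corruption in this ticket, which is thrown by the Kafka client before any deserializer runs.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical sketch: wrap a fragile deserializer so a bad record
// yields null (the convention Flink's Kafka connector uses to skip
// a record) instead of an exception that kills the consumer thread.
class ErrorTolerantDeserializer<T> {
    private final Function<byte[], T> inner;

    ErrorTolerantDeserializer(Function<byte[], T> inner) {
        this.inner = inner;
    }

    T deserialize(byte[] message) {
        try {
            return inner.apply(message);
        } catch (RuntimeException e) {
            // Log-and-skip policy: returning null tells the caller
            // to drop the record and keep consuming.
            System.err.println("Skipping bad record: " + e.getMessage());
            return null;
        }
    }

    public static void main(String[] args) {
        ErrorTolerantDeserializer<Integer> d = new ErrorTolerantDeserializer<>(
                bytes -> Integer.parseInt(new String(bytes, StandardCharsets.UTF_8)));
        System.out.println(d.deserialize("42".getBytes(StandardCharsets.UTF_8)));
        System.out.println(d.deserialize("oops".getBytes(StandardCharsets.UTF_8)));
    }
}
```

A handler like this gives per-record error tolerance, but it cannot help here: a corrupt record batch fails the CRC check inside the KafkaConsumer itself, before the record ever reaches user code.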
> KafkaException: Received exception when fetching the next record from XXX-14.
> If needed, please seek past the record to continue consumption.
> ---------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-17575
> URL: https://issues.apache.org/jira/browse/FLINK-17575
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.10.0
> Reporter: Maxim Malykhin
> Priority: Major
>
> The stream does not catch the exception for a corrupt record in Kafka; the
> exception terminates the thread.
> {code:java}
> org.apache.kafka.common.KafkaException: Received exception when fetching the next record from prod_netscout_ug_s1mme-14. If needed, please seek past the record to continue consumption.
> 	at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1522)
> 	at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1377)
> 	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:677)
> 	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:632)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1290)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1248)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1173)
> 	at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:253)
> Caused by: org.apache.kafka.common.KafkaException: Record batch for partition prod_netscout_ug_s1mme-14 at offset 22405513471 is invalid, cause: Record is corrupt (stored crc = 995400728, computed crc = 4153305836)
> 	at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.maybeEnsureValid(Fetcher.java:1435)
> 	at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.nextFetchedRecord(Fetcher.java:1479)
> 	at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1536)
> 	... 7 more
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)