[
https://issues.apache.org/jira/browse/CAMEL-14980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17099026#comment-17099026
]
joseph m'bimbi-bene commented on CAMEL-14980:
---------------------------------------------
Hi.
Indeed the serialization error is due to an avro message that the consumer
cannot deserialize and it is the value.
Here are the logs from Camel :
```
2020-05-04 16:55:18.768 INFO 13972 --- [umer[cont_hist]]
o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1,
groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] Member
consumer-1-3ce58863-302b-4be3-9b86-43a963ffe327 sending LeaveGroup request to
coordinator localhost:9092 (id: 2147483646 rack: null)2020-05-04 16:55:18.768
INFO 13972 --- [umer[cont_hist]] o.a.k.c.c.internals.AbstractCoordinator :
[Consumer clientId=consumer-1, groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6]
Member consumer-1-3ce58863-302b-4be3-9b86-43a963ffe327 sending LeaveGroup
request to coordinator localhost:9092 (id: 2147483646 rack: null)2020-05-04
16:55:18.866 INFO 13972 --- [umer[cont_hist]]
o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
allow.auto.create.topics = true auto.commit.interval.ms = 5000
auto.offset.reset = latest bootstrap.servers = [http://localhost:9092] [...]
2020-05-04 16:55:18.925 WARN 13972 --- [umer[cont_hist]]
o.a.k.clients.consumer.ConsumerConfig : The configuration
'sasl.kerberos.principal.to.local.rules' was supplied but isn't a known
config.2020-05-04 16:55:18.925 INFO 13972 --- [umer[cont_hist]]
o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.3.12020-05-04
16:55:18.925 INFO 13972 --- [umer[cont_hist]]
o.a.kafka.common.utils.AppInfoParser : Kafka commitId:
18a913733fb71c012020-05-04 16:55:18.925 INFO 13972 --- [umer[cont_hist]]
o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs:
15886041189252020-05-04 16:55:18.927 INFO 13972 --- [umer[cont_hist]]
o.a.camel.component.kafka.KafkaConsumer : Reconnecting cont_hist-Thread 0 to
topic cont_hist after 5000 ms2020-05-04 16:55:23.928 INFO 13972 ---
[umer[cont_hist]] o.a.camel.component.kafka.KafkaConsumer : Subscribing
cont_hist-Thread 0 to topic cont_hist2020-05-04 16:55:23.928 INFO 13972 ---
[umer[cont_hist]] o.a.k.clients.consumer.KafkaConsumer : [Consumer
clientId=consumer-2, groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] Subscribed
to topic(s): cont_hist2020-05-04 16:55:23.934 INFO 13972 --- [umer[cont_hist]]
org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-2,
groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] Cluster ID:
RN9Osn1cTTONG3zaT0LtOg2020-05-04 16:55:23.936 INFO 13972 --- [umer[cont_hist]]
o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2,
groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] Discovered group coordinator
localhost:9092 (id: 2147483646 rack: null)2020-05-04 16:55:23.937 INFO 13972
--- [umer[cont_hist]] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer
clientId=consumer-2, groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] Revoking
previously assigned partitions []2020-05-04 16:55:23.937 INFO 13972 ---
[umer[cont_hist]] o.a.k.c.c.internals.AbstractCoordinator : [Consumer
clientId=consumer-2, groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] (Re-)joining
group2020-05-04 16:55:23.943 INFO 13972 --- [umer[cont_hist]]
o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2,
groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] (Re-)joining group2020-05-04
16:55:23.949 INFO 13972 --- [umer[cont_hist]]
o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2,
groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] Successfully joined group with
generation 32020-05-04 16:55:23.949 INFO 13972 --- [umer[cont_hist]]
o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2,
groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] Setting newly assigned
partitions: cont_hist-02020-05-04 16:55:23.954 INFO 13972 ---
[umer[cont_hist]] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer
clientId=consumer-2, groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] Setting
offset for partition cont_hist-0 to the committed offset
FetchPosition\{offset=2, offsetEpoch=Optional.empty,
currentLeader=LeaderAndEpoch{leader=localhost:9092 (id: 1 rack: null),
epoch=0}}2020-05-04 16:55:23.978 INFO 13972 --- [umer[cont_hist]]
o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2,
groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] Member
consumer-2-54cba12d-1a38-4a5d-8b50-26f2a6b3868a sending LeaveGroup request to
coordinator localhost:9092 (id: 2147483646 rack: null)2020-05-04 16:55:23.998
INFO 13972 --- [umer[cont_hist]] o.a.k.clients.consumer.ConsumerConfig :
ConsumerConfig values: allow.auto.create.topics = true auto.commit.interval.ms
= 5000 auto.offset.reset = latest bootstrap.servers = [http://localhost:9092]
[...]
2020-05-04 16:55:24.014 WARN 13972 --- [umer[cont_hist]]
o.a.k.clients.consumer.ConsumerConfig : The configuration
'sasl.kerberos.principal.to.local.rules' was supplied but isn't a known
config.2020-05-04 16:55:24.014 INFO 13972 --- [umer[cont_hist]]
o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.3.12020-05-04
16:55:24.014 INFO 13972 --- [umer[cont_hist]]
o.a.kafka.common.utils.AppInfoParser : Kafka commitId:
18a913733fb71c012020-05-04 16:55:24.015 INFO 13972 --- [umer[cont_hist]]
o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs:
15886041240142020-05-04 16:55:24.015 INFO 13972 --- [umer[cont_hist]]
o.a.camel.component.kafka.KafkaConsumer : Reconnecting cont_hist-Thread 0 to
topic cont_hist after 5000 ms2020-05-04 16:55:29.015 INFO 13972 ---
[umer[cont_hist]] o.a.camel.component.kafka.KafkaConsumer : Subscribing
cont_hist-Thread 0 to topic cont_hist2020-05-04 16:55:29.016 INFO 13972 ---
[umer[cont_hist]] o.a.k.clients.consumer.KafkaConsumer : [Consumer
clientId=consumer-3, groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] Subscribed
to topic(s): cont_hist2020-05-04 16:55:29.032 INFO 13972 --- [umer[cont_hist]]
org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-3,
groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] Cluster ID:
RN9Osn1cTTONG3zaT0LtOg2020-05-04 16:55:29.032 INFO 13972 --- [umer[cont_hist]]
o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-3,
groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] Discovered group coordinator
localhost:9092 (id: 2147483646 rack: null)2020-05-04 16:55:29.033 INFO 13972
--- [umer[cont_hist]] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer
clientId=consumer-3, groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] Revoking
previously assigned partitions []2020-05-04 16:55:29.034 INFO 13972 ---
[umer[cont_hist]] o.a.k.c.c.internals.AbstractCoordinator : [Consumer
clientId=consumer-3, groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] (Re-)joining
group2020-05-04 16:55:29.039 INFO 13972 --- [umer[cont_hist]]
o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-3,
groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] (Re-)joining group2020-05-04
16:55:29.048 INFO 13972 --- [umer[cont_hist]]
o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-3,
groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] Successfully joined group with
generation 52020-05-04 16:55:29.048 INFO 13972 --- [umer[cont_hist]]
o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-3,
groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] Setting newly assigned
partitions: cont_hist-02020-05-04 16:55:29.050 INFO 13972 ---
[umer[cont_hist]] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer
clientId=consumer-3, groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] Setting
offset for partition cont_hist-0 to the committed offset
FetchPosition\{offset=2, offsetEpoch=Optional.empty,
currentLeader=LeaderAndEpoch{leader=localhost:9092 (id: 1 rack: null),
epoch=0}}2020-05-04 16:55:29.066 INFO 13972 --- [umer[cont_hist]]
o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-3,
groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] Member
consumer-3-3da59636-0a83-4abc-9293-a7e456d65574 sending LeaveGroup request to
coordinator localhost:9092 (id: 2147483646 rack: null)2020-05-04 16:55:29.090
INFO 13972 --- [umer[cont_hist]] o.a.k.clients.consumer.ConsumerConfig :
ConsumerConfig values: allow.auto.create.topics = true
```
There are no stack trace, it just keep leaving and joining the group.
But here is the message when i place a breakpoint in KafkaConsumer lign 409,
then in the debugger, evaluate `e.printStackTrace` :
```
org.apache.kafka.common.errors.SerializationException: Error deserializing
key/value for partition cont_hist-0 at offset 4. If needed, please seek past
the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error
deserializing Avro message for id 2
Caused by: org.apache.kafka.common.errors.SerializationException: Could not
find class
fr.ing.payment.cardeventnotifier.secure3d.generated.avro.PocContHist2 specified
in writer's schema whilst finding reader's schema for a SpecificRecord.
```
In my re-creation of the error, I deliberately changed the name of the schema
avro of the producer.
Another case: when i send garbage through the console-producer. Here is what i
get when i evaluate `e.printStackTrace()` on the breakpoint:
```
org.apache.kafka.common.errors.SerializationException: Error deserializing
key/value for partition cont_hist-0 at offset 5. If needed, please seek past
the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error
deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic
byte!
```
In any case, the consumer leaves and joins the group indefinitely withou any
informative message.
Is that enough or do you need any other information ?
> camel-kafka - SerializationException - consumer keeps leaving and rejoining
> the group
> -------------------------------------------------------------------------------------
>
> Key: CAMEL-14980
> URL: https://issues.apache.org/jira/browse/CAMEL-14980
> Project: Camel
> Issue Type: Bug
> Components: camel-kafka
> Affects Versions: 3.2.0
> Reporter: joseph m'bimbi-bene
> Assignee: Ramu
> Priority: Major
> Fix For: 3.3.0
>
>
> Hello everyone,
>
> I found out i few days ago that if a `SerializationException` is thrown when
> the consumer tries to poll messages, it will keep leaving and joining the
> consumer-group indefinitely and without any informative log.
> The exception cannot either be handled by any camel exception handler.
> After some searching in the code i found out the culprit:
> ``` java
> // org.apache.camel.component.kafka.KafkaConsumer (ligns 406-415):
> catch (KafkaException e) {
> // some kind of error in kafka, it may happen during
> // unsubscribing or during normal processing
> if (unsubscribing) {
> getExceptionHandler().handleException("Error unsubscribing " + threadId +
> " from kafka topic " + topicName, e); }
> else {
> LOG.debug("KafkaException consuming {} from topic {} causedby {}. Will
> attempt to re-connect on next run", threadId, topicName, e.getMessage());
> reConnect = true;
> }
> }
> ```
> `SerializationException` extends from `KafkaException`, but it is definitely
> not a recoverable exception.
> It logs with debug level, which makes it hard to track, there are SO many
> things logging in debug.
> It it cannot be handled by any camel exception handling mechanism.
> I think it would be better to either:
> - change that catch so that it pinpoints the subclasses of `KafkaException`
> that are actually recoverable from rejoining (maybe `WakeupException` and a
> couple others)
> - add a `catch` block for `SerializationException` and maybe
> `ConfigException` and `OAuthBearerConfigException` before, with a log error
> andallow the user to handle those exceptions
> - remove that catch block entirely and let users handle any KafkaException
> however they see fit.
> Thank you
--
This message was sent by Atlassian Jira
(v8.3.4#803005)