[ 
https://issues.apache.org/jira/browse/CAMEL-14980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17099026#comment-17099026
 ] 

joseph m'bimbi-bene commented on CAMEL-14980:
---------------------------------------------

Hi.

Indeed, the serialization error is caused by an Avro message that the consumer 
cannot deserialize, and the part that fails is the value (not the key).

Here are the logs from Camel :

```
2020-05-04 16:55:18.768  INFO 13972 --- [umer[cont_hist]] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] Member consumer-1-3ce58863-302b-4be3-9b86-43a963ffe327 sending LeaveGroup request to coordinator localhost:9092 (id: 2147483646 rack: null)
2020-05-04 16:55:18.768  INFO 13972 --- [umer[cont_hist]] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] Member consumer-1-3ce58863-302b-4be3-9b86-43a963ffe327 sending LeaveGroup request to coordinator localhost:9092 (id: 2147483646 rack: null)
2020-05-04 16:55:18.866  INFO 13972 --- [umer[cont_hist]] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values:
    allow.auto.create.topics = true
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [http://localhost:9092]
    [...]
2020-05-04 16:55:18.925  WARN 13972 --- [umer[cont_hist]] o.a.k.clients.consumer.ConsumerConfig    : The configuration 'sasl.kerberos.principal.to.local.rules' was supplied but isn't a known config.
2020-05-04 16:55:18.925  INFO 13972 --- [umer[cont_hist]] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.3.1
2020-05-04 16:55:18.925  INFO 13972 --- [umer[cont_hist]] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 18a913733fb71c01
2020-05-04 16:55:18.925  INFO 13972 --- [umer[cont_hist]] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1588604118925
2020-05-04 16:55:18.927  INFO 13972 --- [umer[cont_hist]] o.a.camel.component.kafka.KafkaConsumer  : Reconnecting cont_hist-Thread 0 to topic cont_hist after 5000 ms
2020-05-04 16:55:23.928  INFO 13972 --- [umer[cont_hist]] o.a.camel.component.kafka.KafkaConsumer  : Subscribing cont_hist-Thread 0 to topic cont_hist
2020-05-04 16:55:23.928  INFO 13972 --- [umer[cont_hist]] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-2, groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] Subscribed to topic(s): cont_hist
2020-05-04 16:55:23.934  INFO 13972 --- [umer[cont_hist]] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-2, groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] Cluster ID: RN9Osn1cTTONG3zaT0LtOg
2020-05-04 16:55:23.936  INFO 13972 --- [umer[cont_hist]] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] Discovered group coordinator localhost:9092 (id: 2147483646 rack: null)
2020-05-04 16:55:23.937  INFO 13972 --- [umer[cont_hist]] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] Revoking previously assigned partitions []
2020-05-04 16:55:23.937  INFO 13972 --- [umer[cont_hist]] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] (Re-)joining group
2020-05-04 16:55:23.943  INFO 13972 --- [umer[cont_hist]] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] (Re-)joining group
2020-05-04 16:55:23.949  INFO 13972 --- [umer[cont_hist]] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] Successfully joined group with generation 3
2020-05-04 16:55:23.949  INFO 13972 --- [umer[cont_hist]] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] Setting newly assigned partitions: cont_hist-0
2020-05-04 16:55:23.954  INFO 13972 --- [umer[cont_hist]] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] Setting offset for partition cont_hist-0 to the committed offset FetchPosition{offset=2, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=localhost:9092 (id: 1 rack: null), epoch=0}}
2020-05-04 16:55:23.978  INFO 13972 --- [umer[cont_hist]] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] Member consumer-2-54cba12d-1a38-4a5d-8b50-26f2a6b3868a sending LeaveGroup request to coordinator localhost:9092 (id: 2147483646 rack: null)
2020-05-04 16:55:23.998  INFO 13972 --- [umer[cont_hist]] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values:
    allow.auto.create.topics = true
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [http://localhost:9092]
    [...]
2020-05-04 16:55:24.014  WARN 13972 --- [umer[cont_hist]] o.a.k.clients.consumer.ConsumerConfig    : The configuration 'sasl.kerberos.principal.to.local.rules' was supplied but isn't a known config.
2020-05-04 16:55:24.014  INFO 13972 --- [umer[cont_hist]] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.3.1
2020-05-04 16:55:24.014  INFO 13972 --- [umer[cont_hist]] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 18a913733fb71c01
2020-05-04 16:55:24.015  INFO 13972 --- [umer[cont_hist]] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1588604124014
2020-05-04 16:55:24.015  INFO 13972 --- [umer[cont_hist]] o.a.camel.component.kafka.KafkaConsumer  : Reconnecting cont_hist-Thread 0 to topic cont_hist after 5000 ms
2020-05-04 16:55:29.015  INFO 13972 --- [umer[cont_hist]] o.a.camel.component.kafka.KafkaConsumer  : Subscribing cont_hist-Thread 0 to topic cont_hist
2020-05-04 16:55:29.016  INFO 13972 --- [umer[cont_hist]] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-3, groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] Subscribed to topic(s): cont_hist
2020-05-04 16:55:29.032  INFO 13972 --- [umer[cont_hist]] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-3, groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] Cluster ID: RN9Osn1cTTONG3zaT0LtOg
2020-05-04 16:55:29.032  INFO 13972 --- [umer[cont_hist]] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-3, groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] Discovered group coordinator localhost:9092 (id: 2147483646 rack: null)
2020-05-04 16:55:29.033  INFO 13972 --- [umer[cont_hist]] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-3, groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] Revoking previously assigned partitions []
2020-05-04 16:55:29.034  INFO 13972 --- [umer[cont_hist]] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-3, groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] (Re-)joining group
2020-05-04 16:55:29.039  INFO 13972 --- [umer[cont_hist]] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-3, groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] (Re-)joining group
2020-05-04 16:55:29.048  INFO 13972 --- [umer[cont_hist]] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-3, groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] Successfully joined group with generation 5
2020-05-04 16:55:29.048  INFO 13972 --- [umer[cont_hist]] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-3, groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] Setting newly assigned partitions: cont_hist-0
2020-05-04 16:55:29.050  INFO 13972 --- [umer[cont_hist]] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-3, groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] Setting offset for partition cont_hist-0 to the committed offset FetchPosition{offset=2, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=localhost:9092 (id: 1 rack: null), epoch=0}}
2020-05-04 16:55:29.066  INFO 13972 --- [umer[cont_hist]] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-3, groupId=6dde7e33-87c6-4ea7-a173-49e903d549e6] Member consumer-3-3da59636-0a83-4abc-9293-a7e456d65574 sending LeaveGroup request to coordinator localhost:9092 (id: 2147483646 rack: null)
2020-05-04 16:55:29.090  INFO 13972 --- [umer[cont_hist]] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values:
    allow.auto.create.topics = true
```

There is no stack trace; the consumer just keeps leaving and rejoining the group.

But here is the message I get when I place a breakpoint at line 409 of 
KafkaConsumer and evaluate `e.printStackTrace()` in the debugger:

```
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition cont_hist-0 at offset 4. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 2
Caused by: org.apache.kafka.common.errors.SerializationException: Could not find class fr.ing.payment.cardeventnotifier.secure3d.generated.avro.PocContHist2 specified in writer's schema whilst finding reader's schema for a SpecificRecord.
```

To reproduce the error, I deliberately changed the name of the producer's 
Avro schema.
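
The first line of the trace above names the partition and offset of the record that cannot be read, and suggests seeking past it. As a minimal sketch (not what camel-kafka does), that location can be pulled out of the message text with a regular expression. The class `PoisonRecordLocator` and its `Location` holder are hypothetical names, and the pattern assumes the exact wording shown in the trace, which is not a stable API:

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PoisonRecordLocator {

    // Assumption: the message keeps the wording seen in the trace above,
    // "... for partition <topic>-<partition> at offset <offset>. ..."
    private static final Pattern LOCATION =
            Pattern.compile("for partition (.+)-(\\d+) at offset (\\d+)");

    /** Hypothetical holder for the parsed location of the unreadable record. */
    public static final class Location {
        public final String topic;
        public final int partition;
        public final long offset;

        Location(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    /** Returns the location named in the exception message, if it can be parsed. */
    public static Optional<Location> parse(String message) {
        if (message == null) {
            return Optional.empty();
        }
        Matcher m = LOCATION.matcher(message);
        if (!m.find()) {
            return Optional.empty();
        }
        // The greedy first group keeps hyphens that belong to the topic name;
        // only the last "-<digits>" before " at offset" is read as the partition.
        return Optional.of(new Location(
                m.group(1),
                Integer.parseInt(m.group(2)),
                Long.parseLong(m.group(3))));
    }
}
```

With a plain Kafka consumer, the parsed values could then be passed to `consumer.seek(new TopicPartition(topic, partition), offset + 1)` to skip the record. Whether skipping is acceptable depends on the application.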

Another case is when I send garbage through the console producer. Here is what 
I get when I evaluate `e.printStackTrace()` at the breakpoint:

```
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition cont_hist-0 at offset 5. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
```
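
For context on "Unknown magic byte!": assuming the consumer uses the Confluent Avro deserializer (the messages above match it), each payload is expected to start with a one-byte magic value of 0 followed by a 4-byte big-endian schema id, and plain text from the console producer lacks that framing. A minimal sketch of that header check, using only the JDK, could look like this. The class name `WireFormatHeader` and the choice to return -1 for unframed payloads are illustrative assumptions:

```java
import java.nio.ByteBuffer;

public class WireFormatHeader {

    private static final byte MAGIC_BYTE = 0x0;
    // 1 magic byte + 4-byte schema id precede the Avro-encoded body.
    private static final int HEADER_SIZE = 5;

    /**
     * Returns the schema id from the header, or -1 when the payload is too
     * short or does not start with the expected magic byte.
     */
    public static int schemaIdOrMinusOne(byte[] payload) {
        if (payload == null || payload.length < HEADER_SIZE || payload[0] != MAGIC_BYTE) {
            return -1;
        }
        // ByteBuffer reads big-endian by default, which matches the framing.
        return ByteBuffer.wrap(payload, 1, 4).getInt();
    }
}
```

A check like this could be used in a test or a diagnostic tool to tell framed Avro messages apart from arbitrary bytes before they reach the consumer.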

In both cases, the consumer leaves and rejoins the group indefinitely without any 
informative message.

Is that enough, or do you need any other information?

> camel-kafka - SerializationException - consumer keeps leaving and rejoining 
> the group
> -------------------------------------------------------------------------------------
>
>                 Key: CAMEL-14980
>                 URL: https://issues.apache.org/jira/browse/CAMEL-14980
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-kafka
>    Affects Versions: 3.2.0
>            Reporter: joseph m'bimbi-bene
>            Assignee: Ramu
>            Priority: Major
>             Fix For: 3.3.0
>
>
> Hello everyone,
>  
> A few days ago I found out that if a `SerializationException` is thrown when 
> the consumer tries to poll messages, the consumer keeps leaving and rejoining 
> the consumer group indefinitely, without any informative log.
> The exception also cannot be handled by any Camel exception handler.
> After some searching in the code, I found the culprit:
> ``` java
>  // org.apache.camel.component.kafka.KafkaConsumer (lines 406-415):
>  catch (KafkaException e) {
>      // some kind of error in kafka, it may happen during
>      // unsubscribing or during normal processing
>      if (unsubscribing) {
>          getExceptionHandler().handleException("Error unsubscribing " + threadId + " from kafka topic " + topicName, e);
>      } else {
>          LOG.debug("KafkaException consuming {} from topic {} causedby {}. Will attempt to re-connect on next run", threadId, topicName, e.getMessage());
>          reConnect = true;
>      }
>  }
>  ```
> `SerializationException` extends `KafkaException`, but it is definitely 
> not a recoverable exception.
> It is logged at debug level, which makes it hard to track because so many 
> other things log at debug level.
> It also cannot be handled by any Camel exception handling mechanism.
> I think it would be better to either:
>  - change that catch so that it pinpoints the subclasses of `KafkaException` 
> that are actually recoverable by rejoining (maybe `WakeupException` and a 
> couple of others)
>  - add a `catch` block for `SerializationException`, and maybe 
> `ConfigException` and `OAuthBearerConfigException`, before it, log at error 
> level, and allow the user to handle those exceptions
>  - remove that catch block entirely and let users handle any `KafkaException` 
> however they see fit.
> Thank you
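
The second option in the quoted description, a dedicated `catch` placed before the generic one, relies on Java matching `catch` clauses in order, so the subclass has to come first. The following is only a sketch of that ordering and not the actual Camel fix. The nested `KafkaException` and `SerializationException` classes are stand-ins so the example runs without kafka-clients, and the `Action` enum and `classify` method are hypothetical names:

```java
public class CatchOrderSketch {

    // Stand-ins that mirror the real hierarchy: SerializationException extends KafkaException.
    public static class KafkaException extends RuntimeException {
        public KafkaException(String message) {
            super(message);
        }
    }

    public static class SerializationException extends KafkaException {
        public SerializationException(String message) {
            super(message);
        }
    }

    /** Hypothetical outcomes of one poll attempt. */
    public enum Action { NONE, SURFACE_TO_HANDLER, RECONNECT }

    /** Runs one poll attempt and decides what the consumer loop should do next. */
    public static Action classify(Runnable poll) {
        try {
            poll.run();
            return Action.NONE;
        } catch (SerializationException e) {
            // Rejoining the group will hit the same record again, so this is
            // handed to the user-visible exception handler instead.
            return Action.SURFACE_TO_HANDLER;
        } catch (KafkaException e) {
            // Other Kafka errors keep the existing reconnect behaviour.
            return Action.RECONNECT;
        }
    }
}
```

Swapping the two `catch` clauses would not compile, because the `SerializationException` clause would be unreachable after the broader `KafkaException` one.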



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
