[ 
https://issues.apache.org/jira/browse/CAMEL-14980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17147518#comment-17147518
 ] 

Ramu commented on CAMEL-14980:
------------------------------

We can get rid of this type of issue by implementing a dead letter queue for 
Kafka.

We could implement a failure-strategy option similar to Kafka Connect:

failure-strategy: specifies the failure strategy to apply when a message 
produced from a record is nacked. Values can be fail (default), ignore, or 
dead-letter-queue.

Type: string

It supports three strategies:

fail (default) - fail the application; no more records are processed. The 
offset of the record that was not processed correctly is not committed.

ignore - the failure is logged, but processing continues. The offset of the 
record that was not processed correctly is committed.

dead-letter-queue - the offset of the record that has not been processed 
correctly is committed, but the record is written to a (Kafka) dead letter 
topic.

The strategy is selected using the failure-strategy attribute.
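
As a rough illustration of the three strategies, here is a minimal sketch in plain Java. The FailureStrategy enum and its method names are hypothetical (they are not part of camel-kafka or any existing API); only the configuration values and the commit behaviour come from the description above.

```java
import java.util.Locale;

// Hypothetical enum for illustration only; not an existing camel-kafka type.
enum FailureStrategy {
    FAIL("fail"),
    IGNORE("ignore"),
    DEAD_LETTER_QUEUE("dead-letter-queue");

    private final String configValue;

    FailureStrategy(String configValue) {
        this.configValue = configValue;
    }

    /** Parses the failure-strategy attribute; a missing value means the default, fail. */
    static FailureStrategy fromConfig(String value) {
        if (value == null || value.trim().isEmpty()) {
            return FAIL;
        }
        String normalized = value.trim().toLowerCase(Locale.ROOT);
        for (FailureStrategy strategy : values()) {
            if (strategy.configValue.equals(normalized)) {
                return strategy;
            }
        }
        throw new IllegalArgumentException("Unknown failure-strategy: " + value);
    }

    /** ignore and dead-letter-queue commit the offset of the failed record; fail does not. */
    boolean commitsFailedOffset() {
        return this != FAIL;
    }

    /** Only fail stops the processing of further records. */
    boolean continuesProcessing() {
        return this != FAIL;
    }

    /** Only dead-letter-queue writes the failed record to a dead letter topic. */
    boolean writesToDeadLetterTopic() {
        return this == DEAD_LETTER_QUEUE;
    }
}
```

A consumer loop could then branch on these three methods when a record is nacked, rather than hard-coding the behaviour per strategy.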

In the case of dead-letter-queue, you can configure the following attributes:

dead-letter-queue.topic: the topic to use to write the records not processed 
correctly, default is dead-letter-topic-$channel, with $channel being the name 
of the channel.

dead-letter-queue.key.serializer: the serializer used to write the record key 
on the dead letter queue. By default, it deduces the serializer from the key 
deserializer.

dead-letter-queue.value.serializer: the serializer used to write the record 
value on the dead letter queue. By default, it deduces the serializer from the 
value deserializer.

The record written on the dead letter queue contains the dead-letter-reason 
header with the nack reason (message from the exception passed to the nack 
method). It may also contain the dead-letter-cause with the message from the 
cause, if any.
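
To make the defaults concrete, here is a small sketch of how the dead letter topic name and the two headers could be derived. DeadLetterSupport and its methods are hypothetical names used for illustration. Real Kafka record headers carry byte[] values, so the String map below is a simplification.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not an existing camel-kafka class.
final class DeadLetterSupport {
    static final String REASON_HEADER = "dead-letter-reason";
    static final String CAUSE_HEADER = "dead-letter-cause";

    private DeadLetterSupport() {
    }

    /** Uses dead-letter-queue.topic when set, else dead-letter-topic-$channel. */
    static String resolveTopic(String configuredTopic, String channel) {
        if (configuredTopic == null || configuredTopic.isEmpty()) {
            return "dead-letter-topic-" + channel;
        }
        return configuredTopic;
    }

    /** Builds the headers attached to the record written to the dead letter topic. */
    static Map<String, String> headersFor(Throwable nackReason) {
        Map<String, String> headers = new LinkedHashMap<>();
        // Message from the exception passed to the nack method.
        headers.put(REASON_HEADER, String.valueOf(nackReason.getMessage()));
        // The cause header is only present when the exception has a cause.
        Throwable cause = nackReason.getCause();
        if (cause != null) {
            headers.put(CAUSE_HEADER, String.valueOf(cause.getMessage()));
        }
        return headers;
    }
}
```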

Your comments are welcome.

> camel-kafka - SerializationException - consumer keeps leaving and rejoining 
> the group
> -------------------------------------------------------------------------------------
>
>                 Key: CAMEL-14980
>                 URL: https://issues.apache.org/jira/browse/CAMEL-14980
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-kafka
>    Affects Versions: 3.2.0
>            Reporter: joseph m'bimbi-bene
>            Assignee: Ramu
>            Priority: Major
>             Fix For: 3.5.0
>
>         Attachments: camel-kafka-errors.txt, poc_camel_kafka.tar.gz
>
>
> Hello everyone,
>  
> I found out a few days ago that if a `SerializationException` is thrown when 
> the consumer tries to poll messages, it will keep leaving and rejoining the 
> consumer group indefinitely, without any informative log.
>  The exception also cannot be handled by any Camel exception handler.
> After some searching in the code I found the culprit:
> {code:java}
> // org.apache.camel.component.kafka.KafkaConsumer (lines 406-415):
> catch (KafkaException e) {
>   // some kind of error in kafka, it may happen during
>   // unsubscribing or during normal processing
>   if (unsubscribing) {
>     getExceptionHandler().handleException(
>         "Error unsubscribing " + threadId + " from kafka topic " + topicName, e);
>   } else {
>     LOG.debug("KafkaException consuming {} from topic {} causedby {}. "
>         + "Will attempt to re-connect on next run",
>         threadId, topicName, e.getMessage());
>     reConnect = true;
>   }
> }
> {code}
>  
> `SerializationException` extends from `KafkaException`, but it is definitely 
> not a recoverable exception.
> It is logged at debug level, which makes it hard to track because so many 
> other things also log at debug level.
> It cannot be handled by any Camel exception handling mechanism.
> I think it would be better to either:
>  - change that catch so that it pinpoints the subclasses of `KafkaException` 
> that are actually recoverable from rejoining (maybe `WakeupException` and a 
> couple others)
>  - add a `catch` block for `SerializationException` and maybe 
> `ConfigException` and `OAuthBearerConfigException` before it, log at error 
> level, and allow the user to handle those exceptions
>  - remove that catch block entirely and let users handle any KafkaException 
> however they see fit.
> Thank you



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
