All:
I have a kind of semantic question on how the kafka client library is behaving 
when:

1. You are providing you own Serialization/Deserialization class by setting up 
this prop (value.serializer)

2. While deserializing, I am throwing a serialization exception



Based on my tests, not only does Kafka client swallow that exception (just log 
it), but it will try again to reprocess on the next poll basically behaving 
like a “poison pill”. Is this a bug or it is really intended, it really does 
not make sense to me. Below are the details:



1. the custom class deserializer throw an exception

2. this exception is caught by org.apache.kafka.clients.consumer.internals. 
Fetcher.parseRecord (line 625) and rethrown

3. the rethrown exception is then caught and logged by 
org.apache.kafka.clients.poll (line 275)

4. on the next poll, we get the same issue allover again.










Caused by: java.lang.ArrayIndexOutOfBoundsException: 6
      at org.apache.avro.io.ResolvingDecoder.readEnum(ResolvingDecoder.java:257)
      at 
org.apache.avro.generic.GenericDatumReader.readEnum(GenericDatumReader.java:246)
      at 
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
      at 
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
      at 
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:178)
      at 
org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
      at 
org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:230)
      at 
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:174)
      at 
org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:274)
      at 
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:176)
      at 
org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
      at 
org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:230)
      at 
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:174)
      at 
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
      at 
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
      at 
com.morningstar.dp.messaging.common.serialization.avro.AvroNoSchemaGenericSerde.deserialize(AvroNoSchemaGenericSerde.java:72)
      at 
org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:622)
      at 
org.apache.kafka.clients.consumer.internals.Fetcher.handleFetchResponse(Fetcher.java:566)
      at 
org.apache.kafka.clients.consumer.internals.Fetcher.access$000(Fetcher.java:69)
      at 
org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:139)
      at 
org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:136)
      at 
org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
      at 
org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
      at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:380)
      at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:274)
      at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
      at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
      at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
      at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
      at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:222)
      at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:311)
      at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:890)
      at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)


Reply via email to