Sébastien Launay created KAFKA-4740:
---------------------------------------

             Summary: Using new consumer API with a Deserializer that throws 
SerializationException can lead to infinite loop
                 Key: KAFKA-4740
                 URL: https://issues.apache.org/jira/browse/KAFKA-4740
             Project: Kafka
          Issue Type: Bug
          Components: clients, consumer
    Affects Versions: 0.10.1.1, 0.10.1.0, 0.10.0.1, 0.10.0.0, 0.9.0.1, 0.9.0.0
         Environment: Kafka broker 0.10.1.1 (but this bug is not dependent on 
the broker version)
Kafka clients 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1, 0.10.1.0, 0.10.1.1
            Reporter: Sébastien Launay
            Priority: Critical


The old consumer supports deserializing records into typed objects and throws a 
{{SerializationException}} through {{MessageAndMetadata#key()}} and 
{{MessageAndMetadata#message()}} that can be caught by the client \[1\].

When using the new consumer API with kafka-clients version < 0.10.0.1, such an 
exception is swallowed by the {{NetworkClient}} class and results in an infinite 
loop that the client has no control over, for example:
{noformat}
DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset 
for partition test2-0 to earliest offset.
DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Fetched offset 0 
for partition test2-0
ERROR org.apache.kafka.clients.NetworkClient - Uncaught error in request 
completion:
org.apache.kafka.common.errors.SerializationException: Size of data received by 
IntegerDeserializer is not 4
ERROR org.apache.kafka.clients.NetworkClient - Uncaught error in request 
completion:
org.apache.kafka.common.errors.SerializationException: Size of data received by 
IntegerDeserializer is not 4
...
{noformat}

Thanks to KAFKA-3977, this has been partially fixed in 0.10.1.0 but another 
issue still remains.
Indeed, the client can now catch the {{SerializationException}} but the next 
call to {{Consumer#poll(long)}} will throw the same exception indefinitely.

The following snippet (full example available on Github \[2\] for most released 
kafka-clients versions):
{code:java}
try (KafkaConsumer<String, Integer> kafkaConsumer =
        new KafkaConsumer<>(consumerConfig, new StringDeserializer(), new IntegerDeserializer())) {
    kafkaConsumer.subscribe(Arrays.asList("topic"));

    // Will run till the shutdown hook is called
    while (!doStop) {
        try {
            ConsumerRecords<String, Integer> records = kafkaConsumer.poll(1000);
            if (!records.isEmpty()) {
                logger.info("Got {} messages", records.count());
                for (ConsumerRecord<String, Integer> record : records) {
                    logger.info("Message with partition: {}, offset: {}, key: {}, value: {}",
                            record.partition(), record.offset(), record.key(), record.value());
                }
            } else {
                logger.info("No messages to consume");
            }
        } catch (SerializationException e) {
            logger.warn("Failed polling some records", e);
        }
    }
}
{code}

when run with the following records (third record has an invalid Integer value):
{noformat}
    printf "\x00\x00\x00\x00\n" | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic
    printf "\x00\x00\x00\x01\n" | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic
    printf "\x00\x00\x00\n"     | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic
    printf "\x00\x00\x00\x02\n" | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic
{noformat}

will produce the following logs:
{noformat}
INFO  consumer.Consumer - Got 2 messages
INFO  consumer.Consumer - Message with partition: 0, offset: 0, key: null, 
value: 0
INFO  consumer.Consumer - Message with partition: 0, offset: 1, key: null, 
value: 1
WARN  consumer.Consumer - Failed polling some records
org.apache.kafka.common.errors.SerializationException: Error deserializing 
key/value for partition topic-0 at offset 2
Caused by: org.apache.kafka.common.errors.SerializationException: Size of data 
received by IntegerDeserializer is not 4
WARN  consumer.Consumer - Failed polling some records
org.apache.kafka.common.errors.SerializationException: Error deserializing 
key/value for partition topic-0 at offset 2
Caused by: org.apache.kafka.common.errors.SerializationException: Size of data 
received by IntegerDeserializer is not 4
WARN  consumer.Consumer - Failed polling some records
org.apache.kafka.common.errors.SerializationException: Error deserializing 
key/value for partition topic-0 at offset 2
Caused by: org.apache.kafka.common.errors.SerializationException: Size of data 
received by IntegerDeserializer is not 4
...
{noformat}

I don't believe committing offsets would help, and even if it did, it could 
potentially result in a few well-formed records from that {{ConsumerRecords}} 
batch not being consumed (data loss).
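
The log above shows that the wrapping exception's message names the partition and offset of the malformed record. A client that chose to skip that record itself could, in principle, recover the position from the message and pass {{offset + 1}} to {{Consumer#seek(TopicPartition, long)}}. The following is only a sketch of the parsing step under stated assumptions: {{FailedOffsetSketch}} and {{Position}} are hypothetical names, the message text is not a stable API, and whether seeking past the record actually unblocks the affected client versions has not been verified in this report.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of kafka-clients.
final class FailedOffsetSketch {

    // Matches the message seen in the log above, e.g.
    // "Error deserializing key/value for partition topic-0 at offset 2".
    // The greedy (.+) keeps hyphens that belong to the topic name.
    private static final Pattern POSITION =
            Pattern.compile("for partition (.+)-(\\d+) at offset (\\d+)");

    /** Topic, partition and offset of the record that failed to deserialize. */
    static final class Position {
        final String topic;
        final int partition;
        final long offset;

        Position(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    /** Returns the parsed position, or null if the message has a different shape. */
    static Position parse(String message) {
        if (message == null) {
            return null;
        }
        Matcher m = POSITION.matcher(message);
        if (!m.find()) {
            return null;
        }
        return new Position(m.group(1), Integer.parseInt(m.group(2)), Long.parseLong(m.group(3)));
    }
}
```

Relying on message parsing is fragile, which is one more reason a fix in the consumer itself would be preferable.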

I have only seen a few mentions of this bug online \[3\], but I believe this is 
a critical issue because the new consumer API is no longer in beta. If you do 
not control the producers (which can inject malformed values), or if you use an 
advanced deserializer that throws such an exception (e.g. schema-registry 
{{KafkaAvroDeserializer}}), you can end up blocking a consumer from 
advancing in the stream.

Current workarounds:
- use a {{Deserializer}} that does not throw a {{SerializationException}} (e.g. 
{{ByteArrayDeserializer}}, {{StringDeserializer}})
- wrap the {{Deserializer}} to catch and log the {{SerializationException}} but 
return {{null}} and then check for {{null}} in the client code (that's what we 
use on top of {{KafkaAvroDeserializer}} in case there is an issue reaching the 
schema registry or the Avro datum is either invalid or not compatible with the 
reader's schema for some reason)
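
The second workaround can be sketched roughly as follows. To keep the sketch compilable without kafka-clients on the classpath, {{Deserializer}} and {{SerializationException}} here are minimal stand-ins for the Kafka types of the same name (the real interface also declares {{configure}} and {{close}}), {{StrictIntegerDeserializer}} only mimics the behaviour of {{IntegerDeserializer}} seen in the logs, and {{NullOnErrorDeserializer}} is a hypothetical name, not our production code.

```java
import java.nio.ByteBuffer;

final class NullOnErrorSketch {

    /** Stand-in for org.apache.kafka.common.serialization.Deserializer (deserialize only). */
    interface Deserializer<T> {
        T deserialize(String topic, byte[] data);
    }

    /** Stand-in for org.apache.kafka.common.errors.SerializationException. */
    static class SerializationException extends RuntimeException {
        SerializationException(String message) {
            super(message);
        }
    }

    /** Mimics IntegerDeserializer: rejects payloads that are not exactly 4 bytes. */
    static final class StrictIntegerDeserializer implements Deserializer<Integer> {
        @Override
        public Integer deserialize(String topic, byte[] data) {
            if (data == null) {
                return null;
            }
            if (data.length != 4) {
                throw new SerializationException("Size of data received by IntegerDeserializer is not 4");
            }
            return ByteBuffer.wrap(data).getInt(); // big-endian by default
        }
    }

    /** Hypothetical wrapper: logs the failure and returns null instead of throwing. */
    static final class NullOnErrorDeserializer<T> implements Deserializer<T> {
        private final Deserializer<T> delegate;

        NullOnErrorDeserializer(Deserializer<T> delegate) {
            this.delegate = delegate;
        }

        @Override
        public T deserialize(String topic, byte[] data) {
            try {
                return delegate.deserialize(topic, data);
            } catch (SerializationException e) {
                System.err.println("Skipping malformed record on topic " + topic + ": " + e.getMessage());
                return null; // the client code must check for null
            }
        }
    }
}
```

The drawback is that a {{null}} produced by the wrapper cannot be told apart from a record whose key or value really is {{null}}.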

Potential solutions:
# continue to throw {{SerializationException}} when calling 
{{Consumer#poll(long)}} but skip that malformed record on next 
{{Consumer#poll(long)}}
# do not throw {{SerializationException}} when calling {{Consumer#poll(long)}} 
but expose information about invalid records in {{ConsumerRecords}}
# do not throw {{SerializationException}} when calling {{Consumer#poll(long)}} 
but store the exception(s) in the {{ConsumerRecord}} object so that it 
is rethrown on {{ConsumerRecord#key()}} and {{ConsumerRecord#value()}}
# do not deserialize records during {{Consumer#poll()}} but do it when calling 
{{ConsumerRecord#key()}} and {{ConsumerRecord#value()}} (similar to the old 
consumer)
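
To illustrate how the last option would look from the client's side, here is a minimal sketch using only the JDK. {{LazyRecord}}, {{toInt}} and {{readAll}} are hypothetical names and {{SerializationException}} is a stand-in for the Kafka class; none of this is existing kafka-clients API. The point is only that a malformed record fails when it is accessed, so the rest of the batch stays consumable.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class LazyRecordSketch {

    /** Stand-in for org.apache.kafka.common.errors.SerializationException. */
    static class SerializationException extends RuntimeException {
        SerializationException(String message) {
            super(message);
        }
    }

    /** Hypothetical record that keeps the raw bytes and deserializes on access. */
    static final class LazyRecord<V> {
        private final long offset;
        private final byte[] rawValue;
        private final Function<byte[], V> valueDeserializer;

        LazyRecord(long offset, byte[] rawValue, Function<byte[], V> valueDeserializer) {
            this.offset = offset;
            this.rawValue = rawValue;
            this.valueDeserializer = valueDeserializer;
        }

        long offset() {
            return offset;
        }

        /** May throw SerializationException, but only for this record. */
        V value() {
            return valueDeserializer.apply(rawValue);
        }
    }

    /** Mimics IntegerDeserializer: rejects payloads that are not exactly 4 bytes. */
    static Integer toInt(byte[] data) {
        if (data.length != 4) {
            throw new SerializationException("Size of data received is not 4");
        }
        return ByteBuffer.wrap(data).getInt();
    }

    /** Returns the readable values and appends the offsets of malformed records to skippedOffsets. */
    static List<Integer> readAll(List<LazyRecord<Integer>> batch, List<Long> skippedOffsets) {
        List<Integer> values = new ArrayList<>();
        for (LazyRecord<Integer> record : batch) {
            try {
                values.add(record.value());
            } catch (SerializationException e) {
                skippedOffsets.add(record.offset());
            }
        }
        return values;
    }
}
```

With the four records from the example above, such a client would read the values at offsets 0, 1 and 3 and note offset 2 as malformed, instead of being stuck at offset 2.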

I believe any of those solutions breaks compatibility semantically, but not 
necessarily binary compatibility, as the {{SerializationException}} is a 
{{RuntimeException}} so it could be "moved around".
My preference goes to the last two, and I would be happy to contribute such 
a change as well as update the documentation on {{SerializationException}} to 
reflect that it is not only used for serializing records.

\[1\] 
https://github.com/apache/kafka/blob/0.8.2.2/core/src/main/scala/kafka/message/MessageAndMetadata.scala
\[1\] 
http://docs.confluent.io/2.0.0/schema-registry/docs/serializer-formatter.html#serializer
\[2\] https://github.com/slaunay/kafka-consumer-serialization-exception-example
\[3\] https://groups.google.com/forum/#!topic/kafka-clients/KBSPmY69H94



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
