Frédérik ROULEAU created KAFKA-19051:
----------------------------------------

             Summary: Fix implicit acknowledgement cannot be override when 
RecordDeserializationException occurs
                 Key: KAFKA-19051
                 URL: https://issues.apache.org/jira/browse/KAFKA-19051
             Project: Kafka
          Issue Type: Sub-task
          Components: consumer
            Reporter: Frédérik ROULEAU


When a record generates a RecordDeserializationException, KIP mentioned that 
with explicit acknowledgement the default Release can be overridden.
When tried, I have:
{code:java}
Exception in thread "main" java.lang.IllegalStateException: The record cannot 
be acknowledged.
    at 
org.apache.kafka.clients.consumer.internals.ShareFetch.acknowledge(ShareFetch.java:123)
    at 
org.apache.kafka.clients.consumer.internals.ShareConsumerImpl.acknowledge(ShareConsumerImpl.java:683)
    at 
org.apache.kafka.clients.consumer.KafkaShareConsumer.acknowledge(KafkaShareConsumer.java:534)
    at org.example.frouleau.kip932.Main.main(Main.java:62) {code}
It looks like the record was already released.

Code used:
{code:java}
//....
} catch (RecordDeserializationException re) {
    long offset = re.offset();
    Throwable t = re.getCause();
    LOGGER.error("Failed to deserialize record at partition={} offset={}", 
re.topicPartition().partition(), offset, t);
    ConsumerRecord<String,String> record = new 
ConsumerRecord<>(re.topicPartition().topic(), re.topicPartition().partition(), 
offset, "", "");
    consumer.acknowledge(record, AcknowledgeType.REJECT);
} {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to