Dhruvil Shah created KAFKA-7045:
-----------------------------------

             Summary: Consumer may not be able to consume all messages when down-conversion is required
                 Key: KAFKA-7045
                 URL: https://issues.apache.org/jira/browse/KAFKA-7045
             Project: Kafka
          Issue Type: Bug
          Components: consumer, core
    Affects Versions: 1.0.1, 1.1.0, 0.11.0.2, 1.0.0, 0.11.0.1, 0.11.0.0, 2.0.0
            Reporter: Dhruvil Shah
             Fix For: 2.1.0


When down-conversion is required, the consumer might fail to consume messages 
under certain conditions. A couple of such cases are outlined below:
 # When consuming from a compacted topic, it is possible that the consumer 
wants to fetch an offset that originally fell in the middle of a batch but has 
since been removed by the log cleaner. For example, let's say we have the 
following two segments, where each pair of brackets indicates a single batch 
of messages and the numbers within are the message offsets.

Segment #1: [0, 1, 2],  [3, 4, 5], [6, 7, 8]
Segment #2: [9, 10, 11], [12, 13, 14]

If the cleaner were to come in now and clean up messages with offsets 7 and 8, 
the segments would look like the following:

Segment #1: [0, 1, 2], [3, 4, 5], [6]
Segment #2: [9, 10, 11], [12, 13, 14]

A consumer attempting to fetch messages at offset 7 will start reading from the 
batch starting at offset 6. During down-conversion, we will drop the record at 
offset 6 because it is less than the current fetch start offset. However, there 
are no messages in the log following offset 6. In such cases, we return the 
`FileRecords` itself, which causes the consumer to throw an exception because 
it does not understand the stored message format.


 # When consuming from a topic with transactional messages, down-conversion 
drops control batches because these did not exist in the V0 and V1 message 
formats. If there are no message batches following the control batch in the 
particular segment (or if we are at the end of the log), we would again get no 
records after down-conversion and would return the `FileRecords`. Because the 
consumer is not able to interpret control batches, it will again throw an 
exception. A simplified sketch of both failure cases follows.
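
Both cases reduce to the same condition: every record in the fetched batches is 
either filtered out (its offset is below the fetch start offset) or dropped 
outright (control batches), so down-conversion produces zero records. The 
following self-contained sketch illustrates this; the `Batch` class and 
`downConvert` method here are a simplified plain-Java model of the broker's 
filtering logic, not Kafka's actual classes:

```
import java.util.ArrayList;
import java.util.List;

public class DownConvertSketch {
    // Simplified stand-in for a record batch: some offsets plus a control
    // flag. Illustrative only, not Kafka's batch implementation.
    static class Batch {
        final long[] offsets;
        final boolean isControl;
        Batch(boolean isControl, long... offsets) {
            this.isControl = isControl;
            this.offsets = offsets;
        }
    }

    // Models the filtering applied while down-converting to V0/V1: control
    // batches are dropped entirely (no V0/V1 equivalent exists), and records
    // below the fetch start offset are dropped.
    static List<Long> downConvert(List<Batch> batches, long fetchOffset) {
        List<Long> converted = new ArrayList<>();
        for (Batch batch : batches) {
            if (batch.isControl)
                continue; // case 2: control batches do not exist in V0/V1
            for (long offset : batch.offsets)
                if (offset >= fetchOffset)
                    converted.add(offset); // keep only records at or past the fetch offset
        }
        return converted;
    }

    public static void main(String[] args) {
        // Case 1: compacted Segment #1 from above, [0,1,2] [3,4,5] [6];
        // offsets 7 and 8 were removed by the cleaner. A fetch at offset 7
        // reads the batch containing offset 6, but every surviving record
        // is below 7.
        List<Batch> compacted = List.of(
                new Batch(false, 0, 1, 2),
                new Batch(false, 3, 4, 5),
                new Batch(false, 6));
        System.out.println(downConvert(compacted, 7)); // prints [] -> nothing converted

        // Case 2: a lone control batch at the end of the log, with no
        // message batches after it.
        List<Batch> txn = List.of(new Batch(true, 10));
        System.out.println(downConvert(txn, 10)); // prints [] -> nothing converted
    }
}
```

In both runs nothing survives conversion, which is exactly the condition under 
which the 1.x code below falls back to returning the raw `FileRecords`.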

Relevant code from the 1.x release that returns the `FileRecords` as-is when we 
are unable to down-convert any messages:
```
public ConvertedRecords<? extends Records> downConvert(byte toMagic, long firstOffset, Time time) {
    ConvertedRecords<MemoryRecords> convertedRecords = downConvert(batches, toMagic, firstOffset, time);
    if (convertedRecords.recordsProcessingStats().numRecordsConverted() == 0) {
        // This indicates that the message is too large, which means that the buffer is not large
        // enough to hold a full record batch. We just return all the bytes in this instance.
        // Even though the record batch does not have the right format version, we expect old clients
        // to raise an error to the user after reading the record batch size and seeing that there
        // are not enough available bytes in the response to read it fully. Note that this is
        // only possible prior to KIP-74, after which the broker was changed to always return at least
        // one full record batch, even if it requires exceeding the max fetch size requested by the client.
        return new ConvertedRecords<>(this, RecordsProcessingStats.EMPTY);
    } else {
        return convertedRecords;
    }
}
```
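
As an aside on why this fallback breaks old clients rather than producing a 
clean error: across all message format versions, the magic byte sits at the 
same position in the serialized batch (byte 16: an 8-byte offset, a 4-byte 
size/length, then 4 bytes that are the CRC in V0/V1 and the partition leader 
epoch in V2). A minimal sketch of the resulting version check, assuming a 
client that only understands magic values up to 1; the constants and exception 
here are illustrative, not the consumer's actual code path:

```
import java.nio.ByteBuffer;

public class OldClientSketch {
    // Magic byte position within a serialized batch: 8-byte (base) offset,
    // 4-byte size/length, then 4 bytes (CRC in V0/V1, leader epoch in V2).
    static final int MAGIC_OFFSET = 16;
    static final byte MAX_SUPPORTED_MAGIC = 1; // a pre-0.11 (V0/V1-only) client

    // Rough stand-in for the version check an old client performs on fetched
    // bytes; a V2 batch handed back without down-conversion fails here.
    static void ensureReadable(ByteBuffer fetched) {
        byte magic = fetched.get(MAGIC_OFFSET);
        if (magic > MAX_SUPPORTED_MAGIC)
            throw new IllegalStateException("Unsupported magic byte: " + magic);
    }

    public static void main(String[] args) {
        ByteBuffer v2Batch = ByteBuffer.allocate(64);
        v2Batch.put(MAGIC_OFFSET, (byte) 2); // magic 2, as stored on disk
        ensureReadable(v2Batch); // throws, mirroring the consumer failure above
    }
}
```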


