Hmm.. Here we are silently discarding zstd batches from the fetch response. If 
we only have zstd compressed data, then the consumer will be stuck fetching 
from its current offset (we use the returned records to set the next fetch 
position).

It would be better if we could return UNSUPPORTED_COMPRESSION_TYPE as we 
discussed in the KIP so that the client can fail with a clear error. I'm now 
realizing, however, that this is a bit tricky because of the way we lazily 
down-convert. By the time we encounter the first zstd compressed batch, we may 
have already sent some data from the response. By then, it's too late to change 
the error code.

I think I have an idea to address this problem, though. In fact, we always
down-convert the first batch prior to sending any response data. If we find 
that it is compressed with zstd, we can catch the error and return the 
UNSUPPORTED_COMPRESSION_TYPE error for that partition. The tricky case is if 
the first batch is not zstd compressed. For example, if we have something like 
this in the log:

```
------------------------  -------------------------
| offset=0, codec=gzip   |   offset=1, codec=zstd |
------------------------  -------------------------
```

We may down-convert the record at offset 0 and send it to the client before we 
realize that the next batch cannot be down-converted. One option is just to 
close the connection, but then the client will again be stuck fetching from 
offset 0 and it will continually get the same error. But what we can do instead 
is to send a dummy oversized record in place of the zstd-compressed batch. We 
already have logic to do this inside `LazyDownConversionRecordsSend`. This will 
allow the client to consume the valid data. It will then encounter the oversized 
record and simply discard it. On its next fetch, the first batch will be the 
zstd-compressed one.
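To make the shape of that substitution concrete, here is a minimal, self-contained sketch of the idea. It deliberately does not use the real broker types; `Batch`, `downConvert`, and the string placeholders are hypothetical stand-ins for what `LazyDownConversionRecordsSend` would actually produce.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the lazy down-conversion loop.
public class OverflowPlaceholderSketch {

    record Batch(long offset, String codec) { }

    // Down-convert batches in order. When a zstd batch is hit after earlier
    // batches have already been written to the response, substitute an
    // oversized placeholder record and stop, so the consumer can still use
    // the data it did receive and advance its position past it.
    static List<String> downConvert(List<Batch> batches) {
        List<String> sent = new ArrayList<>();
        for (Batch batch : batches) {
            if ("zstd".equals(batch.codec)) {
                sent.add("overflow-placeholder");   // consumer will skip this
                break;
            }
            sent.add("down-converted batch @" + batch.offset);
        }
        return sent;
    }

    public static void main(String[] args) {
        List<Batch> log = List.of(new Batch(0, "gzip"), new Batch(1, "zstd"));
        // Prints: [down-converted batch @0, overflow-placeholder]
        System.out.println(downConvert(log));
    }
}
```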

For example, a fetch at offset=0 would return the following:
```
------------------------  -------------------------
| offset=0, codec=gzip   |   overflow record      |
------------------------  -------------------------
```
The consumer would parse the record at offset 0 and skip over the overflow 
record. Its next fetch would be at offset 1. This time the broker would attempt 
to down-convert the zstd compressed record at offset 1 before sending any 
response data. We can catch the exception and return the 
UNSUPPORTED_COMPRESSION_TYPE error to the user.
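Roughly, the broker-side handling on that second fetch could look like the sketch below. Again, this is only an illustration under assumptions: the enum, exception, and method names (`buildPartitionResponse`, `downConvertFirstBatch`, `UnsupportedCompressionException`) are hypothetical, not the actual fetch-path API.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified sketch of per-partition fetch handling.
public class FetchErrorSketch {

    enum Error { NONE, UNSUPPORTED_COMPRESSION_TYPE }

    static class UnsupportedCompressionException extends RuntimeException { }

    // Stand-in for down-converting the first batch: fails if it is zstd.
    static String downConvertFirstBatch(String codec) {
        if ("zstd".equals(codec))
            throw new UnsupportedCompressionException();
        return "down-converted data";
    }

    // The first batch is always down-converted before any response bytes are
    // written, so a zstd batch at the fetch offset can still be surfaced as a
    // per-partition error code instead of record data.
    static Map.Entry<Error, String> buildPartitionResponse(List<String> codecs) {
        try {
            return new SimpleEntry<>(Error.NONE, downConvertFirstBatch(codecs.get(0)));
        } catch (UnsupportedCompressionException e) {
            return new SimpleEntry<>(Error.UNSUPPORTED_COMPRESSION_TYPE, "");
        }
    }

    public static void main(String[] args) {
        // Second fetch at offset=1: the first batch is the zstd one, so the
        // client gets a clear UNSUPPORTED_COMPRESSION_TYPE error.
        System.out.println(buildPartitionResponse(List.of("zstd")).getKey());
    }
}
```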

Does that sound reasonable? I can help out with this implementation if needed.
