Hmm.. Here we are silently discarding zstd batches from the fetch response. If we only have zstd compressed data, then the consumer will be stuck fetching from its current offset (we use the returned records to set the next fetch position).
It would be better if we could return UNSUPPORTED_COMPRESSION_TYPE as we discussed in the KIP, so that the client can fail with a clear error. I'm now realizing, however, that this is a bit tricky because of the way we lazily down-convert. By the time we encounter the first zstd-compressed batch, we may have already sent some data from the response, and by then it is too late to change the error code.

I think I have an idea to address this problem though. We always down-convert the first batch prior to sending any response data, so if we find that it is compressed with zstd, we can catch the error and return the UNSUPPORTED_COMPRESSION_TYPE error for that partition.

The tricky case is when the first batch is not zstd-compressed. For example, suppose we have something like this in the log:

```
------------------------  -------------------------
| offset=0, codec=gzip |  | offset=1, codec=zstd  |
------------------------  -------------------------
```

We may down-convert the record at offset 0 and send it to the client before we realize that the next batch cannot be down-converted. One option is to just close the connection, but then the client will again be stuck fetching from offset 0 and will continually hit the same error. What we can do instead is send a dummy oversized record in place of the zstd-compressed batch; we already have logic to do this inside `LazyDownConversionRecordsSend`. This will allow the client to consume the valid data. It will then encounter the oversized record and simply discard it, so its next fetch will start at the zstd-compressed batch. For example, a fetch at offset=0 would return the following:

```
------------------------  -------------------------
| offset=0, codec=gzip |  |    overflow record    |
------------------------  -------------------------
```

The consumer would parse the record at offset 0 and skip over the overflow record. Its next fetch would be at offset 1. This time the broker would attempt to down-convert the zstd-compressed record at offset 1 before sending any response data, so we can catch the exception and return the UNSUPPORTED_COMPRESSION_TYPE error to the user.

Does that sound reasonable? I can help out with this implementation if needed.
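To make the intended control flow concrete, here is a rough standalone sketch of the per-partition handling described above: if the very first batch cannot be down-converted because it is zstd, the partition gets the UNSUPPORTED_COMPRESSION_TYPE error; otherwise we convert what we can and substitute an oversized placeholder once we hit a zstd batch, which the consumer will skip before refetching. All names here (`Batch`, `FetchPartitionResult`, `handlePartition`, ...) are hypothetical stand-ins for illustration only, not the real `KafkaApis` / `LazyDownConversionRecordsSend` code paths.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the per-partition down-conversion flow described above.
// None of these types are the real Kafka classes; they only illustrate the idea.
public class DownConversionSketch {

    enum Codec { GZIP, SNAPPY, ZSTD }

    record Batch(long baseOffset, Codec codec) { }

    // Result of handling one partition in the fetch response.
    record FetchPartitionResult(boolean unsupportedCompressionError, List<String> sends) { }

    static FetchPartitionResult handlePartition(List<Batch> batches) {
        List<String> sends = new ArrayList<>();
        for (int i = 0; i < batches.size(); i++) {
            Batch batch = batches.get(i);
            if (batch.codec() == Codec.ZSTD) {
                if (i == 0) {
                    // The first batch is always converted before any response bytes go out,
                    // so we can still set the partition-level error code here.
                    return new FetchPartitionResult(true, List.of());
                }
                // Some converted data has already gone out; emit an oversized placeholder
                // record so the consumer skips it and refetches at this offset, at which
                // point the zstd batch is first and the error path above applies.
                sends.add("overflow-record@" + batch.baseOffset());
                break;
            }
            // Normal case: down-convert the batch and send it.
            sends.add("converted-batch@" + batch.baseOffset());
        }
        return new FetchPartitionResult(false, sends);
    }

    public static void main(String[] args) {
        // gzip batch followed by zstd batch: offset 0 is delivered, offset 1 becomes a placeholder.
        System.out.println(handlePartition(List.of(new Batch(0, Codec.GZIP), new Batch(1, Codec.ZSTD))));
        // zstd batch first: the whole partition gets UNSUPPORTED_COMPRESSION_TYPE.
        System.out.println(handlePartition(List.of(new Batch(1, Codec.ZSTD))));
    }
}
```

In the real patch the placeholder substitution would presumably live inside `LazyDownConversionRecordsSend`, as mentioned above; the sketch only shows the two branches of the decision.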
