Julian Reichinger created KAFKA-13278:
-----------------------------------------

             Summary: Deserialization behavior of the Fetcher class does not 
match up with API contract of the Deserializer interface
                 Key: KAFKA-13278
                 URL: https://issues.apache.org/jira/browse/KAFKA-13278
             Project: Kafka
          Issue Type: Bug
          Components: clients, documentation
    Affects Versions: 2.6.0
            Reporter: Julian Reichinger


The documentation of the
{noformat}
org.apache.kafka.common.serialization.Deserializer{noformat}
interface states that implementations have to expect null byte-arrays and 
should handle them in a meaningful way.

However, at least in the Kafka client, it seems to be impossible to actually 
pass a null value to a deserializer, because the class
{noformat}
org.apache.kafka.clients.consumer.internals.Fetcher{noformat}
does not call the registered deserializer when the key or value is null.
{code:java}
private ConsumerRecord<K, V> parseRecord(TopicPartition partition,
                                         RecordBatch batch,
                                         Record record) {
    try {
        long offset = record.offset();
        long timestamp = record.timestamp();
        Optional<Integer> leaderEpoch = maybeLeaderEpoch(batch.partitionLeaderEpoch());
        TimestampType timestampType = batch.timestampType();
        Headers headers = new RecordHeaders(record.headers());
        ByteBuffer keyBytes = record.key();
        byte[] keyByteArray = keyBytes == null ? null : Utils.toArray(keyBytes);
        K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), headers, keyByteArray);
        ByteBuffer valueBytes = record.value();
        byte[] valueByteArray = valueBytes == null ? null : Utils.toArray(valueBytes);
        V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), headers, valueByteArray);
        return new ConsumerRecord<>(partition.topic(), partition.partition(), offset,
                                    timestamp, timestampType, record.checksumOrNull(),
                                    keyByteArray == null ? ConsumerRecord.NULL_SIZE : keyByteArray.length,
                                    valueByteArray == null ? ConsumerRecord.NULL_SIZE : valueByteArray.length,
                                    key, value, headers, leaderEpoch);
    } catch (RuntimeException e) {
        throw new SerializationException("Error deserializing key/value for partition " + partition +
                " at offset " + record.offset() + ". If needed, please seek past the record to continue consumption.", e);
    }
}
{code}
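To illustrate the effect in isolation, here is a minimal sketch that runs 
without Kafka on the classpath. SimpleDeserializer and NullGuardSketch are 
hypothetical names invented for this example, not Kafka classes; parseValue 
only imitates the ternary used in parseRecord above and is not the actual 
Fetcher code.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for org.apache.kafka.common.serialization.Deserializer,
// reduced to the single method needed here so the sketch runs without Kafka.
interface SimpleDeserializer<T> {
    T deserialize(String topic, byte[] data);
}

final class NullGuardSketch {

    // Imitates the ternary in Fetcher.parseRecord: a null payload short-circuits
    // to null and the deserializer is never consulted.
    static <T> T parseValue(String topic, byte[] valueBytes, SimpleDeserializer<T> deserializer) {
        return valueBytes == null ? null : deserializer.deserialize(topic, valueBytes);
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();

        // This deserializer handles null input by mapping it to a sentinel,
        // which is what the Deserializer documentation asks implementations to do.
        SimpleDeserializer<String> deserializer = (topic, data) -> {
            calls.incrementAndGet();
            return data == null ? "<tombstone>" : new String(data, StandardCharsets.UTF_8);
        };

        String fromBytes = parseValue("demo-topic", "hello".getBytes(StandardCharsets.UTF_8), deserializer);
        String fromNull = parseValue("demo-topic", null, deserializer);

        System.out.println(fromBytes);   // hello
        System.out.println(fromNull);    // null, not "<tombstone>"
        System.out.println(calls.get()); // 1
    }
}
```

The null-handling branch inside the deserializer is dead code under this 
guard: the sentinel is never produced, and the caller sees a plain null.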
I implemented an ErrorHandlingDeserializer which I use to wrap the actual 
deserializers and which records the result (value or exception) in a container 
object.
{code:java}
import static java.util.Objects.requireNonNull;

import org.apache.kafka.common.serialization.Deserializer;

/**
 * Handles exceptions thrown by a delegate {@link Deserializer} during deserialization.
 *
 * @param <T> type of the deserialized object
 */
final class ErrorHandlingDeserializer<T> implements Deserializer<ReadResult<T>> {

  private final Deserializer<Envelope<T>> delegate;

  private ErrorHandlingDeserializer(Deserializer<Envelope<T>> delegate) {
    this.delegate = requireNonNull(delegate);
  }

  static <T> ErrorHandlingDeserializer<T> wrap(Deserializer<Envelope<T>> delegate) {
    return new ErrorHandlingDeserializer<>(delegate);
  }

  @Override
  public ReadResult<T> deserialize(String topic, @Nullable byte[] data) {
    // Every outcome, value or exception, is wrapped in a ReadResult.
    try {
      return ReadResult.successful(delegate.deserialize(topic, data));
    } catch (Exception e) {
      return ReadResult.failed(e);
    }
  }
}
{code}
This deserializer can never return null. However, because of the Fetcher 
behavior, I still have to check for null values in the consumer records at 
every usage. In addition, I have to check for a null value inside the 
ReadResult container class, because the Deserializer API says null input is 
possible and I have no guarantee that the Fetcher behavior will never change.
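
The two checks can be sketched as follows. The ReadResult class shown here is 
a hypothetical minimal version written only for this example (the real class 
is not part of this report, so its shape is an assumption), and 
DefensiveConsumerSketch.unwrap is likewise an invented helper. The parameter 
recordValue stands in for what ConsumerRecord.value() would return.

```java
import java.util.Optional;

// Hypothetical minimal version of the ReadResult container; the real class
// is not shown in this report, so its fields and methods are assumed.
final class ReadResult<T> {
    private final T value;          // may be null if the delegate returned null
    private final Exception error;  // non-null only for failed reads

    private ReadResult(T value, Exception error) {
        this.value = value;
        this.error = error;
    }

    static <T> ReadResult<T> successful(T value) { return new ReadResult<>(value, null); }
    static <T> ReadResult<T> failed(Exception error) { return new ReadResult<>(null, error); }

    boolean isFailed() { return error != null; }
    T value() { return value; }
}

final class DefensiveConsumerSketch {

    // Returns the payload if there is one, or empty for tombstones and failures.
    // A real consumer would likely report the failure instead of dropping it.
    static <T> Optional<T> unwrap(ReadResult<T> recordValue) {
        // Check 1: the Fetcher skipped the deserializer, so the record value
        // itself is null even though the wrapper never returns null.
        if (recordValue == null) {
            return Optional.empty();
        }
        if (recordValue.isFailed()) {
            return Optional.empty();
        }
        // Check 2: the delegate may have been handed null bytes, which the
        // Deserializer documentation allows, so the wrapped value may be null.
        return Optional.ofNullable(recordValue.value());
    }
}
```

With a firm guarantee in either direction, one of the two checks could be 
dropped.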

In my opinion this behavior is a bug, because anyone implementing a 
Deserializer would expect to actually receive null values (for example, in 
the case of deletions). The client should guarantee one of two things: either 
Deserializers are always called for null values, or they are never called 
with null values.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
