Julian Reichinger created KAFKA-13278:
-----------------------------------------
             Summary: Deserialization behavior of the Fetcher class does not match up with API contract of the Deserializer interface
                 Key: KAFKA-13278
                 URL: https://issues.apache.org/jira/browse/KAFKA-13278
             Project: Kafka
          Issue Type: Bug
          Components: clients, documentation
    Affects Versions: 2.6.0
            Reporter: Julian Reichinger


The documentation of the {noformat}org.apache.kafka.common.serialization.Deserializer{noformat} interface states that implementations must expect null byte arrays and should handle them in a meaningful way. However, at least in the Kafka client it appears to be impossible for a null value to ever reach a deserializer, because {noformat}org.apache.kafka.clients.consumer.internals.Fetcher{noformat} skips the registered deserializer entirely whenever the key or value is null:

{code:java}
private ConsumerRecord<K, V> parseRecord(TopicPartition partition,
                                         RecordBatch batch,
                                         Record record) {
    try {
        long offset = record.offset();
        long timestamp = record.timestamp();
        Optional<Integer> leaderEpoch = maybeLeaderEpoch(batch.partitionLeaderEpoch());
        TimestampType timestampType = batch.timestampType();
        Headers headers = new RecordHeaders(record.headers());
        ByteBuffer keyBytes = record.key();
        byte[] keyByteArray = keyBytes == null ? null : Utils.toArray(keyBytes);
        // The key deserializer is never invoked for a null key ...
        K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), headers, keyByteArray);
        ByteBuffer valueBytes = record.value();
        byte[] valueByteArray = valueBytes == null ? null : Utils.toArray(valueBytes);
        // ... and the value deserializer is never invoked for a null value.
        V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), headers, valueByteArray);
        return new ConsumerRecord<>(partition.topic(), partition.partition(), offset,
                                    timestamp, timestampType, record.checksumOrNull(),
                                    keyByteArray == null ? ConsumerRecord.NULL_SIZE : keyByteArray.length,
                                    valueByteArray == null ? ConsumerRecord.NULL_SIZE : valueByteArray.length,
                                    key, value, headers, leaderEpoch);
    } catch (RuntimeException e) {
        throw new SerializationException("Error deserializing key/value for partition " + partition +
                " at offset " + record.offset() +
                ". If needed, please seek past the record to continue consumption.", e);
    }
}
{code}

I implemented an ErrorHandlingDeserializer which I use to wrap the actual deserializers and which records the outcome (value or exception) in a container object:

{code:java}
/**
 * Handles exceptions thrown during deserialization by a delegate {@link Deserializer}.
 *
 * @param <T> type of the deserialized object
 */
final class ErrorHandlingDeserializer<T> implements Deserializer<ReadResult<T>> {

    private final Deserializer<Envelope<T>> delegate;

    private ErrorHandlingDeserializer(Deserializer<Envelope<T>> delegate) {
        this.delegate = requireNonNull(delegate);
    }

    static <T> ErrorHandlingDeserializer<T> wrap(Deserializer<Envelope<T>> delegate) {
        return new ErrorHandlingDeserializer<>(delegate);
    }

    @Override
    public ReadResult<T> deserialize(String topic, @Nullable byte[] data) {
        try {
            return ReadResult.successful(delegate.deserialize(topic, data));
        } catch (Exception e) {
            return ReadResult.failed(e);
        }
    }
}
{code}

This deserializer can never return null. Because of the Fetcher behavior, however, I still have to check every consumer record for a null value, and in addition I have to check for a null value inside the ReadResult container, because the Deserializer API demands it and I have no guarantee that the Fetcher behavior will never change.
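To make the resulting double null check concrete, a consumer loop built on this wrapper ends up looking roughly like the sketch below. This is illustrative only: the ReadResult accessors (isSuccess(), value(), failure()), the Order payload type, the "orders" topic and the handler methods are assumed names, not part of the code above.

{code:java}
import java.time.Duration;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

final class OrderConsumerLoop {

    // Sketch only: assumes the consumer is already subscribed to the
    // (hypothetical) "orders" topic; Order, the ReadResult accessors and
    // the handler methods below are assumed names.
    void pollOnce(KafkaConsumer<byte[], ReadResult<Order>> consumer) {
        for (ConsumerRecord<byte[], ReadResult<Order>> record : consumer.poll(Duration.ofMillis(100))) {
            ReadResult<Order> result = record.value();
            if (result == null) {
                // Check #1: the Fetcher bypasses the deserializer for null values
                // (e.g. tombstones), so record.value() can be null even though
                // ErrorHandlingDeserializer itself never returns null.
                handleTombstone(record.key());
            } else if (result.isSuccess()) {
                Order order = result.value();
                if (order == null) {
                    // Check #2: the Deserializer contract allows null input, so the
                    // contained value must also be null-checked in case the Fetcher
                    // behavior ever changes.
                    handleTombstone(record.key());
                } else {
                    process(order);
                }
            } else {
                handleError(result.failure());
            }
        }
    }

    private void handleTombstone(byte[] key) { /* application-specific */ }

    private void process(Order order) { /* application-specific */ }

    private void handleError(Exception failure) { /* application-specific */ }
}
{code}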
In my opinion this behavior is a bug: everyone implementing a Deserializer would expect to actually receive null values (for example, the tombstone records produced by deletions). The client should guarantee one behavior or the other: either deserializers always receive null values, or they never do.
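For illustration, this is roughly what a deserializer written against the documented contract looks like (a minimal sketch; the class name is made up). Under the current Fetcher behavior its null branch is unreachable, which is exactly the mismatch described above:

{code:java}
import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.serialization.Deserializer;

/**
 * Minimal sketch of a deserializer written against the documented contract:
 * a null byte array (e.g. the tombstone of a deleted record) is expected input.
 */
final class NullAwareStringDeserializer implements Deserializer<String> {

    @Override
    public String deserialize(String topic, byte[] data) {
        if (data == null) {
            // The Deserializer javadoc requires implementations to handle null data.
            // With the current Fetcher this branch is never reached.
            return null;
        }
        return new String(data, StandardCharsets.UTF_8);
    }
}
{code}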