Chia-Ping Tsai created KAFKA-7036:
-------------------------------------

             Summary: Complete the docs of KafkaConsumer#poll
                 Key: KAFKA-7036
                 URL: https://issues.apache.org/jira/browse/KAFKA-7036
             Project: Kafka
          Issue Type: Improvement
            Reporter: Chia-Ping Tsai
            Assignee: Chia-Ping Tsai


KafkaConsumer#poll has a nice docs about the expected exceptions. However, it 
lacks the description of SerializationException. Another mirror issue is that 
KafkaConsumer doesn't catch all type of exception which may be thrown by 
deserializer (see below). We should use Throwable to replace the 
RuntimeException so as to catch all exception and then wrap them to 
SerializationException.
{code:java}
private ConsumerRecord<K, V> parseRecord(TopicPartition partition,
                                         RecordBatch batch,
                                         Record record) {
    try {
        long offset = record.offset();
        long timestamp = record.timestamp();
        TimestampType timestampType = batch.timestampType();
        Headers headers = new RecordHeaders(record.headers());
        ByteBuffer keyBytes = record.key();
        byte[] keyByteArray = keyBytes == null ? null : Utils.toArray(keyBytes);
        K key = keyBytes == null ? null : 
this.keyDeserializer.deserialize(partition.topic(), headers, keyByteArray);
        ByteBuffer valueBytes = record.value();
        byte[] valueByteArray = valueBytes == null ? null : 
Utils.toArray(valueBytes);
        V value = valueBytes == null ? null : 
this.valueDeserializer.deserialize(partition.topic(), headers, valueByteArray);
        return new ConsumerRecord<>(partition.topic(), partition.partition(), 
offset,
                                    timestamp, timestampType, 
record.checksumOrNull(),
                                    keyByteArray == null ? 
ConsumerRecord.NULL_SIZE : keyByteArray.length,
                                    valueByteArray == null ? 
ConsumerRecord.NULL_SIZE : valueByteArray.length,
                                    key, value, headers);
    } catch (RuntimeException e) {
        throw new SerializationException("Error deserializing key/value for 
partition " + partition +
                " at offset " + record.offset() + ". If needed, please seek 
past the record to continue consumption.", e);
    }
}{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to