lamber-ken commented on issue #8583: [FLINK-11820][serialization] 
SimpleStringSchema handle message record which value is null
URL: https://github.com/apache/flink/pull/8583#issuecomment-500272524
 
 
   Hi, @aljoscha @GJL, from `Kafka09Fetcher#runFetchLoop`, we can see that it 
needs to deserialize the kafka value first , and call `emitRecord` method after.
   ```
   while (running) {
        // this blocks until we get the next records
        // it automatically re-throws exceptions encountered in the consumer 
thread
        final ConsumerRecords<byte[], byte[]> records = handover.pollNext();
   
        // get the records for each topic partition
        for (KafkaTopicPartitionState<TopicPartition> partition : 
subscribedPartitionStates()) {
   
                List<ConsumerRecord<byte[], byte[]>> partitionRecords =
                                
records.records(partition.getKafkaPartitionHandle());
   
                for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
   
                        final T value = deserializer.deserialize(record);
   
                        if (deserializer.isEndOfStream(value)) {
                                // end of stream signaled
                                running = false;
                                break;
                        }
   
                        // emit the actual record. this also updates offset 
state atomically
                        // and deals with timestamps and watermark generation
                        emitRecord(value, partition, record.offset(), record);
                }
        }
   }
   ```
   
   
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to