Github user haohui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3314#discussion_r102830609
  
    --- Diff: 
flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
 ---
    @@ -373,16 +370,28 @@ else if (partitionsRemoved) {
                                                                
keyPayload.get(keyBytes);
                                                        }
     
    -                                                   final T value = 
deserializer.deserialize(keyBytes, valueBytes, 
    -                                                                   
currentPartition.getTopic(), currentPartition.getPartition(), offset);
    -                                                   
    -                                                   if 
(deserializer.isEndOfStream(value)) {
    -                                                           // remove 
partition from subscribed partitions.
    -                                                           
partitionsIterator.remove();
    -                                                           continue 
partitionsLoop;
    -                                                   }
    -                                                   
    -                                                   owner.emitRecord(value, 
currentPartition, offset);
    +                                                   final Collector<T> 
collector = new Collector<T>() {
    --- End diff --
    
    Good catch, @tzulitai !
    
    I tried the buffer approach and had no luck. The problem is that calling 
`emitRecord`needs to pass in both the offset and the record itself -- The 
record is used to extract the timestamp in the Kafka 0.10 consumers. The buffer 
itself needs to buffer the deserialized value and the record itself -- it 
cannot solve the problem of having a collector per record.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to