GitHub user haohui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3314#discussion_r102299038
  
    --- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java ---
    @@ -373,16 +370,28 @@ else if (partitionsRemoved) {
                                    keyPayload.get(keyBytes);
                                }
     
    -                           final T value = deserializer.deserialize(keyBytes, valueBytes,
    -                                   currentPartition.getTopic(), currentPartition.getPartition(), offset);
    -
    -                           if (deserializer.isEndOfStream(value)) {
    -                               // remove partition from subscribed partitions.
    -                               partitionsIterator.remove();
    -                               continue partitionsLoop;
    -                           }
    -
    -                           owner.emitRecord(value, currentPartition, offset);
    +                           final Collector<T> collector = new Collector<T>() {
    --- End diff --
    
    Totally agree. I played around with it a little, and it might require some 
trade-offs here.
    
    The problem is that `emitRecord()` needs per-record state (e.g., topic 
partition, offset, etc.). That state can either be captured in a closure (like 
the new `Collector` instance in this diff) or passed through arguments. I see 
three possibilities here:
    
    1. Create a new instance of `Collector` for every record. The JVM may or 
may not be able to optimize the allocation away. A trace-based JVM should be 
able to, but I'm not sure about a class-based JVM.
    
    2. Expose the internal state in the `collect()` call. The `collect()` call 
takes additional parameters such as the offset and partition state. This reduces 
GC overhead but also makes the implementation harder to change later.
    
    3. Create a new interface like `Optional<T> deserialize(byte[] messageKey, 
...)` (or `void deserialize(byte[] messageKey, ..., AtomicReference<T> result)`, 
to optimize away the cost of the `Optional` class). This results in a slightly 
more complex API, but it probably has the best trade-off between performance 
and API compatibility.
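    
    To make the three options easier to compare, here is a rough, 
self-contained sketch. It is not the Flink API: `EmitOptionsSketch`, `Sink`, 
`StatefulCollector`, the simplified `deserialize` signatures, and the rule that 
an empty payload means "nothing to emit" are all hypothetical names and 
assumptions made up for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

public class EmitOptionsSketch {

    /** Hypothetical stand-in for owner.emitRecord(value, partition, offset). */
    static final class Sink {
        final List<String> emitted = new ArrayList<>();

        void emitRecord(String value, int partition, long offset) {
            emitted.add(partition + "@" + offset + ":" + value);
        }
    }

    /** Option 1: single-argument collector; per-record state lives in a closure. */
    interface Collector<T> {
        void collect(T record);
    }

    /** Option 2: per-record state is passed explicitly, so one instance is reused. */
    interface StatefulCollector<T> {
        void collect(T record, int partition, long offset);
    }

    static String decode(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    static void option1(final Sink sink, byte[] bytes, final int partition, final long offset) {
        // A fresh anonymous Collector is created for every record.
        Collector<String> collector = new Collector<String>() {
            @Override
            public void collect(String record) {
                sink.emitRecord(record, partition, offset);
            }
        };
        collector.collect(decode(bytes));
    }

    static void option2(StatefulCollector<String> shared, byte[] bytes, int partition, long offset) {
        // No per-record collector, but collect() now exposes partition and offset.
        shared.collect(decode(bytes), partition, offset);
    }

    /** Option 3a: an empty Optional means "nothing to emit" (assumed: empty payload). */
    static Optional<String> deserialize(byte[] bytes) {
        return bytes.length == 0 ? Optional.<String>empty() : Optional.of(decode(bytes));
    }

    /** Option 3b: same contract, but the result goes into a caller-owned holder. */
    static void deserialize(byte[] bytes, AtomicReference<String> result) {
        result.set(bytes.length == 0 ? null : decode(bytes));
    }

    static List<String> run() {
        final Sink sink = new Sink();
        byte[] payload = "v".getBytes(StandardCharsets.UTF_8);

        option1(sink, payload, 0, 10L);

        StatefulCollector<String> shared = sink::emitRecord; // created once
        option2(shared, payload, 0, 11L);

        Optional<String> maybe = deserialize(payload);
        if (maybe.isPresent()) {
            sink.emitRecord(maybe.get(), 0, 12L);
        }

        AtomicReference<String> holder = new AtomicReference<>(); // reused across records
        deserialize(payload, holder);
        if (holder.get() != null) {
            sink.emitRecord(holder.get(), 0, 13L);
        }

        deserialize(new byte[0], holder); // empty payload: holder is cleared, nothing emitted
        if (holder.get() != null) {
            sink.emitRecord(holder.get(), 0, 14L);
        }
        return sink.emitted;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

    In this sketch, option 1 keeps `collect()` narrow at the cost of one object 
per record, option 2 widens `collect()` so a single collector can be shared, 
and option 3 moves the decision to emit back to the caller, who already holds 
the partition and offset.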
    
    What do you think?


