Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/3314
  
    @StephanEwen and I just had an offline discussion about the change, and we 
came up with the following thoughts:
    
    Using an `ArrayList` for buffering elements is an "anti-pattern" in Flink, 
because it is not a robust solution: the buffer grows with the number of records 
a single message unnests into. Users could theoretically run into the size limit 
of an `ArrayList`, and unnesting large messages (in multiple threads in the Kafka 
0.8 case) can put pressure on the GC. We think we should avoid that approach if 
possible.
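    For reference, the buffering pattern in question looks roughly like the 
following. This is a minimal sketch in plain Java, assuming a hypothetical 
`ListDeserializer` interface and a simplified `BufferingFetcher` (neither is Flink 
API); the `emitted` list only stands in for emitting records downstream:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical schema shape (not Flink API): appends zero or more records per message.
interface ListDeserializer<T> {
    void deserialize(byte[] message, List<T> out);
}

// Hypothetical, simplified fetcher illustrating the ArrayList buffering pattern.
class BufferingFetcher<T> {
    private final ListDeserializer<T> schema;
    private final Object checkpointLock = new Object();
    // Stands in for emitting records downstream.
    private final List<T> emitted = new ArrayList<>();
    private long offset = -1L;

    BufferingFetcher(ListDeserializer<T> schema) {
        this.schema = schema;
    }

    void process(byte[] message, long messageOffset) {
        // Every record unnested from this message is materialized in a temporary
        // list before anything is emitted; a message that unnests into many
        // records makes this list, and the garbage it leaves behind, grow with it.
        List<T> buffer = new ArrayList<>();
        schema.deserialize(message, buffer);

        // Emitting the whole batch and advancing the offset together under the
        // lock keeps the emitted records and the offset consistent for checkpoints.
        synchronized (checkpointLock) {
            emitted.addAll(buffer);
            offset = messageOffset;
        }
    }

    List<T> emitted() {
        return emitted;
    }

    long offset() {
        return offset;
    }
}
```

Deserialization happens outside the lock in this sketch, which is what lets 
several consumer threads deserialize in parallel, at the cost of the temporary 
buffer.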
    
    Alternative approaches we considered (ordered by preference):
    - Define the `DeserializationSchema` so that users can return `null` if they 
don't want to emit a record.
    This approach keeps the current one-record-per-message design and is a 
minimal change. Of course, it would not allow for the "unnesting" use case, where 
you want to emit multiple records from one Kafka message. Users would need to 
deserialize into a nested structure and use a `flatMap` afterwards to do the 
unnesting.
    - Move the deserialization into the checkpoint lock. This would allow us to 
forward elements from the user's collector directly into our internal collector 
while still preserving exactly-once semantics.
    This change would probably be a bit more involved code-wise, as we need to 
rearrange some parts (maybe moving the deserialization schema instance into the 
`emitRecord()` method and changing some method signatures).
    A downside of this approach would be that the Kafka 0.8 consumer threads 
would deserialize records sequentially (since only one consumer thread can hold 
the lock at a time). For Kafka 0.9 this is already the case. I think we can live 
with that, because the majority of users have moved away from Kafka 0.8 by now.
    - Use the `ArrayList` approach. Users would potentially run into issues and 
we would lose some of Flink's robustness.
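    A minimal sketch of the first two options, in plain Java so it is 
self-contained. It assumes hypothetical interfaces (`NullableSchema`, 
`CollectingSchema`, `RecordCollector`) and a simplified `SketchFetcher` rather 
than Flink's actual `DeserializationSchema`, `Collector`, or fetcher classes; the 
`emitted` list only stands in for emitting downstream:

```java
import java.util.ArrayList;
import java.util.List;

// Option 1 (hypothetical shape): return null to skip a message.
interface NullableSchema<T> {
    T deserialize(byte[] message);
}

// Option 2 (hypothetical shapes): push zero or more records into a collector.
interface RecordCollector<T> {
    void collect(T record);
}

interface CollectingSchema<T> {
    void deserialize(byte[] message, RecordCollector<T> out);
}

// Hypothetical, simplified fetcher showing both strategies.
class SketchFetcher<T> {
    private final Object checkpointLock = new Object();
    // Stands in for emitting records downstream.
    private final List<T> emitted = new ArrayList<>();
    private long offset = -1L;

    // Option 1: deserialization runs outside the lock; at most one record per message.
    void processNullable(NullableSchema<T> schema, byte[] message, long messageOffset) {
        T record = schema.deserialize(message);
        synchronized (checkpointLock) {
            if (record != null) {
                emitted.add(record);
            }
            // Assumption: the offset advances even for a skipped message,
            // so it is not re-read after a restore.
            offset = messageOffset;
        }
    }

    // Option 2: deserialize while holding the lock, so all records of one message
    // and the offset update are atomic with respect to checkpoints, with no
    // intermediate list. Only one thread can deserialize at a time.
    void processUnderLock(CollectingSchema<T> schema, byte[] message, long messageOffset) {
        synchronized (checkpointLock) {
            schema.deserialize(message, emitted::add);
            offset = messageOffset;
        }
    }

    List<T> emitted() {
        return emitted;
    }

    long offset() {
        return offset;
    }
}
```

The main difference is the scope of the lock: with option 1 only the emit and the 
offset update are synchronized, whereas with option 2 the user's deserialization 
code also runs under the lock, which is why the Kafka 0.8 consumer threads would 
no longer deserialize in parallel.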
    
    @jgrier since you opened the original JIRA back then, what's your take on 
the discussion? How bad would it be for users if we only allowed the "return 
`null` or a record" approach? (Other opinions are of course also appreciated.)


