[ https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15887668#comment-15887668 ]
ASF GitHub Bot commented on FLINK-3679:
---------------------------------------
Github user rmetzger commented on the issue:
https://github.com/apache/flink/pull/3314
@StephanEwen and I just had an offline discussion about the change, and we
came up with the following thoughts:
Using an `ArrayList` for buffering elements is an "anti-pattern" in Flink,
because it is not a robust solution. Users could theoretically run into the
size limit of an array list, and unnesting large messages (in multiple threads
in the Kafka 0.8 case) can put pressure on the GC. We think that we should try
to avoid that approach if possible.
Alternative approaches we considered (ordered by preference):
- Define the `DeserializationSchema` so that users can return `null` when they
don't want to emit a record for a given message.
This keeps the current interface largely unchanged and is a pretty minimal change.
Of course, it would not cover the "unnesting" use case, where you want to
emit multiple records from one Kafka message. Users would need to deserialize
into a nested structure and do the un-nesting in a flatMap afterwards (see the
first sketch after this list).
- Move the deserialization into the checkpoint lock. This would allow us to
collect elements from the user's collector into our internal collector while
still preserving exactly-once semantics (see the second sketch after this list).
This change would probably be a bit more involved code-wise, as we would need
to rearrange some parts (maybe moving the deserialization schema instance into
the emitRecord() method and changing some method signatures).
A downside of this approach is that the Kafka 0.8 consumer threads would
deserialize records sequentially (since only one consumer thread can hold the
lock at a time). For Kafka 0.9 this is already the case. I think we can live
with that, because most users have moved away from Kafka 0.8 by now.
- Use the `ArrayList` approach. Users could potentially run into the issues
described above, and we would lose some of Flink's robustness.
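
To make the first alternative a bit more concrete, here is a minimal,
self-contained sketch of a null-tolerant schema and the corresponding skip
logic on the consumer side. All names below are made up for illustration and
are not the actual Flink classes:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Illustrative stand-in for the DeserializationSchema contract (hypothetical name).
interface NullableDeserializationSchema<T> {
    // Returning null means "skip this message, emit nothing".
    T deserialize(byte[] message) throws IOException;
}

// Example schema: drop messages that do not look like the expected payload
// instead of throwing and failing the job.
class SkippingStringSchema implements NullableDeserializationSchema<String> {
    @Override
    public String deserialize(byte[] message) {
        String value = new String(message, StandardCharsets.UTF_8);
        return value.startsWith("{") ? value : null;   // null == skip
    }
}

// Sketch of the skip logic in the fetch loop.
class FetchLoopSketch {
    static <T> void emitIfPresent(NullableDeserializationSchema<T> schema,
                                  byte[] rawMessage,
                                  java.util.function.Consumer<T> downstream) throws IOException {
        T record = schema.deserialize(rawMessage);
        if (record != null) {
            downstream.accept(record);
        }
        // The real consumer would still advance the partition offset here, so a
        // skipped message is not replayed after a restore.
    }
}
```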
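
And a rough sketch of the second alternative: doing deserialization and
emission while holding the checkpoint lock, so that the emitted record and the
offset update stay atomic with respect to checkpoints. Again, every name here
(the lock, the offset array, emitRecord()) is hypothetical and does not mirror
the actual consumer code:

```java
import java.io.IOException;

// Rough sketch of "deserialize under the checkpoint lock"; all names are hypothetical.
class LockedEmitSketch<T> {

    interface Deserializer<R> {
        R deserialize(byte[] message) throws IOException;
    }

    interface Output<R> {
        void collect(R record);
    }

    private final Object checkpointLock = new Object();
    private final long[] partitionOffsets;
    private final Deserializer<T> deserializer;
    private final Output<T> output;

    LockedEmitSketch(int numPartitions, Deserializer<T> deserializer, Output<T> output) {
        this.partitionOffsets = new long[numPartitions];
        this.deserializer = deserializer;
        this.output = output;
    }

    // Called by a consumer thread for every raw Kafka message.
    void emitRecord(byte[] rawMessage, int partition, long offset) throws IOException {
        synchronized (checkpointLock) {
            // Deserialization happens inside the lock, so whatever the schema emits
            // is committed together with the offset update; a checkpoint can never
            // see a "half-processed" message. The price: consumer threads
            // deserialize one after another while holding this lock.
            T record = deserializer.deserialize(rawMessage);
            if (record != null) {
                output.collect(record);
            }
            partitionOffsets[partition] = offset;
        }
    }
}
```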
@jgrier since you opened the original JIRA back then, what's your take on
the discussion? How bad would it be for users if we only allowed the "`null`
or one record" approach? (Other opinions are of course also appreciated.)
> DeserializationSchema should handle zero or more outputs for every input
> ------------------------------------------------------------------------
>
> Key: FLINK-3679
> URL: https://issues.apache.org/jira/browse/FLINK-3679
> Project: Flink
> Issue Type: Bug
> Components: DataStream API, Kafka Connector
> Reporter: Jamie Grier
> Assignee: Haohui Mai
>
> There are a couple of issues with the DeserializationSchema API that I think
> should be addressed. This request has come to me via an existing Flink user.
> The main issue is simply that the API assumes that there is a one-to-one
> mapping between input and outputs. In reality there are scenarios where one
> input message (say from Kafka) might actually map to zero or more logical
> elements in the pipeline.
> Particularly important here is the case where you receive a message from a
> source (such as Kafka) and say the raw bytes don't deserialize properly.
> Right now the only recourse is to throw IOException and therefore fail the
> job.
> This is definitely not good since bad data is a reality and failing the job
> is not the right option. If the job fails we'll just end up replaying the
> bad data and the whole thing will start again.
> Instead in this case it would be best if the user could just return the empty
> set.
> The other case is where one input message should logically be multiple output
> messages. This case is probably less important since there are other ways to
> do this but in general it might be good to make the
> DeserializationSchema.deserialize() method return a collection rather than a
> single element.
> Maybe we need to support a DeserializationSchema variant that has semantics
> more like that of FlatMap.
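
For reference, a FlatMap-style schema variant along the lines of what the issue
description suggests could look roughly like the sketch below. The `Collector`
interface here is just an illustrative stand-in, not the actual Flink type:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Illustrative stand-ins, not the actual Flink interfaces.
interface Collector<T> {
    void collect(T record);
}

interface FlatDeserializationSchema<T> {
    // Zero, one, or many records may be emitted for a single input message.
    void deserialize(byte[] message, Collector<T> out) throws IOException;
}

// Example: "unnest" a newline-separated message into several records; empty or
// corrupt messages simply emit nothing instead of failing the job.
class LineSplittingSchema implements FlatDeserializationSchema<String> {
    @Override
    public void deserialize(byte[] message, Collector<String> out) {
        if (message == null || message.length == 0) {
            return; // emit zero records
        }
        for (String line : new String(message, StandardCharsets.UTF_8).split("\n")) {
            if (!line.isEmpty()) {
                out.collect(line);
            }
        }
    }
}

class FlatSchemaExample {
    public static void main(String[] args) throws IOException {
        List<String> collected = new ArrayList<>();
        new LineSplittingSchema().deserialize(
                "a\nb\nc".getBytes(StandardCharsets.UTF_8), collected::add);
        System.out.println(collected); // prints [a, b, c]
    }
}
```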