[ https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15876573#comment-15876573 ]
ASF GitHub Bot commented on FLINK-3679:
---------------------------------------
Github user haohui commented on a diff in the pull request:
https://github.com/apache/flink/pull/3314#discussion_r102299038
--- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java ---
@@ -373,16 +370,28 @@ else if (partitionsRemoved) {
 					keyPayload.get(keyBytes);
 				}
 
-				final T value = deserializer.deserialize(keyBytes, valueBytes,
-						currentPartition.getTopic(), currentPartition.getPartition(), offset);
-
-				if (deserializer.isEndOfStream(value)) {
-					// remove partition from subscribed partitions.
-					partitionsIterator.remove();
-					continue partitionsLoop;
-				}
-
-				owner.emitRecord(value, currentPartition, offset);
+				final Collector<T> collector = new Collector<T>() {
--- End diff ---
Totally agree. I played around with this a little bit, and it looks like it
will require some trade-offs.

The problem is that `emitRecord()` needs per-record state (e.g., the topic
partition, the offset, etc.). That state can either be captured in a closure
(like the new `Collector` instance above) or passed through arguments. I see
three possibilities here:

1. Create a new `Collector` instance for every record. The JVM may or may
not be able to optimize the allocation away; a trace-based JVM should be
able to, but I'm not sure about class-based JVMs.
2. Expose the internal state in the `collect()` call, i.e., have `collect()`
take additional parameters such as the offset and partition state. This
reduces GC overhead but also makes it harder to change the implementation
later.
3. Create a new interface like `Optional<T> deserialize(byte[] messageKey,
...)` (or `void deserialize(byte[] messageKey, ..., AtomicReference<T>
result)` to avoid the per-record cost of `Optional`). This results in a
slightly more complex API, but it probably offers the best trade-off between
performance and API compatibility. A rough sketch follows this list.
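To make option 3 concrete, here is a rough sketch; the interface name and
exact parameter list are illustrative only, not existing Flink API or a
finished proposal:

```java
// Illustrative sketch of option 3; the name and signature are hypothetical.
import java.io.IOException;
import java.io.Serializable;
import java.util.Optional;

public interface OptionalDeserializationSchema<T> extends Serializable {

    /**
     * Deserializes a single Kafka message. Returning Optional.empty() means
     * "no element for this message" (e.g. corrupt bytes), so the consumer
     * can skip the record instead of failing the job.
     */
    Optional<T> deserialize(byte[] messageKey, byte[] message,
                            String topic, int partition, long offset) throws IOException;

    /** Returns true if this element signals end-of-stream for its partition. */
    boolean isEndOfStream(T nextElement);
}
```

The `AtomicReference<T>` flavor would replace the `Optional` return value
with an out-parameter, trading a bit of API clarity for one less allocation
per record.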
What do you think?
> DeserializationSchema should handle zero or more outputs for every input
> ------------------------------------------------------------------------
>
> Key: FLINK-3679
> URL: https://issues.apache.org/jira/browse/FLINK-3679
> Project: Flink
> Issue Type: Bug
> Components: DataStream API, Kafka Connector
> Reporter: Jamie Grier
> Assignee: Haohui Mai
>
> There are a couple of issues with the DeserializationSchema API that I think
> should be improved. This request has come to me via an existing Flink user.
> The main issue is simply that the API assumes that there is a one-to-one
> mapping between inputs and outputs. In reality, there are scenarios where one
> input message (say, from Kafka) might actually map to zero or more logical
> elements in the pipeline.
> Particularly important here is the case where you receive a message from a
> source (such as Kafka) and say the raw bytes don't deserialize properly.
> Right now the only recourse is to throw IOException and therefore fail the
> job.
> This is definitely not good, since bad data is a reality and failing the job
> is not the right option. If the job fails, we'll just end up replaying the
> bad data and the whole thing will start again.
> Instead, in this case it would be best if the user could just return the
> empty set.
> The other case is where one input message should logically map to multiple
> output messages. This case is probably less important, since there are other
> ways to achieve it, but in general it might be good to make the
> DeserializationSchema.deserialize() method return a collection rather than a
> single element.
> Maybe we need to support a DeserializationSchema variant that has semantics
> more like that of FlatMap.
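For illustration, such a FlatMap-like variant could look roughly like the
following sketch; the name and signature here are hypothetical, just to make
the idea concrete:

```java
// Hypothetical sketch of a flatMap-style schema; not existing Flink API.
import java.io.IOException;
import java.io.Serializable;
import org.apache.flink.util.Collector;

public interface FlatMapDeserializationSchema<T> extends Serializable {

    /**
     * Turns one raw message into zero or more elements: emitting nothing
     * skips a bad record, and emitting several covers one-to-many mappings.
     */
    void deserialize(byte[] message, Collector<T> out) throws IOException;
}
```

With this shape, a user hitting undeserializable bytes simply does not call
`out.collect(...)` for that message, and the job keeps running.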