[ https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15874745#comment-15874745 ]

ASF GitHub Bot commented on FLINK-3679:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3314#discussion_r102048656
  
    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java ---
    @@ -142,25 +141,38 @@ public void runFetchLoop() throws Exception {
                                final ConsumerRecords<byte[], byte[]> records = handover.pollNext();
     
                                // get the records for each topic partition
    -                           for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
    +                           for (final KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
     
                                        List<ConsumerRecord<byte[], byte[]>> partitionRecords =
                                                        records.records(partition.getKafkaPartitionHandle());
     
    -                                   for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
    -                                           final T value = deserializer.deserialize(
    -                                                           record.key(), record.value(),
    -                                                           record.topic(), record.partition(), record.offset());
    -
    -                                           if (deserializer.isEndOfStream(value)) {
    -                                                   // end of stream signaled
    -                                                   running = false;
    -                                                   break;
    -                                           }
    -
    -                                           // emit the actual record. this also updates offset state atomically
    -                                           // and deals with timestamps and watermark generation
    -                                           emitRecord(value, partition, record.offset(), record);
    +                                   for (final ConsumerRecord<byte[], byte[]> record : partitionRecords) {
    +                                           final Collector<T> collector = new Collector<T>() {
    --- End diff --
    
    Same question as in the Kafka 0.8 implementation.


> DeserializationSchema should handle zero or more outputs for every input
> ------------------------------------------------------------------------
>
>                 Key: FLINK-3679
>                 URL: https://issues.apache.org/jira/browse/FLINK-3679
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API, Kafka Connector
>            Reporter: Jamie Grier
>            Assignee: Haohui Mai
>
> There are a couple of issues with the DeserializationSchema API that I think 
> should be improved.  This request has come to me via an existing Flink user.
> The main issue is simply that the API assumes a one-to-one mapping between 
> inputs and outputs.  In reality there are scenarios where one input message 
> (say, from Kafka) might actually map to zero or more logical elements in the 
> pipeline.
> Particularly important here is the case where you receive a message from a 
> source (such as Kafka) and the raw bytes don't deserialize properly.  
> Right now the only recourse is to throw an IOException and therefore fail 
> the job.  
> This is definitely not good, since bad data is a reality and failing the job 
> is not the right option.  If the job fails, it will just restart and replay 
> the same bad data, and the whole cycle will repeat.
> Instead, in this case it would be best if the user could just return the 
> empty set.
> The other case is where one input message should logically be multiple output 
> messages.  This case is probably less important since there are other ways to 
> do this but in general it might be good to make the 
> DeserializationSchema.deserialize() method return a collection rather than a 
> single element.
> Maybe we need to support a DeserializationSchema variant that has semantics 
> more like that of FlatMap.
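
For illustration only, the FlatMap-like variant described above could look roughly like the sketch below. It is self-contained and does not use Flink: the Collector and FlatDeserializationSchema interfaces, the IntListSchema class, and the run() helper are all hypothetical names made up for this example, not Flink's actual API or the code in the pull request. The schema emits zero elements for input that does not parse (instead of throwing) and several elements when one message contains several values.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class FlatDeserializationSketch {

    /** Hypothetical stand-in for a collector; not Flink's actual interface. */
    public interface Collector<T> {
        void collect(T element);
    }

    /** Hypothetical flatMap-style schema: zero or more outputs per input message. */
    public interface FlatDeserializationSchema<T> {
        void deserialize(byte[] message, Collector<T> out);
    }

    /** Example schema: parses comma-separated integers and skips tokens that do not parse. */
    public static class IntListSchema implements FlatDeserializationSchema<Integer> {
        @Override
        public void deserialize(byte[] message, Collector<Integer> out) {
            if (message == null) {
                return; // nothing to emit
            }
            String text = new String(message, StandardCharsets.UTF_8);
            for (String token : text.split(",")) {
                try {
                    out.collect(Integer.parseInt(token.trim()));
                } catch (NumberFormatException e) {
                    // Bad data: emit nothing for this token rather than failing the job.
                }
            }
        }
    }

    /** Helper for the example: runs the schema on one raw message and gathers the outputs. */
    public static List<Integer> run(String raw) {
        List<Integer> results = new ArrayList<>();
        new IntListSchema().deserialize(raw.getBytes(StandardCharsets.UTF_8), results::add);
        return results;
    }

    public static void main(String[] args) {
        System.out.println(run("1,2,3"));        // one input, three outputs: [1, 2, 3]
        System.out.println(run("not-a-number")); // bad input, zero outputs: []
        System.out.println(run("7"));            // the usual one-to-one case: [7]
    }
}
```

Passing a collector, rather than returning a collection, keeps the common one-to-one case cheap and lets the caller decide what happens to each element as it is emitted. Whether bad records should be dropped silently, counted, or routed elsewhere is a policy choice this sketch does not address.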



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
