[ https://issues.apache.org/jira/browse/FLINK-3102?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15045110#comment-15045110 ]

ASF GitHub Bot commented on FLINK-3102:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1437#discussion_r46836078
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java ---
    @@ -372,34 +372,28 @@ public void open(Configuration parameters) throws Exception {
                                throw new RuntimeException("Requested unknown offset store " + offsetStore);
                }
                
    -           // set up operator state
    -           lastOffsets = new long[partitions.length];
    -           commitedOffsets = new long[partitions.length];
    -           
    -           Arrays.fill(lastOffsets, OFFSET_NOT_SET);
    -           Arrays.fill(commitedOffsets, OFFSET_NOT_SET);
    -           
    +           commitedOffsets = new HashMap<>();
    +
                // seek to last known pos, from restore request
                if (restoreToOffset != null) {
                        if (LOG.isInfoEnabled()) {
    -                           LOG.info("Consumer {} found offsets from previous checkpoint: {}",
    -                                           thisComsumerIndex, Arrays.toString(restoreToOffset));
    +                           LOG.info("Consumer {} is restored from previous checkpoint: {}",
    +                                           thisComsumerIndex, KafkaTopicPartition.toString(restoreToOffset));
                        }
                        
    -                   for (int i = 0; i < restoreToOffset.length; i++) {
    -                           long restoredOffset = restoreToOffset[i];
    -                           if (restoredOffset != OFFSET_NOT_SET) {
    -                                   // if this fails because we are not subscribed to the topic, then the
    -                                   // partition assignment is not deterministic!
    -                                   
    -                                   // we set the offset +1 here, because seek() is accepting the next offset to read,
    -                                   // but the restore offset is the last read offset
    -                                   fetcher.seek(new TopicPartition(topic, i), restoredOffset + 1);
    -                                   lastOffsets[i] = restoredOffset;
    -                           }
    +                   for(Map.Entry<KafkaTopicPartition, Long> restorePartition: restoreToOffset.entrySet()) {
    +                           // seek fetcher to restore position
    +                           // we set the offset +1 here, because seek() is accepting the next offset to read,
    +                           // but the restore offset is the last read offset
    +                           fetcher.seek(restorePartition.getKey(), restorePartition.getValue() + 1);
                        }
    +                   // initialize offsets with restored state
    +                   this.lastOffsets = restoreToOffset;
    --- End diff --
    
    For GC friendliness, how about setting `restoreToOffset = null;` once the restored offsets have been applied?
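
The suggestion can be illustrated with a minimal, self-contained sketch. It is not the actual `FlinkKafkaConsumer` code: `RestoreSketch`, `RecordingFetcher`, and the use of `String` keys in place of `KafkaTopicPartition` are assumptions made only for this example. It shows the restore loop from the diff (seek to the restored offset + 1), followed by clearing the restore field once its contents have been handed over to `lastOffsets`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified stand-in for the consumer's restore logic. */
class RestoreSketch {

    /** Hypothetical fetcher stub: records the position each partition was sought to. */
    static class RecordingFetcher {
        final Map<String, Long> seekPositions = new HashMap<>();

        void seek(String partition, long nextOffsetToRead) {
            seekPositions.put(partition, nextOffsetToRead);
        }
    }

    // Set by a restore request before open(); null if there is nothing to restore.
    Map<String, Long> restoreToOffset;

    // Last read offset per partition (String keys stand in for KafkaTopicPartition).
    Map<String, Long> lastOffsets;

    void open(RecordingFetcher fetcher) {
        if (restoreToOffset != null) {
            for (Map.Entry<String, Long> entry : restoreToOffset.entrySet()) {
                // seek() expects the next offset to read, while the restored
                // value is the last offset that was read, hence the +1.
                fetcher.seek(entry.getKey(), entry.getValue() + 1);
            }
            // Initialize the offsets with the restored state ...
            lastOffsets = restoreToOffset;
            // ... and drop the now-redundant reference, as suggested in the review.
            restoreToOffset = null;
        } else {
            lastOffsets = new HashMap<>();
        }
    }
}
```

Note that in this sketch `lastOffsets` refers to the same map, so the map itself stays reachable. Clearing the field only removes the extra reference and makes it explicit that the restore state has been consumed.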


> Allow reading from multiple topics with one FlinkKafkaConsumer
> --------------------------------------------------------------
>
>                 Key: FLINK-3102
>                 URL: https://issues.apache.org/jira/browse/FLINK-3102
>             Project: Flink
>          Issue Type: New Feature
>          Components: Kafka Connector
>            Reporter: Robert Metzger
>            Assignee: Robert Metzger
>
> Currently, a Kafka consumer can read from only one topic.
> For cases where multiple topics contain messages with the same schema, it is 
> useful to be able to subscribe to many topics using one FlinkKafkaConsumer 
> instance.



