[ https://issues.apache.org/jira/browse/FLINK-3102?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15045110#comment-15045110 ]
ASF GitHub Bot commented on FLINK-3102:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1437#discussion_r46836078

    --- Diff: flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java ---
    @@ -372,34 +372,28 @@ public void open(Configuration parameters) throws Exception {
                     throw new RuntimeException("Requested unknown offset store " + offsetStore);
                 }
     
    -            // set up operator state
    -            lastOffsets = new long[partitions.length];
    -            commitedOffsets = new long[partitions.length];
    -
    -            Arrays.fill(lastOffsets, OFFSET_NOT_SET);
    -            Arrays.fill(commitedOffsets, OFFSET_NOT_SET);
    -
    +            commitedOffsets = new HashMap<>();
    +
                 // seek to last known pos, from restore request
                 if (restoreToOffset != null) {
                     if (LOG.isInfoEnabled()) {
    -                    LOG.info("Consumer {} found offsets from previous checkpoint: {}",
    -                            thisComsumerIndex, Arrays.toString(restoreToOffset));
    +                    LOG.info("Consumer {} is restored from previous checkpoint: {}",
    +                            thisComsumerIndex, KafkaTopicPartition.toString(restoreToOffset));
                     }
     
    -                for (int i = 0; i < restoreToOffset.length; i++) {
    -                    long restoredOffset = restoreToOffset[i];
    -                    if (restoredOffset != OFFSET_NOT_SET) {
    -                        // if this fails because we are not subscribed to the topic, then the
    -                        // partition assignment is not deterministic!
    -
    -                        // we set the offset +1 here, because seek() is accepting the next offset to read,
    -                        // but the restore offset is the last read offset
    -                        fetcher.seek(new TopicPartition(topic, i), restoredOffset + 1);
    -                        lastOffsets[i] = restoredOffset;
    -                    }
    +                for(Map.Entry<KafkaTopicPartition, Long> restorePartition: restoreToOffset.entrySet()) {
    +                    // seek fetcher to restore position
    +                    // we set the offset +1 here, because seek() is accepting the next offset to read,
    +                    // but the restore offset is the last read offset
    +                    fetcher.seek(restorePartition.getKey(), restorePartition.getValue() + 1);
                     }
    +                // initialize offsets with restored state
    +                this.lastOffsets = restoreToOffset;
    --- End diff --

    For GC friendliness, how about setting `restoreToOffset = null;`?


> Allow reading from multiple topics with one FlinkKafkaConsumer
> --------------------------------------------------------------
>
>                 Key: FLINK-3102
>                 URL: https://issues.apache.org/jira/browse/FLINK-3102
>             Project: Flink
>          Issue Type: New Feature
>          Components: Kafka Connector
>            Reporter: Robert Metzger
>            Assignee: Robert Metzger
>
> Currently, a Kafka consumer can read from only one topic.
> For cases where multiple topics contain messages with the same schema, it is
> useful to be able to subscribe to many topics using one FlinkKafkaConsumer
> instance.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)