Repository: flink Updated Branches: refs/heads/release-1.2 c6a807250 -> 75db91e62
[hotfix] [kafka] Add log info for partitions subscribed by FlinkKafkaConsumer subtasks Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/75db91e6 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/75db91e6 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/75db91e6 Branch: refs/heads/release-1.2 Commit: 75db91e62342db1be15020d41c3e23f15b2aca36 Parents: c6a8072 Author: Tzu-Li (Gordon) Tai <[email protected]> Authored: Sat Mar 25 14:03:03 2017 +0800 Committer: Tzu-Li (Gordon) Tai <[email protected]> Committed: Sat Mar 25 14:07:17 2017 +0800 ---------------------------------------------------------------------- .../kafka/FlinkKafkaConsumerBase.java | 23 ++++++++++---------- 1 file changed, 12 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/75db91e6/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java index bfc347f..bdeb478 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java @@ -324,10 +324,8 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti restoreToOffset.put(kafkaOffset.f0, kafkaOffset.f1); } - LOG.info("Setting restore state in the FlinkKafkaConsumer."); - if (LOG.isDebugEnabled()) { - LOG.debug("Using the following offsets: {}", restoreToOffset); - } + LOG.info("Setting restore state in the FlinkKafkaConsumer for consumer subtask {}: {}", + getRuntimeContext().getIndexOfThisSubtask(), restoreToOffset); } else if (restoreToOffset.isEmpty()) { restoreToOffset = null; } @@ -387,15 +385,10 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti @Override public void restoreState(HashMap<KafkaTopicPartition, Long> restoredOffsets) { - LOG.info("{} (taskIdx={}) restoring offsets from an older version.", - getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask()); + LOG.info("{} (taskIdx={}) restoring offsets from an older version: {}", + getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask(), restoredOffsets); restoreToOffset = restoredOffsets; - - if (LOG.isDebugEnabled()) { - LOG.debug("{} (taskIdx={}) restored offsets from an older Flink version: {}", - getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask(), restoreToOffset); - } } @Override @@ -492,6 +485,9 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti for (Map.Entry<KafkaTopicPartition, Long> restoredPartitionState : restoreToOffset.entrySet()) { subscribedPartitions.add(restoredPartitionState.getKey()); } + + LOG.info("Consumer subtask {} will use the partitions in restored state as subscribed partitions: {}", + getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitions); } else { List<KafkaTopicPartition> kafkaTopicPartitions = getKafkaPartitions(topics); @@ -508,11 +504,16 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti } }); + LOG.info("Fetched a total of {} kafka partitions: {}", kafkaTopicPartitions.size(), kafkaTopicPartitions); + subscribedPartitions = new ArrayList<>( (kafkaTopicPartitions.size() / getRuntimeContext().getNumberOfParallelSubtasks()) + 1); for (int i = getRuntimeContext().getIndexOfThisSubtask(); i < kafkaTopicPartitions.size(); i += getRuntimeContext().getNumberOfParallelSubtasks()) { subscribedPartitions.add(kafkaTopicPartitions.get(i)); } + + LOG.info("Consumer subtask {} will subscribe to {} partitions: {}", + getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitions.size(), subscribedPartitions); } }
