Repository: flink
Updated Branches:
  refs/heads/release-1.2 c6a807250 -> 75db91e62


[hotfix] [kafka] Add log info for partitions subscribed by FlinkKafkaConsumer 
subtasks


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/75db91e6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/75db91e6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/75db91e6

Branch: refs/heads/release-1.2
Commit: 75db91e62342db1be15020d41c3e23f15b2aca36
Parents: c6a8072
Author: Tzu-Li (Gordon) Tai <[email protected]>
Authored: Sat Mar 25 14:03:03 2017 +0800
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Sat Mar 25 14:07:17 2017 +0800

----------------------------------------------------------------------
 .../kafka/FlinkKafkaConsumerBase.java           | 23 ++++++++++----------
 1 file changed, 12 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/75db91e6/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index bfc347f..bdeb478 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -324,10 +324,8 @@ public abstract class FlinkKafkaConsumerBase<T> extends 
RichParallelSourceFuncti
                                        restoreToOffset.put(kafkaOffset.f0, 
kafkaOffset.f1);
                                }
 
-                               LOG.info("Setting restore state in the 
FlinkKafkaConsumer.");
-                               if (LOG.isDebugEnabled()) {
-                                       LOG.debug("Using the following offsets: 
{}", restoreToOffset);
-                               }
+                               LOG.info("Setting restore state in the 
FlinkKafkaConsumer for consumer subtask {}: {}",
+                                       
getRuntimeContext().getIndexOfThisSubtask(), restoreToOffset);
                        } else if (restoreToOffset.isEmpty()) {
                                restoreToOffset = null;
                        }
@@ -387,15 +385,10 @@ public abstract class FlinkKafkaConsumerBase<T> extends 
RichParallelSourceFuncti
 
        @Override
        public void restoreState(HashMap<KafkaTopicPartition, Long> 
restoredOffsets) {
-               LOG.info("{} (taskIdx={}) restoring offsets from an older 
version.",
-                       getClass().getSimpleName(), 
getRuntimeContext().getIndexOfThisSubtask());
+               LOG.info("{} (taskIdx={}) restoring offsets from an older 
version: {}",
+                       getClass().getSimpleName(), 
getRuntimeContext().getIndexOfThisSubtask(), restoredOffsets);
 
                restoreToOffset = restoredOffsets;
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("{} (taskIdx={}) restored offsets from an 
older Flink version: {}",
-                               getClass().getSimpleName(), 
getRuntimeContext().getIndexOfThisSubtask(), restoreToOffset);
-               }
        }
 
        @Override
@@ -492,6 +485,9 @@ public abstract class FlinkKafkaConsumerBase<T> extends 
RichParallelSourceFuncti
                        for (Map.Entry<KafkaTopicPartition, Long> 
restoredPartitionState : restoreToOffset.entrySet()) {
                                
subscribedPartitions.add(restoredPartitionState.getKey());
                        }
+
+                       LOG.info("Consumer subtask {} will use the partitions 
in restored state as subscribed partitions: {}",
+                               getRuntimeContext().getIndexOfThisSubtask(), 
subscribedPartitions);
                } else {
                        List<KafkaTopicPartition> kafkaTopicPartitions = 
getKafkaPartitions(topics);
 
@@ -508,11 +504,16 @@ public abstract class FlinkKafkaConsumerBase<T> extends 
RichParallelSourceFuncti
                                }
                        });
 
+                       LOG.info("Fetched a total of {} kafka partitions: {}", 
kafkaTopicPartitions.size(), kafkaTopicPartitions);
+
                        subscribedPartitions = new ArrayList<>(
                                (kafkaTopicPartitions.size() / 
getRuntimeContext().getNumberOfParallelSubtasks()) + 1);
                        for (int i = 
getRuntimeContext().getIndexOfThisSubtask(); i < kafkaTopicPartitions.size(); i 
+= getRuntimeContext().getNumberOfParallelSubtasks()) {
                                
subscribedPartitions.add(kafkaTopicPartitions.get(i));
                        }
+
+                       LOG.info("Consumer subtask {} will subscribe to {} 
partitions: {}",
+                               getRuntimeContext().getIndexOfThisSubtask(), 
subscribedPartitions.size(), subscribedPartitions);
                }
        }
 

Reply via email to