Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2756#discussion_r200818673 --- Diff: external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java --- @@ -80,22 +81,24 @@ public void refresh() { List<Partition> mine = KafkaUtils.calculatePartitionsForTask(brokerInfo, _totalTasks, _taskIndex); Set<Partition> curr = _managers.keySet(); - Set<Partition> newPartitions = new HashSet<Partition>(mine); + Set<Partition> newPartitions = new HashSet<>(mine); newPartitions.removeAll(curr); - Set<Partition> deletedPartitions = new HashSet<Partition>(curr); + Set<Partition> deletedPartitions = new HashSet<>(curr); deletedPartitions.removeAll(mine); - LOG.info(taskId(_taskIndex, _totalTasks) + "Deleted partition managers: " + deletedPartitions.toString()); + LOG.info("{}Deleted partition managers: {}", + taskId(_taskIndex, _totalTasks), deletedPartitions); - Map<Integer, PartitionManager> deletedManagers = new HashMap<>(); + Map<TopicAndPartition, PartitionManager> deletedManagers = new HashMap<>(); for (Partition id : deletedPartitions) { - deletedManagers.put(id.partition, _managers.remove(id)); + PartitionManager manager = _managers.remove(id); + manager.close(); + deletedManagers.put(new TopicAndPartition(id.topic, id.partition), manager); } - for (PartitionManager manager : deletedManagers.values()) { - if (manager != null) manager.close(); - } - LOG.info(taskId(_taskIndex, _totalTasks) + "New partition managers: " + newPartitions.toString()); + + LOG.info("{}New partition managers: {}", --- End diff -- Missing space
---