Repository: kafka Updated Branches: refs/heads/trunk 287e45ab4 -> 0a7b20e28
HOTFIX: fix partition ordering in assignment. Workaround for partition ordering not being preserved by the consumer group management. guozhangwang Author: Yasuhiro Matsuda <[email protected]> Reviewers: Guozhang Wang Closes #868 from ymatsuda/partitionOrder Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0a7b20e2 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0a7b20e2 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0a7b20e2 Branch: refs/heads/trunk Commit: 0a7b20e2863b2519e63d85bc83610e63c58c6d46 Parents: 287e45a Author: Yasuhiro Matsuda <[email protected]> Authored: Thu Feb 4 14:14:08 2016 -0800 Committer: Guozhang Wang <[email protected]> Committed: Thu Feb 4 14:14:08 2016 -0800 ---------------------------------------------------------------------- .../internals/StreamPartitionAssignor.java | 47 +++++++++++++++++--- 1 file changed, 42 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/0a7b20e2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java index 74770a5..e600cf7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java @@ -50,6 +50,7 @@ import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -62,6 +63,34 @@ public class 
StreamPartitionAssignor implements PartitionAssignor, Configurable private static final Logger log = LoggerFactory.getLogger(StreamPartitionAssignor.class); + private static class AssignedPartition implements Comparable<AssignedPartition> { + public final TaskId taskId; + public final TopicPartition partition; + + public AssignedPartition(TaskId taskId, TopicPartition partition) { + this.taskId = taskId; + this.partition = partition; + } + + @Override + public int compareTo(AssignedPartition that) { + return PARTITION_COMPARATOR.compare(this.partition, that.partition); + } + } + + private static final Comparator<TopicPartition> PARTITION_COMPARATOR = new Comparator<TopicPartition>() { + @Override + public int compare(TopicPartition p1, TopicPartition p2) { + int result = p1.topic().compareTo(p2.topic()); + + if (result != 0) { + return result; + } else { + return p1.partition() < p2.partition() ? -1 : (p1.partition() > p2.partition() ? 1 : 0); + } + } + }; + private StreamThread streamThread; private int numStandbyReplicas; @@ -341,20 +370,18 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable } final int numConsumers = consumers.size(); - List<TaskId> active = new ArrayList<>(); Map<TaskId, Set<TopicPartition>> standby = new HashMap<>(); int i = 0; for (String consumer : consumers) { - List<TopicPartition> activePartitions = new ArrayList<>(); + ArrayList<AssignedPartition> assignedPartitions = new ArrayList<>(); final int numTaskIds = taskIds.size(); for (int j = i; j < numTaskIds; j += numConsumers) { TaskId taskId = taskIds.get(j); if (j < numActiveTasks) { for (TopicPartition partition : partitionsForTask.get(taskId)) { - activePartitions.add(partition); - active.add(taskId); + assignedPartitions.add(new AssignedPartition(taskId, partition)); } } else { Set<TopicPartition> standbyPartitions = standby.get(taskId); @@ -366,6 +393,14 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable } } + 
Collections.sort(assignedPartitions); + List<TaskId> active = new ArrayList<>(); + List<TopicPartition> activePartitions = new ArrayList<>(); + for (AssignedPartition partition : assignedPartitions) { + active.add(partition.taskId); + activePartitions.add(partition.partition); + } + AssignmentInfo data = new AssignmentInfo(active, standby); assignment.put(consumer, new Assignment(activePartitions, data.encode())); i++; @@ -441,7 +476,9 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable @Override public void onAssignment(Assignment assignment) { - List<TopicPartition> partitions = assignment.partitions(); + List<TopicPartition> partitions = new ArrayList<>(assignment.partitions()); + + Collections.sort(partitions, PARTITION_COMPARATOR); AssignmentInfo info = AssignmentInfo.decode(assignment.userData()); this.standbyTasks = info.standbyTasks;
