Repository: storm Updated Branches: refs/heads/master 67f13b530 -> b246d3741
STORM-2850: Make ManualPartitionSubscription call rebalance listener on revoke hook before assigning new partitions to the consumer Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c71f4c35 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c71f4c35 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c71f4c35 Branch: refs/heads/master Commit: c71f4c351511b8683b7b28528259774a7f11c3de Parents: 67f13b5 Author: Stig Rohde Døssing <[email protected]> Authored: Fri Dec 8 12:44:44 2017 +0100 Committer: Stig Rohde Døssing <[email protected]> Committed: Fri Dec 8 12:44:44 2017 +0100 ---------------------------------------------------------------------- .../ManualPartitionSubscription.java | 2 +- .../ManualPartitionSubscriptionTest.java | 77 ++++++++++++++++++++ 2 files changed, 78 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/c71f4c35/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscription.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscription.java index 32376d4..ebfd30c 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscription.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscription.java @@ -56,11 +56,11 @@ public class ManualPartitionSubscription extends Subscription { Collections.sort(allPartitions, TopicPartitionComparator.INSTANCE); Set<TopicPartition> newAssignment = new HashSet<>(partitioner.partition(allPartitions, context)); if (!newAssignment.equals(currentAssignment)) { - consumer.assign(newAssignment); if (currentAssignment != null) { listener.onPartitionsRevoked(currentAssignment); } currentAssignment = newAssignment; + consumer.assign(newAssignment); listener.onPartitionsAssigned(newAssignment); } } http://git-wip-us.apache.org/repos/asf/storm/blob/c71f4c35/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscriptionTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscriptionTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscriptionTest.java new file mode 100644 index 0000000..8a7b316 --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscriptionTest.java @@ -0,0 +1,77 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout.subscription; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.Mockito.clearInvocations; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration; +import org.apache.storm.task.TopologyContext; +import org.junit.Test; +import org.mockito.InOrder; + +public class ManualPartitionSubscriptionTest { + + @Test + public void testCanReassignPartitions() { + ManualPartitioner partitionerMock = mock(ManualPartitioner.class); + TopicFilter filterMock = mock(TopicFilter.class); + KafkaConsumer<String, String> consumerMock = mock(KafkaConsumer.class); + ConsumerRebalanceListener listenerMock = mock(ConsumerRebalanceListener.class); + TopologyContext contextMock = mock(TopologyContext.class); + ManualPartitionSubscription subscription = new ManualPartitionSubscription(partitionerMock, filterMock); + + List<TopicPartition> onePartition = Collections.singletonList(new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0)); + List<TopicPartition> twoPartitions = new ArrayList<>(); + twoPartitions.add(new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0)); + twoPartitions.add(new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1)); + when(partitionerMock.partition(anyList(), any(TopologyContext.class))) + .thenReturn(onePartition) + .thenReturn(twoPartitions); + + //Set the first assignment + subscription.subscribe(consumerMock, listenerMock, contextMock); + + InOrder inOrder = inOrder(consumerMock, listenerMock); + inOrder.verify(consumerMock).assign(new HashSet<>(onePartition)); + inOrder.verify(listenerMock).onPartitionsAssigned(new HashSet<>(onePartition)); + + clearInvocations(consumerMock, listenerMock); + + //Update to set the second assignment + subscription.refreshAssignment(); + + //The partition revocation hook must be called before the new partitions are assigned to the consumer, + //to allow the revocation hook to commit offsets for the revoked partitions. + inOrder.verify(listenerMock).onPartitionsRevoked(new HashSet<>(onePartition)); + inOrder.verify(consumerMock).assign(new HashSet<>(twoPartitions)); + inOrder.verify(listenerMock).onPartitionsAssigned(new HashSet<>(twoPartitions)); + } + +}
