Repository: storm
Updated Branches:
  refs/heads/master 67f13b530 -> b246d3741


STORM-2850: Make ManualPartitionSubscription call the rebalance listener's
onPartitionsRevoked hook before assigning new partitions to the consumer


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c71f4c35
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c71f4c35
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c71f4c35

Branch: refs/heads/master
Commit: c71f4c351511b8683b7b28528259774a7f11c3de
Parents: 67f13b5
Author: Stig Rohde Døssing <[email protected]>
Authored: Fri Dec 8 12:44:44 2017 +0100
Committer: Stig Rohde Døssing <[email protected]>
Committed: Fri Dec 8 12:44:44 2017 +0100

----------------------------------------------------------------------
 .../ManualPartitionSubscription.java            |  2 +-
 .../ManualPartitionSubscriptionTest.java        | 77 ++++++++++++++++++++
 2 files changed, 78 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/c71f4c35/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscription.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscription.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscription.java
index 32376d4..ebfd30c 100644
--- 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscription.java
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscription.java
@@ -56,11 +56,11 @@ public class ManualPartitionSubscription extends 
Subscription {
         Collections.sort(allPartitions, TopicPartitionComparator.INSTANCE);
         Set<TopicPartition> newAssignment = new 
HashSet<>(partitioner.partition(allPartitions, context));
         if (!newAssignment.equals(currentAssignment)) {
-            consumer.assign(newAssignment);
             if (currentAssignment != null) {
                 listener.onPartitionsRevoked(currentAssignment);
             }
             currentAssignment = newAssignment;
+            consumer.assign(newAssignment);
             listener.onPartitionsAssigned(newAssignment);
         }
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/c71f4c35/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscriptionTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscriptionTest.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscriptionTest.java
new file mode 100644
index 0000000..8a7b316
--- /dev/null
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscriptionTest.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.subscription;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.Mockito.clearInvocations;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import 
org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.task.TopologyContext;
+import org.junit.Test;
+import org.mockito.InOrder;
+
+public class ManualPartitionSubscriptionTest {
+
+    @Test
+    public void testCanReassignPartitions() {
+        ManualPartitioner partitionerMock = mock(ManualPartitioner.class);
+        TopicFilter filterMock = mock(TopicFilter.class);
+        KafkaConsumer<String, String> consumerMock = mock(KafkaConsumer.class);
+        ConsumerRebalanceListener listenerMock = 
mock(ConsumerRebalanceListener.class);
+        TopologyContext contextMock = mock(TopologyContext.class);
+        ManualPartitionSubscription subscription = new 
ManualPartitionSubscription(partitionerMock, filterMock);
+        
+        List<TopicPartition> onePartition = Collections.singletonList(new 
TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0));
+        List<TopicPartition> twoPartitions = new ArrayList<>();
+        twoPartitions.add(new 
TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0));
+        twoPartitions.add(new 
TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1));
+        when(partitionerMock.partition(anyList(), any(TopologyContext.class)))
+            .thenReturn(onePartition)
+            .thenReturn(twoPartitions);
+        
+        //Set the first assignment
+        subscription.subscribe(consumerMock, listenerMock, contextMock);
+        
+        InOrder inOrder = inOrder(consumerMock, listenerMock);
+        inOrder.verify(consumerMock).assign(new HashSet<>(onePartition));
+        inOrder.verify(listenerMock).onPartitionsAssigned(new 
HashSet<>(onePartition));
+        
+        clearInvocations(consumerMock, listenerMock);
+        
+        //Update to set the second assignment
+        subscription.refreshAssignment();
+        
+        //The partition revocation hook must be called before the new partitions are assigned to the consumer,
+        //to allow the revocation hook to commit offsets for the revoked partitions.
+        inOrder.verify(listenerMock).onPartitionsRevoked(new 
HashSet<>(onePartition));
+        inOrder.verify(consumerMock).assign(new HashSet<>(twoPartitions));
+        inOrder.verify(listenerMock).onPartitionsAssigned(new 
HashSet<>(twoPartitions));
+    }
+    
+}

Reply via email to