Repository: storm
Updated Branches:
  refs/heads/1.x-branch d5ead6681 -> 3b5f9e7c7


STORM-3013: Keep KafkaConsumer open when storm-kafka-client spout is deactivated, in order to keep metrics working


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b7e19c7b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b7e19c7b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b7e19c7b

Branch: refs/heads/1.x-branch
Commit: b7e19c7b238da2ab4e89c09db2a60f61ba3ccfe6
Parents: 8a16fec
Author: Stig Rohde Døssing <s...@apache.org>
Authored: Fri Mar 30 19:12:01 2018 +0200
Committer: Stig Rohde Døssing <s...@apache.org>
Committed: Tue Jul 10 10:50:57 2018 +0200

----------------------------------------------------------------------
 .../apache/storm/kafka/spout/KafkaSpout.java    | 43 +++++++---------
 .../kafka/spout/metrics/KafkaOffsetMetric.java  |  6 +--
 .../kafka/spout/KafkaSpoutAbstractTest.java     |  6 ++-
 .../kafka/spout/KafkaSpoutReactivationTest.java | 54 ++++++++++++++------
 .../kafka/spout/KafkaSpoutSingleTopicTest.java  |  2 +-
 ...outTopologyDeployActivateDeactivateTest.java |  2 -
 6 files changed, 66 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b7e19c7b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 76c546e..805bbd2 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -24,7 +24,6 @@ import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrat
 import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
 
 import com.google.common.annotations.VisibleForTesting;
-
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -103,7 +102,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     private transient Timer refreshSubscriptionTimer;
     private transient TopologyContext context;
     private transient CommitMetadataManager commitMetadataManager;
-    private transient KafkaOffsetMetric kafkaOffsetMetric;
+    private transient KafkaOffsetMetric<K, V> kafkaOffsetMetric;
 
     public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
         this(kafkaSpoutConfig, new KafkaConsumerFactoryDefault<K, V>());
@@ -141,6 +140,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         waitingToEmit = new HashMap<>();
         commitMetadataManager = new CommitMetadataManager(context, kafkaSpoutConfig.getProcessingGuarantee());
 
+        kafkaConsumer = kafkaConsumerFactory.createConsumer(kafkaSpoutConfig);
+
         tupleListener.open(conf, context);
         if (canRegisterMetrics()) {
             registerMetric();
@@ -192,7 +193,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                 kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
 
             if (isAtLeastOnceProcessing()) {
-                commitOffsetsForAckedTuples(new HashSet<>(partitions));
+                commitOffsetsForAckedTuples();
             }
         }
 
@@ -287,7 +288,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
             if (commitTimer != null && commitTimer.isExpiredResetOnTrue()) {
                 if (isAtLeastOnceProcessing()) {
-                    commitOffsetsForAckedTuples(kafkaConsumer.assignment());
+                    commitOffsetsForAckedTuples();
                 } else if (kafkaSpoutConfig.getProcessingGuarantee() == ProcessingGuarantee.NO_GUARANTEE) {
                     Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
                         createFetchedOffsetsMetadata(kafkaConsumer.assignment());
@@ -517,17 +518,9 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         return offsetsToCommit;
     }
 
-    private void commitOffsetsForAckedTuples(Set<TopicPartition> assignedPartitions) {
-        // Find offsets that are ready to be committed for every assigned topic partition
-        final Map<TopicPartition, OffsetManager> assignedOffsetManagers = new HashMap<>();
-        for (Entry<TopicPartition, OffsetManager> entry : offsetManagers.entrySet()) {
-            if (assignedPartitions.contains(entry.getKey())) {
-                assignedOffsetManagers.put(entry.getKey(), entry.getValue());
-            }
-        }
-
+    private void commitOffsetsForAckedTuples() {
         final Map<TopicPartition, OffsetAndMetadata> nextCommitOffsets = new HashMap<>();
-        for (Map.Entry<TopicPartition, OffsetManager> tpOffset : assignedOffsetManagers.entrySet()) {
+        for (Map.Entry<TopicPartition, OffsetManager> tpOffset : offsetManagers.entrySet()) {
             final OffsetAndMetadata nextCommitOffset = tpOffset.getValue().findNextCommitOffset(commitMetadataManager.getCommitMetadata());
             if (nextCommitOffset != null) {
                 nextCommitOffsets.put(tpOffset.getKey(), nextCommitOffset);
@@ -568,7 +561,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                     }
                 }
 
-                final OffsetManager offsetManager = assignedOffsetManagers.get(tp);
+                final OffsetManager offsetManager = offsetManagers.get(tp);
                 offsetManager.commit(tpOffset.getValue());
                 LOG.debug("[{}] uncommitted offsets for partition [{}] after commit", offsetManager.getNumUncommittedOffsets(), tp);
             }
@@ -644,22 +637,20 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     @Override
     public void activate() {
         try {
-            subscribeKafkaConsumer();
+            refreshAssignment();
         } catch (InterruptException e) {
             throwKafkaConsumerInterruptedException();
         }
     }
 
-    private void subscribeKafkaConsumer() {
-        kafkaConsumer = kafkaConsumerFactory.createConsumer(kafkaSpoutConfig);
-
+    private void refreshAssignment() {
         kafkaSpoutConfig.getSubscription().subscribe(kafkaConsumer, new KafkaSpoutConsumerRebalanceListener(), context);
     }
 
     @Override
     public void deactivate() {
         try {
-            shutdown();
+            commitIfNecessary();
         } catch (InterruptException e) {
             throwKafkaConsumerInterruptedException();
         }
@@ -674,11 +665,15 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         }
     }
 
+    private void commitIfNecessary() {
+        if (isAtLeastOnceProcessing()) {
+            commitOffsetsForAckedTuples();
+        }
+    }
+
     private void shutdown() {
         try {
-            if (isAtLeastOnceProcessing()) {
-                commitOffsetsForAckedTuples(kafkaConsumer.assignment());
-            }
+            commitIfNecessary();
         } finally {
             //remove resources
             kafkaConsumer.close();
@@ -743,7 +738,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     }
 
     @VisibleForTesting
-    KafkaOffsetMetric getKafkaOffsetMetric() {
+    KafkaOffsetMetric<K, V> getKafkaOffsetMetric() {
         return kafkaOffsetMetric;
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b7e19c7b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
index afe8f74..71fa710 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
@@ -51,11 +51,11 @@ import java.util.Set;
  * topicName/totalRecordsInPartitions //total number of records in all the associated partitions of this spout
  * </p>
  */
-public class KafkaOffsetMetric implements IMetric {
+public class KafkaOffsetMetric<K, V> implements IMetric {
 
     private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetMetric.class);
     private final Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier;
-    private final Supplier<KafkaConsumer> consumerSupplier;
+    private final Supplier<KafkaConsumer<K,V>> consumerSupplier;
 
     public KafkaOffsetMetric(Supplier offsetManagerSupplier, Supplier consumerSupplier) {
         this.offsetManagerSupplier = offsetManagerSupplier;
@@ -66,7 +66,7 @@ public class KafkaOffsetMetric implements IMetric {
     public Object getValueAndReset() {
 
         Map<TopicPartition, OffsetManager> offsetManagers = offsetManagerSupplier.get();
-        KafkaConsumer kafkaConsumer = consumerSupplier.get();
+        KafkaConsumer<K,V> kafkaConsumer = consumerSupplier.get();
 
         if (offsetManagers == null || offsetManagers.isEmpty() || kafkaConsumer == null) {
             LOG.debug("Metrics Tick: offsetManagers or kafkaConsumer is null.");

http://git-wip-us.apache.org/repos/asf/storm/blob/b7e19c7b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java
index 0467383..8ffa36a 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java
@@ -56,7 +56,7 @@ public abstract class KafkaSpoutAbstractTest {
     final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
     final long commitOffsetPeriodMs;
 
-    KafkaConsumer<String, String> consumerSpy;
+    private KafkaConsumer<String, String> consumerSpy;
     KafkaSpout<String, String> spout;
 
     @Captor
@@ -84,6 +84,10 @@ public abstract class KafkaSpoutAbstractTest {
 
         simulatedTime = new Time.SimulatedTime();
     }
+    
+    protected KafkaConsumer<String, String> getKafkaConsumer() {
+        return consumerSpy;
+    }
 
     private KafkaConsumerFactory<String, String> createConsumerFactory() {
 

http://git-wip-us.apache.org/repos/asf/storm/blob/b7e19c7b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java
index c2c46b5..f884479 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java
@@ -17,7 +17,8 @@
 package org.apache.storm.kafka.spout;
 
 import static org.apache.storm.kafka.spout.KafkaSpout.TIMER_DELAY_MS;
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyList;
 import static org.mockito.Matchers.anyString;
@@ -35,13 +36,13 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.kafka.KafkaUnitRule;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
 import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
 import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
 import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.utils.Time;
-import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -63,31 +64,24 @@ public class KafkaSpoutReactivationTest {
     private final SpoutOutputCollector collector = mock(SpoutOutputCollector.class);
     private final long commitOffsetPeriodMs = 2_000;
     private KafkaConsumer<String, String> consumerSpy;
-    private KafkaConsumer<String, String> postReactivationConsumerSpy;
     private KafkaSpout<String, String> spout;
     private final int maxPollRecords = 10;
 
-    @Before
-    public void setUp() {
+    public void prepareSpout(int messageCount, FirstPollOffsetStrategy firstPollOffsetStrategy) throws Exception {
         KafkaSpoutConfig<String, String> spoutConfig =
             SingleTopicKafkaSpoutConfiguration.setCommonSpoutConfig(
                 KafkaSpoutConfig.builder("127.0.0.1:" + kafkaUnitRule.getKafkaUnit().getKafkaPort(),
                     SingleTopicKafkaSpoutConfiguration.TOPIC))
-                .setFirstPollOffsetStrategy(UNCOMMITTED_EARLIEST)
+                .setFirstPollOffsetStrategy(firstPollOffsetStrategy)
                 .setOffsetCommitPeriodMs(commitOffsetPeriodMs)
                 .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords)
                 .build();
         KafkaConsumerFactory<String, String> consumerFactory = new KafkaConsumerFactoryDefault<>();
         this.consumerSpy = spy(consumerFactory.createConsumer(spoutConfig));
-        this.postReactivationConsumerSpy = spy(consumerFactory.createConsumer(spoutConfig));
         KafkaConsumerFactory<String, String> consumerFactoryMock = mock(KafkaConsumerFactory.class);
         when(consumerFactoryMock.createConsumer(any(KafkaSpoutConfig.class)))
-            .thenReturn(consumerSpy)
-            .thenReturn(postReactivationConsumerSpy);
+            .thenReturn(consumerSpy);
         this.spout = new KafkaSpout<>(spoutConfig, consumerFactoryMock);
-    }
-
-    private void prepareSpout(int messageCount) throws Exception {
         SingleTopicKafkaUnitSetupHelper.populateTopicData(kafkaUnitRule.getKafkaUnit(), SingleTopicKafkaSpoutConfiguration.TOPIC, messageCount);
         SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, topologyContext, collector);
     }
@@ -100,11 +94,10 @@ public class KafkaSpoutReactivationTest {
         return messageId.getValue();
     }
 
-    @Test
-    public void testSpoutMustHandleReactivationGracefully() throws Exception {
+    private void doReactivationTest(FirstPollOffsetStrategy firstPollOffsetStrategy) throws Exception {
         try (Time.SimulatedTime time = new Time.SimulatedTime()) {
             int messageCount = maxPollRecords * 2;
-            prepareSpout(messageCount);
+            prepareSpout(messageCount, firstPollOffsetStrategy);
 
             //Emit and ack some tuples, ensure that some polled tuples remain cached in the spout by emitting less than maxPollRecords
             int beforeReactivationEmits = maxPollRecords - 3;
@@ -118,6 +111,7 @@ public class KafkaSpoutReactivationTest {
             //Cycle spout activation
             spout.deactivate();
             SingleTopicKafkaUnitSetupHelper.verifyAllMessagesCommitted(consumerSpy, commitCapture, beforeReactivationEmits - 1);
+            reset(consumerSpy);
             //Tuples may be acked/failed after the spout deactivates, so we have to be able to handle this too
             spout.ack(ackAfterDeactivateMessageId);
             spout.activate();
@@ -133,7 +127,7 @@ public class KafkaSpoutReactivationTest {
             spout.nextTuple();
 
             //Verify that no more tuples are emitted and all tuples are committed
-            SingleTopicKafkaUnitSetupHelper.verifyAllMessagesCommitted(postReactivationConsumerSpy, commitCapture, messageCount);
+            SingleTopicKafkaUnitSetupHelper.verifyAllMessagesCommitted(consumerSpy, commitCapture, messageCount);
 
             reset(collector);
             spout.nextTuple();
@@ -142,4 +136,32 @@ public class KafkaSpoutReactivationTest {
 
     }
 
+    @Test
+    public void testSpoutShouldResumeWhereItLeftOffWithUncommittedEarliestStrategy() throws Exception {
+        //With uncommitted earliest the spout should pick up where it left off when reactivating.
+        doReactivationTest(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST);
+    }
+
+    @Test
+    public void testSpoutShouldResumeWhereItLeftOffWithEarliestStrategy() throws Exception {
+        //With earliest, the spout should also resume where it left off, rather than restart at the earliest offset.
+        doReactivationTest(FirstPollOffsetStrategy.EARLIEST);
+    }
+
+    @Test
+    public void testSpoutMustHandleGettingMetricsWhileDeactivated() throws Exception {
+        //Storm will try to get metrics from the spout even while deactivated, the spout must be able to handle this
+        prepareSpout(10, FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST);
+
+        for (int i = 0; i < 5; i++) {
+            KafkaSpoutMessageId msgId = emitOne();
+            spout.ack(msgId);
+        }
+
+        spout.deactivate();
+
+        Map<String, Long> offsetMetric = (Map<String, Long>) spout.getKafkaOffsetMetric().getValueAndReset();
+        assertThat(offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC + "/totalSpoutLag"), is(5L));
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b7e19c7b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
index 0bf9219..9d334a7 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
@@ -105,7 +105,7 @@ public class KafkaSpoutSingleTopicTest extends KafkaSpoutAbstractTest {
         Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + commitOffsetPeriodMs);
         spout.ack(failedIdReplayCaptor.getValue());
         spout.nextTuple();
-        verify(consumerSpy).commitSync(commitCapture.capture());
+        verify(getKafkaConsumer()).commitSync(commitCapture.capture());
 
         Map<TopicPartition, OffsetAndMetadata> capturedCommit = commitCapture.getValue();
         TopicPartition expectedTp = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0);

http://git-wip-us.apache.org/repos/asf/storm/blob/b7e19c7b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyDeployActivateDeactivateTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyDeployActivateDeactivateTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyDeployActivateDeactivateTest.java
index a860cef..afcefdf 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyDeployActivateDeactivateTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyDeployActivateDeactivateTest.java
@@ -53,8 +53,6 @@ public class KafkaSpoutTopologyDeployActivateDeactivateTest extends KafkaSpoutAb
 
         verifyAllMessagesCommitted(1);
 
-        consumerSpy = createConsumerSpy();
-
         spout.activate();
 
         nextTuple_verifyEmitted_ack_resetCollector(1);

Reply via email to