Repository: storm
Updated Branches:
  refs/heads/1.x-branch c46036930 -> c70d7d49f


STORM-2722: close the JMSSpout in the tests when done


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c70d7d49
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c70d7d49
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c70d7d49

Branch: refs/heads/1.x-branch
Commit: c70d7d49fbd36a7ab4c95c75205e1311f8fc41e2
Parents: c460369
Author: Robert (Bobby) Evans <[email protected]>
Authored: Mon Sep 18 14:50:07 2017 -0500
Committer: Stig Rohde Døssing <[email protected]>
Committed: Sat Nov 11 12:45:15 2017 +0100

----------------------------------------------------------------------
 .../apache/storm/jms/spout/JmsSpoutTest.java    | 72 +++++++++++---------
 1 file changed, 41 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/c70d7d49/external/storm-jms/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java b/external/storm-jms/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java
index da312da..b6406c8 100644
--- a/external/storm-jms/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java
+++ b/external/storm-jms/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java
@@ -45,28 +45,33 @@ public class JmsSpoutTest {
     @Test
     public void testFailure() throws JMSException, Exception {
         JmsSpout spout = new JmsSpout();
-        JmsProvider mockProvider = new MockJmsProvider();
-        MockSpoutOutputCollector mockCollector = new MockSpoutOutputCollector();
-        SpoutOutputCollector collector =
+        try {
+            JmsProvider mockProvider = new MockJmsProvider();
+            MockSpoutOutputCollector mockCollector = new MockSpoutOutputCollector();
+            SpoutOutputCollector collector =
                 new SpoutOutputCollector(mockCollector);
-        spout.setJmsProvider(new MockJmsProvider());
-        spout.setJmsTupleProducer(new MockTupleProducer());
-        spout.setJmsAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
-        spout.setRecoveryPeriodMs(10); // Rapid recovery for testing.
-        spout.open(new HashMap<>(), null, collector);
-        ConnectionFactory connectionFactory = mockProvider.connectionFactory();
-        Destination destination = mockProvider.destination();
-        Message msg = this.sendMessage(connectionFactory, destination);
-        Thread.sleep(100);
-        spout.nextTuple(); // Pretend to be storm.
-        Assert.assertTrue(mockCollector.emitted);
+            spout.setJmsProvider(new MockJmsProvider());
+            spout.setJmsTupleProducer(new MockTupleProducer());
+            spout.setJmsAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
+            spout.setRecoveryPeriodMs(10); // Rapid recovery for testing.
+            spout.open(new HashMap<>(), null, collector);
+            ConnectionFactory connectionFactory = mockProvider.connectionFactory();
+            Destination destination = mockProvider.destination();
+            Message msg = this.sendMessage(connectionFactory, destination);
+            Thread.sleep(100);
+            LOG.info("Calling nextTuple on the spout...");
+            spout.nextTuple(); // Pretend to be storm.
+            Assert.assertTrue(mockCollector.emitted);
 
-        mockCollector.reset();
-        spout.fail(msg.getJMSMessageID()); // Mock failure
-        Thread.sleep(5000);
-        spout.nextTuple(); // Pretend to be storm.
-        Thread.sleep(5000);
-        Assert.assertTrue(mockCollector.emitted); // Should have been re-emitted
+            mockCollector.reset();
+            spout.fail(msg.getJMSMessageID()); // Mock failure
+            Thread.sleep(5000);
+            spout.nextTuple(); // Pretend to be storm.
+            Thread.sleep(5000);
+            Assert.assertTrue(mockCollector.emitted); // Should have been re-emitted
+        } finally {
+            spout.close();
+        }
     }
 
     @Test
@@ -87,21 +92,26 @@ public class JmsSpoutTest {
     @Test
     public void testOpenWorksMultipleTypesOfNumberObjects() throws Exception {
         JmsSpout spout = new JmsSpout();
-        spout.setJmsProvider(new MockJmsProvider());
-        spout.setJmsTupleProducer(new MockTupleProducer());
-        Map<String, Object> configuration = new HashMap<String, Object>();
-        MockSpoutOutputCollector delegateCollector =
+        try {
+            spout.setJmsProvider(new MockJmsProvider());
+            spout.setJmsTupleProducer(new MockTupleProducer());
+            Map<String, Object> configuration = new HashMap<String, Object>();
+            MockSpoutOutputCollector delegateCollector =
                 new MockSpoutOutputCollector();
-        SpoutOutputCollector collector =
+            SpoutOutputCollector collector =
                 new SpoutOutputCollector(delegateCollector);
 
-        // Test with long value
-        configuration.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 1000L);
-        spout.open(configuration, null, collector);
+            // Test with long value
+            configuration.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 1000L);
+            spout.open(configuration, null, collector);
+            spout.close();
 
-        // Test with integer value
-        configuration.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 1000);
-        spout.open(configuration, null, collector);
+            // Test with integer value
+            configuration.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 1000);
+            spout.open(configuration, null, collector);
+        } finally {
+            spout.close();
+        }
     }
 
     public Message sendMessage(ConnectionFactory connectionFactory,

Reply via email to