Repository: storm Updated Branches: refs/heads/1.x-branch c46036930 -> c70d7d49f
STORM-2722: close the JMSSpout in the tests when done Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c70d7d49 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c70d7d49 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c70d7d49 Branch: refs/heads/1.x-branch Commit: c70d7d49fbd36a7ab4c95c75205e1311f8fc41e2 Parents: c460369 Author: Robert (Bobby) Evans <[email protected]> Authored: Mon Sep 18 14:50:07 2017 -0500 Committer: Stig Rohde Døssing <[email protected]> Committed: Sat Nov 11 12:45:15 2017 +0100 ---------------------------------------------------------------------- .../apache/storm/jms/spout/JmsSpoutTest.java | 72 +++++++++++--------- 1 file changed, 41 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/c70d7d49/external/storm-jms/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java ---------------------------------------------------------------------- diff --git a/external/storm-jms/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java b/external/storm-jms/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java index da312da..b6406c8 100644 --- a/external/storm-jms/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java +++ b/external/storm-jms/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java @@ -45,28 +45,33 @@ public class JmsSpoutTest { @Test public void testFailure() throws JMSException, Exception { JmsSpout spout = new JmsSpout(); - JmsProvider mockProvider = new MockJmsProvider(); - MockSpoutOutputCollector mockCollector = new MockSpoutOutputCollector(); - SpoutOutputCollector collector = + try { + JmsProvider mockProvider = new MockJmsProvider(); + MockSpoutOutputCollector mockCollector = new MockSpoutOutputCollector(); + SpoutOutputCollector collector = new SpoutOutputCollector(mockCollector); - spout.setJmsProvider(new MockJmsProvider()); - spout.setJmsTupleProducer(new MockTupleProducer()); - spout.setJmsAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE); - spout.setRecoveryPeriodMs(10); // Rapid recovery for testing. - spout.open(new HashMap<>(), null, collector); - ConnectionFactory connectionFactory = mockProvider.connectionFactory(); - Destination destination = mockProvider.destination(); - Message msg = this.sendMessage(connectionFactory, destination); - Thread.sleep(100); - spout.nextTuple(); // Pretend to be storm. - Assert.assertTrue(mockCollector.emitted); + spout.setJmsProvider(new MockJmsProvider()); + spout.setJmsTupleProducer(new MockTupleProducer()); + spout.setJmsAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE); + spout.setRecoveryPeriodMs(10); // Rapid recovery for testing. + spout.open(new HashMap<>(), null, collector); + ConnectionFactory connectionFactory = mockProvider.connectionFactory(); + Destination destination = mockProvider.destination(); + Message msg = this.sendMessage(connectionFactory, destination); + Thread.sleep(100); + LOG.info("Calling nextTuple on the spout..."); + spout.nextTuple(); // Pretend to be storm. + Assert.assertTrue(mockCollector.emitted); - mockCollector.reset(); - spout.fail(msg.getJMSMessageID()); // Mock failure - Thread.sleep(5000); - spout.nextTuple(); // Pretend to be storm. - Thread.sleep(5000); - Assert.assertTrue(mockCollector.emitted); // Should have been re-emitted + mockCollector.reset(); + spout.fail(msg.getJMSMessageID()); // Mock failure + Thread.sleep(5000); + spout.nextTuple(); // Pretend to be storm. + Thread.sleep(5000); + Assert.assertTrue(mockCollector.emitted); // Should have been re-emitted + } finally { + spout.close(); + } } @Test @@ -87,21 +92,26 @@ public class JmsSpoutTest { @Test public void testOpenWorksMultipleTypesOfNumberObjects() throws Exception { JmsSpout spout = new JmsSpout(); - spout.setJmsProvider(new MockJmsProvider()); - spout.setJmsTupleProducer(new MockTupleProducer()); - Map<String, Object> configuration = new HashMap<String, Object>(); - MockSpoutOutputCollector delegateCollector = + try { + spout.setJmsProvider(new MockJmsProvider()); + spout.setJmsTupleProducer(new MockTupleProducer()); + Map<String, Object> configuration = new HashMap<String, Object>(); + MockSpoutOutputCollector delegateCollector = new MockSpoutOutputCollector(); - SpoutOutputCollector collector = + SpoutOutputCollector collector = new SpoutOutputCollector(delegateCollector); - // Test with long value - configuration.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 1000L); - spout.open(configuration, null, collector); + // Test with long value + configuration.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 1000L); + spout.open(configuration, null, collector); + spout.close(); - // Test with integer value - configuration.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 1000); - spout.open(configuration, null, collector); + // Test with integer value + configuration.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 1000); + spout.open(configuration, null, collector); + } finally { + spout.close(); + } } public Message sendMessage(ConnectionFactory connectionFactory,
