Repository: flume Updated Branches: refs/heads/trunk 4f1268a14 -> e08ab04de
FLUME-3270: Close JMS resources in JMSMessageConsumer constructor in case of failure This closes #227 Reviewers: Endre Major, Ferenc Szabo (Peter Turcsanyi via Ferenc Szabo) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/e08ab04d Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/e08ab04d Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/e08ab04d Branch: refs/heads/trunk Commit: e08ab04de3f71177a1cd0a36df9be353ebee6796 Parents: 4f1268a Author: Peter Turcsanyi <[email protected]> Authored: Tue Sep 4 10:16:26 2018 +0200 Committer: Ferenc Szabo <[email protected]> Committed: Tue Sep 4 10:16:26 2018 +0200 ---------------------------------------------------------------------- .../flume/source/jms/JMSMessageConsumer.java | 105 ++++++++++--------- .../source/jms/TestJMSMessageConsumer.java | 53 ++++++++-- 2 files changed, 97 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/e08ab04d/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java index b0b1c08..6477f9a 100644 --- a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java +++ b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java @@ -65,65 +65,70 @@ class JMSMessageConsumer { + "than zero"); Preconditions.checkArgument(pollTimeout >= 0, "Poll timeout cannot be " + "negative"); + try { - if (userName.isPresent()) { - connection = connectionFactory.createConnection(userName.get(), - password.get()); - } else { - connection = connectionFactory.createConnection(); - } - if (clientId.isPresent()) { - connection.setClientID(clientId.get()); + try { + if (userName.isPresent()) { + connection = connectionFactory.createConnection(userName.get(), password.get()); + } else { + connection = connectionFactory.createConnection(); + } + if (clientId.isPresent()) { + connection.setClientID(clientId.get()); + } + connection.start(); + } catch (JMSException e) { + throw new FlumeException("Could not create connection to broker", e); } - connection.start(); - } catch (JMSException e) { - throw new FlumeException("Could not create connection to broker", e); - } - try { - session = connection.createSession(true, Session.SESSION_TRANSACTED); - } catch (JMSException e) { - throw new FlumeException("Could not create session", e); - } + try { + session = connection.createSession(true, Session.SESSION_TRANSACTED); + } catch (JMSException e) { + throw new FlumeException("Could not create session", e); + } - try { - if (destinationLocator.equals(JMSDestinationLocator.CDI)) { - switch (destinationType) { - case QUEUE: - destination = session.createQueue(destinationName); - break; - case TOPIC: - destination = session.createTopic(destinationName); - break; - default: - throw new IllegalStateException(String.valueOf(destinationType)); + try { + if (destinationLocator.equals(JMSDestinationLocator.CDI)) { + switch (destinationType) { + case QUEUE: + destination = session.createQueue(destinationName); + break; + case TOPIC: + destination = session.createTopic(destinationName); + break; + default: + throw new IllegalStateException(String.valueOf(destinationType)); + } + } else { + destination = (Destination) initialContext.lookup(destinationName); } - } else { - destination = (Destination) initialContext.lookup(destinationName); + } catch (JMSException e) { + throw new FlumeException("Could not create destination " + destinationName, e); + } catch (NamingException e) { + throw new FlumeException("Could not find destination " + destinationName, e); } - } catch (JMSException e) { - throw new FlumeException("Could not create destination " + destinationName, e); - } catch (NamingException e) { - throw new FlumeException("Could not find destination " + destinationName, e); - } - try { - if (createDurableSubscription) { - messageConsumer = session.createDurableSubscriber( - (Topic) destination, durableSubscriptionName, - messageSelector.isEmpty() ? null : messageSelector, true); - } else { - messageConsumer = session.createConsumer(destination, - messageSelector.isEmpty() ? null : messageSelector); + try { + if (createDurableSubscription) { + messageConsumer = session.createDurableSubscriber( + (Topic) destination, durableSubscriptionName, + messageSelector.isEmpty() ? null : messageSelector, true); + } else { + messageConsumer = session.createConsumer(destination, + messageSelector.isEmpty() ? null : messageSelector); + } + } catch (JMSException e) { + throw new FlumeException("Could not create consumer", e); } - } catch (JMSException e) { - throw new FlumeException("Could not create consumer", e); + String startupMsg = String.format("Connected to '%s' of type '%s' with " + + "user '%s', batch size '%d', selector '%s' ", destinationName, + destinationType, userName.isPresent() ? userName.get() : "null", + batchSize, messageSelector.isEmpty() ? null : messageSelector); + logger.info(startupMsg); + } catch (Exception e) { + close(); + throw e; } - String startupMsg = String.format("Connected to '%s' of type '%s' with " + - "user '%s', batch size '%d', selector '%s' ", destinationName, - destinationType, userName.isPresent() ? userName.get() : "null", - batchSize, messageSelector.isEmpty() ? null : messageSelector); - logger.info(startupMsg); } List<Event> take() throws JMSException { http://git-wip-us.apache.org/repos/asf/flume/blob/e08ab04d/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSMessageConsumer.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSMessageConsumer.java b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSMessageConsumer.java index 41262af..04f3f48 100644 --- a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSMessageConsumer.java +++ b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSMessageConsumer.java @@ -44,32 +44,53 @@ public class TestJMSMessageConsumer extends JMSMessageConsumerTestBase { .thenThrow(new JMSException("")); create(); } - @Test(expected = FlumeException.class) + @Test public void testCreateSessionFails() throws Exception { when(connection.createSession(true, Session.SESSION_TRANSACTED)) .thenThrow(new JMSException("")); - create(); + try { + create(); + fail("Expected exception: org.apache.flume.FlumeException"); + } catch (FlumeException e) { + verify(connection).close(); + } } - @Test(expected = FlumeException.class) + @Test public void testCreateQueueFails() throws Exception { when(session.createQueue(destinationName)) .thenThrow(new JMSException("")); - create(); + try { + create(); + fail("Expected exception: org.apache.flume.FlumeException"); + } catch (FlumeException e) { + verify(session).close(); + verify(connection).close(); + } } - @Test(expected = FlumeException.class) + @Test public void testCreateTopicFails() throws Exception { destinationType = JMSDestinationType.TOPIC; - when(session.createQueue(destinationName)).thenThrow(new AssertionError()); - when(session.createTopic(destinationName)).thenReturn(topic); when(session.createTopic(destinationName)) .thenThrow(new JMSException("")); - create(); + try { + create(); + fail("Expected exception: org.apache.flume.FlumeException"); + } catch (FlumeException e) { + verify(session).close(); + verify(connection).close(); + } } - @Test(expected = FlumeException.class) + @Test public void testCreateConsumerFails() throws Exception { when(session.createConsumer(any(Destination.class), anyString())) .thenThrow(new JMSException("")); - create(); + try { + create(); + fail("Expected exception: org.apache.flume.FlumeException"); + } catch (FlumeException e) { + verify(session).close(); + verify(connection).close(); + } } @Test(expected = IllegalArgumentException.class) public void testInvalidBatchSizeZero() throws Exception { @@ -88,14 +109,24 @@ public class TestJMSMessageConsumer extends JMSMessageConsumerTestBase { } @Test + public void testQueue() throws Exception { + destinationType = JMSDestinationType.QUEUE; + when(session.createQueue(destinationName)).thenReturn(queue); + consumer = create(); + List<Event> events = consumer.take(); + assertEquals(batchSize, events.size()); + assertBodyIsExpected(events); + verify(session, never()).createTopic(anyString()); + } + @Test public void testTopic() throws Exception { destinationType = JMSDestinationType.TOPIC; - when(session.createQueue(destinationName)).thenThrow(new AssertionError()); when(session.createTopic(destinationName)).thenReturn(topic); consumer = create(); List<Event> events = consumer.take(); assertEquals(batchSize, events.size()); assertBodyIsExpected(events); + verify(session, never()).createQueue(anyString()); } @Test public void testUserPass() throws Exception {
