Repository: flume
Updated Branches:
  refs/heads/trunk 4f1268a14 -> e08ab04de


FLUME-3270: Close JMS resources in JMSMessageConsumer constructor in case of failure

This closes #227

Reviewers: Endre Major, Ferenc Szabo

(Peter Turcsanyi via Ferenc Szabo)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/e08ab04d
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/e08ab04d
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/e08ab04d

Branch: refs/heads/trunk
Commit: e08ab04de3f71177a1cd0a36df9be353ebee6796
Parents: 4f1268a
Author: Peter Turcsanyi <[email protected]>
Authored: Tue Sep 4 10:16:26 2018 +0200
Committer: Ferenc Szabo <[email protected]>
Committed: Tue Sep 4 10:16:26 2018 +0200

----------------------------------------------------------------------
 .../flume/source/jms/JMSMessageConsumer.java    | 105 ++++++++++---------
 .../source/jms/TestJMSMessageConsumer.java      |  53 ++++++++--
 2 files changed, 97 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/e08ab04d/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java
index b0b1c08..6477f9a 100644
--- a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java
+++ b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java
@@ -65,65 +65,70 @@ class JMSMessageConsumer {
         + "than zero");
     Preconditions.checkArgument(pollTimeout >= 0, "Poll timeout cannot be " +
         "negative");
+
     try {
-      if (userName.isPresent()) {
-        connection = connectionFactory.createConnection(userName.get(),
-            password.get());
-      } else {
-        connection = connectionFactory.createConnection();
-      }
-      if (clientId.isPresent()) {
-        connection.setClientID(clientId.get());
+      try {
+        if (userName.isPresent()) {
+          connection = connectionFactory.createConnection(userName.get(), password.get());
+        } else {
+          connection = connectionFactory.createConnection();
+        }
+        if (clientId.isPresent()) {
+          connection.setClientID(clientId.get());
+        }
+        connection.start();
+      } catch (JMSException e) {
+        throw new FlumeException("Could not create connection to broker", e);
       }
-      connection.start();
-    } catch (JMSException e) {
-      throw new FlumeException("Could not create connection to broker", e);
-    }
 
-    try {
-      session = connection.createSession(true, Session.SESSION_TRANSACTED);
-    } catch (JMSException e) {
-      throw new FlumeException("Could not create session", e);
-    }
+      try {
+        session = connection.createSession(true, Session.SESSION_TRANSACTED);
+      } catch (JMSException e) {
+        throw new FlumeException("Could not create session", e);
+      }
 
-    try {
-      if (destinationLocator.equals(JMSDestinationLocator.CDI)) {
-        switch (destinationType) {
-          case QUEUE:
-            destination = session.createQueue(destinationName);
-            break;
-          case TOPIC:
-            destination = session.createTopic(destinationName);
-            break;
-          default:
-            throw new IllegalStateException(String.valueOf(destinationType));
+      try {
+        if (destinationLocator.equals(JMSDestinationLocator.CDI)) {
+          switch (destinationType) {
+            case QUEUE:
+              destination = session.createQueue(destinationName);
+              break;
+            case TOPIC:
+              destination = session.createTopic(destinationName);
+              break;
+            default:
+              throw new IllegalStateException(String.valueOf(destinationType));
+          }
+        } else {
+          destination = (Destination) initialContext.lookup(destinationName);
         }
-      } else {
-        destination = (Destination) initialContext.lookup(destinationName);
+      } catch (JMSException e) {
+        throw new FlumeException("Could not create destination " + destinationName, e);
+      } catch (NamingException e) {
+        throw new FlumeException("Could not find destination " + destinationName, e);
       }
-    } catch (JMSException e) {
-      throw new FlumeException("Could not create destination " + destinationName, e);
-    } catch (NamingException e) {
-      throw new FlumeException("Could not find destination " + destinationName, e);
-    }
 
-    try {
-      if (createDurableSubscription) {
-        messageConsumer = session.createDurableSubscriber(
-            (Topic) destination, durableSubscriptionName,
-            messageSelector.isEmpty() ? null : messageSelector, true);
-      } else {
-        messageConsumer = session.createConsumer(destination,
-            messageSelector.isEmpty() ? null : messageSelector);
+      try {
+        if (createDurableSubscription) {
+          messageConsumer = session.createDurableSubscriber(
+                  (Topic) destination, durableSubscriptionName,
+                  messageSelector.isEmpty() ? null : messageSelector, true);
+        } else {
+          messageConsumer = session.createConsumer(destination,
+                  messageSelector.isEmpty() ? null : messageSelector);
+        }
+      } catch (JMSException e) {
+        throw new FlumeException("Could not create consumer", e);
       }
-    } catch (JMSException e) {
-      throw new FlumeException("Could not create consumer", e);
+      String startupMsg = String.format("Connected to '%s' of type '%s' with " +
+                      "user '%s', batch size '%d', selector '%s' ", destinationName,
+              destinationType, userName.isPresent() ? userName.get() : "null",
+              batchSize, messageSelector.isEmpty() ? null : messageSelector);
+      logger.info(startupMsg);
+    } catch (Exception e) {
+      close();
+      throw e;
     }
-    String startupMsg = String.format("Connected to '%s' of type '%s' with " +
-            "user '%s', batch size '%d', selector '%s' ", destinationName,
-        destinationType, userName.isPresent() ? userName.get() : "null",
-        batchSize, messageSelector.isEmpty() ? null : messageSelector);
-    logger.info(startupMsg);
   }
 
   List<Event> take() throws JMSException {

http://git-wip-us.apache.org/repos/asf/flume/blob/e08ab04d/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSMessageConsumer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSMessageConsumer.java b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSMessageConsumer.java
index 41262af..04f3f48 100644
--- a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSMessageConsumer.java
+++ b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSMessageConsumer.java
@@ -44,32 +44,53 @@ public class TestJMSMessageConsumer extends JMSMessageConsumerTestBase {
       .thenThrow(new JMSException(""));
     create();
   }
-  @Test(expected = FlumeException.class)
+  @Test
   public void testCreateSessionFails() throws Exception {
     when(connection.createSession(true, Session.SESSION_TRANSACTED))
       .thenThrow(new JMSException(""));
-    create();
+    try {
+      create();
+      fail("Expected exception: org.apache.flume.FlumeException");
+    } catch (FlumeException e) {
+      verify(connection).close();
+    }
   }
-  @Test(expected = FlumeException.class)
+  @Test
   public void testCreateQueueFails() throws Exception {
     when(session.createQueue(destinationName))
       .thenThrow(new JMSException(""));
-    create();
+    try {
+      create();
+      fail("Expected exception: org.apache.flume.FlumeException");
+    } catch (FlumeException e) {
+      verify(session).close();
+      verify(connection).close();
+    }
   }
-  @Test(expected = FlumeException.class)
+  @Test
   public void testCreateTopicFails() throws Exception {
     destinationType = JMSDestinationType.TOPIC;
-    when(session.createQueue(destinationName)).thenThrow(new AssertionError());
-    when(session.createTopic(destinationName)).thenReturn(topic);
     when(session.createTopic(destinationName))
       .thenThrow(new JMSException(""));
-    create();
+    try {
+      create();
+      fail("Expected exception: org.apache.flume.FlumeException");
+    } catch (FlumeException e) {
+      verify(session).close();
+      verify(connection).close();
+    }
   }
-  @Test(expected = FlumeException.class)
+  @Test
   public void testCreateConsumerFails() throws Exception {
     when(session.createConsumer(any(Destination.class), anyString()))
       .thenThrow(new JMSException(""));
-    create();
+    try {
+      create();
+      fail("Expected exception: org.apache.flume.FlumeException");
+    } catch (FlumeException e) {
+      verify(session).close();
+      verify(connection).close();
+    }
   }
   @Test(expected = IllegalArgumentException.class)
   public void testInvalidBatchSizeZero() throws Exception {
@@ -88,14 +109,24 @@ public class TestJMSMessageConsumer extends JMSMessageConsumerTestBase {
   }
 
   @Test
+  public void testQueue() throws Exception {
+    destinationType = JMSDestinationType.QUEUE;
+    when(session.createQueue(destinationName)).thenReturn(queue);
+    consumer = create();
+    List<Event> events = consumer.take();
+    assertEquals(batchSize, events.size());
+    assertBodyIsExpected(events);
+    verify(session, never()).createTopic(anyString());
+  }
+  @Test
   public void testTopic() throws Exception {
     destinationType = JMSDestinationType.TOPIC;
-    when(session.createQueue(destinationName)).thenThrow(new AssertionError());
     when(session.createTopic(destinationName)).thenReturn(topic);
     consumer = create();
     List<Event> events = consumer.take();
     assertEquals(batchSize, events.size());
     assertBodyIsExpected(events);
+    verify(session, never()).createQueue(anyString());
   }
   @Test
   public void testUserPass() throws Exception {

Reply via email to