Updated Branches:
  refs/heads/trunk d9061a037 -> e30cbd544

FLUME-2311 - Use standard way of finding queue/topic (Hugo Lassiège via Brock 
Noland)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/e30cbd54
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/e30cbd54
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/e30cbd54

Branch: refs/heads/trunk
Commit: e30cbd544e6ca0b8a19d2e9db491c679e9ca4ee8
Parents: d9061a0
Author: Brock Noland <[email protected]>
Authored: Fri Feb 7 10:45:52 2014 -0600
Committer: Brock Noland <[email protected]>
Committed: Fri Feb 7 10:45:52 2014 -0600

----------------------------------------------------------------------
 .../flume/source/jms/JMSDestinationLocator.java | 23 ++++++++++++++++
 .../flume/source/jms/JMSMessageConsumer.java    | 28 +++++++++++++-------
 .../source/jms/JMSMessageConsumerFactory.java   | 14 +++++-----
 .../org/apache/flume/source/jms/JMSSource.java  | 24 ++++++++++++-----
 .../source/jms/JMSSourceConfiguration.java      |  3 +++
 .../source/jms/JMSMessageConsumerTestBase.java  | 10 ++++---
 .../source/jms/TestIntegrationActiveMQ.java     | 12 +++++++++
 .../apache/flume/source/jms/TestJMSSource.java  | 18 ++++++-------
 8 files changed, 97 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/e30cbd54/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSDestinationLocator.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSDestinationLocator.java
 
b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSDestinationLocator.java
new file mode 100644
index 0000000..c590c8e
--- /dev/null
+++ 
b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSDestinationLocator.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.source.jms;
+
+public enum JMSDestinationLocator {
+  JNDI, CDI
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/e30cbd54/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java
 
b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java
index 9463e9a..7a9461b 100644
--- 
a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java
+++ 
b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java
@@ -28,6 +28,8 @@ import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.Session;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
 
 import org.apache.flume.Event;
 import org.apache.flume.FlumeException;
@@ -50,11 +52,11 @@ class JMSMessageConsumer {
   private final Destination destination;
   private final MessageConsumer messageConsumer;
 
-  JMSMessageConsumer(ConnectionFactory connectionFactory,
-      String destinationName, JMSDestinationType destinationType,
-      String messageSelector, int batchSize, long pollTimeout,
-      JMSMessageConverter messageConverter, Optional<String> userName,
-      Optional<String> password) {
+  JMSMessageConsumer(InitialContext initialContext, ConnectionFactory 
connectionFactory, String destinationName,
+    JMSDestinationLocator destinationLocator, JMSDestinationType 
destinationType,
+    String messageSelector, int batchSize, long pollTimeout,
+    JMSMessageConverter messageConverter,
+    Optional<String> userName, Optional<String> password) {
     this.batchSize = batchSize;
     this.pollTimeout = pollTimeout;
     this.messageConverter = messageConverter;
@@ -79,7 +81,9 @@ class JMSMessageConsumer {
     } catch (JMSException e) {
       throw new FlumeException("Could not create session", e);
     }
-    try {
+
+  try {
+    if (destinationLocator.equals(JMSDestinationLocator.CDI)) {
       switch (destinationType) {
         case QUEUE:
           destination = session.createQueue(destinationName);
@@ -90,12 +94,16 @@ class JMSMessageConsumer {
         default:
           throw new IllegalStateException(String.valueOf(destinationType));
       }
-    } catch (JMSException e) {
-      throw new FlumeException("Could not create destination "
-          + destinationName, e);
+    } else {
+      destination = (Destination) initialContext.lookup(destinationName);
     }
+  } catch (JMSException e) {
+    throw new FlumeException("Could not create destination " + 
destinationName, e);
+  } catch (NamingException e) {
+    throw new FlumeException("Could not find destination " + destinationName, 
e);
+  }
 
-    try {
+  try {
       messageConsumer = session.createConsumer(destination,
           messageSelector.isEmpty() ? null: messageSelector);
     } catch (JMSException e) {

http://git-wip-us.apache.org/repos/asf/flume/blob/e30cbd54/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumerFactory.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumerFactory.java
 
b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumerFactory.java
index af2a68a..af74bf4 100644
--- 
a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumerFactory.java
+++ 
b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumerFactory.java
@@ -18,19 +18,19 @@
 package org.apache.flume.source.jms;
 
 import javax.jms.ConnectionFactory;
+import javax.naming.InitialContext;
 
 import com.google.common.base.Optional;
 
 
 public class JMSMessageConsumerFactory {
 
-  JMSMessageConsumer create(ConnectionFactory connectionFactory,
-      String destinationName, JMSDestinationType destinationType,
-      String messageSelector, int batchSize, long pollTimeout,
-      JMSMessageConverter messageConverter, Optional<String> userName,
-      Optional<String> password) {
-    return new JMSMessageConsumer(connectionFactory, destinationName,
-        destinationType, messageSelector, batchSize, pollTimeout,
+  JMSMessageConsumer create(InitialContext initialContext, ConnectionFactory 
connectionFactory,
+    String destinationName, JMSDestinationType destinationType, 
JMSDestinationLocator destinationLocator,
+    String messageSelector, int batchSize, long pollTimeout, 
JMSMessageConverter messageConverter,
+    Optional<String> userName, Optional<String> password) {
+    return new JMSMessageConsumer(initialContext, connectionFactory, 
destinationName,
+      destinationLocator, destinationType, messageSelector, batchSize, 
pollTimeout,
         messageConverter, userName, password);
   }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/e30cbd54/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java
 
b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java
index 6ebb2bb..addd97a 100644
--- 
a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java
+++ 
b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java
@@ -64,6 +64,7 @@ public class JMSSource extends AbstractPollableSource {
   private String providerUrl;
   private String destinationName;
   private JMSDestinationType destinationType;
+  private JMSDestinationLocator destinationLocator;
   private String messageSelector;
   private Optional<String> userName;
   private Optional<String> password;
@@ -72,6 +73,7 @@ public class JMSSource extends AbstractPollableSource {
   private long pollTimeout;
 
   private int jmsExceptionCounter;
+  private InitialContext initialContext;
 
 
   public JMSSource() {
@@ -101,6 +103,10 @@ public class JMSSource extends AbstractPollableSource {
     String destinationTypeName = context.getString(JMSSourceConfiguration.
         DESTINATION_TYPE, "").trim().toUpperCase();
 
+    String destinationLocatorName = context.getString(JMSSourceConfiguration.
+        DESTINATION_LOCATOR, 
JMSSourceConfiguration.DESTINATION_LOCATOR_DEFAULT)
+      .trim().toUpperCase();
+
     messageSelector = context.getString(JMSSourceConfiguration.
         MESSAGE_SELECTOR, "").trim();
 
@@ -196,11 +202,16 @@ public class JMSSource extends AbstractPollableSource {
           "invalid.", destinationTypeName), e);
     }
 
+    try {
+      destinationLocator = 
JMSDestinationLocator.valueOf(destinationLocatorName);
+    } catch (IllegalArgumentException e) {
+      throw new FlumeException(String.format("Destination locator '%s' is " +
+          "invalid.", destinationLocatorName), e);
+    }
+
     Preconditions.checkArgument(batchSize > 0, "Batch size must be greater " +
         "than 0");
 
-    InitialContext initalContext;
-
     try {
       Properties contextProperties = new Properties();
       contextProperties.setProperty(
@@ -208,7 +219,7 @@ public class JMSSource extends AbstractPollableSource {
           initialContextFactoryName);
       contextProperties.setProperty(
           javax.naming.Context.PROVIDER_URL, providerUrl);
-      initalContext = initialContextFactory.create(contextProperties);
+      initialContext = initialContextFactory.create(contextProperties);
     } catch (NamingException e) {
       throw new FlumeException(String.format(
           "Could not create initial context %s provider %s",
@@ -216,7 +227,7 @@ public class JMSSource extends AbstractPollableSource {
     }
 
     try {
-      connectionFactory = (ConnectionFactory) initalContext.
+      connectionFactory = (ConnectionFactory) initialContext.
           lookup(connectionFactoryName);
     } catch (NamingException e) {
       throw new FlumeException("Could not lookup ConnectionFactory", e);
@@ -302,8 +313,9 @@ public class JMSSource extends AbstractPollableSource {
   }
   private JMSMessageConsumer createConsumer() throws JMSException {
     logger.info("Creating new consumer for " + destinationName);
-    JMSMessageConsumer consumer = consumerFactory.create(connectionFactory,
-        destinationName, destinationType, messageSelector, batchSize,
+    JMSMessageConsumer consumer = consumerFactory.create(initialContext,
+    connectionFactory, destinationName, destinationType, destinationLocator,
+    messageSelector, batchSize,
         pollTimeout, converter, userName, password);
     jmsExceptionCounter = 0;
     return consumer;

http://git-wip-us.apache.org/repos/asf/flume/blob/e30cbd54/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSourceConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSourceConfiguration.java
 
b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSourceConfiguration.java
index c0ec9b6..98bf8ab 100644
--- 
a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSourceConfiguration.java
+++ 
b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSourceConfiguration.java
@@ -29,6 +29,9 @@ public class JMSSourceConfiguration {
   public static final String DESTINATION_NAME = "destinationName";
 
   public static final String DESTINATION_TYPE = "destinationType";
+  public static final String DESTINATION_LOCATOR = "destinationLocator";
+  public static final String DESTINATION_LOCATOR_DEFAULT = "CDI";
+
   public static final String DESTINATION_TYPE_QUEUE = "queue";
   public static final String DESTINATION_TYPE_TOPIC = "topic";
 

http://git-wip-us.apache.org/repos/asf/flume/blob/e30cbd54/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/JMSMessageConsumerTestBase.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/JMSMessageConsumerTestBase.java
 
b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/JMSMessageConsumerTestBase.java
index e40e95a..6881967 100644
--- 
a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/JMSMessageConsumerTestBase.java
+++ 
b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/JMSMessageConsumerTestBase.java
@@ -32,6 +32,7 @@ import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 import javax.jms.Topic;
+import javax.naming.InitialContext;
 
 import org.apache.flume.Context;
 import org.apache.flume.Event;
@@ -47,12 +48,14 @@ public abstract class JMSMessageConsumerTestBase {
   static final String DESTINATION_NAME = "destinationName";
   static final String SELECTOR = "selector";
   static final String TEXT = "text";
+  static final InitialContext WONT_USE = null;
 
   Context context;
   JMSMessageConsumer consumer;
   ConnectionFactory connectionFactory;
   String destinationName;
   JMSDestinationType destinationType;
+  JMSDestinationLocator destinationLocator;
   String messageSelector;
   int batchSize;
   long pollTimeout;
@@ -100,6 +103,7 @@ public abstract class JMSMessageConsumerTestBase {
     when(messageConsumer.receive(anyLong())).thenReturn(message);
     destinationName = DESTINATION_NAME;
     destinationType = JMSDestinationType.QUEUE;
+    destinationLocator = JMSDestinationLocator.CDI;
     messageSelector = SELECTOR;
     batchSize = 10;
     pollTimeout = 500L;
@@ -129,9 +133,9 @@ public abstract class JMSMessageConsumerTestBase {
   }
 
   JMSMessageConsumer create() {
-    return new JMSMessageConsumer(connectionFactory, destinationName,
-        destinationType, messageSelector, batchSize, pollTimeout, converter,
-        userName, password);
+    return new JMSMessageConsumer(WONT_USE, connectionFactory, destinationName,
+        destinationLocator, destinationType, messageSelector, batchSize,
+        pollTimeout, converter, userName, password);
   }
   @After
   public void tearDown() throws Exception {

http://git-wip-us.apache.org/repos/asf/flume/blob/e30cbd54/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestIntegrationActiveMQ.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestIntegrationActiveMQ.java
 
b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestIntegrationActiveMQ.java
index 20c0d2e..e28e02a 100644
--- 
a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestIntegrationActiveMQ.java
+++ 
b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestIntegrationActiveMQ.java
@@ -55,11 +55,14 @@ import com.google.common.collect.Lists;
 import com.google.common.io.Files;
 
 public class TestIntegrationActiveMQ {
+
   private final static String INITIAL_CONTEXT_FACTORY = 
"org.apache.activemq.jndi.ActiveMQInitialContextFactory";
   public static final String BROKER_BIND_URL = "tcp://localhost:61516";
   private final static  String DESTINATION_NAME = "test";
   private static final String USERNAME = "user";
   private static final String PASSWORD = "pass";
+  // specific for dynamic queues on ActiveMq
+  public static final String JNDI_PREFIX = "dynamicQueues/";
 
   private File baseDir;
   private File tmpDir;
@@ -171,6 +174,15 @@ public class TestIntegrationActiveMQ {
   }
 
   @Test
+  public void testQueueLocatedWithJndi() throws Exception {
+    context.put(JMSSourceConfiguration.DESTINATION_NAME,
+            JNDI_PREFIX + DESTINATION_NAME);
+    context.put(JMSSourceConfiguration.DESTINATION_LOCATOR,
+            JMSDestinationLocator.JNDI.name());
+    testQueue();
+  }
+
+  @Test
   public void testQueue() throws Exception {
     context.put(JMSSourceConfiguration.DESTINATION_TYPE,
         JMSSourceConfiguration.DESTINATION_TYPE_QUEUE);

http://git-wip-us.apache.org/repos/asf/flume/blob/e30cbd54/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSSource.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSSource.java
 
b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSSource.java
index ddfd767..5423f8f 100644
--- 
a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSSource.java
+++ 
b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSSource.java
@@ -54,7 +54,7 @@ public class TestJMSSource extends JMSMessageConsumerTestBase 
{
 
   private JMSSource source;
   private Context context;
-  private InitialContext initialConext;
+  private InitialContext initialContext;
   private ChannelProcessor channelProcessor;
   private List<Event> events;
   private JMSMessageConsumerFactory consumerFactory;
@@ -67,7 +67,7 @@ public class TestJMSSource extends JMSMessageConsumerTestBase 
{
     baseDir = Files.createTempDir();
     passwordFile = new File(baseDir, "password");
     Assert.assertTrue(passwordFile.createNewFile());
-    initialConext = mock(InitialContext.class);
+    initialContext = mock(InitialContext.class);
     channelProcessor = mock(ChannelProcessor.class);
     events = Lists.newArrayList();
     doAnswer(new Answer<Void>() {
@@ -79,13 +79,13 @@ public class TestJMSSource extends 
JMSMessageConsumerTestBase {
     }).when(channelProcessor).processEventBatch(any(List.class));
     consumerFactory = mock(JMSMessageConsumerFactory.class);
     consumer = spy(create());
-    when(consumerFactory.create(any(ConnectionFactory.class), anyString(),
-        any(JMSDestinationType.class), anyString(), anyInt(), anyLong(),
+    when(consumerFactory.create(any(InitialContext.class), 
any(ConnectionFactory.class), anyString(),
+        any(JMSDestinationType.class), any(JMSDestinationLocator.class), 
anyString(), anyInt(), anyLong(),
         any(JMSMessageConverter.class), any(Optional.class),
         any(Optional.class))).thenReturn(consumer);
-    when(initialConext.lookup(anyString())).thenReturn(connectionFactory);
+    when(initialContext.lookup(anyString())).thenReturn(connectionFactory);
     contextFactory = mock(InitialContextFactory.class);
-    
when(contextFactory.create(any(Properties.class))).thenReturn(initialConext);
+    
when(contextFactory.create(any(Properties.class))).thenReturn(initialContext);
     source = new JMSSource(consumerFactory, contextFactory);
     source.setName("JMSSource-" + UUID.randomUUID());
     source.setChannelProcessor(channelProcessor);
@@ -136,8 +136,8 @@ public class TestJMSSource extends 
JMSMessageConsumerTestBase {
   @SuppressWarnings("unchecked")
   @Test
   public void testStartConsumerCreateThrowsException() throws Exception {
-    when(consumerFactory.create(any(ConnectionFactory.class), anyString(),
-        any(JMSDestinationType.class), anyString(), anyInt(), anyLong(),
+    when(consumerFactory.create(any(InitialContext.class), 
any(ConnectionFactory.class), anyString(),
+        any(JMSDestinationType.class), any(JMSDestinationLocator.class), 
anyString(), anyInt(), anyLong(),
         any(JMSMessageConverter.class), any(Optional.class),
         any(Optional.class))).thenThrow(new RuntimeException());
     source.configure(context);
@@ -151,7 +151,7 @@ public class TestJMSSource extends 
JMSMessageConsumerTestBase {
   }
   @Test(expected = FlumeException.class)
   public void testConfigureWithContextLookupThrowsException() throws Exception 
{
-    when(initialConext.lookup(anyString())).thenThrow(new NamingException());
+    when(initialContext.lookup(anyString())).thenThrow(new NamingException());
     source.configure(context);
   }
   @Test(expected = FlumeException.class)

Reply via email to