Updated Branches: refs/heads/flume-1.5 ffa9fae30 -> db1abb0a8
FLUME-2311 - Use standard way of finding queue/topic (Hugo Lassiège via Brock Noland) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/db1abb0a Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/db1abb0a Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/db1abb0a Branch: refs/heads/flume-1.5 Commit: db1abb0a82c46bd7c8dd44629f4a1c98e01abf9e Parents: ffa9fae Author: Brock Noland <[email protected]> Authored: Fri Feb 7 10:45:52 2014 -0600 Committer: Brock Noland <[email protected]> Committed: Fri Feb 7 10:46:55 2014 -0600 ---------------------------------------------------------------------- .../flume/source/jms/JMSDestinationLocator.java | 23 ++++++++++++++++ .../flume/source/jms/JMSMessageConsumer.java | 28 +++++++++++++------- .../source/jms/JMSMessageConsumerFactory.java | 14 +++++----- .../org/apache/flume/source/jms/JMSSource.java | 24 ++++++++++++----- .../source/jms/JMSSourceConfiguration.java | 3 +++ .../source/jms/JMSMessageConsumerTestBase.java | 10 ++++--- .../source/jms/TestIntegrationActiveMQ.java | 12 +++++++++ .../apache/flume/source/jms/TestJMSSource.java | 18 ++++++------- 8 files changed, 97 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/db1abb0a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSDestinationLocator.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSDestinationLocator.java b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSDestinationLocator.java new file mode 100644 index 0000000..c590c8e --- /dev/null +++ b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSDestinationLocator.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.source.jms; + +public enum JMSDestinationLocator { + JNDI, CDI +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flume/blob/db1abb0a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java index 9463e9a..7a9461b 100644 --- a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java +++ b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java @@ -28,6 +28,8 @@ import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.Session; +import javax.naming.InitialContext; +import javax.naming.NamingException; import org.apache.flume.Event; import org.apache.flume.FlumeException; @@ -50,11 +52,11 @@ class JMSMessageConsumer { private final Destination destination; private final MessageConsumer messageConsumer; - JMSMessageConsumer(ConnectionFactory connectionFactory, - String destinationName, JMSDestinationType destinationType, - String messageSelector, int batchSize, long pollTimeout, - JMSMessageConverter messageConverter, Optional<String> userName, - Optional<String> password) { + JMSMessageConsumer(InitialContext initialContext, ConnectionFactory connectionFactory, String destinationName, + JMSDestinationLocator destinationLocator, JMSDestinationType destinationType, + String messageSelector, int batchSize, long pollTimeout, + JMSMessageConverter messageConverter, + Optional<String> userName, Optional<String> password) { this.batchSize = batchSize; this.pollTimeout = pollTimeout; this.messageConverter = messageConverter; @@ -79,7 +81,9 @@ class JMSMessageConsumer { } catch (JMSException e) { throw new FlumeException("Could not create session", e); } - try { + + try { + if (destinationLocator.equals(JMSDestinationLocator.CDI)) { switch (destinationType) { case QUEUE: destination = session.createQueue(destinationName); @@ -90,12 +94,16 @@ class JMSMessageConsumer { default: throw new IllegalStateException(String.valueOf(destinationType)); } - } catch (JMSException e) { - throw new FlumeException("Could not create destination " - + destinationName, e); + } else { + destination = (Destination) initialContext.lookup(destinationName); } + } catch (JMSException e) { + throw new FlumeException("Could not create destination " + destinationName, e); + } catch (NamingException e) { + throw new FlumeException("Could not find destination " + destinationName, e); + } - try { + try { messageConsumer = session.createConsumer(destination, messageSelector.isEmpty() ? null: messageSelector); } catch (JMSException e) { http://git-wip-us.apache.org/repos/asf/flume/blob/db1abb0a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumerFactory.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumerFactory.java b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumerFactory.java index af2a68a..af74bf4 100644 --- a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumerFactory.java +++ b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumerFactory.java @@ -18,19 +18,19 @@ package org.apache.flume.source.jms; import javax.jms.ConnectionFactory; +import javax.naming.InitialContext; import com.google.common.base.Optional; public class JMSMessageConsumerFactory { - JMSMessageConsumer create(ConnectionFactory connectionFactory, - String destinationName, JMSDestinationType destinationType, - String messageSelector, int batchSize, long pollTimeout, - JMSMessageConverter messageConverter, Optional<String> userName, - Optional<String> password) { - return new JMSMessageConsumer(connectionFactory, destinationName, - destinationType, messageSelector, batchSize, pollTimeout, + JMSMessageConsumer create(InitialContext initialContext, ConnectionFactory connectionFactory, + String destinationName, JMSDestinationType destinationType, JMSDestinationLocator destinationLocator, + String messageSelector, int batchSize, long pollTimeout, JMSMessageConverter messageConverter, + Optional<String> userName, Optional<String> password) { + return new JMSMessageConsumer(initialContext, connectionFactory, destinationName, + destinationLocator, destinationType, messageSelector, batchSize, pollTimeout, messageConverter, userName, password); } } http://git-wip-us.apache.org/repos/asf/flume/blob/db1abb0a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java index 6ebb2bb..addd97a 100644 --- a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java +++ b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java @@ -64,6 +64,7 @@ public class JMSSource extends AbstractPollableSource { private String providerUrl; private String destinationName; private JMSDestinationType destinationType; + private JMSDestinationLocator destinationLocator; private String messageSelector; private Optional<String> userName; private Optional<String> password; @@ -72,6 +73,7 @@ public class JMSSource extends AbstractPollableSource { private long pollTimeout; private int jmsExceptionCounter; + private InitialContext initialContext; public JMSSource() { @@ -101,6 +103,10 @@ public class JMSSource extends AbstractPollableSource { String destinationTypeName = context.getString(JMSSourceConfiguration. DESTINATION_TYPE, "").trim().toUpperCase(); + String destinationLocatorName = context.getString(JMSSourceConfiguration. + DESTINATION_LOCATOR, JMSSourceConfiguration.DESTINATION_LOCATOR_DEFAULT) + .trim().toUpperCase(); + messageSelector = context.getString(JMSSourceConfiguration. MESSAGE_SELECTOR, "").trim(); @@ -196,11 +202,16 @@ public class JMSSource extends AbstractPollableSource { "invalid.", destinationTypeName), e); } + try { + destinationLocator = JMSDestinationLocator.valueOf(destinationLocatorName); + } catch (IllegalArgumentException e) { + throw new FlumeException(String.format("Destination locator '%s' is " + + "invalid.", destinationLocatorName), e); + } + Preconditions.checkArgument(batchSize > 0, "Batch size must be greater " + "than 0"); - InitialContext initalContext; - try { Properties contextProperties = new Properties(); contextProperties.setProperty( @@ -208,7 +219,7 @@ public class JMSSource extends AbstractPollableSource { initialContextFactoryName); contextProperties.setProperty( javax.naming.Context.PROVIDER_URL, providerUrl); - initalContext = initialContextFactory.create(contextProperties); + initialContext = initialContextFactory.create(contextProperties); } catch (NamingException e) { throw new FlumeException(String.format( "Could not create initial context %s provider %s", @@ -216,7 +227,7 @@ public class JMSSource extends AbstractPollableSource { } try { - connectionFactory = (ConnectionFactory) initalContext. + connectionFactory = (ConnectionFactory) initialContext. lookup(connectionFactoryName); } catch (NamingException e) { throw new FlumeException("Could not lookup ConnectionFactory", e); @@ -302,8 +313,9 @@ public class JMSSource extends AbstractPollableSource { } private JMSMessageConsumer createConsumer() throws JMSException { logger.info("Creating new consumer for " + destinationName); - JMSMessageConsumer consumer = consumerFactory.create(connectionFactory, - destinationName, destinationType, messageSelector, batchSize, + JMSMessageConsumer consumer = consumerFactory.create(initialContext, + connectionFactory, destinationName, destinationType, destinationLocator, + messageSelector, batchSize, pollTimeout, converter, userName, password); jmsExceptionCounter = 0; return consumer; http://git-wip-us.apache.org/repos/asf/flume/blob/db1abb0a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSourceConfiguration.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSourceConfiguration.java b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSourceConfiguration.java index c0ec9b6..98bf8ab 100644 --- a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSourceConfiguration.java +++ b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSourceConfiguration.java @@ -29,6 +29,9 @@ public class JMSSourceConfiguration { public static final String DESTINATION_NAME = "destinationName"; public static final String DESTINATION_TYPE = "destinationType"; + public static final String DESTINATION_LOCATOR = "destinationLocator"; + public static final String DESTINATION_LOCATOR_DEFAULT = "CDI"; + public static final String DESTINATION_TYPE_QUEUE = "queue"; public static final String DESTINATION_TYPE_TOPIC = "topic"; http://git-wip-us.apache.org/repos/asf/flume/blob/db1abb0a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/JMSMessageConsumerTestBase.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/JMSMessageConsumerTestBase.java b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/JMSMessageConsumerTestBase.java index e40e95a..6881967 100644 --- a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/JMSMessageConsumerTestBase.java +++ b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/JMSMessageConsumerTestBase.java @@ -32,6 +32,7 @@ import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; +import javax.naming.InitialContext; import org.apache.flume.Context; import org.apache.flume.Event; @@ -47,12 +48,14 @@ public abstract class JMSMessageConsumerTestBase { static final String DESTINATION_NAME = "destinationName"; static final String SELECTOR = "selector"; static final String TEXT = "text"; + static final InitialContext WONT_USE = null; Context context; JMSMessageConsumer consumer; ConnectionFactory connectionFactory; String destinationName; JMSDestinationType destinationType; + JMSDestinationLocator destinationLocator; String messageSelector; int batchSize; long pollTimeout; @@ -100,6 +103,7 @@ public abstract class JMSMessageConsumerTestBase { when(messageConsumer.receive(anyLong())).thenReturn(message); destinationName = DESTINATION_NAME; destinationType = JMSDestinationType.QUEUE; + destinationLocator = JMSDestinationLocator.CDI; messageSelector = SELECTOR; batchSize = 10; pollTimeout = 500L; @@ -129,9 +133,9 @@ public abstract class JMSMessageConsumerTestBase { } JMSMessageConsumer create() { - return new JMSMessageConsumer(connectionFactory, destinationName, - destinationType, messageSelector, batchSize, pollTimeout, converter, - userName, password); + return new JMSMessageConsumer(WONT_USE, connectionFactory, destinationName, + destinationLocator, destinationType, messageSelector, batchSize, + pollTimeout, converter, userName, password); } @After public void tearDown() throws Exception { http://git-wip-us.apache.org/repos/asf/flume/blob/db1abb0a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestIntegrationActiveMQ.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestIntegrationActiveMQ.java b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestIntegrationActiveMQ.java index 20c0d2e..e28e02a 100644 --- a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestIntegrationActiveMQ.java +++ b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestIntegrationActiveMQ.java @@ -55,11 +55,14 @@ import com.google.common.collect.Lists; import com.google.common.io.Files; public class TestIntegrationActiveMQ { + private final static String INITIAL_CONTEXT_FACTORY = "org.apache.activemq.jndi.ActiveMQInitialContextFactory"; public static final String BROKER_BIND_URL = "tcp://localhost:61516"; private final static String DESTINATION_NAME = "test"; private static final String USERNAME = "user"; private static final String PASSWORD = "pass"; + // specific for dynamic queues on ActiveMq + public static final String JNDI_PREFIX = "dynamicQueues/"; private File baseDir; private File tmpDir; @@ -171,6 +174,15 @@ public class TestIntegrationActiveMQ { } @Test + public void testQueueLocatedWithJndi() throws Exception { + context.put(JMSSourceConfiguration.DESTINATION_NAME, + JNDI_PREFIX + DESTINATION_NAME); + context.put(JMSSourceConfiguration.DESTINATION_LOCATOR, + JMSDestinationLocator.JNDI.name()); + testQueue(); + } + + @Test public void testQueue() throws Exception { context.put(JMSSourceConfiguration.DESTINATION_TYPE, JMSSourceConfiguration.DESTINATION_TYPE_QUEUE); http://git-wip-us.apache.org/repos/asf/flume/blob/db1abb0a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSSource.java b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSSource.java index ddfd767..5423f8f 100644 --- a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSSource.java +++ b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSSource.java @@ -54,7 +54,7 @@ public class TestJMSSource extends JMSMessageConsumerTestBase { private JMSSource source; private Context context; - private InitialContext initialConext; + private InitialContext initialContext; private ChannelProcessor channelProcessor; private List<Event> events; private JMSMessageConsumerFactory consumerFactory; @@ -67,7 +67,7 @@ public class TestJMSSource extends JMSMessageConsumerTestBase { baseDir = Files.createTempDir(); passwordFile = new File(baseDir, "password"); Assert.assertTrue(passwordFile.createNewFile()); - initialConext = mock(InitialContext.class); + initialContext = mock(InitialContext.class); channelProcessor = mock(ChannelProcessor.class); events = Lists.newArrayList(); doAnswer(new Answer<Void>() { @@ -79,13 +79,13 @@ public class TestJMSSource extends JMSMessageConsumerTestBase { }).when(channelProcessor).processEventBatch(any(List.class)); consumerFactory = mock(JMSMessageConsumerFactory.class); consumer = spy(create()); - when(consumerFactory.create(any(ConnectionFactory.class), anyString(), - any(JMSDestinationType.class), anyString(), anyInt(), anyLong(), + when(consumerFactory.create(any(InitialContext.class), any(ConnectionFactory.class), anyString(), + any(JMSDestinationType.class), any(JMSDestinationLocator.class), anyString(), anyInt(), anyLong(), any(JMSMessageConverter.class), any(Optional.class), any(Optional.class))).thenReturn(consumer); - when(initialConext.lookup(anyString())).thenReturn(connectionFactory); + when(initialContext.lookup(anyString())).thenReturn(connectionFactory); contextFactory = mock(InitialContextFactory.class); - when(contextFactory.create(any(Properties.class))).thenReturn(initialConext); + when(contextFactory.create(any(Properties.class))).thenReturn(initialContext); source = new JMSSource(consumerFactory, contextFactory); source.setName("JMSSource-" + UUID.randomUUID()); source.setChannelProcessor(channelProcessor); @@ -136,8 +136,8 @@ public class TestJMSSource extends JMSMessageConsumerTestBase { @SuppressWarnings("unchecked") @Test public void testStartConsumerCreateThrowsException() throws Exception { - when(consumerFactory.create(any(ConnectionFactory.class), anyString(), - any(JMSDestinationType.class), anyString(), anyInt(), anyLong(), + when(consumerFactory.create(any(InitialContext.class), any(ConnectionFactory.class), anyString(), + any(JMSDestinationType.class), any(JMSDestinationLocator.class), anyString(), anyInt(), anyLong(), any(JMSMessageConverter.class), any(Optional.class), any(Optional.class))).thenThrow(new RuntimeException()); source.configure(context); @@ -151,7 +151,7 @@ public class TestJMSSource extends JMSMessageConsumerTestBase { } @Test(expected = FlumeException.class) public void testConfigureWithContextLookupThrowsException() throws Exception { - when(initialConext.lookup(anyString())).thenThrow(new NamingException()); + when(initialContext.lookup(anyString())).thenThrow(new NamingException()); source.configure(context); } @Test(expected = FlumeException.class)
