Repository: flume Updated Branches: refs/heads/trunk 62c59a68d -> 30f6e39aa
FLUME-2976: Exception when JMS source tries to connect to a WebLogic server without authentication. Fix: change the default password value from "" to null. Reviewers: Bessenyei Balazs Donat, Peter Turcsanyi, Ferenc Szabo (Denes Arvay via Ferenc Szabo) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/30f6e39a Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/30f6e39a Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/30f6e39a Branch: refs/heads/trunk Commit: 30f6e39aa66cbf2e3a7c3c77aa4ea68228b7e257 Parents: 62c59a6 Author: Ferenc Szabo <[email protected]> Authored: Wed May 30 18:42:30 2018 +0200 Committer: Ferenc Szabo <[email protected]> Committed: Wed May 30 18:42:30 2018 +0200 ---------------------------------------------------------------------- .../org/apache/flume/source/jms/JMSSource.java | 2 +- .../source/jms/TestIntegrationActiveMQ.java | 77 +++++++++++++++----- 2 files changed, 61 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/30f6e39a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java index 72fc074..e5ed969 100644 --- a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java +++ b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java @@ -133,7 +133,7 @@ public class JMSSource extends AbstractPollableSource { String passwordFile = context.getString(JMSSourceConfiguration.PASSWORD_FILE, "").trim(); if (passwordFile.isEmpty()) { - password = Optional.of(""); + password = Optional.absent(); } else { try { 
password = Optional.of(Files.toString(new File(passwordFile), http://git-wip-us.apache.org/repos/asf/flume/blob/30f6e39a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestIntegrationActiveMQ.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestIntegrationActiveMQ.java b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestIntegrationActiveMQ.java index 5a35d73..e13502e 100644 --- a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestIntegrationActiveMQ.java +++ b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestIntegrationActiveMQ.java @@ -21,6 +21,8 @@ import static org.mockito.Matchers.*; import static org.mockito.Mockito.*; import java.io.File; +import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.UUID; @@ -47,34 +49,70 @@ import org.apache.flume.channel.ChannelProcessor; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import com.google.common.base.Charsets; import com.google.common.collect.Lists; import com.google.common.io.Files; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +@RunWith(Parameterized.class) public class TestIntegrationActiveMQ { + private static final Logger LOGGER = LoggerFactory.getLogger(TestIntegrationActiveMQ.class); + private static final String INITIAL_CONTEXT_FACTORY = "org.apache.activemq.jndi.ActiveMQInitialContextFactory"; public static final String BROKER_BIND_URL = "tcp://localhost:61516"; private static final String DESTINATION_NAME = "test"; - private static final String USERNAME = "user"; - private static final String PASSWORD = "pass"; // 
specific for dynamic queues on ActiveMq public static final String JNDI_PREFIX = "dynamicQueues/"; + private enum TestMode { + WITH_AUTHENTICATION, + WITHOUT_AUTHENTICATION + } + private File baseDir; private File tmpDir; private File dataDir; - private File passwordFile; private BrokerService broker; private Context context; private JMSSource source; private List<Event> events; + private final String jmsUserName; + private final String jmsPassword; + + public TestIntegrationActiveMQ(TestMode testMode) { + LOGGER.info("Testing with test mode {}", testMode); + + switch (testMode) { + case WITH_AUTHENTICATION: + jmsUserName = "user"; + jmsPassword = "pass"; + break; + case WITHOUT_AUTHENTICATION: + jmsUserName = null; + jmsPassword = null; + break; + default: + throw new IllegalArgumentException("Unhandled test mode: " + testMode); + } + } + + @Parameterized.Parameters + public static Collection<Object[]> parameters() { + return Arrays.asList(new Object[][]{ + {TestMode.WITH_AUTHENTICATION}, + {TestMode.WITHOUT_AUTHENTICATION} + }); + } @SuppressWarnings("unchecked") @Before @@ -83,26 +121,31 @@ public class TestIntegrationActiveMQ { tmpDir = new File(baseDir, "tmp"); dataDir = new File(baseDir, "data"); Assert.assertTrue(tmpDir.mkdir()); - passwordFile = new File(baseDir, "password"); - Files.write(PASSWORD.getBytes(Charsets.UTF_8), passwordFile); broker = new BrokerService(); - broker.addConnector(BROKER_BIND_URL); broker.setTmpDataDirectory(tmpDir); broker.setDataDirectoryFile(dataDir); - List<AuthenticationUser> users = Lists.newArrayList(); - users.add(new AuthenticationUser(USERNAME, PASSWORD, "")); - SimpleAuthenticationPlugin authentication = new SimpleAuthenticationPlugin(users); - broker.setPlugins(new BrokerPlugin[]{authentication}); - broker.start(); context = new Context(); context.put(JMSSourceConfiguration.INITIAL_CONTEXT_FACTORY, INITIAL_CONTEXT_FACTORY); context.put(JMSSourceConfiguration.PROVIDER_URL, BROKER_BIND_URL); 
context.put(JMSSourceConfiguration.DESTINATION_NAME, DESTINATION_NAME); - context.put(JMSSourceConfiguration.USERNAME, USERNAME); - context.put(JMSSourceConfiguration.PASSWORD_FILE, passwordFile.getAbsolutePath()); + + if (jmsUserName != null) { + File passwordFile = new File(baseDir, "password"); + Files.write(jmsPassword.getBytes(Charsets.UTF_8), passwordFile); + + AuthenticationUser jmsUser = new AuthenticationUser(jmsUserName, jmsPassword, ""); + List<AuthenticationUser> users = Collections.singletonList(jmsUser); + SimpleAuthenticationPlugin authentication = new SimpleAuthenticationPlugin(users); + broker.setPlugins(new BrokerPlugin[]{authentication}); + + context.put(JMSSourceConfiguration.USERNAME, jmsUserName); + context.put(JMSSourceConfiguration.PASSWORD_FILE, passwordFile.getAbsolutePath()); + } + + broker.start(); events = Lists.newArrayList(); source = new JMSSource(); @@ -130,8 +173,8 @@ public class TestIntegrationActiveMQ { } private void putQueue(List<String> events) throws Exception { - ConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME, - PASSWORD, BROKER_BIND_URL); + ConnectionFactory factory = new ActiveMQConnectionFactory(jmsUserName, jmsPassword, + BROKER_BIND_URL); Connection connection = factory.createConnection(); connection.start(); @@ -151,8 +194,8 @@ public class TestIntegrationActiveMQ { } private void putTopic(List<String> events) throws Exception { - ConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME, - PASSWORD, BROKER_BIND_URL); + ConnectionFactory factory = new ActiveMQConnectionFactory(jmsUserName, jmsPassword, + BROKER_BIND_URL); Connection connection = factory.createConnection(); connection.start();
