Repository: beam Updated Branches: refs/heads/master 0b866d4d7 -> ad90d91cb
[BEAM-837] Add authentication support in JmsIO Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/97060bda Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/97060bda Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/97060bda Branch: refs/heads/master Commit: 97060bda8dea4fee6d62a189f34d6cd3a3b34aad Parents: 0b866d4 Author: Jean-Baptiste Onofré <[email protected]> Authored: Fri Jan 6 17:50:03 2017 +0100 Committer: Jean-Baptiste Onofré <[email protected]> Committed: Thu Feb 2 10:02:27 2017 +0100 ---------------------------------------------------------------------- sdks/java/io/jms/pom.xml | 6 ++ .../java/org/apache/beam/sdk/io/jms/JmsIO.java | 68 +++++++++++++++-- .../org/apache/beam/sdk/io/jms/JmsIOTest.java | 78 +++++++++++++++++--- 3 files changed, 137 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/97060bda/sdks/java/io/jms/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/jms/pom.xml b/sdks/java/io/jms/pom.xml index 80d1f6c..eacde76 100644 --- a/sdks/java/io/jms/pom.xml +++ b/sdks/java/io/jms/pom.xml @@ -97,6 +97,12 @@ </dependency> <dependency> <groupId>org.apache.activemq</groupId> + <artifactId>activemq-jaas</artifactId> + <version>${activemq.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.activemq</groupId> <artifactId>activemq-kahadb-store</artifactId> <version>${activemq.version}</version> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/beam/blob/97060bda/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java index b6de26a..c1f1cb4 100644 --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java @@ -129,6 +129,8 @@ public class JmsIO { @Nullable abstract ConnectionFactory getConnectionFactory(); @Nullable abstract String getQueue(); @Nullable abstract String getTopic(); + @Nullable abstract String getUsername(); + @Nullable abstract String getPassword(); abstract long getMaxNumRecords(); @Nullable abstract Duration getMaxReadTime(); @@ -139,6 +141,8 @@ public class JmsIO { abstract Builder setConnectionFactory(ConnectionFactory connectionFactory); abstract Builder setQueue(String queue); abstract Builder setTopic(String topic); + abstract Builder setUsername(String username); + abstract Builder setPassword(String password); abstract Builder setMaxNumRecords(long maxNumRecords); abstract Builder setMaxReadTime(Duration maxReadTime); abstract Read build(); @@ -211,6 +215,24 @@ public class JmsIO { } /** + * Define the username to connect to the JMS broker (authenticated). + */ + public Read withUsername(String username) { + checkArgument(username != null, "JmsIO.read().withUsername(username) called with null " + + "username"); + return builder().setUsername(username).build(); + } + + /** + * Define the password to connect to the JMS broker (authenticated). + */ + public Read withPassword(String password) { + checkArgument(password != null, "JmsIO.read().withPassword(password) called with null " + + "password"); + return builder().setPassword(password).build(); + } + + /** * Define the max number of records that the source will read. Using a max number of records * different from {@code Long.MAX_VALUE} means the source will be {@code Bounded}, and will * stop once the max number of records read is reached. @@ -369,17 +391,23 @@ public class JmsIO { @Override public boolean start() throws IOException { - ConnectionFactory connectionFactory = source.spec.getConnectionFactory(); + Read spec = source.spec; + ConnectionFactory connectionFactory = spec.getConnectionFactory(); try { - this.connection = connectionFactory.createConnection(); + if (spec.getUsername() != null) { + this.connection = + connectionFactory.createConnection(spec.getUsername(), spec.getPassword()); + } else { + this.connection = connectionFactory.createConnection(); + } this.connection.start(); this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - if (source.spec.getTopic() != null) { + if (spec.getTopic() != null) { this.consumer = - this.session.createConsumer(this.session.createTopic(source.spec.getTopic())); + this.session.createConsumer(this.session.createTopic(spec.getTopic())); } else { this.consumer = - this.session.createConsumer(this.session.createQueue(source.spec.getQueue())); + this.session.createConsumer(this.session.createQueue(spec.getQueue())); } return advance(); @@ -495,6 +523,8 @@ public class JmsIO { @Nullable abstract ConnectionFactory getConnectionFactory(); @Nullable abstract String getQueue(); @Nullable abstract String getTopic(); + @Nullable abstract String getUsername(); + @Nullable abstract String getPassword(); abstract Builder builder(); @@ -503,6 +533,8 @@ public class JmsIO { abstract Builder setConnectionFactory(ConnectionFactory connectionFactory); abstract Builder setQueue(String queue); abstract Builder setTopic(String topic); + abstract Builder setUsername(String username); + abstract Builder setPassword(String password); abstract Write build(); } @@ -572,6 +604,24 @@ public class JmsIO { return builder().setTopic(topic).build(); } + /** + * Define the username to connect to the JMS broker (authenticated). + */ + public Write withUsername(String username) { + checkArgument(username != null, "JmsIO.write().withUsername(username) called with null " + + "username"); + return builder().setUsername(username).build(); + } + + /** + * Define the password to connect to the JMS broker (authenticated). + */ + public Write withPassword(String password) { + checkArgument(password != null, "JmsIO.write().withPassword(password) called with null " + + "password"); + return builder().setPassword(password).build(); + } + @Override public PDone expand(PCollection<String> input) { input.apply(ParDo.of(new WriterFn(this))); @@ -601,7 +651,13 @@ public class JmsIO { @StartBundle public void startBundle(Context c) throws Exception { if (producer == null) { - this.connection = spec.getConnectionFactory().createConnection(); + if (spec.getUsername() != null) { + this.connection = + spec.getConnectionFactory() + .createConnection(spec.getUsername(), spec.getPassword()); + } else { + this.connection = spec.getConnectionFactory().createConnection(); + } this.connection.start(); // false means we don't use JMS transaction. this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE); http://git-wip-us.apache.org/repos/asf/beam/blob/97060bda/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java index 7259ce8..c756cd0 100644 --- a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java +++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java @@ -18,6 +18,8 @@ package org.apache.beam.sdk.io.jms; import java.util.ArrayList; +import java.util.List; + import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Message; @@ -26,7 +28,10 @@ import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerPlugin; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.security.AuthenticationUser; +import org.apache.activemq.security.SimpleAuthenticationPlugin; import org.apache.activemq.store.memory.MemoryPersistenceAdapter; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; @@ -40,6 +45,7 @@ import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -49,15 +55,21 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) public class JmsIOTest { - private static final String BROKER_URL = "vm://localhost"; + private static final String USERNAME = "test_user"; + private static final String PASSWORD = "test_password"; + private static final String QUEUE = "test_queue"; + private BrokerService broker; private ConnectionFactory connectionFactory; @Rule public final transient TestPipeline pipeline = TestPipeline.create(); + @Rule + public ExpectedException expectedException = ExpectedException.none(); + @Before public void startBroker() throws Exception { broker = new BrokerService(); @@ -65,6 +77,18 @@ public class JmsIOTest { broker.setPersistenceAdapter(new MemoryPersistenceAdapter()); broker.addConnector(BROKER_URL); broker.setBrokerName("localhost"); + broker.setPopulateJMSXUserID(true); + broker.setUseAuthenticatedPrincipalForJMSXUserID(true); + + // enable authentication + List<AuthenticationUser> users = new ArrayList<>(); + // username and password to use to connect to the broker. + // This user has users privilege (able to browse, consume, produce, list destinations) + users.add(new AuthenticationUser(USERNAME, PASSWORD, "users")); + SimpleAuthenticationPlugin plugin = new SimpleAuthenticationPlugin(users); + BrokerPlugin[] plugins = new BrokerPlugin[]{ plugin }; + broker.setPlugins(plugins); + broker.start(); // create JMS connection factory @@ -78,12 +102,42 @@ public class JmsIOTest { @Test @Category(NeedsRunner.class) + public void testAuthenticationRequired() { + expectedException.expect(Exception.class); + expectedException.expectMessage("User name [null] or password is invalid."); + + pipeline.apply( + JmsIO.read() + .withConnectionFactory(connectionFactory) + .withQueue(QUEUE)); + + pipeline.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testAuthenticationWithBadPassword() { + expectedException.expect(Exception.class); + expectedException.expectMessage("User name [" + USERNAME + "] or password is invalid."); + + pipeline.apply( + JmsIO.read() + .withConnectionFactory(connectionFactory) + .withQueue(QUEUE) + .withUsername(USERNAME) + .withPassword("BAD")); + + pipeline.run(); + } + + @Test + @Category(NeedsRunner.class) public void testReadMessages() throws Exception { // produce message - Connection connection = connectionFactory.createConnection(); + Connection connection = connectionFactory.createConnection(USERNAME, PASSWORD); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(session.createQueue("test")); + MessageProducer producer = session.createProducer(session.createQueue(QUEUE)); TextMessage message = session.createTextMessage("This Is A Test"); producer.send(message); producer.send(message); @@ -99,7 +153,9 @@ public class JmsIOTest { PCollection<JmsRecord> output = pipeline.apply( JmsIO.read() .withConnectionFactory(connectionFactory) - .withQueue("test") + .withQueue(QUEUE) + .withUsername(USERNAME) + .withPassword(PASSWORD) .withMaxNumRecords(5)); PAssert @@ -107,9 +163,9 @@ public class JmsIOTest { .isEqualTo(new Long(5)); pipeline.run(); - connection = connectionFactory.createConnection(); + connection = connectionFactory.createConnection(USERNAME, PASSWORD); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = session.createConsumer(session.createQueue("test")); + MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE)); Message msg = consumer.receiveNoWait(); Assert.assertNull(msg); } @@ -123,14 +179,18 @@ public class JmsIOTest { data.add("Message " + i); } pipeline.apply(Create.of(data)) - .apply(JmsIO.write().withConnectionFactory(connectionFactory).withQueue("test")); + .apply(JmsIO.write() + .withConnectionFactory(connectionFactory) + .withQueue(QUEUE) + .withUsername(USERNAME) + .withPassword(PASSWORD)); pipeline.run(); - Connection connection = connectionFactory.createConnection(); + Connection connection = connectionFactory.createConnection(USERNAME, PASSWORD); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = session.createConsumer(session.createQueue("test")); + MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE)); int count = 0; while (consumer.receive(1000) != null) { count++;
