Repository: beam
Updated Branches:
  refs/heads/master 0b866d4d7 -> ad90d91cb


[BEAM-837] Add authentication support in JmsIO


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/97060bda
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/97060bda
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/97060bda

Branch: refs/heads/master
Commit: 97060bda8dea4fee6d62a189f34d6cd3a3b34aad
Parents: 0b866d4
Author: Jean-Baptiste Onofré <[email protected]>
Authored: Fri Jan 6 17:50:03 2017 +0100
Committer: Jean-Baptiste Onofré <[email protected]>
Committed: Thu Feb 2 10:02:27 2017 +0100

----------------------------------------------------------------------
 sdks/java/io/jms/pom.xml                        |  6 ++
 .../java/org/apache/beam/sdk/io/jms/JmsIO.java  | 68 +++++++++++++++--
 .../org/apache/beam/sdk/io/jms/JmsIOTest.java   | 78 +++++++++++++++++---
 3 files changed, 137 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/97060bda/sdks/java/io/jms/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/jms/pom.xml b/sdks/java/io/jms/pom.xml
index 80d1f6c..eacde76 100644
--- a/sdks/java/io/jms/pom.xml
+++ b/sdks/java/io/jms/pom.xml
@@ -97,6 +97,12 @@
     </dependency>
     <dependency>
       <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-jaas</artifactId>
+      <version>${activemq.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
       <artifactId>activemq-kahadb-store</artifactId>
       <version>${activemq.version}</version>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/beam/blob/97060bda/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java 
b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
index b6de26a..c1f1cb4 100644
--- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
+++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
@@ -129,6 +129,8 @@ public class JmsIO {
     @Nullable abstract ConnectionFactory getConnectionFactory();
     @Nullable abstract String getQueue();
     @Nullable abstract String getTopic();
+    @Nullable abstract String getUsername();
+    @Nullable abstract String getPassword();
     abstract long getMaxNumRecords();
     @Nullable abstract Duration getMaxReadTime();
 
@@ -139,6 +141,8 @@ public class JmsIO {
       abstract Builder setConnectionFactory(ConnectionFactory 
connectionFactory);
       abstract Builder setQueue(String queue);
       abstract Builder setTopic(String topic);
+      abstract Builder setUsername(String username);
+      abstract Builder setPassword(String password);
       abstract Builder setMaxNumRecords(long maxNumRecords);
       abstract Builder setMaxReadTime(Duration maxReadTime);
       abstract Read build();
@@ -211,6 +215,24 @@ public class JmsIO {
     }
 
     /**
+     * Define the username to connect to the JMS broker (authenticated).
+     */
+    public Read withUsername(String username) {
+      checkArgument(username != null, "JmsIO.read().withUsername(username) 
called with null "
+          + "username");
+      return builder().setUsername(username).build();
+    }
+
+    /**
+     * Define the password to connect to the JMS broker (authenticated).
+     */
+    public Read withPassword(String password) {
+      checkArgument(password != null, "JmsIO.read().withPassword(password) 
called with null "
+          + "password");
+      return builder().setPassword(password).build();
+    }
+
+    /**
      * Define the max number of records that the source will read. Using a max 
number of records
      * different from {@code Long.MAX_VALUE} means the source will be {@code 
Bounded}, and will
      * stop once the max number of records read is reached.
@@ -369,17 +391,23 @@ public class JmsIO {
 
     @Override
     public boolean start() throws IOException {
-      ConnectionFactory connectionFactory = source.spec.getConnectionFactory();
+      Read spec = source.spec;
+      ConnectionFactory connectionFactory = spec.getConnectionFactory();
       try {
-        this.connection = connectionFactory.createConnection();
+        if (spec.getUsername() != null) {
+          this.connection =
+              connectionFactory.createConnection(spec.getUsername(), 
spec.getPassword());
+        } else {
+          this.connection = connectionFactory.createConnection();
+        }
         this.connection.start();
         this.session = this.connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
-        if (source.spec.getTopic() != null) {
+        if (spec.getTopic() != null) {
           this.consumer =
-              
this.session.createConsumer(this.session.createTopic(source.spec.getTopic()));
+              
this.session.createConsumer(this.session.createTopic(spec.getTopic()));
         } else {
           this.consumer =
-              
this.session.createConsumer(this.session.createQueue(source.spec.getQueue()));
+              
this.session.createConsumer(this.session.createQueue(spec.getQueue()));
         }
 
         return advance();
@@ -495,6 +523,8 @@ public class JmsIO {
     @Nullable abstract ConnectionFactory getConnectionFactory();
     @Nullable abstract String getQueue();
     @Nullable abstract String getTopic();
+    @Nullable abstract String getUsername();
+    @Nullable abstract String getPassword();
 
     abstract Builder builder();
 
@@ -503,6 +533,8 @@ public class JmsIO {
       abstract Builder setConnectionFactory(ConnectionFactory 
connectionFactory);
       abstract Builder setQueue(String queue);
       abstract Builder setTopic(String topic);
+      abstract Builder setUsername(String username);
+      abstract Builder setPassword(String password);
       abstract Write build();
     }
 
@@ -572,6 +604,24 @@ public class JmsIO {
       return builder().setTopic(topic).build();
     }
 
+    /**
+     * Define the username to connect to the JMS broker (authenticated).
+     */
+    public Write withUsername(String username) {
+      checkArgument(username != null,  "JmsIO.write().withUsername(username) 
called with null "
+          + "username");
+      return builder().setUsername(username).build();
+    }
+
+    /**
+     * Define the password to connect to the JMS broker (authenticated).
+     */
+    public Write withPassword(String password) {
+      checkArgument(password != null, "JmsIO.write().withPassword(password) 
called with null "
+          + "password");
+      return builder().setPassword(password).build();
+    }
+
     @Override
     public PDone expand(PCollection<String> input) {
       input.apply(ParDo.of(new WriterFn(this)));
@@ -601,7 +651,13 @@ public class JmsIO {
       @StartBundle
       public void startBundle(Context c) throws Exception {
         if (producer == null) {
-          this.connection = spec.getConnectionFactory().createConnection();
+          if (spec.getUsername() != null) {
+            this.connection =
+                spec.getConnectionFactory()
+                    .createConnection(spec.getUsername(), spec.getPassword());
+          } else {
+            this.connection = spec.getConnectionFactory().createConnection();
+          }
           this.connection.start();
           // false means we don't use JMS transaction.
           this.session = this.connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);

http://git-wip-us.apache.org/repos/asf/beam/blob/97060bda/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java 
b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
index 7259ce8..c756cd0 100644
--- a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
+++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
@@ -18,6 +18,8 @@
 package org.apache.beam.sdk.io.jms;
 
 import java.util.ArrayList;
+import java.util.List;
+
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.Message;
@@ -26,7 +28,10 @@ import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerPlugin;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.security.AuthenticationUser;
+import org.apache.activemq.security.SimpleAuthenticationPlugin;
 import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
@@ -40,6 +45,7 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
@@ -49,15 +55,21 @@ import org.junit.runners.JUnit4;
 @RunWith(JUnit4.class)
 public class JmsIOTest {
 
-
   private static final String BROKER_URL = "vm://localhost";
 
+  private static final String USERNAME = "test_user";
+  private static final String PASSWORD = "test_password";
+  private static final String QUEUE = "test_queue";
+
   private BrokerService broker;
   private ConnectionFactory connectionFactory;
 
   @Rule
   public final transient TestPipeline pipeline = TestPipeline.create();
 
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
   @Before
   public void startBroker() throws Exception {
     broker = new BrokerService();
@@ -65,6 +77,18 @@ public class JmsIOTest {
     broker.setPersistenceAdapter(new MemoryPersistenceAdapter());
     broker.addConnector(BROKER_URL);
     broker.setBrokerName("localhost");
+    broker.setPopulateJMSXUserID(true);
+    broker.setUseAuthenticatedPrincipalForJMSXUserID(true);
+
+    // enable authentication
+    List<AuthenticationUser> users = new ArrayList<>();
+    // username and password to use to connect to the broker.
+    // This user has users privilege (able to browse, consume, produce, list 
destinations)
+    users.add(new AuthenticationUser(USERNAME, PASSWORD, "users"));
+    SimpleAuthenticationPlugin plugin = new SimpleAuthenticationPlugin(users);
+    BrokerPlugin[] plugins = new BrokerPlugin[]{ plugin };
+    broker.setPlugins(plugins);
+
     broker.start();
 
     // create JMS connection factory
@@ -78,12 +102,42 @@ public class JmsIOTest {
 
   @Test
   @Category(NeedsRunner.class)
+  public void testAuthenticationRequired() {
+    expectedException.expect(Exception.class);
+    expectedException.expectMessage("User name [null] or password is 
invalid.");
+
+    pipeline.apply(
+        JmsIO.read()
+            .withConnectionFactory(connectionFactory)
+            .withQueue(QUEUE));
+
+    pipeline.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testAuthenticationWithBadPassword() {
+    expectedException.expect(Exception.class);
+    expectedException.expectMessage("User name [" + USERNAME + "] or password 
is invalid.");
+
+    pipeline.apply(
+        JmsIO.read()
+            .withConnectionFactory(connectionFactory)
+            .withQueue(QUEUE)
+            .withUsername(USERNAME)
+            .withPassword("BAD"));
+
+    pipeline.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
   public void testReadMessages() throws Exception {
 
     // produce message
-    Connection connection = connectionFactory.createConnection();
+    Connection connection = connectionFactory.createConnection(USERNAME, 
PASSWORD);
     Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
-    MessageProducer producer = 
session.createProducer(session.createQueue("test"));
+    MessageProducer producer = 
session.createProducer(session.createQueue(QUEUE));
     TextMessage message = session.createTextMessage("This Is A Test");
     producer.send(message);
     producer.send(message);
@@ -99,7 +153,9 @@ public class JmsIOTest {
     PCollection<JmsRecord> output = pipeline.apply(
         JmsIO.read()
             .withConnectionFactory(connectionFactory)
-            .withQueue("test")
+            .withQueue(QUEUE)
+            .withUsername(USERNAME)
+            .withPassword(PASSWORD)
             .withMaxNumRecords(5));
 
     PAssert
@@ -107,9 +163,9 @@ public class JmsIOTest {
         .isEqualTo(new Long(5));
     pipeline.run();
 
-    connection = connectionFactory.createConnection();
+    connection = connectionFactory.createConnection(USERNAME, PASSWORD);
     session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-    MessageConsumer consumer = 
session.createConsumer(session.createQueue("test"));
+    MessageConsumer consumer = 
session.createConsumer(session.createQueue(QUEUE));
     Message msg = consumer.receiveNoWait();
     Assert.assertNull(msg);
   }
@@ -123,14 +179,18 @@ public class JmsIOTest {
       data.add("Message " + i);
     }
     pipeline.apply(Create.of(data))
-        
.apply(JmsIO.write().withConnectionFactory(connectionFactory).withQueue("test"));
+        .apply(JmsIO.write()
+            .withConnectionFactory(connectionFactory)
+            .withQueue(QUEUE)
+            .withUsername(USERNAME)
+            .withPassword(PASSWORD));
 
     pipeline.run();
 
-    Connection connection = connectionFactory.createConnection();
+    Connection connection = connectionFactory.createConnection(USERNAME, 
PASSWORD);
     connection.start();
     Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
-    MessageConsumer consumer = 
session.createConsumer(session.createQueue("test"));
+    MessageConsumer consumer = 
session.createConsumer(session.createQueue(QUEUE));
     int count = 0;
     while (consumer.receive(1000) != null) {
       count++;

Reply via email to