Repository: incubator-beam
Updated Branches:
  refs/heads/master 1c9bf8d66 -> 1e148cd7d


[BEAM-716] Use AutoValue in JmsIO


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/caf1c720
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/caf1c720
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/caf1c720

Branch: refs/heads/master
Commit: caf1c720f66de4d502f79b6c11c64b49c53329b0
Parents: 1c9bf8d
Author: Jean-Baptiste Onofré <jbono...@apache.org>
Authored: Sun Dec 11 07:43:41 2016 +0100
Committer: Jean-Baptiste Onofré <jbono...@apache.org>
Committed: Mon Dec 19 07:24:00 2016 +0100

----------------------------------------------------------------------
 sdks/java/io/jms/pom.xml                        |   7 +
 .../java/org/apache/beam/sdk/io/jms/JmsIO.java  | 321 +++++++++++++------
 2 files changed, 228 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/caf1c720/sdks/java/io/jms/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/jms/pom.xml b/sdks/java/io/jms/pom.xml
index bca0152..b88254e 100644
--- a/sdks/java/io/jms/pom.xml
+++ b/sdks/java/io/jms/pom.xml
@@ -81,6 +81,13 @@
       <artifactId>jsr305</artifactId>
     </dependency>
 
+    <!-- compile dependencies -->
+    <dependency>
+      <groupId>com.google.auto.value</groupId>
+      <artifactId>auto-value</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
     <!-- test dependencies -->
     <dependency>
       <groupId>org.apache.activemq</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/caf1c720/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java 
b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
index 24fa67d..76dee67 100644
--- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
+++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.jms;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -101,37 +102,148 @@ public class JmsIO {
   private static final Logger LOG = LoggerFactory.getLogger(JmsIO.class);
 
   public static Read read() {
-    return new Read(null, null, null, Long.MAX_VALUE, null);
+    return new 
AutoValue_JmsIO_Read.Builder().setMaxNumRecords(Long.MAX_VALUE).build();
   }
 
   public static Write write() {
-    return new Write(null, null, null);
+    return new AutoValue_JmsIO_Write.Builder().build();
   }
 
   /**
    * A {@link PTransform} to read from a JMS destination. See {@link JmsIO} 
for more
    * information on usage and configuration.
    */
-  public static class Read extends PTransform<PBegin, PCollection<JmsRecord>> {
+  @AutoValue
+  public abstract static class Read extends PTransform<PBegin, 
PCollection<JmsRecord>> {
 
+    /**
+     * NB: According to 
http://docs.oracle.com/javaee/1.4/api/javax/jms/ConnectionFactory.html
+     * "It is expected that JMS providers will provide the tools an 
administrator needs to create
+     * and configure administered objects in a JNDI namespace. JMS provider 
implementations of
+     * administered objects should be both javax.jndi.Referenceable and 
java.io.Serializable so
+     * that they can be stored in all JNDI naming contexts. In addition, it is 
recommended that
+     * these implementations follow the JavaBeansTM design patterns."
+     *
+     * <p>So, a {@link ConnectionFactory} implementation is serializable.
+     */
+    @Nullable abstract ConnectionFactory getConnectionFactory();
+    @Nullable abstract String getQueue();
+    @Nullable abstract String getTopic();
+    abstract long getMaxNumRecords();
+    @Nullable abstract Duration getMaxReadTime();
+
+    abstract Builder builder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setConnectionFactory(ConnectionFactory 
connectionFactory);
+      abstract Builder setQueue(String queue);
+      abstract Builder setTopic(String topic);
+      abstract Builder setMaxNumRecords(long maxNumRecords);
+      abstract Builder setMaxReadTime(Duration maxReadTime);
+      abstract Read build();
+    }
+
+    /**
+     * <p>Specify the JMS connection factory to connect to the JMS broker.
+     *
+     * <p>For instance:
+     *
+     * <pre>
+     *   {@code
+     *    
pipeline.apply(JmsIO.read().withConnectionFactory(myConnectionFactory)
+     *   }
+     * </pre>
+     *
+     * @param connectionFactory The JMS {@link ConnectionFactory}.
+     * @return The corresponding {@link JmsIO.Read}.
+     */
     public Read withConnectionFactory(ConnectionFactory connectionFactory) {
-      return new Read(connectionFactory, queue, topic, maxNumRecords, 
maxReadTime);
+      return builder().setConnectionFactory(connectionFactory).build();
     }
 
+    /**
+     * <p>Specify the JMS queue destination name where to read messages from. 
The
+     * {@link JmsIO.Read} acts as a consumer on the queue.
+     *
+     * <p>This method is exclusive with {@link JmsIO.Read#withTopic(String)}. 
The user has to
+     * specify a destination: queue or topic.
+     *
+     * <p>For instance:
+     *
+     * <pre>
+     *   {@code
+     *    pipeline.apply(JmsIO.read().withQueue("my-queue")
+     *   }
+     * </pre>
+     *
+     * @param queue The JMS queue name where to read messages from.
+     * @return The corresponding {@link JmsIO.Read}.
+     */
     public Read withQueue(String queue) {
-      return new Read(connectionFactory, queue, topic, maxNumRecords, 
maxReadTime);
+      return builder().setQueue(queue).build();
     }
 
+    /**
+     * <p>Specify the JMS topic destination name where to receive messages 
from. The
+     * {@link JmsIO.Read} acts as a subscriber on the topic.
+     *
+     * <p>This method is exclusive with {@link JmsIO.Read#withQueue(String)}. 
The user has to
+     * specify a destination: queue or topic.
+     *
+     * <p>For instance:
+     *
+     * <pre>
+     *   {@code
+     *    pipeline.apply(JmsIO.read().withTopic("my-topic")
+     *   }
+     * </pre>
+     *
+     * @param topic The JMS topic name.
+     * @return The corresponding {@link JmsIO.Read}.
+     */
     public Read withTopic(String topic) {
-      return new Read(connectionFactory, queue, topic, maxNumRecords, 
maxReadTime);
+      return builder().setTopic(topic).build();
     }
 
+    /**
+     * <p>Define the max number of records that the source will read. Using a 
max number of records
+     * different from {@code Long.MAX_VALUE} means the source will be {@code 
Bounded}, and will
+     * stop once the max number of records read is reached.
+     *
+     * <p>For instance:
+     *
+     * <pre>
+     *   {@code
+     *    pipeline.apply(JmsIO.read().withMaxNumRecords(1000)
+     *   }
+     * </pre>
+     *
+     * @param maxNumRecords The max number of records to read from the JMS 
destination.
+     * @return The corresponding {@link JmsIO.Read}.
+     */
     public Read withMaxNumRecords(long maxNumRecords) {
-      return new Read(connectionFactory, queue, topic, maxNumRecords, 
maxReadTime);
+      return builder().setMaxNumRecords(maxNumRecords).build();
     }
 
+    /**
+     * <p>Define the maximum duration for which the source will read. Using a non-null 
max read time
+     * duration means the source will be {@code Bounded}, and will stop once 
the max read time is
+     * reached.
+     *
+     * <p>For instance:
+     *
+     * <pre>
+     *   {@code
+     *    pipeline.apply(JmsIO.read().withMaxReadTime(Duration.standardMinutes(10))
+     *   }
+     * </pre>
+     *
+     * @param maxReadTime The max read time duration.
+     * @return The corresponding {@link JmsIO.Read}.
+     */
     public Read withMaxReadTime(Duration maxReadTime) {
-      return new Read(connectionFactory, queue, topic, maxNumRecords, 
maxReadTime);
+      return builder().setMaxReadTime(maxReadTime).build();
     }
 
     @Override
@@ -141,10 +253,10 @@ public class JmsIO {
 
       PTransform<PBegin, PCollection<JmsRecord>> transform = unbounded;
 
-      if (maxNumRecords != Long.MAX_VALUE) {
-        transform = unbounded.withMaxNumRecords(maxNumRecords);
-      } else if (maxReadTime != null) {
-        transform = unbounded.withMaxReadTime(maxReadTime);
+      if (getMaxNumRecords() != Long.MAX_VALUE) {
+        transform = unbounded.withMaxNumRecords(getMaxNumRecords());
+      } else if (getMaxReadTime() != null) {
+        transform = unbounded.withMaxReadTime(getMaxReadTime());
       }
 
       return input.getPipeline().apply(transform);
@@ -152,65 +264,29 @@ public class JmsIO {
 
     @Override
     public void validate(PBegin input) {
-      checkNotNull(connectionFactory, "ConnectionFactory not specified");
-      checkArgument((queue != null || topic != null), "Either queue or topic 
not specified");
+      checkNotNull(getConnectionFactory(), "ConnectionFactory not specified");
+      checkArgument((getQueue() != null || getTopic() != null), "Either queue 
or topic not "
+          + "specified");
     }
 
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
-
-      builder.addIfNotNull(DisplayData.item("queue", queue));
-      builder.addIfNotNull(DisplayData.item("topic", topic));
+      builder.addIfNotNull(DisplayData.item("queue", getQueue()));
+      builder.addIfNotNull(DisplayData.item("topic", getTopic()));
 
     }
 
     
///////////////////////////////////////////////////////////////////////////////////////
 
     /**
-     * NB: According to 
http://docs.oracle.com/javaee/1.4/api/javax/jms/ConnectionFactory.html
-     * "It is expected that JMS providers will provide the tools an 
administrator needs to create
-     * and configure administered objects in a JNDI namespace. JMS provider 
implementations of
-     * administered objects should be both javax.jndi.Referenceable and 
java.io.Serializable so
-     * that they can be stored in all JNDI naming contexts. In addition, it is 
recommended that
-     * these implementations follow the JavaBeansTM design patterns."
-     *
-     * <p>So, a {@link ConnectionFactory} implementation is serializable.
-     */
-    protected ConnectionFactory connectionFactory;
-    @Nullable
-    protected String queue;
-    @Nullable
-    protected String topic;
-    protected long maxNumRecords;
-    protected Duration maxReadTime;
-
-    private Read(
-        ConnectionFactory connectionFactory,
-        String queue,
-        String topic,
-        long maxNumRecords,
-        Duration maxReadTime) {
-      super("JmsIO.Read");
-
-      this.connectionFactory = connectionFactory;
-      this.queue = queue;
-      this.topic = topic;
-      this.maxNumRecords = maxNumRecords;
-      this.maxReadTime = maxReadTime;
-    }
-
-    /**
      * Creates an {@link UnboundedSource UnboundedSource&lt;JmsRecord, ?&gt;} 
with the configuration
      * in {@link Read}. Primary use case is unit tests, should not be used in 
an
      * application.
      */
     @VisibleForTesting
     UnboundedSource<JmsRecord, JmsCheckpointMark> createSource() {
-      return new UnboundedJmsSource(
-          connectionFactory,
-          queue,
-          topic);
+      return new UnboundedJmsSource(this);
     }
 
   }
@@ -219,17 +295,10 @@ public class JmsIO {
 
   private static class UnboundedJmsSource extends UnboundedSource<JmsRecord, 
JmsCheckpointMark> {
 
-    private final ConnectionFactory connectionFactory;
-    private final String queue;
-    private final String topic;
+    private final Read spec;
 
-    public UnboundedJmsSource(
-        ConnectionFactory connectionFactory,
-        String queue,
-        String topic) {
-      this.connectionFactory = connectionFactory;
-      this.queue = queue;
-      this.topic = topic;
+    public UnboundedJmsSource(Read spec) {
+      this.spec = spec;
     }
 
     @Override
@@ -237,7 +306,7 @@ public class JmsIO {
         int desiredNumSplits, PipelineOptions options) throws Exception {
       List<UnboundedJmsSource> sources = new ArrayList<>();
       for (int i = 0; i < desiredNumSplits; i++) {
-        sources.add(new UnboundedJmsSource(connectionFactory, queue, topic));
+        sources.add(new UnboundedJmsSource(spec));
       }
       return sources;
     }
@@ -250,8 +319,7 @@ public class JmsIO {
 
     @Override
     public void validate() {
-      checkNotNull(connectionFactory, "ConnectionFactory is not defined");
-      checkArgument((queue != null || topic != null), "Either queue or topic 
is not defined");
+      spec.validate(null);
     }
 
     @Override
@@ -291,15 +359,17 @@ public class JmsIO {
 
     @Override
     public boolean start() throws IOException {
-      ConnectionFactory connectionFactory = source.connectionFactory;
+      ConnectionFactory connectionFactory = source.spec.getConnectionFactory();
       try {
         this.connection = connectionFactory.createConnection();
         this.connection.start();
         this.session = this.connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
-        if (source.topic != null) {
-          this.consumer = 
this.session.createConsumer(this.session.createTopic(source.topic));
+        if (source.spec.getTopic() != null) {
+          this.consumer =
+              
this.session.createConsumer(this.session.createTopic(source.spec.getTopic()));
         } else {
-          this.consumer = 
this.session.createConsumer(this.session.createQueue(source.queue));
+          this.consumer =
+              
this.session.createConsumer(this.session.createQueue(source.spec.getQueue()));
         }
 
         return advance();
@@ -409,70 +479,122 @@ public class JmsIO {
    * A {@link PTransform} to write to a JMS queue. See {@link JmsIO} for
    * more information on usage and configuration.
    */
-  public static class Write extends PTransform<PCollection<String>, PDone> {
+  @AutoValue
+  public abstract static class Write extends PTransform<PCollection<String>, 
PDone> {
 
-    protected ConnectionFactory connectionFactory;
-    protected String queue;
-    protected String topic;
+    @Nullable abstract ConnectionFactory getConnectionFactory();
+    @Nullable abstract String getQueue();
+    @Nullable abstract String getTopic();
 
+    abstract Builder builder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setConnectionFactory(ConnectionFactory 
connectionFactory);
+      abstract Builder setQueue(String queue);
+      abstract Builder setTopic(String topic);
+      abstract Write build();
+    }
+
+    /**
+     * <p>Specify the JMS connection factory to connect to the JMS broker.
+     *
+     * <p>For instance:
+     *
+     * <pre>
+     *   {@code
+     *    .apply(JmsIO.write().withConnectionFactory(myConnectionFactory)
+     *   }
+     * </pre>
+     *
+     * @param connectionFactory The JMS {@link ConnectionFactory}.
+     * @return The corresponding {@link JmsIO.Write}.
+     */
     public Write withConnectionFactory(ConnectionFactory connectionFactory) {
-      return new Write(connectionFactory, queue, topic);
+      return builder().setConnectionFactory(connectionFactory).build();
     }
 
+    /**
+     * <p>Specify the JMS queue destination name where to send messages to. The
+     * {@link JmsIO.Write} acts as a producer on the queue.
+     *
+     * <p>This method is exclusive with {@link JmsIO.Write#withTopic(String)}. 
The user has to
+     * specify a destination: queue or topic.
+     *
+     * <p>For instance:
+     *
+     * <pre>
+     *   {@code
+     *    .apply(JmsIO.write().withQueue("my-queue")
+     *   }
+     * </pre>
+     *
+     * @param queue The JMS queue name where to send messages to.
+     * @return The corresponding {@link JmsIO.Write}.
+     */
     public Write withQueue(String queue) {
-      return new Write(connectionFactory, queue, topic);
+      return builder().setQueue(queue).build();
     }
 
+    /**
+     * <p>Specify the JMS topic destination name where to send messages to. The
+     * {@link JmsIO.Write} acts as a publisher on the topic.
+     *
+     * <p>This method is exclusive with {@link JmsIO.Write#withQueue(String)}. 
The user has to
+     * specify a destination: queue or topic.
+     *
+     * <p>For instance:
+     *
+     * <pre>
+     *   {@code
+     *    .apply(JmsIO.write().withTopic("my-topic")
+     *   }
+     * </pre>
+     *
+     * @param topic The JMS topic name.
+     * @return The corresponding {@link JmsIO.Write}.
+     */
     public Write withTopic(String topic) {
-      return new Write(connectionFactory, queue, topic);
-    }
-
-    private Write(ConnectionFactory connectionFactory, String queue, String 
topic) {
-      this.connectionFactory = connectionFactory;
-      this.queue = queue;
-      this.topic = topic;
+      return builder().setTopic(topic).build();
     }
 
     @Override
     public PDone expand(PCollection<String> input) {
-      input.apply(ParDo.of(new JmsWriter(connectionFactory, queue, topic)));
+      input.apply(ParDo.of(new WriterFn(this)));
       return PDone.in(input.getPipeline());
     }
 
     @Override
     public void validate(PCollection<String> input) {
-      checkNotNull(connectionFactory, "ConnectionFactory is not defined");
-      checkArgument((queue != null || topic != null), "Either queue or topic 
is required");
+      checkNotNull(getConnectionFactory(), "ConnectionFactory is not defined");
+      checkArgument((getQueue() != null || getTopic() != null), "Either queue 
or topic is "
+          + "required");
     }
 
-    private static class JmsWriter extends DoFn<String, Void> {
+    private static class WriterFn extends DoFn<String, Void> {
 
-      private ConnectionFactory connectionFactory;
-      private String queue;
-      private String topic;
+      private Write spec;
 
       private Connection connection;
       private Session session;
       private MessageProducer producer;
 
-      public JmsWriter(ConnectionFactory connectionFactory, String queue, 
String topic) {
-        this.connectionFactory = connectionFactory;
-        this.queue = queue;
-        this.topic = topic;
+      public WriterFn(Write spec) {
+        this.spec = spec;
       }
 
       @StartBundle
       public void startBundle(Context c) throws Exception {
         if (producer == null) {
-          this.connection = connectionFactory.createConnection();
+          this.connection = spec.getConnectionFactory().createConnection();
           this.connection.start();
           // false means we don't use JMS transaction.
           this.session = this.connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
           Destination destination;
-          if (queue != null) {
-            destination = session.createQueue(queue);
+          if (spec.getQueue() != null) {
+            destination = session.createQueue(spec.getQueue());
           } else {
-            destination = session.createTopic(topic);
+            destination = session.createTopic(spec.getTopic());
           }
           this.producer = this.session.createProducer(destination);
         }
@@ -481,7 +603,6 @@ public class JmsIO {
       @ProcessElement
       public void processElement(ProcessContext ctx) throws Exception {
         String value = ctx.element();
-
         try {
           TextMessage message = session.createTextMessage(value);
           producer.send(message);

Reply via email to