Repository: beam
Updated Branches:
  refs/heads/master 63dc08ee1 -> 8c1a577fe


[BEAM-1384] JmsIO: better errors during start, better testing

For BEAM-1384, the test has been failing because the error may be surfaced in 
either
start() or close(), depending on execution path. The underlying bug is that 
start
was implemented so that close might fail (this.connection would be set, but
this.connection.close would fail in a bad way).

Rewrite start() to fix the invariant needed by close() and also to provide
better error messages. Fixup the tests. Unfortunately, expectedException doesn't
really support testing causes and nested caused, so the rewrite dropped its use.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8ef78d72
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8ef78d72
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8ef78d72

Branch: refs/heads/master
Commit: 8ef78d72b903bded2c8db4125286e6f07e4a4a0b
Parents: 63dc08e
Author: Dan Halperin <[email protected]>
Authored: Tue Feb 7 09:35:35 2017 -0800
Committer: Dan Halperin <[email protected]>
Committed: Tue Feb 7 12:49:02 2017 -0800

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/jms/JmsIO.java  | 24 +++++++++----
 .../org/apache/beam/sdk/io/jms/JmsIOTest.java   | 37 +++++++++++++-------
 2 files changed, 42 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/8ef78d72/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java 
b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
index 270fe31..a935b56 100644
--- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
+++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
@@ -405,14 +405,26 @@ public class JmsIO {
       Read spec = source.spec;
       ConnectionFactory connectionFactory = spec.getConnectionFactory();
       try {
+        Connection connection;
         if (spec.getUsername() != null) {
-          this.connection =
+          connection =
               connectionFactory.createConnection(spec.getUsername(), 
spec.getPassword());
         } else {
-          this.connection = connectionFactory.createConnection();
+          connection = connectionFactory.createConnection();
         }
-        this.connection.start();
+        connection.start();
+        this.connection = connection;
+      } catch (Exception e) {
+        throw new IOException("Error connecting to JMS", e);
+      }
+
+      try {
         this.session = this.connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+      } catch (Exception e) {
+        throw new IOException("Error creating JMS session", e);
+      }
+
+      try {
         if (spec.getTopic() != null) {
           this.consumer =
               
this.session.createConsumer(this.session.createTopic(spec.getTopic()));
@@ -420,11 +432,11 @@ public class JmsIO {
           this.consumer =
               
this.session.createConsumer(this.session.createQueue(spec.getQueue()));
         }
-
-        return advance();
       } catch (Exception e) {
-        throw new IOException(e);
+        throw new IOException("Error creating JMS consumer", e);
       }
+
+      return advance();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/8ef78d72/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java 
b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
index a06bba3..f07247d 100644
--- a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
+++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
@@ -17,14 +17,20 @@
  */
 package org.apache.beam.sdk.io.jms;
 
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
@@ -49,7 +55,6 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
@@ -72,9 +77,6 @@ public class JmsIOTest {
   @Rule
   public final transient TestPipeline pipeline = TestPipeline.create();
 
-  @Rule
-  public ExpectedException expectedException = ExpectedException.none();
-
   @Before
   public void startBroker() throws Exception {
     broker = new BrokerService();
@@ -105,26 +107,34 @@ public class JmsIOTest {
     broker.stop();
   }
 
+  private void runPipelineExpectingJmsConnectException(String innerMessage) {
+    try {
+      pipeline.run();
+      fail();
+    } catch (Exception e) {
+      Throwable cause = e.getCause();
+      assertThat(cause, instanceOf(IOException.class));
+      assertThat(cause.getMessage(), equalTo("Error connecting to JMS"));
+      Throwable innerCause = cause.getCause();
+      assertThat(innerCause, instanceOf(JMSException.class));
+      assertThat(innerCause.getMessage(), containsString(innerMessage));
+    }
+  }
+
   @Test
   @Category(NeedsRunner.class)
   public void testAuthenticationRequired() {
-    expectedException.expect(Exception.class);
-    expectedException.expectMessage("User name [null] or password is 
invalid.");
-
     pipeline.apply(
         JmsIO.read()
             .withConnectionFactory(connectionFactory)
             .withQueue(QUEUE));
 
-    pipeline.run();
+    runPipelineExpectingJmsConnectException("User name [null] or password is 
invalid.");
   }
 
   @Test
   @Category(NeedsRunner.class)
   public void testAuthenticationWithBadPassword() {
-    expectedException.expect(Exception.class);
-    expectedException.expectMessage("User name [" + USERNAME + "] or password 
is invalid.");
-
     pipeline.apply(
         JmsIO.read()
             .withConnectionFactory(connectionFactory)
@@ -132,7 +142,8 @@ public class JmsIOTest {
             .withUsername(USERNAME)
             .withPassword("BAD"));
 
-    pipeline.run();
+    runPipelineExpectingJmsConnectException(
+        "User name [" + USERNAME + "] or password is invalid.");
   }
 
   @Test

Reply via email to