Repository: beam Updated Branches: refs/heads/master 63dc08ee1 -> 8c1a577fe
[BEAM-1384] JmsIO: better errors during start, better testing For BEAM-1384, the test has been failing because the error may be surfaced in either start() or close(), depending on execution path. The underlying bug is that start was implemented so that close might fail (this.connection would be set, but this.connection.close would fail in a bad way). Rewrite start() to fix the invariant needed by close() and also to provide better error messages. Fix up the tests. Unfortunately, expectedException doesn't really support testing causes and nested causes, so the rewrite dropped its use. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8ef78d72 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8ef78d72 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8ef78d72 Branch: refs/heads/master Commit: 8ef78d72b903bded2c8db4125286e6f07e4a4a0b Parents: 63dc08e Author: Dan Halperin <[email protected]> Authored: Tue Feb 7 09:35:35 2017 -0800 Committer: Dan Halperin <[email protected]> Committed: Tue Feb 7 12:49:02 2017 -0800 ---------------------------------------------------------------------- .../java/org/apache/beam/sdk/io/jms/JmsIO.java | 24 +++++++++---- .../org/apache/beam/sdk/io/jms/JmsIOTest.java | 37 +++++++++++++------- 2 files changed, 42 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/8ef78d72/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java index 270fe31..a935b56 100644 --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java @@ -405,14 +405,26 @@ public class 
JmsIO { Read spec = source.spec; ConnectionFactory connectionFactory = spec.getConnectionFactory(); try { + Connection connection; if (spec.getUsername() != null) { - this.connection = + connection = connectionFactory.createConnection(spec.getUsername(), spec.getPassword()); } else { - this.connection = connectionFactory.createConnection(); + connection = connectionFactory.createConnection(); } - this.connection.start(); + connection.start(); + this.connection = connection; + } catch (Exception e) { + throw new IOException("Error connecting to JMS", e); + } + + try { this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } catch (Exception e) { + throw new IOException("Error creating JMS session", e); + } + + try { if (spec.getTopic() != null) { this.consumer = this.session.createConsumer(this.session.createTopic(spec.getTopic())); @@ -420,11 +432,11 @@ public class JmsIO { this.consumer = this.session.createConsumer(this.session.createQueue(spec.getQueue())); } - - return advance(); } catch (Exception e) { - throw new IOException(e); + throw new IOException("Error creating JMS consumer", e); } + + return advance(); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/8ef78d72/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java index a06bba3..f07247d 100644 --- a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java +++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java @@ -17,14 +17,20 @@ */ package org.apache.beam.sdk.io.jms; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; import static org.junit.Assert.assertEquals; import static 
org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; +import java.io.IOException; import java.util.ArrayList; import java.util.List; - import javax.jms.Connection; import javax.jms.ConnectionFactory; +import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; @@ -49,7 +55,6 @@ import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -72,9 +77,6 @@ public class JmsIOTest { @Rule public final transient TestPipeline pipeline = TestPipeline.create(); - @Rule - public ExpectedException expectedException = ExpectedException.none(); - @Before public void startBroker() throws Exception { broker = new BrokerService(); @@ -105,26 +107,34 @@ public class JmsIOTest { broker.stop(); } + private void runPipelineExpectingJmsConnectException(String innerMessage) { + try { + pipeline.run(); + fail(); + } catch (Exception e) { + Throwable cause = e.getCause(); + assertThat(cause, instanceOf(IOException.class)); + assertThat(cause.getMessage(), equalTo("Error connecting to JMS")); + Throwable innerCause = cause.getCause(); + assertThat(innerCause, instanceOf(JMSException.class)); + assertThat(innerCause.getMessage(), containsString(innerMessage)); + } + } + @Test @Category(NeedsRunner.class) public void testAuthenticationRequired() { - expectedException.expect(Exception.class); - expectedException.expectMessage("User name [null] or password is invalid."); - pipeline.apply( JmsIO.read() .withConnectionFactory(connectionFactory) .withQueue(QUEUE)); - pipeline.run(); + runPipelineExpectingJmsConnectException("User name [null] or password is invalid."); } @Test @Category(NeedsRunner.class) public void testAuthenticationWithBadPassword() { - expectedException.expect(Exception.class); - 
expectedException.expectMessage("User name [" + USERNAME + "] or password is invalid."); - pipeline.apply( JmsIO.read() .withConnectionFactory(connectionFactory) @@ -132,7 +142,8 @@ public class JmsIOTest { .withUsername(USERNAME) .withPassword("BAD")); - pipeline.run(); + runPipelineExpectingJmsConnectException( + "User name [" + USERNAME + "] or password is invalid."); } @Test
