Repository: beam Updated Branches: refs/heads/master d6ac39a23 -> 13c06bf79
[BEAM-2153] Move connection management in JmsIO.write() to setup/teardown methods Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5228f7d1 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5228f7d1 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5228f7d1 Branch: refs/heads/master Commit: 5228f7d1f48bf39c0f97c28ab64064089e5f8df9 Parents: d6ac39a Author: Borisa Zivkovic <[email protected]> Authored: Fri May 12 14:03:02 2017 +0100 Committer: Jean-Baptiste Onofré <[email protected]> Committed: Fri May 12 16:28:30 2017 +0200 ---------------------------------------------------------------------- .../java/org/apache/beam/sdk/io/jms/JmsIO.java | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/5228f7d1/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java index 4493e56..b8355ad 100644 --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java @@ -673,8 +673,8 @@ public class JmsIO { this.spec = spec; } - @StartBundle - public void startBundle() throws Exception { + @Setup + public void setup() throws Exception { if (producer == null) { if (spec.getUsername() != null) { this.connection = @@ -699,17 +699,12 @@ public class JmsIO { @ProcessElement public void processElement(ProcessContext ctx) throws Exception { String value = ctx.element(); - try { - TextMessage message = session.createTextMessage(value); - producer.send(message); - } catch (Exception t) { - finishBundle(); - throw t; - } + TextMessage message = session.createTextMessage(value); + producer.send(message); } - @FinishBundle - public void finishBundle() throws Exception { + @Teardown + public void teardown() throws Exception { producer.close(); producer = null; session.close();
