Repository: beam Updated Branches: refs/heads/master 9088a3e39 -> 07e8cd5fc
[BEAM-2684] Fix flaky AmqpIOTest by introducing ActiveMQ AMQP broker instead of peer-to-peer mode Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/02eb0913 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/02eb0913 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/02eb0913 Branch: refs/heads/master Commit: 02eb09135ce1ae234052caf7ff2787256908f918 Parents: 9088a3e Author: Alex Filatov <alex-fila...@users.noreply.github.com> Authored: Thu Aug 10 23:02:37 2017 +0300 Committer: Jean-Baptiste Onofré <jbono...@apache.org> Committed: Fri Aug 11 07:20:06 2017 +0200 ---------------------------------------------------------------------- sdks/java/io/amqp/pom.xml | 22 ++++ .../org/apache/beam/sdk/io/amqp/AmqpIOTest.java | 112 ++++++++----------- 2 files changed, 66 insertions(+), 68 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/02eb0913/sdks/java/io/amqp/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/amqp/pom.xml b/sdks/java/io/amqp/pom.xml index 4369bb8..c28436b 100644 --- a/sdks/java/io/amqp/pom.xml +++ b/sdks/java/io/amqp/pom.xml @@ -30,6 +30,10 @@ <name>Apache Beam :: SDKs :: Java :: IO :: AMQP</name> <description>IO to read and write using AMQP 1.0 protocol (http://www.amqp.org).</description> + <properties> + <activemq.version>5.13.1</activemq.version> + </properties> + <dependencies> <dependency> <groupId>org.apache.beam</groupId> @@ -96,6 +100,24 @@ <artifactId>beam-runners-direct-java</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.activemq</groupId> + <artifactId>activemq-broker</artifactId> + <version>${activemq.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.activemq</groupId> + <artifactId>activemq-amqp</artifactId> + <version>${activemq.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.activemq.tooling</groupId> + <artifactId>activemq-junit</artifactId> + <version>${activemq.version}</version> + <scope>test</scope> + </dependency> </dependencies> </project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/beam/blob/02eb0913/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpIOTest.java b/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpIOTest.java index c8fe4e8..947929f 100644 --- a/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpIOTest.java +++ b/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpIOTest.java @@ -20,11 +20,11 @@ package org.apache.beam.sdk.io.amqp; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import java.net.ServerSocket; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import org.apache.activemq.junit.EmbeddedActiveMQBroker; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Count; @@ -33,7 +33,6 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.qpid.proton.amqp.messaging.AmqpValue; import org.apache.qpid.proton.message.Message; import org.apache.qpid.proton.messenger.Messenger; -import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; @@ -49,95 +48,57 @@ public class AmqpIOTest { private static final Logger LOG = LoggerFactory.getLogger(AmqpIOTest.class); - private int port; - @Rule public TestPipeline pipeline = TestPipeline.create(); - @Before - public void findFreeNetworkPort() throws Exception { - LOG.info("Finding free network port"); - ServerSocket socket = new ServerSocket(0); - port = socket.getLocalPort(); - socket.close(); - } + @Rule public EmbeddedAmqpBroker broker = new EmbeddedAmqpBroker(); @Test public void testRead() throws Exception { PCollection<Message> output = pipeline.apply(AmqpIO.read() .withMaxNumRecords(100) - .withAddresses(Collections.singletonList("amqp://~localhost:" + port))); + .withAddresses(Collections.singletonList(broker.getQueueUri("testRead")))); PAssert.thatSingleton(output.apply(Count.<Message>globally())).isEqualTo(100L); - Thread sender = new Thread() { - public void run() { - try { - Thread.sleep(500); - Messenger sender = Messenger.Factory.create(); - sender.start(); - for (int i = 0; i < 100; i++) { - Message message = Message.Factory.create(); - message.setAddress("amqp://localhost:" + port); - message.setBody(new AmqpValue("Test " + i)); - sender.put(message); - sender.send(); - } - sender.stop(); - } catch (Exception e) { - LOG.error("Sender error", e); - } - } - }; - try { - sender.start(); - pipeline.run(); - } finally { - sender.join(); + Messenger sender = Messenger.Factory.create(); + sender.start(); + for (int i = 0; i < 100; i++) { + Message message = Message.Factory.create(); + message.setAddress(broker.getQueueUri("testRead")); + message.setBody(new AmqpValue("Test " + i)); + sender.put(message); + sender.send(); } + sender.stop(); + + pipeline.run(); } @Test public void testWrite() throws Exception { - final List<String> received = new ArrayList<>(); - Thread receiver = new Thread() { - @Override - public void run() { - try { - Messenger messenger = Messenger.Factory.create(); - messenger.start(); - messenger.subscribe("amqp://~localhost:" + port); - while (received.size() < 100) { - messenger.recv(); - while (messenger.incoming() > 0) { - Message message = messenger.get(); - LOG.info("Received: " + message.getBody().toString()); - received.add(message.getBody().toString()); - } - } - messenger.stop(); - } catch (Exception e) { - LOG.error("Receiver error", e); - } - } - }; - LOG.info("Starting AMQP receiver"); - receiver.start(); - List<Message> data = new ArrayList<>(); for (int i = 0; i < 100; i++) { Message message = Message.Factory.create(); message.setBody(new AmqpValue("Test " + i)); - message.setAddress("amqp://localhost:" + port); + message.setAddress(broker.getQueueUri("testWrite")); message.setSubject("test"); data.add(message); } pipeline.apply(Create.of(data).withCoder(AmqpMessageCoder.of())).apply(AmqpIO.write()); - LOG.info("Starting pipeline"); - try { - pipeline.run(); - } finally { - LOG.info("Join receiver thread"); - receiver.join(); + pipeline.run().waitUntilFinish(); + + List<String> received = new ArrayList<>(); + Messenger messenger = Messenger.Factory.create(); + messenger.start(); + messenger.subscribe(broker.getQueueUri("testWrite")); + while (received.size() < 100) { + messenger.recv(); + while (messenger.incoming() > 0) { + Message message = messenger.get(); + LOG.info("Received: " + message.getBody().toString()); + received.add(message.getBody().toString()); + } } + messenger.stop(); assertEquals(100, received.size()); for (int i = 0; i < 100; i++) { @@ -145,4 +106,19 @@ public class AmqpIOTest { } } + private static class EmbeddedAmqpBroker extends EmbeddedActiveMQBroker { + @Override + protected void configure() { + try { + getBrokerService().addConnector("amqp://localhost:0"); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public String getQueueUri(String queueName) { + return getBrokerService().getDefaultSocketURIString() + "/" + queueName; + } + } + }