Repository: beam
Updated Branches:
  refs/heads/master 9088a3e39 -> 07e8cd5fc


[BEAM-2684] Fix flaky AmqpIOTest by introducing ActiveMQ AMQP broker instead of peer-to-peer mode


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/02eb0913
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/02eb0913
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/02eb0913

Branch: refs/heads/master
Commit: 02eb09135ce1ae234052caf7ff2787256908f918
Parents: 9088a3e
Author: Alex Filatov <alex-fila...@users.noreply.github.com>
Authored: Thu Aug 10 23:02:37 2017 +0300
Committer: Jean-Baptiste Onofré <jbono...@apache.org>
Committed: Fri Aug 11 07:20:06 2017 +0200

----------------------------------------------------------------------
 sdks/java/io/amqp/pom.xml                       |  22 ++++
 .../org/apache/beam/sdk/io/amqp/AmqpIOTest.java | 112 ++++++++-----------
 2 files changed, 66 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/02eb0913/sdks/java/io/amqp/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/amqp/pom.xml b/sdks/java/io/amqp/pom.xml
index 4369bb8..c28436b 100644
--- a/sdks/java/io/amqp/pom.xml
+++ b/sdks/java/io/amqp/pom.xml
@@ -30,6 +30,10 @@
   <name>Apache Beam :: SDKs :: Java :: IO :: AMQP</name>
   <description>IO to read and write using AMQP 1.0 protocol (http://www.amqp.org).</description>
 
+  <properties>
+    <activemq.version>5.13.1</activemq.version>
+  </properties>
+
   <dependencies>
     <dependency>
       <groupId>org.apache.beam</groupId>
@@ -96,6 +100,24 @@
       <artifactId>beam-runners-direct-java</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-broker</artifactId>
+      <version>${activemq.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-amqp</artifactId>
+      <version>${activemq.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.activemq.tooling</groupId>
+      <artifactId>activemq-junit</artifactId>
+      <version>${activemq.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/02eb0913/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpIOTest.java b/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpIOTest.java
index c8fe4e8..947929f 100644
--- a/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpIOTest.java
+++ b/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpIOTest.java
@@ -20,11 +20,11 @@ package org.apache.beam.sdk.io.amqp;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-import java.net.ServerSocket;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.activemq.junit.EmbeddedActiveMQBroker;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Count;
@@ -33,7 +33,6 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.qpid.proton.amqp.messaging.AmqpValue;
 import org.apache.qpid.proton.message.Message;
 import org.apache.qpid.proton.messenger.Messenger;
-import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -49,95 +48,57 @@ public class AmqpIOTest {
 
   private static final Logger LOG = LoggerFactory.getLogger(AmqpIOTest.class);
 
-  private int port;
-
   @Rule public TestPipeline pipeline = TestPipeline.create();
 
-  @Before
-  public void findFreeNetworkPort() throws Exception {
-    LOG.info("Finding free network port");
-    ServerSocket socket = new ServerSocket(0);
-    port = socket.getLocalPort();
-    socket.close();
-  }
+  @Rule public EmbeddedAmqpBroker broker = new EmbeddedAmqpBroker();
 
   @Test
   public void testRead() throws Exception {
     PCollection<Message> output = pipeline.apply(AmqpIO.read()
         .withMaxNumRecords(100)
-        .withAddresses(Collections.singletonList("amqp://~localhost:" + port)));
+        .withAddresses(Collections.singletonList(broker.getQueueUri("testRead"))));
     PAssert.thatSingleton(output.apply(Count.<Message>globally())).isEqualTo(100L);
 
-    Thread sender = new Thread() {
-      public void run() {
-        try {
-          Thread.sleep(500);
-          Messenger sender = Messenger.Factory.create();
-          sender.start();
-          for (int i = 0; i < 100; i++) {
-            Message message = Message.Factory.create();
-            message.setAddress("amqp://localhost:" + port);
-            message.setBody(new AmqpValue("Test " + i));
-            sender.put(message);
-            sender.send();
-          }
-          sender.stop();
-        } catch (Exception e) {
-          LOG.error("Sender error", e);
-        }
-      }
-    };
-    try {
-      sender.start();
-      pipeline.run();
-    } finally {
-      sender.join();
+    Messenger sender = Messenger.Factory.create();
+    sender.start();
+    for (int i = 0; i < 100; i++) {
+      Message message = Message.Factory.create();
+      message.setAddress(broker.getQueueUri("testRead"));
+      message.setBody(new AmqpValue("Test " + i));
+      sender.put(message);
+      sender.send();
     }
+    sender.stop();
+
+    pipeline.run();
   }
 
   @Test
   public void testWrite() throws Exception {
-    final List<String> received = new ArrayList<>();
-    Thread receiver = new Thread() {
-      @Override
-      public void run() {
-        try {
-          Messenger messenger = Messenger.Factory.create();
-          messenger.start();
-          messenger.subscribe("amqp://~localhost:" + port);
-          while (received.size() < 100) {
-            messenger.recv();
-            while (messenger.incoming() > 0) {
-              Message message = messenger.get();
-              LOG.info("Received: " + message.getBody().toString());
-              received.add(message.getBody().toString());
-            }
-          }
-          messenger.stop();
-        } catch (Exception e) {
-          LOG.error("Receiver error", e);
-        }
-      }
-    };
-    LOG.info("Starting AMQP receiver");
-    receiver.start();
-
     List<Message> data = new ArrayList<>();
     for (int i = 0; i < 100; i++) {
       Message message = Message.Factory.create();
       message.setBody(new AmqpValue("Test " + i));
-      message.setAddress("amqp://localhost:" + port);
+      message.setAddress(broker.getQueueUri("testWrite"));
       message.setSubject("test");
       data.add(message);
     }
     pipeline.apply(Create.of(data).withCoder(AmqpMessageCoder.of())).apply(AmqpIO.write());
-    LOG.info("Starting pipeline");
-    try {
-      pipeline.run();
-    } finally {
-      LOG.info("Join receiver thread");
-      receiver.join();
+    pipeline.run().waitUntilFinish();
+
+    List<String> received = new ArrayList<>();
+    Messenger messenger = Messenger.Factory.create();
+    messenger.start();
+    messenger.subscribe(broker.getQueueUri("testWrite"));
+    while (received.size() < 100) {
+      messenger.recv();
+      while (messenger.incoming() > 0) {
+        Message message = messenger.get();
+        LOG.info("Received: " + message.getBody().toString());
+        received.add(message.getBody().toString());
+      }
     }
+    messenger.stop();
 
     assertEquals(100, received.size());
     for (int i = 0; i < 100; i++) {
@@ -145,4 +106,19 @@ public class AmqpIOTest {
     }
   }
 
+  private static class EmbeddedAmqpBroker extends EmbeddedActiveMQBroker {
+    @Override
+    protected void configure() {
+      try {
+        getBrokerService().addConnector("amqp://localhost:0");
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    public String getQueueUri(String queueName) {
+      return getBrokerService().getDefaultSocketURIString() + "/" + queueName;
+    }
+  }
+
 }

Reply via email to