lukecwik commented on a change in pull request #12184:
URL: https://github.com/apache/beam/pull/12184#discussion_r451123739



##########
File path: 
sdks/java/io/rabbitmq/src/test/java/org/apache/beam/sdk/io/rabbitmq/RabbitMqIOTest.java
##########
@@ -196,59 +200,74 @@ private void doExchangeTest(ExchangeTestPlan testPlan, 
boolean simulateIncompati
         exchangeType = "fanout";
       }
     }
+    final String finalExchangeType = exchangeType;
+    final CountDownLatch waitForExchangeToBeDeclared = new CountDownLatch(1);
+    final BlockingQueue<byte[]> recordsToPublish = new LinkedBlockingQueue<>();
+    
recordsToPublish.addAll(RabbitMqTestUtils.generateRecords(testPlan.getNumRecordsToPublish()));
+    Thread publisher =
+        new Thread(
+            () -> {
+              Connection connection = null;
+              Channel channel = null;
+              try {
+                ConnectionFactory connectionFactory = new ConnectionFactory();
+                connectionFactory.setAutomaticRecoveryEnabled(false);
+                connectionFactory.setUri(uri);
+                connection = connectionFactory.newConnection();
+                channel = connection.createChannel();
+                channel.exchangeDeclare(exchange, finalExchangeType);
+                // We are relying on the pipeline to declare the queue and 
messages that are
+                // published without a queue being declared are "unroutable". 
Since there is a race
+                // between when the pipeline declares and when we can start 
publishing, we add a
+                // handler to republish messages that are returned to us.
+                channel.addReturnListener(
+                    (replyCode, replyText, exchange1, routingKey, properties, 
body) -> {
+                      try {
+                        recordsToPublish.put(body);
+                      } catch (Exception e) {
+                        throw new RuntimeException(e);
+                      }
+                    });
+                waitForExchangeToBeDeclared.countDown();
+                while (true) {
+                  byte[] record = recordsToPublish.take();
+                  if (record == terminalRecord) {
+                    return;
+                  }
+                  channel.basicPublish(
+                      exchange,
+                      testPlan.publishRoutingKeyGen().get(),
+                      true, // ensure that messages are returned to sender
+                      testPlan.getPublishProperties(),
+                      record);
+                }
 
-    ConnectionFactory connectionFactory = new ConnectionFactory();
-    connectionFactory.setAutomaticRecoveryEnabled(false);
-    connectionFactory.setUri(uri);
-    Connection connection = null;
-    Channel channel = null;
-
-    try {
-      connection = connectionFactory.newConnection();
-      channel = connection.createChannel();
-      channel.exchangeDeclare(exchange, exchangeType);
-      final Channel finalChannel = channel;
-      Thread publisher =
-          new Thread(
-              () -> {
-                try {
-                  Thread.sleep(5000);
-                } catch (Exception e) {
-                  LOG.error(e.getMessage(), e);
+              } catch (Exception e) {
+                throw new RuntimeException(e);
+              } finally {
+                if (channel != null) {
+                  // channel may have already been closed automatically due to 
protocol failure
+                  try {
+                    channel.close();
+                  } catch (Exception e) {
+                    /* ignored */
+                  }
                 }
-                for (int i = 0; i < testPlan.getNumRecordsToPublish(); i++) {
+                if (connection != null) {
+                  // connection may have already been closed automatically due 
to protocol failure
                   try {
-                    finalChannel.basicPublish(
-                        exchange,
-                        testPlan.publishRoutingKeyGen().get(),
-                        testPlan.getPublishProperties(),
-                        RabbitMqTestUtils.generateRecord(i));
+                    connection.close();
                   } catch (Exception e) {
-                    LOG.error(e.getMessage(), e);
+                    /* ignored */
                   }
                 }
-              });
-      publisher.start();
-      p.run();
-      publisher.join();
-    } finally {
-      if (channel != null) {
-        // channel may have already been closed automatically due to protocol 
failure
-        try {
-          channel.close();
-        } catch (Exception e) {
-          /* ignored */
-        }
-      }
-      if (connection != null) {
-        // connection may have already been closed automatically due to 
protocol failure
-        try {
-          connection.close();
-        } catch (Exception e) {
-          /* ignored */
-        }
-      }
-    }
+              }
+            });
+    publisher.start();
+    waitForExchangeToBeDeclared.countDown();

Review comment:
       Your right.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to