[ 
https://issues.apache.org/jira/browse/BEAM-10392?focusedWorklogId=455364&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-455364
 ]

ASF GitHub Bot logged work on BEAM-10392:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 07/Jul/20 11:19
            Start Date: 07/Jul/20 11:19
    Worklog Time Spent: 10m 
      Work Description: mxm commented on a change in pull request #12184:
URL: https://github.com/apache/beam/pull/12184#discussion_r450789800



##########
File path: 
sdks/java/io/rabbitmq/src/test/java/org/apache/beam/sdk/io/rabbitmq/RabbitMqIOTest.java
##########
@@ -196,59 +200,74 @@ private void doExchangeTest(ExchangeTestPlan testPlan, 
boolean simulateIncompati
         exchangeType = "fanout";
       }
     }
+    final String finalExchangeType = exchangeType;
+    final CountDownLatch waitForExchangeToBeDeclared = new CountDownLatch(1);
+    final BlockingQueue<byte[]> recordsToPublish = new LinkedBlockingQueue<>();
+    recordsToPublish.addAll(RabbitMqTestUtils.generateRecords(testPlan.getNumRecordsToPublish()));
+    Thread publisher =
+        new Thread(
+            () -> {
+              Connection connection = null;
+              Channel channel = null;
+              try {
+                ConnectionFactory connectionFactory = new ConnectionFactory();
+                connectionFactory.setAutomaticRecoveryEnabled(false);
+                connectionFactory.setUri(uri);
+                connection = connectionFactory.newConnection();
+                channel = connection.createChannel();
+                channel.exchangeDeclare(exchange, finalExchangeType);
+                // We are relying on the pipeline to declare the queue and messages that are
+                // published without a queue being declared are "unroutable". Since there is a race
+                // between when the pipeline declares and when we can start publishing, we add a
+                // handler to republish messages that are returned to us.
+                channel.addReturnListener(
+                    (replyCode, replyText, exchange1, routingKey, properties, body) -> {
+                      try {
+                        recordsToPublish.put(body);
+                      } catch (Exception e) {
+                        throw new RuntimeException(e);
+                      }
+                    });
+                waitForExchangeToBeDeclared.countDown();
+                while (true) {
+                  byte[] record = recordsToPublish.take();
+                  if (record == terminalRecord) {
+                    return;
+                  }
+                  channel.basicPublish(
+                      exchange,
+                      testPlan.publishRoutingKeyGen().get(),
+                      true, // ensure that messages are returned to sender
+                      testPlan.getPublishProperties(),
+                      record);
+                }
 
-    ConnectionFactory connectionFactory = new ConnectionFactory();
-    connectionFactory.setAutomaticRecoveryEnabled(false);
-    connectionFactory.setUri(uri);
-    Connection connection = null;
-    Channel channel = null;
-
-    try {
-      connection = connectionFactory.newConnection();
-      channel = connection.createChannel();
-      channel.exchangeDeclare(exchange, exchangeType);
-      final Channel finalChannel = channel;
-      Thread publisher =
-          new Thread(
-              () -> {
-                try {
-                  Thread.sleep(5000);
-                } catch (Exception e) {
-                  LOG.error(e.getMessage(), e);
+              } catch (Exception e) {
+                throw new RuntimeException(e);
+              } finally {
+                if (channel != null) {
+                  // channel may have already been closed automatically due to 
protocol failure
+                  try {
+                    channel.close();
+                  } catch (Exception e) {
+                    /* ignored */
+                  }
                 }
-                for (int i = 0; i < testPlan.getNumRecordsToPublish(); i++) {
+                if (connection != null) {
+                  // connection may have already been closed automatically due 
to protocol failure
                   try {
-                    finalChannel.basicPublish(
-                        exchange,
-                        testPlan.publishRoutingKeyGen().get(),
-                        testPlan.getPublishProperties(),
-                        RabbitMqTestUtils.generateRecord(i));
+                    connection.close();
                   } catch (Exception e) {
-                    LOG.error(e.getMessage(), e);
+                    /* ignored */
                   }
                 }
-              });
-      publisher.start();
-      p.run();
-      publisher.join();
-    } finally {
-      if (channel != null) {
-        // channel may have already been closed automatically due to protocol 
failure
-        try {
-          channel.close();
-        } catch (Exception e) {
-          /* ignored */
-        }
-      }
-      if (connection != null) {
-        // connection may have already been closed automatically due to 
protocol failure
-        try {
-          connection.close();
-        } catch (Exception e) {
-          /* ignored */
-        }
-      }
-    }
+              }
+            });
+    publisher.start();
+    waitForExchangeToBeDeclared.countDown();

Review comment:
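       I think this has to be `await()` rather than `countDown()`: the publisher thread already counts the latch down once the exchange is declared, so the main thread needs to wait on it here instead of counting it down a second time: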
   
   ```suggestion
       waitForExchangeToBeDeclared.await();
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 455364)
    Time Spent: 2h 20m  (was: 2h 10m)

> :sdks:java:io:rabbitmq:test gets stuck regularly
> ------------------------------------------------
>
>                 Key: BEAM-10392
>                 URL: https://issues.apache.org/jira/browse/BEAM-10392
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-rabbitmq
>            Reporter: Luke Cwik
>            Assignee: Luke Cwik
>            Priority: P2
>          Time Spent: 2h 20m
>  Remaining Estimate: 0h
>
> See [https://ci-beam.apache.org/job/beam_PreCommit_Java_Commit/12017/] for an 
> example.
> Here is one with the stack trace 
> https://ci-beam.apache.org/job/beam_PreCommit_Java_Commit/12035/testReport/junit/org.apache.beam.sdk.io.rabbitmq/RabbitMqIOTest/classMethod/:
> {code:java}
> org.junit.runners.model.TestTimedOutException: test timed out after 5 minutes
>       at sun.misc.Unsafe.park(Native Method)
>       at 
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
>       at 
> java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
>       at 
> org.apache.beam.runners.direct.ExecutorServiceParallelExecutor$QueueMessageReceiver.tryNext(ExecutorServiceParallelExecutor.java:410)
>       at 
> org.apache.beam.runners.direct.ExecutorServiceParallelExecutor$QueueMessageReceiver.access$300(ExecutorServiceParallelExecutor.java:383)
>       at 
> org.apache.beam.runners.direct.ExecutorServiceParallelExecutor.waitUntilFinish(ExecutorServiceParallelExecutor.java:245)
>       at 
> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:343)
>       at 
> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:318)
>       at 
> org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:213)
>       at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
>       at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317)
>       at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:350)
>       at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:331)
>       at 
> org.apache.beam.sdk.io.rabbitmq.RabbitMqIOTest.doExchangeTest(RabbitMqIOTest.java:232)
>       at 
> org.apache.beam.sdk.io.rabbitmq.RabbitMqIOTest.doExchangeTest(RabbitMqIOTest.java:255)
>       at 
> org.apache.beam.sdk.io.rabbitmq.RabbitMqIOTest.testUseCorrelationIdFailsWhenIdsMissing(RabbitMqIOTest.java:362)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>       at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>       at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>       at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>       at 
> org.junit.internal.runners.statements.ExpectException.evaluate(ExpectException.java:19)
>       at 
> org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:319)
>       at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
>       at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>       at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365)
>       at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>       at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>       at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330)
>       at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78)
>       at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328)
>       at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65)
>       at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292)
>       at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>       at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>       at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:288)
>       at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:282)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to