[ 
https://issues.apache.org/jira/browse/ARTEMIS-4296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17868531#comment-17868531
 ] 

Justin Bertram commented on ARTEMIS-4296:
-----------------------------------------

I took another look at this and modified the test to use an {{ExecutorService}} 
to execute the callback logic on independent threads, and everything worked 
fine. I pushed the test up to 200,000 iterations and there were no issues. This 
indicates to me that the problem was what I originally suspected: a threading 
issue on the Paho client.

The test suite has changed a bit since I posted the test in my previous 
comment. Here is the new test that I used:
{code:java}
@Test
public void testRequestResponseMessages() throws Exception {
   server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoDeleteAddresses(true).setAutoDeleteQueues(true));
   final int timeout = 120;
   final Integer REQUEST_NOT_SENT = 0;
   final Integer SENDING_REQUEST = 1;
   final Integer REQUEST_RECEIVED = 2;
   final Integer SENDING_RESPONSE = 3;
   final Integer RESPONSE_RECEIVED = 4;

   final AtomicLong failures = new AtomicLong(0);
   long start = System.currentTimeMillis();
   int messageCount = 200_000;
   Map<String, Integer> results = new ConcurrentHashMap<>();
   for (int i = 0; i < messageCount; i++) {
      results.put(RandomUtil.randomString(), REQUEST_NOT_SENT);
   }

   ExecutorService pool = Executors.newFixedThreadPool(100);

   // receive requests and send responses
   MqttClient responder = createPahoClient("responder");
   responder.setTimeToWait(2500);
   CountDownLatch responderLatch = new CountDownLatch(messageCount);
   responder.connect();
   responder.subscribe("requests", 2);
   responder.setCallback(new DefaultMqttCallback() {
      @Override
      public void messageArrived(String topic, MqttMessage message) {
         pool.execute(() -> {
            String i = new String(message.getPayload());
            results.put(i, REQUEST_RECEIVED);
            logger.info("Received request {} from topic '{}'", message, topic);
            responderLatch.countDown();
            String responseTopic = message.getProperties().getResponseTopic();
            logger.info("Sending response to topic '{}'...", responseTopic);
            try {
               results.put(i, SENDING_RESPONSE);
               responder.publish(responseTopic, message.getPayload(), 2, false);
               logger.info("Sent response to topic '{}'", responseTopic);
            } catch (Exception ex) {
               failures.incrementAndGet();
               logger.error("Failed to send response to topic '{}'", responseTopic, ex);
            }
         });
      }
   });

   // receive responses
   CountDownLatch requestorLatch = new CountDownLatch(messageCount);
   MqttClient requestor = createPahoClient("requestor");
   requestor.setTimeToWait(2500);
   requestor.connect();
   requestor.subscribe("responses/+", 2);
   requestor.setCallback(new DefaultMqttCallback() {
      @Override
      public void messageArrived(String topic, MqttMessage message) {
         pool.execute(() -> {
            results.put(new String(message.getPayload()), RESPONSE_RECEIVED);
            logger.info("Received response {} at topic '{}'", message, topic);
            requestorLatch.countDown();
         });
      }
   });

   // send requests
   for (String id : results.keySet()) {
      MqttProperties properties = new MqttProperties();
      properties.setResponseTopic("responses/" + id);
      logger.info("Sending request to respond to " + properties.getResponseTopic());
      results.put(id, SENDING_REQUEST);
      requestor.publish("requests", new MqttMessage(id.getBytes(StandardCharsets.UTF_8), 2, false, properties));
   }

   boolean responderResult = responderLatch.await(timeout, TimeUnit.SECONDS);
   boolean requestorResult = requestorLatch.await(timeout, TimeUnit.SECONDS);
   // check for publish failures only after the responder has had time to respond
   assertEquals(0, failures.get());

   logger.info("Sent " + messageCount + " messages in " + (System.currentTimeMillis() - start) + " milliseconds.");

   for (Map.Entry<String, Integer> entry : results.entrySet()) {
      logger.info(entry.getKey() + ": " + entry.getValue());
   }

   responder.disconnect();
   responder.close();

   requestor.disconnect();
   requestor.close();

   assertTrue(responderResult);
   assertTrue(requestorResult);

   pool.shutdownNow();
}{code}
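
The key change in the test above is that each callback only hands its work to a pool and returns. The following is a minimal, library-free sketch of that hand-off using plain {{java.util.concurrent}} types. The class and method names are hypothetical, and the single-thread executor is only a stand-in for whatever thread a client uses to invoke callbacks. It does not reproduce Paho's internals.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration; not part of the Artemis test suite or the Paho API.
final class CallbackOffloadSketch {

   /**
    * Delivers messageCount messages from one "delivery" thread and handles each
    * of them on a worker pool. Returns how many were handled within the timeout.
    */
   static int deliverAndHandle(int messageCount, int workers, long timeoutSeconds) {
      // Assumption: stands in for the single thread a client uses to invoke callbacks.
      ExecutorService delivery = Executors.newSingleThreadExecutor();
      // Independent threads that do the actual work.
      ExecutorService pool = Executors.newFixedThreadPool(workers);
      CountDownLatch done = new CountDownLatch(messageCount);
      AtomicInteger handled = new AtomicInteger();
      try {
         for (int i = 0; i < messageCount; i++) {
            final String payload = "request-" + i;
            // The "callback" body: submit the work and return immediately,
            // so the delivery thread is never blocked by message handling.
            delivery.execute(() -> pool.execute(() -> {
               // Simulated handling; the real test publishes a response here.
               if (!payload.isEmpty()) {
                  handled.incrementAndGet();
               }
               done.countDown();
            }));
         }
         try {
            done.await(timeoutSeconds, TimeUnit.SECONDS);
         } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
         }
         return handled.get();
      } finally {
         delivery.shutdownNow();
         pool.shutdownNow();
      }
   }

   public static void main(String[] args) {
      System.out.println("handled=" + deliverAndHandle(1_000, 8, 10));
   }
}
```

Shutting both executors down in a {{finally}} block means the threads are released even if the wait times out.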

> Lost MQTT messages
> ------------------
>
>                 Key: ARTEMIS-4296
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-4296
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>    Affects Versions: 2.28.0
>            Reporter: Daniel Martin
>            Priority: Major
>
> Please, see [https://github.com/apache/activemq-artemis/pull/4492]:
> {quote}In the following test, a broker client sends 1000 messages which are 
> expected to be received by a second client, which in turn replies to every 
> message with new ones (thus being labeled "server"). _It fails every time_ – 
> the broker just seems to get stuck and stop sending messages at a given 
> moment.
> In case it is useful, when lowering the number of messages sent to 100, you 
> should see that the test sometimes passes.
> I have been running the tests with:
> {code:java}
> $ cd tests/integration-tests
> $ mvn verify -DskipIntegrationTests=false -Dtest="MQTT5Test#testRequestResponseMessages*"{code}
> I created this test to reproduce a slightly more complicated case I am 
> dealing with at the moment where, in a production setting, messages are 
> getting lost left and right and I haven't got a clue why.
> {quote}



