This is an automated email from the ASF dual-hosted git repository.

gtully pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 90535a2  ARTEMIS-3652 - tighten up test assertions and cleanup, track 
errors via connection listener and consider missing send reply
90535a2 is described below

commit 90535a24012ed89d7d6d3c45192e4d9fc8416d89
Author: gtully <[email protected]>
AuthorDate: Tue Jan 25 14:33:22 2022 +0000

    ARTEMIS-3652 - tighten up test assertions and cleanup, track errors via 
connection listener and consider missing send reply
---
 .../integration/balancing/ElasticQueueTest.java    | 100 +++++++++++++++------
 1 file changed, 75 insertions(+), 25 deletions(-)

diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/ElasticQueueTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/ElasticQueueTest.java
index e70c4e8..a71a26a 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/ElasticQueueTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/ElasticQueueTest.java
@@ -34,6 +34,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.api.core.QueueConfiguration;
@@ -73,6 +74,7 @@ public class ElasticQueueTest extends ActiveMQTestBase {
    static final SimpleString qNameSimple = SimpleString.toSimpleString(qName);
 
    final int base_port = 61616;
+   final Stack<Worker> workers = new Stack<>();
    final Stack<EmbeddedActiveMQ> nodes = new Stack<>();
    private final String balancerConfigName = "role_name_sharder";
 
@@ -86,6 +88,10 @@ public class ElasticQueueTest extends ActiveMQTestBase {
          }
       }
       nodes.clear();
+      for (Worker worker : workers) {
+         worker.done.set(true);
+      }
+      workers.clear();
       executorService.shutdownNow();
    }
 
@@ -98,8 +104,8 @@ public class ElasticQueueTest extends ActiveMQTestBase {
          }
          builder.append("amqp://localhost:").append(port_start++);
       }
-      // fast reconnect, randomize to get to all brokers and timeout sends 
that block on no credit
-      
builder.append(")?failover.randomize=true&failover.maxReconnectAttempts=1&jms.sendTimeout="
 + 1000);
+      // fast reconnect, randomize to get to all brokers and timeout sends 
that block on no credit once connected
+      
builder.append(")?failover.randomize=true&failover.maxReconnectAttempts=0&jms.sendTimeout="
 + 1000);
       return builder.toString();
    }
 
@@ -107,9 +113,11 @@ public class ElasticQueueTest extends ActiveMQTestBase {
    static class ConnectionListener implements JmsConnectionListener {
 
       AtomicInteger connectionCount;
+      final AtomicReference<JMSException> failureReason;
 
-      ConnectionListener(AtomicInteger connectionCount) {
+      ConnectionListener(AtomicInteger connectionCount, 
AtomicReference<JMSException> failureReason) {
          this.connectionCount = connectionCount;
+         this.failureReason = failureReason;
       }
 
       @Override
@@ -118,6 +126,11 @@ public class ElasticQueueTest extends ActiveMQTestBase {
 
       @Override
       public void onConnectionFailure(Throwable throwable) {
+         if (failureReason != null) {
+            JMSException wrapper = new 
JMSException("ConnectionFailureViaListener");
+            wrapper.setLinkedException(new RuntimeException(throwable));
+            failureReason.set(wrapper);
+         }
       }
 
       @Override
@@ -146,12 +159,18 @@ public class ElasticQueueTest extends ActiveMQTestBase {
       }
    }
 
+   abstract class Worker implements Runnable {
+      final AtomicBoolean done = new AtomicBoolean();
+      Worker() {
+         workers.push(this);
+      }
+   }
+
    // slow consumer
-   class EQConsumer implements Runnable {
+   class EQConsumer extends Worker {
 
       final AtomicInteger consumedCount = new AtomicInteger();
       final AtomicInteger connectionCount = new AtomicInteger();
-      final AtomicBoolean done = new AtomicBoolean();
       final AtomicInteger delayMillis;
       private final String url;
       long lastConsumed = 0;
@@ -171,18 +190,18 @@ public class ElasticQueueTest extends ActiveMQTestBase {
          try {
             while (!done.get()) {
                JmsConnectionFactory factory = new 
JmsConnectionFactory("CONSUMER", "PASSWORD", url);
-
                try (JmsConnection connection = (JmsConnection) 
factory.createConnection()) {
 
                   // track disconnects via faiover listener
                   connectionCount.incrementAndGet();
-                  connection.addConnectionListener(new 
ConnectionListener(connectionCount));
+                  final AtomicReference<JMSException> fatalError = new 
AtomicReference<>();
+                  connection.addConnectionListener(new 
ConnectionListener(connectionCount, fatalError));
                   connection.start();
 
                   Session session = connection.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
                   MessageConsumer messageConsumer = 
session.createConsumer(session.createQueue(qName));
 
-                  while (!done.get()) {
+                  while (!done.get() && fatalError.get() == null) {
                      Message receivedMessage = messageConsumer.receiveNoWait();
                      if (receivedMessage != null) {
                         consumedCount.incrementAndGet();
@@ -205,11 +224,10 @@ public class ElasticQueueTest extends ActiveMQTestBase {
    }
 
    // regular producer
-   static class EQProducer implements Runnable {
+   class EQProducer extends Worker {
 
       final AtomicInteger producedCount = new AtomicInteger();
       final AtomicInteger connectionCount = new AtomicInteger();
-      final AtomicBoolean done = new AtomicBoolean();
       private final String url;
 
       EQProducer(String url) {
@@ -227,7 +245,8 @@ public class ElasticQueueTest extends ActiveMQTestBase {
 
                // track disconnects via faiover listener
                connectionCount.incrementAndGet();
-               connection.addConnectionListener(new 
ConnectionListener(connectionCount));
+               final AtomicReference<JMSException> fatalError = new 
AtomicReference<>();
+               connection.addConnectionListener(new 
ConnectionListener(connectionCount, fatalError));
                connection.start();
 
                Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
@@ -236,7 +255,7 @@ public class ElasticQueueTest extends ActiveMQTestBase {
 
                BytesMessage message = session.createBytesMessage();
                message.writeBytes(new byte[1024]);
-               while (!done.get()) {
+               while (!done.get() && fatalError.get() == null) {
                   connectedToUri = connection.getConnectedURI();
                   message.setLongProperty("PID", producedCount.get() + 1);
                   messageProducer.send(message);
@@ -254,17 +273,17 @@ public class ElasticQueueTest extends ActiveMQTestBase {
    }
 
    // combined producer/ async consumer
-   static class EQProducerAsyncConsumer implements Runnable {
+   class EQProducerAsyncConsumer extends Worker {
 
       final AtomicInteger producedCount = new AtomicInteger();
       final AtomicInteger connectionCount = new AtomicInteger();
-      final AtomicBoolean done = new AtomicBoolean();
       final AtomicBoolean producerDone = new AtomicBoolean();
       final AtomicInteger consumerSleepMillis = new AtomicInteger(1000);
       private final String url;
       final AtomicInteger consumedCount = new AtomicInteger();
       private final String user;
       private long lastConsumed;
+      private AtomicReference<JmsConnection> currentConnection = new 
AtomicReference<>();
 
       EQProducerAsyncConsumer(String url, String user) {
          this.url = url;
@@ -277,11 +296,13 @@ public class ElasticQueueTest extends ActiveMQTestBase {
          while (!done.get()) {
             JmsConnectionFactory factory = new JmsConnectionFactory(user, 
"PASSWORD", url);
 
+            final AtomicReference<JMSException> fatalError = new 
AtomicReference<>();
             try (JmsConnection connection = (JmsConnection) 
factory.createConnection()) {
 
+               currentConnection.set(connection);
                // track disconnects via faiover listener
                connectionCount.incrementAndGet();
-               connection.addConnectionListener(new 
ConnectionListener(connectionCount));
+               connection.addConnectionListener(new 
ConnectionListener(connectionCount, fatalError));
                connection.start();
 
                Session clientSession = connection.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
@@ -295,7 +316,11 @@ public class ElasticQueueTest extends ActiveMQTestBase {
                         TimeUnit.MILLISECONDS.sleep(consumerSleepMillis.get());
                      }
                      message.acknowledge();
-                  } catch (JMSException | InterruptedException ignored) {
+                  } catch (JMSException | InterruptedException treatAsFatal) {
+                     JMSException errorWrapper = new JMSException("ERROR from 
onMessage");
+                     errorWrapper.setLinkedException(treatAsFatal);
+                     fatalError.set(errorWrapper);
+                     System.out.println("OnMessage Got: " + treatAsFatal + ", 
lastConsumed:" + lastConsumed + ", connectionCount:" + connectionCount.get());
                   }
                });
 
@@ -304,16 +329,20 @@ public class ElasticQueueTest extends ActiveMQTestBase {
                BytesMessage message = session.createBytesMessage();
                message.writeBytes(new byte[1024]);
                while (!done.get()) {
+                  if (fatalError.get() != null) {
+                     throw fatalError.get();
+                  }
                   if (!producerDone.get()) {
                      message.setLongProperty("PID", producedCount.get() + 1);
                      messageProducer.send(message);
                      producedCount.incrementAndGet();
                   } else {
                      // just hang about and let the consumer listener work
-                     TimeUnit.SECONDS.sleep(5);
+                     TimeUnit.SECONDS.sleep(1);
                   }
                }
             } catch (JMSException | InterruptedException ignored) {
+               System.out.println("Exception: "  + ignored.toString() + ", 
PC=" +   producedCount.get());
             }
          }
       }
@@ -325,6 +354,16 @@ public class ElasticQueueTest extends ActiveMQTestBase {
       public long getLastConsumed() {
          return lastConsumed;
       }
+
+      @Override
+      public String toString() {
+         JmsConnection connectedTo = currentConnection.get();
+         if (connectedTo != null) {
+            return "EQProducerAsyncConsumer on:" + 
connectedTo.getConnectedURI();
+         } else {
+            return super.toString();
+         }
+      }
    }
 
    MBeanServer mBeanServer = MBeanServerFactory.createMBeanServer();
@@ -491,8 +530,7 @@ public class ElasticQueueTest extends ActiveMQTestBase {
       }, 10000, 200));
 
       assertTrue("Got all produced", Wait.waitFor(() -> {
-         System.out.println("consumed pid: " + eqConsumer.getLastConsumed() + 
", produced: " + eqProducer.getLastProduced());
-         return (eqProducer.getLastProduced() == eqConsumer.getLastConsumed());
+         return pidInRange("head&tail", eqProducer.getLastProduced(), 
eqConsumer.getLastConsumed());
       }, 4000, 100));
 
       eqConsumer.done.set(true);
@@ -584,13 +622,12 @@ public class ElasticQueueTest extends ActiveMQTestBase {
       // head should drain
       assertTrue(Wait.waitFor(() -> {
          int usage = addressControl1.getAddressLimitPercent();
-         System.out.println("Node1 usage % " + usage);
+         System.out.println("Node1 usage % " + usage + ", eqProducerConsumer: 
" + eqProducerConsumer);
          return usage == 0;
-      },10000, 200));
+      },10000, 500));
 
       assertTrue(Wait.waitFor(() -> {
-         System.out.println("current head&tail lastProduced: " + 
eqProducerConsumer.getLastProduced() + ", consumed: " + 
eqProducerConsumer.getLastConsumed());
-         return eqProducerConsumer.getLastProduced() == 
eqProducerConsumer.getLastConsumed();
+         return pidInRange("head&tail", eqProducerConsumer.getLastProduced(), 
eqProducerConsumer.getLastConsumed());
       },5000, 100));
 
       eqProducerConsumer.done.set(true);
@@ -689,12 +726,25 @@ public class ElasticQueueTest extends ActiveMQTestBase {
       }, 20000, 200));
 
       assertTrue(Wait.waitFor(() -> {
-         System.out.println("current head&tail lastProduced: " + 
eqProducerConsumer.getLastProduced() + ", consumed: " + 
eqProducerConsumer.getLastConsumed());
-         return eqProducerConsumer.getLastProduced() == 
eqProducerConsumer.getLastConsumed();
+         return pidInRange("head&tail", eqProducerConsumer.getLastProduced(), 
eqProducerConsumer.getLastConsumed());
       }, 20000, 200));
 
       eqProducerConsumer.done.set(true);
 
       nodes.get(1).stop();
    }
+
+   private boolean pidInRange(String s, long lastProduced, long lastConsumed) {
+      System.out.println(String.format("pidInRange - %s, lastProduced: %d, 
lastConsumed: %d", s, lastProduced, lastConsumed));
+      if (lastConsumed == lastProduced) {
+         return true;
+      }
+      // in case of a send timeout or just a missed reply to send (the send 
disposition is lost), produced pid does not get
+      // incremented to reflect the send completion on the broker. An off by 
one error is expected sometimes.
+      // the connection can be whacked by the broker in the case of a slow 
consumer
+      if (lastConsumed == lastProduced + 1) {
+         return true;
+      }
+      return false;
+   }
 }

Reply via email to