This is an automated email from the ASF dual-hosted git repository.
gtully pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 90535a2 ARTEMIS-3652 - tighten up test assertions and cleanup, track
errors via connection listener and consider missing send reply
90535a2 is described below
commit 90535a24012ed89d7d6d3c45192e4d9fc8416d89
Author: gtully <[email protected]>
AuthorDate: Tue Jan 25 14:33:22 2022 +0000
ARTEMIS-3652 - tighten up test assertions and cleanup, track errors via
connection listener and consider missing send reply
---
.../integration/balancing/ElasticQueueTest.java | 100 +++++++++++++++------
1 file changed, 75 insertions(+), 25 deletions(-)
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/ElasticQueueTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/ElasticQueueTest.java
index e70c4e8..a71a26a 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/ElasticQueueTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/ElasticQueueTest.java
@@ -34,6 +34,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
@@ -73,6 +74,7 @@ public class ElasticQueueTest extends ActiveMQTestBase {
static final SimpleString qNameSimple = SimpleString.toSimpleString(qName);
final int base_port = 61616;
+ final Stack<Worker> workers = new Stack<>();
final Stack<EmbeddedActiveMQ> nodes = new Stack<>();
private final String balancerConfigName = "role_name_sharder";
@@ -86,6 +88,10 @@ public class ElasticQueueTest extends ActiveMQTestBase {
}
}
nodes.clear();
+ for (Worker worker : workers) {
+ worker.done.set(true);
+ }
+ workers.clear();
executorService.shutdownNow();
}
@@ -98,8 +104,8 @@ public class ElasticQueueTest extends ActiveMQTestBase {
}
builder.append("amqp://localhost:").append(port_start++);
}
- // fast reconnect, randomize to get to all brokers and timeout sends that block on no credit
- builder.append(")?failover.randomize=true&failover.maxReconnectAttempts=1&jms.sendTimeout=" + 1000);
+ // fast reconnect, randomize to get to all brokers and timeout sends that block on no credit once connected
+ builder.append(")?failover.randomize=true&failover.maxReconnectAttempts=0&jms.sendTimeout=" + 1000);
return builder.toString();
}
@@ -107,9 +113,11 @@ public class ElasticQueueTest extends ActiveMQTestBase {
static class ConnectionListener implements JmsConnectionListener {
AtomicInteger connectionCount;
+ final AtomicReference<JMSException> failureReason;
- ConnectionListener(AtomicInteger connectionCount) {
+ ConnectionListener(AtomicInteger connectionCount, AtomicReference<JMSException> failureReason) {
this.connectionCount = connectionCount;
+ this.failureReason = failureReason;
}
@Override
@@ -118,6 +126,11 @@ public class ElasticQueueTest extends ActiveMQTestBase {
@Override
public void onConnectionFailure(Throwable throwable) {
+ if (failureReason != null) {
+ JMSException wrapper = new JMSException("ConnectionFailureViaListener");
+ wrapper.setLinkedException(new RuntimeException(throwable));
+ failureReason.set(wrapper);
+ }
}
@Override
@@ -146,12 +159,18 @@ public class ElasticQueueTest extends ActiveMQTestBase {
}
}
+ abstract class Worker implements Runnable {
+ final AtomicBoolean done = new AtomicBoolean();
+ Worker() {
+ workers.push(this);
+ }
+ }
+
// slow consumer
- class EQConsumer implements Runnable {
+ class EQConsumer extends Worker {
final AtomicInteger consumedCount = new AtomicInteger();
final AtomicInteger connectionCount = new AtomicInteger();
- final AtomicBoolean done = new AtomicBoolean();
final AtomicInteger delayMillis;
private final String url;
long lastConsumed = 0;
@@ -171,18 +190,18 @@ public class ElasticQueueTest extends ActiveMQTestBase {
try {
while (!done.get()) {
JmsConnectionFactory factory = new JmsConnectionFactory("CONSUMER", "PASSWORD", url);
-
try (JmsConnection connection = (JmsConnection) factory.createConnection()) {
// track disconnects via faiover listener
connectionCount.incrementAndGet();
- connection.addConnectionListener(new ConnectionListener(connectionCount));
+ final AtomicReference<JMSException> fatalError = new AtomicReference<>();
+ connection.addConnectionListener(new ConnectionListener(connectionCount, fatalError));
connection.start();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageConsumer messageConsumer = session.createConsumer(session.createQueue(qName));
- while (!done.get()) {
+ while (!done.get() && fatalError.get() == null) {
Message receivedMessage = messageConsumer.receiveNoWait();
if (receivedMessage != null) {
consumedCount.incrementAndGet();
@@ -205,11 +224,10 @@ public class ElasticQueueTest extends ActiveMQTestBase {
}
// regular producer
- static class EQProducer implements Runnable {
+ class EQProducer extends Worker {
final AtomicInteger producedCount = new AtomicInteger();
final AtomicInteger connectionCount = new AtomicInteger();
- final AtomicBoolean done = new AtomicBoolean();
private final String url;
EQProducer(String url) {
@@ -227,7 +245,8 @@ public class ElasticQueueTest extends ActiveMQTestBase {
// track disconnects via faiover listener
connectionCount.incrementAndGet();
- connection.addConnectionListener(new ConnectionListener(connectionCount));
+ final AtomicReference<JMSException> fatalError = new AtomicReference<>();
+ connection.addConnectionListener(new ConnectionListener(connectionCount, fatalError));
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -236,7 +255,7 @@ public class ElasticQueueTest extends ActiveMQTestBase {
BytesMessage message = session.createBytesMessage();
message.writeBytes(new byte[1024]);
- while (!done.get()) {
+ while (!done.get() && fatalError.get() == null) {
connectedToUri = connection.getConnectedURI();
message.setLongProperty("PID", producedCount.get() + 1);
messageProducer.send(message);
@@ -254,17 +273,17 @@ public class ElasticQueueTest extends ActiveMQTestBase {
}
// combined producer/ async consumer
- static class EQProducerAsyncConsumer implements Runnable {
+ class EQProducerAsyncConsumer extends Worker {
final AtomicInteger producedCount = new AtomicInteger();
final AtomicInteger connectionCount = new AtomicInteger();
- final AtomicBoolean done = new AtomicBoolean();
final AtomicBoolean producerDone = new AtomicBoolean();
final AtomicInteger consumerSleepMillis = new AtomicInteger(1000);
private final String url;
final AtomicInteger consumedCount = new AtomicInteger();
private final String user;
private long lastConsumed;
+ private AtomicReference<JmsConnection> currentConnection = new AtomicReference<>();
EQProducerAsyncConsumer(String url, String user) {
this.url = url;
@@ -277,11 +296,13 @@ public class ElasticQueueTest extends ActiveMQTestBase {
while (!done.get()) {
JmsConnectionFactory factory = new JmsConnectionFactory(user, "PASSWORD", url);
+ final AtomicReference<JMSException> fatalError = new AtomicReference<>();
try (JmsConnection connection = (JmsConnection) factory.createConnection()) {
+ currentConnection.set(connection);
// track disconnects via faiover listener
connectionCount.incrementAndGet();
- connection.addConnectionListener(new ConnectionListener(connectionCount));
+ connection.addConnectionListener(new ConnectionListener(connectionCount, fatalError));
connection.start();
Session clientSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
@@ -295,7 +316,11 @@ public class ElasticQueueTest extends ActiveMQTestBase {
TimeUnit.MILLISECONDS.sleep(consumerSleepMillis.get());
}
message.acknowledge();
- } catch (JMSException | InterruptedException ignored) {
+ } catch (JMSException | InterruptedException treatAsFatal) {
+ JMSException errorWrapper = new JMSException("ERROR from onMessage");
+ errorWrapper.setLinkedException(treatAsFatal);
+ fatalError.set(errorWrapper);
+ System.out.println("OnMessage Got: " + treatAsFatal + ", lastConsumed:" + lastConsumed + ", connectionCount:" + connectionCount.get());
}
});
@@ -304,16 +329,20 @@ public class ElasticQueueTest extends ActiveMQTestBase {
BytesMessage message = session.createBytesMessage();
message.writeBytes(new byte[1024]);
while (!done.get()) {
+ if (fatalError.get() != null) {
+ throw fatalError.get();
+ }
if (!producerDone.get()) {
message.setLongProperty("PID", producedCount.get() + 1);
messageProducer.send(message);
producedCount.incrementAndGet();
} else {
// just hang about and let the consumer listener work
- TimeUnit.SECONDS.sleep(5);
+ TimeUnit.SECONDS.sleep(1);
}
}
} catch (JMSException | InterruptedException ignored) {
+ System.out.println("Exception: " + ignored.toString() + ", PC=" + producedCount.get());
}
}
}
@@ -325,6 +354,16 @@ public class ElasticQueueTest extends ActiveMQTestBase {
public long getLastConsumed() {
return lastConsumed;
}
+
+ @Override
+ public String toString() {
+ JmsConnection connectedTo = currentConnection.get();
+ if (connectedTo != null) {
+ return "EQProducerAsyncConsumer on:" + connectedTo.getConnectedURI();
+ } else {
+ return super.toString();
+ }
+ }
}
MBeanServer mBeanServer = MBeanServerFactory.createMBeanServer();
@@ -491,8 +530,7 @@ public class ElasticQueueTest extends ActiveMQTestBase {
}, 10000, 200));
assertTrue("Got all produced", Wait.waitFor(() -> {
- System.out.println("consumed pid: " + eqConsumer.getLastConsumed() + ", produced: " + eqProducer.getLastProduced());
- return (eqProducer.getLastProduced() == eqConsumer.getLastConsumed());
+ return pidInRange("head&tail", eqProducer.getLastProduced(), eqConsumer.getLastConsumed());
}, 4000, 100));
eqConsumer.done.set(true);
@@ -584,13 +622,12 @@ public class ElasticQueueTest extends ActiveMQTestBase {
// head should drain
assertTrue(Wait.waitFor(() -> {
int usage = addressControl1.getAddressLimitPercent();
- System.out.println("Node1 usage % " + usage);
+ System.out.println("Node1 usage % " + usage + ", eqProducerConsumer: " + eqProducerConsumer);
return usage == 0;
- },10000, 200));
+ },10000, 500));
assertTrue(Wait.waitFor(() -> {
- System.out.println("current head&tail lastProduced: " + eqProducerConsumer.getLastProduced() + ", consumed: " + eqProducerConsumer.getLastConsumed());
- return eqProducerConsumer.getLastProduced() == eqProducerConsumer.getLastConsumed();
+ return pidInRange("head&tail", eqProducerConsumer.getLastProduced(), eqProducerConsumer.getLastConsumed());
},5000, 100));
eqProducerConsumer.done.set(true);
@@ -689,12 +726,25 @@ public class ElasticQueueTest extends ActiveMQTestBase {
}, 20000, 200));
assertTrue(Wait.waitFor(() -> {
- System.out.println("current head&tail lastProduced: " + eqProducerConsumer.getLastProduced() + ", consumed: " + eqProducerConsumer.getLastConsumed());
- return eqProducerConsumer.getLastProduced() == eqProducerConsumer.getLastConsumed();
+ return pidInRange("head&tail", eqProducerConsumer.getLastProduced(), eqProducerConsumer.getLastConsumed());
}, 20000, 200));
eqProducerConsumer.done.set(true);
nodes.get(1).stop();
}
+
+ private boolean pidInRange(String s, long lastProduced, long lastConsumed) {
+ System.out.println(String.format("pidInRange - %s, lastProduced: %d, lastConsumed: %d", s, lastProduced, lastConsumed));
+ if (lastConsumed == lastProduced) {
+ return true;
+ }
+ // in case of a send timeout or just a missed reply to send (the send disposition is lost), produced pid does not get
+ // incremented to reflect the send completion on the broker. An off by one error is expected sometimes.
+ // the connection can be whacked by the broker in the case of a slow consumer
+ if (lastConsumed == lastProduced + 1) {
+ return true;
+ }
+ return false;
+ }
}