gemmellr commented on code in PR #5338:
URL: https://github.com/apache/activemq-artemis/pull/5338#discussion_r1837938107
##########
tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/ClusteredLargeMessageInterruptTest.java:
##########
@@ -289,57 +268,51 @@ private void testInterrupt(String protocol, boolean tx,
boolean useKill) throws
ExecutorService executorService =
Executors.newFixedThreadPool(SENDING_THREADS + CONSUMING_THREADS);
runAfter(executorService::shutdownNow);
- CountDownLatch sendDone = startSendingThreads(executorService, protocol,
0, SENDING_THREADS, tx, queueName);
- CountDownLatch receiverDone = startConsumingThreads(executorService,
protocol, 0, CONSUMING_THREADS, tx, queueName);
-
- Thread.sleep(2000);
-
- killProcess(serverProcess, useKill);
- runningSend = false;
- runningConsumer = false;
- assertTrue(serverProcess.waitFor(10, TimeUnit.SECONDS));
- assertTrue(receiverDone.await(10, TimeUnit.SECONDS));
- assertTrue(sendDone.await(10, TimeUnit.SECONDS));
-
- logger.info("All receivers and senders are done!!!");
-
- serverProcess = startServer0();
-
- Thread.sleep(2000);
-
- sendDone = startSendingThreads(executorService, protocol, 1,
SENDING_THREADS, tx, queueName);
- receiverDone = startConsumingThreads(executorService, protocol, 1,
CONSUMING_THREADS, tx, queueName);
-
- killProcess(serverProcess2, useKill);
- assertTrue(serverProcess2.waitFor(10, TimeUnit.SECONDS));
- runningSend = false;
- runningConsumer = false;
- assertTrue(sendDone.await(1, TimeUnit.MINUTES));
- assertTrue(receiverDone.await(10, TimeUnit.SECONDS));
-
- serverProcess2 = startServer1();
-
- sendDone = startSendingThreads(executorService, protocol, 1,
SENDING_THREADS, tx, queueName);
- receiverDone = startConsumingThreads(executorService, protocol, 1,
CONSUMING_THREADS, tx, queueName);
-
- Thread.sleep(2000);
- runningSend = false;
- assertTrue(sendDone.await(10, TimeUnit.SECONDS));
-
- QueueControl queueControl1 = getQueueControl(server1URI, builderServer1,
queueName, queueName, RoutingType.ANYCAST, 5000);
- QueueControl queueControl2 = getQueueControl(server2URI, builderServer2,
queueName, queueName, RoutingType.ANYCAST, 5000);
-
File lmFolder = new File(getServerLocation(SERVER_NAME_0) +
"/data/large-messages");
File lmFolder2 = new File(getServerLocation(SERVER_NAME_1) +
"/data/large-messages");
- Wait.waitFor(() -> queueControl1.getMessageCount() == 0 &&
queueControl2.getMessageCount() == 0 && lmFolder.listFiles().length == 0 &&
lmFolder2.listFiles().length == 0);
+ {
+ CountDownLatch sendDone = startSendingThreads(executorService,
protocol, 0, SENDING_THREADS, tx, queueName);
+ CountDownLatch receiverDone = startConsumingThreads(executorService,
protocol, 1, CONSUMING_THREADS, tx, queueName);
+
+ // let it producing for a while
+ Thread.sleep(2000);
+
+ runningSend = false;
+ assertTrue(sendDone.await(10, TimeUnit.SECONDS));
+
+ killProcess(serverProcess);
+ assertTrue(serverProcess.waitFor(1, TimeUnit.MINUTES));
Review Comment:
It was 10 seconds before; is going to a minute really necessary? It seems
excessive, especially given that it now also waits for the senders to complete
_before_ the kill, which it didn't before, so there should arguably be far
less going on by then. It's also always doing a forcible destroy anyway.
##########
tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/ClusteredLargeMessageInterruptTest.java:
##########
@@ -289,57 +268,51 @@ private void testInterrupt(String protocol, boolean tx,
boolean useKill) throws
ExecutorService executorService =
Executors.newFixedThreadPool(SENDING_THREADS + CONSUMING_THREADS);
runAfter(executorService::shutdownNow);
- CountDownLatch sendDone = startSendingThreads(executorService, protocol,
0, SENDING_THREADS, tx, queueName);
- CountDownLatch receiverDone = startConsumingThreads(executorService,
protocol, 0, CONSUMING_THREADS, tx, queueName);
-
- Thread.sleep(2000);
-
- killProcess(serverProcess, useKill);
- runningSend = false;
- runningConsumer = false;
- assertTrue(serverProcess.waitFor(10, TimeUnit.SECONDS));
- assertTrue(receiverDone.await(10, TimeUnit.SECONDS));
- assertTrue(sendDone.await(10, TimeUnit.SECONDS));
-
- logger.info("All receivers and senders are done!!!");
-
- serverProcess = startServer0();
-
- Thread.sleep(2000);
-
- sendDone = startSendingThreads(executorService, protocol, 1,
SENDING_THREADS, tx, queueName);
- receiverDone = startConsumingThreads(executorService, protocol, 1,
CONSUMING_THREADS, tx, queueName);
-
- killProcess(serverProcess2, useKill);
- assertTrue(serverProcess2.waitFor(10, TimeUnit.SECONDS));
- runningSend = false;
- runningConsumer = false;
- assertTrue(sendDone.await(1, TimeUnit.MINUTES));
- assertTrue(receiverDone.await(10, TimeUnit.SECONDS));
-
- serverProcess2 = startServer1();
-
- sendDone = startSendingThreads(executorService, protocol, 1,
SENDING_THREADS, tx, queueName);
- receiverDone = startConsumingThreads(executorService, protocol, 1,
CONSUMING_THREADS, tx, queueName);
-
- Thread.sleep(2000);
- runningSend = false;
- assertTrue(sendDone.await(10, TimeUnit.SECONDS));
-
- QueueControl queueControl1 = getQueueControl(server1URI, builderServer1,
queueName, queueName, RoutingType.ANYCAST, 5000);
- QueueControl queueControl2 = getQueueControl(server2URI, builderServer2,
queueName, queueName, RoutingType.ANYCAST, 5000);
-
File lmFolder = new File(getServerLocation(SERVER_NAME_0) +
"/data/large-messages");
File lmFolder2 = new File(getServerLocation(SERVER_NAME_1) +
"/data/large-messages");
- Wait.waitFor(() -> queueControl1.getMessageCount() == 0 &&
queueControl2.getMessageCount() == 0 && lmFolder.listFiles().length == 0 &&
lmFolder2.listFiles().length == 0);
+ {
+ CountDownLatch sendDone = startSendingThreads(executorService,
protocol, 0, SENDING_THREADS, tx, queueName);
+ CountDownLatch receiverDone = startConsumingThreads(executorService,
protocol, 1, CONSUMING_THREADS, tx, queueName);
+
+ // let it producing for a while
+ Thread.sleep(2000);
+
+ runningSend = false;
+ assertTrue(sendDone.await(10, TimeUnit.SECONDS));
+
+ killProcess(serverProcess);
+ assertTrue(serverProcess.waitFor(1, TimeUnit.MINUTES));
+ serverProcess = startServer0();
+ runningConsumer = false;
+ assertTrue(receiverDone.await(10, TimeUnit.SECONDS));
+
+ SimpleManagement simpleManagement1 = new
SimpleManagement("tcp://localhost:61716", null, null);
+ runAfter(simpleManagement1::close);
Review Comment:
Is this actually used / doing anything?
##########
tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/ClusteredLargeMessageInterruptTest.java:
##########
@@ -289,57 +268,51 @@ private void testInterrupt(String protocol, boolean tx,
boolean useKill) throws
ExecutorService executorService =
Executors.newFixedThreadPool(SENDING_THREADS + CONSUMING_THREADS);
runAfter(executorService::shutdownNow);
- CountDownLatch sendDone = startSendingThreads(executorService, protocol,
0, SENDING_THREADS, tx, queueName);
- CountDownLatch receiverDone = startConsumingThreads(executorService,
protocol, 0, CONSUMING_THREADS, tx, queueName);
-
- Thread.sleep(2000);
-
- killProcess(serverProcess, useKill);
- runningSend = false;
- runningConsumer = false;
- assertTrue(serverProcess.waitFor(10, TimeUnit.SECONDS));
- assertTrue(receiverDone.await(10, TimeUnit.SECONDS));
- assertTrue(sendDone.await(10, TimeUnit.SECONDS));
-
- logger.info("All receivers and senders are done!!!");
-
- serverProcess = startServer0();
-
- Thread.sleep(2000);
-
- sendDone = startSendingThreads(executorService, protocol, 1,
SENDING_THREADS, tx, queueName);
- receiverDone = startConsumingThreads(executorService, protocol, 1,
CONSUMING_THREADS, tx, queueName);
-
- killProcess(serverProcess2, useKill);
- assertTrue(serverProcess2.waitFor(10, TimeUnit.SECONDS));
- runningSend = false;
- runningConsumer = false;
- assertTrue(sendDone.await(1, TimeUnit.MINUTES));
- assertTrue(receiverDone.await(10, TimeUnit.SECONDS));
-
- serverProcess2 = startServer1();
-
- sendDone = startSendingThreads(executorService, protocol, 1,
SENDING_THREADS, tx, queueName);
- receiverDone = startConsumingThreads(executorService, protocol, 1,
CONSUMING_THREADS, tx, queueName);
-
- Thread.sleep(2000);
- runningSend = false;
- assertTrue(sendDone.await(10, TimeUnit.SECONDS));
-
- QueueControl queueControl1 = getQueueControl(server1URI, builderServer1,
queueName, queueName, RoutingType.ANYCAST, 5000);
- QueueControl queueControl2 = getQueueControl(server2URI, builderServer2,
queueName, queueName, RoutingType.ANYCAST, 5000);
-
File lmFolder = new File(getServerLocation(SERVER_NAME_0) +
"/data/large-messages");
File lmFolder2 = new File(getServerLocation(SERVER_NAME_1) +
"/data/large-messages");
- Wait.waitFor(() -> queueControl1.getMessageCount() == 0 &&
queueControl2.getMessageCount() == 0 && lmFolder.listFiles().length == 0 &&
lmFolder2.listFiles().length == 0);
+ {
+ CountDownLatch sendDone = startSendingThreads(executorService,
protocol, 0, SENDING_THREADS, tx, queueName);
+ CountDownLatch receiverDone = startConsumingThreads(executorService,
protocol, 1, CONSUMING_THREADS, tx, queueName);
+
+ // let it producing for a while
+ Thread.sleep(2000);
+
+ runningSend = false;
+ assertTrue(sendDone.await(10, TimeUnit.SECONDS));
+
+ killProcess(serverProcess);
+ assertTrue(serverProcess.waitFor(1, TimeUnit.MINUTES));
+ serverProcess = startServer0();
+ runningConsumer = false;
+ assertTrue(receiverDone.await(10, TimeUnit.SECONDS));
+
+ SimpleManagement simpleManagement1 = new
SimpleManagement("tcp://localhost:61716", null, null);
+ runAfter(simpleManagement1::close);
+
+ long timeout = System.currentTimeMillis() + 60_000;
+
+ ConnectionFactory factory = createConnectionFactory(1, protocol);
+ try (Connection connection = factory.createConnection()) {
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer =
session.createConsumer(session.createQueue(queueName));
+ connection.start();
+ while (System.currentTimeMillis() < timeout) {
+ if (consumer.receive(1_000) == null) {
+ if (lmFolder.listFiles().length == 0 &&
lmFolder2.listFiles().length == 0) {
+ break;
+ }
+ }
+ }
+ }
Review Comment:
This isn't checking the messages like the original consumers do; it probably
should.
Should this be transacted or not, matching the overall test method and the
original consumers?
It might be worth adding some logging, akin to the original consumers, to see
what is happening... e.g. is it sitting there for a minute (and then
another 5 / 10 sec later) because the files list isn't empty?
##########
tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/ClusteredLargeMessageInterruptTest.java:
##########
@@ -289,57 +268,51 @@ private void testInterrupt(String protocol, boolean tx,
boolean useKill) throws
ExecutorService executorService =
Executors.newFixedThreadPool(SENDING_THREADS + CONSUMING_THREADS);
runAfter(executorService::shutdownNow);
- CountDownLatch sendDone = startSendingThreads(executorService, protocol,
0, SENDING_THREADS, tx, queueName);
- CountDownLatch receiverDone = startConsumingThreads(executorService,
protocol, 0, CONSUMING_THREADS, tx, queueName);
-
- Thread.sleep(2000);
-
- killProcess(serverProcess, useKill);
- runningSend = false;
- runningConsumer = false;
- assertTrue(serverProcess.waitFor(10, TimeUnit.SECONDS));
- assertTrue(receiverDone.await(10, TimeUnit.SECONDS));
- assertTrue(sendDone.await(10, TimeUnit.SECONDS));
-
- logger.info("All receivers and senders are done!!!");
-
- serverProcess = startServer0();
-
- Thread.sleep(2000);
-
- sendDone = startSendingThreads(executorService, protocol, 1,
SENDING_THREADS, tx, queueName);
- receiverDone = startConsumingThreads(executorService, protocol, 1,
CONSUMING_THREADS, tx, queueName);
-
- killProcess(serverProcess2, useKill);
- assertTrue(serverProcess2.waitFor(10, TimeUnit.SECONDS));
- runningSend = false;
- runningConsumer = false;
- assertTrue(sendDone.await(1, TimeUnit.MINUTES));
- assertTrue(receiverDone.await(10, TimeUnit.SECONDS));
-
- serverProcess2 = startServer1();
-
- sendDone = startSendingThreads(executorService, protocol, 1,
SENDING_THREADS, tx, queueName);
- receiverDone = startConsumingThreads(executorService, protocol, 1,
CONSUMING_THREADS, tx, queueName);
-
- Thread.sleep(2000);
- runningSend = false;
- assertTrue(sendDone.await(10, TimeUnit.SECONDS));
-
- QueueControl queueControl1 = getQueueControl(server1URI, builderServer1,
queueName, queueName, RoutingType.ANYCAST, 5000);
- QueueControl queueControl2 = getQueueControl(server2URI, builderServer2,
queueName, queueName, RoutingType.ANYCAST, 5000);
-
File lmFolder = new File(getServerLocation(SERVER_NAME_0) +
"/data/large-messages");
File lmFolder2 = new File(getServerLocation(SERVER_NAME_1) +
"/data/large-messages");
- Wait.waitFor(() -> queueControl1.getMessageCount() == 0 &&
queueControl2.getMessageCount() == 0 && lmFolder.listFiles().length == 0 &&
lmFolder2.listFiles().length == 0);
+ {
+ CountDownLatch sendDone = startSendingThreads(executorService,
protocol, 0, SENDING_THREADS, tx, queueName);
+ CountDownLatch receiverDone = startConsumingThreads(executorService,
protocol, 1, CONSUMING_THREADS, tx, queueName);
+
+ // let it producing for a while
+ Thread.sleep(2000);
+
+ runningSend = false;
+ assertTrue(sendDone.await(10, TimeUnit.SECONDS));
+
+ killProcess(serverProcess);
+ assertTrue(serverProcess.waitFor(1, TimeUnit.MINUTES));
+ serverProcess = startServer0();
+ runningConsumer = false;
+ assertTrue(receiverDone.await(10, TimeUnit.SECONDS));
+
+ SimpleManagement simpleManagement1 = new
SimpleManagement("tcp://localhost:61716", null, null);
+ runAfter(simpleManagement1::close);
+
+ long timeout = System.currentTimeMillis() + 60_000;
+
+ ConnectionFactory factory = createConnectionFactory(1, protocol);
+ try (Connection connection = factory.createConnection()) {
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer =
session.createConsumer(session.createQueue(queueName));
+ connection.start();
+ while (System.currentTimeMillis() < timeout) {
+ if (consumer.receive(1_000) == null) {
+ if (lmFolder.listFiles().length == 0 &&
lmFolder2.listFiles().length == 0) {
+ break;
+ }
+ }
+ }
+ }
- runningConsumer = false;
- assertTrue(receiverDone.await(10, TimeUnit.SECONDS));
+ }
+
+ logger.info("All receivers and senders are done!!!");
// no need to use wait here, the previous check should have checked that
already
- assertEquals(0, lmFolder.listFiles().length);
- assertEquals(0, lmFolder2.listFiles().length);
+ Wait.assertEquals(0, () -> lmFolder.listFiles().length, 5000);
+ Wait.assertEquals(0, () -> lmFolder2.listFiles().length, 5000);
Review Comment:
If adding waits it seems like the comment immediately above, asserting there
is no need to use wait, probably ought to be updated in some way.
##########
tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/ClusteredLargeMessageInterruptTest.java:
##########
@@ -375,21 +344,20 @@ private void testInterruptFailOnBridge(String protocol,
boolean tx) throws Throw
ExecutorService executorService =
Executors.newFixedThreadPool(SENDING_THREADS + CONSUMING_THREADS);
runAfter(executorService::shutdownNow);
- // only start the sender for a while
CountDownLatch sendDone = startSendingThreads(executorService, protocol,
0, SENDING_THREADS, tx, queueName);
Thread.sleep(2000);
runningSend = runningConsumer = false;
- killProcess(serverProcess, false);
- assertTrue(serverProcess.waitFor(10, TimeUnit.MINUTES));
+ killProcess(serverProcess);
+ assertTrue(serverProcess.waitFor(1, TimeUnit.MINUTES));
Review Comment:
This went from 10 minutes to 1 minute, which is good progress... but does it
really need to be a minute for a forcible kill?
##########
tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/ClusteredLargeMessageInterruptTest.java:
##########
@@ -110,19 +111,22 @@ private static String createBody() {
Process serverProcess2;
public ConnectionFactory createConnectionFactory(int broker, String
protocol) {
- if (protocol.equals("CORE")) {
- switch (broker) {
+ switch (protocol) {
+ case "CORE":
+ switch (broker) {
// I need the connections stable in the selected server
- case 0:
- return new
org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory("tcp://localhost:61616?ha=false&useTopologyForLoadBalancing=false&callTimeout=1000");
- case 1:
- return new
org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory("tcp://localhost:61716?ha=false&useTopologyForLoadBalancing=false&callTimeout=1000");
- default:
- logger.warn("undefined argument {}", broker);
- throw new IllegalArgumentException("undefined");
- }
- } else {
- return CFUtil.createConnectionFactory(protocol, "tcp://localhost:" +
(61616 + broker * 100) + "?ha=false");
+ case 0:
+ return new
org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory("tcp://localhost:61616?ha=false&useTopologyForLoadBalancing=false&callTimeout=1000");
+ case 1:
+ return new
org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory("tcp://localhost:61716?ha=false&useTopologyForLoadBalancing=false&callTimeout=1000");
+ default:
+ logger.warn("undefined argument {}", broker);
+ throw new IllegalArgumentException("undefined");
+ }
+ case "OPENWIRE":
+ return CFUtil.createConnectionFactory(protocol, "tcp://localhost:"
+ (61616 + broker * 100));
+ default:
+ return CFUtil.createConnectionFactory(protocol, "tcp://localhost:"
+ (61616 + broker * 100) + "?ha=false");
Review Comment:
If adding an OpenWire case to the switch, it would probably be clearer to
add an AMQP one too (the same or more specific), since the "?ha=false"
option passed in this default case is not one that client will use either;
only the CORE client does.
It would be clearer for the default case here to just throw for an unknown
protocol value.
##########
tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/ClusteredLargeMessageInterruptTest.java:
##########
@@ -375,21 +344,20 @@ private void testInterruptFailOnBridge(String protocol,
boolean tx) throws Throw
ExecutorService executorService =
Executors.newFixedThreadPool(SENDING_THREADS + CONSUMING_THREADS);
runAfter(executorService::shutdownNow);
- // only start the sender for a while
CountDownLatch sendDone = startSendingThreads(executorService, protocol,
0, SENDING_THREADS, tx, queueName);
Thread.sleep(2000);
runningSend = runningConsumer = false;
- killProcess(serverProcess, false);
- assertTrue(serverProcess.waitFor(10, TimeUnit.MINUTES));
+ killProcess(serverProcess);
+ assertTrue(serverProcess.waitFor(1, TimeUnit.MINUTES));
assertTrue(sendDone.await(10, TimeUnit.SECONDS));
sendDone = startSendingThreads(executorService, protocol, 1,
SENDING_THREADS, tx, queueName);
CountDownLatch receiverDone = startConsumingThreads(executorService,
protocol, 1, CONSUMING_THREADS, tx, queueName);
- killProcess(serverProcess, false);
- assertTrue(serverProcess.waitFor(10, TimeUnit.SECONDS));
+ killProcess(serverProcess);
+ assertTrue(serverProcess.waitFor(1, TimeUnit.MINUTES));
Review Comment:
Does it need to increase this much?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact