gemmellr commented on code in PR #5338:
URL: https://github.com/apache/activemq-artemis/pull/5338#discussion_r1837938107


##########
tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/ClusteredLargeMessageInterruptTest.java:
##########
@@ -289,57 +268,51 @@ private void testInterrupt(String protocol, boolean tx, 
boolean useKill) throws
       ExecutorService executorService = 
Executors.newFixedThreadPool(SENDING_THREADS + CONSUMING_THREADS);
       runAfter(executorService::shutdownNow);
 
-      CountDownLatch sendDone = startSendingThreads(executorService, protocol, 
0, SENDING_THREADS, tx, queueName);
-      CountDownLatch receiverDone = startConsumingThreads(executorService, 
protocol, 0, CONSUMING_THREADS, tx, queueName);
-
-      Thread.sleep(2000);
-
-      killProcess(serverProcess, useKill);
-      runningSend = false;
-      runningConsumer = false;
-      assertTrue(serverProcess.waitFor(10, TimeUnit.SECONDS));
-      assertTrue(receiverDone.await(10, TimeUnit.SECONDS));
-      assertTrue(sendDone.await(10, TimeUnit.SECONDS));
-
-      logger.info("All receivers and senders are done!!!");
-
-      serverProcess = startServer0();
-
-      Thread.sleep(2000);
-
-      sendDone = startSendingThreads(executorService, protocol, 1, 
SENDING_THREADS, tx, queueName);
-      receiverDone = startConsumingThreads(executorService, protocol, 1, 
CONSUMING_THREADS, tx, queueName);
-
-      killProcess(serverProcess2, useKill);
-      assertTrue(serverProcess2.waitFor(10, TimeUnit.SECONDS));
-      runningSend = false;
-      runningConsumer = false;
-      assertTrue(sendDone.await(1, TimeUnit.MINUTES));
-      assertTrue(receiverDone.await(10, TimeUnit.SECONDS));
-
-      serverProcess2 = startServer1();
-
-      sendDone = startSendingThreads(executorService, protocol, 1, 
SENDING_THREADS, tx, queueName);
-      receiverDone = startConsumingThreads(executorService, protocol, 1, 
CONSUMING_THREADS, tx, queueName);
-
-      Thread.sleep(2000);
-      runningSend = false;
-      assertTrue(sendDone.await(10, TimeUnit.SECONDS));
-
-      QueueControl queueControl1 = getQueueControl(server1URI, builderServer1, 
queueName, queueName, RoutingType.ANYCAST, 5000);
-      QueueControl queueControl2 = getQueueControl(server2URI, builderServer2, 
queueName, queueName, RoutingType.ANYCAST, 5000);
-
       File lmFolder = new File(getServerLocation(SERVER_NAME_0) + 
"/data/large-messages");
       File lmFolder2 = new File(getServerLocation(SERVER_NAME_1) + 
"/data/large-messages");
 
-      Wait.waitFor(() -> queueControl1.getMessageCount() == 0 && 
queueControl2.getMessageCount() == 0 && lmFolder.listFiles().length == 0 && 
lmFolder2.listFiles().length == 0);
+      {
+         CountDownLatch sendDone = startSendingThreads(executorService, 
protocol, 0, SENDING_THREADS, tx, queueName);
+         CountDownLatch receiverDone = startConsumingThreads(executorService, 
protocol, 1, CONSUMING_THREADS, tx, queueName);
+
+         // let it producing for a while
+         Thread.sleep(2000);
+
+         runningSend = false;
+         assertTrue(sendDone.await(10, TimeUnit.SECONDS));
+
+         killProcess(serverProcess);
+         assertTrue(serverProcess.waitFor(1, TimeUnit.MINUTES));

Review Comment:
   It was 10 seconds before, is going to a minute really necessary? It seems 
excessive. Especially given that is also now waits for the senders to complete 
_before_ the kill now, which it didnt before, so there should arguably be far 
less going on then. Its also always doing a forcible destroy anyway.



##########
tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/ClusteredLargeMessageInterruptTest.java:
##########
@@ -289,57 +268,51 @@ private void testInterrupt(String protocol, boolean tx, 
boolean useKill) throws
       ExecutorService executorService = 
Executors.newFixedThreadPool(SENDING_THREADS + CONSUMING_THREADS);
       runAfter(executorService::shutdownNow);
 
-      CountDownLatch sendDone = startSendingThreads(executorService, protocol, 
0, SENDING_THREADS, tx, queueName);
-      CountDownLatch receiverDone = startConsumingThreads(executorService, 
protocol, 0, CONSUMING_THREADS, tx, queueName);
-
-      Thread.sleep(2000);
-
-      killProcess(serverProcess, useKill);
-      runningSend = false;
-      runningConsumer = false;
-      assertTrue(serverProcess.waitFor(10, TimeUnit.SECONDS));
-      assertTrue(receiverDone.await(10, TimeUnit.SECONDS));
-      assertTrue(sendDone.await(10, TimeUnit.SECONDS));
-
-      logger.info("All receivers and senders are done!!!");
-
-      serverProcess = startServer0();
-
-      Thread.sleep(2000);
-
-      sendDone = startSendingThreads(executorService, protocol, 1, 
SENDING_THREADS, tx, queueName);
-      receiverDone = startConsumingThreads(executorService, protocol, 1, 
CONSUMING_THREADS, tx, queueName);
-
-      killProcess(serverProcess2, useKill);
-      assertTrue(serverProcess2.waitFor(10, TimeUnit.SECONDS));
-      runningSend = false;
-      runningConsumer = false;
-      assertTrue(sendDone.await(1, TimeUnit.MINUTES));
-      assertTrue(receiverDone.await(10, TimeUnit.SECONDS));
-
-      serverProcess2 = startServer1();
-
-      sendDone = startSendingThreads(executorService, protocol, 1, 
SENDING_THREADS, tx, queueName);
-      receiverDone = startConsumingThreads(executorService, protocol, 1, 
CONSUMING_THREADS, tx, queueName);
-
-      Thread.sleep(2000);
-      runningSend = false;
-      assertTrue(sendDone.await(10, TimeUnit.SECONDS));
-
-      QueueControl queueControl1 = getQueueControl(server1URI, builderServer1, 
queueName, queueName, RoutingType.ANYCAST, 5000);
-      QueueControl queueControl2 = getQueueControl(server2URI, builderServer2, 
queueName, queueName, RoutingType.ANYCAST, 5000);
-
       File lmFolder = new File(getServerLocation(SERVER_NAME_0) + 
"/data/large-messages");
       File lmFolder2 = new File(getServerLocation(SERVER_NAME_1) + 
"/data/large-messages");
 
-      Wait.waitFor(() -> queueControl1.getMessageCount() == 0 && 
queueControl2.getMessageCount() == 0 && lmFolder.listFiles().length == 0 && 
lmFolder2.listFiles().length == 0);
+      {
+         CountDownLatch sendDone = startSendingThreads(executorService, 
protocol, 0, SENDING_THREADS, tx, queueName);
+         CountDownLatch receiverDone = startConsumingThreads(executorService, 
protocol, 1, CONSUMING_THREADS, tx, queueName);
+
+         // let it producing for a while
+         Thread.sleep(2000);
+
+         runningSend = false;
+         assertTrue(sendDone.await(10, TimeUnit.SECONDS));
+
+         killProcess(serverProcess);
+         assertTrue(serverProcess.waitFor(1, TimeUnit.MINUTES));
+         serverProcess = startServer0();
+         runningConsumer = false;
+         assertTrue(receiverDone.await(10, TimeUnit.SECONDS));
+
+         SimpleManagement simpleManagement1 = new 
SimpleManagement("tcp://localhost:61716", null, null);
+         runAfter(simpleManagement1::close);

Review Comment:
   Is this actually used / doing anything?



##########
tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/ClusteredLargeMessageInterruptTest.java:
##########
@@ -289,57 +268,51 @@ private void testInterrupt(String protocol, boolean tx, 
boolean useKill) throws
       ExecutorService executorService = 
Executors.newFixedThreadPool(SENDING_THREADS + CONSUMING_THREADS);
       runAfter(executorService::shutdownNow);
 
-      CountDownLatch sendDone = startSendingThreads(executorService, protocol, 
0, SENDING_THREADS, tx, queueName);
-      CountDownLatch receiverDone = startConsumingThreads(executorService, 
protocol, 0, CONSUMING_THREADS, tx, queueName);
-
-      Thread.sleep(2000);
-
-      killProcess(serverProcess, useKill);
-      runningSend = false;
-      runningConsumer = false;
-      assertTrue(serverProcess.waitFor(10, TimeUnit.SECONDS));
-      assertTrue(receiverDone.await(10, TimeUnit.SECONDS));
-      assertTrue(sendDone.await(10, TimeUnit.SECONDS));
-
-      logger.info("All receivers and senders are done!!!");
-
-      serverProcess = startServer0();
-
-      Thread.sleep(2000);
-
-      sendDone = startSendingThreads(executorService, protocol, 1, 
SENDING_THREADS, tx, queueName);
-      receiverDone = startConsumingThreads(executorService, protocol, 1, 
CONSUMING_THREADS, tx, queueName);
-
-      killProcess(serverProcess2, useKill);
-      assertTrue(serverProcess2.waitFor(10, TimeUnit.SECONDS));
-      runningSend = false;
-      runningConsumer = false;
-      assertTrue(sendDone.await(1, TimeUnit.MINUTES));
-      assertTrue(receiverDone.await(10, TimeUnit.SECONDS));
-
-      serverProcess2 = startServer1();
-
-      sendDone = startSendingThreads(executorService, protocol, 1, 
SENDING_THREADS, tx, queueName);
-      receiverDone = startConsumingThreads(executorService, protocol, 1, 
CONSUMING_THREADS, tx, queueName);
-
-      Thread.sleep(2000);
-      runningSend = false;
-      assertTrue(sendDone.await(10, TimeUnit.SECONDS));
-
-      QueueControl queueControl1 = getQueueControl(server1URI, builderServer1, 
queueName, queueName, RoutingType.ANYCAST, 5000);
-      QueueControl queueControl2 = getQueueControl(server2URI, builderServer2, 
queueName, queueName, RoutingType.ANYCAST, 5000);
-
       File lmFolder = new File(getServerLocation(SERVER_NAME_0) + 
"/data/large-messages");
       File lmFolder2 = new File(getServerLocation(SERVER_NAME_1) + 
"/data/large-messages");
 
-      Wait.waitFor(() -> queueControl1.getMessageCount() == 0 && 
queueControl2.getMessageCount() == 0 && lmFolder.listFiles().length == 0 && 
lmFolder2.listFiles().length == 0);
+      {
+         CountDownLatch sendDone = startSendingThreads(executorService, 
protocol, 0, SENDING_THREADS, tx, queueName);
+         CountDownLatch receiverDone = startConsumingThreads(executorService, 
protocol, 1, CONSUMING_THREADS, tx, queueName);
+
+         // let it producing for a while
+         Thread.sleep(2000);
+
+         runningSend = false;
+         assertTrue(sendDone.await(10, TimeUnit.SECONDS));
+
+         killProcess(serverProcess);
+         assertTrue(serverProcess.waitFor(1, TimeUnit.MINUTES));
+         serverProcess = startServer0();
+         runningConsumer = false;
+         assertTrue(receiverDone.await(10, TimeUnit.SECONDS));
+
+         SimpleManagement simpleManagement1 = new 
SimpleManagement("tcp://localhost:61716", null, null);
+         runAfter(simpleManagement1::close);
+
+         long timeout = System.currentTimeMillis() + 60_000;
+
+         ConnectionFactory factory = createConnectionFactory(1, protocol);
+         try (Connection connection = factory.createConnection()) {
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer = 
session.createConsumer(session.createQueue(queueName));
+            connection.start();
+            while (System.currentTimeMillis() < timeout) {
+               if (consumer.receive(1_000) == null) {
+                  if (lmFolder.listFiles().length == 0 && 
lmFolder2.listFiles().length == 0) {
+                     break;
+                  }
+               }
+            }
+         }

Review Comment:
   This isnt checking the messages like the original consumers do, probably 
should.
   Should this be transacted/not like the overall test method and original 
consumers?
   
   Might be worth adding some logging, akin to the original consumers, to see 
what is happening...e.g is it sitting there for a minute (and then later 
another 5 / 10 sec later) because the files list isnt empty?



##########
tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/ClusteredLargeMessageInterruptTest.java:
##########
@@ -289,57 +268,51 @@ private void testInterrupt(String protocol, boolean tx, 
boolean useKill) throws
       ExecutorService executorService = 
Executors.newFixedThreadPool(SENDING_THREADS + CONSUMING_THREADS);
       runAfter(executorService::shutdownNow);
 
-      CountDownLatch sendDone = startSendingThreads(executorService, protocol, 
0, SENDING_THREADS, tx, queueName);
-      CountDownLatch receiverDone = startConsumingThreads(executorService, 
protocol, 0, CONSUMING_THREADS, tx, queueName);
-
-      Thread.sleep(2000);
-
-      killProcess(serverProcess, useKill);
-      runningSend = false;
-      runningConsumer = false;
-      assertTrue(serverProcess.waitFor(10, TimeUnit.SECONDS));
-      assertTrue(receiverDone.await(10, TimeUnit.SECONDS));
-      assertTrue(sendDone.await(10, TimeUnit.SECONDS));
-
-      logger.info("All receivers and senders are done!!!");
-
-      serverProcess = startServer0();
-
-      Thread.sleep(2000);
-
-      sendDone = startSendingThreads(executorService, protocol, 1, 
SENDING_THREADS, tx, queueName);
-      receiverDone = startConsumingThreads(executorService, protocol, 1, 
CONSUMING_THREADS, tx, queueName);
-
-      killProcess(serverProcess2, useKill);
-      assertTrue(serverProcess2.waitFor(10, TimeUnit.SECONDS));
-      runningSend = false;
-      runningConsumer = false;
-      assertTrue(sendDone.await(1, TimeUnit.MINUTES));
-      assertTrue(receiverDone.await(10, TimeUnit.SECONDS));
-
-      serverProcess2 = startServer1();
-
-      sendDone = startSendingThreads(executorService, protocol, 1, 
SENDING_THREADS, tx, queueName);
-      receiverDone = startConsumingThreads(executorService, protocol, 1, 
CONSUMING_THREADS, tx, queueName);
-
-      Thread.sleep(2000);
-      runningSend = false;
-      assertTrue(sendDone.await(10, TimeUnit.SECONDS));
-
-      QueueControl queueControl1 = getQueueControl(server1URI, builderServer1, 
queueName, queueName, RoutingType.ANYCAST, 5000);
-      QueueControl queueControl2 = getQueueControl(server2URI, builderServer2, 
queueName, queueName, RoutingType.ANYCAST, 5000);
-
       File lmFolder = new File(getServerLocation(SERVER_NAME_0) + 
"/data/large-messages");
       File lmFolder2 = new File(getServerLocation(SERVER_NAME_1) + 
"/data/large-messages");
 
-      Wait.waitFor(() -> queueControl1.getMessageCount() == 0 && 
queueControl2.getMessageCount() == 0 && lmFolder.listFiles().length == 0 && 
lmFolder2.listFiles().length == 0);
+      {
+         CountDownLatch sendDone = startSendingThreads(executorService, 
protocol, 0, SENDING_THREADS, tx, queueName);
+         CountDownLatch receiverDone = startConsumingThreads(executorService, 
protocol, 1, CONSUMING_THREADS, tx, queueName);
+
+         // let it producing for a while
+         Thread.sleep(2000);
+
+         runningSend = false;
+         assertTrue(sendDone.await(10, TimeUnit.SECONDS));
+
+         killProcess(serverProcess);
+         assertTrue(serverProcess.waitFor(1, TimeUnit.MINUTES));
+         serverProcess = startServer0();
+         runningConsumer = false;
+         assertTrue(receiverDone.await(10, TimeUnit.SECONDS));
+
+         SimpleManagement simpleManagement1 = new 
SimpleManagement("tcp://localhost:61716", null, null);
+         runAfter(simpleManagement1::close);
+
+         long timeout = System.currentTimeMillis() + 60_000;
+
+         ConnectionFactory factory = createConnectionFactory(1, protocol);
+         try (Connection connection = factory.createConnection()) {
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer = 
session.createConsumer(session.createQueue(queueName));
+            connection.start();
+            while (System.currentTimeMillis() < timeout) {
+               if (consumer.receive(1_000) == null) {
+                  if (lmFolder.listFiles().length == 0 && 
lmFolder2.listFiles().length == 0) {
+                     break;
+                  }
+               }
+            }
+         }
 
-      runningConsumer = false;
-      assertTrue(receiverDone.await(10, TimeUnit.SECONDS));
+      }
+
+      logger.info("All receivers and senders are done!!!");
 
       // no need to use wait here, the previous check should have checked that 
already
-      assertEquals(0, lmFolder.listFiles().length);
-      assertEquals(0, lmFolder2.listFiles().length);
+      Wait.assertEquals(0, () -> lmFolder.listFiles().length, 5000);
+      Wait.assertEquals(0, () -> lmFolder2.listFiles().length, 5000);

Review Comment:
   If adding waits it seems like the comment immediately above, asserting there 
is no need to use wait, probably ought to be updated in some way.



##########
tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/ClusteredLargeMessageInterruptTest.java:
##########
@@ -375,21 +344,20 @@ private void testInterruptFailOnBridge(String protocol, 
boolean tx) throws Throw
       ExecutorService executorService = 
Executors.newFixedThreadPool(SENDING_THREADS + CONSUMING_THREADS);
       runAfter(executorService::shutdownNow);
 
-      // only start the sender for a while
       CountDownLatch sendDone = startSendingThreads(executorService, protocol, 
0, SENDING_THREADS, tx, queueName);
 
       Thread.sleep(2000);
 
       runningSend = runningConsumer = false;
 
-      killProcess(serverProcess, false);
-      assertTrue(serverProcess.waitFor(10, TimeUnit.MINUTES));
+      killProcess(serverProcess);
+      assertTrue(serverProcess.waitFor(1, TimeUnit.MINUTES));

Review Comment:
   This went from 10mins to 1min, which is good progress....but does it really 
need to be a minute, for a forcible kill?



##########
tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/ClusteredLargeMessageInterruptTest.java:
##########
@@ -110,19 +111,22 @@ private static String createBody() {
    Process serverProcess2;
 
    public ConnectionFactory createConnectionFactory(int broker, String 
protocol) {
-      if (protocol.equals("CORE")) {
-         switch (broker) {
+      switch (protocol) {
+         case "CORE":
+            switch (broker) {
             // I need the connections stable in the selected server
-            case 0:
-               return new 
org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory("tcp://localhost:61616?ha=false&useTopologyForLoadBalancing=false&callTimeout=1000");
-            case 1:
-               return new 
org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory("tcp://localhost:61716?ha=false&useTopologyForLoadBalancing=false&callTimeout=1000");
-            default:
-               logger.warn("undefined argument {}", broker);
-               throw new IllegalArgumentException("undefined");
-         }
-      } else {
-         return CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + 
(61616 + broker * 100) + "?ha=false");
+               case 0:
+                  return new 
org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory("tcp://localhost:61616?ha=false&useTopologyForLoadBalancing=false&callTimeout=1000");
+               case 1:
+                  return new 
org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory("tcp://localhost:61716?ha=false&useTopologyForLoadBalancing=false&callTimeout=1000");
+               default:
+                  logger.warn("undefined argument {}", broker);
+                  throw new IllegalArgumentException("undefined");
+            }
+         case "OPENWIRE":
+            return CFUtil.createConnectionFactory(protocol, "tcp://localhost:" 
+ (61616 + broker * 100));
+         default:
+            return CFUtil.createConnectionFactory(protocol, "tcp://localhost:" 
+ (61616 + broker * 100) + "?ha=false");

Review Comment:
   If adding an openwire case to the switch then would proably be clearer 
adding an AMQP one too, the same or more specific, since the  "?ha=false" 
option passed in this default case is not one that will use either, only the 
CORE client does.
   
   Would be clearer for the default case here to just throw for unknown 
protocol value.



##########
tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/ClusteredLargeMessageInterruptTest.java:
##########
@@ -375,21 +344,20 @@ private void testInterruptFailOnBridge(String protocol, 
boolean tx) throws Throw
       ExecutorService executorService = 
Executors.newFixedThreadPool(SENDING_THREADS + CONSUMING_THREADS);
       runAfter(executorService::shutdownNow);
 
-      // only start the sender for a while
       CountDownLatch sendDone = startSendingThreads(executorService, protocol, 
0, SENDING_THREADS, tx, queueName);
 
       Thread.sleep(2000);
 
       runningSend = runningConsumer = false;
 
-      killProcess(serverProcess, false);
-      assertTrue(serverProcess.waitFor(10, TimeUnit.MINUTES));
+      killProcess(serverProcess);
+      assertTrue(serverProcess.waitFor(1, TimeUnit.MINUTES));
       assertTrue(sendDone.await(10, TimeUnit.SECONDS));
 
       sendDone = startSendingThreads(executorService, protocol, 1, 
SENDING_THREADS, tx, queueName);
       CountDownLatch receiverDone = startConsumingThreads(executorService, 
protocol, 1, CONSUMING_THREADS, tx, queueName);
-      killProcess(serverProcess, false);
-      assertTrue(serverProcess.waitFor(10, TimeUnit.SECONDS));
+      killProcess(serverProcess);
+      assertTrue(serverProcess.waitFor(1, TimeUnit.MINUTES));

Review Comment:
   Does it need to increase this much?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org
For additional commands, e-mail: gitbox-h...@activemq.apache.org
For further information, visit: https://activemq.apache.org/contact


Reply via email to