gemmellr commented on code in PR #5338: URL: https://github.com/apache/activemq-artemis/pull/5338#discussion_r1837938107
########## tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/ClusteredLargeMessageInterruptTest.java: ########## @@ -289,57 +268,51 @@ private void testInterrupt(String protocol, boolean tx, boolean useKill) throws ExecutorService executorService = Executors.newFixedThreadPool(SENDING_THREADS + CONSUMING_THREADS); runAfter(executorService::shutdownNow); - CountDownLatch sendDone = startSendingThreads(executorService, protocol, 0, SENDING_THREADS, tx, queueName); - CountDownLatch receiverDone = startConsumingThreads(executorService, protocol, 0, CONSUMING_THREADS, tx, queueName); - - Thread.sleep(2000); - - killProcess(serverProcess, useKill); - runningSend = false; - runningConsumer = false; - assertTrue(serverProcess.waitFor(10, TimeUnit.SECONDS)); - assertTrue(receiverDone.await(10, TimeUnit.SECONDS)); - assertTrue(sendDone.await(10, TimeUnit.SECONDS)); - - logger.info("All receivers and senders are done!!!"); - - serverProcess = startServer0(); - - Thread.sleep(2000); - - sendDone = startSendingThreads(executorService, protocol, 1, SENDING_THREADS, tx, queueName); - receiverDone = startConsumingThreads(executorService, protocol, 1, CONSUMING_THREADS, tx, queueName); - - killProcess(serverProcess2, useKill); - assertTrue(serverProcess2.waitFor(10, TimeUnit.SECONDS)); - runningSend = false; - runningConsumer = false; - assertTrue(sendDone.await(1, TimeUnit.MINUTES)); - assertTrue(receiverDone.await(10, TimeUnit.SECONDS)); - - serverProcess2 = startServer1(); - - sendDone = startSendingThreads(executorService, protocol, 1, SENDING_THREADS, tx, queueName); - receiverDone = startConsumingThreads(executorService, protocol, 1, CONSUMING_THREADS, tx, queueName); - - Thread.sleep(2000); - runningSend = false; - assertTrue(sendDone.await(10, TimeUnit.SECONDS)); - - QueueControl queueControl1 = getQueueControl(server1URI, builderServer1, queueName, queueName, RoutingType.ANYCAST, 5000); - QueueControl queueControl2 = getQueueControl(server2URI, builderServer2, queueName, queueName, RoutingType.ANYCAST, 5000); - File lmFolder = new File(getServerLocation(SERVER_NAME_0) + "/data/large-messages"); File lmFolder2 = new File(getServerLocation(SERVER_NAME_1) + "/data/large-messages"); - Wait.waitFor(() -> queueControl1.getMessageCount() == 0 && queueControl2.getMessageCount() == 0 && lmFolder.listFiles().length == 0 && lmFolder2.listFiles().length == 0); + { + CountDownLatch sendDone = startSendingThreads(executorService, protocol, 0, SENDING_THREADS, tx, queueName); + CountDownLatch receiverDone = startConsumingThreads(executorService, protocol, 1, CONSUMING_THREADS, tx, queueName); + + // let it producing for a while + Thread.sleep(2000); + + runningSend = false; + assertTrue(sendDone.await(10, TimeUnit.SECONDS)); + + killProcess(serverProcess); + assertTrue(serverProcess.waitFor(1, TimeUnit.MINUTES)); Review Comment: It was 10 seconds before, is going to a minute really necessary? It seems excessive. Especially given that is also now waits for the senders to complete _before_ the kill now, which it didnt before, so there should arguably be far less going on then. Its also always doing a forcible destroy anyway. ########## tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/ClusteredLargeMessageInterruptTest.java: ########## @@ -289,57 +268,51 @@ private void testInterrupt(String protocol, boolean tx, boolean useKill) throws ExecutorService executorService = Executors.newFixedThreadPool(SENDING_THREADS + CONSUMING_THREADS); runAfter(executorService::shutdownNow); - CountDownLatch sendDone = startSendingThreads(executorService, protocol, 0, SENDING_THREADS, tx, queueName); - CountDownLatch receiverDone = startConsumingThreads(executorService, protocol, 0, CONSUMING_THREADS, tx, queueName); - - Thread.sleep(2000); - - killProcess(serverProcess, useKill); - runningSend = false; - runningConsumer = false; - assertTrue(serverProcess.waitFor(10, TimeUnit.SECONDS)); - assertTrue(receiverDone.await(10, TimeUnit.SECONDS)); - assertTrue(sendDone.await(10, TimeUnit.SECONDS)); - - logger.info("All receivers and senders are done!!!"); - - serverProcess = startServer0(); - - Thread.sleep(2000); - - sendDone = startSendingThreads(executorService, protocol, 1, SENDING_THREADS, tx, queueName); - receiverDone = startConsumingThreads(executorService, protocol, 1, CONSUMING_THREADS, tx, queueName); - - killProcess(serverProcess2, useKill); - assertTrue(serverProcess2.waitFor(10, TimeUnit.SECONDS)); - runningSend = false; - runningConsumer = false; - assertTrue(sendDone.await(1, TimeUnit.MINUTES)); - assertTrue(receiverDone.await(10, TimeUnit.SECONDS)); - - serverProcess2 = startServer1(); - - sendDone = startSendingThreads(executorService, protocol, 1, SENDING_THREADS, tx, queueName); - receiverDone = startConsumingThreads(executorService, protocol, 1, CONSUMING_THREADS, tx, queueName); - - Thread.sleep(2000); - runningSend = false; - assertTrue(sendDone.await(10, TimeUnit.SECONDS)); - - QueueControl queueControl1 = getQueueControl(server1URI, builderServer1, queueName, queueName, RoutingType.ANYCAST, 5000); - QueueControl queueControl2 = getQueueControl(server2URI, builderServer2, queueName, queueName, RoutingType.ANYCAST, 5000); - File lmFolder = new File(getServerLocation(SERVER_NAME_0) + "/data/large-messages"); File lmFolder2 = new File(getServerLocation(SERVER_NAME_1) + "/data/large-messages"); - Wait.waitFor(() -> queueControl1.getMessageCount() == 0 && queueControl2.getMessageCount() == 0 && lmFolder.listFiles().length == 0 && lmFolder2.listFiles().length == 0); + { + CountDownLatch sendDone = startSendingThreads(executorService, protocol, 0, SENDING_THREADS, tx, queueName); + CountDownLatch receiverDone = startConsumingThreads(executorService, protocol, 1, CONSUMING_THREADS, tx, queueName); + + // let it producing for a while + Thread.sleep(2000); + + runningSend = false; + assertTrue(sendDone.await(10, TimeUnit.SECONDS)); + + killProcess(serverProcess); + assertTrue(serverProcess.waitFor(1, TimeUnit.MINUTES)); + serverProcess = startServer0(); + runningConsumer = false; + assertTrue(receiverDone.await(10, TimeUnit.SECONDS)); + + SimpleManagement simpleManagement1 = new SimpleManagement("tcp://localhost:61716", null, null); + runAfter(simpleManagement1::close); Review Comment: Is this actually used / doing anything? ########## tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/ClusteredLargeMessageInterruptTest.java: ########## @@ -289,57 +268,51 @@ private void testInterrupt(String protocol, boolean tx, boolean useKill) throws ExecutorService executorService = Executors.newFixedThreadPool(SENDING_THREADS + CONSUMING_THREADS); runAfter(executorService::shutdownNow); - CountDownLatch sendDone = startSendingThreads(executorService, protocol, 0, SENDING_THREADS, tx, queueName); - CountDownLatch receiverDone = startConsumingThreads(executorService, protocol, 0, CONSUMING_THREADS, tx, queueName); - - Thread.sleep(2000); - - killProcess(serverProcess, useKill); - runningSend = false; - runningConsumer = false; - assertTrue(serverProcess.waitFor(10, TimeUnit.SECONDS)); - assertTrue(receiverDone.await(10, TimeUnit.SECONDS)); - assertTrue(sendDone.await(10, TimeUnit.SECONDS)); - - logger.info("All receivers and senders are done!!!"); - - serverProcess = startServer0(); - - Thread.sleep(2000); - - sendDone = startSendingThreads(executorService, protocol, 1, SENDING_THREADS, tx, queueName); - receiverDone = startConsumingThreads(executorService, protocol, 1, CONSUMING_THREADS, tx, queueName); - - killProcess(serverProcess2, useKill); - assertTrue(serverProcess2.waitFor(10, TimeUnit.SECONDS)); - runningSend = false; - runningConsumer = false; - assertTrue(sendDone.await(1, TimeUnit.MINUTES)); - assertTrue(receiverDone.await(10, TimeUnit.SECONDS)); - - serverProcess2 = startServer1(); - - sendDone = startSendingThreads(executorService, protocol, 1, SENDING_THREADS, tx, queueName); - receiverDone = startConsumingThreads(executorService, protocol, 1, CONSUMING_THREADS, tx, queueName); - - Thread.sleep(2000); - runningSend = false; - assertTrue(sendDone.await(10, TimeUnit.SECONDS)); - - QueueControl queueControl1 = getQueueControl(server1URI, builderServer1, queueName, queueName, RoutingType.ANYCAST, 5000); - QueueControl queueControl2 = getQueueControl(server2URI, builderServer2, queueName, queueName, RoutingType.ANYCAST, 5000); - File lmFolder = new File(getServerLocation(SERVER_NAME_0) + "/data/large-messages"); File lmFolder2 = new File(getServerLocation(SERVER_NAME_1) + "/data/large-messages"); - Wait.waitFor(() -> queueControl1.getMessageCount() == 0 && queueControl2.getMessageCount() == 0 && lmFolder.listFiles().length == 0 && lmFolder2.listFiles().length == 0); + { + CountDownLatch sendDone = startSendingThreads(executorService, protocol, 0, SENDING_THREADS, tx, queueName); + CountDownLatch receiverDone = startConsumingThreads(executorService, protocol, 1, CONSUMING_THREADS, tx, queueName); + + // let it producing for a while + Thread.sleep(2000); + + runningSend = false; + assertTrue(sendDone.await(10, TimeUnit.SECONDS)); + + killProcess(serverProcess); + assertTrue(serverProcess.waitFor(1, TimeUnit.MINUTES)); + serverProcess = startServer0(); + runningConsumer = false; + assertTrue(receiverDone.await(10, TimeUnit.SECONDS)); + + SimpleManagement simpleManagement1 = new SimpleManagement("tcp://localhost:61716", null, null); + runAfter(simpleManagement1::close); + + long timeout = System.currentTimeMillis() + 60_000; + + ConnectionFactory factory = createConnectionFactory(1, protocol); + try (Connection connection = factory.createConnection()) { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(session.createQueue(queueName)); + connection.start(); + while (System.currentTimeMillis() < timeout) { + if (consumer.receive(1_000) == null) { + if (lmFolder.listFiles().length == 0 && lmFolder2.listFiles().length == 0) { + break; + } + } + } + } Review Comment: This isnt checking the messages like the original consumers do, probably should. Should this be transacted/not like the overall test method and original consumers? Might be worth adding some logging, akin to the original consumers, to see what is happening...e.g is it sitting there for a minute (and then later another 5 / 10 sec later) because the files list isnt empty? ########## tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/ClusteredLargeMessageInterruptTest.java: ########## @@ -289,57 +268,51 @@ private void testInterrupt(String protocol, boolean tx, boolean useKill) throws ExecutorService executorService = Executors.newFixedThreadPool(SENDING_THREADS + CONSUMING_THREADS); runAfter(executorService::shutdownNow); - CountDownLatch sendDone = startSendingThreads(executorService, protocol, 0, SENDING_THREADS, tx, queueName); - CountDownLatch receiverDone = startConsumingThreads(executorService, protocol, 0, CONSUMING_THREADS, tx, queueName); - - Thread.sleep(2000); - - killProcess(serverProcess, useKill); - runningSend = false; - runningConsumer = false; - assertTrue(serverProcess.waitFor(10, TimeUnit.SECONDS)); - assertTrue(receiverDone.await(10, TimeUnit.SECONDS)); - assertTrue(sendDone.await(10, TimeUnit.SECONDS)); - - logger.info("All receivers and senders are done!!!"); - - serverProcess = startServer0(); - - Thread.sleep(2000); - - sendDone = startSendingThreads(executorService, protocol, 1, SENDING_THREADS, tx, queueName); - receiverDone = startConsumingThreads(executorService, protocol, 1, CONSUMING_THREADS, tx, queueName); - - killProcess(serverProcess2, useKill); - assertTrue(serverProcess2.waitFor(10, TimeUnit.SECONDS)); - runningSend = false; - runningConsumer = false; - assertTrue(sendDone.await(1, TimeUnit.MINUTES)); - assertTrue(receiverDone.await(10, TimeUnit.SECONDS)); - - serverProcess2 = startServer1(); - - sendDone = startSendingThreads(executorService, protocol, 1, SENDING_THREADS, tx, queueName); - receiverDone = startConsumingThreads(executorService, protocol, 1, CONSUMING_THREADS, tx, queueName); - - Thread.sleep(2000); - runningSend = false; - assertTrue(sendDone.await(10, TimeUnit.SECONDS)); - - QueueControl queueControl1 = getQueueControl(server1URI, builderServer1, queueName, queueName, RoutingType.ANYCAST, 5000); - QueueControl queueControl2 = getQueueControl(server2URI, builderServer2, queueName, queueName, RoutingType.ANYCAST, 5000); - File lmFolder = new File(getServerLocation(SERVER_NAME_0) + "/data/large-messages"); File lmFolder2 = new File(getServerLocation(SERVER_NAME_1) + "/data/large-messages"); - Wait.waitFor(() -> queueControl1.getMessageCount() == 0 && queueControl2.getMessageCount() == 0 && lmFolder.listFiles().length == 0 && lmFolder2.listFiles().length == 0); + { + CountDownLatch sendDone = startSendingThreads(executorService, protocol, 0, SENDING_THREADS, tx, queueName); + CountDownLatch receiverDone = startConsumingThreads(executorService, protocol, 1, CONSUMING_THREADS, tx, queueName); + + // let it producing for a while + Thread.sleep(2000); + + runningSend = false; + assertTrue(sendDone.await(10, TimeUnit.SECONDS)); + + killProcess(serverProcess); + assertTrue(serverProcess.waitFor(1, TimeUnit.MINUTES)); + serverProcess = startServer0(); + runningConsumer = false; + assertTrue(receiverDone.await(10, TimeUnit.SECONDS)); + + SimpleManagement simpleManagement1 = new SimpleManagement("tcp://localhost:61716", null, null); + runAfter(simpleManagement1::close); + + long timeout = System.currentTimeMillis() + 60_000; + + ConnectionFactory factory = createConnectionFactory(1, protocol); + try (Connection connection = factory.createConnection()) { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(session.createQueue(queueName)); + connection.start(); + while (System.currentTimeMillis() < timeout) { + if (consumer.receive(1_000) == null) { + if (lmFolder.listFiles().length == 0 && lmFolder2.listFiles().length == 0) { + break; + } + } + } + } - runningConsumer = false; - assertTrue(receiverDone.await(10, TimeUnit.SECONDS)); + } + + logger.info("All receivers and senders are done!!!"); // no need to use wait here, the previous check should have checked that already - assertEquals(0, lmFolder.listFiles().length); - assertEquals(0, lmFolder2.listFiles().length); + Wait.assertEquals(0, () -> lmFolder.listFiles().length, 5000); + Wait.assertEquals(0, () -> lmFolder2.listFiles().length, 5000); Review Comment: If adding waits it seems like the comment immediately above, asserting there is no need to use wait, probably ought to be updated in some way. ########## tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/ClusteredLargeMessageInterruptTest.java: ########## @@ -375,21 +344,20 @@ private void testInterruptFailOnBridge(String protocol, boolean tx) throws Throw ExecutorService executorService = Executors.newFixedThreadPool(SENDING_THREADS + CONSUMING_THREADS); runAfter(executorService::shutdownNow); - // only start the sender for a while CountDownLatch sendDone = startSendingThreads(executorService, protocol, 0, SENDING_THREADS, tx, queueName); Thread.sleep(2000); runningSend = runningConsumer = false; - killProcess(serverProcess, false); - assertTrue(serverProcess.waitFor(10, TimeUnit.MINUTES)); + killProcess(serverProcess); + assertTrue(serverProcess.waitFor(1, TimeUnit.MINUTES)); Review Comment: This went from 10mins to 1min, which is good progress....but does it really need to be a minute, for a forcible kill? ########## tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/ClusteredLargeMessageInterruptTest.java: ########## @@ -110,19 +111,22 @@ private static String createBody() { Process serverProcess2; public ConnectionFactory createConnectionFactory(int broker, String protocol) { - if (protocol.equals("CORE")) { - switch (broker) { + switch (protocol) { + case "CORE": + switch (broker) { // I need the connections stable in the selected server - case 0: - return new org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory("tcp://localhost:61616?ha=false&useTopologyForLoadBalancing=false&callTimeout=1000"); - case 1: - return new org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory("tcp://localhost:61716?ha=false&useTopologyForLoadBalancing=false&callTimeout=1000"); - default: - logger.warn("undefined argument {}", broker); - throw new IllegalArgumentException("undefined"); - } - } else { - return CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + (61616 + broker * 100) + "?ha=false"); + case 0: + return new org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory("tcp://localhost:61616?ha=false&useTopologyForLoadBalancing=false&callTimeout=1000"); + case 1: + return new org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory("tcp://localhost:61716?ha=false&useTopologyForLoadBalancing=false&callTimeout=1000"); + default: + logger.warn("undefined argument {}", broker); + throw new IllegalArgumentException("undefined"); + } + case "OPENWIRE": + return CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + (61616 + broker * 100)); + default: + return CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + (61616 + broker * 100) + "?ha=false"); Review Comment: If adding an openwire case to the switch then would proably be clearer adding an AMQP one too, the same or more specific, since the "?ha=false" option passed in this default case is not one that will use either, only the CORE client does. Would be clearer for the default case here to just throw for unknown protocol value. ########## tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/ClusteredLargeMessageInterruptTest.java: ########## @@ -375,21 +344,20 @@ private void testInterruptFailOnBridge(String protocol, boolean tx) throws Throw ExecutorService executorService = Executors.newFixedThreadPool(SENDING_THREADS + CONSUMING_THREADS); runAfter(executorService::shutdownNow); - // only start the sender for a while CountDownLatch sendDone = startSendingThreads(executorService, protocol, 0, SENDING_THREADS, tx, queueName); Thread.sleep(2000); runningSend = runningConsumer = false; - killProcess(serverProcess, false); - assertTrue(serverProcess.waitFor(10, TimeUnit.MINUTES)); + killProcess(serverProcess); + assertTrue(serverProcess.waitFor(1, TimeUnit.MINUTES)); assertTrue(sendDone.await(10, TimeUnit.SECONDS)); sendDone = startSendingThreads(executorService, protocol, 1, SENDING_THREADS, tx, queueName); CountDownLatch receiverDone = startConsumingThreads(executorService, protocol, 1, CONSUMING_THREADS, tx, queueName); - killProcess(serverProcess, false); - assertTrue(serverProcess.waitFor(10, TimeUnit.SECONDS)); + killProcess(serverProcess); + assertTrue(serverProcess.waitFor(1, TimeUnit.MINUTES)); Review Comment: Does it need to increase this much? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org For additional commands, e-mail: gitbox-h...@activemq.apache.org For further information, visit: https://activemq.apache.org/contact