[ 
https://issues.apache.org/jira/browse/ARTEMIS-4193?focusedWorklogId=850071&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-850071
 ]

ASF GitHub Bot logged work on ARTEMIS-4193:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 09/Mar/23 13:17
            Start Date: 09/Mar/23 13:17
    Worklog Time Spent: 10m 
      Work Description: clebertsuconic commented on code in PR #4395:
URL: https://github.com/apache/activemq-artemis/pull/4395#discussion_r1131016813


##########
tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/LargeMessageInterruptTest.java:
##########
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.soak.interruptlm;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.io.File;
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
+import org.apache.activemq.artemis.api.core.management.QueueControl;
+import org.apache.activemq.artemis.tests.soak.SoakTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// This is used to kill a server and make sure the server will remove any 
pending files.
+public class LargeMessageInterruptTest extends SoakTestBase {
+
+   public static final String SERVER_NAME_0 = "interruptlm";
+   private static final String JMX_SERVER_HOSTNAME = "localhost";
+   private static final int JMX_SERVER_PORT_0 = 1099;
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+   static String liveURI = "service:jmx:rmi:///jndi/rmi://" + 
JMX_SERVER_HOSTNAME + ":" + JMX_SERVER_PORT_0 + "/jmxrmi";
+   static ObjectNameBuilder liveNameBuilder = 
ObjectNameBuilder.create(ActiveMQDefaultConfiguration.getDefaultJmxDomain(), 
"lminterrupt", true);
+   Process serverProcess;
+
+   public ConnectionFactory createConnectionFactory(String protocol) {
+      return CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
+   }
+
+   @Before
+   public void before() throws Exception {
+      cleanupData(SERVER_NAME_0);
+      serverProcess = startServer(SERVER_NAME_0, 0, 30000);
+      disableCheckThread();
+   }
+
+   @Test
+   public void testInterruptLargeMessageAMQPTX() throws Throwable {
+      testInterruptLM("AMQP", true, false);
+   }
+
+   @Test
+   public void testInterruptLargeMessageAMQPTXPaging() throws Throwable {
+      testInterruptLM("AMQP", true, true);
+   }
+
+   @Test
+   public void testInterruptLargeMessageCORETX() throws Throwable {
+      testInterruptLM("CORE", true, false);
+   }
+
+   @Test
+   public void testInterruptLargeMessageCORETXPaging() throws Throwable {
+      testInterruptLM("CORE", true, true);
+   }
+
+
+   @Test
+   public void testInterruptLargeMessageOPENWIRETX() throws Throwable {
+      testInterruptLM("OPENWIRE", true, false);
+   }
+
+   @Test
+   public void testInterruptLargeMessageOPENWIRETXPaging() throws Throwable {
+      testInterruptLM("OPENWIRE", true, true);
+   }
+
+
+   @Test
+   public void testInterruptLargeMessageAMQPNonTX() throws Throwable {
+      testInterruptLM("AMQP", false, false);
+   }
+
+   @Test
+   public void testInterruptLargeMessageAMQPNonTXPaging() throws Throwable {
+      testInterruptLM("AMQP", false, true);
+   }
+
+   @Test
+   public void testInterruptLargeMessageCORENonTX() throws Throwable {
+      testInterruptLM("CORE", false, false);
+   }
+
+   @Test
+   public void testInterruptLargeMessageCORENonTXPaging() throws Throwable {
+      testInterruptLM("CORE", false, true);
+   }
+
+   private void testInterruptLM(String protocol, boolean tx, boolean paging) 
throws Throwable {
+      final int BODY_SIZE = 500 * 1024;
+      final int NUMBER_OF_MESSAGES = 10; // this is per producer
+      final int SENDING_THREADS = 10;
+      CyclicBarrier startFlag = new CyclicBarrier(SENDING_THREADS);
+      final CountDownLatch done = new CountDownLatch(SENDING_THREADS);
+      final AtomicInteger produced = new AtomicInteger(0);
+      final ConnectionFactory factory = createConnectionFactory(protocol);
+      final AtomicInteger errors = new AtomicInteger(0); // I don't expect 
many errors since this test is disconnecting and reconnecting the server
+      final CountDownLatch killAt = new CountDownLatch(40);
+
+      ExecutorService executorService = 
Executors.newFixedThreadPool(SENDING_THREADS);
+      runAfter(executorService::shutdownNow);
+
+      String queueName = "LargeMessageInterruptTest";
+
+      String largebody;
+
+      {
+         StringBuffer buffer = new StringBuffer();
+         while (buffer.length() < BODY_SIZE) {
+            buffer.append("LOREM IPSUM WHATEVER THEY SAY IN THERE I DON'T 
REALLY CARE. I'M NOT SURE IF IT'S LOREM, LAUREM, LAUREN, IPSUM OR YPSUM AND I 
DON'T REALLY CARE ");
+         }
+         largebody = buffer.toString();
+      }
+
+      if (paging) {
+         try (Connection connection = factory.createConnection()) {
+            Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+            MessageProducer producer = 
session.createProducer(session.createQueue(queueName));
+            for (int i = 0; i < 1000; i++) {
+               producer.send(session.createTextMessage("forcePage"));
+            }
+            session.commit();
+         }
+      }
+
+      for (int i = 0; i < SENDING_THREADS; i++) {
+         executorService.execute(() -> {
+            int numberOfMessages = 0;
+            try {
+               Connection connection = factory.createConnection();
+               Session session = connection.createSession(tx, tx ? 
Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
+               MessageProducer producer = 
session.createProducer(session.createQueue(queueName));
+
+               startFlag.await(10, TimeUnit.SECONDS);
+               while (numberOfMessages < NUMBER_OF_MESSAGES) {
+                  try {
+                     producer.send(session.createTextMessage(largebody));
+                     if (tx) {
+                        session.commit();
+                     }
+                     produced.incrementAndGet();
+                     killAt.countDown();
+                     if (numberOfMessages++ % 10 == 0) {
+                        logger.info("Sent {}", numberOfMessages);
+                     }
+                  } catch (Exception e) {
+                     logger.warn(e.getMessage(), e);
+
+                     logger.warn(e.getMessage(), e);
+                     try {
+                        connection.close();
+                     } catch (Throwable ignored) {
+                     }
+
+                     for (int retryNumber = 0; retryNumber < 100; 
retryNumber++) {
+                        try {
+                           Connection ctest = factory.createConnection();
+                           ctest.close();
+                           break;
+                        } catch (Throwable retry) {
+                           Thread.sleep(100);
+                        }
+                     }
+
+                     connection = factory.createConnection();
+                     session = connection.createSession(tx, tx ? 
Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
+                     producer = 
session.createProducer(session.createQueue(queueName));
+                     connection.start();
+
+                  }
+               }
+            } catch (Exception e) {
+               logger.warn("Error getting the initial connection", e);
+               errors.incrementAndGet();
+            }
+
+            logger.info("Done sending");
+            done.countDown();
+         });
+      }
+
+      Assert.assertTrue(killAt.await(60, TimeUnit.SECONDS));
+      serverProcess.destroyForcibly();
+      serverProcess = startServer(SERVER_NAME_0, 0, 0);
+      QueueControl queueControl = getQueueControl(liveURI, liveNameBuilder, 
queueName, queueName, RoutingType.ANYCAST, 5000);
+
+      Assert.assertTrue(done.await(60, TimeUnit.SECONDS));
+      Assert.assertEquals(0, errors.get());
+
+      long numberOfMessages = queueControl.getMessageCount();

Review Comment:
   @gemmellr OH.. I see what you mean...
   
   
   I was going to place either that loop, or a connection loop over JMS... to 
wait the server to start.
   Since the threads already have a retry, I didn't consider to use them as the 
retry mechanism, which works perfectly. Kind of obvious but I was too stuck in 
my line of though.. thanks.
   
   
   I'm moving down the queueControl creation by 2 lines.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 850071)
    Time Spent: 2h 40m  (was: 2.5h)

> Interrupting Large Message Streaming with a server kill may leave orphaned 
> files
> --------------------------------------------------------------------------------
>
>                 Key: ARTEMIS-4193
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-4193
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>    Affects Versions: 2.28.0
>            Reporter: Clebert Suconic
>            Priority: Major
>             Fix For: 2.29.0
>
>          Time Spent: 2h 40m
>  Remaining Estimate: 0h
>
> There's a schema in the journal to store pending records before a file is 
> created.
> However the sync is not properly applied and if the server is killed it could 
> leave a few messages orphaned in the file system.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to