[ 
https://issues.apache.org/jira/browse/ARTEMIS-4193?focusedWorklogId=849807&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-849807
 ]

ASF GitHub Bot logged work on ARTEMIS-4193:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 08/Mar/23 12:57
            Start Date: 08/Mar/23 12:57
    Worklog Time Spent: 10m 
      Work Description: gemmellr commented on code in PR #4395:
URL: https://github.com/apache/activemq-artemis/pull/4395#discussion_r1129337551


##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java:
##########
@@ -216,6 +219,10 @@ public void rebuild() throws Exception {
          try (LinkedListIterator<PagedMessage> iter = msgs.iterator()) {
             while (iter.hasNext()) {
                PagedMessage msg = iter.next();
+               if (storedLargeMessages != null && msg.getMessage().isLargeMessage()) {
+                  logger.debug("removing storedLargeMessage {}", msg.getMessage().getMessageID());

Review Comment:
   As getMessageID returns a long, the argument will be autoboxed. It may be 
worth adding a gate, since all the similar logging before and after this 
already uses one.
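
   To illustrate the gate suggested above, here is a minimal sketch. It uses 
java.util.logging as a dependency-free stand-in for the SLF4J logger in the PR 
(with SLF4J the check would be logger.isDebugEnabled()). The class name 
LogGateSketch and the helper boxedId are hypothetical and only exist to make 
the boxing visible.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

public class LogGateSketch {
   private static final Logger logger = Logger.getLogger(LogGateSketch.class.getName());

   // Counts how often the boxed log argument is actually built.
   static final AtomicInteger argumentsBuilt = new AtomicInteger();

   // Hypothetical helper: makes explicit what autoboxing a long does implicitly.
   static Long boxedId(long messageID) {
      argumentsBuilt.incrementAndGet();
      return Long.valueOf(messageID);
   }

   static void ungated(long messageID) {
      // The argument is boxed before log() is entered, even when FINE is disabled.
      logger.log(Level.FINE, "removing storedLargeMessage {0}", boxedId(messageID));
   }

   static void gated(long messageID) {
      // The level check comes first, so nothing is boxed when FINE is disabled.
      if (logger.isLoggable(Level.FINE)) {
         logger.log(Level.FINE, "removing storedLargeMessage {0}", boxedId(messageID));
      }
   }

   public static void main(String[] args) {
      logger.setLevel(Level.INFO); // FINE (debug) logging is off
      ungated(42L);
      gated(42L);
      System.out.println("arguments built: " + argumentsBuilt.get());
   }
}
```

   With debug logging off, only the ungated call pays for building the 
argument.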



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java:
##########
@@ -1272,13 +1280,16 @@ final class MutableLong {
 
          journalLoader.handleAddMessage(queueMap);
 
-         loadPreparedTransactions(postOffice, pagingManager, resourceManager, queueInfos, preparedTransactions, this::failedToPrepareException, pageSubscriptions, pendingLargeMessages, journalLoader);
+         loadPreparedTransactions(postOffice, pagingManager, resourceManager, queueInfos, preparedTransactions, this::failedToPrepareException, pageSubscriptions, pendingLargeMessages, storedLargeMessages, journalLoader);
 
          for (PageSubscription sub : pageSubscriptions.values()) {
             sub.getCounter().processReload();
          }
 
          for (LargeServerMessage msg : largeMessages) {
+            if (storedLargeMessages != null && storedLargeMessages.remove(msg.getMessageID())) {
+               logger.debug("Large message in folder removed on {}", msg.getMessageID());

Review Comment:
   Ditto re: gate.



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java:
##########
@@ -797,6 +764,14 @@ private Map<Long, Pair<String, Long>> recoverPendingLargeMessages() throws Excep
       return largeMessages;
    }
 
+   @Override
+   public void recoverLargeMessagesOnFolder(Set<Long> files) throws Exception {
+      List<String> filenames = largeMessagesFactory.listFiles("msg");
+      filenames.forEach(f -> {
+         files.add(getLargeMessageIdFromFilename(f));
+      });
+   }

Review Comment:
   Might be clearer to call the Set 'messageIDs' or something, given it doesn't 
really contain files/names in the end.
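
   A minimal sketch of the suggested rename, under stated assumptions: the 
class MessageIdsSketch and the helper idFromFilename are hypothetical 
stand-ins (the latter for getLargeMessageIdFromFilename), and file names are 
assumed to look like "<id>.msg". The file listing is passed in as a plain List 
rather than coming from largeMessagesFactory.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class MessageIdsSketch {

   // Hypothetical parser: assumes names of the form "12345.msg".
   static long idFromFilename(String filename) {
      return Long.parseLong(filename.substring(0, filename.lastIndexOf('.')));
   }

   // The parameter name now says what the Set ends up holding:
   // message IDs, not files or file names.
   static void recoverLargeMessagesOnFolder(List<String> filenames, Set<Long> messageIDs) {
      for (String filename : filenames) {
         messageIDs.add(idFromFilename(filename));
      }
   }

   public static void main(String[] args) {
      Set<Long> messageIDs = new HashSet<>();
      recoverLargeMessagesOnFolder(Arrays.asList("10.msg", "11.msg"), messageIDs);
      System.out.println("recovered IDs: " + messageIDs.size());
   }
}
```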



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java:
##########
@@ -3381,10 +3382,23 @@ synchronized void initialisePart2(boolean scalingDown) throws Exception {
 
       pagingManager.reloadStores();
 
-      JournalLoadInformation[] journalInfo = loadJournals();
+      Set<Long> storedLargeMessages = new HashSet<>();
+      JournalLoadInformation[] journalInfo = loadJournals(storedLargeMessages);
 
       if (rebuildCounters) {
-         pagingManager.rebuildCounters();
+         pagingManager.rebuildCounters(storedLargeMessages);
+
+         pagingManager.execute(() -> {
+            storedLargeMessages.forEach(id -> {
+               try {
+                  SequentialFile file = storageManager.createFileForLargeMessage(id, true);
+                  logger.debug("Removing pending large message for file={}", file);
+                  file.delete();
+               } catch (Exception e) {
+                  logger.warn(e.getMessage(), e);

Review Comment:
   Might be good to give some context in the log message about what was being 
done when the exception occurred. May not be so obvious to a user from the 
exception message itself.
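
   As a rough illustration of what such context could look like, here is a 
sketch using java.util.logging as a dependency-free stand-in for the logger in 
the PR. The class LogContextSketch, the Deletion interface and the message 
wording are all hypothetical, not what the PR ended up using.

```java
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

public class LogContextSketch {
   private static final Logger logger = Logger.getLogger(LogContextSketch.class.getName());

   // Hypothetical stand-in for the file deletion, which may throw.
   interface Deletion {
      void run() throws Exception;
   }

   // Says what was being attempted and for which message, then appends the cause.
   static String describeFailure(long messageID, Exception e) {
      return "Failed to delete pending large message file for message ID " + messageID + ": " + e.getMessage();
   }

   static boolean deleteWithContext(long messageID, Deletion deletion) {
      try {
         deletion.run();
         return true;
      } catch (Exception e) {
         // The exception is still passed along so the stack trace is kept.
         logger.log(Level.WARNING, describeFailure(messageID, e), e);
         return false;
      }
   }

   public static void main(String[] args) {
      deleteWithContext(7L, () -> {
         throw new IOException("disk error");
      });
   }
}
```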



##########
tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/LargeMessageInterruptTest.java:
##########
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.soak.interruptlm;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.io.File;
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
+import org.apache.activemq.artemis.api.core.management.QueueControl;
+import org.apache.activemq.artemis.tests.soak.SoakTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// This is used to kill a server and make sure the server will remove any 
pending files.
+public class LargeMessageInterruptTest extends SoakTestBase {
+
+   public static final String SERVER_NAME_0 = "interruptlm";
+   private static final String JMX_SERVER_HOSTNAME = "localhost";
+   private static final int JMX_SERVER_PORT_0 = 1099;
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+   static String liveURI = "service:jmx:rmi:///jndi/rmi://" + 
JMX_SERVER_HOSTNAME + ":" + JMX_SERVER_PORT_0 + "/jmxrmi";
+   static ObjectNameBuilder liveNameBuilder = 
ObjectNameBuilder.create(ActiveMQDefaultConfiguration.getDefaultJmxDomain(), 
"lminterrupt", true);
+   Process serverProcess;
+
+   public ConnectionFactory createConnectionFactory(String protocol) {
+      return CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
+   }
+
+   @Before
+   public void before() throws Exception {
+      cleanupData(SERVER_NAME_0);
+      serverProcess = startServer(SERVER_NAME_0, 0, 30000);
+      disableCheckThread();
+   }
+
+   @Test
+   public void testInterruptLargeMessageAMQPTX() throws Throwable {
+      testInterruptLM("AMQP", true, false);
+   }
+
+   @Test
+   public void testInterruptLargeMessageAMQPTXPaging() throws Throwable {
+      testInterruptLM("AMQP", true, true);
+   }
+
+   @Test
+   public void testInterruptLargeMessageCORETX() throws Throwable {
+      testInterruptLM("CORE", true, false);
+   }
+
+   @Test
+   public void testInterruptLargeMessageCORETXPaging() throws Throwable {
+      testInterruptLM("CORE", true, true);
+   }
+
+
+   @Test
+   public void testInterruptLargeMessageOPENWIRETX() throws Throwable {
+      testInterruptLM("OPENWIRE", true, false);
+   }
+
+   @Test
+   public void testInterruptLargeMessageOPENWIRETXPaging() throws Throwable {
+      testInterruptLM("OPENWIRE", true, true);
+   }
+
+
+   @Test
+   public void testInterruptLargeMessageAMQPNonTX() throws Throwable {
+      testInterruptLM("AMQP", false, false);
+   }
+
+   @Test
+   public void testInterruptLargeMessageAMQPNonTXPaging() throws Throwable {
+      testInterruptLM("AMQP", false, true);
+   }
+
+   @Test
+   public void testInterruptLargeMessageCORENonTX() throws Throwable {
+      testInterruptLM("CORE", false, false);
+   }
+
+   @Test
+   public void testInterruptLargeMessageCORENonTXPaging() throws Throwable {
+      testInterruptLM("CORE", false, true);
+   }
+
+   private void testInterruptLM(String protocol, boolean tx, boolean paging) 
throws Throwable {
+      final int BODY_SIZE = 500 * 1024;
+      final int NUMBER_OF_MESSAGES = 10; // this is per producer
+      final int SENDING_THREADS = 10;
+      CyclicBarrier startFlag = new CyclicBarrier(SENDING_THREADS);
+      final CountDownLatch done = new CountDownLatch(SENDING_THREADS);
+      final AtomicInteger produced = new AtomicInteger(0);
+      final ConnectionFactory factory = createConnectionFactory(protocol);
+      final AtomicInteger errors = new AtomicInteger(0); // I don't expect 
many errors since this test is disconnecting and reconnecting the server
+      final CountDownLatch killAt = new CountDownLatch(40);
+
+      ExecutorService executorService = 
Executors.newFixedThreadPool(SENDING_THREADS);
+      runAfter(executorService::shutdownNow);
+
+      String queueName = "LargeMessageInterruptTest";
+
+      String largebody;
+
+      {
+         StringBuffer buffer = new StringBuffer();
+         while (buffer.length() < BODY_SIZE) {
+            buffer.append("LOREM IPSUM WHATEVER THEY SAY IN THERE I DON'T 
REALLY CARE. I'M NOT SURE IF IT'S LOREM, LAUREM, LAUREN, IPSUM OR YPSUM AND I 
DON'T REALLY CARE ");
+         }
+         largebody = buffer.toString();
+      }
+
+      if (paging) {
+         try (Connection connection = factory.createConnection()) {
+            Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+            MessageProducer producer = 
session.createProducer(session.createQueue(queueName));
+            for (int i = 0; i < 1000; i++) {
+               producer.send(session.createTextMessage("forcePage"));
+            }
+            session.commit();
+         }
+      }
+
+      for (int i = 0; i < SENDING_THREADS; i++) {
+         executorService.execute(() -> {
+            int numberOfMessages = 0;
+            try {
+               Connection connection = factory.createConnection();
+               Session session = connection.createSession(tx, tx ? 
Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
+               MessageProducer producer = 
session.createProducer(session.createQueue(queueName));
+
+               startFlag.await(10, TimeUnit.SECONDS);
+               while (numberOfMessages < NUMBER_OF_MESSAGES) {
+                  try {
+                     producer.send(session.createTextMessage(largebody));
+                     if (tx) {
+                        session.commit();
+                     }
+                     produced.incrementAndGet();
+                     killAt.countDown();
+                     if (numberOfMessages++ % 10 == 0) {
+                        logger.info("Sent {}", numberOfMessages);
+                     }
+                  } catch (Exception e) {
+                     logger.warn(e.getMessage(), e);
+
+                     logger.warn(e.getMessage(), e);
+                     try {
+                        connection.close();
+                     } catch (Throwable ignored) {
+                     }
+
+                     for (int retryNumber = 0; retryNumber < 100; 
retryNumber++) {
+                        try {
+                           Connection ctest = factory.createConnection();
+                           ctest.close();
+                           break;
+                        } catch (Throwable retry) {
+                           Thread.sleep(100);
+                        }
+                     }
+
+                     connection = factory.createConnection();
+                     session = connection.createSession(tx, tx ? 
Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
+                     producer = 
session.createProducer(session.createQueue(queueName));
+                     connection.start();
+
+                  }
+               }
+            } catch (Exception e) {
+               logger.warn("Error getting the initial connection", e);
+               errors.incrementAndGet();
+            }
+
+            logger.info("Done sending");
+            done.countDown();
+         });
+      }
+
+      Assert.assertTrue(killAt.await(60, TimeUnit.SECONDS));
+      serverProcess.destroyForcibly();
+      serverProcess = startServer(SERVER_NAME_0, 0, 0);
+      QueueControl queueControl = getQueueControl(liveURI, liveNameBuilder, 
queueName, queueName, RoutingType.ANYCAST, 5000);
+
+      Assert.assertTrue(done.await(60, TimeUnit.SECONDS));
+      Assert.assertEquals(0, errors.get());
+
+      long numberOfMessages = queueControl.getMessageCount();
+      logger.info("there are {} messages", numberOfMessages);
+
+      try (Connection connection = factory.createConnection()) {
+         Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+         MessageConsumer consumer = 
session.createConsumer(session.createQueue(queueName));
+         connection.start();
+         for (int i = 0; i < numberOfMessages; i++) {
+            TextMessage message = (TextMessage) consumer.receive(5000);
+            Assert.assertNotNull(message);
+            Assert.assertTrue(message.getText().equals("forcePage") || 
message.getText().equals(largebody));
+            Assert.assertNotNull(message);

Review Comment:
   Redundant, already checked and used above



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java:
##########
@@ -1839,6 +1838,9 @@ private void loadSinglePreparedTransaction(PostOffice postOffice,
             }
             case JournalRecordIds.ADD_MESSAGE_PROTOCOL: {
                Message message = decodeMessage(pools, buff);
+               if (storedLargeMessages != null && message.isLargeMessage() && storedLargeMessages.remove(record.id)) {
+                  logger.debug("PreparedTX load removing stored large message {}", record.id);

Review Comment:
   It might be useful to differentiate this message from the currently 
identical log message above, to help discern which one occurred without 
resorting to line-number output comparisons. (Aside: I'm just assuming there is 
some 'known'/'obvious' reason for the two different record types doing such a 
similar thing, to the extent of having such similar log messages.)
   
   Also, guessing the same applies about the gate.



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java:
##########
@@ -1829,6 +1825,9 @@ private void loadSinglePreparedTransaction(PostOffice postOffice,
 
          switch (recordType) {
             case JournalRecordIds.ADD_LARGE_MESSAGE: {
+               if (storedLargeMessages != null && storedLargeMessages.remove(record.id)) {
+                  logger.debug("PreparedTX load removing stored large message {}", record.id);

Review Comment:
   guessing the same here



##########
tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/LargeMessageInterruptTest.java:
##########
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.soak.interruptlm;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.io.File;
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
+import org.apache.activemq.artemis.api.core.management.QueueControl;
+import org.apache.activemq.artemis.tests.soak.SoakTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// This is used to kill a server and make sure the server will remove any 
pending files.
+public class LargeMessageInterruptTest extends SoakTestBase {
+
+   public static final String SERVER_NAME_0 = "interruptlm";
+   private static final String JMX_SERVER_HOSTNAME = "localhost";
+   private static final int JMX_SERVER_PORT_0 = 1099;
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+   static String liveURI = "service:jmx:rmi:///jndi/rmi://" + 
JMX_SERVER_HOSTNAME + ":" + JMX_SERVER_PORT_0 + "/jmxrmi";
+   static ObjectNameBuilder liveNameBuilder = 
ObjectNameBuilder.create(ActiveMQDefaultConfiguration.getDefaultJmxDomain(), 
"lminterrupt", true);
+   Process serverProcess;
+
+   public ConnectionFactory createConnectionFactory(String protocol) {
+      return CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
+   }
+
+   @Before
+   public void before() throws Exception {
+      cleanupData(SERVER_NAME_0);
+      serverProcess = startServer(SERVER_NAME_0, 0, 30000);
+      disableCheckThread();
+   }
+
+   @Test
+   public void testInterruptLargeMessageAMQPTX() throws Throwable {
+      testInterruptLM("AMQP", true, false);
+   }
+
+   @Test
+   public void testInterruptLargeMessageAMQPTXPaging() throws Throwable {
+      testInterruptLM("AMQP", true, true);
+   }
+
+   @Test
+   public void testInterruptLargeMessageCORETX() throws Throwable {
+      testInterruptLM("CORE", true, false);
+   }
+
+   @Test
+   public void testInterruptLargeMessageCORETXPaging() throws Throwable {
+      testInterruptLM("CORE", true, true);
+   }
+
+
+   @Test
+   public void testInterruptLargeMessageOPENWIRETX() throws Throwable {
+      testInterruptLM("OPENWIRE", true, false);
+   }
+
+   @Test
+   public void testInterruptLargeMessageOPENWIRETXPaging() throws Throwable {
+      testInterruptLM("OPENWIRE", true, true);
+   }
+
+
+   @Test
+   public void testInterruptLargeMessageAMQPNonTX() throws Throwable {
+      testInterruptLM("AMQP", false, false);
+   }
+
+   @Test
+   public void testInterruptLargeMessageAMQPNonTXPaging() throws Throwable {
+      testInterruptLM("AMQP", false, true);
+   }
+
+   @Test
+   public void testInterruptLargeMessageCORENonTX() throws Throwable {
+      testInterruptLM("CORE", false, false);
+   }
+
+   @Test
+   public void testInterruptLargeMessageCORENonTXPaging() throws Throwable {
+      testInterruptLM("CORE", false, true);
+   }
+
+   private void testInterruptLM(String protocol, boolean tx, boolean paging) 
throws Throwable {
+      final int BODY_SIZE = 500 * 1024;
+      final int NUMBER_OF_MESSAGES = 10; // this is per producer
+      final int SENDING_THREADS = 10;
+      CyclicBarrier startFlag = new CyclicBarrier(SENDING_THREADS);
+      final CountDownLatch done = new CountDownLatch(SENDING_THREADS);
+      final AtomicInteger produced = new AtomicInteger(0);
+      final ConnectionFactory factory = createConnectionFactory(protocol);
+      final AtomicInteger errors = new AtomicInteger(0); // I don't expect 
many errors since this test is disconnecting and reconnecting the server
+      final CountDownLatch killAt = new CountDownLatch(40);
+
+      ExecutorService executorService = 
Executors.newFixedThreadPool(SENDING_THREADS);
+      runAfter(executorService::shutdownNow);
+
+      String queueName = "LargeMessageInterruptTest";
+
+      String largebody;
+
+      {
+         StringBuffer buffer = new StringBuffer();
+         while (buffer.length() < BODY_SIZE) {
+            buffer.append("LOREM IPSUM WHATEVER THEY SAY IN THERE I DON'T 
REALLY CARE. I'M NOT SURE IF IT'S LOREM, LAUREM, LAUREN, IPSUM OR YPSUM AND I 
DON'T REALLY CARE ");
+         }
+         largebody = buffer.toString();
+      }
+
+      if (paging) {
+         try (Connection connection = factory.createConnection()) {
+            Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+            MessageProducer producer = 
session.createProducer(session.createQueue(queueName));
+            for (int i = 0; i < 1000; i++) {
+               producer.send(session.createTextMessage("forcePage"));
+            }
+            session.commit();
+         }
+      }
+
+      for (int i = 0; i < SENDING_THREADS; i++) {
+         executorService.execute(() -> {
+            int numberOfMessages = 0;
+            try {
+               Connection connection = factory.createConnection();
+               Session session = connection.createSession(tx, tx ? 
Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
+               MessageProducer producer = 
session.createProducer(session.createQueue(queueName));
+
+               startFlag.await(10, TimeUnit.SECONDS);
+               while (numberOfMessages < NUMBER_OF_MESSAGES) {
+                  try {
+                     producer.send(session.createTextMessage(largebody));
+                     if (tx) {
+                        session.commit();
+                     }
+                     produced.incrementAndGet();
+                     killAt.countDown();
+                     if (numberOfMessages++ % 10 == 0) {
+                        logger.info("Sent {}", numberOfMessages);
+                     }
+                  } catch (Exception e) {
+                     logger.warn(e.getMessage(), e);
+
+                     logger.warn(e.getMessage(), e);
+                     try {
+                        connection.close();
+                     } catch (Throwable ignored) {
+                     }
+
+                     for (int retryNumber = 0; retryNumber < 100; 
retryNumber++) {
+                        try {
+                           Connection ctest = factory.createConnection();
+                           ctest.close();
+                           break;
+                        } catch (Throwable retry) {
+                           Thread.sleep(100);
+                        }
+                     }
+
+                     connection = factory.createConnection();
+                     session = connection.createSession(tx, tx ? 
Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
+                     producer = 
session.createProducer(session.createQueue(queueName));
+                     connection.start();
+
+                  }
+               }
+            } catch (Exception e) {
+               logger.warn("Error getting the initial connection", e);
+               errors.incrementAndGet();
+            }
+
+            logger.info("Done sending");
+            done.countDown();
+         });
+      }
+
+      Assert.assertTrue(killAt.await(60, TimeUnit.SECONDS));
+      serverProcess.destroyForcibly();
+      serverProcess = startServer(SERVER_NAME_0, 0, 0);
+      QueueControl queueControl = getQueueControl(liveURI, liveNameBuilder, 
queueName, queueName, RoutingType.ANYCAST, 5000);
+
+      Assert.assertTrue(done.await(60, TimeUnit.SECONDS));
+      Assert.assertEquals(0, errors.get());
+
+      long numberOfMessages = queueControl.getMessageCount();

Review Comment:
   It's not clear to me why this seems to go to the effort of starting the 
broker without waiting, spinning in a loop trying to connect to management to 
get a QueueControl (and potentially waiting 500ms at a time, and spewing 
stack traces, while doing it), only to not use the QueueControl until after it 
is done waiting for sending to complete?





Issue Time Tracking
-------------------

    Worklog Id:     (was: 849807)
    Time Spent: 1h  (was: 50m)

> Interrupting Large Message Streaming with a server kill may leave orphaned 
> files
> --------------------------------------------------------------------------------
>
>                 Key: ARTEMIS-4193
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-4193
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>    Affects Versions: 2.28.0
>            Reporter: Clebert Suconic
>            Priority: Major
>             Fix For: 2.29.0
>
>          Time Spent: 1h
>  Remaining Estimate: 0h
>
> There's a schema in the journal to store pending records before a file is 
> created.
> However the sync is not properly applied and if the server is killed it could 
> leave a few messages orphaned in the file system.
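
The cleanup idea visible in the diff hunks above (collect the IDs of the large 
message files found on disk, drop every ID the journal still refers to while 
loading, and treat whatever remains as orphaned) can be sketched roughly as 
follows. This is an illustration only: the class OrphanSweepSketch and the 
method findOrphans are hypothetical, and plain collections stand in for the 
journal and the large messages folder.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class OrphanSweepSketch {

   // Returns the IDs of files on disk that no journal record refers to.
   static Set<Long> findOrphans(Set<Long> idsOnDisk, List<Long> idsInJournal) {
      // Work on a copy so the caller's set is left untouched.
      Set<Long> stored = new HashSet<>(idsOnDisk);
      for (Long id : idsInJournal) {
         // A journal record claims this file, so it is not orphaned.
         stored.remove(id);
      }
      return stored;
   }

   public static void main(String[] args) {
      Set<Long> idsOnDisk = new HashSet<>(Arrays.asList(1L, 2L, 3L));
      List<Long> idsInJournal = Arrays.asList(1L, 3L);
      // Only file 2 has no journal record and would be deleted.
      System.out.println("orphans: " + findOrphans(idsOnDisk, idsInJournal));
   }
}
```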



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
