This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 3a5601572e ARTEMIS-4207 Redistribution could leave messages stranded in the folder
3a5601572e is described below

commit 3a5601572e63dfa9299625f72e1a9a1fe12d1a7d
Author: Clebert Suconic <[email protected]>
AuthorDate: Wed Mar 15 10:33:10 2023 -0400

    ARTEMIS-4207 Redistribution could leave messages stranded in the folder
    
    - redistribute receives the handle call and then copies the message
    - the routing table changes
    - the copied message is left behind
    
    With the new version of the server these messages will be removed, but we should remove them right away.
---
 .../impl/journal/JournalStorageManager.java        |  3 +
 .../impl/journal/LargeServerMessageImpl.java       |  9 ++-
 .../core/postoffice/impl/PostOfficeImpl.java       | 21 ++++++-
 .../protocol/core/ServerSessionPacketHandler.java  | 29 ++++++++++
 .../core/server/cluster/impl/Redistributor.java    |  6 ++
 .../ClusteredLargeMessageInterruptTest.java        | 67 ++++++++++++++++------
 .../interruptlm/LargeMessageInterruptTest.java     |  8 ++-
 7 files changed, 123 insertions(+), 20 deletions(-)

diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index 678af8ee6c..336a1820df 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -507,6 +507,9 @@ public class JournalStorageManager extends 
AbstractJournalStorageManager {
 
    @Override
    public LargeServerMessage createLargeMessage(final long id, final Message 
message) throws Exception {
+      if (logger.isTraceEnabled()) {
+         logger.trace("Initializing large message {}", id, new 
Exception("trace"));
+      }
       try (ArtemisCloseable lock = closeableReadLock()) {
          if (isReplicated()) {
             replicator.largeMessageBegin(id);
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
index 262db1bb8c..02a54e234c 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
@@ -72,7 +72,11 @@ public final class LargeServerMessageImpl extends 
CoreMessage implements CoreLar
 
    private static Message asLargeMessage(Message message, StorageManager 
storageManager) throws Exception {
       ICoreMessage coreMessage = message.toCore();
-      LargeServerMessage lsm = 
storageManager.createLargeMessage(storageManager.generateID(), coreMessage);
+      long id = storageManager.generateID();
+      if (logger.isDebugEnabled()) {
+         logger.debug("asLargeMessage create largeMessage with id={}", id);
+      }
+      LargeServerMessage lsm = storageManager.createLargeMessage(id, 
coreMessage);
       ActiveMQBuffer messageBodyBuffer = coreMessage.getReadOnlyBodyBuffer();
       final int readableBytes = messageBodyBuffer.readableBytes();
 
@@ -306,6 +310,9 @@ public final class LargeServerMessageImpl extends 
CoreMessage implements CoreLar
    @Override
    public Message copy(final long newID) {
       try {
+         if (logger.isDebugEnabled()) {
+            logger.debug("Copy large message id={} as newID={}", 
this.getMessageID(), newID);
+         }
          LargeServerMessage newMessage = 
storageManager.createLargeMessage(newID, this);
          largeBody.copyInto(newMessage);
          newMessage.releaseResources(true, true);
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 50b995491d..c5b08c01b7 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -121,6 +121,8 @@ public class PostOfficeImpl implements PostOffice, 
NotificationListener, Binding
 
    public static final SimpleString BRIDGE_CACHE_STR = new 
SimpleString("BRIDGE.");
 
+   private final Executor postOfficeExecutor;
+
    private final AddressManager addressManager;
 
    private final QueueFactory queueFactory;
@@ -192,6 +194,8 @@ public class PostOfficeImpl implements PostOffice, 
NotificationListener, Binding
 
       this.addressSettingsRepository = addressSettingsRepository;
 
+      this.postOfficeExecutor = server.getExecutorFactory().getExecutor();
+
       this.server = server;
    }
 
@@ -1391,7 +1395,8 @@ public class PostOfficeImpl implements PostOffice, 
NotificationListener, Binding
          // We have to copy the message and store it separately, otherwise we 
may lose remote bindings in case of restart before the message
          // arrived the target node
          // as described on https://issues.jboss.org/browse/JBPAPP-6130
-         Message copyRedistribute = message.copy(storageManager.generateID());
+         final Message copyRedistribute = 
message.copy(storageManager.generateID());
+         logger.info("Message {} being copied as {}", message.getMessageID(), 
copyRedistribute.getMessageID());
          copyRedistribute.setAddress(message.getAddress());
 
          RoutingContext context = new RoutingContextImpl(tx);
@@ -1400,6 +1405,20 @@ public class PostOfficeImpl implements PostOffice, 
NotificationListener, Binding
 
          if (routed) {
             return new Pair<>(context, copyRedistribute);
+         } else {
+            // things have changed, we are not redistributing any more
+            if (copyRedistribute.isLargeMessage()) {
+               LargeServerMessage lsm = (LargeServerMessage) copyRedistribute;
+               postOfficeExecutor.execute(() -> {
+                  try {
+                     logger.debug("Removing large message {} since the routing 
tables have changed", lsm.getAppendFile());
+                     lsm.deleteFile();
+                  } catch (Exception e) {
+                     logger.warn("Error removing {}", copyRedistribute);
+                  }
+               });
+            }
+
          }
       }
 
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index a1dae880d8..e4f80a7c49 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -217,10 +217,14 @@ public class ServerSessionPacketHandler implements 
ChannelHandler {
    }
 
    private void clearLargeMessage() {
+      if (currentLargeMessage != null) {
+         logger.debug("pending large message on session being removed {}", 
currentLargeMessage);
+      }
       synchronized (largeMessageLock) {
          if (currentLargeMessage != null) {
             try {
                currentLargeMessage.deleteFile();
+               logger.debug("Remove file {} after a failed session", 
currentLargeMessage.getAppendFile());
             } catch (Throwable error) {
                
ActiveMQServerLogger.LOGGER.errorDeletingLargeMessageFile(error);
             } finally {
@@ -1069,12 +1073,29 @@ public class ServerSessionPacketHandler implements 
ChannelHandler {
       // need to create the LargeMessage before continue
       long id = storageManager.generateID();
 
+      if (logger.isDebugEnabled()) {
+         logger.debug("initializing large message {}", id);
+      }
       LargeServerMessage largeMsg = storageManager.createLargeMessage(id, 
message);
 
       logger.trace("sendLarge::{}", largeMsg);
 
       if (currentLargeMessage != null) {
          
ActiveMQServerLogger.LOGGER.replacingIncompleteLargeMessage(currentLargeMessage.getMessageID());
+
+         // this shouldn't really happen.
+         // Adding this just in case
+         final LargeServerMessage replaced = currentLargeMessage;
+         callExecutor.execute(() -> {
+            try {
+               if (replaced != null) {
+                  logger.debug("Replaced failed being removed over interrupted 
send for message {}", replaced);
+                  replaced.deleteFile();
+               }
+            } catch (Exception e) {
+               logger.warn("Error removing currentLargeMessage {}", replaced);
+            }
+         });
       }
 
       currentLargeMessage = largeMsg;
@@ -1106,11 +1127,19 @@ public class ServerSessionPacketHandler implements 
ChannelHandler {
             LargeServerMessage message = currentLargeMessage;
             currentLargeMessage.setStorageManager(storageManager);
             currentLargeMessage = null;
+
+            logger.info("Sending {}", message.getMessageID());
             try {
                session.send(session.getCurrentTransaction(), 
EmbedMessageUtil.extractEmbedded((ICoreMessage) message.toMessage(), 
storageManager), false, producers.get(senderID), false);
+               logger.info("Sending finished on {}", message.getMessageID());
             } catch (Exception e) {
                message.deleteFile();
                throw e;
+            } catch (Throwable e) {
+               
logger.warn("********************************************************************************");
+               logger.warn("Throwable on currentLargeMessage {}", 
message.getMessageID(), e);
+               
logger.warn("********************************************************************************");
+
             }
          }
       }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
index 0b70c31caa..955b777d3e 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.core.server.cluster.impl;
 
+import java.lang.invoke.MethodHandles;
 import java.util.Collections;
 import java.util.List;
 
@@ -32,9 +33,13 @@ import 
org.apache.activemq.artemis.core.server.RoutingContext;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
 import org.apache.activemq.artemis.utils.ReusableLatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class Redistributor implements Consumer {
 
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
    private boolean active;
 
    private final StorageManager storageManager;
@@ -113,6 +118,7 @@ public class Redistributor implements Consumer {
       final Pair<RoutingContext, Message> routingInfo = 
postOffice.redistribute(reference.getMessage(), queue, tx);
 
       if (routingInfo == null) {
+         logger.debug("postOffice.redistribute return null for message {}", 
reference);
          tx.rollback();
          return HandleStatus.BUSY;
       }
diff --git 
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/ClusteredLargeMessageInterruptTest.java
 
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/ClusteredLargeMessageInterruptTest.java
index fd58bb0eab..407b6ccc7a 100644
--- 
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/ClusteredLargeMessageInterruptTest.java
+++ 
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/ClusteredLargeMessageInterruptTest.java
@@ -118,27 +118,52 @@ public class ClusteredLargeMessageInterruptTest extends 
SoakTestBase {
 
    @Test
    public void testLargeMessageAMQPTX() throws Throwable {
-      testInterrupt("AMQP", true);
+      testInterrupt("AMQP", true, false);
+   }
+
+   @Test
+   public void testLargeMessageAMQPTXKill() throws Throwable {
+      testInterrupt("AMQP", true, true);
    }
 
    @Test
    public void testInterruptAMQPNonTX() throws Throwable {
-      testInterrupt("AMQP", false);
+      testInterrupt("AMQP", false, false);
+   }
+
+   @Test
+   public void testInterruptAMQPNonTXKill() throws Throwable {
+      testInterrupt("AMQP", false, true);
    }
 
    @Test
    public void testInterruptCORETX() throws Throwable {
-      testInterrupt("CORE", true);
+      testInterrupt("CORE", true, false);
+   }
+
+   @Test
+   public void testInterruptCORETXKill() throws Throwable {
+      testInterrupt("CORE", true, true);
    }
 
    @Test
    public void testInterruptOPENWIRETX() throws Throwable {
-      testInterrupt("OPENWIRE", true);
+      testInterrupt("OPENWIRE", true, false);
+   }
+
+   @Test
+   public void testInterruptOPENWIRETXKill() throws Throwable {
+      testInterrupt("OPENWIRE", true, true);
    }
 
    @Test
    public void testInterruptCORENonTX() throws Throwable {
-      testInterrupt("CORE", false);
+      testInterrupt("CORE", false, false);
+   }
+
+   @Test
+   public void testInterruptCORENonTXKill() throws Throwable {
+      testInterrupt("CORE", false, true);
    }
 
    private CountDownLatch startSendingThreads(Executor executor, String 
protocol, int broker, int threads, boolean tx, String queueName) {
@@ -227,7 +252,7 @@ public class ClusteredLargeMessageInterruptTest extends 
SoakTestBase {
 
    // this test has sleeps as the test will send while still active
    // we keep sending all the time.. so the testInterruptLM acts like a 
controller telling the threads when to stop
-   private void testInterrupt(String protocol, boolean tx) throws Throwable {
+   private void testInterrupt(String protocol, boolean tx, boolean useKill) 
throws Throwable {
       final int SENDING_THREADS = 10;
       final int CONSUMING_THREADS = 10;
       final AtomicInteger errors = new AtomicInteger(0); // I don't expect 
many errors since this test is disconnecting and reconnecting the server
@@ -242,7 +267,7 @@ public class ClusteredLargeMessageInterruptTest extends 
SoakTestBase {
 
       Thread.sleep(2000);
 
-      serverProcess.destroyForcibly();
+      killProcess(serverProcess, useKill);
       runningSend = false;
       runningConsumer = false;
       Assert.assertTrue(serverProcess.waitFor(10, TimeUnit.SECONDS));
@@ -258,7 +283,7 @@ public class ClusteredLargeMessageInterruptTest extends 
SoakTestBase {
       sendDone = startSendingThreads(executorService, protocol, 1, 
SENDING_THREADS, tx, queueName);
       receiverDone = startConsumingThreads(executorService, protocol, 1, 
CONSUMING_THREADS, tx, queueName);
 
-      serverProcess2.destroyForcibly();
+      killProcess(serverProcess2, useKill);
       Assert.assertTrue(serverProcess2.waitFor(10, TimeUnit.SECONDS));
       runningSend = false;
       runningConsumer = false;
@@ -277,17 +302,17 @@ public class ClusteredLargeMessageInterruptTest extends 
SoakTestBase {
       QueueControl queueControl1 = getQueueControl(server1URI, builderServer1, 
queueName, queueName, RoutingType.ANYCAST, 5000);
       QueueControl queueControl2 = getQueueControl(server2URI, builderServer2, 
queueName, queueName, RoutingType.ANYCAST, 5000);
 
-      Wait.assertEquals(0, queueControl1::getMessageCount);
-      Wait.assertEquals(0, queueControl2::getMessageCount);
+      File lmFolder = new File(getServerLocation(SERVER_NAME_0) + 
"/data/large-messages");
+      File lmFolder2 = new File(getServerLocation(SERVER_NAME_1) + 
"/data/large-messages");
+
+      Wait.waitFor(() -> queueControl1.getMessageCount() == 0 && 
queueControl2.getMessageCount() == 0 && lmFolder.listFiles().length == 0 && 
lmFolder2.listFiles().length == 0);
 
       runningConsumer = false;
       Assert.assertTrue(receiverDone.await(10, TimeUnit.SECONDS));
 
-      File lmFolder = new File(getServerLocation(SERVER_NAME_0) + 
"/data/large-messages");
-      File lmFolder2 = new File(getServerLocation(SERVER_NAME_1) + 
"/data/large-messages");
-
-      Wait.assertEquals(0, () -> lmFolder.listFiles().length);
-      Wait.assertEquals(0, () -> lmFolder2.listFiles().length);
+      // no need to use wait here, the previous check should have checked that 
already
+      Assert.assertEquals(0, lmFolder.listFiles().length);
+      Assert.assertEquals(0, lmFolder2.listFiles().length);
       Assert.assertEquals(0, errors.get());
    }
 
@@ -301,6 +326,14 @@ public class ClusteredLargeMessageInterruptTest extends 
SoakTestBase {
       testInterruptFailOnBridge("CORE", false);
    }
 
+   private void killProcess(Process process, boolean useKill) throws Exception 
{
+      if (useKill) {
+         Runtime.getRuntime().exec("kill -SIGINT " + process.pid());
+      } else {
+         process.destroyForcibly();
+      }
+   }
+
 
    // this is a slight variation of testInterruptLM where I switch over 
consumers before killing the previous node
    // this is to force messages being redistributed and try to get the bridge 
to failure.
@@ -322,13 +355,13 @@ public class ClusteredLargeMessageInterruptTest extends 
SoakTestBase {
 
       runningSend = runningConsumer = false;
 
-      serverProcess.destroyForcibly();
+      killProcess(serverProcess, false);
       Assert.assertTrue(serverProcess.waitFor(10, TimeUnit.MINUTES));
       Assert.assertTrue(sendDone.await(10, TimeUnit.SECONDS));
 
       sendDone = startSendingThreads(executorService, protocol, 1, 
SENDING_THREADS, tx, queueName);
       CountDownLatch receiverDone = startConsumingThreads(executorService, 
protocol, 1, CONSUMING_THREADS, tx, queueName);
-      serverProcess.destroyForcibly();
+      killProcess(serverProcess, false);
       Assert.assertTrue(serverProcess.waitFor(10, TimeUnit.SECONDS));
       serverProcess = startServer0();
 
diff --git 
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/LargeMessageInterruptTest.java
 
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/LargeMessageInterruptTest.java
index fb7f694fa5..d247d255f0 100644
--- 
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/LargeMessageInterruptTest.java
+++ 
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/LargeMessageInterruptTest.java
@@ -119,6 +119,11 @@ public class LargeMessageInterruptTest extends 
SoakTestBase {
       testInterruptLM("CORE", false, true);
    }
 
+   private void killProcess(Process process) throws Exception {
+      Runtime.getRuntime().exec("kill -SIGINT " + process.pid());
+   }
+
+
    private void testInterruptLM(String protocol, boolean tx, boolean paging) 
throws Throwable {
       final int BODY_SIZE = 500 * 1024;
       final int NUMBER_OF_MESSAGES = 10; // this is per producer
@@ -213,7 +218,8 @@ public class LargeMessageInterruptTest extends SoakTestBase 
{
       }
 
       Assert.assertTrue(killAt.await(60, TimeUnit.SECONDS));
-      serverProcess.destroyForcibly();
+      killProcess(serverProcess);
+      Assert.assertTrue(serverProcess.waitFor(1, TimeUnit.MINUTES));
       serverProcess = startServer(SERVER_NAME_0, 0, 0);
 
       Assert.assertTrue(done.await(60, TimeUnit.SECONDS));

Reply via email to