This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 3a5601572e ARTEMIS-4207 Redistribution could leave messages stranded
in the folder
3a5601572e is described below
commit 3a5601572e63dfa9299625f72e1a9a1fe12d1a7d
Author: Clebert Suconic <[email protected]>
AuthorDate: Wed Mar 15 10:33:10 2023 -0400
ARTEMIS-4207 Redistribution could leave messages stranded in the folder
- the redistributor receives the handle call and copies the (large) message
- the routing table changes before the copy is routed
- the copied message's file is left behind in the large-messages folder
Newer versions of the server will remove these stranded files, but they
should be removed right away.
---
.../impl/journal/JournalStorageManager.java | 3 +
.../impl/journal/LargeServerMessageImpl.java | 9 ++-
.../core/postoffice/impl/PostOfficeImpl.java | 21 ++++++-
.../protocol/core/ServerSessionPacketHandler.java | 29 ++++++++++
.../core/server/cluster/impl/Redistributor.java | 6 ++
.../ClusteredLargeMessageInterruptTest.java | 67 ++++++++++++++++------
.../interruptlm/LargeMessageInterruptTest.java | 8 ++-
7 files changed, 123 insertions(+), 20 deletions(-)
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index 678af8ee6c..336a1820df 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -507,6 +507,9 @@ public class JournalStorageManager extends
AbstractJournalStorageManager {
@Override
public LargeServerMessage createLargeMessage(final long id, final Message
message) throws Exception {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Initializing large message {}", id, new
Exception("trace"));
+ }
try (ArtemisCloseable lock = closeableReadLock()) {
if (isReplicated()) {
replicator.largeMessageBegin(id);
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
index 262db1bb8c..02a54e234c 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
@@ -72,7 +72,11 @@ public final class LargeServerMessageImpl extends
CoreMessage implements CoreLar
private static Message asLargeMessage(Message message, StorageManager
storageManager) throws Exception {
ICoreMessage coreMessage = message.toCore();
- LargeServerMessage lsm =
storageManager.createLargeMessage(storageManager.generateID(), coreMessage);
+ long id = storageManager.generateID();
+ if (logger.isDebugEnabled()) {
+ logger.debug("asLargeMessage create largeMessage with id={}", id);
+ }
+ LargeServerMessage lsm = storageManager.createLargeMessage(id,
coreMessage);
ActiveMQBuffer messageBodyBuffer = coreMessage.getReadOnlyBodyBuffer();
final int readableBytes = messageBodyBuffer.readableBytes();
@@ -306,6 +310,9 @@ public final class LargeServerMessageImpl extends
CoreMessage implements CoreLar
@Override
public Message copy(final long newID) {
try {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Copy large message id={} as newID={}",
this.getMessageID(), newID);
+ }
LargeServerMessage newMessage =
storageManager.createLargeMessage(newID, this);
largeBody.copyInto(newMessage);
newMessage.releaseResources(true, true);
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 50b995491d..c5b08c01b7 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -121,6 +121,8 @@ public class PostOfficeImpl implements PostOffice,
NotificationListener, Binding
public static final SimpleString BRIDGE_CACHE_STR = new
SimpleString("BRIDGE.");
+ private final Executor postOfficeExecutor;
+
private final AddressManager addressManager;
private final QueueFactory queueFactory;
@@ -192,6 +194,8 @@ public class PostOfficeImpl implements PostOffice,
NotificationListener, Binding
this.addressSettingsRepository = addressSettingsRepository;
+ this.postOfficeExecutor = server.getExecutorFactory().getExecutor();
+
this.server = server;
}
@@ -1391,7 +1395,8 @@ public class PostOfficeImpl implements PostOffice,
NotificationListener, Binding
// We have to copy the message and store it separately, otherwise we
may lose remote bindings in case of restart before the message
// arrived the target node
// as described on https://issues.jboss.org/browse/JBPAPP-6130
- Message copyRedistribute = message.copy(storageManager.generateID());
+ final Message copyRedistribute =
message.copy(storageManager.generateID());
+ logger.debug("Message {} being copied as {}", message.getMessageID(),
copyRedistribute.getMessageID());
copyRedistribute.setAddress(message.getAddress());
RoutingContext context = new RoutingContextImpl(tx);
@@ -1400,6 +1405,20 @@ public class PostOfficeImpl implements PostOffice,
NotificationListener, Binding
if (routed) {
return new Pair<>(context, copyRedistribute);
+ } else {
+ // things have changed, we are not redistributing any more
+ if (copyRedistribute.isLargeMessage()) {
+ LargeServerMessage lsm = (LargeServerMessage) copyRedistribute;
+ postOfficeExecutor.execute(() -> {
+ try {
+ logger.debug("Removing large message {} since the routing
tables have changed", lsm.getAppendFile());
+ lsm.deleteFile();
+ } catch (Exception e) {
+ logger.warn("Error removing {}", copyRedistribute, e);
+ }
+ });
+ }
+
}
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index a1dae880d8..e4f80a7c49 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -217,10 +217,14 @@ public class ServerSessionPacketHandler implements
ChannelHandler {
}
private void clearLargeMessage() {
+ if (currentLargeMessage != null) {
+ logger.debug("pending large message on session being removed {}",
currentLargeMessage);
+ }
synchronized (largeMessageLock) {
if (currentLargeMessage != null) {
try {
currentLargeMessage.deleteFile();
+ logger.debug("Remove file {} after a failed session",
currentLargeMessage.getAppendFile());
} catch (Throwable error) {
ActiveMQServerLogger.LOGGER.errorDeletingLargeMessageFile(error);
} finally {
@@ -1069,12 +1073,29 @@ public class ServerSessionPacketHandler implements
ChannelHandler {
// need to create the LargeMessage before continue
long id = storageManager.generateID();
+ if (logger.isDebugEnabled()) {
+ logger.debug("initializing large message {}", id);
+ }
LargeServerMessage largeMsg = storageManager.createLargeMessage(id,
message);
logger.trace("sendLarge::{}", largeMsg);
if (currentLargeMessage != null) {
ActiveMQServerLogger.LOGGER.replacingIncompleteLargeMessage(currentLargeMessage.getMessageID());
+
+ // this shouldn't really happen.
+ // Adding this just in case
+ final LargeServerMessage replaced = currentLargeMessage;
+ callExecutor.execute(() -> {
+ try {
+ if (replaced != null) {
+ logger.debug("Replaced failed being removed over interrupted
send for message {}", replaced);
+ replaced.deleteFile();
+ }
+ } catch (Exception e) {
+ logger.warn("Error removing currentLargeMessage {}", replaced, e);
+ }
+ });
}
currentLargeMessage = largeMsg;
@@ -1106,11 +1127,19 @@ public class ServerSessionPacketHandler implements
ChannelHandler {
LargeServerMessage message = currentLargeMessage;
currentLargeMessage.setStorageManager(storageManager);
currentLargeMessage = null;
+
+ logger.debug("Sending {}", message.getMessageID());
try {
session.send(session.getCurrentTransaction(),
EmbedMessageUtil.extractEmbedded((ICoreMessage) message.toMessage(),
storageManager), false, producers.get(senderID), false);
+ logger.debug("Sending finished on {}", message.getMessageID());
} catch (Exception e) {
message.deleteFile();
throw e;
+ } catch (Throwable e) {
+ // Errors bypass the Exception handler above: remove the file so it is not stranded, then propagate
+ logger.warn("Throwable on currentLargeMessage {}", message.getMessageID(), e);
+ message.deleteFile();
+ throw e;
}
}
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
index 0b70c31caa..955b777d3e 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.core.server.cluster.impl;
+import java.lang.invoke.MethodHandles;
import java.util.Collections;
import java.util.List;
@@ -32,9 +33,13 @@ import
org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.utils.ReusableLatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class Redistributor implements Consumer {
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
private boolean active;
private final StorageManager storageManager;
@@ -113,6 +118,7 @@ public class Redistributor implements Consumer {
final Pair<RoutingContext, Message> routingInfo =
postOffice.redistribute(reference.getMessage(), queue, tx);
if (routingInfo == null) {
+ logger.debug("postOffice.redistribute return null for message {}",
reference);
tx.rollback();
return HandleStatus.BUSY;
}
diff --git
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/ClusteredLargeMessageInterruptTest.java
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/ClusteredLargeMessageInterruptTest.java
index fd58bb0eab..407b6ccc7a 100644
---
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/ClusteredLargeMessageInterruptTest.java
+++
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/ClusteredLargeMessageInterruptTest.java
@@ -118,27 +118,52 @@ public class ClusteredLargeMessageInterruptTest extends
SoakTestBase {
@Test
public void testLargeMessageAMQPTX() throws Throwable {
- testInterrupt("AMQP", true);
+ testInterrupt("AMQP", true, false);
+ }
+
+ @Test
+ public void testLargeMessageAMQPTXKill() throws Throwable {
+ testInterrupt("AMQP", true, true);
}
@Test
public void testInterruptAMQPNonTX() throws Throwable {
- testInterrupt("AMQP", false);
+ testInterrupt("AMQP", false, false);
+ }
+
+ @Test
+ public void testInterruptAMQPNonTXKill() throws Throwable {
+ testInterrupt("AMQP", false, true);
}
@Test
public void testInterruptCORETX() throws Throwable {
- testInterrupt("CORE", true);
+ testInterrupt("CORE", true, false);
+ }
+
+ @Test
+ public void testInterruptCORETXKill() throws Throwable {
+ testInterrupt("CORE", true, true);
}
@Test
public void testInterruptOPENWIRETX() throws Throwable {
- testInterrupt("OPENWIRE", true);
+ testInterrupt("OPENWIRE", true, false);
+ }
+
+ @Test
+ public void testInterruptOPENWIRETXKill() throws Throwable {
+ testInterrupt("OPENWIRE", true, true);
}
@Test
public void testInterruptCORENonTX() throws Throwable {
- testInterrupt("CORE", false);
+ testInterrupt("CORE", false, false);
+ }
+
+ @Test
+ public void testInterruptCORENonTXKill() throws Throwable {
+ testInterrupt("CORE", false, true);
}
private CountDownLatch startSendingThreads(Executor executor, String
protocol, int broker, int threads, boolean tx, String queueName) {
@@ -227,7 +252,7 @@ public class ClusteredLargeMessageInterruptTest extends
SoakTestBase {
// this test has sleeps as the test will send while still active
// we keep sending all the time.. so the testInterruptLM acts like a
controller telling the threads when to stop
- private void testInterrupt(String protocol, boolean tx) throws Throwable {
+ private void testInterrupt(String protocol, boolean tx, boolean useKill)
throws Throwable {
final int SENDING_THREADS = 10;
final int CONSUMING_THREADS = 10;
final AtomicInteger errors = new AtomicInteger(0); // I don't expect
many errors since this test is disconnecting and reconnecting the server
@@ -242,7 +267,7 @@ public class ClusteredLargeMessageInterruptTest extends
SoakTestBase {
Thread.sleep(2000);
- serverProcess.destroyForcibly();
+ killProcess(serverProcess, useKill);
runningSend = false;
runningConsumer = false;
Assert.assertTrue(serverProcess.waitFor(10, TimeUnit.SECONDS));
@@ -258,7 +283,7 @@ public class ClusteredLargeMessageInterruptTest extends
SoakTestBase {
sendDone = startSendingThreads(executorService, protocol, 1,
SENDING_THREADS, tx, queueName);
receiverDone = startConsumingThreads(executorService, protocol, 1,
CONSUMING_THREADS, tx, queueName);
- serverProcess2.destroyForcibly();
+ killProcess(serverProcess2, useKill);
Assert.assertTrue(serverProcess2.waitFor(10, TimeUnit.SECONDS));
runningSend = false;
runningConsumer = false;
@@ -277,17 +302,17 @@ public class ClusteredLargeMessageInterruptTest extends
SoakTestBase {
QueueControl queueControl1 = getQueueControl(server1URI, builderServer1,
queueName, queueName, RoutingType.ANYCAST, 5000);
QueueControl queueControl2 = getQueueControl(server2URI, builderServer2,
queueName, queueName, RoutingType.ANYCAST, 5000);
- Wait.assertEquals(0, queueControl1::getMessageCount);
- Wait.assertEquals(0, queueControl2::getMessageCount);
+ File lmFolder = new File(getServerLocation(SERVER_NAME_0) +
"/data/large-messages");
+ File lmFolder2 = new File(getServerLocation(SERVER_NAME_1) +
"/data/large-messages");
+
+ Assert.assertTrue(Wait.waitFor(() -> queueControl1.getMessageCount() == 0 &&
queueControl2.getMessageCount() == 0 && lmFolder.listFiles().length == 0 &&
lmFolder2.listFiles().length == 0));
runningConsumer = false;
Assert.assertTrue(receiverDone.await(10, TimeUnit.SECONDS));
- File lmFolder = new File(getServerLocation(SERVER_NAME_0) +
"/data/large-messages");
- File lmFolder2 = new File(getServerLocation(SERVER_NAME_1) +
"/data/large-messages");
-
- Wait.assertEquals(0, () -> lmFolder.listFiles().length);
- Wait.assertEquals(0, () -> lmFolder2.listFiles().length);
+ // no need to use wait here, the previous check should have checked that
already
+ Assert.assertEquals(0, lmFolder.listFiles().length);
+ Assert.assertEquals(0, lmFolder2.listFiles().length);
Assert.assertEquals(0, errors.get());
}
@@ -301,6 +326,14 @@ public class ClusteredLargeMessageInterruptTest extends
SoakTestBase {
testInterruptFailOnBridge("CORE", false);
}
+ private void killProcess(Process process, boolean useKill) throws Exception
{
+ if (useKill) {
+ new ProcessBuilder("kill", "-SIGINT", String.valueOf(process.pid())).start().waitFor();
+ } else {
+ process.destroyForcibly();
+ }
+ }
+
// this is a slight variation of testInterruptLM where I switch over
consumers before killing the previous node
// this is to force messages being redistributed and try to get the bridge
to failure.
@@ -322,13 +355,13 @@ public class ClusteredLargeMessageInterruptTest extends
SoakTestBase {
runningSend = runningConsumer = false;
- serverProcess.destroyForcibly();
+ killProcess(serverProcess, false);
Assert.assertTrue(serverProcess.waitFor(10, TimeUnit.MINUTES));
Assert.assertTrue(sendDone.await(10, TimeUnit.SECONDS));
sendDone = startSendingThreads(executorService, protocol, 1,
SENDING_THREADS, tx, queueName);
CountDownLatch receiverDone = startConsumingThreads(executorService,
protocol, 1, CONSUMING_THREADS, tx, queueName);
- serverProcess.destroyForcibly();
+ killProcess(serverProcess, false);
Assert.assertTrue(serverProcess.waitFor(10, TimeUnit.SECONDS));
serverProcess = startServer0();
diff --git
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/LargeMessageInterruptTest.java
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/LargeMessageInterruptTest.java
index fb7f694fa5..d247d255f0 100644
---
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/LargeMessageInterruptTest.java
+++
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/LargeMessageInterruptTest.java
@@ -119,6 +119,11 @@ public class LargeMessageInterruptTest extends
SoakTestBase {
testInterruptLM("CORE", false, true);
}
+ private void killProcess(Process process) throws Exception {
+ new ProcessBuilder("kill", "-SIGINT", String.valueOf(process.pid())).start().waitFor();
+ }
+
+
private void testInterruptLM(String protocol, boolean tx, boolean paging)
throws Throwable {
final int BODY_SIZE = 500 * 1024;
final int NUMBER_OF_MESSAGES = 10; // this is per producer
@@ -213,7 +218,8 @@ public class LargeMessageInterruptTest extends SoakTestBase
{
}
Assert.assertTrue(killAt.await(60, TimeUnit.SECONDS));
- serverProcess.destroyForcibly();
+ killProcess(serverProcess);
+ Assert.assertTrue(serverProcess.waitFor(1, TimeUnit.MINUTES));
serverProcess = startServer(SERVER_NAME_0, 0, 0);
Assert.assertTrue(done.await(60, TimeUnit.SECONDS));