This is an automated email from the ASF dual-hosted git repository.
cshannon pushed a commit to branch activemq-6.2.x
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/activemq-6.2.x by this push:
new ecfd1336fa Improve the broker's handling of message corruption (#2136)
(#2143)
ecfd1336fa is described below
commit ecfd1336fa3f3f6116a78f0ab71d1b2f90bc717f
Author: Christopher L. Shannon <[email protected]>
AuthorDate: Tue Jun 23 11:03:30 2026 -0400
Improve the broker's handling of message corruption (#2136) (#2143)
This commit improves the broker's handling of messages that are corrupt
and can't be read, such as unmarshaling the properties or body.
Currently, if there is an error, an IOException is triggered and can lead
to a client connection being closed. Furthermore, for queues, messages can
be stuck and no new messages can be delivered.
To improve things the following changes have been made:
* The MarshallingSupport utility that is used to unmarshal message
properties and bodies has improved validation to check for errors
such as incorrectly encoded size values.
* The broker will now handle message format errors both when messages
are evaluated to add to subscriptions and during dispatch to consumers
when the messages are already on a subscription.
* The Stomp protocol converter was fixed to not auto ack or track acks
until the message has been converted.
* AMQP no longer swallows message format errors and will throw so the
errors can be handled by the TransportConnection.
All of these changes allow the broker to deal with corrupt messages and
remove them (and possibly DLQ) vs causing the connections to close or
the messages to block consumers forever in the queue case.
(cherry picked from commit b20e61c7ef736a04ad07a934ea8de628550c6cf2)
---
.../transport/amqp/protocol/AmqpSender.java | 13 +-
.../transport/amqp/JMSInteroperabilityTest.java | 172 ++++++++-
.../activemq/broker/TransportConnection.java | 49 ++-
.../apache/activemq/broker/TransportConnector.java | 21 +-
.../broker/region/AbstractSubscription.java | 22 +-
.../org/apache/activemq/broker/region/Queue.java | 77 +++-
.../broker/region/QueueBrowserSubscription.java | 8 +
.../activemq/broker/region/QueueSubscription.java | 29 +-
.../org/apache/activemq/broker/region/Topic.java | 23 +-
.../auto/nio/AutoNIOSSLTransportServer.java | 3 +-
.../activemq/ActiveMQMessageFormatException.java | 26 ++
.../activemq/command/ActiveMQMapMessage.java | 2 +-
.../activemq/command/ActiveMQStreamMessage.java | 1 +
.../activemq/command/ActiveMQTextMessage.java | 2 +-
.../FrameSizeLimitedFilterInputStream.java | 3 +-
.../org/apache/activemq/util/ExceptionUtils.java | 96 +++++
.../apache/activemq/util/MarshallingSupport.java | 92 +++--
.../activemq/util/MarshallingSupportTest.java | 4 +-
.../apache/activemq/transport/mqtt/MQTTTest.java | 64 ++++
.../transport/stomp/StompSubscription.java | 30 +-
.../apache/activemq/transport/stomp/StompTest.java | 67 ++++
...ActiveMQMessageFormatExceptionSelectorTest.java | 401 +++++++++++++++++++++
.../activemq/command/ActiveMQMapMessageTest.java | 33 ++
.../activemq/command/ActiveMQMessageTest.java | 23 ++
.../command/ActiveMQObjectMessageTest.java | 30 ++
.../command/ActiveMQStreamMessageTest.java | 30 ++
.../activemq/command/ActiveMQTextMessageTest.java | 30 ++
27 files changed, 1261 insertions(+), 90 deletions(-)
diff --git
a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
index 46846867ba..2fc8f4490f 100644
---
a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
+++
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.activemq.ActiveMQMessageFormatException;
import org.apache.activemq.broker.region.AbstractSubscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
@@ -41,6 +42,7 @@ import
org.apache.activemq.transport.amqp.AmqpProtocolConverter;
import org.apache.activemq.transport.amqp.message.AutoOutboundTransformer;
import org.apache.activemq.transport.amqp.message.EncodedMessage;
import org.apache.activemq.transport.amqp.message.OutboundTransformer;
+import org.apache.activemq.util.ExceptionUtils;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Outcome;
@@ -491,7 +493,16 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
}
}
} catch (Exception e) {
- LOG.warn("Error detected while flushing outbound messages:
{}", e.getMessage());
+ // Check if there is a format error trying to convert the
message. This error means the
+ // message can't be converted (corruption, etc). This will
wrap and throw the message
+ // so it can be handled by the transport
+ ActiveMQMessageFormatException formatError =
ExceptionUtils.createMessageFormatException(e);
+ if (formatError != null) {
+ LOG.warn("Message conversion error while flushing outbound
messages: {}", e.getMessage(), e);
+ throw formatError;
+ } else {
+ LOG.warn("Error detected while flushing outbound messages:
{}", e.getMessage());
+ }
}
}
}
diff --git
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java
index 5e5670eac3..415632cab7 100644
---
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java
+++
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java
@@ -18,10 +18,12 @@ package org.apache.activemq.transport.amqp;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeFalse;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -30,20 +32,25 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
-import jakarta.jms.BytesMessage;
-import jakarta.jms.Connection;
-import jakarta.jms.Destination;
-import jakarta.jms.MapMessage;
-import jakarta.jms.Message;
-import jakarta.jms.MessageConsumer;
-import jakarta.jms.MessageProducer;
-import jakarta.jms.ObjectMessage;
-import jakarta.jms.Session;
-import jakarta.jms.TextMessage;
+import jakarta.jms.*;
import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerFilter;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerPluginSupport;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.util.ByteSequenceData;
+import org.apache.activemq.util.Wait;
import org.apache.qpid.proton.amqp.Binary;
+import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -59,6 +66,7 @@ public class JMSInteroperabilityTest extends
JMSClientTestSupport {
protected static final Logger LOG =
LoggerFactory.getLogger(JMSInteroperabilityTest.class);
+ private final AtomicBoolean sentToDlq = new AtomicBoolean(false);
private final String transformer;
@Parameters(name="Transformer->{0}")
@@ -70,6 +78,13 @@ public class JMSInteroperabilityTest extends
JMSClientTestSupport {
});
}
+ @Before
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ sentToDlq.set(false);
+ }
+
public JMSInteroperabilityTest(String transformer) {
this.transformer = transformer;
}
@@ -84,7 +99,26 @@ public class JMSInteroperabilityTest extends
JMSClientTestSupport {
return transformer;
}
- //----- Tests for property handling between protocols
--------------------//
+ @Override
+ protected void addAdditionalPlugins(List<BrokerPlugin> plugins) throws
Exception {
+ super.addAdditionalPlugins(plugins);
+ plugins.add(new BrokerPluginSupport() {
+ @Override
+ public Broker installPlugin(Broker broker) {
+ return new BrokerFilter(broker) {
+ @Override
+ public boolean sendToDeadLetterQueue(ConnectionContext
context,
+ MessageReference
messageReference, Subscription subscription,
+ Throwable
poisonCause) {
+ sentToDlq.set(true);
+ return super.sendToDeadLetterQueue(context,
messageReference,
+ subscription, poisonCause);
+ }
+ };
+ }
+ });
+ }
+//----- Tests for property handling between protocols --------------------//
@SuppressWarnings("unchecked")
@Test(timeout = 60000)
@@ -487,6 +521,122 @@ public class JMSInteroperabilityTest extends
JMSClientTestSupport {
openwire.close();
}
+ // The following tests for corruption will corrupt the headers or body
+ // to test that the AMQP protocol correctly passes the error during
+ // dispatch to allow the Transport Connection to properly handle
+ // with a poison ack so the message will be removed from the subscription.
+ // No selectors are set so these messages are only going to error
+ // during the protocol conversion.
+
+ @Test
+ public void testCorruptMessageErrorHeaders() throws Exception {
+ testCorruptMessageError(session -> {
+ ActiveMQBytesMessage message = (ActiveMQBytesMessage)
session.createBytesMessage();
+ message.setStringProperty("testestt", "Testestt");
+ message.setStringProperty("prop2", "Testestt");
+ try {
+ message.beforeMarshall(null);
+
ByteSequenceData.writeIntBig(message.getMarshalledProperties(), 1024 * 1024);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return message;
+ }, false);
+ }
+
+ @Test
+ public void testCorruptMessageErrorMap() throws Exception {
+ testCorruptMessageError(session -> {
+ MapMessage message = session.createMapMessage();
+ message.setString("id", UUID.randomUUID().toString());
+ return message;
+ }, false);
+ }
+
+ // Check durable sub as well which is also a prefetch subscription so a
+ // poison ack will be handled the same way and DLQ
+ @Test
+ public void testCorruptMessageErrorMapDurableSub() throws Exception {
+ testCorruptMessageError(session -> {
+ MapMessage message = session.createMapMessage();
+ message.setString("id", UUID.randomUUID().toString());
+ return message;
+ }, true);
+ }
+
+ @Test
+ public void testCorruptMessageErrorText() throws Exception {
+ testCorruptMessageError(session -> {
+ TextMessage message = session.createTextMessage();
+ message.setText(UUID.randomUUID().toString());
+ return message;
+ }, false);
+ }
+
+ @Test
+ public void testCorruptMessageErrorTextDurableSub() throws Exception {
+ testCorruptMessageError(session -> {
+ TextMessage message = session.createTextMessage();
+ message.setText(UUID.randomUUID().toString());
+ return message;
+ }, true);
+ }
+
+
+ @Test
+ public void testCorruptMessageErrorStream() throws Exception {
+ testCorruptMessageError(session -> {
+ StreamMessage message = session.createStreamMessage();
+ message.writeBytes(UUID.randomUUID().toString().getBytes());
+ return message;
+ }, false);
+ }
+
+ private void testCorruptMessageError(MessageCreator messageCreator,
boolean topic) throws Exception {
+ // Raw Transformer doesn't expand the body
+ assumeFalse(!transformer.equals("jms"));
+
+ try (Connection openwire = createJMSConnection(); Connection amqp =
createConnection()) {
+ openwire.start();
+ amqp.start();
+ Session openwireSession = openwire.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Session amqpSession = amqp.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+
+ ActiveMQDestination dest = topic ?
+ (ActiveMQDestination)
openwireSession.createTopic(getDestinationName()) :
+ (ActiveMQDestination)
openwireSession.createQueue(getDestinationName());
+ MessageProducer openwireProducer =
openwireSession.createProducer(dest);
+ MessageConsumer amqpConsumer = topic ?
+ amqpSession.createDurableSubscriber((Topic) dest, "sub") :
+ amqpSession.createConsumer(dest);
+
+ // Create and send the Message
+ ActiveMQMessage outgoing = (ActiveMQMessage)
messageCreator.create(openwireSession);
+ outgoing.storeContentAndClear();
+
+ // corrupt the buffer
+ // might be null if we are testing headers only
+ if (outgoing.getContent() != null && outgoing.getContent().length
> 0) {
+ ByteSequenceData.writeIntBig(outgoing.getContent(), 1000);
+ }
+
+ openwireProducer.send(outgoing);
+
+ // Now try to consume the Message, should not be received
+ Message received = amqpConsumer.receive(2000);
+ assertNull(received);
+
+ // verify message is gone off the dest and went to the DLQ
+ assertTrue(Wait.waitFor(() -> brokerService.getDestination(dest)
+ .getDestinationStatistics().getMessages().getCount() == 0,
500, 10));
+ assertTrue(sentToDlq.get());
+ }
+ }
+
+ private interface MessageCreator {
+ Message create(Session session) throws JMSException;
+ }
+
@SuppressWarnings("unchecked")
@Test
public void testObjectMessageContainingList() throws Exception {
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
index b6fe548857..b736008857 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
@@ -26,9 +26,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -98,6 +96,8 @@ import org.apache.activemq.transport.ResponseCorrelator;
import org.apache.activemq.transport.TransmitCallback;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportDisposedIOException;
+import org.apache.activemq.util.ExceptionUtils;
+import org.apache.activemq.ActiveMQMessageFormatException;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.MarshallingSupport;
import org.apache.activemq.util.NetworkBridgeUtils;
@@ -105,7 +105,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
-import jakarta.jms.ResourceAllocationException;
public class TransportConnection implements Connection, Task, CommandVisitor {
private static final Logger LOG =
LoggerFactory.getLogger(TransportConnection.class);
@@ -996,6 +995,18 @@ public class TransportConnection implements Connection,
Task, CommandVisitor {
if (sub != null) {
sub.onFailure();
}
+ // Check if this is a type of message format error which
indicates the
+ // message was corrupt and there was some problem
unmarshaling. For these
+ // errors we can handle by acking with a poison ack (which
will send to the DLQ
+ // if durable/queue sub) and remove them from the consumer so
the consumer can
+ // continue. We do not want to throw the exception as that
would close the connection.
+ ActiveMQMessageFormatException marshallingError =
ExceptionUtils.createMessageFormatException(e);
+ if (marshallingError != null) {
+ handleMessageFormatError(marshallingError,
messageDispatch);
+ // must set to null so when we return the finally block is
skipped
+ messageDispatch = null;
+ return;
+ }
messageDispatch = null;
throw e;
} else {
@@ -1015,6 +1026,38 @@ public class TransportConnection implements Connection,
Task, CommandVisitor {
}
}
+ private void handleMessageFormatError(ActiveMQMessageFormatException e,
MessageDispatch messageDispatch) {
+ if (TRANSPORTLOG.isDebugEnabled()) {
+ TRANSPORTLOG.debug("{} had an unexpected Message format error:
{}", this, e.getMessage(), e);
+ } else if (TRANSPORTLOG.isWarnEnabled()) {
+ if (connector.isDisplayStackTrace()) {
+ TRANSPORTLOG.warn("{} had an unexpected Message format
error", this, e);
+ } else {
+ TRANSPORTLOG.warn("{} had an unexpected Message format error:
{}", this, e.getMessage());
+ }
+ }
+
+ ConsumerBrokerExchange consumerExchange =
getConsumerBrokerExchange(messageDispatch.getConsumerId());
+ try {
+ // acknowledge with the consumer exchange for this dispatch
+ // This should exist because this error happened during dispatch,
but if for some
+ // reason it is null it should get handled when delivery is
attempted again
+ if (consumerExchange != null) {
+ MessageAck ack = new MessageAck();
+ // Acking with a poison ack will send to the DLQ
+ ack.setAckType(MessageAck.POISON_ACK_TYPE);
+ ack.setPoisonCause(e);
+ ack.setConsumerId(messageDispatch.getConsumerId());
+ ack.setDestination(messageDispatch.getDestination());
+ ack.setMessageID(messageDispatch.getMessage().getMessageId());
+ broker.acknowledge(consumerExchange, ack);
+ }
+ } catch (Exception ex) {
+ TRANSPORTLOG.warn("{} could not acknowledge and send message to
the DLQ after"
+ + " ActiveMQMessageFormatException: {}", this,
e.getMessage());
+ }
+ }
+
@Override
public boolean iterate() {
try {
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java
index 8cc6ee8869..3c42da77dd 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java
@@ -42,6 +42,7 @@ import org.apache.activemq.transport.TransportFactorySupport;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.discovery.DiscoveryAgent;
import org.apache.activemq.transport.discovery.DiscoveryAgentFactory;
+import org.apache.activemq.util.ExceptionUtils;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
import org.slf4j.Logger;
@@ -257,10 +258,10 @@ public class TransportConnector implements Connector,
BrokerServiceAware {
private void onAcceptError(Exception error, String remoteHost) {
if (brokerService != null && brokerService.isStopping()) {
- LOG.info("Could not accept connection during shutdown {} :
{} ({})", (remoteHost == null ? "" : "from " + remoteHost),
error.getLocalizedMessage(), getRootCause(error).getMessage());
+ LOG.info("Could not accept connection during shutdown {} :
{} ({})", (remoteHost == null ? "" : "from " + remoteHost),
error.getLocalizedMessage(), ExceptionUtils.getRootCause(error).getMessage());
} else {
- LOG.warn("Could not accept connection {}: {} ({})",
(remoteHost == null ? "" : "from " + remoteHost), error.getMessage(),
getRootCause(error).getMessage());
- LOG.debug("Reason: " + error.getMessage(), error);
+ LOG.warn("Could not accept connection {}: {} ({})",
(remoteHost == null ? "" : "from " + remoteHost), error.getMessage(),
ExceptionUtils.getRootCause(error).getMessage());
+ LOG.debug("Reason: {}", error.getMessage(), error);
}
}
});
@@ -281,20 +282,6 @@ public class TransportConnector implements Connector,
BrokerServiceAware {
LOG.info("Connector {} started", getName());
}
- public static Throwable getRootCause(final Throwable throwable) {
- final List<Throwable> list = getThrowableList(throwable);
- return list.isEmpty() ? null : list.get(list.size() - 1);
- }
-
- static List<Throwable> getThrowableList(Throwable throwable) {
- final List<Throwable> list = new ArrayList<>();
- while (throwable != null && !list.contains(throwable)) {
- list.add(throwable);
- throwable = throwable.getCause();
- }
- return list;
- }
-
public String getPublishableConnectString() throws Exception {
String publishableConnectString =
publishedAddressPolicy.getPublishableConnectString(this);
LOG.debug("Publishing: {} for broker transport URI: {}",
publishableConnectString, getConnectUri());
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
index f33060b464..ea7582c3eb 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
@@ -38,6 +38,7 @@ import org.apache.activemq.filter.LogicExpression;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.filter.NoLocalExpression;
import org.apache.activemq.selector.SelectorParser;
+import org.apache.activemq.ActiveMQMessageFormatException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -106,13 +107,32 @@ public abstract class AbstractSubscription implements
Subscription {
}
}
try {
- return (selectorExpression == null ||
selectorExpression.matches(context)) && this.context.isAllowedToConsume(node);
+ return matchesSelector(node, context) &&
this.context.isAllowedToConsume(node);
} catch (JMSException e) {
LOG.info("Selector failed to evaluate: {}", e.getMessage(), e);
return false;
}
}
+ // This logic exists in a separate method so subscriptions can optionally
choose to
+ // handle any exception. Normally if an exception is thrown, the matches()
method
+ // that calls this will just log the error and return. This is correct for
browsers as
+ // the message gets skipped. It's also correct for topic/durable subs
because each
+ // sub independently will handle acking/removing if the message does not
match and
+ // will not block other subs. If there is no matching durable subs the
message gets
+ // removed from the store as well. However, for queue subscriptions, if
there is an error
+ // we need to handle ActiveMQMessageFormatException so we don't get stuck
in a loop
+ // because queues will keep trying to re-add the message to a sub on each
iteration.
+ protected boolean matchesSelector(MessageReference node,
MessageEvaluationContext context)
+ throws JMSException, ActiveMQMessageFormatException {
+ return evaluateSelectorExpression(context);
+ }
+
+ // move the original selector expression into its own method so we can
reference it
+ protected final boolean
evaluateSelectorExpression(MessageEvaluationContext context) throws
JMSException {
+ return selectorExpression == null ||
selectorExpression.matches(context);
+ }
+
@Override
public boolean isWildcard() {
return destinationFilter.isWildcard();
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 2e537db360..c81d8b8d22 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
@@ -31,6 +32,7 @@ import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -98,6 +100,7 @@ import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.usage.Usage;
import org.apache.activemq.usage.UsageListener;
import org.apache.activemq.util.BrokerSupport;
+import org.apache.activemq.ActiveMQMessageFormatException;
import org.apache.activemq.util.ThreadPoolUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -119,6 +122,8 @@ public class Queue extends BaseDestination implements Task,
UsageListener, Index
private final PendingList pagedInMessages = new OrderedPendingList();
// Messages that are paged in but have not yet been targeted at a
subscription
private final ReentrantReadWriteLock pagedInPendingDispatchLock = new
ReentrantReadWriteLock();
+ // this is guarded by pagedInPendingDispatchLock
+ private final Map<QueueMessageReference, ActiveMQMessageFormatException>
dispatchMessageFormatErrors = new LinkedHashMap<>();
protected QueueDispatchPendingList dispatchPendingList = new
QueueDispatchPendingList();
private AtomicInteger pendingSends = new AtomicInteger(0);
private MessageGroupMap messageGroupOwners;
@@ -497,6 +502,8 @@ public class Queue extends BaseDestination implements Task,
UsageListener, Index
public void removeSubscription(ConnectionContext context, Subscription
sub, long lastDeliveredSequenceId)
throws Exception {
super.removeSubscription(context, sub, lastDeliveredSequenceId);
+
+ Map<QueueMessageReference, ActiveMQMessageFormatException>
messageFormatErrors = null;
// synchronize with dispatch method so that no new messages are sent
// while removing up a subscription.
pagedInPendingDispatchLock.writeLock().lock();
@@ -593,7 +600,7 @@ public class Queue extends BaseDestination implements Task,
UsageListener, Index
}
// AMQ-5107: don't resend if the broker is shutting down
if (dispatchPendingList.hasRedeliveries() && (!
this.brokerService.isStopping())) {
- doDispatch(new OrderedPendingList());
+ messageFormatErrors = doDispatch(new OrderedPendingList());
}
} finally {
consumersLock.writeLock().unlock();
@@ -610,6 +617,9 @@ public class Queue extends BaseDestination implements Task,
UsageListener, Index
// https://issues.apache.org/activemq/browse/AMQ-1878
wakeup();
}
+
+ // Remove any corrupt messages
+ removeMessageFormatErrorMessages(messageFormatErrors);
}
private volatile ResourceAllocationException sendMemAllocationException =
null;
@@ -1859,6 +1869,16 @@ public class Queue extends BaseDestination implements
Task, UsageListener, Index
}
}
+ protected void removeAndSendToDlq(ConnectionContext c,
QueueMessageReference r, Exception e) throws IOException {
+ MessageAck ack = new MessageAck();
+ ack.setAckType(MessageAck.POISON_ACK_TYPE);
+ ack.setPoisonCause(e);
+ ack.setDestination(destination);
+ ack.setMessageID(r.getMessageId());
+ removeMessage(c, null, r, ack);
+ broker.getRoot().sendToDeadLetterQueue(c, r.getMessage(), null, e);
+ }
+
protected void removeMessage(ConnectionContext c, Subscription subs,
QueueMessageReference r) throws IOException {
MessageAck ack = new MessageAck();
ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
@@ -2166,9 +2186,11 @@ public class Queue extends BaseDestination implements
Task, UsageListener, Index
return consumers.size() - browserSubscriptions.size() > 0;
}
- private void doDispatch(PendingList list) throws Exception {
+ private Map<QueueMessageReference, ActiveMQMessageFormatException>
doDispatch(PendingList list) throws Exception {
boolean doWakeUp = false;
+ Map<QueueMessageReference, ActiveMQMessageFormatException>
messageFormatErrors = null;
+
pagedInPendingDispatchLock.writeLock().lock();
try {
if (isPrioritizedMessages() && !dispatchPendingList.isEmpty() &&
list != null && !list.isEmpty()) {
@@ -2197,6 +2219,11 @@ public class Queue extends BaseDestination implements
Task, UsageListener, Index
}
}
} finally {
+ // Copy the errors and clear as we need to process outside the lock
+ if (!dispatchMessageFormatErrors.isEmpty()) {
+ messageFormatErrors = new
LinkedHashMap<>(dispatchMessageFormatErrors);
+ dispatchMessageFormatErrors.clear();
+ }
pagedInPendingDispatchLock.writeLock().unlock();
}
@@ -2204,6 +2231,8 @@ public class Queue extends BaseDestination implements
Task, UsageListener, Index
// avoid lock order contention
asyncWakeup();
}
+
+ return messageFormatErrors;
}
/**
@@ -2211,6 +2240,7 @@ public class Queue extends BaseDestination implements
Task, UsageListener, Index
* were not full.
*/
private PendingList doActualDispatch(PendingList list) throws Exception {
+
List<Subscription> consumers;
consumersLock.readLock().lock();
@@ -2236,12 +2266,27 @@ public class Queue extends BaseDestination implements
Task, UsageListener, Index
}
if (!fullConsumers.contains(s)) {
if (!s.isFull()) {
- if (dispatchSelector.canSelect(s, node) &&
assignMessageGroup(s, (QueueMessageReference)node) && !((QueueMessageReference)
node).isAcked() ) {
- // Dispatch it.
- s.add(node);
- LOG.trace("assigned {} to consumer {}",
node.getMessageId(), s.getConsumerInfo().getConsumerId());
+ try {
+ if (dispatchSelector.canSelect(s, node) &&
assignMessageGroup(s,
+ (QueueMessageReference) node)
+ && !((QueueMessageReference)
node).isAcked()) {
+ // Dispatch it.
+ s.add(node);
+ LOG.trace("assigned {} to consumer {}",
node.getMessageId(),
+ s.getConsumerInfo().getConsumerId());
+ iterator.remove();
+ target = s;
+ break;
+ }
+ } catch (ActiveMQMessageFormatException e) {
+ // An ActiveMQMessageFormatException could occur
when evaluating
+ // the selector which could trigger the properties
to unmarshal or the
+ // body to be read (for xpath selectors). This
should be rare but
+ // if it happens the message will just be stuck so
we need to remove it
+ // from the dispatched list and collect it to be
removed from this queue
+ // and sent to the DLQ
iterator.remove();
- target = s;
+
dispatchMessageFormatErrors.put((QueueMessageReference) node, e);
break;
}
} else {
@@ -2337,7 +2382,23 @@ public class Queue extends BaseDestination implements
Task, UsageListener, Index
}
protected void pageInMessages(boolean force, int maxPageSize) throws
Exception {
- doDispatch(doPageInForDispatch(force, true, maxPageSize));
+ Map<QueueMessageReference, ActiveMQMessageFormatException>
messageFormatErrors =
+ doDispatch(doPageInForDispatch(force, true, maxPageSize));
+ // Handle outside the pagedInPendingDispatchLock
+ removeMessageFormatErrorMessages(messageFormatErrors);
+ }
+
+ // Any bad messages were already removed from dispatchPendingList and not
dispatched, so now we
+ // need to drop the message, remove it from pagedInMessages, remove from
the store and
+ // send to the DLQ
+ private void removeMessageFormatErrorMessages(Map<QueueMessageReference,
ActiveMQMessageFormatException> messageFormatErrors)
+ throws IOException {
+ if (messageFormatErrors != null) {
+ for (Entry<QueueMessageReference, ActiveMQMessageFormatException>
error : messageFormatErrors.entrySet()) {
+ removeAndSendToDlq(broker.getAdminConnectionContext(),
error.getKey(),
+ error.getValue());
+ }
+ }
}
private void addToConsumerList(Subscription sub) {
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
index fa7c66a1bf..4efffdec77 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
@@ -31,6 +31,7 @@ import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.usage.SystemUsage;
+import org.apache.activemq.util.MarshallingSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -82,6 +83,13 @@ public class QueueBrowserSubscription extends
QueueSubscription {
return !browseDone && super.matches(node, context);
}
+ // Queue browsers can just delegate to the original method and throw a
JMSException which
+ // will be caught and any message can be skipped if there is an error.
+ protected boolean matchesSelector(MessageReference node,
MessageEvaluationContext context)
+ throws JMSException {
+ return evaluateSelectorExpression(context);
+ }
+
/**
* Since we are a browser we don't really remove the message from the
queue.
*/
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
index c7517d5c28..2e1ec45940 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
@@ -22,12 +22,12 @@ import jakarta.jms.JMSException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.broker.region.group.MessageGroupMap;
-import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.usage.SystemUsage;
+import org.apache.activemq.util.ExceptionUtils;
+import org.apache.activemq.ActiveMQMessageFormatException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -100,4 +100,27 @@ public class QueueSubscription extends
PrefetchSubscription implements LockOwner
return result;
}
+ // For queues if a message is in a bad state it could get stuck and will
block good
+ // messages from being processed. We don't want to handle all errors as
not all
+ // mean the message is bad, but if we specifically know the message is
corrupted
+ // then we should remove and DLQ as it may be stuck and not possible to
ever dispatch.
+ @Override
+ protected boolean matchesSelector(MessageReference node,
MessageEvaluationContext context)
+ throws JMSException, ActiveMQMessageFormatException {
+ try {
+ return super.matchesSelector(node, context);
+ } catch (JMSException e) {
+ // This may cause the headers to unmarshal which could throw
ActiveMQUnmarshalEOFException
+ // The body could also be unmarshaled if using XPATH and could
trigger a
+ // MessageDataFormat exception which this also handles.
+ ActiveMQMessageFormatException formatError =
ExceptionUtils.createMessageFormatException(e);
+ if (formatError != null) {
+ LOG.error("Message could not be read for selector evaluation:
{}", e.getMessage(), e);
+ throw formatError;
+ }
+ // if the error is not an ActiveMQUnmarshalEOFException, just rethrow
the JMSException
+ // which will be caught and handled
+ throw e;
+ }
+ }
}
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
index bb044c797a..33c49bc8ca 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
@@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
+import org.apache.activemq.ActiveMQMessageFormatException;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
@@ -63,6 +64,7 @@ import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transaction.Synchronization;
+import org.apache.activemq.util.ExceptionUtils;
import org.apache.activemq.util.SubscriptionKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -802,9 +804,26 @@ public class Topic extends BaseDestination implements Task
{
dispatchLock.readLock().lock();
try {
- if (!subscriptionRecoveryPolicy.add(context, message)) {
- return;
+ try {
+ if (!subscriptionRecoveryPolicy.add(context, message)) {
+ return;
+ }
+ } catch (Exception e) {
+ // In this case couldn't read the header properties so we need
to catch and continue.
+ // We still need to let dispatchPolicy.dispatch(message,
msgContext, consumers)
+ // run. If subs set a selector then they won't be matched if
it can't read
+ // the properites and that code will take care of any
removal/acks for durables
+ // by calling sub.unmatched(). If no subs match at all then
onMessageWithNoConsumers()
+ // will be called which allows sending an advisory if enabled
(or if someone wanted to
+ // do something special like the DLQ).
+ ActiveMQMessageFormatException formatError =
ExceptionUtils.createMessageFormatException(e);
+ if (formatError != null) {
+ LOG.warn("Failed to check recovery policy, message is
corrupt: {}", e.getMessage(), e);
+ } else {
+ throw e;
+ }
}
+
synchronized (consumers) {
if (consumers.isEmpty()) {
onMessageWithNoConsumers(context, message);
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNIOSSLTransportServer.java
b/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNIOSSLTransportServer.java
index 6e53c76971..75491ef0fc 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNIOSSLTransportServer.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNIOSSLTransportServer.java
@@ -24,6 +24,7 @@ import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer;
import org.apache.activemq.transport.tcp.TcpTransportFactory;
import org.apache.activemq.transport.tcp.TcpTransportServer;
+import org.apache.activemq.util.ExceptionUtils;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
@@ -125,7 +126,7 @@ public class AutoNIOSSLTransportServer extends
AutoTcpTransportServer {
} catch (Exception error) {
LOG.warn("Could not accept connection {}: {} ({})",
(in.getRemoteAddress() == null ? "" : "from " +
in.getRemoteAddress()), error.getMessage(),
-
TransportConnector.getRootCause(error).getMessage());
+ ExceptionUtils.getRootCause(error).getMessage());
throw new IllegalStateException("Could not complete
Transport start", error);
}
diff --git
a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageFormatException.java
b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageFormatException.java
new file mode 100644
index 0000000000..507ff7935b
--- /dev/null
+++
b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageFormatException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.io.IOException;
+
+public class ActiveMQMessageFormatException extends IOException {
+
+ public ActiveMQMessageFormatException(Throwable cause) {
+ super(cause);
+ }
+}
diff --git
a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
index a1065dac80..140431573f 100644
---
a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
+++
b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
@@ -189,7 +189,7 @@ public class ActiveMQMapMessage extends ActiveMQMessage
implements MapMessage {
if (content != null) {
InputStream is = new ByteArrayInputStream(content);
if (isCompressed()) {
- is = new InflaterInputStream(is);
+ is = MarshallingSupport.createInflaterInputStream(is);
}
DataInputStream dataIn = new DataInputStream(is);
map = MarshallingSupport.unmarshalPrimitiveMap(dataIn);
diff --git
a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java
b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java
index 648b448195..1cd0d70e97 100644
---
a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java
+++
b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java
@@ -1203,6 +1203,7 @@ public class ActiveMQStreamMessage extends
ActiveMQMessage implements StreamMess
if (isCompressed()) {
is = new InflaterInputStream(is);
is = new BufferedInputStream(is);
+ is =
MarshallingSupport.createFrameLimitedInputStream(Integer.MAX_VALUE, is);
}
this.dataIn = new DataInputStream(is);
}
diff --git
a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
index f0a529af9d..5e73c3314b 100644
---
a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
+++
b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
@@ -95,7 +95,7 @@ public class ActiveMQTextMessage extends ActiveMQMessage
implements TextMessage
try {
is = new ByteArrayInputStream(bodyAsBytes);
if (isCompressed()) {
- is = new InflaterInputStream(is);
+ is = MarshallingSupport.createInflaterInputStream(is);
}
DataInputStream dataIn = new DataInputStream(is);
text = MarshallingSupport.readUTF8(dataIn);
diff --git
a/activemq-client/src/main/java/org/apache/activemq/transport/FrameSizeLimitedFilterInputStream.java
b/activemq-client/src/main/java/org/apache/activemq/transport/FrameSizeLimitedFilterInputStream.java
index f96b12427d..419d31561a 100644
---
a/activemq-client/src/main/java/org/apache/activemq/transport/FrameSizeLimitedFilterInputStream.java
+++
b/activemq-client/src/main/java/org/apache/activemq/transport/FrameSizeLimitedFilterInputStream.java
@@ -20,6 +20,7 @@ package org.apache.activemq.transport;
import java.io.IOException;
import java.io.InputStream;
import java.util.Objects;
+import
org.apache.activemq.util.MarshallingSupport.ActiveMQUnmarshalEOFException;
/**
* A filtered style input stream that allows reads up to a given known max
frame size
@@ -223,7 +224,7 @@ public class FrameSizeLimitedFilterInputStream extends
InputStream {
private static void validateAvailable(int requested, int available) throws
IOException {
if (requested > available) {
- throw new IOException(String.format(
+ throw new ActiveMQUnmarshalEOFException(String.format(
"Cannot read more than the max available %d bytes: requested
%d", available, requested));
}
}
diff --git
a/activemq-client/src/main/java/org/apache/activemq/util/ExceptionUtils.java
b/activemq-client/src/main/java/org/apache/activemq/util/ExceptionUtils.java
new file mode 100644
index 0000000000..07dae20427
--- /dev/null
+++ b/activemq-client/src/main/java/org/apache/activemq/util/ExceptionUtils.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+import jakarta.jms.JMSException;
+import jakarta.jms.MessageEOFException;
+import jakarta.jms.MessageFormatException;
+import jakarta.jms.MessageFormatRuntimeException;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.activemq.ActiveMQMessageFormatException;
+
+public class ExceptionUtils {
+
+
+ /**
+ * Creates a new ActiveMQMessageFormatException by wrapping the existing
throwable.
+ * This will only wrap the exception if the throwable contains a message
format
+ * error as the root cause
+ *
+ * @param error original exception
+ * @return ActiveMQMessageFormatException if a message format error, else
null
+ */
+ public static ActiveMQMessageFormatException
createMessageFormatException(Throwable error) {
+ if (error instanceof ActiveMQMessageFormatException) {
+ return (ActiveMQMessageFormatException) error;
+ } else if (containsMessageFormatError(error)) {
+ return new ActiveMQMessageFormatException(error);
+ }
+ return null;
+ }
+
+ /*
+ * Check if this throwable contains a message format error.
+ * This will check the root cause and any linked exceptions as well
+ * if the exception is a JMSException
+ */
+ private static boolean containsMessageFormatError(Throwable error) {
+ if (error == null) {
+ return false;
+ }
+
+ Throwable cause = ExceptionUtils.getRootCause(error);
+ return isMessageFormatError(cause) ||
+ error instanceof JMSException &&
isMessageFormatError(((JMSException) error).getLinkedException());
+ }
+
+ /*
+ * Checks if the error is considered an error with the format of the
message.
+ * This checks for the ActiveMQ custom ActiveMQUnmarshalEOFException that
+ * can be thrown by MarshallingSupport as well as JMS specific exceptions
+ * that indicate corruption/read problems such as MessageFormatException
+ * and MessageEOFException.
+ */
+ private static boolean isMessageFormatError(Throwable error) {
+ if (error == null) {
+ return false;
+ }
+
+ return error instanceof
MarshallingSupport.ActiveMQUnmarshalEOFException ||
+ error instanceof MessageFormatException ||
+ error instanceof MessageEOFException;
+ }
+
+ public static Throwable getRootCause(final Throwable throwable) {
+ if (throwable == null) {
+ return null;
+ }
+ final List<Throwable> list = getThrowableList(throwable);
+ return list.isEmpty() ? null : list.get(list.size() - 1);
+ }
+
+ static List<Throwable> getThrowableList(Throwable throwable) {
+ final List<Throwable> list = new ArrayList<>();
+ while (throwable != null && !list.contains(throwable)) {
+ list.add(throwable);
+ throwable = throwable.getCause();
+ }
+ return list;
+ }
+}
diff --git
a/activemq-client/src/main/java/org/apache/activemq/util/MarshallingSupport.java
b/activemq-client/src/main/java/org/apache/activemq/util/MarshallingSupport.java
index bf61f307c3..c7bc76f02f 100644
---
a/activemq-client/src/main/java/org/apache/activemq/util/MarshallingSupport.java
+++
b/activemq-client/src/main/java/org/apache/activemq/util/MarshallingSupport.java
@@ -16,11 +16,12 @@
*/
package org.apache.activemq.util;
-import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
+import java.io.EOFException;
import java.io.IOException;
+import java.io.InputStream;
import java.io.UTFDataFormatException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -28,7 +29,11 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.zip.InflaterInputStream;
+import org.apache.activemq.transport.FrameSizeLimitedFilterInputStream;
import org.fusesource.hawtbuf.UTF8Buffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* The fixed version of the UTF8 encoding function. Some older JVM's UTF8
@@ -36,6 +41,8 @@ import org.fusesource.hawtbuf.UTF8Buffer;
*/
public final class MarshallingSupport {
+ private static final Logger LOG =
LoggerFactory.getLogger(MarshallingSupport.class);
+
public static final byte NULL = 0;
public static final byte BOOLEAN_TYPE = 1;
public static final byte BYTE_TYPE = 2;
@@ -53,6 +60,15 @@ public final class MarshallingSupport {
private MarshallingSupport() {}
+ // TODO: This will be limited in a future PR to something besides
Integer.MAX_VALUE
+ public static InputStream createInflaterInputStream(InputStream is) {
+ return createFrameLimitedInputStream(Integer.MAX_VALUE, new
InflaterInputStream(is));
+ }
+
+ public static InputStream createFrameLimitedInputStream(int maxAvailable,
InputStream is) {
+ return new FrameSizeLimitedFilterInputStream(maxAvailable, is);
+ }
+
public static void marshalPrimitiveMap(Map<String, Object> map,
DataOutputStream out) throws IOException {
if (map == null) {
out.writeInt(-1);
@@ -67,7 +83,7 @@ public final class MarshallingSupport {
}
public static Map<String, Object> unmarshalPrimitiveMap(DataInputStream
in) throws IOException {
- return unmarshalPrimitiveMap(in, Integer.MAX_VALUE, Integer.MAX_VALUE,
Integer.MAX_VALUE);
+ return unmarshalPrimitiveMap(in, Integer.MAX_VALUE, Integer.MAX_VALUE,
Byte.MAX_VALUE);
}
public static Map<String, Object> unmarshalPrimitiveMap(DataInputStream
in, int maxPropertySize, int maxBufferSize, int maxDepth) throws IOException {
@@ -86,16 +102,23 @@ public final class MarshallingSupport {
validateDepth(maxDepth, currentDepth++);
int size = in.readInt();
- if (size > maxPropertySize) {
- throw new IOException("Primitive map is larger than the allowed
size: " + size);
- }
+ // validate not larger than configured max number of entries
+ validatePropertySize(size, maxPropertySize);
+
if (size < 0) {
return null;
} else {
- // Size here was already validated above
- Map<String, Object> rc = new HashMap<>(size);
+ // Limit the pre-allocate size of the map.
+ // The number of items was validated but could still exceed total size
as we unmarshal
+ // (and max property size may not be set), so do a sanity check to
verify
+ // the number of items is less than or equal to the remaining
bytes.
+ validateBufferSize(in, maxBufferSize, size);
+
+ // As an extra precaution limit to no more than 128 initially
+ Map<String, Object> rc = new HashMap<>(Math.min(128, size));
for (int i = 0; i < size; i++) {
- String name = in.readUTF();
+ // validate key is less than max buffer size
+ String name = readUTF(in, maxBufferSize,
in.readUnsignedShort()).toString();
rc.put(name, unmarshalPrimitive(in, force, maxPropertySize,
maxBufferSize, maxDepth, currentDepth));
}
return rc;
@@ -119,8 +142,13 @@ public final class MarshallingSupport {
// increment after validation, so future calls get the incremented
depth
validateDepth(maxDepth, currentDepth++);
- int size = validateBufferSize(maxBufferSize, in.readInt());
- List<Object> answer = new ArrayList<>(size);
+ // Limit the pre-allocate size of the list.
+ // We could still exceed total size as we unmarshal, so do a sanity
check to verify
+ // the number of items is less than or equal to the remaining bytes
+ int size = validateBufferSize(in, maxBufferSize, in.readInt());
+
+ // As an extra precaution limit to no more than 128 initially
+ List<Object> answer = new ArrayList<>(Math.min(128, size));
while (size-- > 0) {
answer.add(unmarshalPrimitive(in, force, maxPropertySize,
maxBufferSize, maxDepth, currentDepth));
}
@@ -164,7 +192,7 @@ public final class MarshallingSupport {
}
public static Object unmarshalPrimitive(DataInputStream in) throws
IOException {
- return unmarshalPrimitive(in, false, Integer.MAX_VALUE,
Integer.MAX_VALUE, Integer.MAX_VALUE, 0);
+ return unmarshalPrimitive(in, false, Integer.MAX_VALUE,
Integer.MAX_VALUE, Byte.MAX_VALUE, 0);
}
private static Object unmarshalPrimitive(DataInputStream in, boolean
force, int maxPropertySize, int maxBufferSize,
@@ -197,15 +225,11 @@ public final class MarshallingSupport {
value = in.readDouble();
break;
case BYTE_ARRAY_TYPE:
- value = new byte[validateBufferSize(maxBufferSize, in.readInt())];
+ value = new byte[validateBufferSize(in, maxBufferSize,
in.readInt())];
in.readFully((byte[])value);
break;
case STRING_TYPE:
- if (force) {
- value = in.readUTF();
- } else {
- value = readUTF(in, maxBufferSize, in.readUnsignedShort());
- }
+ value = readUTF(in, maxBufferSize, in.readUnsignedShort());
break;
case BIG_STRING_TYPE: {
if (force) {
@@ -231,7 +255,7 @@ public final class MarshallingSupport {
}
public static UTF8Buffer readUTF(DataInputStream in, int maxLength, int
length) throws IOException {
- validateBufferSize(maxLength, length);
+ validateBufferSize(in, maxLength, length);
byte[] data = new byte[length];
in.readFully(data);
return new UTF8Buffer(data);
@@ -357,12 +381,12 @@ public final class MarshallingSupport {
return offset;
}
- public static String readUTF8(DataInput dataIn) throws IOException {
+ public static String readUTF8(DataInputStream dataIn) throws IOException {
return readUTF8(dataIn, Integer.MAX_VALUE);
}
- public static String readUTF8(DataInput dataIn, int maxBufferSize) throws
IOException {
- int utflen = validateBufferSize(maxBufferSize, dataIn.readInt());
+ static String readUTF8(DataInputStream dataIn, int maxBufferSize) throws
IOException {
+ int utflen = validateBufferSize(dataIn, maxBufferSize,
dataIn.readInt());
if (utflen > -1) {
byte bytearr[] = new byte[utflen];
char chararr[] = new char[utflen];
@@ -432,16 +456,34 @@ public final class MarshallingSupport {
return text;
}
- private static int validateBufferSize(int maxSize, int size) throws
IOException {
+ private static void validatePropertySize(int size, int maxPropertySize)
throws IOException {
+ if (size > maxPropertySize) {
+ throw new ActiveMQUnmarshalEOFException("Primitive map is larger
than the allowed size: " + size);
+ }
+ }
+
+ private static int validateBufferSize(DataInputStream stream, int maxSize,
int size) throws IOException {
+ // The size should never be more than what is available to read
+ if (size > stream.available()) {
+ throw new ActiveMQUnmarshalEOFException("Read is greater than
remaining available data in the stream");
+ }
+
if (size > maxSize) {
- throw new IOException("Max buffer size: " + maxSize + " exceeded,
size: " + size);
+ throw new ActiveMQUnmarshalEOFException("Max buffer size: " +
maxSize + " exceeded, size: " + size);
}
return size;
}
- private static void validateDepth(int maxDepth, int currentDepth) throws
IOException {
+ private static void validateDepth(int maxDepth, int currentDepth) throws
EOFException {
if (currentDepth > maxDepth) {
- throw new IOException("Max unmarshaling depth: " + maxDepth + "
exceeded, depth: " + currentDepth);
+ throw new ActiveMQUnmarshalEOFException("Max unmarshaling depth: "
+ maxDepth + " exceeded, depth: " + currentDepth);
}
}
+
+ public static class ActiveMQUnmarshalEOFException extends EOFException {
+ public ActiveMQUnmarshalEOFException(String message) {
+ super(message);
+ }
+ }
+
}
diff --git
a/activemq-client/src/test/java/org/apache/activemq/util/MarshallingSupportTest.java
b/activemq-client/src/test/java/org/apache/activemq/util/MarshallingSupportTest.java
index 59a5e602d6..d9e778c8e5 100644
---
a/activemq-client/src/test/java/org/apache/activemq/util/MarshallingSupportTest.java
+++
b/activemq-client/src/test/java/org/apache/activemq/util/MarshallingSupportTest.java
@@ -79,8 +79,8 @@ public class MarshallingSupportTest {
// buffers too large
dataIn.reset();
- assertException(() -> unmarshalPrimitiveMap(dataIn, 100, 2, 10),
- "Max buffer size: 2 exceeded, size: 6");
+ assertException(() -> unmarshalPrimitiveMap(dataIn, 100, 4, 10),
+ "Max buffer size: 4 exceeded, size: 6");
// max depth violated
dataIn.reset();
diff --git
a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
index 1bd7078e3e..7868cd4ce5 100644
---
a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
+++
b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
@@ -59,8 +59,10 @@ import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import
org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy;
import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.ByteSequenceData;
import org.apache.activemq.util.NioSslTestUtil;
import org.apache.activemq.util.Wait;
import org.fusesource.hawtdispatch.transport.SslTransport;
@@ -1122,6 +1124,68 @@ public class MQTTTest extends MQTTTestSupport {
activeMQConnection.close();
}
+ // The following test will corrupt a message and test the MQTT
+ // protocol correctly passes the error during
+ // dispatch to allow the Transport Connection to properly handle
+ // with a poison ack so the message will be removed from the subscription.
+ @Test
+ public void testCorruptHeaders() throws Exception {
+ final MQTTClientProvider provider = getMQTTClientProvider();
+ initializeConnection(provider);
+
+ String destinationName = "foo.far";
+ ActiveMQConnection activeMQConnection = (ActiveMQConnection)
cf.createConnection();
+ activeMQConnection.start();
+ Session s = activeMQConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ jakarta.jms.Topic jmsTopic = s.createTopic(destinationName);
+ MessageProducer producer = s.createProducer(jmsTopic);
+
+ provider.subscribe("foo/+", AT_MOST_ONCE);
+ ActiveMQMessage sendMessage = (ActiveMQMessage)
s.createTextMessage("test");
+ sendMessage.setStringProperty("test", "Test");
+ // marshal and corrupt props
+ sendMessage.beforeMarshall(null);
+ ByteSequenceData.writeIntBig(sendMessage.getMarshalledProperties(),
1000);
+ producer.send(sendMessage);
+ assertNull("Should not get message", provider.receive(500));
+
+ provider.disconnect();
+ activeMQConnection.close();
+
+ // verify message is gone off the dest
+ assertTrue(Wait.waitFor(() -> brokerService.getDestination(new
ActiveMQTopic(destinationName))
+ .getDestinationStatistics().getMessages().getCount() == 0,
500, 10));
+ }
+
+ @Test
+ public void testCorruptBody() throws Exception {
+ final MQTTClientProvider provider = getMQTTClientProvider();
+ initializeConnection(provider);
+
+ String destinationName = "foo.far";
+ ActiveMQConnection activeMQConnection = (ActiveMQConnection)
cf.createConnection();
+ activeMQConnection.start();
+ Session s = activeMQConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ jakarta.jms.Topic jmsTopic = s.createTopic(destinationName);
+ MessageProducer producer = s.createProducer(jmsTopic);
+
+ provider.subscribe("foo/+", AT_MOST_ONCE);
+ ActiveMQMessage sendMessage = (ActiveMQMessage)
s.createTextMessage("test");
+
+ // marshal and corrupt body
+ sendMessage.storeContentAndClear();
+ ByteSequenceData.writeIntBig(sendMessage.getContent(), 1000);
+ producer.send(sendMessage);
+ assertNull("Should not get message", provider.receive(500));
+
+ provider.disconnect();
+ activeMQConnection.close();
+
+ // verify message is gone off the dest
+ assertTrue(Wait.waitFor(() -> brokerService.getDestination(new
ActiveMQTopic(destinationName))
+ .getDestinationStatistics().getMessages().getCount() == 0,
500, 10));
+ }
+
@Test(timeout = 60 * 1000)
public void testPingKeepsInactivityMonitorAlive() throws Exception {
MQTT mqtt = createMQTTConnection();
diff --git
a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
index 2fa9651194..92f034cf7a 100644
---
a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
+++
b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
@@ -74,6 +74,23 @@ public class StompSubscription {
void onMessageDispatch(MessageDispatch md) throws IOException,
JMSException {
ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
+ boolean ignoreTransformation = false;
+
+ if (transformation != null && !( message instanceof
ActiveMQBytesMessage ) ) {
+ message.setReadOnlyProperties(false);
+ message.setStringProperty(Stomp.Headers.TRANSFORMATION,
transformation);
+ } else {
+ if (message.getStringProperty(Stomp.Headers.TRANSFORMATION) !=
null) {
+ ignoreTransformation = true;
+ }
+ }
+
+ // This has been intentionally moved to happen before the acks are set
up and
+ // auto ack is done, which is in line with all the other protocols.
+ StompFrame command = protocolConverter.convertMessage(message,
ignoreTransformation);
+
+ // Only configure the acks after protocol conversion. If there is an
error we don't want to
+ // track pending acks or auto ack as the message won't be dispatched
String ackId = null;
if (isClientAck() || isIndividualAck()) {
ackId = ACK_ID_GENERATOR.generateId();
@@ -90,19 +107,6 @@ public class StompSubscription {
protocolConverter.getStompTransport().sendToActiveMQ(ack);
}
- boolean ignoreTransformation = false;
-
- if (transformation != null && !( message instanceof
ActiveMQBytesMessage ) ) {
- message.setReadOnlyProperties(false);
- message.setStringProperty(Stomp.Headers.TRANSFORMATION,
transformation);
- } else {
- if (message.getStringProperty(Stomp.Headers.TRANSFORMATION) !=
null) {
- ignoreTransformation = true;
- }
- }
-
- StompFrame command = protocolConverter.convertMessage(message,
ignoreTransformation);
-
command.setAction(Stomp.Responses.MESSAGE);
if (subscriptionId != null) {
command.getHeaders().put(Stomp.Headers.Message.SUBSCRIPTION,
subscriptionId);
diff --git
a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
index 7dcf0ef555..39469befec 100644
---
a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
+++
b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
@@ -33,6 +33,7 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -53,11 +54,18 @@ import jakarta.jms.TextMessage;
import javax.management.ObjectName;
import javax.net.ssl.SSLSocket;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerFilter;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.region.AbstractSubscription;
+import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.policy.PolicyEntry;
@@ -68,6 +76,7 @@ import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.transport.stomp.Stomp.Commands;
import org.apache.activemq.transport.stomp.Stomp.Responses;
+import org.apache.activemq.util.ByteSequenceData;
import org.apache.activemq.util.NioSslTestUtil;
import org.apache.activemq.util.Wait;
import org.junit.Assume;
@@ -118,6 +127,9 @@ public class StompTest extends StompTestSupport {
+ "]"
+ "}}";
+
+ final AtomicBoolean sentToDlq = new AtomicBoolean(false);
+
@Override
public void setUp() throws Exception {
// The order of the entries is different when using ibm jdk 5.
@@ -153,6 +165,7 @@ public class StompTest extends StompTestSupport {
xstream = new XStream();
xstream.processAnnotations(SamplePojo.class);
xstream.allowTypes(new Class[] { SamplePojo.class });
+ sentToDlq.set(false);
}
@Override
@@ -175,6 +188,26 @@ public class StompTest extends StompTestSupport {
}
}
+ @Override
+ protected void addAdditionalPlugins(List<BrokerPlugin> plugins) throws
Exception {
+ super.addAdditionalPlugins(plugins);
+ plugins.add(new BrokerPluginSupport() {
+ @Override
+ public Broker installPlugin(Broker broker) {
+ return new BrokerFilter(broker) {
+ @Override
+ public boolean sendToDeadLetterQueue(ConnectionContext
context,
+ MessageReference messageReference, Subscription
subscription,
+ Throwable poisonCause) {
+ sentToDlq.set(true);
+ return super.sendToDeadLetterQueue(context,
messageReference,
+ subscription, poisonCause);
+ }
+ };
+ }
+ });
+ }
+
public void sendMessage(String msg) throws Exception {
sendMessage(msg, "foo", "xyz");
}
@@ -308,6 +341,40 @@ public class StompTest extends StompTestSupport {
assertEquals(body, message.getBody());
}
+ // The following test corrupts a message and verifies that the Stomp
+ // protocol correctly propagates the error during
+ // dispatch, allowing the TransportConnection to handle it properly
+ // with a poison ack so the message is removed from the subscription.
+ @Test(timeout = 60000)
+ public void testCorruptMessage() throws Exception {
+ MessageProducer producer = session.createProducer(queue);
+ String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n"
+ Stomp.NULL;
+ stompConnection.sendFrame(frame);
+ frame = stompConnection.receiveFrame();
+ assertTrue(frame.startsWith("CONNECTED"));
+ frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n"
+ "ack:auto\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+ ActiveMQTextMessage msg = (ActiveMQTextMessage)
session.createTextMessage("test");
+
+ // corrupt the buffer
+ msg.storeContentAndClear();
+ ByteSequenceData.writeIntBig(msg.getContent(), 1000);
+ producer.send(msg);
+
+ // Message should not be received because the UTF8 buffer is corrupt
+ try {
+ StompFrame frameNull = stompConnection.receive(500);
+ if (frameNull != null) {
+ fail("Should not have received any messages");
+ }
+ } catch (SocketTimeoutException ignored) {}
+
+ // Message should go to the DLQ
+ assertTrue(Wait.waitFor(() -> brokerService.getDestination(queue)
+ .getDestinationStatistics().getMessages().getCount() == 0,
500, 10));
+ assertTrue(sentToDlq.get());
+ }
+
@Test(timeout = 60000)
public void testJMSXGroupIdCanBeSet() throws Exception {
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQMessageFormatExceptionSelectorTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQMessageFormatExceptionSelectorTest.java
new file mode 100644
index 0000000000..28b851c956
--- /dev/null
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQMessageFormatExceptionSelectorTest.java
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import jakarta.jms.Message;
+import jakarta.jms.MessageConsumer;
+import jakarta.jms.MessageProducer;
+import jakarta.jms.Queue;
+import jakarta.jms.QueueBrowser;
+import jakarta.jms.Session;
+import jakarta.jms.Topic;
+
+import java.io.File;
+import java.net.URI;
+import java.util.Enumeration;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerFilter;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerPluginSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.store.kahadb.KahaDBStore;
+import org.apache.activemq.store.kahadb.disk.journal.Journal;
+import org.apache.activemq.util.ByteSequenceData;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+
+public class ActiveMQMessageFormatExceptionSelectorTest {
+
+ @Rule
+ public TemporaryFolder dataFileDir = new TemporaryFolder(new
File("target"));
+
+ private URI clientUri;
+ private BrokerService brokerService;
+ private final AtomicInteger dlqCount = new AtomicInteger();
+
+ @Before
+ public void setUp() throws Exception {
+ dlqCount.set(0);
+ startBroker();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ stopBroker();
+ }
+
+ // Test that queue browsers just skip corrupt messages as they are only
going to be
+ // DLQ'd and removed for normal queue consumers
+ @Test(timeout = 30000)
+ public void testUnmarshalQueueBrowseSubscription() throws Exception {
+ try (ActiveMQConnection connection = (ActiveMQConnection) new
ActiveMQConnectionFactory(
+ clientUri).createConnection()) {
+ connection.setClientID("client");
+ connection.start();
+
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue dest = session.createQueue("test.queue");
+ MessageProducer producer = session.createProducer(dest);
+ QueueBrowser browser = session.createBrowser(dest,
"stringProperty = 'a'");
+
+ // Mix good and bad messages to ensure the good ones are still
received
+ for (int i = 0; i < 100; i++) {
+ ActiveMQTextMessage message = (ActiveMQTextMessage)
session.createTextMessage("test-message");
+ message.setIntProperty("count", i);
+ message.setStringProperty("stringProperty", "a");
+
+ if (i % 5 == 0) {
+ message.beforeMarshall(null);
+
ByteSequenceData.writeIntBig(message.getMarshalledProperties(), 1024 * 1024);
+ }
+
+ producer.send(message);
+ }
+
+ Enumeration enumeration = browser.getEnumeration();
+ int i = 0;
+ while (enumeration.hasMoreElements()) {
+ // skip expected bad message
+ if (i % 5 == 0) {
+ i++;
+ }
+ jakarta.jms.Message message = (Message)
enumeration.nextElement();
+ assertNotNull(message);
+ assertEquals(i, message.getIntProperty("count"));
+ i++;
+ }
+
+ Destination destination =
brokerService.getDestination((ActiveMQDestination) dest);
+ // browsers should NOT remove/move messages
+ assertEquals(100,
destination.getDestinationStatistics().getMessages().getCount());
+ assertTrue(destination.getMemoryUsage().getUsage() > 0);
+ assertEquals(0, dlqCount.get());
+ assertEquals(100, destination.getMessageStore().getMessageCount());
+ }
+ }
+
+ @Test(timeout = 30000)
+ public void testUnmarshalQueueSubscription() throws Exception {
+ testUnmarshalFail("test.queue", true, false);
+ // For queues, the message has already been accepted onto the queue, so
+ // if matching it against a consumer fails because the message is
+ // corrupted, it makes sense to remove it on the first error and
+ // send it to the DLQ
+ assertEquals(20, dlqCount.get());
+ }
+
+ @Test(timeout = 30000)
+ public void testUnmarshalTopicSubscription() throws Exception {
+ testUnmarshalFail("test.topic", false, false);
+ // for topic subscription on error it just won't match the selector
+ // and skips adding to the sub, but other subs may be able to receive
+ // as it evaluates each sub independently (not all subs may use a
selector)
+ // Messages won't be stuck and block other subscriptions so no need to
+ // do anything special and there is no DLQ because it was never even
added
+ // to the subscription
+ }
+
+ @Test(timeout = 30000)
+ public void testUnmarshalDurableSubscription() throws Exception {
+ testUnmarshalFail("test.topic", false, true);
+ // for durable subscription on error it just won't match the selector
+ // and skips adding to the sub, but other subs may be able to receive
+ // as it evaluates each sub independently (not all subs may use a
selector)
+ // Messages won't be stuck and block other subscriptions so no need to
+ // do anything special and there is no DLQ because it was never even
added
+ // to the subscription
+ }
+
+ // Test mixing subs with and without selectors
+ @Test(timeout = 30000)
+ public void testMultipleTopicSubs() throws Exception {
+ testMultipleSubs(false);
+ }
+
+ @Test(timeout = 30000)
+ public void testMultipleDurableSubs() throws Exception {
+ testMultipleSubs(true);
+ }
+
+ private void testMultipleSubs(boolean durable) throws Exception {
+ try (ActiveMQConnection connection = (ActiveMQConnection) new
ActiveMQConnectionFactory(
+ clientUri).createConnection()) {
+ connection.setClientID("client");
+ connection.start();
+
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ ActiveMQTopic dest = (ActiveMQTopic) session.createTopic("topic");
+ MessageProducer producer = session.createProducer(dest);
+ MessageConsumer selectorConsumer = durable ?
+ session.createDurableSubscriber((Topic) dest, "sub",
"stringProperty = 'a'", false) :
+ session.createConsumer(dest, "stringProperty = 'a'");
+ MessageConsumer consumer = durable ?
+ session.createDurableSubscriber((Topic) dest, "sub2") :
session.createConsumer(dest);
+
+ // Mix good and bad messages to ensure the good ones are still
received
+ for (int i = 0; i < 10; i++) {
+ ActiveMQTextMessage message = (ActiveMQTextMessage)
session.createTextMessage("test-message");
+ message.setIntProperty("count", i);
+ message.setStringProperty("stringProperty", "a");
+ message.beforeMarshall(null);
+ // corrupt
+
ByteSequenceData.writeIntBig(message.getMarshalledProperties(), 1024 * 1024);
+ producer.send(message);
+ }
+
+ // no selector should get all 10
+ for (int i = 0; i < 10; i++) {
+ jakarta.jms.Message message = consumer.receive(100);
+ assertNotNull(message);
+ }
+
+ // selector so should error and not get any
+ assertNull(selectorConsumer.receive(100));
+
+ // messages should be gone
+ Destination destination = brokerService.getDestination(dest);
+ assertTrue(Wait.waitFor(() ->
destination.getDestinationStatistics().getMessages().getCount() == 0,
+ 500, 10));
+ assertTrue(Wait.waitFor(
+ () -> destination.getMemoryUsage().getUsage() == 0, 1000,
100));
+ }
+ }
+
+ private void testUnmarshalFail(String destName, boolean queue, boolean
durable) throws Exception {
+ try (ActiveMQConnection connection = (ActiveMQConnection) new
ActiveMQConnectionFactory(
+ clientUri).createConnection()) {
+ connection.setClientID("client");
+ connection.start();
+
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ ActiveMQDestination dest = queue ? (ActiveMQDestination)
session.createQueue(destName) :
+ (ActiveMQDestination) session.createTopic(destName);
+ MessageProducer producer = session.createProducer(dest);
+ MessageConsumer consumer = durable ?
+ session.createDurableSubscriber((Topic) dest, "sub",
"stringProperty = 'a'", false) :
+ session.createConsumer(dest, "stringProperty = 'a'");
+
+ // Mix good and bad messages to ensure the good ones are still
received
+ for (int i = 0; i < 100; i++) {
+ ActiveMQTextMessage message = (ActiveMQTextMessage)
session.createTextMessage("test-message");
+ message.setIntProperty("count", i);
+ message.setStringProperty("stringProperty", "a");
+
+ if (i % 5 == 0) {
+ message.beforeMarshall(null);
+
ByteSequenceData.writeIntBig(message.getMarshalledProperties(), 1024 * 1024);
+ }
+ producer.send(message);
+ }
+
+ for (int i = 0; i < 100; i++) {
+ // skip expected bad message
+ if (i % 5 == 0) {
+ i++;
+ }
+ jakarta.jms.Message message = consumer.receive(100);
+ assertNotNull(message);
+ assertEquals(i, message.getIntProperty("count"));
+ }
+
+ Destination destination = brokerService.getDestination(dest);
+ assertTrue(Wait.waitFor(() ->
destination.getDestinationStatistics().getMessages().getCount() == 0,
+ 500, 10));
+ assertTrue(Wait.waitFor(
+ () -> destination.getMemoryUsage().getUsage() == 0, 1000,
100));
+ assertEquals(0, destination.getMessageStore().getMessageCount());
+ }
+ }
+
+ // Test that corrupt messages are still handled by the queue
+ // on restart and load when consumers come online. This is really only a
big concern for queues.
+ // For topic/durable subs the messages won't be added, as they won't match
in the first place
+ // if corrupt, so they should not be there on restart. If they are for some
reason
+ // (maybe the broker crashed during sub matching and didn't ack) then an
exception is logged and
+ // they get skipped and not loaded (this is the exact same behavior as
previously). Topic sub
+ // behavior on restart could be improved in a future change, but it is
an edge case
+ // and logging the error allows an admin to address it if it does happen for
topics
+ @Test
+ public void testRestart() throws Exception {
+ ActiveMQConnectionFactory factory = new
ActiveMQConnectionFactory(clientUri);
+ ActiveMQConnection connection = (ActiveMQConnection)
factory.createConnection();
+ connection.setClientID("client");
+ connection.start();
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ ActiveMQDestination dest = (ActiveMQDestination)
session.createQueue("Test'") ;
+ MessageProducer producer = session.createProducer(dest);
+
+ // send 10 good and 10 bad
+ for (int i = 0; i < 20; i++) {
+ ActiveMQTextMessage message = (ActiveMQTextMessage)
session.createTextMessage("test-message");
+ message.setStringProperty("stringProperty", "a");
+ if (i % 2 == 0) {
+ message.beforeMarshall(null);
+
ByteSequenceData.writeIntBig(message.getMarshalledProperties(), 1024 * 1024);
+ }
+ producer.send(message);
+ }
+ connection.close();
+ // All 20 messages should be persisted as no consumers yet
+ Destination destination = brokerService.getDestination(dest);
+ assertEquals(20, destination.getMessageStore().getMessageCount());
+ // stop and restart broker to verify it loads all the messages
+ brokerService.stop();
+ brokerService.waitUntilStopped();
+ startBroker();
+
+ // bring the consumer online which should trigger the errors and DLQ
+ factory = new ActiveMQConnectionFactory(clientUri);
+ connection = (ActiveMQConnection) factory.createConnection();
+ connection.setClientID("client");
+ connection.start();
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(dest,
"stringProperty = 'a'");
+
+ // we should get 10
+ for (int i = 0; i < 10; i++) {
+ jakarta.jms.Message message = consumer.receive(100);
+ assertNotNull(message);
+ }
+ // should only get 10
+ assertNull(consumer.receive(100));
+ Destination regionDest = brokerService.getDestination(dest);
+ // makes sure messages are gone
+ assertTrue(Wait.waitFor(() ->
regionDest.getDestinationStatistics().getMessages().getCount() == 0,
+ 500, 10));
+ assertTrue(Wait.waitFor(
+ () -> regionDest.getMemoryUsage().getUsage() == 0, 1000, 100));
+ assertEquals(0, regionDest.getMessageStore().getMessageCount());
+ }
+
+ // An XPath selector will trigger the body to unmarshal; make sure that is
handled as well.
+ // The queue should detect that error when trying to add to the consumer
and DLQ the message
+ @Test
+ public void testXpath() throws Exception {
+ try (ActiveMQConnection connection = (ActiveMQConnection) new
ActiveMQConnectionFactory(
+ clientUri).createConnection()) {
+ connection.start();
+
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ ActiveMQDestination dest = (ActiveMQDestination)
session.createQueue("test.queue");
+ MessageProducer producer = session.createProducer(dest);
+ MessageConsumer consumer = session.createConsumer(dest, "XPATH
'//books//book[@lang=''en'']'");
+
+ ActiveMQTextMessage message = (ActiveMQTextMessage)
session.createTextMessage(
+ "<?xml version=\"1.0\" encoding=\"UTF-8\"?><books><book
lang=\"en\">ABC</book></books>");
+ message.storeContentAndClear();
+ ByteSequenceData.writeIntBig(message.getContent(), 1024 * 1024);
+ producer.send(message);
+
+ assertNull(consumer.receive(100));
+
+ Destination destination = brokerService.getDestination(dest);
+ assertTrue(Wait.waitFor(() ->
destination.getDestinationStatistics().getMessages().getCount() == 0,
+ 500, 10));
+ assertTrue(Wait.waitFor(
+ () -> destination.getMemoryUsage().getUsage() == 0, 1000,
100));
+ assertEquals(0, destination.getMessageStore().getMessageCount());
+ }
+ }
+
+
+ protected void startBroker() throws Exception {
+ brokerService = new BrokerService();
+ PolicyEntry policy = new PolicyEntry();
+ policy.setSendAdvisoryIfNoConsumers(true);
+ PolicyMap pMap = new PolicyMap();
+ pMap.setDefaultEntry(policy);
+ brokerService.setDestinationPolicy(pMap);
+ brokerService.setPersistent(true);
+ brokerService.setDataDirectoryFile(dataFileDir.getRoot());
+ KahaDBStore store = new KahaDBStore();
+ store.setDirectory(dataFileDir.getRoot());
+
store.setJournalDiskSyncStrategy(Journal.JournalDiskSyncStrategy.NEVER.name());
+ brokerService.setPersistenceAdapter(store);
+ brokerService.setUseJmx(false);
+ brokerService.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() {
+ @Override
+ public Broker installPlugin(Broker broker) {
+ return new BrokerFilter(broker) {
+ @Override
+ public boolean sendToDeadLetterQueue(ConnectionContext
context,
+ MessageReference messageReference, Subscription
subscription,
+ Throwable poisonCause) {
+ dlqCount.getAndIncrement();
+ return super.sendToDeadLetterQueue(context,
messageReference,
+ subscription, poisonCause);
+ }
+ };
+ }
+ }});
+
+ TransportConnector connector =
brokerService.addConnector("nio://localhost:0");
+ brokerService.start();
+ brokerService.waitUntilStarted();
+
+ clientUri = connector.getPublishableConnectURI();
+ }
+
+ public void stopBroker() throws Exception {
+ if (brokerService != null) {
+ brokerService.stop();
+ brokerService.waitUntilStopped();
+ brokerService = null;
+ }
+ }
+}
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQMapMessageTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQMapMessageTest.java
index 8ecfcfb69c..4eb5234b69 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQMapMessageTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQMapMessageTest.java
@@ -21,6 +21,8 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.Arrays;
@@ -33,6 +35,10 @@ import jakarta.jms.MessageFormatException;
import jakarta.jms.MessageNotReadableException;
import jakarta.jms.MessageNotWriteableException;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.util.ByteSequenceData;
+import
org.apache.activemq.util.MarshallingSupport.ActiveMQUnmarshalEOFException;
+import org.apache.commons.lang.exception.ExceptionUtils;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -468,4 +474,31 @@ public class ActiveMQMapMessageTest {
msg.getShort("short");
msg.getString("string");
}
+
+ @Test
+ public void testUnmarshalException() throws Exception {
+ ActiveMQConnection connection = mock(ActiveMQConnection.class);
+
+ ActiveMQMapMessage msg = new ActiveMQMapMessage();
+ msg.setConnection(connection);
+ msg.setString("test", "test");
+
+ // store and marshal
+ msg.storeContentAndClear();
+ assertTrue(msg.map.isEmpty());
+
+ // corrupt the buffer
+ ByteSequenceData.writeIntBig(msg.content, 1000);
+
+ try {
+ // trigger unmarshalling the map
+ msg.getString("test");
+ fail("Should have thrown exception");
+ } catch (JMSException e) {
+ // expected
+ assertTrue(
+ ExceptionUtils.getRootCause(e) instanceof
ActiveMQUnmarshalEOFException);
+ }
+ }
+
}
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQMessageTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQMessageTest.java
index badd57a8df..83ecc4ac3a 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQMessageTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQMessageTest.java
@@ -31,6 +31,8 @@ import junit.framework.TestCase;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.state.CommandVisitor;
import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.ByteSequenceData;
+import
org.apache.activemq.util.MarshallingSupport.ActiveMQUnmarshalEOFException;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -989,4 +991,25 @@ public class ActiveMQMessageTest extends TestCase {
msg.setJMSExpiration(System.currentTimeMillis() + 10000);
assertFalse(msg.isExpired());
}
+
+ public void testUnmarshalPropertiesException() throws Exception {
+ ActiveMQMessage msg = new ActiveMQMessage();
+ msg.setProperty("test", "test");
+
+ // marshal properties and clear unmarshaled state
+ msg.beforeMarshall(null);
+ msg.clearUnMarshalledState();
+ assertNull(msg.properties);
+
+ // corrupt the buffer
+ ByteSequenceData.writeIntBig(msg.marshalledProperties, 100000);
+
+ try {
+ // this will trigger unmarshalling
+ msg.getProperty("test");
+ fail("Should have thrown exception");
+ } catch (ActiveMQUnmarshalEOFException e) {
+ // expected
+ }
+ }
}
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQObjectMessageTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQObjectMessageTest.java
index b52f99e7f7..f898acd50e 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQObjectMessageTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQObjectMessageTest.java
@@ -16,6 +16,9 @@
*/
package org.apache.activemq.command;
+
+import static org.mockito.Mockito.mock;
+
import java.io.IOException;
import jakarta.jms.JMSException;
@@ -23,6 +26,9 @@ import jakarta.jms.MessageNotReadableException;
import jakarta.jms.MessageNotWriteableException;
import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.util.ByteSequenceData;
+import org.apache.commons.lang.exception.ExceptionUtils;
/**
*
@@ -123,4 +129,28 @@ public class ActiveMQObjectMessageTest extends TestCase {
}
}
+ public void testUnCompressedException() throws Exception {
+ ActiveMQConnection connection = mock(ActiveMQConnection.class);
+
+ ActiveMQObjectMessage msg = new ActiveMQObjectMessage();
+ msg.setConnection(connection);
+ msg.setObject("test");
+
+ // store and marshal
+ msg.storeContentAndClear();
+ assertNull(msg.object);
+
+ // corrupt the buffer
+ ByteSequenceData.writeIntBig(msg.content, 1000);
+
+ try {
+ // trigger unmarshalling the object
+ msg.getObject();
+ fail("Should have thrown exception");
+ } catch (JMSException e) {
+ // uncompressed will have an error from the JDK deserialization
+ assertTrue(ExceptionUtils.getRootCause(e) instanceof IOException);
+ }
+ }
+
}
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQStreamMessageTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQStreamMessageTest.java
index 9022b5ecf1..1aa4eedeaf 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQStreamMessageTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQStreamMessageTest.java
@@ -18,8 +18,10 @@ package org.apache.activemq.command;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
import jakarta.jms.JMSException;
import jakarta.jms.MessageEOFException;
@@ -27,6 +29,10 @@ import jakarta.jms.MessageFormatException;
import jakarta.jms.MessageNotReadableException;
import jakarta.jms.MessageNotWriteableException;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.util.ByteSequenceData;
+import
org.apache.activemq.util.MarshallingSupport.ActiveMQUnmarshalEOFException;
+import org.apache.commons.lang.exception.ExceptionUtils;
import org.junit.Test;
/**
@@ -1072,4 +1078,28 @@ public class ActiveMQStreamMessageTest {
message.readBoolean();
} catch (MessageEOFException ex) {}
}
+
+ @Test
+ public void testUnmarshalException() throws Exception {
+ ActiveMQConnection connection = mock(ActiveMQConnection.class);
+
+ ActiveMQStreamMessage msg = new ActiveMQStreamMessage();
+ msg.setConnection(connection);
+ msg.writeBytes("Test".getBytes());
+
+ // store and marshal
+ msg.reset();
+ assertNull(msg.dataOut);
+
+ // corrupt the buffer
+ ByteSequenceData.writeIntBig(msg.content, 1000000);
+
+ try {
+ msg.readBytes(new byte[1024]);
+ fail("Should have thrown exception");
+ } catch (JMSException e) {
+ // expected
+ assertTrue(e instanceof MessageFormatException);
+ }
+ }
}
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQTextMessageTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQTextMessageTest.java
index 48c1befbe4..f2825f0ba4 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQTextMessageTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQTextMessageTest.java
@@ -16,6 +16,8 @@
*/
package org.apache.activemq.command;
+import static org.mockito.Mockito.mock;
+
import java.beans.Transient;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -28,9 +30,13 @@ import jakarta.jms.MessageNotWriteableException;
import junit.framework.TestCase;
import junit.textui.TestRunner;
+import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.ByteSequenceData;
import org.apache.activemq.util.MarshallingSupport;
+import
org.apache.activemq.util.MarshallingSupport.ActiveMQUnmarshalEOFException;
+import org.apache.commons.lang.exception.ExceptionUtils;
/**
*
@@ -155,6 +161,30 @@ public class ActiveMQTextMessageTest extends TestCase {
Method method =
ActiveMQTextMessage.class.getMethod("getRegionDestination");
assertTrue(method.isAnnotationPresent(Transient.class));
}
+
+ public void testUnUnmarshalException() throws Exception {
+ ActiveMQConnection connection = mock(ActiveMQConnection.class);
+
+ ActiveMQTextMessage msg = new ActiveMQTextMessage();
+ msg.setConnection(connection);
+ msg.setText("content");
+
+ // store and marshal
+ msg.storeContentAndClear();
+ assertNull(msg.text);
+
+ // corrupt the buffer
+ ByteSequenceData.writeIntBig(msg.content, 1000);
+
+ try {
+ msg.getText();
+ fail("Should have thrown exception");
+ } catch (JMSException e) {
+ // expected
+ assertTrue(
+ ExceptionUtils.getRootCause(e) instanceof
ActiveMQUnmarshalEOFException);
+ }
+ }
protected void setContent(Message message, String text) throws Exception {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact