This is an automated email from the ASF dual-hosted git repository.
mattrpav pushed a commit to branch activemq-5.18.x
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/activemq-5.18.x by this push:
new 6cc7658b94 [AMQ-9455] DestinationPolicy support for
MessageInterceptorStrategy
6cc7658b94 is described below
commit 6cc7658b941249c89463b4d570738ff574635315
Author: Matt Pavlovich <[email protected]>
AuthorDate: Sun Mar 17 19:32:53 2024 -0500
[AMQ-9455] DestinationPolicy support for MessageInterceptorStrategy
(cherry picked from commit c465330be5e66fac40e7c73a74fd0cad9c5333f0)
---
.../activemq/broker/region/BaseDestination.java | 11 +-
.../org/apache/activemq/broker/region/Queue.java | 11 +
.../org/apache/activemq/broker/region/Topic.java | 10 +
.../policy/ChainMessageInterceptorStrategy.java | 51 ++++
.../policy/HeaderMessageInterceptorStrategy.java | 158 +++++++++++
.../region/policy/MessageInterceptorStrategy.java | 48 ++++
.../activemq/broker/region/policy/PolicyEntry.java | 11 +
.../MessageInterceptorStrategyMemoryUsageTest.java | 168 ++++++++++++
.../policy/MessageInterceptorStrategyTest.java | 291 +++++++++++++++++++++
9 files changed, 758 insertions(+), 1 deletion(-)
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
index 06088da57e..a7d2cd738d 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
@@ -28,6 +28,7 @@ import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
+import org.apache.activemq.broker.region.policy.MessageInterceptorStrategy;
import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
@@ -35,7 +36,6 @@ import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.security.SecurityContext;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.store.MessageStore;
@@ -99,6 +99,7 @@ public abstract class BaseDestination implements Destination {
private int maxExpirePageSize = MAX_BROWSE_PAGE_SIZE;
protected int cursorMemoryHighWaterMark = 70;
protected int storeUsageHighWaterMark = 100;
+ private MessageInterceptorStrategy messageInterceptorStrategy;
private SlowConsumerStrategy slowConsumerStrategy;
private boolean prioritizedMessages;
private long inactiveTimeoutBeforeGC = DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC;
@@ -942,4 +943,12 @@ public abstract class BaseDestination implements
Destination {
public SystemUsage getSystemUsage() {
return systemUsage;
}
+
+ /**
+ * Returns the message interceptor strategy held by this destination.
+ *
+ * @return the configured strategy, or null when none has been set
+ */
+ public MessageInterceptorStrategy getMessageInterceptorStrategy() {
+ return this.messageInterceptorStrategy;
+ }
+
+ /**
+ * Stores the message interceptor strategy for this destination.
+ *
+ * @param messageInterceptorStrategy the strategy to store; may be null
+ */
+ public void setMessageInterceptorStrategy(MessageInterceptorStrategy
messageInterceptorStrategy) {
+ this.messageInterceptorStrategy = messageInterceptorStrategy;
+ }
}
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 66fa2cf397..1e4c1c45ec 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -49,6 +49,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import javax.jms.ResourceAllocationException;
+import javax.jms.MessageFormatException;
+import javax.jms.MessageFormatRuntimeException;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerStoppedException;
@@ -625,6 +627,15 @@ public class Queue extends BaseDestination implements
Task, UsageListener, Index
final ProducerInfo producerInfo =
producerExchange.getProducerState().getInfo();
final boolean sendProducerAck = !message.isResponseRequired() &&
producerInfo.getWindowSize() > 0
&& !context.isInRecoveryMode();
+
+ if(getMessageInterceptorStrategy() != null) {
+ try {
+ getMessageInterceptorStrategy().process(producerExchange,
message);
+ } catch (MessageFormatRuntimeException e) {
+ throw new MessageFormatException(e.getMessage(),
e.getErrorCode());
+ }
+ }
+
if (message.isExpired()) {
// message not stored - or added to stats yet - so chuck here
broker.getRoot().messageExpired(context, message, null);
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
index 2cca4c0bd2..8903b91b50 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
@@ -64,6 +64,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.JMSException;
+import javax.jms.MessageFormatException;
+import javax.jms.MessageFormatRuntimeException;
import static org.apache.activemq.transaction.Transaction.IN_USE_STATE;
@@ -371,6 +373,14 @@ public class Topic extends BaseDestination implements Task
{
message.setRegionDestination(this);
+ if(getMessageInterceptorStrategy() != null) {
+ try {
+ getMessageInterceptorStrategy().process(producerExchange,
message);
+ } catch (MessageFormatRuntimeException e) {
+ throw new MessageFormatException(e.getMessage(),
e.getErrorCode());
+ }
+ }
+
// There is delay between the client sending it and it arriving at the
// destination.. it may have expired.
if (message.isExpired()) {
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/ChainMessageInterceptorStrategy.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/ChainMessageInterceptorStrategy.java
new file mode 100644
index 0000000000..2ab83e8c2b
--- /dev/null
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/ChainMessageInterceptorStrategy.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.policy;
+
+import java.util.Arrays;
+
+import org.apache.activemq.broker.ProducerBrokerExchange;
+import org.apache.activemq.command.Message;
+
+import javax.jms.MessageFormatRuntimeException;
+
+/**
+ * Configurable chain of MessageInterceptorStrategies.
+ *
+ * The configured strategies are invoked in array order with the same exchange
+ * and message. An exception thrown by one strategy propagates to the caller, so
+ * the strategies after it are not invoked.
+ *
+ * @org.apache.xbean.XBean
+ */
+public class ChainMessageInterceptorStrategy implements MessageInterceptorStrategy {
+
+    private MessageInterceptorStrategy[] messageInterceptorStrategies;
+
+    /**
+     * Passes the message through every configured strategy, in order.
+     * Does nothing when no strategies are configured.
+     *
+     * @param producerBrokerExchange the exchange handed to each strategy
+     * @param message the message handed to each strategy
+     * @throws MessageFormatRuntimeException if any strategy in the chain throws it
+     */
+    @Override
+    public void process(ProducerBrokerExchange producerBrokerExchange, Message message) throws MessageFormatRuntimeException {
+        // Read the field once so the null/empty check and the iteration see the same array
+        final MessageInterceptorStrategy[] strategies = this.messageInterceptorStrategies;
+        if (strategies == null || strategies.length == 0) {
+            return;
+        }
+
+        Arrays.stream(strategies).forEach(m -> m.process(producerBrokerExchange, message));
+    }
+
+    /**
+     * Sets the strategies that make up the chain. Named to match
+     * {@link #getMessageInterceptorStrategies()} so the property is a proper
+     * read/write bean property.
+     *
+     * @param messageInterceptorStrategies the strategies to invoke, in order; may be null or empty
+     */
+    public void setMessageInterceptorStrategies(MessageInterceptorStrategy[] messageInterceptorStrategies) {
+        this.messageInterceptorStrategies = messageInterceptorStrategies;
+    }
+
+    /**
+     * Alias of {@link #setMessageInterceptorStrategies(MessageInterceptorStrategy[])},
+     * retained so existing configuration that uses the original property name keeps working.
+     *
+     * @param messageInterceptorStrategies the strategies to invoke, in order; may be null or empty
+     */
+    public void setMessageStrategies(MessageInterceptorStrategy[] messageInterceptorStrategies) {
+        setMessageInterceptorStrategies(messageInterceptorStrategies);
+    }
+
+    /**
+     * @return the configured strategies, or null when none have been set
+     */
+    public MessageInterceptorStrategy[] getMessageInterceptorStrategies() {
+        return this.messageInterceptorStrategies;
+    }
+}
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/HeaderMessageInterceptorStrategy.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/HeaderMessageInterceptorStrategy.java
new file mode 100644
index 0000000000..353a51bbe1
--- /dev/null
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/HeaderMessageInterceptorStrategy.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.policy;
+
+import org.apache.activemq.broker.ProducerBrokerExchange;
+import org.apache.activemq.command.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.MessageFormatRuntimeException;
+
+/**
+ * Enforce message policies for JMS Header values.
+ *
+ * Depending on configuration, this strategy rewrites the timestamp and
+ * expiration of a message and/or forces its delivery mode.
+ *
+ * @org.apache.xbean.XBean
+ */
+public class HeaderMessageInterceptorStrategy implements MessageInterceptorStrategy {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HeaderMessageInterceptorStrategy.class);
+
+    /**
+     * When true, the delivery mode of every processed message is set from
+     * {@link #persistent}. False by default.
+     */
+    boolean forceDeliveryMode = false;
+
+    /**
+     * Delivery mode applied when {@link #forceDeliveryMode} is true:
+     * true for persistent, false for non-persistent. True by default.
+     */
+    boolean persistent = true;
+
+    /**
+     * When true, timestamp and expiration are recalculated for processed
+     * messages. False by default.
+     */
+    boolean forceExpiration = false;
+
+    /**
+     * variable which (when non-zero) is used to override
+     * the expiration date for messages that arrive with
+     * no expiration date set (in Milliseconds).
+     */
+    long zeroExpirationOverride = 0;
+
+    /**
+     * variable which (when non-zero) is used to limit
+     * the expiration date (in Milliseconds).
+     */
+    long expirationCeiling = 0;
+
+    /**
+     * If true, the timestamp and expiration are only rewritten when the newly
+     * computed expiration is later than the one the message arrived with.
+     * False by default
+     */
+    boolean futureOnly = false;
+
+    /**
+     * if true, apply this strategy even to messages received over a network connection
+     * default false
+     */
+    boolean processNetworkMessages = false;
+
+    /**
+     * setter method for zeroExpirationOverride
+     */
+    public void setZeroExpirationOverride(long ttl) {
+        this.zeroExpirationOverride = ttl;
+    }
+
+    /**
+     * @return the time-to-live (in milliseconds) applied to messages that arrive without an expiration
+     */
+    public long getZeroExpirationOverride() {
+        return this.zeroExpirationOverride;
+    }
+
+    /**
+     * setter method for expirationCeiling
+     */
+    public void setExpirationCeiling(long expirationCeiling) {
+        this.expirationCeiling = expirationCeiling;
+    }
+
+    /**
+     * @return the maximum time-to-live (in milliseconds) allowed, or 0 when no ceiling applies
+     */
+    public long getExpirationCeiling() {
+        return this.expirationCeiling;
+    }
+
+    public void setFutureOnly(boolean futureOnly) {
+        this.futureOnly = futureOnly;
+    }
+
+    public boolean isFutureOnly() {
+        return this.futureOnly;
+    }
+
+    /**
+     * Boxed overload of {@link #setProcessNetworkMessages(boolean)}.
+     *
+     * @param processNetworkMessages the new value; null is treated as false
+     *        (the default) instead of failing on unboxing
+     */
+    public void setProcessNetworkMessages(Boolean processNetworkMessages) {
+        this.processNetworkMessages = Boolean.TRUE.equals(processNetworkMessages);
+    }
+
+    /**
+     * Applies the configured header policies to the message.
+     *
+     * Messages arriving on a network connection are left untouched unless
+     * processNetworkMessages is enabled. When forceExpiration is enabled and the
+     * message has a timestamp and is not addressed to a DLQ, the timestamp is
+     * reset to the broker's current time and the expiration is recomputed from
+     * the original time-to-live (or zeroExpirationOverride when the message had
+     * no expiration), limited by expirationCeiling. When forceDeliveryMode is
+     * enabled the persistent flag of the message is overwritten.
+     *
+     * @param producerBrokerExchange exchange used to detect network connections
+     * @param message the message whose headers are modified in place
+     * @throws MessageFormatRuntimeException declared by the interface; not thrown directly here
+     */
+    @Override
+    public void process(final ProducerBrokerExchange producerBrokerExchange, final Message message) throws MessageFormatRuntimeException {
+        if (!isProcessNetworkMessages() && producerBrokerExchange.getConnectionContext().isNetworkConnection()) {
+            // Message arrived over a network connection and processNetworkMessages is not enabled
+            return;
+        }
+
+        if (isForceExpiration()) {
+            if (message.getTimestamp() > 0 && !message.getDestination().isDLQ()) {
+                long oldExpiration = message.getExpiration();
+                long newTimeStamp = System.currentTimeMillis();
+                long timeToLive = zeroExpirationOverride;
+                long oldTimestamp = message.getTimestamp();
+                if (oldExpiration > 0) {
+                    // Preserve the time-to-live the producer asked for
+                    timeToLive = oldExpiration - oldTimestamp;
+                }
+                if (timeToLive > 0 && expirationCeiling > 0 && timeToLive > expirationCeiling) {
+                    timeToLive = expirationCeiling;
+                }
+                long expiration = timeToLive + newTimeStamp;
+                // In the scenario that the Broker is behind the clients we never want to set the
+                // Timestamp and Expiration in the past
+                if (!futureOnly || (expiration > oldExpiration)) {
+                    // expiration > 0 also rejects a sum that overflowed to a negative value
+                    if (timeToLive > 0 && expiration > 0) {
+                        message.setExpiration(expiration);
+                    }
+                    message.setTimestamp(newTimeStamp);
+                    LOG.debug("Set message {} timestamp from {} to {}", message.getMessageId(), oldTimestamp, newTimeStamp);
+                }
+            }
+        }
+
+        if (forceDeliveryMode) {
+            message.setPersistent(isPersistent());
+        }
+    }
+
+    public void setForceDeliveryMode(boolean forceDeliveryMode) {
+        this.forceDeliveryMode = forceDeliveryMode;
+    }
+
+    public boolean isForceDeliveryMode() {
+        return this.forceDeliveryMode;
+    }
+
+    public void setForceExpiration(boolean forceExpiration) {
+        this.forceExpiration = forceExpiration;
+    }
+
+    public boolean isForceExpiration() {
+        return this.forceExpiration;
+    }
+
+    public void setPersistent(boolean persistent) {
+        this.persistent = persistent;
+    }
+
+    public boolean isPersistent() {
+        return this.persistent;
+    }
+
+    public void setProcessNetworkMessages(boolean processNetworkMessages) {
+        this.processNetworkMessages = processNetworkMessages;
+    }
+
+    public boolean isProcessNetworkMessages() {
+        return this.processNetworkMessages;
+    }
+}
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/MessageInterceptorStrategy.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/MessageInterceptorStrategy.java
new file mode 100644
index 0000000000..d581978c80
--- /dev/null
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/MessageInterceptorStrategy.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.policy;
+
+import org.apache.activemq.broker.ProducerBrokerExchange;
+import org.apache.activemq.command.Message;
+import javax.jms.MessageFormatRuntimeException;
+
+/**
+ * Strategy invoked by a destination for each message it receives, allowing the
+ * message to be inspected, modified in place, or rejected.
+ */
+public interface MessageInterceptorStrategy {
+
+ /**
+ * When a PolicyEntry is configured with a MessageInterceptorStrategy, the
+ * process method is invoked with the current ProducerBrokerExchange and
+ * Message before the message is stored in any destination cache or
+ * persistence store.
+ *
+ * Implementations may reference data from the ProducerBrokerExchange and
+ * may check or modify headers, properties, body or other metadata on the
+ * Message.
+ *
+ * Any change to the message must adhere to OpenWire and ActiveMQ
+ * requirements or risk issues with memory usage, compatibility, and
+ * general correct functioning.
+ *
+ * Implementations shall not copy, or clone the message.
+ *
+ * Implementations may throw a {@code MessageFormatRuntimeException}
+ * that is returned to the client to indicate a message should not be
+ * added to the destination.
+ *
+ * @param producerBrokerExchange the exchange of the producer that sent the message
+ * @param message the message to inspect or modify in place
+ * @throws MessageFormatRuntimeException to reject the message
+ */
+ void process(final ProducerBrokerExchange producerBrokerExchange, final
Message message) throws MessageFormatRuntimeException;
+
+}
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
index 51b20e30e1..7230957022 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
@@ -115,6 +115,7 @@ public class PolicyEntry extends DestinationMapEntry {
private int sendFailIfNoSpace = -1;
private long sendFailIfNoSpaceAfterTimeout = -1;
+ private MessageInterceptorStrategy messageInterceptorStrategy = null;
public void configure(Broker broker,Queue queue) {
baseConfiguration(broker,queue);
@@ -139,6 +140,7 @@ public class PolicyEntry extends DestinationMapEntry {
queue.setConsumersBeforeDispatchStarts(getConsumersBeforeDispatchStarts());
queue.setAllConsumersExclusiveByDefault(isAllConsumersExclusiveByDefault());
queue.setPersistJMSRedelivered(isPersistJMSRedelivered());
+ queue.setMessageInterceptorStrategy(getMessageInterceptorStrategy());
}
public void update(Queue queue) {
@@ -201,6 +203,7 @@ public class PolicyEntry extends DestinationMapEntry {
topic.getMemoryUsage().setLimit(memoryLimit);
}
topic.setLazyDispatch(isLazyDispatch());
+ topic.setMessageInterceptorStrategy(getMessageInterceptorStrategy());
}
public void update(Topic topic) {
@@ -1165,4 +1168,12 @@ public class PolicyEntry extends DestinationMapEntry {
public void setUseTopicSubscriptionInflightStats(boolean
useTopicSubscriptionInflightStats) {
this.useTopicSubscriptionInflightStats =
useTopicSubscriptionInflightStats;
}
+
+ /**
+ * Sets the message interceptor strategy of this policy entry. The entry
+ * hands this value to queues and topics through their
+ * setMessageInterceptorStrategy method when it configures them.
+ *
+ * @param messageInterceptorStrategy the strategy to apply; may be null
+ */
+ public void setMessageInterceptorStrategy(MessageInterceptorStrategy
messageInterceptorStrategy) {
+ this.messageInterceptorStrategy = messageInterceptorStrategy;
+ }
+
+ /**
+ * @return the message interceptor strategy of this policy entry, or null
+ * (the initial value) when none has been set
+ */
+ public MessageInterceptorStrategy getMessageInterceptorStrategy() {
+ return this.messageInterceptorStrategy;
+ }
}
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/MessageInterceptorStrategyMemoryUsageTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/MessageInterceptorStrategyMemoryUsageTest.java
new file mode 100644
index 0000000000..a87ecc6622
--- /dev/null
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/MessageInterceptorStrategyMemoryUsageTest.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.policy;
+
+import java.io.File;
+import java.util.Random;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.MessageFormatRuntimeException;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ProducerBrokerExchange;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.region.policy.MessageInterceptorStrategy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
+import org.apache.activemq.TestSupport;
+import org.apache.activemq.util.ByteSequence;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+
+/**
+ * Tests that queue memory usage reflects the message body after a
+ * MessageInterceptorStrategy has replaced it with a larger or smaller one.
+ */
+public class MessageInterceptorStrategyMemoryUsageTest extends TestSupport {
+
+    BrokerService broker;
+    ConnectionFactory factory;
+    Connection connection;
+    Session session;
+    MessageProducer producer;
+    QueueBrowser queueBrowser;
+
+    /**
+     * Starts an in-memory broker on an ephemeral TCP port and opens a
+     * connection and an auto-acknowledge session to it.
+     */
+    @Before
+    public void setUp() throws Exception {
+        broker = new BrokerService();
+
+        File testDataDir = new File("target/activemq-data/message-interceptor-strategy");
+        broker.setDataDirectoryFile(testDataDir);
+        broker.setUseJmx(true);
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.getSystemUsage().getMemoryUsage().setLimit(1024l * 1024 * 64);
+        broker.setPersistenceAdapter(new MemoryPersistenceAdapter());
+        broker.addConnector("tcp://localhost:0");
+        broker.start();
+        factory = new ActiveMQConnectionFactory(broker.getTransportConnectors()
+                .get(0).getConnectUri().toString());
+        connection = factory.createConnection();
+        connection.start();
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+    }
+
+    /**
+     * Closes the producer (if one was created), session and connection, then stops the broker.
+     */
+    @After
+    public void tearDown() throws Exception {
+        if (producer != null) {
+            producer.close();
+        }
+        session.close();
+        connection.stop();
+        connection.close();
+        broker.stop();
+    }
+
+    /**
+     * Test sending messages that have body modified have correct usage
+     *
+     * Start with 10x 1k message bodies that get increased to 1mb
+     */
+    @Test
+    public void testMemoryUsageBodyIncrease() throws Exception {
+        applyBodySizeMessageInterceptor(1*1024*1024);
+        String queueName = "mis.bodySize.increase";
+        Queue queue = createQueue(queueName);
+
+        for (int i=0; i<10; i++) {
+            BytesMessage sendMessageP = session.createBytesMessage();
+            byte[] origBody = new byte[1*1024];
+            sendMessageP.writeBytes(origBody);
+            producer.send(queue, sendMessageP);
+        }
+
+        QueueViewMBean queueViewMBean = getProxyToQueue(queueName);
+        assertEquals(Long.valueOf(10_496_000l), Long.valueOf(queueViewMBean.getMemoryUsageByteCount()));
+    }
+
+    /**
+     * Test sending messages that have body modified have correct usage
+     *
+     * Start with 10x 1mb message bodies that get decreased to 1kb
+     */
+    @Test
+    public void testMemoryUsageBodyDecrease() throws Exception {
+        applyBodySizeMessageInterceptor(1*1024);
+        String queueName = "mis.bodySize.decrease";
+        Queue queue = createQueue(queueName);
+
+        for (int i=0; i<10; i++) {
+            BytesMessage sendMessageP = session.createBytesMessage();
+            byte[] origBody = new byte[1*1024*1024];
+            sendMessageP.writeBytes(origBody);
+            producer.send(queue, sendMessageP);
+        }
+
+        QueueViewMBean queueViewMBean = getProxyToQueue(queueName);
+        assertEquals(Long.valueOf(20_480), Long.valueOf(queueViewMBean.getMemoryUsageByteCount()));
+    }
+
+    /**
+     * Installs a default destination policy whose interceptor replaces the body
+     * of every message with {@code bodySize} random bytes.
+     *
+     * @param bodySize size in bytes of the replacement body; values of 0 or less leave messages untouched
+     * @return the policy map that was set on the broker
+     */
+    private PolicyMap applyBodySizeMessageInterceptor(final int bodySize) {
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry defaultEntry = new PolicyEntry();
+
+        MessageInterceptorStrategy bodySizeMessageInterceptorStrategy = new MessageInterceptorStrategy() {
+
+            @Override
+            public void process(ProducerBrokerExchange producerBrokerExchange, org.apache.activemq.command.Message message) throws MessageFormatRuntimeException {
+                if (bodySize > 0) {
+                    try {
+                        message.clearBody();
+                    } catch (JMSException e) {
+                        // This code is invoked by the broker, not by the test method, so
+                        // signal the failure through the interface's documented exception
+                        // (keeping the cause) rather than calling JUnit's fail() here.
+                        throw new MessageFormatRuntimeException(e.getMessage(), e.getErrorCode(), e);
+                    }
+                    byte[] newBody = new byte[bodySize];
+                    new Random().nextBytes(newBody);
+                    message.setContent(new ByteSequence(newBody));
+                    message.storeContent();
+                }
+            }
+        };
+        defaultEntry.setMessageInterceptorStrategy(bodySizeMessageInterceptorStrategy);
+
+        policyMap.setDefaultEntry(defaultEntry);
+        broker.setDestinationPolicy(policyMap);
+        return policyMap;
+    }
+
+    /**
+     * Creates the named queue and (re)assigns {@link #producer} to a producer for it.
+     */
+    private Queue createQueue(String queueName) throws Exception {
+        Queue queue = session.createQueue(queueName);
+        producer = session.createProducer(queue);
+        return queue;
+    }
+
+}
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/MessageInterceptorStrategyTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/MessageInterceptorStrategyTest.java
new file mode 100644
index 0000000000..5729119b05
--- /dev/null
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/MessageInterceptorStrategyTest.java
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.policy;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.Enumeration;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import
org.apache.activemq.broker.region.policy.HeaderMessageInterceptorStrategy;
+import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
+import org.apache.activemq.test.TestSupport;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+
+/**
+ * Tests the header enforcement performed by HeaderMessageInterceptorStrategy
+ * when it is configured through a destination PolicyEntry.
+ */
+public class MessageInterceptorStrategyTest extends TestSupport {
+
+    BrokerService broker;
+    ConnectionFactory factory;
+    Connection connection;
+    Session session;
+    MessageProducer producer;
+    QueueBrowser queueBrowser;
+
+    /**
+     * Starts an in-memory broker on an ephemeral TCP port and opens a
+     * connection and an auto-acknowledge session to it.
+     */
+    @Before
+    public void setUp() throws Exception {
+        broker = new BrokerService();
+
+        File testDataDir = new File("target/activemq-data/message-interceptor-strategy");
+        broker.setDataDirectoryFile(testDataDir);
+        broker.setUseJmx(true);
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.getSystemUsage().getMemoryUsage().setLimit(1024l * 1024 * 64);
+        broker.setPersistenceAdapter(new MemoryPersistenceAdapter());
+        broker.addConnector("tcp://localhost:0");
+        broker.start();
+        factory = new ActiveMQConnectionFactory(broker.getTransportConnectors()
+                .get(0).getConnectUri().toString());
+        connection = factory.createConnection();
+        connection.start();
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+    }
+
+    /**
+     * Closes the producer (if one was created), session and connection, then stops the broker.
+     */
+    @After
+    public void tearDown() throws Exception {
+        if (producer != null) {
+            producer.close();
+        }
+        session.close();
+        connection.stop();
+        connection.close();
+        broker.stop();
+    }
+
+    /**
+     * Test sending messages can be forced to Persistent
+     */
+    @Test
+    public void testForceDeliveryModePersistent() throws Exception {
+        applyHeaderMessageInterceptor(true, true, false, 0l, Long.MAX_VALUE);
+
+        Queue queue = createQueue("mis.forceDeliveryMode.true");
+        Message sendMessageP = session.createTextMessage("forceDeliveryMode=true");
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+        producer.send(queue, sendMessageP);
+
+        Message sendMessageNP = session.createTextMessage("forceDeliveryMode=true");
+        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+        producer.send(queue, sendMessageNP);
+
+        queueBrowser = session.createBrowser(queue);
+        Enumeration<?> browseEnumeration = queueBrowser.getEnumeration();
+
+        int count = 0;
+        while (browseEnumeration.hasMoreElements()) {
+            Message message = (Message)browseEnumeration.nextElement();
+            assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode());
+            count++;
+        }
+        assertEquals(Integer.valueOf(2), Integer.valueOf(count));
+    }
+
+    /**
+     * Test sending messages can be forced to NonPersistent
+     */
+    @Test
+    public void testForceDeliveryModeNonPersistent() throws Exception {
+        applyHeaderMessageInterceptor(true, false, false, 0l, Long.MAX_VALUE);
+
+        Queue queue = createQueue("mis.forceDeliveryMode.false");
+        Message sendMessageP = session.createTextMessage("forceDeliveryMode=false");
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+        producer.send(queue, sendMessageP);
+
+        Message sendMessageNP = session.createTextMessage("forceDeliveryMode=false");
+        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+        producer.send(queue, sendMessageNP);
+
+        queueBrowser = session.createBrowser(queue);
+        Enumeration<?> browseEnumeration = queueBrowser.getEnumeration();
+
+        int count = 0;
+        while (browseEnumeration.hasMoreElements()) {
+            Message message = (Message)browseEnumeration.nextElement();
+            assertEquals(DeliveryMode.NON_PERSISTENT, message.getJMSDeliveryMode());
+            count++;
+        }
+        assertEquals(Integer.valueOf(2), Integer.valueOf(count));
+    }
+
+    /**
+     * Test not overriding expiration
+     */
+    @Test
+    public void testForceExpirationDisabled() throws Exception {
+        applyHeaderMessageInterceptor(false, false, false, 100_000l, Long.MAX_VALUE);
+
+        Queue queue = createQueue("mis.forceExpiration.zero");
+        Message sendMessageP = session.createTextMessage("expiration=zero");
+        producer.setTimeToLive(0l);
+        producer.send(queue, sendMessageP);
+
+        queueBrowser = session.createBrowser(queue);
+        Enumeration<?> browseEnumeration = queueBrowser.getEnumeration();
+
+        int count = 0;
+        while (browseEnumeration.hasMoreElements()) {
+            Message message = (Message)browseEnumeration.nextElement();
+            assertEquals(Long.valueOf(0l), Long.valueOf(message.getJMSExpiration()));
+            count++;
+        }
+        assertEquals(Integer.valueOf(1), Integer.valueOf(count));
+    }
+
+    /**
+     * Test overriding zero (0) expiration
+     */
+    @Test
+    public void testForceExpirationZeroOverride() throws Exception {
+        long expiryTime = 100_000l;
+        applyHeaderMessageInterceptor(false, false, true, expiryTime, Long.MAX_VALUE);
+
+        long currentTime = System.currentTimeMillis();
+        Queue queue = createQueue("mis.forceExpiration.100k");
+        Message sendMessageP = session.createTextMessage("expiration=zero");
+        // Send with no time-to-live so the expiration seen below can only come
+        // from the broker-side zeroExpirationOverride
+        producer.setTimeToLive(0l);
+        producer.send(queue, sendMessageP);
+
+        queueBrowser = session.createBrowser(queue);
+        Enumeration<?> browseEnumeration = queueBrowser.getEnumeration();
+
+        int count = 0;
+        while (browseEnumeration.hasMoreElements()) {
+            Message message = (Message)browseEnumeration.nextElement();
+            assertTrue(message.getJMSExpiration() > currentTime + (expiryTime / 2));
+            count++;
+        }
+        assertEquals(Integer.valueOf(1), Integer.valueOf(count));
+    }
+
+    /**
+     * Test that a message expired through a 1ms zero (0) expiration override is
+     * no longer on its queue and is found on the individual DLQ with an
+     * expiration of zero
+     */
+    @Test
+    public void testForceExpirationZeroOverrideDLQ() throws Exception {
+        long expiryTime = 1l;
+        applyHeaderMessageInterceptor(false, false, true, expiryTime, Long.MAX_VALUE);
+
+        Queue queue = createQueue("mis.forceExpiration.zero-no-dlq-expiry");
+        Message sendMessageP = session.createTextMessage("expiration=zero-no-dlq-expiry");
+        producer.send(queue, sendMessageP);
+
+        Thread.sleep(250l);
+
+        queueBrowser = session.createBrowser(queue);
+        Enumeration<?> browseEnumeration = queueBrowser.getEnumeration();
+
+        int count = 0;
+        while (browseEnumeration.hasMoreElements()) {
+            count++;
+        }
+        assertEquals(Integer.valueOf(0), Integer.valueOf(count));
+
+        QueueBrowser dlqQueueBrowser = session.createBrowser(createQueue("mis.forceExpiration.zero-no-dlq-expiry.dlq"));
+        Enumeration<?> dlqBrowseEnumeration = dlqQueueBrowser.getEnumeration();
+
+        int dlqCount = 0;
+        while (dlqBrowseEnumeration.hasMoreElements()) {
+            Message dlqMessage = (Message)dlqBrowseEnumeration.nextElement();
+            assertEquals(sendMessageP.getJMSMessageID(), dlqMessage.getJMSMessageID());
+            // expected value first, actual value second
+            assertEquals("Expiration should be zero but was " + dlqMessage.getJMSExpiration(), 0L, dlqMessage.getJMSExpiration());
+            dlqCount++;
+        }
+        assertEquals(Integer.valueOf(1), Integer.valueOf(dlqCount));
+    }
+
+    /**
+     * Test overriding expiration ceiling
+     */
+    @Test
+    public void testForceExpirationCeilingOverride() throws Exception {
+        long zeroOverrideExpiryTime = 100_000l;
+        long expirationCeiling = Duration.ofDays(1).toMillis();
+        applyHeaderMessageInterceptor(false, false, true, zeroOverrideExpiryTime, expirationCeiling);
+
+        long currentTime = System.currentTimeMillis();
+        long expiryTime = Duration.ofDays(10).toMillis();
+        Queue queue = createQueue("mis.forceExpiration.maxValue");
+        Message sendMessageP = session.createTextMessage("expiration=ceiling");
+        producer.setTimeToLive(expiryTime);
+        producer.send(queue, sendMessageP);
+
+        queueBrowser = session.createBrowser(queue);
+        Enumeration<?> browseEnumeration = queueBrowser.getEnumeration();
+
+        int count = 0;
+        while (browseEnumeration.hasMoreElements()) {
+            Message message = (Message)browseEnumeration.nextElement();
+            assertTrue(message.getJMSExpiration() < (currentTime + Duration.ofDays(9).toMillis()));
+            count++;
+        }
+        assertEquals(Integer.valueOf(1), Integer.valueOf(count));
+    }
+
+    /**
+     * Installs a default destination policy that uses a
+     * HeaderMessageInterceptorStrategy configured with the given values and an
+     * IndividualDeadLetterStrategy that names each DLQ "&lt;queue&gt;.dlq".
+     *
+     * @return the policy map that was set on the broker
+     */
+    private PolicyMap applyHeaderMessageInterceptor(boolean forceDeliveryMode, boolean persistent, boolean forceExpiration, long zeroExpirationOverride, long expirationCeiling) {
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry defaultEntry = new PolicyEntry();
+
+        HeaderMessageInterceptorStrategy headerMessageInterceptorStrategy = new HeaderMessageInterceptorStrategy();
+
+        // Persistence related fields
+        headerMessageInterceptorStrategy.setForceDeliveryMode(forceDeliveryMode);
+        headerMessageInterceptorStrategy.setPersistent(persistent);
+
+        // Expiration related fields
+        headerMessageInterceptorStrategy.setForceExpiration(forceExpiration);
+        headerMessageInterceptorStrategy.setZeroExpirationOverride(zeroExpirationOverride);
+        headerMessageInterceptorStrategy.setExpirationCeiling(expirationCeiling);
+        defaultEntry.setMessageInterceptorStrategy(headerMessageInterceptorStrategy);
+
+        IndividualDeadLetterStrategy individualDeadLetterStrategy = new IndividualDeadLetterStrategy();
+        individualDeadLetterStrategy.setQueuePrefix("");
+        individualDeadLetterStrategy.setQueueSuffix(".dlq");
+        defaultEntry.setDeadLetterStrategy(individualDeadLetterStrategy);
+
+        policyMap.setDefaultEntry(defaultEntry);
+        broker.setDestinationPolicy(policyMap);
+        return policyMap;
+    }
+
+    /**
+     * Creates the named queue and (re)assigns {@link #producer} to a producer for it.
+     */
+    private Queue createQueue(String queueName) throws Exception {
+        Queue queue = session.createQueue(queueName);
+        producer = session.createProducer(queue);
+        return queue;
+    }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact