This is an automated email from the ASF dual-hosted git repository.

gtully pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/master by this push:
     new aa03f29  AMQ-7464 - ensure message.copy before server session run dispatch
aa03f29 is described below

commit aa03f295f523ab00d2b19d54a36f4140bef18d0b
Author: gtully <[email protected]>
AuthorDate: Wed Apr 8 14:57:49 2020 +0100

    AMQ-7464 - ensure message.copy before server session run dispatch
---
 .../java/org/apache/activemq/ActiveMQSession.java  |  15 ++-
 .../org/apache/activemq/RedeliveryPolicyTest.java  | 106 ++++++++++++++++++++-
 2 files changed, 117 insertions(+), 4 deletions(-)

diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
index 5910634..bef6f4e 100644
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
@@ -73,6 +73,7 @@ import org.apache.activemq.command.ActiveMQTempTopic;
 import org.apache.activemq.command.ActiveMQTextMessage;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.Command;
+import org.apache.activemq.command.CommandTypes;
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatch;
@@ -882,7 +883,16 @@ public class ActiveMQSession implements Session, 
QueueSession, TopicSession, Sta
         MessageDispatch messageDispatch;
         while ((messageDispatch = executor.dequeueNoWait()) != null) {
             final MessageDispatch md = messageDispatch;
-            final ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
+
+            // subset of 
org.apache.activemq.ActiveMQMessageConsumer.createActiveMQMessage
+            final ActiveMQMessage message = 
(ActiveMQMessage)md.getMessage().copy();
+            if 
(message.getDataStructureType()==CommandTypes.ACTIVEMQ_BLOB_MESSAGE) {
+                ((ActiveMQBlobMessage)message).setBlobDownloader(new 
BlobDownloader(getBlobTransferPolicy()));
+            }
+            if (message.getDataStructureType() == 
CommandTypes.ACTIVEMQ_OBJECT_MESSAGE) {
+                
((ActiveMQObjectMessage)message).setTrustAllPackages(getConnection().isTrustAllPackages());
+                
((ActiveMQObjectMessage)message).setTrustedPackages(getConnection().getTrustedPackages());
+            }
 
             MessageAck earlyAck = null;
             if (message.isExpired()) {
@@ -951,7 +961,7 @@ public class ActiveMQSession implements Session, 
QueueSession, TopicSession, Sta
                             @Override
                             public void afterRollback() throws Exception {
                                 if (LOG.isTraceEnabled()) {
-                                    LOG.trace("rollback {}", ack, new 
Throwable("here"));
+                                    LOG.trace("afterRollback {}", ack, new 
Throwable("here"));
                                 }
                                 // ensure we don't filter this as a duplicate
                                 
connection.rollbackDuplicate(ActiveMQSession.this, md.getMessage());
@@ -979,6 +989,7 @@ public class ActiveMQSession implements Session, 
QueueSession, TopicSession, Sta
                                     MessageAck ack = new MessageAck(md, 
MessageAck.POSION_ACK_TYPE, 1);
                                     
ack.setFirstMessageId(md.getMessage().getMessageId());
                                     ack.setPoisonCause(new Throwable("Exceeded 
ra redelivery policy limit:" + redeliveryPolicy));
+                                    LOG.trace("Exceeded redelivery with count: 
{}, Ack: {}", redeliveryCounter, ack);
                                     asyncSendPacket(ack);
 
                                 } else {
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
index a0a1ca8..5f325a4 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
@@ -149,7 +149,7 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
     /**
      * @throws Exception
      */
-    public void testNornalRedeliveryPolicyDelaysDeliveryOnRollback() throws 
Exception {
+    public void testNormalRedeliveryPolicyDelaysDeliveryOnRollback() throws 
Exception {
 
         // Receive a message with the JMS API
         RedeliveryPolicy policy = connection.getRedeliveryPolicy();
@@ -742,7 +742,7 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
                 public void onMessage(Message message) {
                     try {
                         ActiveMQTextMessage m = (ActiveMQTextMessage) message;
-                        LOG.info("Got: " + ((ActiveMQTextMessage) 
message).getMessageId() + ", seq:" + ((ActiveMQTextMessage) 
message).getMessageId().getBrokerSequenceId());
+                        LOG.info("Got: " + ((ActiveMQTextMessage) 
message).getMessageId() + ", seq:" + ((ActiveMQTextMessage) 
message).getMessageId().getBrokerSequenceId() + ", redeliveryCount: "  + 
m.getRedeliveryCounter());
                         assertEquals("1st", m.getText());
                         assertEquals(receivedCount.get(), 
m.getRedeliveryCounter());
                         receivedCount.incrementAndGet();
@@ -802,6 +802,108 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
 
     }
 
+
+    public void testRepeatedRedeliveryNoCommitForwardMessage() throws 
Exception {
+
+        connection.start();
+        Session dlqSession = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+        ActiveMQQueue destination = new ActiveMQQueue("TEST");
+        MessageProducer producer = dlqSession.createProducer(destination);
+
+        // Send the messages
+        producer.send(dlqSession.createTextMessage("1st"));
+
+        dlqSession.commit();
+        MessageConsumer dlqConsumer = dlqSession.createConsumer(new 
ActiveMQQueue("ActiveMQ.DLQ"));
+
+        final MessageProducer forwardingProducer = 
dlqSession.createProducer(new ActiveMQQueue("TEST_FORWARD"));
+
+        // Send the messages
+
+        final int maxRedeliveries = 2;
+        final AtomicInteger receivedCount = new AtomicInteger(0);
+
+        for (int i=0;i<=maxRedeliveries+1;i++) {
+            connection = 
(ActiveMQConnection)factory.createConnection(userName, password);
+            connections.add(connection);
+
+            RedeliveryPolicy policy = connection.getRedeliveryPolicy();
+            policy.setInitialRedeliveryDelay(0);
+            policy.setUseExponentialBackOff(false);
+            policy.setMaximumRedeliveries(maxRedeliveries);
+
+            connection.start();
+            final CountDownLatch done = new CountDownLatch(1);
+
+            final ActiveMQSession session = (ActiveMQSession) 
connection.createSession(true, Session.SESSION_TRANSACTED);
+            session.setMessageListener(new MessageListener() {
+                @Override
+                public void onMessage(Message message) {
+                    try {
+                        ActiveMQTextMessage m = (ActiveMQTextMessage) message;
+                        LOG.info("Got: " + ((ActiveMQTextMessage) 
message).getMessageId() + ", seq:" + ((ActiveMQTextMessage) 
message).getMessageId().getBrokerSequenceId() + " ,Redelivery: " + 
m.getRedeliveryCounter());
+                        assertEquals("1st", m.getText());
+                        assertEquals(receivedCount.get(), 
m.getRedeliveryCounter());
+                        receivedCount.incrementAndGet();
+
+                        // do a forward of the received message, will reset 
the messageID
+                        forwardingProducer.send(message);
+                        done.countDown();
+                    } catch (Exception ignored) {
+                        ignored.printStackTrace();
+                    }
+                }
+            });
+
+            connection.createConnectionConsumer(
+                    destination,
+                    null,
+                    new ServerSessionPool() {
+                        @Override
+                        public ServerSession getServerSession() throws 
JMSException {
+                            return new ServerSession() {
+                                @Override
+                                public Session getSession() throws 
JMSException {
+                                    return session;
+                                }
+
+                                @Override
+                                public void start() throws JMSException {
+                                }
+                            };
+                        }
+                    },
+                    100,
+                    false);
+
+            Wait.waitFor(new Wait.Condition() {
+                @Override
+                public boolean isSatisified() throws Exception {
+                    session.run();
+                    return done.await(10, TimeUnit.MILLISECONDS);
+                }
+            }, 5000);
+
+            if (i<=maxRedeliveries) {
+                assertTrue("listener done @" + i, done.await(5, 
TimeUnit.SECONDS));
+            } else {
+                // final redelivery gets poisoned before dispatch
+                assertFalse("listener not done @" + i, done.await(5, 
TimeUnit.SECONDS));
+            }
+            connection.close();
+            connections.remove(connection);
+        }
+
+        // We should be able to get the message off the DLQ now.
+        TextMessage m = (TextMessage)dlqConsumer.receive(1000);
+        assertNotNull("Got message from DLQ", m);
+        assertEquals("1st", m.getText());
+        String cause = 
m.getStringProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
+        assertTrue("cause exception has policy ref", 
cause.contains("RedeliveryPolicy"));
+        dlqSession.commit();
+
+    }
+
     public void testRedeliveryRollbackWithDelayBlocking() throws Exception
     {
         redeliveryRollbackWithDelay(true);

Reply via email to