Repository: activemq Updated Branches: refs/heads/trunk d5470254a -> 6cebd2c79
https://issues.apache.org/jira/browse/AMQ-5513 - interaction with - https://issues.apache.org/jira/browse/AMQ-5068 - need to ensure broker cached messages state reflects delivery attempt - RedeliveryRestartWithExceptionTest regression Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/6cebd2c7 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/6cebd2c7 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/6cebd2c7 Branch: refs/heads/trunk Commit: 6cebd2c79e939ae45b4cd94b6433b9d96d2bae8d Parents: d547025 Author: gtully <[email protected]> Authored: Mon Jan 26 12:13:45 2015 +0000 Committer: gtully <[email protected]> Committed: Mon Jan 26 12:13:55 2015 +0000 ---------------------------------------------------------------------- .../activemq/broker/region/RegionBroker.java | 24 ++++++++++++++++++-- .../RedeliveryRestartWithExceptionTest.java | 4 ++-- 2 files changed, 24 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/6cebd2c7/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java index 658bc7c..2943c98 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java @@ -66,6 +66,7 @@ import org.apache.activemq.state.ConnectionState; import org.apache.activemq.store.PListStore; import org.apache.activemq.thread.Scheduler; import org.apache.activemq.thread.TaskRunnerFactory; +import org.apache.activemq.transport.TransmitCallback; import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.util.BrokerSupport; import org.apache.activemq.util.IdGenerator; @@ -613,8 +614,8 @@ public class RegionBroker extends EmptyBroker { } @Override - public void preProcessDispatch(MessageDispatch messageDispatch) { - Message message = messageDispatch.getMessage(); + public void preProcessDispatch(final MessageDispatch messageDispatch) { + final Message message = messageDispatch.getMessage(); if (message != null) { long endTime = System.currentTimeMillis(); message.setBrokerOutTime(endTime); @@ -627,6 +628,25 @@ public class RegionBroker extends EmptyBroker { message.incrementRedeliveryCounter(); try { ((BaseDestination) message.getRegionDestination()).getMessageStore().updateMessage(message); + messageDispatch.setTransmitCallback(new TransmitCallback() { + // dispatch is considered a delivery, so update sub state post dispatch otherwise + // on a disconnect/reconnect cached messages will not reflect initial delivery attempt + final TransmitCallback delegate = messageDispatch.getTransmitCallback(); + @Override + public void onSuccess() { + message.incrementRedeliveryCounter(); + if (delegate != null) { + delegate.onSuccess(); + } + } + + @Override + public void onFailure() { + if (delegate != null) { + delegate.onFailure(); + } + } + }); } catch (IOException error) { RuntimeException runtimeException = new RuntimeException("Failed to persist JMSRedeliveryFlag on " + message.getMessageId() + " in " + message.getDestination(), error); LOG.warn(runtimeException.getLocalizedMessage(), runtimeException); http://git-wip-us.apache.org/repos/asf/activemq/blob/6cebd2c7/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartWithExceptionTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartWithExceptionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartWithExceptionTest.java index 4126f06..eae86d6 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartWithExceptionTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartWithExceptionTest.java @@ -200,8 +200,8 @@ public class RedeliveryRestartWithExceptionTest extends TestSupport { msg = (TextMessage) consumer.receive(4000); LOG.info("redelivered? got: " + msg); assertNotNull("got the message again", msg); - assertEquals("re delivery flag", true, msg.getJMSRedelivered()); - assertTrue("redelivery count survives reconnect", msg.getLongProperty("JMSXDeliveryCount") > 1); + assertEquals("re delivery flag on:" + i, true, msg.getJMSRedelivered()); + assertTrue("redelivery count survives reconnect for:" + i, msg.getLongProperty("JMSXDeliveryCount") > 1); msg.acknowledge(); }
