[
https://issues.apache.org/jira/browse/AMQ-5347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Gary Tully resolved AMQ-5347.
-----------------------------
Resolution: Fixed
thanks for the test case and patch. I decided to keep the broker interface as
is b/c lots of folks have their own plugins so the exception is propagated via
a runtime exception. The exception kills the connection and hence the consumer
and the delivery failure is properly accounted for.
A variant of your test where the broker remains alive exposed another issue
with the redelivery flag and dispatch ordering. That is also sorted with
http://git-wip-us.apache.org/repos/asf/activemq/commit/52e1a05a
> persistJMSRedelivered flag doesn't work correctly when exceptions occur
> -----------------------------------------------------------------------
>
> Key: AMQ-5347
> URL: https://issues.apache.org/jira/browse/AMQ-5347
> Project: ActiveMQ
> Issue Type: Bug
> Components: Broker
> Affects Versions: 5.10.0
> Reporter: Jesse Fugitt
> Assignee: Gary Tully
> Fix For: 5.11.0
>
> Attachments: AMQ5347.patch, RedeliveryRestartWithExceptionTest.java
>
>
> The new flag in 5.10 that ensures the JMSRedelivered flag persists across
> broker restarts does not work correctly when an exception occurs when
> attempting to write the "message update" to disk before the restart. In that
> case, messages can be assigned to receivers, the broker can be restarted, and
> then the messages are re-assigned to receivers and do not include the
> JMSRedelivered flag as expected. I will attach a unit test and proposed fix
> to illustrate the problem.
> Also, here is additional information I had sent to the mailing list:
> When using the new option persisteJMSRedelivered (to ensure the redelivered
> flag is set correctly on potentially duplicate messages that are
> re-dispatched by the broker even after a restart): <policyEntry queue=">"
> persistJMSRedelivered="true"></policyEntry>
> there is still a case where a message can be re-sent and will not be marked
> as redelivered. I can open a JIRA and probably create a unit test but it is
> pretty clear from the pasted code below where the exception is getting
> swallowed. Would the preferred fix be to update the broker interface and
> make preProcessDispatch throw an IOException or would it be better to add a
> new field to the MessageDispatch class to indicate an exception occurred and
> leave the interface alone?
> The specific case when this can happen is when a MessageStore returns an
> exception during the updateMessage call, which then gets swallowed (and an
> ERROR logged) and still allows the message to be dispatched to the consumer.
> The exception seems like it should actually propagate out of the
> preProcessDispatch function in RegionBroker as shown below, but this would
> require changing the Broker interface and making the void preProcessDispatch
> function throw an IOException.
> //RegionBroker.java
> @Override
> public void preProcessDispatch(MessageDispatch messageDispatch) {
> Message message = messageDispatch.getMessage();
> if (message != null) {
> long endTime = System.currentTimeMillis();
> message.setBrokerOutTime(endTime);
> if (getBrokerService().isEnableStatistics()) {
> long totalTime = endTime - message.getBrokerInTime();
> ((Destination)
> message.getRegionDestination()).getDestinationStatistics().getProcessTime().addTime(totalTime);
> }
> if (((BaseDestination)
> message.getRegionDestination()).isPersistJMSRedelivered() &&
> !message.isRedelivered() && message.isPersistent()) {
> final int originalValue = message.getRedeliveryCounter();
> message.incrementRedeliveryCounter();
> try {
> ((BaseDestination)
> message.getRegionDestination()).getMessageStore().updateMessage(message);
> } catch (IOException error) {
> LOG.error("Failed to persist JMSRedeliveryFlag on {} in
> {}", message.getMessageId(), message.getDestination(), error);
> } finally {
> message.setRedeliveryCounter(originalValue);
> }
> }
> }
> }
> //TransportConnection.java
> protected void processDispatch(Command command) throws IOException {
> MessageDispatch messageDispatch = (MessageDispatch)
> (command.isMessageDispatch() ? command : null);
> try {
> if (!stopping.get()) {
> if (messageDispatch != null) {
> broker.preProcessDispatch(messageDispatch);
> }
> dispatch(command); //This code will dispatch the message
> whether or not the updateMessage function actually worked
> }
> ...
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)