When using the new option persisteJMSRedelivered (to ensure the redelivered 
flag is set correctly on potentially duplicate messages that are re-dispatched 
by the broker even after a restart): <policyEntry queue=">" 
persistJMSRedelivered="true"></policyEntry>

there is still a case where a message can be re-sent and will not be marked as 
redelivered.  I can open a JIRA and probably create a unit test but it is 
pretty clear from the pasted code below where the exception is getting 
swallowed.  Would the preferred fix be to update the broker interface and make 
preProcessDispatch throw an IOException or would it be better to add a new 
field to the MessageDispatch class to indicate an exception occurred and leave 
the interface alone?

The specific case when this can happen is when a MessageStore returns an 
exception during the updateMessage call, which then gets swallowed (and an 
ERROR logged) and still allows the message to be dispatched to the consumer.  
The exception seems like it should actually propagate out of the 
preProcessDispatch function in RegionBroker as shown below, but this would 
require changing the Broker interface and making the void preProcessDispatch 
function throw an IOException.

//RegionBroker.java
    @Override
    public void preProcessDispatch(MessageDispatch messageDispatch) {
        Message message = messageDispatch.getMessage();
        if (message != null) {
            long endTime = System.currentTimeMillis();
            message.setBrokerOutTime(endTime);
            if (getBrokerService().isEnableStatistics()) {
                long totalTime = endTime - message.getBrokerInTime();
                ((Destination) 
message.getRegionDestination()).getDestinationStatistics().getProcessTime().addTime(totalTime);
            }
            if (((BaseDestination) 
message.getRegionDestination()).isPersistJMSRedelivered() && 
!message.isRedelivered() && message.isPersistent()) {
                final int originalValue = message.getRedeliveryCounter();
                message.incrementRedeliveryCounter();
                try {
                    ((BaseDestination) 
message.getRegionDestination()).getMessageStore().updateMessage(message);
                } catch (IOException error) {
                    LOG.error("Failed to persist JMSRedeliveryFlag on {} in 
{}", message.getMessageId(), message.getDestination(), error);
                } finally {
                    message.setRedeliveryCounter(originalValue);
                }
            }
        }
    }

//TransportConnection.java
    protected void processDispatch(Command command) throws IOException {
        MessageDispatch messageDispatch = (MessageDispatch) 
(command.isMessageDispatch() ? command : null);
        try {
            if (!stopping.get()) {
                if (messageDispatch != null) {
                    broker.preProcessDispatch(messageDispatch);
                }
                dispatch(command);  //This code will dispatch the message 
whether or not the updateMessage function actually worked
            }
        ...



I wanted to get input on this issue before proceeding further with it.

Thanks,
Jesse

Reply via email to