ah, I see your jira and patch. will try and have a peek tomorrow. On 11 September 2014 22:47, Gary Tully <[email protected]> wrote: > A store failure is typically fatal. There is an ioexception handler in > the loop for message sends and the default behaviour is to stop the > broker. > One option is to pull in the ioexception handler in this case also, > allowing the broker stop behaviour. > > at this point, predispatch, the message is pending an ack and won't > get dispatched again till the consumer/subscription closes so not > dispatching will leave it dangling. > > What is the ideal behaviour for a message in this case? > > > On 4 September 2014 21:26, Fugitt, Jesse <[email protected]> wrote: >> When using the new option persisteJMSRedelivered (to ensure the redelivered >> flag is set correctly on potentially duplicate messages that are >> re-dispatched by the broker even after a restart): <policyEntry queue=">" >> persistJMSRedelivered="true"></policyEntry> >> >> there is still a case where a message can be re-sent and will not be marked >> as redelivered. I can open a JIRA and probably create a unit test but it is >> pretty clear from the pasted code below where the exception is getting >> swallowed. Would the preferred fix be to update the broker interface and >> make preProcessDispatch throw an IOException or would it be better to add a >> new field to the MessageDispatch class to indicate an exception occurred and >> leave the interface alone? >> >> The specific case when this can happen is when a MessageStore returns an >> exception during the updateMessage call, which then gets swallowed (and an >> ERROR logged) and still allows the message to be dispatched to the consumer. >> The exception seems like it should actually propagate out of the >> preProcessDispatch function in RegionBroker as shown below, but this would >> require changing the Broker interface and making the void preProcessDispatch >> function throw an IOException. >> >> //RegionBroker.java >> @Override >> public void preProcessDispatch(MessageDispatch messageDispatch) { >> Message message = messageDispatch.getMessage(); >> if (message != null) { >> long endTime = System.currentTimeMillis(); >> message.setBrokerOutTime(endTime); >> if (getBrokerService().isEnableStatistics()) { >> long totalTime = endTime - message.getBrokerInTime(); >> ((Destination) >> message.getRegionDestination()).getDestinationStatistics().getProcessTime().addTime(totalTime); >> } >> if (((BaseDestination) >> message.getRegionDestination()).isPersistJMSRedelivered() && >> !message.isRedelivered() && message.isPersistent()) { >> final int originalValue = message.getRedeliveryCounter(); >> message.incrementRedeliveryCounter(); >> try { >> ((BaseDestination) >> message.getRegionDestination()).getMessageStore().updateMessage(message); >> } catch (IOException error) { >> LOG.error("Failed to persist JMSRedeliveryFlag on {} in >> {}", message.getMessageId(), message.getDestination(), error); >> } finally { >> message.setRedeliveryCounter(originalValue); >> } >> } >> } >> } >> >> //TransportConnection.java >> protected void processDispatch(Command command) throws IOException { >> MessageDispatch messageDispatch = (MessageDispatch) >> (command.isMessageDispatch() ? command : null); >> try { >> if (!stopping.get()) { >> if (messageDispatch != null) { >> broker.preProcessDispatch(messageDispatch); >> } >> dispatch(command); //This code will dispatch the message >> whether or not the updateMessage function actually worked >> } >> ... >> >> >> >> I wanted to get input on this issue before proceeding further with it. >> >> Thanks, >> Jesse > > > > -- > http://redhat.com > http://blog.garytully.com
-- http://redhat.com http://blog.garytully.com
