I am trying to get support for using the JMS MessageID as the JMS CorrelationID 
as specified in https://issues.apache.org/jira/browse/CXF-2760 . After putting 
some work/thought into this issue, I became aware that this feature is 
available on the trunk but was not back-merged to the 2.1.x and 2.2.x branches. 
I am in the process of trying take what is done on trunk implement something 
similar on 2.1 and 2.2. However I have a  couple of issues with the 
implementation on trunk that I want to sort out before back-porting.

1)There is no attribute in the clientConfig schema to specify that the user 
wants to use the MessageID in lieu of the CorrelationID. Currently the logic 
for deciding whether to use the MessageID instead of a generated CorrelationID 
looks like this:

            } else if (!jmsConfig.isSetConduitSelectorPrefix()
                       && (exchange.isSynchronous() || exchange.isOneWay())
                       && (!jmsConfig.isSetUseConduitIdSelector() 
                           || !jmsConfig.isUseConduitIdSelector())) {
                messageIdPattern = true;

This is quite a bit of mumbo-jumbo which could be sorted out by specifying a 
config attribute.

2)There is a bit of code which seem left over from a previous implementation 
that has no value:

            if (exchange.isSynchronous()) {
                synchronized (exchange) {
                    exchange.put(CORRELATED, Boolean.TRUE);
                    exchange.notifyAll();
                }
            }

I don't see the current purpose of this as I don't see any code which has 
another thread waiting on the exchange mutex.

3)The biggest issue with the current implementation on the trunk is the fact 
that using the MessageID as CorrelationID is not supported for asynchronous 
calls. I don't know if this was purposeful or not but the MessageID as 
CorrelationID paradigm is only implemented for synchronous calls. Here is the 
source of the problem:

        if (!exchange.isOneWay()) {
            synchronized (exchange) {
                jmsTemplate.send(jmsConfig.getTargetDestination(), 
messageCreator);
                if (messageIdPattern) {
                    correlationId = messageCreator.getMessageID();
                }
                headers.setJMSMessageID(messageCreator.getMessageID());

                final String messageSelector = "JMSCorrelationID = '" + 
correlationId + "'";
                if (exchange.isSynchronous()) {
                    javax.jms.Message replyMessage = 
jmsTemplate.receiveSelected(replyToDestination,
                                                                                
 messageSelector);
                    if (replyMessage == null) {
                        throw new RuntimeException("Timeout receiving message 
with correlationId "
                                                   + correlationId);
                    } else {
                        doReplyMessage(exchange, replyMessage);
                    }
                }
            }

In this situation the MessageID is never put into the correlationMap for future 
correlation in onMessage(). Furthermore if the call is async, there is no 
JMSListener set up to receive the reply using a selector which selects for the 
CorrrelationID equal to the MessageID. So the JMSConduit will never receive the 
async callback. In order to support the async scenario, the JMSListener needs 
to dynamically set the MessageSelector after the message is sent and the 
MessageID is available. Furthermore, in a multi-threaded environment, there has 
to be one of these listeners per thread so that threads don't modify the same 
message selector when making concurrent calls.

Feedback on these issues is appreciated so that I can move ahead with modifying 
trunk/2.2.x/2.1.x.

Regards,
Seumas




Reply via email to