I am trying to get support for using the JMS MessageID as the JMS CorrelationID
as specified in https://issues.apache.org/jira/browse/CXF-2760 . After putting
some work/thought into this issue, I became aware that this feature is
available on the trunk but was not back-merged to the 2.1.x and 2.2.x branches.
I am in the process of taking what is done on trunk and implementing something
similar on 2.1.x and 2.2.x. However, I have a few issues with the
implementation on trunk that I want to sort out before back-porting.
1) There is no attribute in the clientConfig schema to specify that the user
wants to use the MessageID in lieu of the CorrelationID. Currently the logic
for deciding whether to use the MessageID instead of a generated CorrelationID
looks like this:
    } else if (!jmsConfig.isSetConduitSelectorPrefix()
               && (exchange.isSynchronous() || exchange.isOneWay())
               && (!jmsConfig.isSetUseConduitIdSelector()
                   || !jmsConfig.isUseConduitIdSelector())) {
        messageIdPattern = true;
This is quite a convoluted way of inferring what the user wants, and it could
be sorted out by specifying an explicit config attribute.
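To make the suggestion concrete, here is a rough sketch of the difference. The
names CorrelationSettings, useMessageIdAsCorrelationId and CorrelationDecision
are hypothetical and do not exist in CXF. inferred() only mirrors the condition
quoted above using plain booleans, and explicit() shows what the check could
reduce to if clientConfig carried such an attribute.

```java
// Hypothetical sketch: none of these names are taken from the CXF code base.
final class CorrelationSettings {
    // Assumed new clientConfig attribute (does not exist in the schema today).
    private final boolean useMessageIdAsCorrelationId;

    CorrelationSettings(boolean useMessageIdAsCorrelationId) {
        this.useMessageIdAsCorrelationId = useMessageIdAsCorrelationId;
    }

    boolean isUseMessageIdAsCorrelationId() {
        return useMessageIdAsCorrelationId;
    }
}

final class CorrelationDecision {
    private CorrelationDecision() { }

    // Mirrors the condition quoted from trunk, with each getter replaced by a boolean.
    static boolean inferred(boolean selectorPrefixSet,
                            boolean synchronous,
                            boolean oneWay,
                            boolean useConduitIdSelectorSet,
                            boolean useConduitIdSelector) {
        return !selectorPrefixSet
            && (synchronous || oneWay)
            && (!useConduitIdSelectorSet || !useConduitIdSelector);
    }

    // With an explicit attribute the decision no longer depends on the exchange
    // type or on which selector options happen to be unset.
    static boolean explicit(CorrelationSettings settings) {
        return settings.isUseMessageIdAsCorrelationId();
    }

    public static void main(String[] args) {
        // Asynchronous two-way call with nothing else configured.
        System.out.println(inferred(false, false, false, false, false));
        System.out.println(explicit(new CorrelationSettings(true)));
    }
}
```

Note that the inferred form can never select the MessageID pattern for an
asynchronous two-way call, whereas the explicit form leaves that choice to the
user.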
2) There is a bit of code which seems to be left over from a previous
implementation and has no value:
    if (exchange.isSynchronous()) {
        synchronized (exchange) {
            exchange.put(CORRELATED, Boolean.TRUE);
            exchange.notifyAll();
        }
    }
I don't see the current purpose of this as I don't see any code which has
another thread waiting on the exchange mutex.
3) The biggest issue with the current implementation on the trunk is the fact
that using the MessageID as CorrelationID is not supported for asynchronous
calls. I don't know if this was intentional or not, but the MessageID as
CorrelationID paradigm is only implemented for synchronous calls. Here is the
source of the problem:
    if (!exchange.isOneWay()) {
        synchronized (exchange) {
            jmsTemplate.send(jmsConfig.getTargetDestination(),
                             messageCreator);
            if (messageIdPattern) {
                correlationId = messageCreator.getMessageID();
            }
            headers.setJMSMessageID(messageCreator.getMessageID());
            final String messageSelector = "JMSCorrelationID = '"
                                           + correlationId + "'";
            if (exchange.isSynchronous()) {
                javax.jms.Message replyMessage =
                    jmsTemplate.receiveSelected(replyToDestination,
                                                messageSelector);
                if (replyMessage == null) {
                    throw new RuntimeException(
                        "Timeout receiving message with correlationId "
                        + correlationId);
                } else {
                    doReplyMessage(exchange, replyMessage);
                }
            }
        }
In this situation the MessageID is never put into the correlationMap for future
correlation in onMessage(). Furthermore, if the call is async, no JMSListener
is set up to receive the reply with a selector that matches a CorrelationID
equal to the MessageID, so the JMSConduit will never receive the async
callback. To support the async scenario, the JMSListener needs to set its
MessageSelector dynamically after the message is sent and the MessageID is
available. In a multi-threaded environment there also has to be one of these
listeners per thread, so that threads making concurrent calls don't modify the
same message selector.
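As a rough illustration of the first half of the problem (the missing
registration), here is a minimal sketch of what putting the MessageID into a
correlation map after the send could look like. ReplyCorrelator, register() and
onReply() are hypothetical names, plain strings stand in for the JMS messages,
and the sketch assumes some listener is already delivering replies from the
reply destination. It is not the trunk code and it does not address the
per-thread selector question.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical sketch: these types stand in for the JMS and CXF classes.
final class ReplyCorrelator {
    // Pending asynchronous requests, keyed by the JMSMessageID of the request.
    private final Map<String, Consumer<String>> pending =
        new ConcurrentHashMap<>();

    // Meant to be called right after send(), once the MessageID is known.
    void register(String messageId, Consumer<String> callback) {
        pending.put(messageId, callback);
    }

    // Stand-in for onMessage(): the reply's JMSCorrelationID is expected to
    // equal the MessageID of the request it answers.
    boolean onReply(String correlationId, String body) {
        Consumer<String> callback = pending.remove(correlationId);
        if (callback == null) {
            return false; // no pending request with that id
        }
        callback.accept(body);
        return true;
    }

    public static void main(String[] args) {
        ReplyCorrelator correlator = new ReplyCorrelator();
        correlator.register("ID:example-1",
            body -> System.out.println("reply: " + body));
        correlator.onReply("ID:example-1", "hello");
    }
}
```

One caveat with this approach: because the MessageID only exists after the
send, a fast reply could in principle arrive before register() is called, so a
real implementation would have to deal with that window somehow.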
Feedback on these issues is appreciated so that I can move ahead with modifying
trunk/2.2.x/2.1.x.
Regards,
Seumas