Brad Harvey created CAMEL-15333:
-----------------------------------
Summary: SJMS transacted producer can lose messages
Key: CAMEL-15333
URL: https://issues.apache.org/jira/browse/CAMEL-15333
Project: Camel
Issue Type: Bug
Components: camel-sjms
Affects Versions: 3.4.1
Reporter: Brad Harvey
Pre-reqs: SJMS producer with transacted=true, and not linked to a shared
session (ReturnProducerCallback chosen in SjmsProducer#process)
Sequence of events in method InOnlyProducer.sendMessage:
# message is sent successfully by producer.getMessageProducer().send(message);
# producer is returned to pool by releaseProducerCallback.release(producer);
{code:java}
@Override
public void sendMessage(final Exchange exchange, final AsyncCallback callback,
final MessageProducerResources producer, final ReleaseProducerCallback
releaseProducerCallback) throws Exception {
try {
Message message = getEndpoint().getBinding().makeJmsMessage(exchange,
producer.getSession());
producer.getMessageProducer().send(message);
} catch (Exception e) {
exchange.setException(new CamelExchangeException("Unable to complete
sending the JMS message", exchange, e));
} finally {
releaseProducerCallback.release(producer);
callback.done(isSynchronous());
}
}
{code}
When the producer is returned to the pool, the pool may decide that it should
be disposed. See GenericObjectPool#addToObjectPool. The variable
shouldDestroy can be set to true for a few reasons, but in my case it was
because there were too many idle producers in the pool (_pool.size() >=
_maxIdle).
When it is destroyed,
SjmsProducer$MessageProducerResourcesFactory#destroyObject is called. If the
session is transacted, then this method will roll it back.
{code:java}
@Override
public void destroyObject(MessageProducerResources model) throws Exception {
if (model.getMessageProducer() != null) {
model.getMessageProducer().close();
}
if (model.getSession() != null) {
try {
if (model.getSession().getTransacted()) {
try {
model.getSession().rollback();
} catch (Exception e) {
// Do nothing. Just make sure we are cleaned up
}
}
model.getSession().close();
} catch (Exception e) {
// TODO why is the session closed already?
}
}
} {code}
There was only a subtle indication that this happened in the log, with some
logging from the JMS client library (qpid in this case) at debug level.
{code:java}
2020-07-22 11:34:48.259 DEBUG|mdc.routename|ID-EXCHANGE-1595392487340-0-3|
27108 --- [Camel (camel-1) thread #15 - SjmsBatchConsumer]
o.a.camel.component.sjms.SjmsProducer : Processing
Exchange.id:ID-EXCHANGE-1595392487340-0-3 2020-07-22 11:34:48.260
DEBUG|mdc.routename|ID-EXCHANGE-1595392487340-0-3| 27108 --- [Camel (camel-1)
thread #15 - SjmsBatchConsumer] o.a.camel.component.sjms2.Sjms2Endpoint :
Creating ConnectionResource with connectionCount: 1 using ConnectionFactory:
org.springframework.jms.connection.CachingConnectionFactory@2db98e22 2020-07-22
11:34:49.158 DEBUG||| 27108 --- [AmqpProvider :(1):[amqps://localhost:5672]]
o.a.q.j.p.a.AmqpTransactionCoordinator : New TX started:
TX:ID:3ab193ce-faeb-42a2-ab39-1e7a92d1cc58:1:2 2020-07-22 11:34:49.158
DEBUG|mdc.routename|ID-EXCHANGE-1595392487340-0-3| 27108 --- [Camel (camel-1)
thread #15 - SjmsBatchConsumer] o.a.qpid.jms.JmsLocalTransactionContext :
Begin: TX:ID:3ab193ce-faeb-42a2-ab39-1e7a92d1cc58:1:2 2020-07-22 11:34:49.158
DEBUG|mdc.routename|ID-EXCHANGE-1595392487340-0-3| 27108 --- [Camel (camel-1)
thread #15 - SjmsBatchConsumer] o.s.j.c.CachingConnectionFactory : Registering
cached JMS Session for mode 0: org.apache.qpid.jms.JmsSession@322f173f
2020-07-22 11:34:49.173 DEBUG|mdc.routename|ID-EXCHANGE-1595392487340-0-3|
27108 --- [Camel (camel-1) thread #15 - SjmsBatchConsumer]
o.s.j.c.CachingConnectionFactory : Registering cached JMS MessageProducer for
destination [queuename]: org.apache.qpid.jms.JmsMessageProducer@24ff95c9
2020-07-22 11:34:49.175 DEBUG|mdc.routename|ID-EXCHANGE-1595392487340-0-3|
27108 --- [Camel (camel-1) thread #15 - SjmsBatchConsumer]
o.a.camel.component.sjms.SjmsProducer : Sending message synchronously: <?xml
version="1.0" encoding="UTF-8"?> (snip) 2020-07-22 11:34:49.201
DEBUG|mdc.routename|ID-EXCHANGE-1595392487340-0-3| 27108 --- [Camel (camel-1)
thread #15 - SjmsBatchConsumer] o.a.qpid.jms.JmsLocalTransactionContext :
Rollback: TX:ID:3ab193ce-faeb-42a2-ab39-1e7a92d1cc58:1:2 2020-07-22
11:34:49.207 DEBUG||| 27108 --- [AmqpProvider :(1):[amqps://localhost:5672]]
o.a.q.j.p.a.AmqpTransactionCoordinator : New TX started:
TX:ID:3ab193ce-faeb-42a2-ab39-1e7a92d1cc58:1:4 2020-07-22 11:34:49.367 DEBUG|||
27108 --- [AmqpProvider :(1):[amqps://localhost:5672]]
o.a.q.j.p.a.AmqpTransactionCoordinator : Last TX request succeeded:
TX:ID:3ab193ce-faeb-42a2-ab39-1e7a92d1cc58:1:2 2020-07-22 11:34:49.367
DEBUG|mdc.routename|ID-EXCHANGE-1595392487340-0-3| 27108 --- [Camel (camel-1)
thread #15 - SjmsBatchConsumer] o.a.camel.component.sjms.SjmsProducer :
Processing Exchange.id:ID-EXCHANGE-1595392487340-0-3 - SUCCESS 2020-07-22
11:34:49.371 DEBUG|mdc.routename|ID-EXCHANGE-1595392487340-0-3| 27108 ---
[Camel (camel-1) thread #15 - SjmsBatchConsumer]
.c.s.t.SessionTransactionSynchronization : Processing completion of ExchangeId:
ID-EXCHANGE-1595392487340-0-3 2020-07-22 11:34:49.371
DEBUG|mdc.routename|ID-EXCHANGE-1595392487340-0-3| 27108 --- [Camel (camel-1)
thread #15 - SjmsBatchConsumer] o.a.qpid.jms.JmsLocalTransactionContext :
Commit: TX:ID:3ab193ce-faeb-42a2-ab39-1e7a92d1cc58:1:4
{code}
Even the subsequent session.commit() in
SessionTransactionSynchronization#onComplete did not fail - I think a new one
was started (TX:ID:3ab193ce-faeb-42a2-ab39-1e7a92d1cc58:1:4) when the first one
was rolled back (TX:ID:3ab193ce-faeb-42a2-ab39-1e7a92d1cc58:1:2).
The end result of this is that InOnlyProducer.sendMessage completes
successfully without setting any exception on the exchange, but the message has
been rolled back and not sent. It is lost.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)