[
https://issues.apache.org/jira/browse/CAMEL-15333?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Brad Harvey updated CAMEL-15333:
--------------------------------
Description:
Prerequisites: an SJMS producer with transacted=true that is not linked to a
shared session (ReturnProducerCallback is chosen in SjmsProducer#process).
Sequence of events in method InOnlyProducer.sendMessage:
# The message is sent successfully by producer.getMessageProducer().send(message);
# The producer is returned to the pool by releaseProducerCallback.release(producer);
{code:java}
@Override
public void sendMessage(final Exchange exchange, final AsyncCallback callback,
        final MessageProducerResources producer, final ReleaseProducerCallback releaseProducerCallback) throws Exception {
    try {
        Message message = getEndpoint().getBinding().makeJmsMessage(exchange, producer.getSession());
        producer.getMessageProducer().send(message);
    } catch (Exception e) {
        exchange.setException(new CamelExchangeException("Unable to complete sending the JMS message", exchange, e));
    } finally {
        releaseProducerCallback.release(producer);
        callback.done(isSynchronous());
    }
}
{code}
When the producer is returned to the pool, the pool may decide to destroy it
instead of keeping it idle (see GenericObjectPool#addToObjectPool). The variable
shouldDestroy can be set to true for a few reasons; in my case it was because
there were too many idle producers in the pool (_pool.size() >= _maxIdle).
When the producer is destroyed,
SjmsProducer$MessageProducerResourcesFactory#destroyObject is called. If the
session is transacted, this method rolls it back.
{code:java}
@Override
public void destroyObject(MessageProducerResources model) throws Exception {
    if (model.getMessageProducer() != null) {
        model.getMessageProducer().close();
    }
    if (model.getSession() != null) {
        try {
            if (model.getSession().getTransacted()) {
                try {
                    model.getSession().rollback();
                } catch (Exception e) {
                    // Do nothing. Just make sure we are cleaned up
                }
            }
            model.getSession().close();
        } catch (Exception e) {
            // TODO why is the session closed already?
        }
    }
}
{code}
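To illustrate how the idle limit and destroyObject interact, here is a minimal, self-contained sketch. It does not use commons-pool or JMS: FakeSession, TinyPool and returnWithIdleCount are hypothetical names invented for this example, and the pool models only the "too many idle objects" rule described above.

{code:java}
import java.util.ArrayDeque;
import java.util.Deque;

public class PoolReturnSketch {

    /** Hypothetical stand-in for a transacted JMS session (not a real JMS class). */
    static class FakeSession {
        boolean pendingSend;
        boolean rolledBack;

        void send() {
            pendingSend = true;
        }

        void rollback() {
            if (pendingSend) {
                rolledBack = true;
                pendingSend = false;
            }
        }
    }

    /** Hypothetical pool that mimics only the idle-limit check on return. */
    static class TinyPool {
        private final Deque<FakeSession> idle = new ArrayDeque<>();
        private final int maxIdle;

        TinyPool(int maxIdle) {
            this.maxIdle = maxIdle;
        }

        void returnObject(FakeSession session) {
            // Assumption: same condition as reported above (_pool.size() >= _maxIdle)
            boolean shouldDestroy = idle.size() >= maxIdle;
            if (shouldDestroy) {
                // Mirrors destroyObject rolling back a transacted session
                session.rollback();
            } else {
                idle.push(session);
            }
        }
    }

    /** Returns true if the uncommitted send was rolled back when the session was returned. */
    public static boolean returnWithIdleCount(int alreadyIdle, int maxIdle) {
        TinyPool pool = new TinyPool(maxIdle);
        for (int i = 0; i < alreadyIdle; i++) {
            pool.returnObject(new FakeSession());
        }
        FakeSession session = new FakeSession();
        session.send();               // message sent, transaction still open
        pool.returnObject(session);   // release happens before any commit
        return session.rolledBack;
    }

    public static void main(String[] args) {
        System.out.println("room in pool, rolled back: " + returnWithIdleCount(0, 1));
        System.out.println("pool full, rolled back:    " + returnWithIdleCount(1, 1));
    }
}
{code}

In this model the send survives only when the pool still has room for another idle object. Once the idle limit is reached, returning the producer discards the uncommitted work.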
The log gave only a subtle indication that this happened: some debug-level
output from the JMS client library (qpid in this case).
{code:java}
2020-07-22 11:34:48.259 DEBUG|mdc.routename|ID-EXCHANGE-1595392487340-0-3| 27108 --- [Camel (camel-1) thread #15 - SjmsBatchConsumer] o.a.camel.component.sjms.SjmsProducer : Processing Exchange.id:ID-EXCHANGE-1595392487340-0-3
2020-07-22 11:34:48.260 DEBUG|mdc.routename|ID-EXCHANGE-1595392487340-0-3| 27108 --- [Camel (camel-1) thread #15 - SjmsBatchConsumer] o.a.camel.component.sjms2.Sjms2Endpoint : Creating ConnectionResource with connectionCount: 1 using ConnectionFactory: org.springframework.jms.connection.CachingConnectionFactory@2db98e22
2020-07-22 11:34:49.158 DEBUG||| 27108 --- [AmqpProvider :(1):[amqps://localhost:5672]] o.a.q.j.p.a.AmqpTransactionCoordinator : New TX started: TX:ID:3ab193ce-faeb-42a2-ab39-1e7a92d1cc58:1:2
2020-07-22 11:34:49.158 DEBUG|mdc.routename|ID-EXCHANGE-1595392487340-0-3| 27108 --- [Camel (camel-1) thread #15 - SjmsBatchConsumer] o.a.qpid.jms.JmsLocalTransactionContext : Begin: TX:ID:3ab193ce-faeb-42a2-ab39-1e7a92d1cc58:1:2
2020-07-22 11:34:49.158 DEBUG|mdc.routename|ID-EXCHANGE-1595392487340-0-3| 27108 --- [Camel (camel-1) thread #15 - SjmsBatchConsumer] o.s.j.c.CachingConnectionFactory : Registering cached JMS Session for mode 0: org.apache.qpid.jms.JmsSession@322f173f
2020-07-22 11:34:49.173 DEBUG|mdc.routename|ID-EXCHANGE-1595392487340-0-3| 27108 --- [Camel (camel-1) thread #15 - SjmsBatchConsumer] o.s.j.c.CachingConnectionFactory : Registering cached JMS MessageProducer for destination [queuename]: org.apache.qpid.jms.JmsMessageProducer@24ff95c9
2020-07-22 11:34:49.175 DEBUG|mdc.routename|ID-EXCHANGE-1595392487340-0-3| 27108 --- [Camel (camel-1) thread #15 - SjmsBatchConsumer] o.a.camel.component.sjms.SjmsProducer : Sending message synchronously: <?xml version="1.0" encoding="UTF-8"?> (snip)
2020-07-22 11:34:49.201 DEBUG|mdc.routename|ID-EXCHANGE-1595392487340-0-3| 27108 --- [Camel (camel-1) thread #15 - SjmsBatchConsumer] o.a.qpid.jms.JmsLocalTransactionContext : Rollback: TX:ID:3ab193ce-faeb-42a2-ab39-1e7a92d1cc58:1:2
2020-07-22 11:34:49.207 DEBUG||| 27108 --- [AmqpProvider :(1):[amqps://localhost:5672]] o.a.q.j.p.a.AmqpTransactionCoordinator : New TX started: TX:ID:3ab193ce-faeb-42a2-ab39-1e7a92d1cc58:1:4
2020-07-22 11:34:49.367 DEBUG||| 27108 --- [AmqpProvider :(1):[amqps://localhost:5672]] o.a.q.j.p.a.AmqpTransactionCoordinator : Last TX request succeeded: TX:ID:3ab193ce-faeb-42a2-ab39-1e7a92d1cc58:1:2
2020-07-22 11:34:49.367 DEBUG|mdc.routename|ID-EXCHANGE-1595392487340-0-3| 27108 --- [Camel (camel-1) thread #15 - SjmsBatchConsumer] o.a.camel.component.sjms.SjmsProducer : Processing Exchange.id:ID-EXCHANGE-1595392487340-0-3 - SUCCESS
2020-07-22 11:34:49.371 DEBUG|mdc.routename|ID-EXCHANGE-1595392487340-0-3| 27108 --- [Camel (camel-1) thread #15 - SjmsBatchConsumer] .c.s.t.SessionTransactionSynchronization : Processing completion of ExchangeId: ID-EXCHANGE-1595392487340-0-3
2020-07-22 11:34:49.371 DEBUG|mdc.routename|ID-EXCHANGE-1595392487340-0-3| 27108 --- [Camel (camel-1) thread #15 - SjmsBatchConsumer] o.a.qpid.jms.JmsLocalTransactionContext : Commit: TX:ID:3ab193ce-faeb-42a2-ab39-1e7a92d1cc58:1:4
{code}
Even the subsequent session.commit() in
SessionTransactionSynchronization#onComplete did not fail. I think a new
transaction (TX:ID:3ab193ce-faeb-42a2-ab39-1e7a92d1cc58:1:4) was started on the
session when the first transaction
(TX:ID:3ab193ce-faeb-42a2-ab39-1e7a92d1cc58:1:2) was rolled back, so the commit
applied to an empty transaction.
The end result is that InOnlyProducer.sendMessage completes successfully
without setting any exception on the exchange, but the message has been rolled
back and never sent. It is lost.
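The silent loss can be reproduced in a toy model of a locally transacted session. This is only a sketch under stated assumptions: FakeTransactedSession and both helper methods are hypothetical, and the model assumes (as the log above suggests) that a rollback immediately opens a fresh, empty transaction, so a later commit succeeds with nothing to deliver. The second method shows the opposite ordering purely for contrast. It is not a proposed patch.

{code:java}
import java.util.ArrayList;
import java.util.List;

public class LostMessageSketch {

    /** Hypothetical in-memory model of a locally transacted session (not a JMS class). */
    static class FakeTransactedSession {
        private final List<String> pending = new ArrayList<>();
        final List<String> delivered = new ArrayList<>();

        void send(String body) {
            pending.add(body);
        }

        void rollback() {
            // Discards uncommitted work; the next operation runs in a new, empty transaction
            pending.clear();
        }

        void commit() {
            // Committing an empty transaction is not an error
            delivered.addAll(pending);
            pending.clear();
        }
    }

    /** Order reported in this issue: send, release (rollback), then commit on completion. */
    public static int deliveredWhenDestroyedOnRelease() {
        FakeTransactedSession session = new FakeTransactedSession();
        session.send("order-1");
        session.rollback(); // producer destroyed when returned to the pool
        session.commit();   // completion hook commits afterwards, no exception
        return session.delivered.size();
    }

    /** Contrast case for illustration only: commit happens before the release. */
    public static int deliveredWhenCommittedFirst() {
        FakeTransactedSession session = new FakeTransactedSession();
        session.send("order-1");
        session.commit();
        session.rollback(); // nothing left to discard
        return session.delivered.size();
    }

    public static void main(String[] args) {
        System.out.println("destroyed on release: " + deliveredWhenDestroyedOnRelease());
        System.out.println("committed first:      " + deliveredWhenCommittedFirst());
    }
}
{code}

Neither path throws, which matches the observation that nothing is set on the exchange. Only the number of delivered messages differs.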
> SJMS transacted producer can lose messages
> ------------------------------------------
>
> Key: CAMEL-15333
> URL: https://issues.apache.org/jira/browse/CAMEL-15333
> Project: Camel
> Issue Type: Bug
> Components: camel-sjms
> Affects Versions: 3.4.1
> Reporter: Brad Harvey
> Priority: Major
>