[ 
https://issues.apache.org/jira/browse/CAMEL-15333?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Brad Harvey updated CAMEL-15333:
--------------------------------
    Description: 
Pre-reqs:  SJMS producer with transacted=true, and not linked to a shared 
session (ReturnProducerCallback chosen in SjmsProducer#process)

Sequence of events in method InOnlyProducer.sendMessage:
 # message is sent successfully by  producer.getMessageProducer().send(message);
 # producer is returned to pool by releaseProducerCallback.release(producer);

 
{code:java}
@Override
public void sendMessage(final Exchange exchange, final AsyncCallback callback,
                        final MessageProducerResources producer,
                        final ReleaseProducerCallback releaseProducerCallback) throws Exception {
    try {
        Message message = getEndpoint().getBinding().makeJmsMessage(exchange, producer.getSession());
        producer.getMessageProducer().send(message);
    } catch (Exception e) {
        exchange.setException(new CamelExchangeException("Unable to complete sending the JMS message", exchange, e));
    } finally {
        releaseProducerCallback.release(producer);
        callback.done(isSynchronous());
    }
}
 {code}
 

When the producer is returned to the pool, the pool may decide to destroy it 
instead of keeping it idle.  See GenericObjectPool#addObjectToPool.  The 
variable shouldDestroy can be set to true for a few reasons, but in my case it 
was because there were too many idle producers in the pool (_pool.size() >= 
_maxIdle). 
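
To make the idle-cap decision concrete, here is a minimal sketch of the check described above. It is a simplified model, not the commons-pool implementation: the class IdleCappedPool and its methods are hypothetical names, and the only behaviour it borrows from GenericObjectPool is the comparison of the idle count against maxIdle (with a negative maxIdle assumed to mean "no cap").

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Simplified, hypothetical model of an idle-capped pool; not the commons-pool code. */
final class IdleCappedPool {
    private final Deque<String> idle = new ArrayDeque<>();
    private final int maxIdle;
    private int destroyed;

    IdleCappedPool(int maxIdle) {
        this.maxIdle = maxIdle;
    }

    /** Returns true if the object was kept idle, false if it was destroyed instead. */
    boolean returnObject(String obj) {
        // Same shape as the check in the report: too many idle objects means destroy.
        boolean shouldDestroy = maxIdle >= 0 && idle.size() >= maxIdle;
        if (shouldDestroy) {
            destroyed++; // a real pool would call the factory's destroyObject here
            return false;
        }
        idle.push(obj);
        return true;
    }

    int idleCount() {
        return idle.size();
    }

    int destroyedCount() {
        return destroyed;
    }

    public static void main(String[] args) {
        IdleCappedPool pool = new IdleCappedPool(1);
        System.out.println("producer-1 kept idle: " + pool.returnObject("producer-1"));
        System.out.println("producer-2 kept idle: " + pool.returnObject("producer-2"));
    }
}
```

The point of the sketch is that returning a producer is not a neutral operation: whether the producer survives depends on how many other producers are idle at that moment.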

When the producer is destroyed, 
SjmsProducer$MessageProducerResourcesFactory#destroyObject is called.  If the 
session is transacted, this method rolls it back, which discards the message 
that was just sent but not yet committed.
{code:java}
@Override
public void destroyObject(MessageProducerResources model) throws Exception {
    if (model.getMessageProducer() != null) {
        model.getMessageProducer().close();
    }

    if (model.getSession() != null) {
        try {
            if (model.getSession().getTransacted()) {
                try {
                    model.getSession().rollback();
                } catch (Exception e) {
                    // Do nothing. Just make sure we are cleaned up
                }
            }
            model.getSession().close();
        } catch (Exception e) {
            // TODO why is the session closed already?
        }
    }
}
{code}
 

There was only a subtle indication that this happened in the log, with some 
logging from the JMS client library (qpid in this case) at debug level.
{code:java}
2020-07-22 11:34:48.259 DEBUG|mdc.routename|ID-EXCHANGE-1595392487340-0-3| 27108 --- [Camel (camel-1) thread #15 - SjmsBatchConsumer] o.a.camel.component.sjms.SjmsProducer : Processing Exchange.id:ID-EXCHANGE-1595392487340-0-3
2020-07-22 11:34:48.260 DEBUG|mdc.routename|ID-EXCHANGE-1595392487340-0-3| 27108 --- [Camel (camel-1) thread #15 - SjmsBatchConsumer] o.a.camel.component.sjms2.Sjms2Endpoint : Creating ConnectionResource with connectionCount: 1 using ConnectionFactory: org.springframework.jms.connection.CachingConnectionFactory@2db98e22
2020-07-22 11:34:49.158 DEBUG||| 27108 --- [AmqpProvider :(1):[amqps://localhost:5672]] o.a.q.j.p.a.AmqpTransactionCoordinator : New TX started: TX:ID:3ab193ce-faeb-42a2-ab39-1e7a92d1cc58:1:2
2020-07-22 11:34:49.158 DEBUG|mdc.routename|ID-EXCHANGE-1595392487340-0-3| 27108 --- [Camel (camel-1) thread #15 - SjmsBatchConsumer] o.a.qpid.jms.JmsLocalTransactionContext : Begin: TX:ID:3ab193ce-faeb-42a2-ab39-1e7a92d1cc58:1:2
2020-07-22 11:34:49.158 DEBUG|mdc.routename|ID-EXCHANGE-1595392487340-0-3| 27108 --- [Camel (camel-1) thread #15 - SjmsBatchConsumer] o.s.j.c.CachingConnectionFactory : Registering cached JMS Session for mode 0: org.apache.qpid.jms.JmsSession@322f173f
2020-07-22 11:34:49.173 DEBUG|mdc.routename|ID-EXCHANGE-1595392487340-0-3| 27108 --- [Camel (camel-1) thread #15 - SjmsBatchConsumer] o.s.j.c.CachingConnectionFactory : Registering cached JMS MessageProducer for destination [queuename]: org.apache.qpid.jms.JmsMessageProducer@24ff95c9
2020-07-22 11:34:49.175 DEBUG|mdc.routename|ID-EXCHANGE-1595392487340-0-3| 27108 --- [Camel (camel-1) thread #15 - SjmsBatchConsumer] o.a.camel.component.sjms.SjmsProducer : Sending message synchronously: <?xml version="1.0" encoding="UTF-8"?> (snip)
2020-07-22 11:34:49.201 DEBUG|mdc.routename|ID-EXCHANGE-1595392487340-0-3| 27108 --- [Camel (camel-1) thread #15 - SjmsBatchConsumer] o.a.qpid.jms.JmsLocalTransactionContext : Rollback: TX:ID:3ab193ce-faeb-42a2-ab39-1e7a92d1cc58:1:2
2020-07-22 11:34:49.207 DEBUG||| 27108 --- [AmqpProvider :(1):[amqps://localhost:5672]] o.a.q.j.p.a.AmqpTransactionCoordinator : New TX started: TX:ID:3ab193ce-faeb-42a2-ab39-1e7a92d1cc58:1:4
2020-07-22 11:34:49.367 DEBUG||| 27108 --- [AmqpProvider :(1):[amqps://localhost:5672]] o.a.q.j.p.a.AmqpTransactionCoordinator : Last TX request succeeded: TX:ID:3ab193ce-faeb-42a2-ab39-1e7a92d1cc58:1:2
2020-07-22 11:34:49.367 DEBUG|mdc.routename|ID-EXCHANGE-1595392487340-0-3| 27108 --- [Camel (camel-1) thread #15 - SjmsBatchConsumer] o.a.camel.component.sjms.SjmsProducer : Processing Exchange.id:ID-EXCHANGE-1595392487340-0-3 - SUCCESS
2020-07-22 11:34:49.371 DEBUG|mdc.routename|ID-EXCHANGE-1595392487340-0-3| 27108 --- [Camel (camel-1) thread #15 - SjmsBatchConsumer] .c.s.t.SessionTransactionSynchronization : Processing completion of ExchangeId: ID-EXCHANGE-1595392487340-0-3
2020-07-22 11:34:49.371 DEBUG|mdc.routename|ID-EXCHANGE-1595392487340-0-3| 27108 --- [Camel (camel-1) thread #15 - SjmsBatchConsumer] o.a.qpid.jms.JmsLocalTransactionContext : Commit: TX:ID:3ab193ce-faeb-42a2-ab39-1e7a92d1cc58:1:4
{code}
Even the subsequent session.commit() in 
SessionTransactionSynchronization#onComplete did not fail.  I think a new 
transaction (TX:ID:3ab193ce-faeb-42a2-ab39-1e7a92d1cc58:1:4) was started on the 
session when the first transaction 
(TX:ID:3ab193ce-faeb-42a2-ab39-1e7a92d1cc58:1:2) was rolled back, so the commit 
succeeded against the new, empty transaction.
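
The sequence can be replayed with a toy model to show why nothing fails. This is only a sketch under stated assumptions: FakeTransactedSession and LostMessageSketch are hypothetical classes, not Camel, JMS or qpid types, and the model assumes (as the log suggests) that a rollback silently opens a fresh transaction on the same session. It also ignores the session.close() call in destroyObject.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical replay of the reported sequence; no real JMS classes are involved. */
final class LostMessageSketch {

    /** Toy transacted session: sends are buffered until commit, rollback discards them. */
    static final class FakeTransactedSession {
        private final List<String> pending = new ArrayList<>();
        private final List<String> delivered = new ArrayList<>();

        void send(String body) {
            pending.add(body);
        }

        void rollback() {
            pending.clear(); // the uncommitted send is discarded; a new empty transaction begins
        }

        void commit() {
            delivered.addAll(pending); // commits whatever the current transaction holds
            pending.clear();
        }

        List<String> delivered() {
            return delivered;
        }
    }

    /** Replays send, release and commit; returns what the broker ends up with. */
    static List<String> replay(boolean poolDestroysOnRelease) {
        FakeTransactedSession session = new FakeTransactedSession();
        session.send("order-1");          // step 1: the send succeeds
        if (poolDestroysOnRelease) {
            session.rollback();           // step 2: release -> destroyObject -> rollback
        }
        session.commit();                 // later: the completion commit, which raises no error
        return session.delivered();
    }

    public static void main(String[] args) {
        System.out.println("pool keeps producer:    " + replay(false));
        System.out.println("pool destroys producer: " + replay(true));
    }
}
```

In both runs every call returns normally; the only difference is whether the delivered list contains the message, which matches the silent loss described here.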

 

The end result is that InOnlyProducer.sendMessage completes successfully 
without setting any exception on the exchange, even though the message has 
been rolled back and never sent.  It is lost.

 

 


> SJMS transacted producer can lose messages
> ------------------------------------------
>
>                 Key: CAMEL-15333
>                 URL: https://issues.apache.org/jira/browse/CAMEL-15333
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-sjms
>    Affects Versions: 3.4.1
>            Reporter: Brad Harvey
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
