[ 
https://issues.apache.org/jira/browse/CAMEL-9606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15574220#comment-15574220
 ] 

Tomohisa Igarashi commented on CAMEL-9606:
------------------------------------------

I created an automated unit test and verified it actually is a problem.
https://github.com/igarashitm/issues/blob/master/camel/misc/src/test/java/CamelSjmsTxTest.java

The camel sjms does make Synchronization effort, but it's not really safe. 
Sometimes only the producer bit of the 
SessionTransactionSynchronization.onComplete() is invoked before terminating a 
process, then eventually the consumer side is rolled back (i.e. put back to 
queue-in) and the message duplicates. Note that message '0017' is in both of 
'queue-in' and 'queue-out'.
{code}
[       ActiveMQ Session Task-1] sionTransactionSynchronization DEBUG 
Processing completion of Exchange 
id:ID-localhost-localdomain-44085-1476420609350-0-17
[       ActiveMQ Session Task-1] sionTransactionSynchronization DEBUG 
Processing completion of Exchange 
id:ID-localhost-localdomain-44085-1476420609350-0-17
[       ActiveMQ Session Task-1] route1                         INFO  =====> 
Forwarding a message:[0017] from 'test-in' queue to 'test-out' queue...
[       ActiveMQ Session Task-1] route1                         INFO  =====> 
Done.
[       ActiveMQ Session Task-1] sionTransactionSynchronization DEBUG 
Processing completion of Exchange 
id:ID-localhost-localdomain-44085-1476420609350-0-18
[                          main] CamelSjmsTxTest                INFO  =====> 
Detroying camel process
....
[                          main] CamelSjmsTxTest                INFO  
======================================================
[                          main] CamelSjmsTxTest                INFO  Initial, 
count=60
[                          main] CamelSjmsTxTest                INFO  
'test-in', count=43       :[0018, 0020, 0022, 0024, 0026, 0028, 0030, 0032, 
0034, 0036, 0038, 0040, 0042, 0044, 0046, 0048, 0050, 0052, 0054, 0056, 0058, 
0017, 0019, 0021, 0023, 0025, 0027, 0029, 0031, 0033, 0035, 0037, 0039, 0041, 
0043, 0045, 0047, 0049, 0051, 0053, 0055, 0057, 0059]
[                          main] CamelSjmsTxTest                INFO  
'test-out', count=18      :[0000, 0001, 0002, 0003, 0004, 0005, 0006, 0007, 
0008, 0009, 0010, 0011, 0012, 0013, 0014, 0015, 0016, 0017]
[                          main] CamelSjmsTxTest                INFO  
======================================================
{code}

The solution would be to let SJMS consumers/producers share a single JMS 
session in a Exchange, so that we can just rely on broker implementation to 
guarantee transaction atomicity.

> SJMS Consumer-Producer in transaciton
> -------------------------------------
>
>                 Key: CAMEL-9606
>                 URL: https://issues.apache.org/jira/browse/CAMEL-9606
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-sjms
>    Affects Versions: 2.15.4, 2.16.2
>            Reporter: Zdeněk Obst
>
> I'm not 100% sure this is a bug but it feels that way from conversation I had 
> via mailing lists.
> I'm trying to ensure transactional processing between SJMS consumer and 
> producer (e.g. using same JMS session). 
> In other words this simple case:
> 1. prepare higher amount of JMS messages in broker (e.g. ActiveMQ with 1000 
> messages) 
> 2. use Camel route from input queue to output queue using trasacted=true 
> 3. start context (starts consuming messages) and in any time kill java 
> process 
> When I kill process, I would expect that sum of messages in input and output 
> queue will be 1000 - so the transaction works. But what happens is that I 
> always end up with 1001+ messages. Maybe it is misconfiguration of routes or 
> misunderstanding how SJMS can work.
> I feel this is critical because JMS is generally used because it its 
> transactional capabilities.
> Here is the sample code I used for reproduction (using ActiveMQ):
> {code:java}
> public class SjmsTransaction {
>     public static void main(String[] args) throws Exception {
>         RouteBuilder rb = new RouteBuilder() {
>             @Override
>             public void configure() throws Exception {
>                 onException(Exception.class)
>                         .process(systemOut("Exception!!"));
>                 from("sjms:queue:test-in?transacted=true&consumerCount=5")
>                         .process(systemOut("Processing"))
>                         .to("sjms:queue:test-out?transacted=true")
>                         .process(systemOut("Processed"));
>             }
>         };
>         CamelContext context = new DefaultCamelContext();
>         addJmsComponent(context);
>         context.addRoutes(rb);
>         System.out.println("=====> Starting context");
>         context.start();
>         // Now the context will run and consume messages, when I kill 
> application by force in any time
>         // I expect this to be true: <#messagesInInputAtBeginning> == 
> <#messagesInInputNow> + <#messagesInOutputNow>
>         // What happens is that there is always < (e.g. I submitted 1000 
> messages, out has 500, in has 501)
>     }
>     private static void addJmsComponent(CamelContext context) {
>         ConnectionFactory factory = new 
> ActiveMQConnectionFactory("tcp://localhost:61616");
>         ConnectionFactoryResource connResource = new 
> ConnectionFactoryResource(5, factory);
>         SjmsComponent comp = new SjmsComponent();
>         comp.setConnectionResource(connResource);
>         context.addComponent("sjms", comp);
>     }
>     private static Processor systemOut(final String message) {
>         return new Processor() {
>             @Override
>             public void process(Exchange exchange) throws Exception {
>                 System.out.println(exchange.getExchangeId() + ": " + message);
>             }
>         };
>     }
> }
> {code}
> Note that I tried to use it with various combinations of acknowledgeMode and 
> In/InOut exchange pattern - but without luck.
> I'm not that much oriented in Camel source code but I found that JMS session 
> is held within the exchange so probably when producer finds in an exchange 
> existing JMS session and is configured to be transacted, then maybe it can 
> participate this session? Or maybe there are other hooks (like 
> Synchronization objects) in some registry that take care of this issue?
> Here is the link to the previous mailing list conversation: 
> http://camel.465427.n5.nabble.com/SJMS-transaction-td5777522.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to