Please direct support questions to the ActiveMQ Users mailing list, as this list is for discussion of development of the broker itself.
On 3/28/20 5:17 AM, vedion wrote:
Hi,

I have an ActiveMQ setup where I have configured redelivery on the client side. With a simple consumer it works as expected with the configuration below:

```
import org.apache.activemq.ActiveMQXAConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.springframework.boot.jta.atomikos.AtomikosConnectionFactoryBean;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
...

@Bean
public ConnectionFactory atomikosConnectionFactoryBean() {
    String mqUrl = System.getenv("MQ_URL");

    AtomikosConnectionFactoryBean atomikos = new AtomikosConnectionFactoryBean();
    atomikos.setLocalTransactionMode(false);
    atomikos.setMaxPoolSize(10);
    atomikos.setUniqueResourceName("QUEUE_BROKER");

    RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
    redeliveryPolicy.setMaximumRedeliveries(4);
    redeliveryPolicy.setBackOffMultiplier(10);
    redeliveryPolicy.setRedeliveryDelay(1000L);
    redeliveryPolicy.setInitialRedeliveryDelay(1000L);
    redeliveryPolicy.setUseExponentialBackOff(true);
    redeliveryPolicy.setMaximumRedeliveryDelay(86400000L);

    ActiveMQXAConnectionFactory xaConnectionFactoryBean = new ActiveMQXAConnectionFactory(
            System.getenv("MQ_USERNAME"), System.getenv("MQ_PASSWORD"), mqUrl);
    xaConnectionFactoryBean.setRedeliveryPolicy(redeliveryPolicy);
    xaConnectionFactoryBean.setNonBlockingRedelivery(true);

    atomikos.setXaConnectionFactory(xaConnectionFactoryBean);
    return atomikos;
}

@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerFactory(
        ConnectionFactory connectionFactory,
        DefaultJmsListenerContainerFactoryConfigurer configurer) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    configurer.configure(factory, connectionFactory);
    factory.setErrorHandler(new EHealthEventErrorHandler());
    factory.setMessageConverter(jacksonJmsMessageConverter());
    factory.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);
    factory.setDestinationResolver(new EHealthDestinationResolver());
    factory.setSessionTransacted(true);
    return factory;
}

@Bean(autowire = Autowire.BY_TYPE)
public JmsTemplate jmsTemplate() {
    JmsTemplate jmsTemplate = new JmsTemplate(atomikosConnectionFactoryBean());
    jmsTemplate.setDestinationResolver(new EHealthDestinationResolver());
    jmsTemplate.setSessionTransacted(true);
    return jmsTemplate;
}
...
```

```
import org.springframework.jms.annotation.JmsListener;
import org.springframework.transaction.annotation.Transactional;

@Transactional
@JmsListener(destination = "XXX")
public void onMessageReceived(XXXEvent event) throws Exception {
    throw new Exception();
}
```

The above works as expected, and the message is redelivered with the exponential back-off strategy.

BUT it goes sideways when the message consumer (onMessageReceived) calls a method on a class that sends a message to another queue in a new transaction. The message is then not redelivered if the exception is thrown after the new transaction has been committed, e.g.:

```
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

public class FooClass {

    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void createInNewTransaction() {
        sendMessageToAnotherQueue();
    }
}
```

```
import org.springframework.jms.annotation.JmsListener;
import org.springframework.transaction.annotation.Transactional;

@Transactional
@JmsListener(destination = "Foo")
public void onMessageReceived(FooEvent event) throws Exception {
    fooClass.createInNewTransaction();
    throw new Exception();
}
```

The stack trace below shows that org.apache.activemq.TransactionContext.synchronizations is nulled when the message is sent in the new transaction. TransactionContext.synchronizations contains the ActiveMQMessageConsumer that was used to receive the message, and it is needed for the redelivery after the exception is thrown. When it is cleared, the message is not redelivered:

<http://activemq.2283324.n4.nabble.com/file/t379855/Sync_nulled.png>

```
private void afterRollback() throws JMSException {
    if (synchronizations == null) {
        return;
    }
    ...
}
```

It is the method com.atomikos.datasource.xa.session.BranchEnlistedStateHandler.checkEnlistBeforeUse() that detects that the transaction context is different and throws an exception, which is caught in SessionHandleState.notifyBeforeUse():

```
TransactionContextStateHandler checkEnlistBeforeUse ( CompositeTransaction currentTx)
        throws InvalidSessionHandleStateException, UnexpectedTransactionContextException
{
    if ( currentTx == null || !currentTx.isSameTransaction ( ct ) ) {
        //OOPS! we are being used a different tx context than the one expected...

        //TODO check: what if subtransaction? Possible solution: ignore if serial_jta mode, error otherwise.

        String msg = "The connection/session object is already enlisted in a (different) transaction.";
        if ( LOGGER.isTraceEnabled() ) LOGGER.logTrace ( msg );
        throw new UnexpectedTransactionContextException();
    }

    //tx context is still the same -> no change in state required
    return null;
}
```

Then a new context is created and currentContext.checkEnlistBeforeUse(ct) is called, which ends up clearing TransactionContext.synchronizations.

There is a comment in BranchEnlistedStateHandler.checkEnlistBeforeUse(): "//TODO check: what if subtransaction? Possible solution: ignore if serial_jta mode, error otherwise."

I have a subtransaction and have "com.atomikos.icatch.serial_jta_transactions" set to true. Am I just unlucky to have hit something that is not supported yet?

Versions used:
"org.springframework:spring-jms:5.1.10.RELEASE",
"com.atomikos:transactions:5.0.3",
"org.apache.activemq:activemq-client:5.15.10"

I have tried bumping to the newest versions, but it didn't make a difference.

--
Sent from: http://activemq.2283324.n4.nabble.com/ActiveMQ-Dev-f2368404.html
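
For reference when checking whether redelivery is happening at all, here is a rough sketch of the delay schedule that the RedeliveryPolicy settings in the quoted message would be expected to produce. It does not call ActiveMQ: `BackoffSchedule` and `delays` are hypothetical names, and the sketch assumes that the first redelivery waits for the initial redelivery delay, that each later delay is the previous one multiplied by the back-off multiplier, that the result is capped at the maximum redelivery delay, and that collision avoidance is off.

```java
import java.util.ArrayList;
import java.util.List;

public class BackoffSchedule {

    // Hypothetical helper, not part of the ActiveMQ API.
    // Assumption: delay(1) = initialDelay, delay(n + 1) = delay(n) * multiplier,
    // capped at maxDelay. Collision avoidance is ignored.
    public static List<Long> delays(long initialDelay, double multiplier,
                                    long maxDelay, int maxRedeliveries) {
        List<Long> result = new ArrayList<>();
        long delay = initialDelay;
        for (int attempt = 1; attempt <= maxRedeliveries; attempt++) {
            result.add(delay);
            // Grow the delay for the next attempt, never exceeding the cap.
            delay = Math.min((long) (delay * multiplier), maxDelay);
        }
        return result;
    }

    public static void main(String[] args) {
        // Values taken from the quoted configuration:
        // initial delay 1000 ms, multiplier 10, cap 86400000 ms, 4 redeliveries.
        System.out.println(delays(1000L, 10.0, 86_400_000L, 4));
        // prints [1000, 10000, 100000, 1000000]
    }
}
```

Under those assumptions the four redeliveries would be spaced roughly 1 s, 10 s, 100 s and 1000 s apart, so a listener that sees no second delivery within the first few seconds is probably not being rolled back into the redelivery path at all.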
-- Tim Bish
