[ 
https://issues.apache.org/jira/browse/CAMEL-17347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17461414#comment-17461414
 ] 

Andrey Filippov edited comment on CAMEL-17347 at 12/19/21, 8:52 AM:
--------------------------------------------------------------------

[~davsclaus] thanks for the reply. Yes, 
org.apache.camel.component.springrabbit.SpringRabbitPollingConsumer uses 
org.springframework.amqp.rabbit.core.RabbitTemplate but the last one has a flag 
called 'channelTransacted' - [1]. And if I mark a route with <transacted /> and 
add 

<bean id="rabbitTxManager"
        
class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager">
        <property name="connectionFactory" ref="rabbitConnectionFactory"/>
    </bean>

to the scope the behavior does not change - messages are read from a queue 
immediately without intermediate acks. At the same time If I do the same tx 
stuff with Artemis, messages will not be marked as delivered until the end of 
the route. I might be wrong but Camel could use 'channelTransacted' flag in the 
polling consumer.

 

 

[1] [https://docs.spring.io/spring-amqp/reference/html/amqp.html#transactions]


was (Author: afilippov):
[~davsclaus] thanks for the reply. Yes, 
org.apache.camel.component.springrabbit.SpringRabbitPollingConsumer uses 
org.springframework.amqp.rabbit.core.RabbitTemplate but the last one has a flag 
called 'channelTransacted' - [1]. And if I mark a route with <transacted /> and 
add 

<bean id="rabbitTxManager"
        
class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager">
        <property name="connectionFactory" ref="rabbitConnectionFactory"/>
    </bean>

to the scope the behavior does not change - messages are read from a queue 
without acks. At the same time If I do the same tx stuff with Artemis messages 
will not be marked as delivered until the end of the route. I might be wrong 
but Camel could use 'channelTransacted' flag in the polling consumer.

 

 

[1] https://docs.spring.io/spring-amqp/reference/html/amqp.html#transactions

> Wrong polling camel-spring-rabbitmq consumer onexception behavior 
> ------------------------------------------------------------------
>
>                 Key: CAMEL-17347
>                 URL: https://issues.apache.org/jira/browse/CAMEL-17347
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-spring
>    Affects Versions: 3.11.3
>         Environment: Java 11, Karaf 4.2.9
>            Reporter: Andrey Filippov
>            Priority: Major
>
> Hello,
> I am checking 2 simple routes. First one is:
> {{<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"}}
> {{    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"}}
> {{    xsi:schemaLocation="http://www.osgi.org/xmlns/blueprint/v1.0.0 
> blueprint-1.0.0.xsd ">}}
> {{    <bean id="rabbitConnectionFactory" 
> class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">}}
> {{      <property name="uri" value="amqp://localhost:5672"/>}}
> {{      <property name="username" value="guest"/>}}
> {{      <property name="password" value="guest"/>}}
> {{    </bean>}}{{    <camelContext}}
> {{        xmlns="http://camel.apache.org/schema/blueprint"}}
> {{        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"}}
> {{        xsi:schemaLocation="http://camel.apache.org/schema/blueprint 
> camel-blueprint-3.4.5.xsd">}}{{        <route id="consume">}}{{            
> <from uri="spring-rabbitmq:ex1?queues=bar&amp;prefetchCount=1" />}}
> {{            <onException>}}
> {{                <exception>java.lang.Exception</exception>}}
> {{                <redeliveryPolicy maximumRedeliveries="2" 
> redeliveryDelay="10000" retryAttemptedLogLevel="WARN"/>}}
> {{                <handled><constant>true</constant></handled>}}
> {{                <to uri="direct:error" />}}
> {{            </onException>}}{{            <log message=">>>Consumer 
> start"/>}}{{            <throwException 
> exceptionType="java.lang.NullPointerException"}}
> {{                                message="Not supported, sorjan"/>}}{{       
>      <log message=">>>Body:  ${body}"/>}}
> {{            <log message=">>>Headers:  ${headers}"/>}}
> {{            <when>}}
> {{                <simple>${body} == null</simple>}}
> {{                <log message=">>>@@@@@@@@@@@@@@@@@@@@@@ Body is 
> null!!!!"/>}}
> {{            </when>}}
> {{            }}
> {{        </route>}}{{        <route>}}
> {{            <from uri="direct:error" />}}
> {{            <log message=">>>Error happens body:  ${body}"/>}}
> {{        </route>}}{{    </camelContext>}}
> {{</blueprint>}}
> In this case a message from a queue is marked unacknowledged until the very 
> end - after all redelivery attempts have fired. By the other hand, if I use 
> polling consumer - like this:
> {{<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"}}
> {{    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"}}
> {{    xsi:schemaLocation="http://www.osgi.org/xmlns/blueprint/v1.0.0 
> blueprint-1.0.0.xsd ">}}
> {{    <bean id="rabbitConnectionFactory" 
> class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">}}
> {{      <property name="uri" value="amqp://localhost:5672"/>}}
> {{      <property name="username" value="guest"/>}}
> {{      <property name="password" value="guest"/>}}
> {{    </bean>}}{{    <camelContext}}
> {{        xmlns="http://camel.apache.org/schema/blueprint"}}
> {{        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"}}
> {{        xsi:schemaLocation="http://camel.apache.org/schema/blueprint 
> camel-blueprint-3.4.5.xsd">}}{{        <route id="consume">}}
> {{            <from uri="timer:consume?period=60000" />}}
> {{            <onException>}}
> {{                <exception>java.lang.Exception</exception>}}
> {{                <redeliveryPolicy maximumRedeliveries="2" 
> redeliveryDelay="10000" retryAttemptedLogLevel="WARN"/>}}
> {{                <handled><constant>true</constant></handled>}}
> {{                <to uri="direct:error" />}}
> {{            </onException>}}
> {{            <log message=">>>Consumer start"/>}}{{            <pollEnrich>}}
> {{                
> <constant>spring-rabbitmq:ex1?queues=bar&amp;prefetchCount=1&amp;acknowledgeMode=MANUAL</constant>}}
> {{            </pollEnrich>}}{{            <throwException 
> exceptionType="java.lang.NullPointerException"}}
> {{                                message="Not supported, sorjan"/>}}
> {{            <log message=">>>Body:  ${body}"/>}}
> {{            <log message=">>>Headers:  ${headers}"/>}}
> {{            <when>}}
> {{                <simple>${body} == null</simple>}}
> {{                <log message=">>>@@@@@@@@@@@@@@@@@@@@@@ Body is 
> null!!!!"/>}}
> {{            </when>}}
> {{            }}
> {{        </route>}}{{        <route>}}
> {{            <from uri="direct:error" />}}
> {{            <log message=">>>Error happens body:  ${body}"/>}}
> {{        </route>}}{{    </camelContext>}}
> {{</blueprint>}}
> In this case a message is disappeared from a queue immediately after 
> pollEnrich without intermediate unacknowledged mark. 
> I think this behavior is not 100% correct. 
> P.s. Sorry - I used a wrong component in this ticket description cause 
> camel-spring-rabbitmq is not in the list.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to