[ 
https://issues.apache.org/jira/browse/CAMEL-17925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Espen Andreassen updated CAMEL-17925:
-------------------------------------
    Description: 
*Description:*
Errors thrown in kafka-consumer routes with "breakOnFirstError" enabled does 
not trigger reprocessing in 3.12.0 and forward, as if option 
"breakOnFirstError" is not respected.

*Reproduction:*
 * Configure a route consuming from kafka with option "breakOnFirstError" 
enabled
 * Trigger an exception in route

*Expected result:*
 * Should break out, seek back to offset of the message that caused the failure 
and then reattempt to process the message (as per the 
[docs|[https://camel.apache.org/components/3.14.x/kafka-component.html]]: )

*Actual result:*
 * Consumer continuous to next message



*GitHub project reproducing the issue*

[https://github.com/espeandr/camel-kafka-break-on-error-demo]

Test-runs
3.16.0 (failing): 
[https://github.com/espeandr/camel-kafka-break-on-error-demo/runs/5886049034?check_suite_focus=true]
3.11.6 (succeeding): 
https://github.com/espeandr/camel-kafka-break-on-error-demo/runs/5886221956?check_suite_focus=true

  was:
*Description:*
Errors thrown in kafka-consumer routes with "breakOnFirstError" enabled does 
not trigger reprocessing in 3.12.0 and forward, as if option 
"breakOnFirstError" is not respected.

*Reproduction:*
 * Configure a route consuming from kafka with option "breakOnFirstError" 
enabled
 * Trigger an exception in route

*Expected result:*
 * Should break out, seek back to offset of the message that caused the failure 
and then reattempt to process the message (as per the 
[docs|[https://camel.apache.org/components/3.14.x/kafka-component.html]]: )

*Actual result:*
 * Consumer continuous to next message



I've created  a simplified test demonstrating the issue. The test reports OK in 
3.11.6, but not in version 3.12.0 through 3.16.0:
{code:java}
package no.statnett.integration.tdm.eventhandler;import 
org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.AdviceWithRouteBuilder;
import org.apache.camel.builder.RouteBuilder;
import 
org.apache.camel.builder.endpoint.dsl.DirectEndpointBuilderFactory.DirectEndpointBuilder;
import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
import org.apache.camel.test.spring.junit5.UseAdviceWith;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.context.EmbeddedKafka;import static 
org.apache.camel.builder.endpoint.StaticEndpointBuilders.direct;
import static org.apache.camel.builder.endpoint.StaticEndpointBuilders.kafka;
import static org.awaitility.Awaitility.await;@CamelSpringBootTest
@SpringBootTest(classes = {Application.class})
@UseAdviceWith
@EmbeddedKafka(controlledShutdown = true, partitions = 1)
public class AbstractApplicationTest {
    public static final String TOPIC_NAME = "topicName";
    @Autowired
    protected ProducerTemplate kafkaInputProducer;    @Autowired
    protected CamelContext camelContext;    private int consumptionCounter = 0; 
   @Test
    public void shouldReconsumeFromKafkaOnError() throws Exception {
        DirectEndpointBuilder mockKafkaProducer = direct("mockKafkaProducer");
        
kafkaInputProducer.setDefaultEndpoint(mockKafkaProducer.resolve(camelContext)); 
       AdviceWithRouteBuilder.addRoutes(camelContext, builder -> {
            createProducerRoute(mockKafkaProducer, builder);
            createConsumerRoute(builder);
        });        camelContext.start();
        kafkaInputProducer.sendBody("A TEST MESSAGE");
        // Counter is only incremented once, and test fails
        await().until(() -> consumptionCounter > 1);
    }    private void createConsumerRoute(RouteBuilder builder) {
        builder.from(kafka(TOPIC_NAME)
                        .breakOnFirstError(true)
                        .autoCommitEnable(false)
                        .autoOffsetReset("earliest")
                        .seekTo("beginning")
                        .allowManualCommit(true)
                )
                .log("CONSUMED_RECORD")
                .process(e -> {
                    consumptionCounter++;
                    throw new RuntimeException("ERROR_TRIGGER_BY_TEST");
                });
    }    private void createProducerRoute(DirectEndpointBuilder 
mockKafkaProducer, RouteBuilder builder) {
        builder.from(mockKafkaProducer)
                .to(kafka(TOPIC_NAME));
    }
}
{code}
 

 


> camel-kafka: "breakOnFirstError" option is not respected
> --------------------------------------------------------
>
>                 Key: CAMEL-17925
>                 URL: https://issues.apache.org/jira/browse/CAMEL-17925
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-kafka
>    Affects Versions: 3.12.0, 3.14.2, 3.15.0, 3.16.0
>            Reporter: Espen Andreassen
>            Priority: Major
>             Fix For: 3.17.0
>
>
> *Description:*
> Errors thrown in kafka-consumer routes with "breakOnFirstError" enabled does 
> not trigger reprocessing in 3.12.0 and forward, as if option 
> "breakOnFirstError" is not respected.
> *Reproduction:*
>  * Configure a route consuming from kafka with option "breakOnFirstError" 
> enabled
>  * Trigger an exception in route
> *Expected result:*
>  * Should break out, seek back to offset of the message that caused the 
> failure and then reattempt to process the message (as per the 
> [docs|[https://camel.apache.org/components/3.14.x/kafka-component.html]]: )
> *Actual result:*
>  * Consumer continuous to next message
> *GitHub project reproducing the issue*
> [https://github.com/espeandr/camel-kafka-break-on-error-demo]
> Test-runs
> 3.16.0 (failing): 
> [https://github.com/espeandr/camel-kafka-break-on-error-demo/runs/5886049034?check_suite_focus=true]
> 3.11.6 (succeeding): 
> https://github.com/espeandr/camel-kafka-break-on-error-demo/runs/5886221956?check_suite_focus=true



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to