[ 
https://issues.apache.org/jira/browse/CAMEL-17925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Claus Ibsen updated CAMEL-17925:
--------------------------------
    Fix Version/s: 3.17.0

> camel-kafka: "breakOnFirstError" option is not respected
> --------------------------------------------------------
>
>                 Key: CAMEL-17925
>                 URL: https://issues.apache.org/jira/browse/CAMEL-17925
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-kafka
>    Affects Versions: 3.12.0, 3.14.2, 3.15.0, 3.16.0
>            Reporter: Espen Andreassen
>            Priority: Major
>             Fix For: 3.17.0
>
>
> *Description:*
> Errors thrown in kafka-consumer routes with "breakOnFirstError" enabled does 
> not trigger reprocessing in 3.12.0 and forward, as if option 
> "breakOnFirstError" is not respected.
> *Reproduction:*
>  * Configure a route consuming from kafka with option "breakOnFirstError" 
> enabled
>  * Trigger an exception in route
> *Expected result:*
>  * Should break out, seek back to offset of the message that caused the 
> failure and then reattempt to process the message (as per the 
> [docs|[https://camel.apache.org/components/3.14.x/kafka-component.html]]: )
> *Actual result:*
>  * Consumer continuous to next message
> I've created  a simplified test demonstrating the issue. The test reports OK 
> in 3.11.6, but not in version 3.12.0 through 3.16.0:
> {code:java}
> package no.statnett.integration.tdm.eventhandler;import 
> org.apache.camel.CamelContext;
> import org.apache.camel.ProducerTemplate;
> import org.apache.camel.builder.AdviceWithRouteBuilder;
> import org.apache.camel.builder.RouteBuilder;
> import 
> org.apache.camel.builder.endpoint.dsl.DirectEndpointBuilderFactory.DirectEndpointBuilder;
> import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
> import org.apache.camel.test.spring.junit5.UseAdviceWith;
> import org.junit.jupiter.api.Test;
> import org.springframework.beans.factory.annotation.Autowired;
> import org.springframework.boot.test.context.SpringBootTest;
> import org.springframework.kafka.test.context.EmbeddedKafka;import static 
> org.apache.camel.builder.endpoint.StaticEndpointBuilders.direct;
> import static org.apache.camel.builder.endpoint.StaticEndpointBuilders.kafka;
> import static org.awaitility.Awaitility.await;@CamelSpringBootTest
> @SpringBootTest(classes = {Application.class})
> @UseAdviceWith
> @EmbeddedKafka(controlledShutdown = true, partitions = 1)
> public class AbstractApplicationTest {
>     public static final String TOPIC_NAME = "topicName";
>     @Autowired
>     protected ProducerTemplate kafkaInputProducer;    @Autowired
>     protected CamelContext camelContext;    private int consumptionCounter = 
> 0;    @Test
>     public void shouldReconsumeFromKafkaOnError() throws Exception {
>         DirectEndpointBuilder mockKafkaProducer = direct("mockKafkaProducer");
>         
> kafkaInputProducer.setDefaultEndpoint(mockKafkaProducer.resolve(camelContext));
>         AdviceWithRouteBuilder.addRoutes(camelContext, builder -> {
>             createProducerRoute(mockKafkaProducer, builder);
>             createConsumerRoute(builder);
>         });        camelContext.start();
>         kafkaInputProducer.sendBody("A TEST MESSAGE");
>         // Counter is only incremented once, and test fails
>         await().until(() -> consumptionCounter > 1);
>     }    private void createConsumerRoute(RouteBuilder builder) {
>         builder.from(kafka(TOPIC_NAME)
>                         .breakOnFirstError(true)
>                         .autoCommitEnable(false)
>                         .autoOffsetReset("earliest")
>                         .seekTo("beginning")
>                         .allowManualCommit(true)
>                 )
>                 .log("CONSUMED_RECORD")
>                 .process(e -> {
>                     consumptionCounter++;
>                     throw new RuntimeException("ERROR_TRIGGER_BY_TEST");
>                 });
>     }    private void createProducerRoute(DirectEndpointBuilder 
> mockKafkaProducer, RouteBuilder builder) {
>         builder.from(mockKafkaProducer)
>                 .to(kafka(TOPIC_NAME));
>     }
> }
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to