[
https://issues.apache.org/jira/browse/CAMEL-17925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Claus Ibsen updated CAMEL-17925:
--------------------------------
Fix Version/s: 3.17.0
> camel-kafka: "breakOnFirstError" option is not respected
> --------------------------------------------------------
>
> Key: CAMEL-17925
> URL: https://issues.apache.org/jira/browse/CAMEL-17925
> Project: Camel
> Issue Type: Bug
> Components: camel-kafka
> Affects Versions: 3.12.0, 3.14.2, 3.15.0, 3.16.0
> Reporter: Espen Andreassen
> Priority: Major
> Fix For: 3.17.0
>
>
> *Description:*
> Errors thrown in kafka-consumer routes with "breakOnFirstError" enabled do
> not trigger reprocessing in 3.12.0 and later, as if the option
> "breakOnFirstError" is not respected.
> *Reproduction:*
> * Configure a route consuming from kafka with option "breakOnFirstError"
> enabled
> * Trigger an exception in route
> *Expected result:*
> * Should break out, seek back to the offset of the message that caused the
> failure, and then reattempt to process the message (as per the
> [docs|https://camel.apache.org/components/3.14.x/kafka-component.html])
> *Actual result:*
> * Consumer continues to the next message
> I've created a simplified test demonstrating the issue. The test passes
> in 3.11.6, but fails in versions 3.12.0 through 3.16.0:
> {code:java}
> package no.statnett.integration.tdm.eventhandler;
>
> import org.apache.camel.CamelContext;
> import org.apache.camel.ProducerTemplate;
> import org.apache.camel.builder.AdviceWithRouteBuilder;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.builder.endpoint.dsl.DirectEndpointBuilderFactory.DirectEndpointBuilder;
> import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
> import org.apache.camel.test.spring.junit5.UseAdviceWith;
> import org.junit.jupiter.api.Test;
> import org.springframework.beans.factory.annotation.Autowired;
> import org.springframework.boot.test.context.SpringBootTest;
> import org.springframework.kafka.test.context.EmbeddedKafka;
>
> import static org.apache.camel.builder.endpoint.StaticEndpointBuilders.direct;
> import static org.apache.camel.builder.endpoint.StaticEndpointBuilders.kafka;
> import static org.awaitility.Awaitility.await;
>
> @CamelSpringBootTest
> @SpringBootTest(classes = {Application.class})
> @UseAdviceWith
> @EmbeddedKafka(controlledShutdown = true, partitions = 1)
> public class AbstractApplicationTest {
>     public static final String TOPIC_NAME = "topicName";
>
>     @Autowired
>     protected ProducerTemplate kafkaInputProducer;
>
>     @Autowired
>     protected CamelContext camelContext;
>
>     private int consumptionCounter = 0;
>
>     @Test
>     public void shouldReconsumeFromKafkaOnError() throws Exception {
>         DirectEndpointBuilder mockKafkaProducer = direct("mockKafkaProducer");
>         kafkaInputProducer.setDefaultEndpoint(mockKafkaProducer.resolve(camelContext));
>
>         AdviceWithRouteBuilder.addRoutes(camelContext, builder -> {
>             createProducerRoute(mockKafkaProducer, builder);
>             createConsumerRoute(builder);
>         });
>
>         camelContext.start();
>         kafkaInputProducer.sendBody("A TEST MESSAGE");
>
>         // Counter is only incremented once, and test fails
>         await().until(() -> consumptionCounter > 1);
>     }
>
>     private void createConsumerRoute(RouteBuilder builder) {
>         builder.from(kafka(TOPIC_NAME)
>                 .breakOnFirstError(true)
>                 .autoCommitEnable(false)
>                 .autoOffsetReset("earliest")
>                 .seekTo("beginning")
>                 .allowManualCommit(true)
>             )
>             .log("CONSUMED_RECORD")
>             .process(e -> {
>                 consumptionCounter++;
>                 throw new RuntimeException("ERROR_TRIGGER_BY_TEST");
>             });
>     }
>
>     private void createProducerRoute(DirectEndpointBuilder mockKafkaProducer, RouteBuilder builder) {
>         builder.from(mockKafkaProducer)
>             .to(kafka(TOPIC_NAME));
>     }
> }
> {code}
>
>
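For readers unfamiliar with the option, the difference between the expected and the actual result in the report above can be sketched without Kafka or Camel. The class below is a hypothetical stand-in for a consumer loop, not Camel's implementation: `BreakOnFirstErrorSketch`, `consume`, and `maxAttempts` are names invented for this illustration, and list indexes play the role of offsets. With the flag on, a failure leaves the position at the failed offset so the same record is attempted again; with the flag off, the loop moves on to the next record, which matches the behaviour the reporter observes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical simulation only; this is NOT how camel-kafka is implemented.
class BreakOnFirstErrorSketch {

    /**
     * Processes records starting at offset 0 and returns the offsets in the
     * order they were attempted. maxAttempts (an assumption of this sketch)
     * caps the total number of attempts so that a record that always fails
     * does not loop forever.
     */
    static List<Integer> consume(List<String> records,
                                 Consumer<String> processor,
                                 boolean breakOnFirstError,
                                 int maxAttempts) {
        List<Integer> attempted = new ArrayList<>();
        int offset = 0;
        while (offset < records.size() && attempted.size() < maxAttempts) {
            attempted.add(offset);
            try {
                processor.accept(records.get(offset));
                offset++; // success: advance to the next record
            } catch (RuntimeException e) {
                if (!breakOnFirstError) {
                    offset++; // flag off: skip the failed record and continue
                }
                // flag on: keep the offset unchanged ("seek back"),
                // so the next iteration reattempts the same record
            }
        }
        return attempted;
    }

    public static void main(String[] args) {
        List<String> records = List.of("ok", "bad", "ok");
        Consumer<String> processor = body -> {
            if (body.equals("bad")) {
                throw new RuntimeException("ERROR_TRIGGER_BY_TEST");
            }
        };
        // Expected behaviour per the report: offset 1 is retried.
        System.out.println(consume(records, processor, true, 5));  // prints [0, 1, 1, 1, 1]
        // Behaviour observed in 3.12.0 through 3.16.0: the consumer moves on.
        System.out.println(consume(records, processor, false, 5)); // prints [0, 1, 2]
    }
}
```

In the reporter's test the analogue of the first output is `consumptionCounter` exceeding 1; the second output corresponds to the counter being incremented only once.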
--
This message was sent by Atlassian Jira
(v8.20.1#820001)