Espen Andreassen created CAMEL-17925:
----------------------------------------

             Summary: camel-kafka: "breakOnFirstError" option is not respected
                 Key: CAMEL-17925
                 URL: https://issues.apache.org/jira/browse/CAMEL-17925
             Project: Camel
          Issue Type: Bug
          Components: camel-kafka
    Affects Versions: 3.16.0, 3.15.0, 3.14.2, 3.12.0
            Reporter: Espen Andreassen


*Description:*
Errors thrown in kafka-consumer routes with "breakOnFirstError" enabled does 
not trigger reprocessing in 3.12.0 and forward, as if option 
"breakOnFirstError" is not respected.

*Reproduction:*
 * Configure a route consuming from kafka with option "breakOnFirstError" 
enabled
 * Trigger an exception in route

*Expected result:*
 * Should break out, seek back to offset of the message that caused the failure 
and then reattempt to process the message (as per the 
[docs|[https://camel.apache.org/components/3.14.x/kafka-component.html]]: )

*Actual result:*
 * Consumer continuous to next message



I've created  a simplified test demonstrating the issue. The test reports OK in 
3.11.6, but not in version 3.12.0 through 3.16.0:
{code:java}
package no.statnett.integration.tdm.eventhandler;import 
org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.AdviceWithRouteBuilder;
import org.apache.camel.builder.RouteBuilder;
import 
org.apache.camel.builder.endpoint.dsl.DirectEndpointBuilderFactory.DirectEndpointBuilder;
import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
import org.apache.camel.test.spring.junit5.UseAdviceWith;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.context.EmbeddedKafka;import static 
org.apache.camel.builder.endpoint.StaticEndpointBuilders.direct;
import static org.apache.camel.builder.endpoint.StaticEndpointBuilders.kafka;
import static org.awaitility.Awaitility.await;@CamelSpringBootTest
@SpringBootTest(classes = {Application.class})
@UseAdviceWith
@EmbeddedKafka(controlledShutdown = true, partitions = 1)
public class AbstractApplicationTest {
    public static final String TOPIC_NAME = "topicName";
    @Autowired
    protected ProducerTemplate kafkaInputProducer;    @Autowired
    protected CamelContext camelContext;    private int consumptionCounter = 0; 
   @Test
    public void shouldReconsumeFromKafkaOnError() throws Exception {
        DirectEndpointBuilder mockKafkaProducer = direct("mockKafkaProducer");
        
kafkaInputProducer.setDefaultEndpoint(mockKafkaProducer.resolve(camelContext)); 
       AdviceWithRouteBuilder.addRoutes(camelContext, builder -> {
            createProducerRoute(mockKafkaProducer, builder);
            createConsumerRoute(builder);
        });        camelContext.start();
        kafkaInputProducer.sendBody("A TEST MESSAGE");
        // Counter is only incremented once, and test fails
        await().until(() -> consumptionCounter > 1);
    }    private void createConsumerRoute(RouteBuilder builder) {
        builder.from(kafka(TOPIC_NAME)
                        .breakOnFirstError(true)
                        .autoCommitEnable(false)
                        .autoOffsetReset("earliest")
                        .seekTo("beginning")
                        .allowManualCommit(true)
                )
                .log("CONSUMED_RECORD")
                .process(e -> {
                    consumptionCounter++;
                    throw new RuntimeException("ERROR_TRIGGER_BY_TEST");
                });
    }    private void createProducerRoute(DirectEndpointBuilder 
mockKafkaProducer, RouteBuilder builder) {
        builder.from(mockKafkaProducer)
                .to(kafka(TOPIC_NAME));
    }
}
{code}
 

 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to