[
https://issues.apache.org/jira/browse/CAMEL-17925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Espen Andreassen updated CAMEL-17925:
-------------------------------------
Description:
*Description:*
Errors thrown in kafka-consumer routes with "breakOnFirstError" enabled does
not trigger reprocessing in 3.12.0 and forward, as if option
"breakOnFirstError" is not respected.
*Reproduction:*
* Configure a route consuming from kafka with option "breakOnFirstError"
enabled
* Trigger an exception in route
*Expected result:*
* Should break out, seek back to offset of the message that caused the failure
and then reattempt to process the message (as per the
[docs|[https://camel.apache.org/components/3.14.x/kafka-component.html]]: )
*Actual result:*
* Consumer continuous to next message
*GitHub project reproducing the issue*
[https://github.com/espeandr/camel-kafka-break-on-error-demo]
Test-runs
3.16.0 (failing):
[https://github.com/espeandr/camel-kafka-break-on-error-demo/runs/5886049034?check_suite_focus=true]
3.11.6 (succeeding):
https://github.com/espeandr/camel-kafka-break-on-error-demo/runs/5886221956?check_suite_focus=true
was:
*Description:*
Errors thrown in kafka-consumer routes with "breakOnFirstError" enabled does
not trigger reprocessing in 3.12.0 and forward, as if option
"breakOnFirstError" is not respected.
*Reproduction:*
* Configure a route consuming from kafka with option "breakOnFirstError"
enabled
* Trigger an exception in route
*Expected result:*
* Should break out, seek back to offset of the message that caused the failure
and then reattempt to process the message (as per the
[docs|[https://camel.apache.org/components/3.14.x/kafka-component.html]]: )
*Actual result:*
* Consumer continuous to next message
I've created a simplified test demonstrating the issue. The test reports OK in
3.11.6, but not in version 3.12.0 through 3.16.0:
{code:java}
package no.statnett.integration.tdm.eventhandler;import
org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.AdviceWithRouteBuilder;
import org.apache.camel.builder.RouteBuilder;
import
org.apache.camel.builder.endpoint.dsl.DirectEndpointBuilderFactory.DirectEndpointBuilder;
import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
import org.apache.camel.test.spring.junit5.UseAdviceWith;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.context.EmbeddedKafka;import static
org.apache.camel.builder.endpoint.StaticEndpointBuilders.direct;
import static org.apache.camel.builder.endpoint.StaticEndpointBuilders.kafka;
import static org.awaitility.Awaitility.await;@CamelSpringBootTest
@SpringBootTest(classes = {Application.class})
@UseAdviceWith
@EmbeddedKafka(controlledShutdown = true, partitions = 1)
public class AbstractApplicationTest {
public static final String TOPIC_NAME = "topicName";
@Autowired
protected ProducerTemplate kafkaInputProducer; @Autowired
protected CamelContext camelContext; private int consumptionCounter = 0;
@Test
public void shouldReconsumeFromKafkaOnError() throws Exception {
DirectEndpointBuilder mockKafkaProducer = direct("mockKafkaProducer");
kafkaInputProducer.setDefaultEndpoint(mockKafkaProducer.resolve(camelContext));
AdviceWithRouteBuilder.addRoutes(camelContext, builder -> {
createProducerRoute(mockKafkaProducer, builder);
createConsumerRoute(builder);
}); camelContext.start();
kafkaInputProducer.sendBody("A TEST MESSAGE");
// Counter is only incremented once, and test fails
await().until(() -> consumptionCounter > 1);
} private void createConsumerRoute(RouteBuilder builder) {
builder.from(kafka(TOPIC_NAME)
.breakOnFirstError(true)
.autoCommitEnable(false)
.autoOffsetReset("earliest")
.seekTo("beginning")
.allowManualCommit(true)
)
.log("CONSUMED_RECORD")
.process(e -> {
consumptionCounter++;
throw new RuntimeException("ERROR_TRIGGER_BY_TEST");
});
} private void createProducerRoute(DirectEndpointBuilder
mockKafkaProducer, RouteBuilder builder) {
builder.from(mockKafkaProducer)
.to(kafka(TOPIC_NAME));
}
}
{code}
> camel-kafka: "breakOnFirstError" option is not respected
> --------------------------------------------------------
>
> Key: CAMEL-17925
> URL: https://issues.apache.org/jira/browse/CAMEL-17925
> Project: Camel
> Issue Type: Bug
> Components: camel-kafka
> Affects Versions: 3.12.0, 3.14.2, 3.15.0, 3.16.0
> Reporter: Espen Andreassen
> Priority: Major
> Fix For: 3.17.0
>
>
> *Description:*
> Errors thrown in kafka-consumer routes with "breakOnFirstError" enabled does
> not trigger reprocessing in 3.12.0 and forward, as if option
> "breakOnFirstError" is not respected.
> *Reproduction:*
> * Configure a route consuming from kafka with option "breakOnFirstError"
> enabled
> * Trigger an exception in route
> *Expected result:*
> * Should break out, seek back to offset of the message that caused the
> failure and then reattempt to process the message (as per the
> [docs|[https://camel.apache.org/components/3.14.x/kafka-component.html]]: )
> *Actual result:*
> * Consumer continuous to next message
> *GitHub project reproducing the issue*
> [https://github.com/espeandr/camel-kafka-break-on-error-demo]
> Test-runs
> 3.16.0 (failing):
> [https://github.com/espeandr/camel-kafka-break-on-error-demo/runs/5886049034?check_suite_focus=true]
> 3.11.6 (succeeding):
> https://github.com/espeandr/camel-kafka-break-on-error-demo/runs/5886221956?check_suite_focus=true
--
This message was sent by Atlassian Jira
(v8.20.1#820001)