This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit 4e1bd78bb25cb13e5ec066ce824a4b969549582a Author: Otavio Rodolfo Piske <[email protected]> AuthorDate: Mon Feb 14 18:10:27 2022 +0100 CAMEL-15562: fixed not starting strategies instances --- .../org/apache/camel/component/kafka/KafkaConsumer.java | 14 +++++++++++++- .../kafka/consumer/support/ResumeStrategyFactory.java | 3 +-- .../KafkaConsumerWithResumeRouteStrategyIT.java | 3 +-- .../java/org/apache/camel/reifier/ResumableReifier.java | 1 + 4 files changed, 16 insertions(+), 5 deletions(-) diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java index 20226f5..eaefec2 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java @@ -25,6 +25,8 @@ import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; import org.apache.camel.Processor; +import org.apache.camel.ResumeAware; +import org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy; import org.apache.camel.spi.StateRepository; import org.apache.camel.support.BridgeExceptionHandlerToErrorHandler; import org.apache.camel.support.DefaultConsumer; @@ -35,7 +37,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class KafkaConsumer extends DefaultConsumer { +public class KafkaConsumer extends DefaultConsumer implements ResumeAware<KafkaConsumerResumeStrategy> { private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumer.class); @@ -45,6 +47,7 @@ public class KafkaConsumer extends DefaultConsumer { private final List<KafkaFetchRecords> tasks = new ArrayList<>(); private volatile boolean stopOffsetRepo; private PollExceptionStrategy pollExceptionStrategy; + private KafkaConsumerResumeStrategy resumeStrategy; public KafkaConsumer(KafkaEndpoint endpoint, Processor processor) { super(endpoint, processor); @@ -52,6 +55,15 @@ public class KafkaConsumer extends DefaultConsumer { } @Override + public void setResumeStrategy(KafkaConsumerResumeStrategy resumeStrategy) { + this.resumeStrategy = resumeStrategy; + } + + public KafkaConsumerResumeStrategy getResumeStrategy() { + return resumeStrategy; + } + + @Override protected void doBuild() throws Exception { super.doBuild(); if (endpoint.getComponent().getPollExceptionStrategy() != null) { diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java index 2db61f1..d1656f86 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java @@ -56,8 +56,7 @@ public final class ResumeStrategyFactory { public static KafkaConsumerResumeStrategy newResumeStrategy(KafkaConsumer kafkaConsumer) { // When using resumable routes, which register the strategy via service, it takes priority over everything else - KafkaConsumerResumeStrategy resumableRouteStrategy - = kafkaConsumer.getEndpoint().getCamelContext().hasService(KafkaConsumerResumeStrategy.class); + KafkaConsumerResumeStrategy resumableRouteStrategy = kafkaConsumer.getResumeStrategy(); if (resumableRouteStrategy != null) { return resumableRouteStrategy; diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java index 5b0e828..1f95f30 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java @@ -48,8 +48,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue; public class KafkaConsumerWithResumeRouteStrategyIT extends BaseEmbeddedKafkaTestSupport { private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerWithResumeRouteStrategyIT.class); - private static final String TOPIC = "resumable-route-tp"; + private static final int RANDOM_VALUE = ThreadLocalRandom.current().nextInt(1, 1000); @EndpointInject("mock:result") private MockEndpoint result; @@ -57,7 +57,6 @@ public class KafkaConsumerWithResumeRouteStrategyIT extends BaseEmbeddedKafkaTes @BindToRegistry("resumeStrategy") private TestKafkaConsumerResumeStrategy resumeStrategy; private CountDownLatch messagesLatch; - private static final int RANDOM_VALUE = ThreadLocalRandom.current().nextInt(1, 1000); private KafkaProducer<Object, Object> producer; private static class TestKafkaConsumerResumeStrategy diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java index d6f683e..6eba53f 100644 --- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java +++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java @@ -38,6 +38,7 @@ public class ResumableReifier extends ExpressionReifier<ResumableDefinition> { ResumeStrategy resumeStrategy = resolveResumeStrategy(); ObjectHelper.notNull(resumeStrategy, "resumeStrategy", definition); + resumeStrategy.start(); route.setResumeStrategy(resumeStrategy); Expression expression = createExpression(definition.getExpression());
