This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 4e1bd78bb25cb13e5ec066ce824a4b969549582a
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Mon Feb 14 18:10:27 2022 +0100

    CAMEL-15562: fixed not starting strategies instances
---
 .../org/apache/camel/component/kafka/KafkaConsumer.java    | 14 +++++++++++++-
 .../kafka/consumer/support/ResumeStrategyFactory.java      |  3 +--
 .../KafkaConsumerWithResumeRouteStrategyIT.java            |  3 +--
 .../java/org/apache/camel/reifier/ResumableReifier.java    |  1 +
 4 files changed, 16 insertions(+), 5 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 20226f5..eaefec2 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -25,6 +25,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 
 import org.apache.camel.Processor;
+import org.apache.camel.ResumeAware;
+import org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy;
 import org.apache.camel.spi.StateRepository;
 import org.apache.camel.support.BridgeExceptionHandlerToErrorHandler;
 import org.apache.camel.support.DefaultConsumer;
@@ -35,7 +37,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class KafkaConsumer extends DefaultConsumer {
+public class KafkaConsumer extends DefaultConsumer implements ResumeAware<KafkaConsumerResumeStrategy> {
 
     private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumer.class);
 
@@ -45,6 +47,7 @@ public class KafkaConsumer extends DefaultConsumer {
     private final List<KafkaFetchRecords> tasks = new ArrayList<>();
     private volatile boolean stopOffsetRepo;
     private PollExceptionStrategy pollExceptionStrategy;
+    private KafkaConsumerResumeStrategy resumeStrategy;
 
     public KafkaConsumer(KafkaEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
@@ -52,6 +55,15 @@ public class KafkaConsumer extends DefaultConsumer {
     }
 
     @Override
+    public void setResumeStrategy(KafkaConsumerResumeStrategy resumeStrategy) {
+        this.resumeStrategy = resumeStrategy;
+    }
+
+    public KafkaConsumerResumeStrategy getResumeStrategy() {
+        return resumeStrategy;
+    }
+
+    @Override
     protected void doBuild() throws Exception {
         super.doBuild();
         if (endpoint.getComponent().getPollExceptionStrategy() != null) {
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java
index 2db61f1..d1656f86 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java
@@ -56,8 +56,7 @@ public final class ResumeStrategyFactory {
 
     public static KafkaConsumerResumeStrategy newResumeStrategy(KafkaConsumer kafkaConsumer) {
         // When using resumable routes, which register the strategy via service, it takes priority over everything else
-        KafkaConsumerResumeStrategy resumableRouteStrategy
-                = kafkaConsumer.getEndpoint().getCamelContext().hasService(KafkaConsumerResumeStrategy.class);
+        KafkaConsumerResumeStrategy resumableRouteStrategy = kafkaConsumer.getResumeStrategy();
 
         if (resumableRouteStrategy != null) {
             return resumableRouteStrategy;
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java
index 5b0e828..1f95f30 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java
@@ -48,8 +48,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class KafkaConsumerWithResumeRouteStrategyIT extends BaseEmbeddedKafkaTestSupport {
     private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerWithResumeRouteStrategyIT.class);
-
     private static final String TOPIC = "resumable-route-tp";
+    private static final int RANDOM_VALUE = ThreadLocalRandom.current().nextInt(1, 1000);
 
     @EndpointInject("mock:result")
     private MockEndpoint result;
@@ -57,7 +57,6 @@ public class KafkaConsumerWithResumeRouteStrategyIT extends BaseEmbeddedKafkaTes
     @BindToRegistry("resumeStrategy")
     private TestKafkaConsumerResumeStrategy resumeStrategy;
     private CountDownLatch messagesLatch;
-    private static final int RANDOM_VALUE = ThreadLocalRandom.current().nextInt(1, 1000);
     private KafkaProducer<Object, Object> producer;
 
     private static class TestKafkaConsumerResumeStrategy
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java
index d6f683e..6eba53f 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java
@@ -38,6 +38,7 @@ public class ResumableReifier extends ExpressionReifier<ResumableDefinition> {
 
         ResumeStrategy resumeStrategy = resolveResumeStrategy();
         ObjectHelper.notNull(resumeStrategy, "resumeStrategy", definition);
+        resumeStrategy.start();
         route.setResumeStrategy(resumeStrategy);
 
         Expression expression = createExpression(definition.getExpression());

Reply via email to