This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 63fc1e07e814a5146731e9747345a4b6dc2919c8
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Mon Feb 21 15:43:55 2022 +0100

    CAMEL-15562: further simplification of the API
    
    - standardize the offset header
    - fixed tests
---
 .../KafkaConsumerWithResumeRouteStrategyIT.java    |  5 +--
 .../org/apache/camel/ExchangeConstantProvider.java |  3 +-
 .../src/main/java/org/apache/camel/Exchange.java   |  1 +
 .../apache/camel/model/ProcessorDefinition.java    |  4 +--
 .../camel/processor/resume/NoOffsetException.java  | 41 ++++++++++++++++++++++
 .../camel/processor/resume/ResumableProcessor.java | 11 +++---
 .../org/apache/camel/reifier/ResumableReifier.java |  5 +--
 .../FileConsumerResumeFromOffsetStrategyTest.java  | 24 +++++++++++--
 .../file/FileConsumerResumeStrategyTest.java       | 15 ++++++--
 9 files changed, 87 insertions(+), 22 deletions(-)

diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java
index 1f95f30..aeb5de3 100644
--- 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java
@@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.BindToRegistry;
 import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
 import org.apache.camel.Offset;
 import org.apache.camel.Resumable;
 import org.apache.camel.Service;
@@ -228,9 +229,9 @@ public class KafkaConsumerWithResumeRouteStrategyIT extends 
BaseEmbeddedKafkaTes
                 from("kafka:" + TOPIC + "?groupId=" + TOPIC + 
"_GROUP&autoCommitIntervalMs=1000"
                      + "&autoOffsetReset=earliest&consumersCount=1")
                              .routeId("resume-strategy-route")
-                             .setHeader("CamelOffset",
+                             .setHeader(Exchange.OFFSET,
                                      constant(Resumables.of("key", 
RANDOM_VALUE)))
-                             
.resumable().header("CamelOffset").resumableStrategyRef("resumeStrategy")
+                             
.resumable().resumableStrategyRef("resumeStrategy")
                              .to("mock:result");
             }
         };
diff --git 
a/core/camel-api/src/generated/java/org/apache/camel/ExchangeConstantProvider.java
 
b/core/camel-api/src/generated/java/org/apache/camel/ExchangeConstantProvider.java
index a9e6e6f..b9ff62d 100644
--- 
a/core/camel-api/src/generated/java/org/apache/camel/ExchangeConstantProvider.java
+++ 
b/core/camel-api/src/generated/java/org/apache/camel/ExchangeConstantProvider.java
@@ -11,7 +11,7 @@ public class ExchangeConstantProvider {
 
     private static final Map<String, String> MAP;
     static {
-        Map<String, String> map = new HashMap<>(151);
+        Map<String, String> map = new HashMap<>(152);
         map.put("ACCEPT_CONTENT_TYPE", "CamelAcceptContentType");
         map.put("AGGREGATED_COLLECTION_GUARD", 
"CamelAggregatedCollectionGuard");
         map.put("AGGREGATED_COMPLETED_BY", "CamelAggregatedCompletedBy");
@@ -114,6 +114,7 @@ public class ExchangeConstantProvider {
         map.put("MULTICAST_COMPLETE", "CamelMulticastComplete");
         map.put("MULTICAST_INDEX", "CamelMulticastIndex");
         map.put("NOTIFY_EVENT", "CamelNotifyEvent");
+        map.put("OFFSET", "CamelOffset");
         map.put("ON_COMPLETION", "CamelOnCompletion");
         map.put("ON_COMPLETION_ROUTE_IDS", "CamelOnCompletionRouteIds");
         map.put("OVERRULE_FILE_NAME", "CamelOverruleFileName");
diff --git a/core/camel-api/src/main/java/org/apache/camel/Exchange.java 
b/core/camel-api/src/main/java/org/apache/camel/Exchange.java
index 7cce1f2..c44b73c 100644
--- a/core/camel-api/src/main/java/org/apache/camel/Exchange.java
+++ b/core/camel-api/src/main/java/org/apache/camel/Exchange.java
@@ -194,6 +194,7 @@ public interface Exchange {
 
     String ON_COMPLETION = "CamelOnCompletion";
     String ON_COMPLETION_ROUTE_IDS = "CamelOnCompletionRouteIds";
+    String OFFSET = "CamelOffset";
     String OVERRULE_FILE_NAME = "CamelOverruleFileName";
 
     String PARENT_UNIT_OF_WORK = "CamelParentUnitOfWork";
diff --git 
a/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java
 
b/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java
index cb0c74b..7769c3e 100644
--- 
a/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java
+++ 
b/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java
@@ -3780,11 +3780,11 @@ public abstract class ProcessorDefinition<Type extends 
ProcessorDefinition<Type>
      * 
      * @return The expression to create the Resumable
      */
-    public ExpressionClause<ResumableDefinition> resumable() {
+    public ResumableDefinition resumable() {
         ResumableDefinition answer = new ResumableDefinition();
 
         addOutput(answer);
-        return createAndSetExpression(answer);
+        return answer;
     }
 
     // Properties
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/NoOffsetException.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/NoOffsetException.java
new file mode 100644
index 0000000..3621a70
--- /dev/null
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/NoOffsetException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.processor.resume;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.RuntimeCamelException;
+
+/**
+ * An exception thrown if no camel offset header could be found on the message.
+ */
+public class NoOffsetException extends RuntimeCamelException {
+
+    private final Exchange exchange;
+
+    public NoOffsetException(Exchange exchange) {
+        super("There was no " + Exchange.OFFSET + " header defined on the 
message exchange: " + exchange);
+        this.exchange = exchange;
+    }
+
+    /**
+     * The exchange which caused this failure
+     */
+    public Exchange getExchange() {
+        return exchange;
+    }
+}
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableProcessor.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableProcessor.java
index 35cf8af..4c64a1c 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableProcessor.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableProcessor.java
@@ -26,7 +26,6 @@ import org.apache.camel.AsyncProcessor;
 import org.apache.camel.CamelContext;
 import org.apache.camel.CamelContextAware;
 import org.apache.camel.Exchange;
-import org.apache.camel.Expression;
 import org.apache.camel.Navigate;
 import org.apache.camel.Processor;
 import org.apache.camel.Resumable;
@@ -45,7 +44,6 @@ public class ResumableProcessor extends AsyncProcessorSupport
     private CamelContext camelContext;
     private ResumeStrategy resumeStrategy;
     private AsyncProcessor processor;
-    private final Expression offsetExpression;
     private String id;
     private String routeId;
 
@@ -75,17 +73,16 @@ public class ResumableProcessor extends 
AsyncProcessorSupport
         }
     }
 
-    public ResumableProcessor(Expression offsetExpression, ResumeStrategy 
resumeStrategy, Processor processor) {
+    public ResumableProcessor(ResumeStrategy resumeStrategy, Processor 
processor) {
         this.resumeStrategy = Objects.requireNonNull(resumeStrategy);
         this.processor = AsyncProcessorConverterHelper.convert(processor);
-        this.offsetExpression = offsetExpression;
 
         LOG.info("Enabling the resumable strategy of type: {}", 
resumeStrategy.getClass().getSimpleName());
     }
 
     @Override
     public boolean process(final Exchange exchange, final AsyncCallback 
callback) {
-        Object offset = exchange.getMessage().getHeader("CamelOffset");
+        Object offset = exchange.getMessage().getHeader(Exchange.OFFSET);
 
         if (offset instanceof Resumable) {
             Resumable<?, ?> resumable = (Resumable<?, ?>) offset;
@@ -98,10 +95,10 @@ public class ResumableProcessor extends 
AsyncProcessorSupport
             return processor.process(exchange, target);
 
         } else {
+            exchange.setException(new NoOffsetException(exchange));
             LOG.warn("Cannot update the last offset because it's not 
available");
+            return true;
         }
-
-        return processor.process(exchange, callback);
     }
 
     @Override
diff --git 
a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java
 
b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java
index 6eba53f..ecf626a 100644
--- 
a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java
+++ 
b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java
@@ -17,7 +17,6 @@
 
 package org.apache.camel.reifier;
 
-import org.apache.camel.Expression;
 import org.apache.camel.Processor;
 import org.apache.camel.ResumeStrategy;
 import org.apache.camel.Route;
@@ -41,9 +40,7 @@ public class ResumableReifier extends 
ExpressionReifier<ResumableDefinition> {
         resumeStrategy.start();
         route.setResumeStrategy(resumeStrategy);
 
-        Expression expression = createExpression(definition.getExpression());
-
-        return new ResumableProcessor(expression, resumeStrategy, 
childProcessor);
+        return new ResumableProcessor(resumeStrategy, childProcessor);
     }
 
     protected ResumeStrategy resolveResumeStrategy() {
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java
index 2ef283c..fdf0315 100644
--- 
a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java
+++ 
b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java
@@ -55,7 +55,7 @@ public class FileConsumerResumeFromOffsetStrategyTest extends 
ContextTestSupport
         }
     }
 
-    @DisplayName("Tests whether we can resume from an offset")
+    @DisplayName("Tests whether it can resume from an offset")
     @Test
     public void testResumeFromOffset() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
@@ -63,7 +63,7 @@ public class FileConsumerResumeFromOffsetStrategyTest extends 
ContextTestSupport
 
         Map<String, Object> headers = new HashMap<>();
         headers.put(Exchange.FILE_NAME, "resume-from-offset.txt");
-        headers.put("CamelOffset", Resumables.of("resume-from-offset.txt", 
3L));
+        headers.put(Exchange.OFFSET, Resumables.of("resume-from-offset.txt", 
3L));
 
         template.sendBodyAndHeaders(fileUri("resumeOff"), "01234567890", 
headers);
 
@@ -71,6 +71,17 @@ public class FileConsumerResumeFromOffsetStrategyTest 
extends ContextTestSupport
         assertMockEndpointsSatisfied();
     }
 
+    @DisplayName("Tests whether it a missing offset causes a failure")
+    @Test
+    public void testMissingOffset() throws InterruptedException {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceivedInAnyOrder("34567890");
+
+        template.sendBodyAndHeader(fileUri("resumeMissingOffset"), 
"01234567890", Exchange.FILE_NAME, "resume-from-offset.txt");
+
+        mock.assertIsNotSatisfied();
+    }
+
     @DisplayName("Tests whether we can start from the beginning (i.e.: no 
resume strategy)")
     @Test
     public void testNoResume() throws Exception {
@@ -92,7 +103,14 @@ public class FileConsumerResumeFromOffsetStrategyTest 
extends ContextTestSupport
                 bindToRegistry("resumeStrategy", new TestResumeStrategy());
 
                 from(fileUri("resumeOff?noop=true&recursive=true"))
-                        
.resumable().header("CamelOffset").resumableStrategyRef("resumeStrategy")
+                        .setHeader(Exchange.OFFSET,
+                                constant(Resumables.of("resume-none.txt", 3)))
+                        .resumable().resumableStrategyRef("resumeStrategy")
+                        .log("${body}")
+                        .convertBodyTo(String.class).to("mock:result");
+
+                from(fileUri("resumeMissingOffset?noop=true&recursive=true"))
+                        .resumable().resumableStrategyRef("resumeStrategy")
                         .log("${body}")
                         .convertBodyTo(String.class).to("mock:result");
 
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java
index e867cf2..5adb2dd 100644
--- 
a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java
+++ 
b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java
@@ -59,7 +59,7 @@ public class FileConsumerResumeStrategyTest extends 
ContextTestSupport {
     private static Map<String, Object> headerFor(int num) {
         String name = num + ".txt";
 
-        return Map.of(Exchange.FILE_NAME, name, "id", Resumables.of(name, 
num));
+        return Map.of(Exchange.FILE_NAME, name);
     }
 
     @Test
@@ -67,7 +67,6 @@ public class FileConsumerResumeStrategyTest extends 
ContextTestSupport {
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedBodiesReceivedInAnyOrder("3", "4", "5", "6");
 
-
         template.sendBodyAndHeaders(fileUri("resume"), "0", headerFor(0));
         template.sendBodyAndHeaders(fileUri("resume"), "1", headerFor(1));
         template.sendBodyAndHeaders(fileUri("resume"), "2", headerFor(2));
@@ -80,6 +79,15 @@ public class FileConsumerResumeStrategyTest extends 
ContextTestSupport {
         assertMockEndpointsSatisfied();
     }
 
+    private void setOffset(Exchange exchange) {
+        String body = exchange.getMessage().getBody(String.class);
+
+        if (body != null) {
+            Integer num = Integer.valueOf(body);
+            exchange.getMessage().setHeader(Exchange.OFFSET, 
Resumables.of(body + ".txt", num));
+        }
+    }
+
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
@@ -89,7 +97,8 @@ public class FileConsumerResumeStrategyTest extends 
ContextTestSupport {
                 bindToRegistry("testResumeStrategy", new TestResumeStrategy());
 
                 from(fileUri("resume?noop=true&recursive=true"))
-                        
.resumable().header("id").resumableStrategyRef("testResumeStrategy")
+                        .process(e -> setOffset(e))
+                        .resumable().resumableStrategyRef("testResumeStrategy")
                         .convertBodyTo(String.class)
                         .to("mock:result");
             }

Reply via email to