This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit 63fc1e07e814a5146731e9747345a4b6dc2919c8 Author: Otavio Rodolfo Piske <[email protected]> AuthorDate: Mon Feb 21 15:43:55 2022 +0100 CAMEL-15562: further simplification of the API - standardize the offset header - fixed tests --- .../KafkaConsumerWithResumeRouteStrategyIT.java | 5 +-- .../org/apache/camel/ExchangeConstantProvider.java | 3 +- .../src/main/java/org/apache/camel/Exchange.java | 1 + .../apache/camel/model/ProcessorDefinition.java | 4 +-- .../camel/processor/resume/NoOffsetException.java | 41 ++++++++++++++++++++++ .../camel/processor/resume/ResumableProcessor.java | 11 +++--- .../org/apache/camel/reifier/ResumableReifier.java | 5 +-- .../FileConsumerResumeFromOffsetStrategyTest.java | 24 +++++++++++-- .../file/FileConsumerResumeStrategyTest.java | 15 ++++++-- 9 files changed, 87 insertions(+), 22 deletions(-) diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java index 1f95f30..aeb5de3 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java @@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit; import org.apache.camel.BindToRegistry; import org.apache.camel.EndpointInject; +import org.apache.camel.Exchange; import org.apache.camel.Offset; import org.apache.camel.Resumable; import org.apache.camel.Service; @@ -228,9 +229,9 @@ public class KafkaConsumerWithResumeRouteStrategyIT extends BaseEmbeddedKafkaTes from("kafka:" + TOPIC + "?groupId=" + TOPIC + "_GROUP&autoCommitIntervalMs=1000" + "&autoOffsetReset=earliest&consumersCount=1") .routeId("resume-strategy-route") - .setHeader("CamelOffset", + .setHeader(Exchange.OFFSET, constant(Resumables.of("key", RANDOM_VALUE))) - .resumable().header("CamelOffset").resumableStrategyRef("resumeStrategy") + .resumable().resumableStrategyRef("resumeStrategy") .to("mock:result"); } }; diff --git a/core/camel-api/src/generated/java/org/apache/camel/ExchangeConstantProvider.java b/core/camel-api/src/generated/java/org/apache/camel/ExchangeConstantProvider.java index a9e6e6f..b9ff62d 100644 --- a/core/camel-api/src/generated/java/org/apache/camel/ExchangeConstantProvider.java +++ b/core/camel-api/src/generated/java/org/apache/camel/ExchangeConstantProvider.java @@ -11,7 +11,7 @@ public class ExchangeConstantProvider { private static final Map<String, String> MAP; static { - Map<String, String> map = new HashMap<>(151); + Map<String, String> map = new HashMap<>(152); map.put("ACCEPT_CONTENT_TYPE", "CamelAcceptContentType"); map.put("AGGREGATED_COLLECTION_GUARD", "CamelAggregatedCollectionGuard"); map.put("AGGREGATED_COMPLETED_BY", "CamelAggregatedCompletedBy"); @@ -114,6 +114,7 @@ public class ExchangeConstantProvider { map.put("MULTICAST_COMPLETE", "CamelMulticastComplete"); map.put("MULTICAST_INDEX", "CamelMulticastIndex"); map.put("NOTIFY_EVENT", "CamelNotifyEvent"); + map.put("OFFSET", "CamelOffset"); map.put("ON_COMPLETION", "CamelOnCompletion"); map.put("ON_COMPLETION_ROUTE_IDS", "CamelOnCompletionRouteIds"); map.put("OVERRULE_FILE_NAME", "CamelOverruleFileName"); diff --git a/core/camel-api/src/main/java/org/apache/camel/Exchange.java b/core/camel-api/src/main/java/org/apache/camel/Exchange.java index 7cce1f2..c44b73c 100644 --- a/core/camel-api/src/main/java/org/apache/camel/Exchange.java +++ b/core/camel-api/src/main/java/org/apache/camel/Exchange.java @@ -194,6 +194,7 @@ public interface Exchange { String ON_COMPLETION = "CamelOnCompletion"; String ON_COMPLETION_ROUTE_IDS = "CamelOnCompletionRouteIds"; + String OFFSET = "CamelOffset"; String OVERRULE_FILE_NAME = "CamelOverruleFileName"; String PARENT_UNIT_OF_WORK = "CamelParentUnitOfWork"; diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java index cb0c74b..7769c3e 100644 --- a/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java +++ b/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java @@ -3780,11 +3780,11 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> * * @return The expression to create the Resumable */ - public ExpressionClause<ResumableDefinition> resumable() { + public ResumableDefinition resumable() { ResumableDefinition answer = new ResumableDefinition(); addOutput(answer); - return createAndSetExpression(answer); + return answer; } // Properties diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/NoOffsetException.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/NoOffsetException.java new file mode 100644 index 0000000..3621a70 --- /dev/null +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/NoOffsetException.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.processor.resume; + +import org.apache.camel.Exchange; +import org.apache.camel.RuntimeCamelException; + +/** + * An exception thrown if no camel offset header could be found on the message. + */ +public class NoOffsetException extends RuntimeCamelException { + + private final Exchange exchange; + + public NoOffsetException(Exchange exchange) { + super("There was no " + Exchange.OFFSET + " header defined on the message exchange: " + exchange); + this.exchange = exchange; + } + + /** + * The exchange which caused this failure + */ + public Exchange getExchange() { + return exchange; + } +} diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableProcessor.java index 35cf8af..4c64a1c 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableProcessor.java @@ -26,7 +26,6 @@ import org.apache.camel.AsyncProcessor; import org.apache.camel.CamelContext; import org.apache.camel.CamelContextAware; import org.apache.camel.Exchange; -import org.apache.camel.Expression; import org.apache.camel.Navigate; import org.apache.camel.Processor; import org.apache.camel.Resumable; @@ -45,7 +44,6 @@ public class ResumableProcessor extends AsyncProcessorSupport private CamelContext camelContext; private ResumeStrategy resumeStrategy; private AsyncProcessor processor; - private final Expression offsetExpression; private String id; private String routeId; @@ -75,17 +73,16 @@ public class ResumableProcessor extends AsyncProcessorSupport } } - public ResumableProcessor(Expression offsetExpression, ResumeStrategy resumeStrategy, Processor processor) { + public ResumableProcessor(ResumeStrategy resumeStrategy, Processor processor) { this.resumeStrategy = Objects.requireNonNull(resumeStrategy); this.processor = AsyncProcessorConverterHelper.convert(processor); - this.offsetExpression = offsetExpression; LOG.info("Enabling the resumable strategy of type: {}", resumeStrategy.getClass().getSimpleName()); } @Override public boolean process(final Exchange exchange, final AsyncCallback callback) { - Object offset = exchange.getMessage().getHeader("CamelOffset"); + Object offset = exchange.getMessage().getHeader(Exchange.OFFSET); if (offset instanceof Resumable) { Resumable<?, ?> resumable = (Resumable<?, ?>) offset; @@ -98,10 +95,10 @@ public class ResumableProcessor extends AsyncProcessorSupport return processor.process(exchange, target); } else { + exchange.setException(new NoOffsetException(exchange)); LOG.warn("Cannot update the last offset because it's not available"); + return true; } - - return processor.process(exchange, callback); } @Override diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java index 6eba53f..ecf626a 100644 --- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java +++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java @@ -17,7 +17,6 @@ package org.apache.camel.reifier; -import org.apache.camel.Expression; import org.apache.camel.Processor; import org.apache.camel.ResumeStrategy; import org.apache.camel.Route; @@ -41,9 +40,7 @@ public class ResumableReifier extends ExpressionReifier<ResumableDefinition> { resumeStrategy.start(); route.setResumeStrategy(resumeStrategy); - Expression expression = createExpression(definition.getExpression()); - - return new ResumableProcessor(expression, resumeStrategy, childProcessor); + return new ResumableProcessor(resumeStrategy, childProcessor); } protected ResumeStrategy resolveResumeStrategy() { diff --git a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java index 2ef283c..fdf0315 100644 --- a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java @@ -55,7 +55,7 @@ public class FileConsumerResumeFromOffsetStrategyTest extends ContextTestSupport } } - @DisplayName("Tests whether we can resume from an offset") + @DisplayName("Tests whether it can resume from an offset") @Test public void testResumeFromOffset() throws Exception { MockEndpoint mock = getMockEndpoint("mock:result"); @@ -63,7 +63,7 @@ public class FileConsumerResumeFromOffsetStrategyTest extends ContextTestSupport Map<String, Object> headers = new HashMap<>(); headers.put(Exchange.FILE_NAME, "resume-from-offset.txt"); - headers.put("CamelOffset", Resumables.of("resume-from-offset.txt", 3L)); + headers.put(Exchange.OFFSET, Resumables.of("resume-from-offset.txt", 3L)); template.sendBodyAndHeaders(fileUri("resumeOff"), "01234567890", headers); @@ -71,6 +71,17 @@ public class FileConsumerResumeFromOffsetStrategyTest extends ContextTestSupport assertMockEndpointsSatisfied(); } + @DisplayName("Tests whether it a missing offset causes a failure") + @Test + public void testMissingOffset() throws InterruptedException { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceivedInAnyOrder("34567890"); + + template.sendBodyAndHeader(fileUri("resumeMissingOffset"), "01234567890", Exchange.FILE_NAME, "resume-from-offset.txt"); + + mock.assertIsNotSatisfied(); + } + @DisplayName("Tests whether we can start from the beginning (i.e.: no resume strategy)") @Test public void testNoResume() throws Exception { @@ -92,7 +103,14 @@ public class FileConsumerResumeFromOffsetStrategyTest extends ContextTestSupport bindToRegistry("resumeStrategy", new TestResumeStrategy()); from(fileUri("resumeOff?noop=true&recursive=true")) - .resumable().header("CamelOffset").resumableStrategyRef("resumeStrategy") + .setHeader(Exchange.OFFSET, + constant(Resumables.of("resume-none.txt", 3))) + .resumable().resumableStrategyRef("resumeStrategy") + .log("${body}") + .convertBodyTo(String.class).to("mock:result"); + + from(fileUri("resumeMissingOffset?noop=true&recursive=true")) + .resumable().resumableStrategyRef("resumeStrategy") .log("${body}") .convertBodyTo(String.class).to("mock:result"); diff --git a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java index e867cf2..5adb2dd 100644 --- a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java @@ -59,7 +59,7 @@ public class FileConsumerResumeStrategyTest extends ContextTestSupport { private static Map<String, Object> headerFor(int num) { String name = num + ".txt"; - return Map.of(Exchange.FILE_NAME, name, "id", Resumables.of(name, num)); + return Map.of(Exchange.FILE_NAME, name); } @Test @@ -67,7 +67,6 @@ public class FileConsumerResumeStrategyTest extends ContextTestSupport { MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedBodiesReceivedInAnyOrder("3", "4", "5", "6"); - template.sendBodyAndHeaders(fileUri("resume"), "0", headerFor(0)); template.sendBodyAndHeaders(fileUri("resume"), "1", headerFor(1)); template.sendBodyAndHeaders(fileUri("resume"), "2", headerFor(2)); @@ -80,6 +79,15 @@ public class FileConsumerResumeStrategyTest extends ContextTestSupport { assertMockEndpointsSatisfied(); } + private void setOffset(Exchange exchange) { + String body = exchange.getMessage().getBody(String.class); + + if (body != null) { + Integer num = Integer.valueOf(body); + exchange.getMessage().setHeader(Exchange.OFFSET, Resumables.of(body + ".txt", num)); + } + } + @Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @@ -89,7 +97,8 @@ public class FileConsumerResumeStrategyTest extends ContextTestSupport { bindToRegistry("testResumeStrategy", new TestResumeStrategy()); from(fileUri("resume?noop=true&recursive=true")) - .resumable().header("id").resumableStrategyRef("testResumeStrategy") + .process(e -> setOffset(e)) + .resumable().resumableStrategyRef("testResumeStrategy") .convertBodyTo(String.class) .to("mock:result"); }
