This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 2de2764a687 CAMEL-17051: extend the Resume API to include support for
pausable consumers
2de2764a687 is described below
commit 2de2764a687ae59d8b035a2163b269ab88850a1b
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Wed Mar 30 16:29:35 2022 +0200
CAMEL-17051: extend the Resume API to include support for pausable consumers
---
.../org/apache/camel/catalog/components/kafka.json | 4 +-
.../org/apache/camel/catalog/models.properties | 1 +
.../org/apache/camel/catalog/models/pausable.json | 20 ++
.../apache/camel/catalog/schemas/camel-spring.xsd | 50 +++++
components/camel-kafka/pom.xml | 18 ++
.../component/kafka/KafkaComponentConfigurer.java | 4 +-
.../component/kafka/KafkaEndpointConfigurer.java | 4 +-
.../org/apache/camel/component/kafka/kafka.json | 4 +-
.../camel-kafka/src/main/docs/kafka-component.adoc | 16 ++
.../camel/component/kafka/KafkaConfiguration.java | 12 +-
.../camel/component/kafka/KafkaConsumer.java | 41 ++++-
.../camel/component/kafka/KafkaFetchRecords.java | 65 +++++--
.../component/kafka/PollExceptionStrategy.java | 15 ++
...{PollExceptionStrategy.java => SeekPolicy.java} | 19 +-
.../consumer/errorhandler/BridgeErrorStrategy.java | 5 +
.../errorhandler/DiscardErrorStrategy.java | 5 +
.../errorhandler/KafkaConsumerListener.java | 89 +++++++++
.../errorhandler/KafkaErrorStrategies.java | 2 +-
.../errorhandler/ReconnectErrorStrategy.java | 14 +-
.../consumer/errorhandler/RetryErrorStrategy.java | 10 +-
.../consumer/errorhandler/StopErrorStrategy.java | 13 +-
.../consumer/support/KafkaRecordProcessor.java | 4 +-
.../support/KafkaRecordProcessorFacade.java | 15 +-
.../kafka/consumer/support/ProcessingResult.java | 11 +-
.../consumer/support/ResumeStrategyFactory.java | 3 +-
.../SeekPolicyKafkaConsumerResumeStrategy.java | 9 +-
.../integration/BaseEmbeddedKafkaTestSupport.java | 10 +-
.../kafka/integration/KafkaConsumerFullIT.java | 5 +-
.../integration/pause/KafkaPausableConsumerIT.java | 203 +++++++++++++++++++++
.../src/test/resources/log4j2.properties | 2 +-
.../java/org/apache/camel/ConsumerListener.java | 52 ++++++
.../org/apache/camel/ConsumerListenerAware.java | 25 ++-
.../src/main/java/org/apache/camel/Route.java | 5 +
.../org/apache/camel/impl/engine/DefaultRoute.java | 12 ++
.../docs/modules/eips/examples/json/pausable.json | 1 +
.../docs/modules/eips/pages/resume-strategies.adoc | 28 ++-
.../services/org/apache/camel/model.properties | 1 +
.../resources/org/apache/camel/model/jaxb.index | 1 +
.../resources/org/apache/camel/model/pausable.json | 20 ++
.../org/apache/camel/model/PausableDefinition.java | 115 ++++++++++++
.../apache/camel/model/ProcessorDefinition.java | 58 ++++++
.../apache/camel/processor/PausableProcessor.java | 99 ++++++++++
.../org/apache/camel/reifier/PausableReifier.java | 71 +++++++
.../org/apache/camel/reifier/ProcessorReifier.java | 3 +
.../java/org/apache/camel/xml/in/ModelParser.java | 11 ++
.../dsl/KafkaComponentBuilderFactory.java | 13 +-
.../endpoint/dsl/KafkaEndpointBuilderFactory.java | 27 ++-
.../dsl/yaml/deserializers/ModelDeserializers.java | 60 ++++++
.../deserializers/ModelDeserializersResolver.java | 2 +
.../src/generated/resources/camel-yaml-dsl.json | 24 +++
.../src/generated/resources/camelYamlDsl.json | 24 +++
51 files changed, 1232 insertions(+), 93 deletions(-)
diff --git
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
index 190096241df..b7e0f5bb104 100644
---
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
+++
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
@@ -54,7 +54,7 @@
"partitionAssignor": { "kind": "property", "displayName": "Partition
Assignor", "group": "consumer", "label": "consumer", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": false, "defaultValue":
"org.apache.kafka.clients.consumer.RangeAssignor", "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "The class name of the partition assignme [...]
"pollOnError": { "kind": "property", "displayName": "Poll On Error",
"group": "consumer", "label": "consumer", "required": false, "type": "object",
"javaType": "org.apache.camel.component.kafka.PollOnError", "enum": [
"DISCARD", "ERROR_HANDLER", "RECONNECT", "RETRY", "STOP" ], "deprecated":
false, "autowired": false, "secret": false, "defaultValue": "ERROR_HANDLER",
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration",
"configurationField": "configuration", "de [...]
"pollTimeoutMs": { "kind": "property", "displayName": "Poll Timeout Ms",
"group": "consumer", "label": "consumer", "required": false, "type":
"duration", "javaType": "java.lang.Long", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": "5000", "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "The timeout used when polling the
KafkaConsumer." },
- "seekTo": { "kind": "property", "displayName": "Seek To", "group":
"consumer", "label": "consumer", "required": false, "type": "string",
"javaType": "java.lang.String", "enum": [ "beginning", "end" ], "deprecated":
false, "autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "Set if KafkaConsumer will read from beginning
or end on startup: beginning : read from beginning [...]
+ "seekTo": { "kind": "property", "displayName": "Seek To", "group":
"consumer", "label": "consumer", "required": false, "type": "object",
"javaType": "org.apache.camel.component.kafka.SeekPolicy", "enum": [
"BEGINNING", "END" ], "deprecated": false, "autowired": false, "secret": false,
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration",
"configurationField": "configuration", "description": "Set if KafkaConsumer
will read from beginning or end on startup: SeekP [...]
"sessionTimeoutMs": { "kind": "property", "displayName": "Session Timeout
Ms", "group": "consumer", "label": "consumer", "required": false, "type":
"integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": "10000", "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "The timeout used to detect failures when using
Kafka's group management facilities." },
"specificAvroReader": { "kind": "property", "displayName": "Specific Avro
Reader", "group": "consumer", "label": "confluent,consumer", "required": false,
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": false, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "This enables the use of a specific Avro reader
for use with the Confluent Platf [...]
"topicIsPattern": { "kind": "property", "displayName": "Topic Is Pattern",
"group": "consumer", "label": "consumer", "required": false, "type": "boolean",
"javaType": "boolean", "deprecated": false, "autowired": false, "secret":
false, "defaultValue": false, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "Whether the topic is a pattern (regular
expression). This can be used to subscribe to dynamic num [...]
@@ -177,7 +177,7 @@
"partitionAssignor": { "kind": "parameter", "displayName": "Partition
Assignor", "group": "consumer", "label": "consumer", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": false, "defaultValue":
"org.apache.kafka.clients.consumer.RangeAssignor", "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "The class name of the partition assignm [...]
"pollOnError": { "kind": "parameter", "displayName": "Poll On Error",
"group": "consumer", "label": "consumer", "required": false, "type": "object",
"javaType": "org.apache.camel.component.kafka.PollOnError", "enum": [
"DISCARD", "ERROR_HANDLER", "RECONNECT", "RETRY", "STOP" ], "deprecated":
false, "autowired": false, "secret": false, "defaultValue": "ERROR_HANDLER",
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration",
"configurationField": "configuration", "d [...]
"pollTimeoutMs": { "kind": "parameter", "displayName": "Poll Timeout Ms",
"group": "consumer", "label": "consumer", "required": false, "type":
"duration", "javaType": "java.lang.Long", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": "5000", "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "The timeout used when polling the
KafkaConsumer." },
- "seekTo": { "kind": "parameter", "displayName": "Seek To", "group":
"consumer", "label": "consumer", "required": false, "type": "string",
"javaType": "java.lang.String", "enum": [ "beginning", "end" ], "deprecated":
false, "autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "Set if KafkaConsumer will read from beginning
or end on startup: beginning : read from beginning [...]
+ "seekTo": { "kind": "parameter", "displayName": "Seek To", "group":
"consumer", "label": "consumer", "required": false, "type": "object",
"javaType": "org.apache.camel.component.kafka.SeekPolicy", "enum": [
"BEGINNING", "END" ], "deprecated": false, "autowired": false, "secret": false,
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration",
"configurationField": "configuration", "description": "Set if KafkaConsumer
will read from beginning or end on startup: Seek [...]
"sessionTimeoutMs": { "kind": "parameter", "displayName": "Session Timeout
Ms", "group": "consumer", "label": "consumer", "required": false, "type":
"integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": "10000", "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "The timeout used to detect failures when using
Kafka's group management facilities." },
"specificAvroReader": { "kind": "parameter", "displayName": "Specific Avro
Reader", "group": "consumer", "label": "confluent,consumer", "required": false,
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": false, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "This enables the use of a specific Avro reader
for use with the Confluent Plat [...]
"topicIsPattern": { "kind": "parameter", "displayName": "Topic Is
Pattern", "group": "consumer", "label": "consumer", "required": false, "type":
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "Whether the topic is a pattern (regular
expression). This can be used to subscribe to dynamic nu [...]
diff --git
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models.properties
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models.properties
index c6467f167e4..f683e8e288b 100644
---
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models.properties
+++
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models.properties
@@ -113,6 +113,7 @@ packageScan
param
passThroughServiceFilter
patch
+pausable
pgp
pipeline
policy
diff --git
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/pausable.json
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/pausable.json
new file mode 100644
index 00000000000..88b088805ab
--- /dev/null
+++
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/pausable.json
@@ -0,0 +1,20 @@
+{
+ "model": {
+ "kind": "model",
+ "name": "pausable",
+ "title": "Pausable",
+ "description": "Pausable EIP to support resuming processing from last
known offset.",
+ "deprecated": false,
+ "label": "eip,routing",
+ "javaType": "org.apache.camel.model.PausableDefinition",
+ "abstract": false,
+ "input": true,
+ "output": false
+ },
+ "properties": {
+ "consumerListener": { "kind": "attribute", "displayName": "Consumer
Listener", "required": true, "type": "object", "javaType":
"org.apache.camel.ConsumerListener", "deprecated": false, "autowired": false,
"secret": false, "description": "Sets the consumer listener to use" },
+ "untilCheck": { "kind": "attribute", "displayName": "Until Check",
"required": true, "type": "object", "javaType": "java.util.function.Predicate",
"deprecated": false, "autowired": false, "secret": false },
+ "id": { "kind": "attribute", "displayName": "Id", "required": false,
"type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "description": "Sets the id of this node"
},
+ "description": { "kind": "element", "displayName": "Description",
"required": false, "type": "object", "javaType":
"org.apache.camel.model.DescriptionDefinition", "deprecated": false,
"autowired": false, "secret": false, "description": "Sets the description of
this node" }
+ }
+}
diff --git
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd
index 94a28c4d2f4..72e6d5fa578 100644
---
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd
+++
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd
@@ -957,6 +957,14 @@ Rest PATCH command
</xs:annotation>
</xs:element>
+ <xs:element name="pausable" type="tns:pausableDefinition">
+ <xs:annotation>
+ <xs:documentation xml:lang="en"><![CDATA[
+Pausable EIP to support resuming processing from last known offset.
+ ]]></xs:documentation>
+ </xs:annotation>
+ </xs:element>
+
<xs:element name="pgp" type="tns:pgpDataFormat">
<xs:annotation>
<xs:documentation xml:lang="en"><![CDATA[
@@ -3034,6 +3042,7 @@ will fallback to use the fixed value if the Expression
result was null or 0.
<xs:element ref="tns:onCompletion"/>
<xs:element ref="tns:onException"/>
<xs:element ref="tns:onFallback"/>
+ <xs:element ref="tns:pausable"/>
<xs:element ref="tns:pipeline"/>
<xs:element ref="tns:policy"/>
<xs:element ref="tns:pollEnrich"/>
@@ -3538,6 +3547,7 @@ should be intercepted by this exception type or not.
<xs:element ref="tns:onCompletion"/>
<xs:element ref="tns:onException"/>
<xs:element ref="tns:onFallback"/>
+ <xs:element ref="tns:pausable"/>
<xs:element ref="tns:pipeline"/>
<xs:element ref="tns:policy"/>
<xs:element ref="tns:pollEnrich"/>
@@ -3638,6 +3648,7 @@ should be intercepted by this exception type or not.
<xs:element ref="tns:onCompletion"/>
<xs:element ref="tns:onException"/>
<xs:element ref="tns:onFallback"/>
+ <xs:element ref="tns:pausable"/>
<xs:element ref="tns:pipeline"/>
<xs:element ref="tns:policy"/>
<xs:element ref="tns:pollEnrich"/>
@@ -3735,6 +3746,7 @@ the branch that matched. Default value: false
<xs:element ref="tns:onCompletion"/>
<xs:element ref="tns:onException"/>
<xs:element ref="tns:onFallback"/>
+ <xs:element ref="tns:pausable"/>
<xs:element ref="tns:pipeline"/>
<xs:element ref="tns:policy"/>
<xs:element ref="tns:pollEnrich"/>
@@ -3814,6 +3826,7 @@ the branch that matched. Default value: false
<xs:element ref="tns:onCompletion"/>
<xs:element ref="tns:onException"/>
<xs:element ref="tns:onFallback"/>
+ <xs:element ref="tns:pausable"/>
<xs:element ref="tns:pipeline"/>
<xs:element ref="tns:policy"/>
<xs:element ref="tns:pollEnrich"/>
@@ -5011,6 +5024,7 @@ default WARN. Default value: WARN
<xs:element ref="tns:onCompletion"/>
<xs:element ref="tns:onException"/>
<xs:element ref="tns:onFallback"/>
+ <xs:element ref="tns:pausable"/>
<xs:element ref="tns:pipeline"/>
<xs:element ref="tns:policy"/>
<xs:element ref="tns:pollEnrich"/>
@@ -5096,6 +5110,7 @@ Setting this allows to know if the filter predicate
evaluated as true or false.
<xs:element ref="tns:onCompletion"/>
<xs:element ref="tns:onException"/>
<xs:element ref="tns:onFallback"/>
+ <xs:element ref="tns:pausable"/>
<xs:element ref="tns:pipeline"/>
<xs:element ref="tns:policy"/>
<xs:element ref="tns:pollEnrich"/>
@@ -5235,6 +5250,7 @@ Global option value.
<xs:element ref="tns:onCompletion"/>
<xs:element ref="tns:onException"/>
<xs:element ref="tns:onFallback"/>
+ <xs:element ref="tns:pausable"/>
<xs:element ref="tns:pipeline"/>
<xs:element ref="tns:policy"/>
<xs:element ref="tns:pollEnrich"/>
@@ -5414,6 +5430,7 @@ Whether if validation is required for this input type.
Default value: false
<xs:element ref="tns:onCompletion"/>
<xs:element ref="tns:onException"/>
<xs:element ref="tns:onFallback"/>
+ <xs:element ref="tns:pausable"/>
<xs:element ref="tns:pipeline"/>
<xs:element ref="tns:policy"/>
<xs:element ref="tns:pollEnrich"/>
@@ -5507,6 +5524,7 @@ configured, then all incoming messages is intercepted.
<xs:element ref="tns:onCompletion"/>
<xs:element ref="tns:onException"/>
<xs:element ref="tns:onFallback"/>
+ <xs:element ref="tns:pausable"/>
<xs:element ref="tns:pipeline"/>
<xs:element ref="tns:policy"/>
<xs:element ref="tns:pollEnrich"/>
@@ -5607,6 +5625,7 @@ process its result.
<xs:element ref="tns:onCompletion"/>
<xs:element ref="tns:onException"/>
<xs:element ref="tns:onFallback"/>
+ <xs:element ref="tns:pausable"/>
<xs:element ref="tns:pipeline"/>
<xs:element ref="tns:policy"/>
<xs:element ref="tns:pollEnrich"/>
@@ -5701,6 +5720,7 @@ specified using uri syntax, eg mynamecount=4&type=gold.
<xs:element ref="tns:onCompletion"/>
<xs:element ref="tns:onException"/>
<xs:element ref="tns:onFallback"/>
+ <xs:element ref="tns:pausable"/>
<xs:element ref="tns:pipeline"/>
<xs:element ref="tns:policy"/>
<xs:element ref="tns:pollEnrich"/>
@@ -5995,6 +6015,7 @@ To refer to a custom logger instance to lookup from the
registry.
<xs:element ref="tns:onCompletion"/>
<xs:element ref="tns:onException"/>
<xs:element ref="tns:onFallback"/>
+ <xs:element ref="tns:pausable"/>
<xs:element ref="tns:pipeline"/>
<xs:element ref="tns:policy"/>
<xs:element ref="tns:pollEnrich"/>
@@ -8957,6 +8978,7 @@ decompressed size. Default value: 1073741824
<xs:element ref="tns:onCompletion"/>
<xs:element ref="tns:onException"/>
<xs:element ref="tns:onFallback"/>
+ <xs:element ref="tns:pausable"/>
<xs:element ref="tns:pipeline"/>
<xs:element ref="tns:policy"/>
<xs:element ref="tns:pollEnrich"/>
@@ -9158,6 +9180,7 @@ should be invoked or not.
<xs:element ref="tns:onCompletion"/>
<xs:element ref="tns:onException"/>
<xs:element ref="tns:onFallback"/>
+ <xs:element ref="tns:pausable"/>
<xs:element ref="tns:pipeline"/>
<xs:element ref="tns:policy"/>
<xs:element ref="tns:pollEnrich"/>
@@ -9326,6 +9349,7 @@ failure. If this option is enabled then its considered
handled as well.
<xs:element ref="tns:onCompletion"/>
<xs:element ref="tns:onException"/>
<xs:element ref="tns:onFallback"/>
+ <xs:element ref="tns:pausable"/>
<xs:element ref="tns:pipeline"/>
<xs:element ref="tns:policy"/>
<xs:element ref="tns:pollEnrich"/>
@@ -9498,6 +9522,7 @@ Default value: false
<xs:element ref="tns:onCompletion"/>
<xs:element ref="tns:onException"/>
<xs:element ref="tns:onFallback"/>
+ <xs:element ref="tns:pausable"/>
<xs:element ref="tns:pipeline"/>
<xs:element ref="tns:policy"/>
<xs:element ref="tns:pollEnrich"/>
@@ -9601,6 +9626,22 @@ Include finding route builder from these java package
names.
</xs:sequence>
</xs:complexType>
+ <xs:complexType name="pausableDefinition">
+ <xs:complexContent>
+ <xs:extension base="tns:noOutputDefinition">
+ <xs:sequence/>
+ <xs:attribute name="consumerListener" type="xs:string" use="required">
+ <xs:annotation>
+ <xs:documentation xml:lang="en"><![CDATA[
+Sets the consumer listener to use.
+ ]]></xs:documentation>
+ </xs:annotation>
+ </xs:attribute>
+ <xs:attribute name="untilCheck" type="xs:string" use="required"/>
+ </xs:extension>
+ </xs:complexContent>
+ </xs:complexType>
+
<xs:complexType name="pipelineDefinition">
<xs:complexContent>
<xs:extension base="tns:output">
@@ -9635,6 +9676,7 @@ Include finding route builder from these java package
names.
<xs:element ref="tns:onCompletion"/>
<xs:element ref="tns:onException"/>
<xs:element ref="tns:onFallback"/>
+ <xs:element ref="tns:pausable"/>
<xs:element ref="tns:pipeline"/>
<xs:element ref="tns:policy"/>
<xs:element ref="tns:pollEnrich"/>
@@ -9712,6 +9754,7 @@ Include finding route builder from these java package
names.
<xs:element ref="tns:onCompletion"/>
<xs:element ref="tns:onException"/>
<xs:element ref="tns:onFallback"/>
+ <xs:element ref="tns:pausable"/>
<xs:element ref="tns:pipeline"/>
<xs:element ref="tns:policy"/>
<xs:element ref="tns:pollEnrich"/>
@@ -10256,6 +10299,7 @@ Name of property to remove.
<xs:element ref="tns:onCompletion"/>
<xs:element ref="tns:onException"/>
<xs:element ref="tns:onFallback"/>
+ <xs:element ref="tns:pausable"/>
<xs:element ref="tns:pipeline"/>
<xs:element ref="tns:policy"/>
<xs:element ref="tns:pollEnrich"/>
@@ -10563,6 +10607,7 @@ Reference to the routes in the xml dsl.
<xs:element ref="tns:onCompletion"/>
<xs:element ref="tns:onException"/>
<xs:element ref="tns:onFallback"/>
+ <xs:element ref="tns:pausable"/>
<xs:element ref="tns:pipeline"/>
<xs:element ref="tns:policy"/>
<xs:element ref="tns:pollEnrich"/>
@@ -10921,6 +10966,7 @@ compensation/completion exchange.
<xs:element ref="tns:onCompletion"/>
<xs:element ref="tns:onException"/>
<xs:element ref="tns:onFallback"/>
+ <xs:element ref="tns:pausable"/>
<xs:element ref="tns:pipeline"/>
<xs:element ref="tns:policy"/>
<xs:element ref="tns:pollEnrich"/>
@@ -11272,6 +11318,7 @@ Sets the comparator to use for sorting.
<xs:element ref="tns:onCompletion"/>
<xs:element ref="tns:onException"/>
<xs:element ref="tns:onFallback"/>
+ <xs:element ref="tns:pausable"/>
<xs:element ref="tns:pipeline"/>
<xs:element ref="tns:policy"/>
<xs:element ref="tns:pollEnrich"/>
@@ -11481,6 +11528,7 @@ individual unit of work. Default value: false
<xs:element ref="tns:onCompletion"/>
<xs:element ref="tns:onException"/>
<xs:element ref="tns:onFallback"/>
+ <xs:element ref="tns:pausable"/>
<xs:element ref="tns:pipeline"/>
<xs:element ref="tns:policy"/>
<xs:element ref="tns:pollEnrich"/>
@@ -11974,6 +12022,7 @@ Whether to auto startup components when toD is starting
up. Default value: true
<xs:element ref="tns:onCompletion"/>
<xs:element ref="tns:onException"/>
<xs:element ref="tns:onFallback"/>
+ <xs:element ref="tns:pausable"/>
<xs:element ref="tns:pipeline"/>
<xs:element ref="tns:policy"/>
<xs:element ref="tns:pollEnrich"/>
@@ -12088,6 +12137,7 @@ Sets a reference to use for lookup the policy in the
registry.
<xs:element ref="tns:onCompletion"/>
<xs:element ref="tns:onException"/>
<xs:element ref="tns:onFallback"/>
+ <xs:element ref="tns:pausable"/>
<xs:element ref="tns:pipeline"/>
<xs:element ref="tns:policy"/>
<xs:element ref="tns:pollEnrich"/>
diff --git a/components/camel-kafka/pom.xml b/components/camel-kafka/pom.xml
index 7fe17b4d13a..cac4ed03e60 100644
--- a/components/camel-kafka/pom.xml
+++ b/components/camel-kafka/pom.xml
@@ -55,6 +55,18 @@
</dependency>
<!-- test -->
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest</artifactId>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-test-infra-kafka</artifactId>
@@ -64,6 +76,12 @@
</dependency>
<!-- Required by the admin client-->
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-direct</artifactId>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-test-junit5</artifactId>
diff --git
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
index 0ac20776bdd..a9e413a5361 100644
---
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
+++
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
@@ -176,7 +176,7 @@ public class KafkaComponentConfigurer extends
PropertyConfigurerSupport implemen
case "securityprotocol":
case "securityProtocol":
getOrCreateConfiguration(target).setSecurityProtocol(property(camelContext,
java.lang.String.class, value)); return true;
case "seekto":
- case "seekTo":
getOrCreateConfiguration(target).setSeekTo(property(camelContext,
java.lang.String.class, value)); return true;
+ case "seekTo":
getOrCreateConfiguration(target).setSeekTo(property(camelContext,
org.apache.camel.component.kafka.SeekPolicy.class, value)); return true;
case "sendbufferbytes":
case "sendBufferBytes":
getOrCreateConfiguration(target).setSendBufferBytes(property(camelContext,
java.lang.Integer.class, value)); return true;
case "sessiontimeoutms":
@@ -394,7 +394,7 @@ public class KafkaComponentConfigurer extends
PropertyConfigurerSupport implemen
case "securityprotocol":
case "securityProtocol": return java.lang.String.class;
case "seekto":
- case "seekTo": return java.lang.String.class;
+ case "seekTo": return
org.apache.camel.component.kafka.SeekPolicy.class;
case "sendbufferbytes":
case "sendBufferBytes": return java.lang.Integer.class;
case "sessiontimeoutms":
diff --git
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
index b36f1d003ad..be6375442c4 100644
---
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
+++
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
@@ -164,7 +164,7 @@ public class KafkaEndpointConfigurer extends
PropertyConfigurerSupport implement
case "securityprotocol":
case "securityProtocol":
target.getConfiguration().setSecurityProtocol(property(camelContext,
java.lang.String.class, value)); return true;
case "seekto":
- case "seekTo":
target.getConfiguration().setSeekTo(property(camelContext,
java.lang.String.class, value)); return true;
+ case "seekTo":
target.getConfiguration().setSeekTo(property(camelContext,
org.apache.camel.component.kafka.SeekPolicy.class, value)); return true;
case "sendbufferbytes":
case "sendBufferBytes":
target.getConfiguration().setSendBufferBytes(property(camelContext,
java.lang.Integer.class, value)); return true;
case "sessiontimeoutms":
@@ -366,7 +366,7 @@ public class KafkaEndpointConfigurer extends
PropertyConfigurerSupport implement
case "securityprotocol":
case "securityProtocol": return java.lang.String.class;
case "seekto":
- case "seekTo": return java.lang.String.class;
+ case "seekTo": return
org.apache.camel.component.kafka.SeekPolicy.class;
case "sendbufferbytes":
case "sendBufferBytes": return java.lang.Integer.class;
case "sessiontimeoutms":
diff --git
a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
index 190096241df..b7e0f5bb104 100644
---
a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
+++
b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
@@ -54,7 +54,7 @@
"partitionAssignor": { "kind": "property", "displayName": "Partition
Assignor", "group": "consumer", "label": "consumer", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": false, "defaultValue":
"org.apache.kafka.clients.consumer.RangeAssignor", "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "The class name of the partition assignme [...]
"pollOnError": { "kind": "property", "displayName": "Poll On Error",
"group": "consumer", "label": "consumer", "required": false, "type": "object",
"javaType": "org.apache.camel.component.kafka.PollOnError", "enum": [
"DISCARD", "ERROR_HANDLER", "RECONNECT", "RETRY", "STOP" ], "deprecated":
false, "autowired": false, "secret": false, "defaultValue": "ERROR_HANDLER",
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration",
"configurationField": "configuration", "de [...]
"pollTimeoutMs": { "kind": "property", "displayName": "Poll Timeout Ms",
"group": "consumer", "label": "consumer", "required": false, "type":
"duration", "javaType": "java.lang.Long", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": "5000", "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "The timeout used when polling the
KafkaConsumer." },
- "seekTo": { "kind": "property", "displayName": "Seek To", "group":
"consumer", "label": "consumer", "required": false, "type": "string",
"javaType": "java.lang.String", "enum": [ "beginning", "end" ], "deprecated":
false, "autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "Set if KafkaConsumer will read from beginning
or end on startup: beginning : read from beginning [...]
+ "seekTo": { "kind": "property", "displayName": "Seek To", "group":
"consumer", "label": "consumer", "required": false, "type": "object",
"javaType": "org.apache.camel.component.kafka.SeekPolicy", "enum": [
"BEGINNING", "END" ], "deprecated": false, "autowired": false, "secret": false,
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration",
"configurationField": "configuration", "description": "Set if KafkaConsumer
will read from beginning or end on startup: SeekP [...]
"sessionTimeoutMs": { "kind": "property", "displayName": "Session Timeout
Ms", "group": "consumer", "label": "consumer", "required": false, "type":
"integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": "10000", "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "The timeout used to detect failures when using
Kafka's group management facilities." },
"specificAvroReader": { "kind": "property", "displayName": "Specific Avro
Reader", "group": "consumer", "label": "confluent,consumer", "required": false,
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": false, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "This enables the use of a specific Avro reader
for use with the Confluent Platf [...]
"topicIsPattern": { "kind": "property", "displayName": "Topic Is Pattern",
"group": "consumer", "label": "consumer", "required": false, "type": "boolean",
"javaType": "boolean", "deprecated": false, "autowired": false, "secret":
false, "defaultValue": false, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "Whether the topic is a pattern (regular
expression). This can be used to subscribe to dynamic num [...]
@@ -177,7 +177,7 @@
"partitionAssignor": { "kind": "parameter", "displayName": "Partition
Assignor", "group": "consumer", "label": "consumer", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": false, "defaultValue":
"org.apache.kafka.clients.consumer.RangeAssignor", "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "The class name of the partition assignm [...]
"pollOnError": { "kind": "parameter", "displayName": "Poll On Error",
"group": "consumer", "label": "consumer", "required": false, "type": "object",
"javaType": "org.apache.camel.component.kafka.PollOnError", "enum": [
"DISCARD", "ERROR_HANDLER", "RECONNECT", "RETRY", "STOP" ], "deprecated":
false, "autowired": false, "secret": false, "defaultValue": "ERROR_HANDLER",
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration",
"configurationField": "configuration", "d [...]
"pollTimeoutMs": { "kind": "parameter", "displayName": "Poll Timeout Ms",
"group": "consumer", "label": "consumer", "required": false, "type":
"duration", "javaType": "java.lang.Long", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": "5000", "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "The timeout used when polling the
KafkaConsumer." },
- "seekTo": { "kind": "parameter", "displayName": "Seek To", "group":
"consumer", "label": "consumer", "required": false, "type": "string",
"javaType": "java.lang.String", "enum": [ "beginning", "end" ], "deprecated":
false, "autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "Set if KafkaConsumer will read from beginning
or end on startup: beginning : read from beginning [...]
+ "seekTo": { "kind": "parameter", "displayName": "Seek To", "group":
"consumer", "label": "consumer", "required": false, "type": "object",
"javaType": "org.apache.camel.component.kafka.SeekPolicy", "enum": [
"BEGINNING", "END" ], "deprecated": false, "autowired": false, "secret": false,
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration",
"configurationField": "configuration", "description": "Set if KafkaConsumer
will read from beginning or end on startup: Seek [...]
"sessionTimeoutMs": { "kind": "parameter", "displayName": "Session Timeout
Ms", "group": "consumer", "label": "consumer", "required": false, "type":
"integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": "10000", "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "The timeout used to detect failures when using
Kafka's group management facilities." },
"specificAvroReader": { "kind": "parameter", "displayName": "Specific Avro
Reader", "group": "consumer", "label": "confluent,consumer", "required": false,
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": false, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "This enables the use of a specific Avro reader
for use with the Confluent Plat [...]
"topicIsPattern": { "kind": "parameter", "displayName": "Topic Is
Pattern", "group": "consumer", "label": "consumer", "required": false, "type":
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "Whether the topic is a pattern (regular
expression). This can be used to subscribe to dynamic nu [...]
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc
b/components/camel-kafka/src/main/docs/kafka-component.adoc
index c4621066a63..a56ef6e2013 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -376,6 +376,22 @@ This is mostly useful with aggregation's completion
timeout strategies.
If you want to use a custom implementation of `KafkaManualCommit` then you can
configure a custom `KafkaManualCommitFactory`
on the `KafkaComponent` that creates instances of your custom implementation.
+== Pausable Consumers
+
+The Kafka component supports pausable consumers. This type of consumer can
pause consuming data based on
+conditions external to the component itself (such as an external system being
unavailable).
+
+[source,java]
+----
+from("kafka:topic")
+ .pausable(new KafkaConsumerListener(), () -> canContinue())
+ .routeId("pausable-route")
+ .process(exchange -> LOG.info("Got record from Kafka: {}",
exchange.getMessage().getBody()))
+ .to("some:destination");
+----
+
+In this example, consuming messages can pause (by calling the Kafka's Consumer
pause method) if the result from `canContinue` is false.
+
== Kafka Headers propagation
When consuming messages from Kafka, headers will be propagated to camel
exchange headers automatically.
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index 0452fc33031..df624832871 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -129,8 +129,8 @@ public class KafkaConfiguration implements Cloneable,
HeaderFilterStrategyAware
// fetch.max.wait.ms
@UriParam(label = "consumer", defaultValue = "500")
private Integer fetchWaitMaxMs = 500;
- @UriParam(label = "consumer", enums = "beginning,end")
- private String seekTo;
+ @UriParam(label = "consumer")
+ private SeekPolicy seekTo;
// Consumer configuration properties
@UriParam(label = "consumer", defaultValue = "true")
@@ -1612,15 +1612,15 @@ public class KafkaConfiguration implements Cloneable,
HeaderFilterStrategyAware
this.valueDeserializer = valueDeserializer;
}
- public String getSeekTo() {
+ public SeekPolicy getSeekTo() {
return seekTo;
}
/**
- * Set if KafkaConsumer will read from beginning or end on startup:
beginning : read from beginning end : read from
- * end This is replacing the earlier property seekToBeginning
+ * Set if KafkaConsumer will read from beginning or end on startup:
SeekPolicy.BEGINNING: read from beginning.
+ * SeekPolicy.END: read from end.
*/
- public void setSeekTo(String seekTo) {
+ public void setSeekTo(SeekPolicy seekTo) {
this.seekTo = seekTo;
}
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 107827870c0..8b65d3fae04 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -24,8 +24,11 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
+import org.apache.camel.ConsumerListenerAware;
import org.apache.camel.Processor;
import org.apache.camel.ResumeAware;
+import org.apache.camel.Suspendable;
+import
org.apache.camel.component.kafka.consumer.errorhandler.KafkaConsumerListener;
import
org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy;
import org.apache.camel.health.HealthCheckAware;
import org.apache.camel.health.HealthCheckHelper;
@@ -39,7 +42,9 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class KafkaConsumer extends DefaultConsumer implements
ResumeAware<KafkaConsumerResumeStrategy>, HealthCheckAware {
+public class KafkaConsumer extends DefaultConsumer
+ implements ResumeAware<KafkaConsumerResumeStrategy>, HealthCheckAware,
ConsumerListenerAware<KafkaConsumerListener>,
+ Suspendable {
private static final Logger LOG =
LoggerFactory.getLogger(KafkaConsumer.class);
@@ -51,6 +56,7 @@ public class KafkaConsumer extends DefaultConsumer implements
ResumeAware<KafkaC
private final List<KafkaFetchRecords> tasks = new ArrayList<>();
private volatile boolean stopOffsetRepo;
private KafkaConsumerResumeStrategy resumeStrategy;
+ private KafkaConsumerListener consumerListener;
public KafkaConsumer(KafkaEndpoint endpoint, Processor processor) {
super(endpoint, processor);
@@ -67,6 +73,16 @@ public class KafkaConsumer extends DefaultConsumer
implements ResumeAware<KafkaC
return resumeStrategy;
}
+ @Override
+ public KafkaConsumerListener getConsumerListener() {
+ return consumerListener;
+ }
+
+ @Override
+ public void setConsumerListener(KafkaConsumerListener consumerListener) {
+ this.consumerListener = consumerListener;
+ }
+
@Override
protected void doBuild() throws Exception {
super.doBuild();
@@ -140,7 +156,8 @@ public class KafkaConsumer extends DefaultConsumer
implements ResumeAware<KafkaC
BridgeExceptionHandlerToErrorHandler bridge = new
BridgeExceptionHandlerToErrorHandler(this);
for (int i = 0; i < endpoint.getConfiguration().getConsumersCount();
i++) {
KafkaFetchRecords task = new KafkaFetchRecords(
- this, bridge, topic, pattern, i + "", getProps());
+ this, bridge, topic, pattern, i + "", getProps(),
consumerListener);
+
executor.submit(task);
tasks.add(task);
@@ -196,4 +213,24 @@ public class KafkaConsumer extends DefaultConsumer
implements ResumeAware<KafkaC
super.doStop();
}
+
+ @Override
+ protected void doSuspend() throws Exception {
+ for (KafkaFetchRecords task : tasks) {
+ LOG.info("Pausing Kafka record fetcher task running client ID {}",
task.getClientId());
+ task.pause();
+ }
+
+ super.doSuspend();
+ }
+
+ @Override
+ protected void doResume() throws Exception {
+ for (KafkaFetchRecords task : tasks) {
+ LOG.info("Resuming Kafka record fetcher task running client ID
{}", task.getClientId());
+ task.resume();
+ }
+
+ super.doResume();
+ }
}
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index 19321dede6b..0c54ea23363 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -27,6 +27,7 @@ import java.util.regex.Pattern;
import org.apache.camel.component.kafka.consumer.CommitManager;
import org.apache.camel.component.kafka.consumer.CommitManagers;
+import
org.apache.camel.component.kafka.consumer.errorhandler.KafkaConsumerListener;
import
org.apache.camel.component.kafka.consumer.errorhandler.KafkaErrorStrategies;
import
org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy;
import
org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessorFacade;
@@ -64,20 +65,21 @@ public class KafkaFetchRecords implements Runnable {
private final ReentrantLock lock = new ReentrantLock();
private CommitManager commitManager;
private Exception lastError;
+ private final KafkaConsumerListener consumerListener;
private boolean terminated;
private long currentBackoffInterval;
- private boolean retry = true;
private boolean reconnect; // must be false at init (this is the policy
whether to reconnect)
private boolean connected; // this is the state (connected or not)
KafkaFetchRecords(KafkaConsumer kafkaConsumer,
BridgeExceptionHandlerToErrorHandler bridge, String
topicName, Pattern topicPattern, String id,
- Properties kafkaProps) {
+ Properties kafkaProps, KafkaConsumerListener
consumerListener) {
this.kafkaConsumer = kafkaConsumer;
this.bridge = bridge;
this.topicName = topicName;
this.topicPattern = topicPattern;
+ this.consumerListener = consumerListener;
this.threadId = topicName + "-" + "Thread " + id;
this.kafkaProps = kafkaProps;
@@ -140,7 +142,7 @@ public class KafkaFetchRecords implements Runnable {
lastError = null;
startPolling();
- } while ((isRetrying() || isReconnect()) && isKafkaConsumerRunnable());
+ } while ((pollExceptionStrategy.canContinue() || isReconnect()) &&
isKafkaConsumerRunnable());
if (LOG.isInfoEnabled()) {
LOG.info("Terminating KafkaConsumer thread {} receiving from {}",
threadId, getPrintableTopic());
@@ -188,6 +190,16 @@ public class KafkaFetchRecords implements Runnable {
commitManager
= CommitManagers.createCommitManager(consumer,
kafkaConsumer, threadId, getPrintableTopic());
+ if (consumerListener != null) {
+ consumerListener.setConsumer(consumer);
+
+ SeekPolicy seekPolicy =
kafkaConsumer.getEndpoint().getComponent().getConfiguration().getSeekTo();
+ if (seekPolicy == null) {
+ seekPolicy = SeekPolicy.BEGINNING;
+ }
+
+ consumerListener.setSeekPolicy(seekPolicy);
+ }
} catch (Exception e) {
setConnected(false);
// ensure this is logged so users can see the problem
@@ -240,8 +252,7 @@ public class KafkaFetchRecords implements Runnable {
// set reconnect to false as the connection and resume is done at this
point
setConnected(false);
- // set retry to true to continue polling
- setRetry(true);
+ pollExceptionStrategy.reset();
}
private void subscribe() {
@@ -280,22 +291,26 @@ public class KafkaFetchRecords implements Runnable {
}
KafkaRecordProcessorFacade recordProcessorFacade = new
KafkaRecordProcessorFacade(
- kafkaConsumer, lastProcessedOffset, threadId,
commitManager);
+ kafkaConsumer, lastProcessedOffset, threadId,
commitManager, consumerListener);
Duration pollDuration = Duration.ofMillis(pollTimeoutMs);
- while (isKafkaConsumerRunnable() && isRetrying() && isConnected())
{
+ while (isKafkaConsumerRunnable() && isConnected() &&
pollExceptionStrategy.canContinue()) {
ConsumerRecords<Object, Object> allRecords =
consumer.poll(pollDuration);
+ if (consumerListener != null) {
+ if (!consumerListener.afterConsume(consumer)) {
+ continue;
+ }
+ }
commitManager.processAsyncCommits();
- ProcessingResult result =
recordProcessorFacade.processPolledRecords(allRecords);
+ ProcessingResult result =
recordProcessorFacade.processPolledRecords(allRecords, consumer);
if (result.isBreakOnErrorHit()) {
LOG.debug("We hit an error ... setting flags to force
reconnect");
// force re-connect
setReconnect(true);
setConnected(false);
- setRetry(false); // to close the current consumer
}
}
@@ -334,7 +349,7 @@ public class KafkaFetchRecords implements Runnable {
pollExceptionStrategy.handle(partitionLastOffset, e);
} finally {
// only close if not retry
- if (!isRetrying()) {
+ if (!pollExceptionStrategy.canContinue()) {
LOG.debug("Closing consumer {}", threadId);
safeUnsubscribe();
IOHelper.close(consumer);
@@ -382,14 +397,6 @@ public class KafkaFetchRecords implements Runnable {
return kafkaConsumer.getEndpoint().getCamelContext().isStopping() &&
!kafkaConsumer.isRunAllowed();
}
- private boolean isRetrying() {
- return retry;
- }
-
- public void setRetry(boolean value) {
- retry = value;
- }
-
private boolean isReconnect() {
return reconnect;
}
@@ -442,6 +449,10 @@ public class KafkaFetchRecords implements Runnable {
return connected;
}
+ public boolean isPaused() {
+ return !consumer.paused().isEmpty();
+ }
+
public void setConnected(boolean connected) {
this.connected = connected;
}
@@ -489,7 +500,7 @@ public class KafkaFetchRecords implements Runnable {
}
boolean isRecoverable() {
- return (isRetrying() || isReconnect()) && isKafkaConsumerRunnable();
+ return (pollExceptionStrategy.canContinue() || isReconnect()) &&
isKafkaConsumerRunnable();
}
long getCurrentRecoveryInterval() {
@@ -499,4 +510,20 @@ public class KafkaFetchRecords implements Runnable {
public BridgeExceptionHandlerToErrorHandler getBridge() {
return bridge;
}
+
+ /*
+ * This is for manually pausing the consumer. This is mostly used for
directly calling pause from Java code
+ * or via JMX
+ */
+ public void pause() {
+ consumer.pause(consumer.assignment());
+ }
+
+ /*
+ * This is for manually resuming the consumer (not to be confused w/ the
Resume API). This is
+ * mostly used for directly calling resume from Java code or via JMX
+ */
+ public void resume() {
+ consumer.resume(consumer.assignment());
+ }
}
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/PollExceptionStrategy.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/PollExceptionStrategy.java
index a2c6fef690d..019c0849724 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/PollExceptionStrategy.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/PollExceptionStrategy.java
@@ -24,6 +24,21 @@ package org.apache.camel.component.kafka;
*/
public interface PollExceptionStrategy {
+ /**
+ * Reset any error flags set by a previous error condition
+ */
+ default void reset() {
+
+ }
+
+ /**
+ * This method provides an "answer" to whether the consumer can continue
polling or not. This is specific to each
+ * polling exception strategy and must be implemented accordingly
+ *
+ * @return true if polling should continue or false otherwise
+ */
+ boolean canContinue();
+
/**
* Controls how to handle the exception while polling from Kafka.
*
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/PollExceptionStrategy.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/SeekPolicy.java
similarity index 52%
copy from
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/PollExceptionStrategy.java
copy to
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/SeekPolicy.java
index a2c6fef690d..2433fe89b9f 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/PollExceptionStrategy.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/SeekPolicy.java
@@ -14,21 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.camel.component.kafka;
/**
- * Strategy to decide when a Kafka exception was thrown during polling, how to
handle this. For example by re-connecting
- * and polling the same message again, by stopping the consumer (allows to
re-balance and let another consumer try), or
- * to let Camel route the message as an exception which allows Camel error
handling to handle the exception, or to
- * discard this message and poll the next message.
+ * BEGINNING configures the consumer to consume from the beginning of the
topic/partition. END configures the consumer
+ * to consume from the end of the topic/partition
*/
-public interface PollExceptionStrategy {
-
- /**
- * Controls how to handle the exception while polling from Kafka.
- *
- * @param exception the caused exception which typically would be a
{@link org.apache.kafka.common.KafkaException}
- * @return how to handle the exception
- */
- void handle(long partitionLastOffset, Exception exception);
+public enum SeekPolicy {
+ BEGINNING,
+ END
}
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/BridgeErrorStrategy.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/BridgeErrorStrategy.java
index 2141baf5e8d..fdc2c49b0fd 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/BridgeErrorStrategy.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/BridgeErrorStrategy.java
@@ -33,6 +33,11 @@ public class BridgeErrorStrategy implements
PollExceptionStrategy {
this.consumer = consumer;
}
+ @Override
+ public boolean canContinue() {
+ return true;
+ }
+
@Override
public void handle(long partitionLastOffset, Exception exception) {
LOG.warn("Deferring processing to the exception handler based on
polling exception strategy");
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/DiscardErrorStrategy.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/DiscardErrorStrategy.java
index 7387b85d675..d5424003f00 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/DiscardErrorStrategy.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/DiscardErrorStrategy.java
@@ -30,6 +30,11 @@ public class DiscardErrorStrategy implements
PollExceptionStrategy {
this.consumer = consumer;
}
+ @Override
+ public boolean canContinue() {
+ return true;
+ }
+
@Override
public void handle(long partitionLastOffset, Exception exception) {
LOG.warn("Requesting the consumer to discard the message and continue
to the next based on polling exception strategy");
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/KafkaConsumerListener.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/KafkaConsumerListener.java
new file mode 100644
index 00000000000..44f9f6337ed
--- /dev/null
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/KafkaConsumerListener.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.kafka.consumer.errorhandler;
+
+import java.util.function.Predicate;
+
+import org.apache.camel.ConsumerListener;
+import org.apache.camel.component.kafka.SeekPolicy;
+import org.apache.camel.component.kafka.consumer.support.ProcessingResult;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KafkaConsumerListener implements ConsumerListener<Object,
ProcessingResult> {
+ private static final Logger LOG =
LoggerFactory.getLogger(KafkaConsumerListener.class);
+ private Consumer<?, ?> consumer;
+ private SeekPolicy seekPolicy;
+
+ private Predicate<?> afterConsumeEval;
+
+ public Consumer<?, ?> getConsumer() {
+ return consumer;
+ }
+
+ public void setConsumer(Consumer<?, ?> consumer) {
+ this.consumer = consumer;
+ }
+
+ public SeekPolicy getSeekPolicy() {
+ return seekPolicy;
+ }
+
+ public void setSeekPolicy(SeekPolicy seekPolicy) {
+ this.seekPolicy = seekPolicy;
+ }
+
+ @Override
+ public void setResumableCheck(Predicate<?> afterConsumeEval) {
+ this.afterConsumeEval = afterConsumeEval;
+ }
+
+ @Override
+ public boolean afterConsume(@SuppressWarnings("unused") Object ignored) {
+ if (afterConsumeEval.test(null)) {
+ LOG.warn("State changed, therefore resuming the consumer");
+ consumer.resume(consumer.assignment());
+
+ return true;
+ }
+
+ LOG.warn("The consumer is not yet resumable");
+ return false;
+ }
+
+ @Override
+ public boolean afterProcess(ProcessingResult result) {
+ if (result.isFailed()) {
+ LOG.warn("Pausing consumer due to error on the last processing");
+ consumer.pause(consumer.assignment());
+
+ if (seekPolicy == SeekPolicy.BEGINNING) {
+ LOG.debug("Seeking from the beginning of topic");
+ consumer.seekToBeginning(consumer.assignment());
+ } else if (seekPolicy == SeekPolicy.END) {
+ LOG.debug("Seeking from the end off the topic");
+ consumer.seekToEnd(consumer.assignment());
+ }
+
+ return false;
+ }
+
+ return true;
+ }
+}
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/KafkaErrorStrategies.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/KafkaErrorStrategies.java
index b6b9f4acea9..3541e08dbce 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/KafkaErrorStrategies.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/KafkaErrorStrategies.java
@@ -41,7 +41,7 @@ public final class KafkaErrorStrategies {
switch (onError) {
case RETRY:
- return new RetryErrorStrategy(recordFetcher);
+ return new RetryErrorStrategy();
case RECONNECT:
return new ReconnectErrorStrategy(recordFetcher);
case ERROR_HANDLER:
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/ReconnectErrorStrategy.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/ReconnectErrorStrategy.java
index ddac560720e..0d54cbe0706 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/ReconnectErrorStrategy.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/ReconnectErrorStrategy.java
@@ -26,10 +26,22 @@ public class ReconnectErrorStrategy implements
PollExceptionStrategy {
private static final Logger LOG =
LoggerFactory.getLogger(ReconnectErrorStrategy.class);
private KafkaFetchRecords recordFetcher;
+ private boolean retry = true;
+
public ReconnectErrorStrategy(KafkaFetchRecords recordFetcher) {
this.recordFetcher = recordFetcher;
}
+ @Override
+ public void reset() {
+ retry = true;
+ }
+
+ @Override
+ public boolean canContinue() {
+ return retry;
+ }
+
@Override
public void handle(long partitionLastOffset, Exception exception) {
LOG.warn("Requesting the consumer to re-connect on the next run based
on polling exception strategy");
@@ -39,6 +51,6 @@ public class ReconnectErrorStrategy implements
PollExceptionStrategy {
recordFetcher.setConnected(false);
// to close the current consumer
- recordFetcher.setRetry(false);
+ retry = false;
}
}
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/RetryErrorStrategy.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/RetryErrorStrategy.java
index 46ddcb761a8..3df6f43c18d 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/RetryErrorStrategy.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/RetryErrorStrategy.java
@@ -17,24 +17,20 @@
package org.apache.camel.component.kafka.consumer.errorhandler;
-import org.apache.camel.component.kafka.KafkaFetchRecords;
import org.apache.camel.component.kafka.PollExceptionStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RetryErrorStrategy implements PollExceptionStrategy {
private static final Logger LOG =
LoggerFactory.getLogger(RetryErrorStrategy.class);
- private KafkaFetchRecords recordFetcher;
- public RetryErrorStrategy(KafkaFetchRecords recordFetcher) {
- this.recordFetcher = recordFetcher;
+ @Override
+ public boolean canContinue() {
+ return true;
}
@Override
public void handle(long partitionLastOffset, Exception exception) {
LOG.warn("Requesting the consumer to retry polling the same message
based on polling exception strategy");
-
- // consumer retry the same message again
- recordFetcher.setRetry(true);
}
}
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/StopErrorStrategy.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/StopErrorStrategy.java
index b330a368173..6258ad3ac63 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/StopErrorStrategy.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/StopErrorStrategy.java
@@ -25,17 +25,28 @@ import org.slf4j.LoggerFactory;
public class StopErrorStrategy implements PollExceptionStrategy {
private static final Logger LOG =
LoggerFactory.getLogger(StopErrorStrategy.class);
private KafkaFetchRecords recordFetcher;
+ private boolean retry = true;
public StopErrorStrategy(KafkaFetchRecords recordFetcher) {
this.recordFetcher = recordFetcher;
}
+ @Override
+ public void reset() {
+ retry = true;
+ }
+
+ @Override
+ public boolean canContinue() {
+ return retry;
+ }
+
@Override
public void handle(long partitionLastOffset, Exception exception) {
// stop and terminate consumer
LOG.warn("Requesting the consumer to stop based on polling exception
strategy");
- recordFetcher.setRetry(false);
+ retry = false;
recordFetcher.setConnected(false);
}
}
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
index df288d78388..723cd0f4e96 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
@@ -113,10 +113,10 @@ public class KafkaRecordProcessor {
boolean breakOnErrorExit = processException(exchange, partition,
lastResult.getPartitionLastOffset(),
exceptionHandler);
- return new ProcessingResult(breakOnErrorExit,
lastResult.getPartitionLastOffset());
+ return new ProcessingResult(breakOnErrorExit,
lastResult.getPartitionLastOffset(), true);
}
- return new ProcessingResult(false, record.offset());
+ return new ProcessingResult(false, record.offset(),
exchange.getException() != null);
}
private boolean processException(
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
index 10d893ef15f..0ced3260246 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
@@ -25,6 +25,8 @@ import java.util.Set;
import org.apache.camel.Exchange;
import org.apache.camel.component.kafka.KafkaConsumer;
import org.apache.camel.component.kafka.consumer.CommitManager;
+import
org.apache.camel.component.kafka.consumer.errorhandler.KafkaConsumerListener;
+import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
@@ -41,23 +43,24 @@ public class KafkaRecordProcessorFacade {
private final String threadId;
private final KafkaRecordProcessor kafkaRecordProcessor;
private final CommitManager commitManager;
+ private final KafkaConsumerListener consumerListener;
public KafkaRecordProcessorFacade(KafkaConsumer camelKafkaConsumer,
Map<String, Long> lastProcessedOffset, String threadId,
- CommitManager commitManager) {
+ CommitManager commitManager,
KafkaConsumerListener consumerListener) {
this.camelKafkaConsumer = camelKafkaConsumer;
this.lastProcessedOffset = lastProcessedOffset;
this.threadId = threadId;
this.commitManager = commitManager;
kafkaRecordProcessor = buildKafkaRecordProcessor(commitManager);
-
+ this.consumerListener = consumerListener;
}
private boolean isStopping() {
return camelKafkaConsumer.isStopping();
}
- public ProcessingResult processPolledRecords(ConsumerRecords<Object,
Object> allRecords) {
+ public ProcessingResult processPolledRecords(ConsumerRecords<Object,
Object> allRecords, Consumer<?, ?> consumer) {
logRecords(allRecords);
Set<TopicPartition> partitions = allRecords.partitions();
@@ -79,6 +82,12 @@ public class KafkaRecordProcessorFacade {
lastResult = processRecord(partition,
partitionIterator.hasNext(), recordIterator.hasNext(), lastResult,
kafkaRecordProcessor, record);
+
+ if (consumerListener != null) {
+ if (!consumerListener.afterProcess(lastResult)) {
+ return lastResult;
+ }
+ }
}
if (!lastResult.isBreakOnErrorHit()) {
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java
index 0845bcab343..36f0c69c8b2 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java
@@ -20,14 +20,17 @@ package org.apache.camel.component.kafka.consumer.support;
import org.apache.camel.component.kafka.consumer.AbstractCommitManager;
public final class ProcessingResult {
- private static final ProcessingResult UNPROCESSED_RESULT = new
ProcessingResult(false, AbstractCommitManager.START_OFFSET);
+ private static final ProcessingResult UNPROCESSED_RESULT
+ = new ProcessingResult(false, AbstractCommitManager.START_OFFSET,
false);
private final boolean breakOnErrorHit;
private final long partitionLastOffset;
+ private final boolean failed;
- ProcessingResult(boolean breakOnErrorHit, long partitionLastOffset) {
+ ProcessingResult(boolean breakOnErrorHit, long partitionLastOffset,
boolean failed) {
this.breakOnErrorHit = breakOnErrorHit;
this.partitionLastOffset = partitionLastOffset;
+ this.failed = failed;
}
public boolean isBreakOnErrorHit() {
@@ -38,6 +41,10 @@ public final class ProcessingResult {
return partitionLastOffset;
}
+ public boolean isFailed() {
+ return failed;
+ }
+
public static ProcessingResult newUnprocessed() {
return UNPROCESSED_RESULT;
}
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java
index c4fdb6d12b9..3784b4c5c15 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.kafka.consumer.support;
import org.apache.camel.component.kafka.KafkaConfiguration;
import org.apache.camel.component.kafka.KafkaConsumer;
+import org.apache.camel.component.kafka.SeekPolicy;
import org.apache.camel.spi.StateRepository;
import org.apache.kafka.clients.consumer.Consumer;
import org.slf4j.Logger;
@@ -65,7 +66,7 @@ public final class ResumeStrategyFactory {
private static KafkaConsumerResumeStrategy
builtinResumeStrategies(KafkaConfiguration configuration) {
LOG.debug("No resume strategy was provided ... checking for built-ins
...");
StateRepository<String, String> offsetRepository =
configuration.getOffsetRepository();
- String seekTo = configuration.getSeekTo();
+ SeekPolicy seekTo = configuration.getSeekTo();
if (offsetRepository != null) {
LOG.info("Using resume from offset strategy");
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeStrategy.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeStrategy.java
index 91db91b85ed..b877763854e 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeStrategy.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeStrategy.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.component.kafka.consumer.support;
+import org.apache.camel.component.kafka.SeekPolicy;
import org.apache.kafka.clients.consumer.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -27,10 +28,10 @@ public class SeekPolicyKafkaConsumerResumeStrategy
implements KafkaConsumerResum
private static final Logger LOG =
LoggerFactory.getLogger(SeekPolicyKafkaConsumerResumeStrategy.class);
- private final String seekPolicy;
+ private final SeekPolicy seekPolicy;
private Consumer<?, ?> consumer;
- public SeekPolicyKafkaConsumerResumeStrategy(String seekPolicy) {
+ public SeekPolicyKafkaConsumerResumeStrategy(SeekPolicy seekPolicy) {
this.seekPolicy = seekPolicy;
}
@@ -41,10 +42,10 @@ public class SeekPolicyKafkaConsumerResumeStrategy
implements KafkaConsumerResum
@Override
public void resume() {
- if (seekPolicy.equals("beginning")) {
+ if (seekPolicy == SeekPolicy.BEGINNING) {
LOG.debug("Seeking from the beginning of topic");
consumer.seekToBeginning(consumer.assignment());
- } else if (seekPolicy.equals("end")) {
+ } else if (seekPolicy == SeekPolicy.END) {
LOG.debug("Seeking from the end off the topic");
consumer.seekToEnd(consumer.assignment());
}
diff --git
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/BaseEmbeddedKafkaTestSupport.java
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/BaseEmbeddedKafkaTestSupport.java
index ff427f21e7f..d044dc1b708 100644
---
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/BaseEmbeddedKafkaTestSupport.java
+++
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/BaseEmbeddedKafkaTestSupport.java
@@ -57,7 +57,7 @@ public abstract class BaseEmbeddedKafkaTestSupport extends
CamelTestSupport {
}
}
- protected Properties getDefaultProperties() {
+ public static Properties getDefaultProperties(KafkaService service) {
LOG.info("Connecting to Kafka {}", service.getBootstrapServers());
Properties props = new Properties();
@@ -69,6 +69,10 @@ public abstract class BaseEmbeddedKafkaTestSupport extends
CamelTestSupport {
return props;
}
+ protected Properties getDefaultProperties() {
+ return getDefaultProperties(service);
+ }
+
@Override
protected CamelContext createCamelContext() throws Exception {
CamelContext context = super.createCamelContext();
@@ -87,6 +91,10 @@ public abstract class BaseEmbeddedKafkaTestSupport extends
CamelTestSupport {
}
private static AdminClient createAdminClient() {
+ return createAdminClient(service);
+ }
+
+ public static AdminClient createAdminClient(KafkaService service) {
final Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
service.getBootstrapServers());
diff --git
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java
index 6a3d4af2303..46e304d0613 100644
---
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java
+++
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java
@@ -28,6 +28,7 @@ import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.kafka.KafkaConstants;
import org.apache.camel.component.kafka.KafkaEndpoint;
import org.apache.camel.component.kafka.MockConsumerInterceptor;
+import org.apache.camel.component.kafka.SeekPolicy;
import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderDeserializer;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -165,7 +166,7 @@ public class KafkaConsumerFullIT extends
BaseEmbeddedKafkaTestSupport {
context.getRouteController().stopRoute("full-it");
KafkaEndpoint kafkaEndpoint = (KafkaEndpoint) from;
- kafkaEndpoint.getConfiguration().setSeekTo("beginning");
+ kafkaEndpoint.getConfiguration().setSeekTo(SeekPolicy.BEGINNING);
context.getRouteController().startRoute("full-it");
@@ -193,7 +194,7 @@ public class KafkaConsumerFullIT extends
BaseEmbeddedKafkaTestSupport {
context.getRouteController().stopRoute("full-it");
KafkaEndpoint kafkaEndpoint = (KafkaEndpoint) from;
- kafkaEndpoint.getConfiguration().setSeekTo("end");
+ kafkaEndpoint.getConfiguration().setSeekTo(SeekPolicy.END);
context.getRouteController().startRoute("full-it");
diff --git
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/pause/KafkaPausableConsumerIT.java
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/pause/KafkaPausableConsumerIT.java
new file mode 100644
index 00000000000..7ed6fdeb57f
--- /dev/null
+++
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/pause/KafkaPausableConsumerIT.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration.pause;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAdder;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.component.kafka.MockConsumerInterceptor;
+import
org.apache.camel.component.kafka.consumer.errorhandler.KafkaConsumerListener;
+import org.apache.camel.component.kafka.consumer.support.ProcessingResult;
+import
org.apache.camel.component.kafka.integration.BaseEmbeddedKafkaTestSupport;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+public class KafkaPausableConsumerIT extends BaseEmbeddedKafkaTestSupport {
+ // Just a wrapper for us to check if the expected methods are being called
+ private static class TestListener extends KafkaConsumerListener {
+ boolean afterConsumeCalled;
+ boolean afterProcessCalled;
+
+ @Override
+ public boolean afterConsume(Object ignored) {
+ afterConsumeCalled = true;
+ return super.afterConsume(ignored);
+ }
+
+ @Override
+ public boolean afterProcess(ProcessingResult result) {
+ afterProcessCalled = true;
+ return super.afterProcess(result);
+ }
+ }
+
+ public static final String SOURCE_TOPIC = "pause-source";
+
+ private static final Logger LOG =
LoggerFactory.getLogger(KafkaPausableConsumerIT.class);
+
+ private static final int RETRY_COUNT = 10;
+ private LongAdder count = new LongAdder();
+
+ @EndpointInject("kafka:" + SOURCE_TOPIC
+ +
"?groupId=group1&autoOffsetReset=earliest&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+ +
"&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+ +
"&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
+ private Endpoint from;
+
+ @EndpointInject("direct:intermediate")
+ private Endpoint intermediate;
+
+ @EndpointInject("mock:result")
+ private MockEndpoint to;
+
+ private org.apache.kafka.clients.producer.KafkaProducer<String, String>
producer;
+
+ private ScheduledExecutorService executorService =
Executors.newSingleThreadScheduledExecutor();
+
+ private TestListener testConsumerListener = new TestListener();
+
+ @BeforeEach
+ public void before() {
+ Properties props =
BaseEmbeddedKafkaTestSupport.getDefaultProperties(service);
+ producer = new
org.apache.kafka.clients.producer.KafkaProducer<>(props);
+ MockConsumerInterceptor.recordsCaptured.clear();
+
+ executorService.scheduleAtFixedRate(this::increment, 5, 1,
TimeUnit.SECONDS);
+ }
+
+ @AfterEach
+ public void after() {
+ if (producer != null) {
+ producer.close();
+ }
+ // clean all test topics
+ BaseEmbeddedKafkaTestSupport.createAdminClient(service)
+ .deleteTopics(Collections.singletonList(SOURCE_TOPIC)).all();
+
+ executorService.shutdownNow();
+ }
+
+ private boolean canContinue() {
+ // First one should go through ...
+ if (count.intValue() <= 1) {
+ return true;
+ }
+
+ if (count.intValue() >= RETRY_COUNT) {
+ return true;
+ }
+
+ return false;
+ }
+
+ public void increment() {
+ count.increment();
+ }
+
+ public int getCount() {
+ return count.intValue();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+
+ @Override
+ public void configure() {
+ from(from)
+ .pausable(testConsumerListener, o -> canContinue())
+ .routeId("pausable-it")
+ .process(exchange -> LOG.info("Got record from Kafka:
{}", exchange.getMessage().getBody()))
+ .to(intermediate);
+
+ from(intermediate)
+ .process(exchange -> {
+ LOG.info("Got record on the intermediate
processor: {}", exchange.getMessage().getBody());
+
+ if (getCount() <= RETRY_COUNT) {
+ throw new RuntimeCamelException("Error");
+ }
+ })
+ .to(to);
+ }
+ };
+ }
+
+ @Test
+ @Timeout(value = 1, unit = TimeUnit.MINUTES)
+ public void kafkaMessageIsConsumedByCamel() throws InterruptedException {
+ String propagatedHeaderKey = "PropagatedCustomHeader";
+ byte[] propagatedHeaderValue = "propagated header value".getBytes();
+ String skippedHeaderKey = "CamelSkippedHeader";
+
+ // Although all messages will be sent more than once to the exception
only 5 messages should reach the final
+ // destination, because sending them on the first few tries should fail
+ to.expectedMessageCount(5);
+ to.expectedBodiesReceivedInAnyOrder("message-0", "message-1",
"message-2", "message-3", "message-4");
+
+ // The LAST_RECORD_BEFORE_COMMIT header should not be configured on any
+ // exchange because autoCommitEnable=true
+
to.expectedHeaderValuesReceivedInAnyOrder(KafkaConstants.LAST_RECORD_BEFORE_COMMIT,
null, null, null, null, null);
+ to.expectedHeaderReceived(propagatedHeaderKey, propagatedHeaderValue);
+
+ for (int k = 0; k < 5; k++) {
+ String msg = "message-" + k;
+ ProducerRecord<String, String> data = new
ProducerRecord<>(SOURCE_TOPIC, "1", msg);
+ data.headers().add(new RecordHeader("CamelSkippedHeader", "skipped
header value".getBytes()));
+ data.headers().add(new RecordHeader(propagatedHeaderKey,
propagatedHeaderValue));
+ producer.send(data);
+ }
+
+ await().atMost(30, TimeUnit.SECONDS).untilAdder(count,
greaterThan(10L));
+
+ assertTrue(testConsumerListener.afterConsumeCalled, "The afterConsume
method should have been called");
+ assertTrue(testConsumerListener.afterProcessCalled, "The afterProcess
method should have been called");
+
+ to.assertIsSatisfied();
+ assertEquals(5, to.getExchanges().size(), "Did not receive the
expected amount of messages");
+
+ Map<String, Object> headers =
to.getExchanges().get(0).getIn().getHeaders();
+ assertFalse(headers.containsKey(skippedHeaderKey), "Should not receive
skipped header");
+ assertTrue(headers.containsKey(propagatedHeaderKey), "Should receive
propagated header");
+ }
+}
diff --git a/components/camel-kafka/src/test/resources/log4j2.properties
b/components/camel-kafka/src/test/resources/log4j2.properties
index 61f9e8e29b4..d1ad676c533 100644
--- a/components/camel-kafka/src/test/resources/log4j2.properties
+++ b/components/camel-kafka/src/test/resources/log4j2.properties
@@ -41,7 +41,7 @@ logger.resume.name=org.apache.camel.processor.resume.kafka
logger.resume.level=INFO
logger.kafka.name=org.apache.kafka
-logger.kafka.level=INFO
+logger.kafka.level=WARN
# To verify the ResumableCompletion onFailure
# logger.resume-processor.name=org.apache.camel.processor.resume
diff --git
a/core/camel-api/src/main/java/org/apache/camel/ConsumerListener.java
b/core/camel-api/src/main/java/org/apache/camel/ConsumerListener.java
new file mode 100644
index 00000000000..d92808e5d2a
--- /dev/null
+++ b/core/camel-api/src/main/java/org/apache/camel/ConsumerListener.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel;
+
+import java.util.function.Predicate;
+
+/**
+ * An interface for listening to consumer events and allow proxying between a
consumer predicate and the Camel
+ * component. The whole of the consumer predicate is that of evaluating
whether the consumption (from the internal Camel
+ * consumer) can continue or should be put on pause.
+ */
+public interface ConsumerListener<C, P> {
+
+ /**
+ * This sets the predicate responsible for evaluating whether the
processing can resume or not. Such predicate
+ * should return true if the consumption can resume, or false otherwise.
The exact point of when the predicate is
+ * called is dependent on the component, and it may be called on either
one of the available events. Implementations
+ * should not assume the predicate to be called at any specific point.
+ */
+ void setResumableCheck(Predicate<?> afterConsumeEval);
+
+ /**
+ * This is an event that runs after data consumption.
+ *
+ * @param consumePayload the resume payload if any
+ * @return true if the consumer should processing or false
otherwise.
+ */
+ boolean afterConsume(C consumePayload);
+
+ /**
+ * This is an event that runs after data processing.
+ *
+ * @param processingPayload the resume payload if any
+ * @return true if the consumer should continue or false
otherwise.
+ */
+ boolean afterProcess(P processingPayload);
+}
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/PollExceptionStrategy.java
b/core/camel-api/src/main/java/org/apache/camel/ConsumerListenerAware.java
similarity index 51%
copy from
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/PollExceptionStrategy.java
copy to core/camel-api/src/main/java/org/apache/camel/ConsumerListenerAware.java
index a2c6fef690d..bbd24c59e47 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/PollExceptionStrategy.java
+++ b/core/camel-api/src/main/java/org/apache/camel/ConsumerListenerAware.java
@@ -14,21 +14,26 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.camel.component.kafka;
+
+package org.apache.camel;
/**
- * Strategy to decide when a Kafka exception was thrown during polling, how to
handle this. For example by re-connecting
- * and polling the same message again, by stopping the consumer (allows to
re-balance and let another consumer try), or
- * to let Camel route the message as an exception which allows Camel error
handling to handle the exception, or to
- * discard this message and poll the next message.
+ * An interface to represent an object which wishes to support listening for
consumer events using the
+ * {@link ConsumerListener}.
*/
-public interface PollExceptionStrategy {
+public interface ConsumerListenerAware<T extends ConsumerListener<?, ?>> {
+
+ /**
+ * Injects the {@link ConsumerListener} instance into the object
+ *
+ * @param consumerListener the consumer listener instance
+ */
+ void setConsumerListener(T consumerListener);
/**
- * Controls how to handle the exception while polling from Kafka.
+ * Gets the {@link ConsumerListener} instance
*
- * @param exception the caused exception which typically would be a
{@link org.apache.kafka.common.KafkaException}
- * @return how to handle the exception
+ * @return the consumer listener instance
*/
- void handle(long partitionLastOffset, Exception exception);
+ T getConsumerListener();
}
diff --git a/core/camel-api/src/main/java/org/apache/camel/Route.java
b/core/camel-api/src/main/java/org/apache/camel/Route.java
index 2d4e0cda76b..0e6f90882c5 100644
--- a/core/camel-api/src/main/java/org/apache/camel/Route.java
+++ b/core/camel-api/src/main/java/org/apache/camel/Route.java
@@ -378,4 +378,9 @@ public interface Route extends RuntimeConfiguration {
*/
void setResumeStrategy(ResumeStrategy resumeStrategy);
+ /**
+ * Sets the consumer listener for the route
+ */
+ void setConsumerListener(ConsumerListener<?, ?> consumerListener);
+
}
diff --git
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java
index 5c39c860bea..8a8fc6a10d5 100644
---
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java
+++
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java
@@ -28,6 +28,8 @@ import java.util.concurrent.ConcurrentMap;
import org.apache.camel.CamelContext;
import org.apache.camel.Consumer;
+import org.apache.camel.ConsumerListener;
+import org.apache.camel.ConsumerListenerAware;
import org.apache.camel.Endpoint;
import org.apache.camel.ErrorHandlerFactory;
import org.apache.camel.NamedNode;
@@ -90,6 +92,7 @@ public class DefaultRoute extends ServiceSupport implements
Route {
private final Map<String, Processor> onCompletions = new HashMap<>();
private final Map<String, Processor> onExceptions = new HashMap<>();
private ResumeStrategy resumeStrategy;
+ private ConsumerListener<?, ?> consumerListener;
// camel-core-model
@Deprecated
@@ -634,6 +637,10 @@ public class DefaultRoute extends ServiceSupport
implements Route {
if (consumer instanceof ResumeAware) {
((ResumeAware) consumer).setResumeStrategy(resumeStrategy);
}
+
+ if (consumer instanceof ConsumerListenerAware) {
+ ((ConsumerListenerAware)
consumer).setConsumerListener(consumerListener);
+ }
}
if (processor instanceof Service) {
services.add((Service) processor);
@@ -711,4 +718,9 @@ public class DefaultRoute extends ServiceSupport implements
Route {
public void setResumeStrategy(ResumeStrategy resumeStrategy) {
this.resumeStrategy = resumeStrategy;
}
+
+ @Override
+ public void setConsumerListener(ConsumerListener<?, ?> consumerListener) {
+ this.consumerListener = consumerListener;
+ }
}
diff --git
a/core/camel-core-engine/src/main/docs/modules/eips/examples/json/pausable.json
b/core/camel-core-engine/src/main/docs/modules/eips/examples/json/pausable.json
new file mode 120000
index 00000000000..f174daeaac5
--- /dev/null
+++
b/core/camel-core-engine/src/main/docs/modules/eips/examples/json/pausable.json
@@ -0,0 +1 @@
+../../../../../../../../camel-core-model/src/generated/resources/org/apache/camel/model/pausable.json
\ No newline at end of file
diff --git
a/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc
b/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc
index 214157b9105..606a1f813b5 100644
---
a/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc
+++
b/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc
@@ -2,7 +2,7 @@
:doctitle: Resume Strategies
:shortname: resume
:description: Provide strategies to allow consuming data from specific offsets
-:since:
+:since: 3.16.0
:supportlevel: Experimental
The resume strategies allow users to implement strategies that point the
consumer part of the routes to the last point of consumption. This allows Camel
to skip reading and processing data that has already been consumed.
@@ -191,3 +191,29 @@
from("file:{{input.dir}}?noop=true&fileName={{input.file}}")
----
*Reason*: the `skip` method in the Reader will skip characters, whereas the
same method on the InputStream will skip bytes.
+
+
+== Pausable Consumers API
+
+The Pausable consumers API is a subset of the resume API that provides pause
and resume features for supported components.
+With this API it is possible to implement logic that controls the behavior of
the consumer based on conditions that are
+external to the component. For instance, it makes it possible to pause the
consumer if an external system becomes unavailable.
+
+Currently, support for pausable consumers is available for the following
components:
+
+* xref:components::kafka-component.adoc[camel-kafka]
+
+To use the API, it needs an instance of a Consumer listener along with a
predicate that tests whether to continue.
+
+* `org.apache.camel.ConsumerListener` - the consumer listener interface. Camel
already comes with pre-built consumer listeners, but users in need of more
complex behaviors can create their own listeners.
+* a predicate that returns true if data consumption should resume or false if
consumption should be put on pause
+
+Usage example:
+
+[source,java]
+----
+from(from)
+ .pausable(new KafkaConsumerListener(), o -> canContinue())
+ .process(exchange -> LOG.info("Received an exchange: {}",
exchange.getMessage().getBody()))
+ .to(destination);
+----
diff --git
a/core/camel-core-model/src/generated/resources/META-INF/services/org/apache/camel/model.properties
b/core/camel-core-model/src/generated/resources/META-INF/services/org/apache/camel/model.properties
index aaf9b075198..84d6b4b872a 100644
---
a/core/camel-core-model/src/generated/resources/META-INF/services/org/apache/camel/model.properties
+++
b/core/camel-core-model/src/generated/resources/META-INF/services/org/apache/camel/model.properties
@@ -108,6 +108,7 @@ packageScan
param
passThroughServiceFilter
patch
+pausable
pgp
pipeline
policy
diff --git
a/core/camel-core-model/src/generated/resources/org/apache/camel/model/jaxb.index
b/core/camel-core-model/src/generated/resources/org/apache/camel/model/jaxb.index
index 4271e558cb8..6fb29c0d233 100644
---
a/core/camel-core-model/src/generated/resources/org/apache/camel/model/jaxb.index
+++
b/core/camel-core-model/src/generated/resources/org/apache/camel/model/jaxb.index
@@ -45,6 +45,7 @@ OtherwiseDefinition
OutputDefinition
OutputTypeDefinition
PackageScanDefinition
+PausableDefinition
PipelineDefinition
PolicyDefinition
PollEnrichDefinition
diff --git
a/core/camel-core-model/src/generated/resources/org/apache/camel/model/pausable.json
b/core/camel-core-model/src/generated/resources/org/apache/camel/model/pausable.json
new file mode 100644
index 00000000000..88b088805ab
--- /dev/null
+++
b/core/camel-core-model/src/generated/resources/org/apache/camel/model/pausable.json
@@ -0,0 +1,20 @@
+{
+ "model": {
+ "kind": "model",
+ "name": "pausable",
+ "title": "Pausable",
+ "description": "Pausable EIP to support resuming processing from last
known offset.",
+ "deprecated": false,
+ "label": "eip,routing",
+ "javaType": "org.apache.camel.model.PausableDefinition",
+ "abstract": false,
+ "input": true,
+ "output": false
+ },
+ "properties": {
+ "consumerListener": { "kind": "attribute", "displayName": "Consumer
Listener", "required": true, "type": "object", "javaType":
"org.apache.camel.ConsumerListener", "deprecated": false, "autowired": false,
"secret": false, "description": "Sets the consumer listener to use" },
+ "untilCheck": { "kind": "attribute", "displayName": "Until Check",
"required": true, "type": "object", "javaType": "java.util.function.Predicate",
"deprecated": false, "autowired": false, "secret": false },
+ "id": { "kind": "attribute", "displayName": "Id", "required": false,
"type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "description": "Sets the id of this node"
},
+ "description": { "kind": "element", "displayName": "Description",
"required": false, "type": "object", "javaType":
"org.apache.camel.model.DescriptionDefinition", "deprecated": false,
"autowired": false, "secret": false, "description": "Sets the description of
this node" }
+ }
+}
diff --git
a/core/camel-core-model/src/main/java/org/apache/camel/model/PausableDefinition.java
b/core/camel-core-model/src/main/java/org/apache/camel/model/PausableDefinition.java
new file mode 100644
index 00000000000..d6f1bdfcaf2
--- /dev/null
+++
b/core/camel-core-model/src/main/java/org/apache/camel/model/PausableDefinition.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.model;
+
+import java.util.function.Predicate;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.XmlTransient;
+
+import org.apache.camel.ConsumerListener;
+import org.apache.camel.spi.Metadata;
+
+/**
+ * Pausable EIP to support resuming processing from last known offset.
+ */
+@Metadata(label = "eip,routing")
+@XmlRootElement(name = "pausable")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class PausableDefinition extends NoOutputDefinition<PausableDefinition>
{
+
+ @XmlTransient
+ private ConsumerListener<?, ?> consumerListenerBean;
+
+ @XmlAttribute(required = true)
+ @Metadata(required = true, javaType = "org.apache.camel.ConsumerListener")
+ private String consumerListener;
+
+ @XmlTransient
+ private Predicate<?> untilCheckBean;
+
+ @XmlAttribute(required = true)
+ @Metadata(required = true, javaType = "java.util.function.Predicate")
+ private String untilCheck;
+
+ @Override
+ public String getShortName() {
+ return "pausable";
+ }
+
+ @Override
+ public String getLabel() {
+ return "pausable";
+ }
+
+ public ConsumerListener<?, ?> getConsumerListenerBean() {
+ return consumerListenerBean;
+ }
+
+ public void setConsumerListener(ConsumerListener<?, ?>
consumerListenerBean) {
+ this.consumerListenerBean = consumerListenerBean;
+ }
+
+ public String getConsumerListener() {
+ return consumerListener;
+ }
+
+ public void setConsumerListener(String consumerListener) {
+ this.consumerListener = consumerListener;
+ }
+
+ public Predicate<?> getUntilCheckBean() {
+ return untilCheckBean;
+ }
+
+ public void setUntilCheck(Predicate<?> untilCheckBean) {
+ this.untilCheckBean = untilCheckBean;
+ }
+
+ public String getUntilCheck() {
+ return untilCheck;
+ }
+
+ public void setUntilCheck(String untilCheck) {
+ this.untilCheck = untilCheck;
+ }
+
+ // Fluent API
+ //
-------------------------------------------------------------------------
+
+ /**
+ * Sets the consumer listener to use
+ */
+ public PausableDefinition consumerListener(String consumerListenerRef) {
+ setConsumerListener(consumerListenerRef);
+
+ return this;
+ }
+
+ /**
+ * Sets the consumer listener to use
+ */
+ public PausableDefinition consumerListener(ConsumerListener<?, ?>
consumerListener) {
+ setConsumerListener(consumerListener);
+
+ return this;
+ }
+}
diff --git
a/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java
b/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java
index 68ba1c10dec..b3802477243 100644
---
a/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java
+++
b/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java
@@ -37,6 +37,7 @@ import org.apache.camel.AggregationStrategy;
import org.apache.camel.BeanScope;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
+import org.apache.camel.ConsumerListener;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
@@ -3841,6 +3842,63 @@ public abstract class ProcessorDefinition<Type extends
ProcessorDefinition<Type>
return asType();
}
+ /**
+ * This enables pausable consumers, which allows the consumer to pause
work until a certain condition allows it to
+ * resume operation
+ *
+ * @return the builder
+ */
+ public PausableDefinition pausable() {
+ PausableDefinition answer = new PausableDefinition();
+ addOutput(answer);
+ return answer;
+ }
+
+ /**
+ * This enables pausable consumers, which allows the consumer to pause
work until a certain condition allows it to
+ * resume operation
+ *
+ * @param consumerListener the consumer listener to use for consumer
events
+ * @return the builder
+ */
+ public Type pausable(ConsumerListener consumerListener,
java.util.function.Predicate<?> untilCheck) {
+ PausableDefinition answer = new PausableDefinition();
+ answer.setConsumerListener(consumerListener);
+ answer.setUntilCheck(untilCheck);
+ addOutput(answer);
+ return asType();
+ }
+
+ /**
+ * This enables pausable consumers, which allows the consumer to pause
work until a certain condition allows it to
+ * resume operation
+ *
+ * @param consumerListenerRef the resume strategy
+ * @return the builder
+ */
+ public Type pausable(String consumerListenerRef,
java.util.function.Predicate<?> untilCheck) {
+ PausableDefinition answer = new PausableDefinition();
+ answer.setConsumerListener(consumerListenerRef);
+ answer.setUntilCheck(untilCheck);
+ addOutput(answer);
+ return asType();
+ }
+
+ /**
+ * This enables pausable consumers, which allows the consumer to pause
work until a certain condition allows it to
+ * resume operation
+ *
+ * @param consumerListenerRef the resume strategy
+ * @return the builder
+ */
+ public Type pausable(String consumerListenerRef, String untilCheck) {
+ PausableDefinition answer = new PausableDefinition();
+ answer.setConsumerListener(consumerListenerRef);
+ answer.setUntilCheck(untilCheck);
+ addOutput(answer);
+ return asType();
+ }
+
// Properties
//
-------------------------------------------------------------------------
diff --git
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PausableProcessor.java
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PausableProcessor.java
new file mode 100644
index 00000000000..85954c3b6f7
--- /dev/null
+++
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PausableProcessor.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.processor;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.ConsumerListener;
+import org.apache.camel.Exchange;
+import org.apache.camel.Navigate;
+import org.apache.camel.Processor;
+import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.RouteIdAware;
+import org.apache.camel.support.AsyncProcessorConverterHelper;
+import org.apache.camel.support.AsyncProcessorSupport;
+
+public class PausableProcessor extends AsyncProcessorSupport
+ implements Navigate<Processor>, CamelContextAware, IdAware,
RouteIdAware {
+
+ private final ConsumerListener<?, ?> consumerListener;
+ private final AsyncProcessor processor;
+ private CamelContext camelContext;
+ private String id;
+ private String routeId;
+
+ public PausableProcessor(ConsumerListener<?, ?> consumerListener,
Processor processor) {
+ this.consumerListener = consumerListener;
+ this.processor = AsyncProcessorConverterHelper.convert(processor);
+ }
+
+ @Override
+ public boolean process(Exchange exchange, AsyncCallback callback) {
+ return processor.process(exchange, callback);
+ }
+
+ @Override
+ public CamelContext getCamelContext() {
+ return camelContext;
+ }
+
+ @Override
+ public void setCamelContext(CamelContext camelContext) {
+ this.camelContext = camelContext;
+ }
+
+ @Override
+ public List<Processor> next() {
+ if (!hasNext()) {
+ return null;
+ }
+ List<Processor> answer = new ArrayList<>(1);
+ answer.add(processor);
+ return answer;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return false;
+ }
+
+ @Override
+ public String getId() {
+ return id;
+ }
+
+ @Override
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ @Override
+ public String getRouteId() {
+ return routeId;
+ }
+
+ @Override
+ public void setRouteId(String routeId) {
+ this.routeId = routeId;
+ }
+}
diff --git
a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/PausableReifier.java
b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/PausableReifier.java
new file mode 100644
index 00000000000..0626fcb05e5
--- /dev/null
+++
b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/PausableReifier.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.reifier;
+
+import java.util.function.Predicate;
+
+import org.apache.camel.ConsumerListener;
+import org.apache.camel.Processor;
+import org.apache.camel.Route;
+import org.apache.camel.model.PausableDefinition;
+import org.apache.camel.model.ProcessorDefinition;
+import org.apache.camel.processor.PausableProcessor;
+import org.apache.camel.util.ObjectHelper;
+
+public class PausableReifier extends ProcessorReifier<PausableDefinition> {
+
+ public PausableReifier(Route route, ProcessorDefinition<?> definition) {
+ super(route, PausableDefinition.class.cast(definition));
+ }
+
+ @Override
+ public Processor createProcessor() throws Exception {
+ Processor childProcessor = createChildProcessor(false);
+
+ ConsumerListener<?, ?> consumerListener = resolveConsumerListener();
+ ObjectHelper.notNull(consumerListener, "consumerListener");
+
+ route.setConsumerListener(consumerListener);
+
+ return new PausableProcessor(consumerListener, childProcessor);
+ }
+
+ protected ConsumerListener<?, ?> resolveConsumerListener() {
+ ConsumerListener<?, ?> consumerListener =
definition.getConsumerListenerBean();
+ if (consumerListener == null) {
+ String ref = definition.getConsumerListener();
+
+ consumerListener = mandatoryLookup(ref, ConsumerListener.class);
+ }
+
+ Predicate<?> supplier = resolveUntilCheck();
+ consumerListener.setResumableCheck(supplier);
+ return consumerListener;
+ }
+
+ protected Predicate<?> resolveUntilCheck() {
+ Predicate<?> supplier = definition.getUntilCheckBean();
+ if (supplier == null) {
+ String ref = definition.getUntilCheck();
+
+ supplier = mandatoryLookup(ref, Predicate.class);
+ }
+
+ return supplier;
+ }
+}
diff --git
a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java
b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java
index 55d3710f521..f375336c62c 100644
---
a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java
+++
b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java
@@ -66,6 +66,7 @@ import org.apache.camel.model.OnExceptionDefinition;
import org.apache.camel.model.OnFallbackDefinition;
import org.apache.camel.model.OptionalIdentifiedDefinition;
import org.apache.camel.model.OtherwiseDefinition;
+import org.apache.camel.model.PausableDefinition;
import org.apache.camel.model.PipelineDefinition;
import org.apache.camel.model.PolicyDefinition;
import org.apache.camel.model.PollEnrichDefinition;
@@ -310,6 +311,8 @@ public abstract class ProcessorReifier<T extends
ProcessorDefinition<?>> extends
return new WhenReifier(route, definition);
} else if (definition instanceof ResumableDefinition) {
return new ResumableReifier(route, definition);
+ } else if (definition instanceof PausableDefinition) {
+ return new PausableReifier(route, definition);
}
return null;
}
diff --git
a/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java
b/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java
index 24df3c4b80c..78c3d65df2a 100644
---
a/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java
+++
b/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java
@@ -718,6 +718,16 @@ public class ModelParser extends BaseParser {
return true;
}, noValueHandler());
}
+ protected PausableDefinition doParsePausableDefinition() throws
IOException, XmlPullParserException {
+ return doParse(new PausableDefinition(), (def, key, val) -> {
+ switch (key) {
+ case "consumerListener": def.setConsumerListener(val); break;
+ case "untilCheck": def.setUntilCheck(val); break;
+ default: return
processorDefinitionAttributeHandler().accept(def, key, val);
+ }
+ return true;
+ }, optionalIdentifiedDefinitionElementHandler(), noValueHandler());
+ }
protected PipelineDefinition doParsePipelineDefinition() throws
IOException, XmlPullParserException {
return doParse(new PipelineDefinition(),
processorDefinitionAttributeHandler(),
outputDefinitionElementHandler(), noValueHandler());
@@ -3237,6 +3247,7 @@ public class ModelParser extends BaseParser {
case "onCompletion": return doParseOnCompletionDefinition();
case "onException": return doParseOnExceptionDefinition();
case "onFallback": return doParseOnFallbackDefinition();
+ case "pausable": return doParsePausableDefinition();
case "pipeline": return doParsePipelineDefinition();
case "policy": return doParsePolicyDefinition();
case "pollEnrich": return doParsePollEnrichDefinition();
diff --git
a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
index 73c1fa96cb4..6b98abddd24 100644
---
a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
+++
b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
@@ -655,17 +655,20 @@ public interface KafkaComponentBuilderFactory {
}
/**
* Set if KafkaConsumer will read from beginning or end on startup:
- * beginning : read from beginning end : read from end This is
replacing
- * the earlier property seekToBeginning.
+ * SeekPolicy.BEGINNING: read from beginning. SeekPolicy.END: read from
+ * end.
*
- * The option is a: <code>java.lang.String</code> type.
+ * The option is a:
+ * <code>org.apache.camel.component.kafka.SeekPolicy</code>
+ * type.
*
* Group: consumer
*
* @param seekTo the value to set
* @return the dsl builder
*/
- default KafkaComponentBuilder seekTo(java.lang.String seekTo) {
+ default KafkaComponentBuilder seekTo(
+ org.apache.camel.component.kafka.SeekPolicy seekTo) {
doSetProperty("seekTo", seekTo);
return this;
}
@@ -2079,7 +2082,7 @@ public interface KafkaComponentBuilderFactory {
case "partitionAssignor":
getOrCreateConfiguration((KafkaComponent)
component).setPartitionAssignor((java.lang.String) value); return true;
case "pollOnError": getOrCreateConfiguration((KafkaComponent)
component).setPollOnError((org.apache.camel.component.kafka.PollOnError)
value); return true;
case "pollTimeoutMs": getOrCreateConfiguration((KafkaComponent)
component).setPollTimeoutMs((java.lang.Long) value); return true;
- case "seekTo": getOrCreateConfiguration((KafkaComponent)
component).setSeekTo((java.lang.String) value); return true;
+ case "seekTo": getOrCreateConfiguration((KafkaComponent)
component).setSeekTo((org.apache.camel.component.kafka.SeekPolicy) value);
return true;
case "sessionTimeoutMs": getOrCreateConfiguration((KafkaComponent)
component).setSessionTimeoutMs((java.lang.Integer) value); return true;
case "specificAvroReader":
getOrCreateConfiguration((KafkaComponent)
component).setSpecificAvroReader((boolean) value); return true;
case "topicIsPattern": getOrCreateConfiguration((KafkaComponent)
component).setTopicIsPattern((boolean) value); return true;
diff --git
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
index 647d8f04d81..31ce4492994 100644
---
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
+++
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
@@ -1119,10 +1119,31 @@ public interface KafkaEndpointBuilderFactory {
}
/**
* Set if KafkaConsumer will read from beginning or end on startup:
- * beginning : read from beginning end : read from end This is
replacing
- * the earlier property seekToBeginning.
+ * SeekPolicy.BEGINNING: read from beginning. SeekPolicy.END: read from
+ * end.
*
- * The option is a: <code>java.lang.String</code> type.
+ * The option is a:
+ * <code>org.apache.camel.component.kafka.SeekPolicy</code>
+ * type.
+ *
+ * Group: consumer
+ *
+ * @param seekTo the value to set
+ * @return the dsl builder
+ */
+ default KafkaEndpointConsumerBuilder seekTo(
+ org.apache.camel.component.kafka.SeekPolicy seekTo) {
+ doSetProperty("seekTo", seekTo);
+ return this;
+ }
+ /**
+ * Set if KafkaConsumer will read from beginning or end on startup:
+ * SeekPolicy.BEGINNING: read from beginning. SeekPolicy.END: read from
+ * end.
+ *
+ * The option will be converted to a
+ * <code>org.apache.camel.component.kafka.SeekPolicy</code>
+ * type.
*
* Group: consumer
*
diff --git
a/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java
b/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java
index 311c9bac119..4aa3b14b129 100644
---
a/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java
+++
b/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java
@@ -48,6 +48,7 @@ import org.apache.camel.model.OtherwiseDefinition;
import org.apache.camel.model.OutputDefinition;
import org.apache.camel.model.OutputTypeDefinition;
import org.apache.camel.model.PackageScanDefinition;
+import org.apache.camel.model.PausableDefinition;
import org.apache.camel.model.PipelineDefinition;
import org.apache.camel.model.PolicyDefinition;
import org.apache.camel.model.PollEnrichDefinition;
@@ -9798,6 +9799,65 @@ public final class ModelDeserializers extends
YamlDeserializerSupport {
}
}
+ @YamlType(
+ nodes = "pausable",
+ types = org.apache.camel.model.PausableDefinition.class,
+ order =
org.apache.camel.dsl.yaml.common.YamlDeserializerResolver.ORDER_LOWEST - 1,
+ properties = {
+ @YamlProperty(name = "consumer-listener", type = "string",
required = true),
+ @YamlProperty(name = "description", type = "string"),
+ @YamlProperty(name = "id", type = "string"),
+ @YamlProperty(name = "inherit-error-handler", type =
"boolean"),
+ @YamlProperty(name = "until-check", type = "string",
required = true)
+ }
+ )
+ public static class PausableDefinitionDeserializer extends
YamlDeserializerBase<PausableDefinition> {
+ public PausableDefinitionDeserializer() {
+ super(PausableDefinition.class);
+ }
+
+ @Override
+ protected PausableDefinition newInstance() {
+ return new PausableDefinition();
+ }
+
+ @Override
+ protected boolean setProperty(PausableDefinition target, String
propertyKey,
+ String propertyName, Node node) {
+ switch(propertyKey) {
+ case "consumer-listener": {
+ String val = asText(node);
+ target.setConsumerListener(val);
+ break;
+ }
+ case "inherit-error-handler": {
+ String val = asText(node);
+
target.setInheritErrorHandler(java.lang.Boolean.valueOf(val));
+ break;
+ }
+ case "until-check": {
+ String val = asText(node);
+ target.setUntilCheck(val);
+ break;
+ }
+ case "id": {
+ String val = asText(node);
+ target.setId(val);
+ break;
+ }
+ case "description": {
+ org.apache.camel.model.DescriptionDefinition val =
asType(node, org.apache.camel.model.DescriptionDefinition.class);
+ target.setDescription(val);
+ break;
+ }
+ default: {
+ return false;
+ }
+ }
+ return true;
+ }
+ }
+
@YamlType(
nodes = "pipeline",
types = org.apache.camel.model.PipelineDefinition.class,
diff --git
a/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializersResolver.java
b/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializersResolver.java
index bd7b148de91..77675d01f58 100644
---
a/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializersResolver.java
+++
b/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializersResolver.java
@@ -286,6 +286,8 @@ public final class ModelDeserializersResolver implements
YamlDeserializerResolve
case
"org.apache.camel.model.cloud.PassThroughServiceCallServiceFilterConfiguration":
return new
ModelDeserializers.PassThroughServiceCallServiceFilterConfigurationDeserializer();
case "patch": return new
ModelDeserializers.PatchDefinitionDeserializer();
case "org.apache.camel.model.rest.PatchDefinition": return new
ModelDeserializers.PatchDefinitionDeserializer();
+ case "pausable": return new
ModelDeserializers.PausableDefinitionDeserializer();
+ case "org.apache.camel.model.PausableDefinition": return new
ModelDeserializers.PausableDefinitionDeserializer();
case "pipeline": return new
ModelDeserializers.PipelineDefinitionDeserializer();
case "org.apache.camel.model.PipelineDefinition": return new
ModelDeserializers.PipelineDefinitionDeserializer();
case "policy": return new
ModelDeserializers.PolicyDefinitionDeserializer();
diff --git
a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/camel-yaml-dsl.json
b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/camel-yaml-dsl.json
index 817d2942861..24e65a97bef 100644
---
a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/camel-yaml-dsl.json
+++
b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/camel-yaml-dsl.json
@@ -131,6 +131,9 @@
"otherwise" : {
"$ref" :
"#/items/definitions/org.apache.camel.model.OtherwiseDefinition"
},
+ "pausable" : {
+ "$ref" :
"#/items/definitions/org.apache.camel.model.PausableDefinition"
+ },
"pipeline" : {
"$ref" :
"#/items/definitions/org.apache.camel.model.PipelineDefinition"
},
@@ -1766,6 +1769,27 @@
}
}
},
+ "org.apache.camel.model.PausableDefinition" : {
+ "type" : "object",
+ "properties" : {
+ "consumer-listener" : {
+ "type" : "string"
+ },
+ "description" : {
+ "type" : "string"
+ },
+ "id" : {
+ "type" : "string"
+ },
+ "inherit-error-handler" : {
+ "type" : "boolean"
+ },
+ "until-check" : {
+ "type" : "string"
+ }
+ },
+ "required" : [ "consumer-listener", "until-check" ]
+ },
"org.apache.camel.model.PipelineDefinition" : {
"type" : "object",
"properties" : {
diff --git
a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/camelYamlDsl.json
b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/camelYamlDsl.json
index f5c77af2d0e..ac348b5ee5f 100644
---
a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/camelYamlDsl.json
+++
b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/camelYamlDsl.json
@@ -89,6 +89,9 @@
"otherwise" : {
"$ref" :
"#/items/definitions/org.apache.camel.model.OtherwiseDefinition"
},
+ "pausable" : {
+ "$ref" :
"#/items/definitions/org.apache.camel.model.PausableDefinition"
+ },
"pipeline" : {
"$ref" :
"#/items/definitions/org.apache.camel.model.PipelineDefinition"
},
@@ -1670,6 +1673,27 @@
}
}
},
+ "org.apache.camel.model.PausableDefinition" : {
+ "type" : "object",
+ "properties" : {
+ "consumerListener" : {
+ "type" : "string"
+ },
+ "description" : {
+ "type" : "string"
+ },
+ "id" : {
+ "type" : "string"
+ },
+ "inheritErrorHandler" : {
+ "type" : "boolean"
+ },
+ "untilCheck" : {
+ "type" : "string"
+ }
+ },
+ "required" : [ "consumerListener", "untilCheck" ]
+ },
"org.apache.camel.model.PipelineDefinition" : {
"type" : "object",
"properties" : {