This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 2de2764a687 CAMEL-17051: extend the Resume API to include support for 
pausable consumers
2de2764a687 is described below

commit 2de2764a687ae59d8b035a2163b269ab88850a1b
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Wed Mar 30 16:29:35 2022 +0200

    CAMEL-17051: extend the Resume API to include support for pausable consumers
---
 .../org/apache/camel/catalog/components/kafka.json |   4 +-
 .../org/apache/camel/catalog/models.properties     |   1 +
 .../org/apache/camel/catalog/models/pausable.json  |  20 ++
 .../apache/camel/catalog/schemas/camel-spring.xsd  |  50 +++++
 components/camel-kafka/pom.xml                     |  18 ++
 .../component/kafka/KafkaComponentConfigurer.java  |   4 +-
 .../component/kafka/KafkaEndpointConfigurer.java   |   4 +-
 .../org/apache/camel/component/kafka/kafka.json    |   4 +-
 .../camel-kafka/src/main/docs/kafka-component.adoc |  16 ++
 .../camel/component/kafka/KafkaConfiguration.java  |  12 +-
 .../camel/component/kafka/KafkaConsumer.java       |  41 ++++-
 .../camel/component/kafka/KafkaFetchRecords.java   |  65 +++++--
 .../component/kafka/PollExceptionStrategy.java     |  15 ++
 ...{PollExceptionStrategy.java => SeekPolicy.java} |  19 +-
 .../consumer/errorhandler/BridgeErrorStrategy.java |   5 +
 .../errorhandler/DiscardErrorStrategy.java         |   5 +
 .../errorhandler/KafkaConsumerListener.java        |  89 +++++++++
 .../errorhandler/KafkaErrorStrategies.java         |   2 +-
 .../errorhandler/ReconnectErrorStrategy.java       |  14 +-
 .../consumer/errorhandler/RetryErrorStrategy.java  |  10 +-
 .../consumer/errorhandler/StopErrorStrategy.java   |  13 +-
 .../consumer/support/KafkaRecordProcessor.java     |   4 +-
 .../support/KafkaRecordProcessorFacade.java        |  15 +-
 .../kafka/consumer/support/ProcessingResult.java   |  11 +-
 .../consumer/support/ResumeStrategyFactory.java    |   3 +-
 .../SeekPolicyKafkaConsumerResumeStrategy.java     |   9 +-
 .../integration/BaseEmbeddedKafkaTestSupport.java  |  10 +-
 .../kafka/integration/KafkaConsumerFullIT.java     |   5 +-
 .../integration/pause/KafkaPausableConsumerIT.java | 203 +++++++++++++++++++++
 .../src/test/resources/log4j2.properties           |   2 +-
 .../java/org/apache/camel/ConsumerListener.java    |  52 ++++++
 .../org/apache/camel/ConsumerListenerAware.java    |  25 ++-
 .../src/main/java/org/apache/camel/Route.java      |   5 +
 .../org/apache/camel/impl/engine/DefaultRoute.java |  12 ++
 .../docs/modules/eips/examples/json/pausable.json  |   1 +
 .../docs/modules/eips/pages/resume-strategies.adoc |  28 ++-
 .../services/org/apache/camel/model.properties     |   1 +
 .../resources/org/apache/camel/model/jaxb.index    |   1 +
 .../resources/org/apache/camel/model/pausable.json |  20 ++
 .../org/apache/camel/model/PausableDefinition.java | 115 ++++++++++++
 .../apache/camel/model/ProcessorDefinition.java    |  58 ++++++
 .../apache/camel/processor/PausableProcessor.java  |  99 ++++++++++
 .../org/apache/camel/reifier/PausableReifier.java  |  71 +++++++
 .../org/apache/camel/reifier/ProcessorReifier.java |   3 +
 .../java/org/apache/camel/xml/in/ModelParser.java  |  11 ++
 .../dsl/KafkaComponentBuilderFactory.java          |  13 +-
 .../endpoint/dsl/KafkaEndpointBuilderFactory.java  |  27 ++-
 .../dsl/yaml/deserializers/ModelDeserializers.java |  60 ++++++
 .../deserializers/ModelDeserializersResolver.java  |   2 +
 .../src/generated/resources/camel-yaml-dsl.json    |  24 +++
 .../src/generated/resources/camelYamlDsl.json      |  24 +++
 51 files changed, 1232 insertions(+), 93 deletions(-)

diff --git 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
index 190096241df..b7e0f5bb104 100644
--- 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
+++ 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
@@ -54,7 +54,7 @@
     "partitionAssignor": { "kind": "property", "displayName": "Partition 
Assignor", "group": "consumer", "label": "consumer", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": 
"org.apache.kafka.clients.consumer.RangeAssignor", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The class name of the partition assignme [...]
     "pollOnError": { "kind": "property", "displayName": "Poll On Error", 
"group": "consumer", "label": "consumer", "required": false, "type": "object", 
"javaType": "org.apache.camel.component.kafka.PollOnError", "enum": [ 
"DISCARD", "ERROR_HANDLER", "RECONNECT", "RETRY", "STOP" ], "deprecated": 
false, "autowired": false, "secret": false, "defaultValue": "ERROR_HANDLER", 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "de [...]
     "pollTimeoutMs": { "kind": "property", "displayName": "Poll Timeout Ms", 
"group": "consumer", "label": "consumer", "required": false, "type": 
"duration", "javaType": "java.lang.Long", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "5000", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The timeout used when polling the 
KafkaConsumer." },
-    "seekTo": { "kind": "property", "displayName": "Seek To", "group": 
"consumer", "label": "consumer", "required": false, "type": "string", 
"javaType": "java.lang.String", "enum": [ "beginning", "end" ], "deprecated": 
false, "autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Set if KafkaConsumer will read from beginning 
or end on startup: beginning : read from beginning  [...]
+    "seekTo": { "kind": "property", "displayName": "Seek To", "group": 
"consumer", "label": "consumer", "required": false, "type": "object", 
"javaType": "org.apache.camel.component.kafka.SeekPolicy", "enum": [ 
"BEGINNING", "END" ], "deprecated": false, "autowired": false, "secret": false, 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "Set if KafkaConsumer 
will read from beginning or end on startup: SeekP [...]
     "sessionTimeoutMs": { "kind": "property", "displayName": "Session Timeout 
Ms", "group": "consumer", "label": "consumer", "required": false, "type": 
"integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "10000", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The timeout used to detect failures when using 
Kafka's group management facilities." },
     "specificAvroReader": { "kind": "property", "displayName": "Specific Avro 
Reader", "group": "consumer", "label": "confluent,consumer", "required": false, 
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "This enables the use of a specific Avro reader 
for use with the Confluent Platf [...]
     "topicIsPattern": { "kind": "property", "displayName": "Topic Is Pattern", 
"group": "consumer", "label": "consumer", "required": false, "type": "boolean", 
"javaType": "boolean", "deprecated": false, "autowired": false, "secret": 
false, "defaultValue": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Whether the topic is a pattern (regular 
expression). This can be used to subscribe to dynamic num [...]
@@ -177,7 +177,7 @@
     "partitionAssignor": { "kind": "parameter", "displayName": "Partition 
Assignor", "group": "consumer", "label": "consumer", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": 
"org.apache.kafka.clients.consumer.RangeAssignor", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The class name of the partition assignm [...]
     "pollOnError": { "kind": "parameter", "displayName": "Poll On Error", 
"group": "consumer", "label": "consumer", "required": false, "type": "object", 
"javaType": "org.apache.camel.component.kafka.PollOnError", "enum": [ 
"DISCARD", "ERROR_HANDLER", "RECONNECT", "RETRY", "STOP" ], "deprecated": 
false, "autowired": false, "secret": false, "defaultValue": "ERROR_HANDLER", 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "d [...]
     "pollTimeoutMs": { "kind": "parameter", "displayName": "Poll Timeout Ms", 
"group": "consumer", "label": "consumer", "required": false, "type": 
"duration", "javaType": "java.lang.Long", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "5000", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The timeout used when polling the 
KafkaConsumer." },
-    "seekTo": { "kind": "parameter", "displayName": "Seek To", "group": 
"consumer", "label": "consumer", "required": false, "type": "string", 
"javaType": "java.lang.String", "enum": [ "beginning", "end" ], "deprecated": 
false, "autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Set if KafkaConsumer will read from beginning 
or end on startup: beginning : read from beginning [...]
+    "seekTo": { "kind": "parameter", "displayName": "Seek To", "group": 
"consumer", "label": "consumer", "required": false, "type": "object", 
"javaType": "org.apache.camel.component.kafka.SeekPolicy", "enum": [ 
"BEGINNING", "END" ], "deprecated": false, "autowired": false, "secret": false, 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "Set if KafkaConsumer 
will read from beginning or end on startup: Seek [...]
     "sessionTimeoutMs": { "kind": "parameter", "displayName": "Session Timeout 
Ms", "group": "consumer", "label": "consumer", "required": false, "type": 
"integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "10000", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The timeout used to detect failures when using 
Kafka's group management facilities." },
     "specificAvroReader": { "kind": "parameter", "displayName": "Specific Avro 
Reader", "group": "consumer", "label": "confluent,consumer", "required": false, 
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "This enables the use of a specific Avro reader 
for use with the Confluent Plat [...]
     "topicIsPattern": { "kind": "parameter", "displayName": "Topic Is 
Pattern", "group": "consumer", "label": "consumer", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Whether the topic is a pattern (regular 
expression). This can be used to subscribe to dynamic nu [...]
diff --git 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models.properties
 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models.properties
index c6467f167e4..f683e8e288b 100644
--- 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models.properties
+++ 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models.properties
@@ -113,6 +113,7 @@ packageScan
 param
 passThroughServiceFilter
 patch
+pausable
 pgp
 pipeline
 policy
diff --git 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/pausable.json
 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/pausable.json
new file mode 100644
index 00000000000..88b088805ab
--- /dev/null
+++ 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/pausable.json
@@ -0,0 +1,20 @@
+{
+  "model": {
+    "kind": "model",
+    "name": "pausable",
+    "title": "Pausable",
+    "description": "Pausable EIP to support resuming processing from last 
known offset.",
+    "deprecated": false,
+    "label": "eip,routing",
+    "javaType": "org.apache.camel.model.PausableDefinition",
+    "abstract": false,
+    "input": true,
+    "output": false
+  },
+  "properties": {
+    "consumerListener": { "kind": "attribute", "displayName": "Consumer 
Listener", "required": true, "type": "object", "javaType": 
"org.apache.camel.ConsumerListener", "deprecated": false, "autowired": false, 
"secret": false, "description": "Sets the consumer listener to use" },
+    "untilCheck": { "kind": "attribute", "displayName": "Until Check", 
"required": true, "type": "object", "javaType": "java.util.function.Predicate", 
"deprecated": false, "autowired": false, "secret": false },
+    "id": { "kind": "attribute", "displayName": "Id", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "description": "Sets the id of this node" 
},
+    "description": { "kind": "element", "displayName": "Description", 
"required": false, "type": "object", "javaType": 
"org.apache.camel.model.DescriptionDefinition", "deprecated": false, 
"autowired": false, "secret": false, "description": "Sets the description of 
this node" }
+  }
+}
diff --git 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd
 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd
index 94a28c4d2f4..72e6d5fa578 100644
--- 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd
+++ 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd
@@ -957,6 +957,14 @@ Rest PATCH command
     </xs:annotation>
   </xs:element>
 
+  <xs:element name="pausable" type="tns:pausableDefinition">
+    <xs:annotation>
+      <xs:documentation xml:lang="en"><![CDATA[
+Pausable EIP to support resuming processing from last known offset.
+      ]]></xs:documentation>
+    </xs:annotation>
+  </xs:element>
+
   <xs:element name="pgp" type="tns:pgpDataFormat">
     <xs:annotation>
       <xs:documentation xml:lang="en"><![CDATA[
@@ -3034,6 +3042,7 @@ will fallback to use the fixed value if the Expression 
result was null or 0.
             <xs:element ref="tns:onCompletion"/>
             <xs:element ref="tns:onException"/>
             <xs:element ref="tns:onFallback"/>
+            <xs:element ref="tns:pausable"/>
             <xs:element ref="tns:pipeline"/>
             <xs:element ref="tns:policy"/>
             <xs:element ref="tns:pollEnrich"/>
@@ -3538,6 +3547,7 @@ should be intercepted by this exception type or not.
             <xs:element ref="tns:onCompletion"/>
             <xs:element ref="tns:onException"/>
             <xs:element ref="tns:onFallback"/>
+            <xs:element ref="tns:pausable"/>
             <xs:element ref="tns:pipeline"/>
             <xs:element ref="tns:policy"/>
             <xs:element ref="tns:pollEnrich"/>
@@ -3638,6 +3648,7 @@ should be intercepted by this exception type or not.
             <xs:element ref="tns:onCompletion"/>
             <xs:element ref="tns:onException"/>
             <xs:element ref="tns:onFallback"/>
+            <xs:element ref="tns:pausable"/>
             <xs:element ref="tns:pipeline"/>
             <xs:element ref="tns:policy"/>
             <xs:element ref="tns:pollEnrich"/>
@@ -3735,6 +3746,7 @@ the branch that matched. Default value: false
             <xs:element ref="tns:onCompletion"/>
             <xs:element ref="tns:onException"/>
             <xs:element ref="tns:onFallback"/>
+            <xs:element ref="tns:pausable"/>
             <xs:element ref="tns:pipeline"/>
             <xs:element ref="tns:policy"/>
             <xs:element ref="tns:pollEnrich"/>
@@ -3814,6 +3826,7 @@ the branch that matched. Default value: false
             <xs:element ref="tns:onCompletion"/>
             <xs:element ref="tns:onException"/>
             <xs:element ref="tns:onFallback"/>
+            <xs:element ref="tns:pausable"/>
             <xs:element ref="tns:pipeline"/>
             <xs:element ref="tns:policy"/>
             <xs:element ref="tns:pollEnrich"/>
@@ -5011,6 +5024,7 @@ default WARN. Default value: WARN
             <xs:element ref="tns:onCompletion"/>
             <xs:element ref="tns:onException"/>
             <xs:element ref="tns:onFallback"/>
+            <xs:element ref="tns:pausable"/>
             <xs:element ref="tns:pipeline"/>
             <xs:element ref="tns:policy"/>
             <xs:element ref="tns:pollEnrich"/>
@@ -5096,6 +5110,7 @@ Setting this allows to know if the filter predicate 
evaluated as true or false.
             <xs:element ref="tns:onCompletion"/>
             <xs:element ref="tns:onException"/>
             <xs:element ref="tns:onFallback"/>
+            <xs:element ref="tns:pausable"/>
             <xs:element ref="tns:pipeline"/>
             <xs:element ref="tns:policy"/>
             <xs:element ref="tns:pollEnrich"/>
@@ -5235,6 +5250,7 @@ Global option value.
             <xs:element ref="tns:onCompletion"/>
             <xs:element ref="tns:onException"/>
             <xs:element ref="tns:onFallback"/>
+            <xs:element ref="tns:pausable"/>
             <xs:element ref="tns:pipeline"/>
             <xs:element ref="tns:policy"/>
             <xs:element ref="tns:pollEnrich"/>
@@ -5414,6 +5430,7 @@ Whether if validation is required for this input type. 
Default value: false
             <xs:element ref="tns:onCompletion"/>
             <xs:element ref="tns:onException"/>
             <xs:element ref="tns:onFallback"/>
+            <xs:element ref="tns:pausable"/>
             <xs:element ref="tns:pipeline"/>
             <xs:element ref="tns:policy"/>
             <xs:element ref="tns:pollEnrich"/>
@@ -5507,6 +5524,7 @@ configured, then all incoming messages is intercepted.
             <xs:element ref="tns:onCompletion"/>
             <xs:element ref="tns:onException"/>
             <xs:element ref="tns:onFallback"/>
+            <xs:element ref="tns:pausable"/>
             <xs:element ref="tns:pipeline"/>
             <xs:element ref="tns:policy"/>
             <xs:element ref="tns:pollEnrich"/>
@@ -5607,6 +5625,7 @@ process its result.
             <xs:element ref="tns:onCompletion"/>
             <xs:element ref="tns:onException"/>
             <xs:element ref="tns:onFallback"/>
+            <xs:element ref="tns:pausable"/>
             <xs:element ref="tns:pipeline"/>
             <xs:element ref="tns:policy"/>
             <xs:element ref="tns:pollEnrich"/>
@@ -5701,6 +5720,7 @@ specified using uri syntax, eg mynamecount=4&type=gold.
             <xs:element ref="tns:onCompletion"/>
             <xs:element ref="tns:onException"/>
             <xs:element ref="tns:onFallback"/>
+            <xs:element ref="tns:pausable"/>
             <xs:element ref="tns:pipeline"/>
             <xs:element ref="tns:policy"/>
             <xs:element ref="tns:pollEnrich"/>
@@ -5995,6 +6015,7 @@ To refer to a custom logger instance to lookup from the 
registry.
             <xs:element ref="tns:onCompletion"/>
             <xs:element ref="tns:onException"/>
             <xs:element ref="tns:onFallback"/>
+            <xs:element ref="tns:pausable"/>
             <xs:element ref="tns:pipeline"/>
             <xs:element ref="tns:policy"/>
             <xs:element ref="tns:pollEnrich"/>
@@ -8957,6 +8978,7 @@ decompressed size. Default value: 1073741824
             <xs:element ref="tns:onCompletion"/>
             <xs:element ref="tns:onException"/>
             <xs:element ref="tns:onFallback"/>
+            <xs:element ref="tns:pausable"/>
             <xs:element ref="tns:pipeline"/>
             <xs:element ref="tns:policy"/>
             <xs:element ref="tns:pollEnrich"/>
@@ -9158,6 +9180,7 @@ should be invoked or not.
             <xs:element ref="tns:onCompletion"/>
             <xs:element ref="tns:onException"/>
             <xs:element ref="tns:onFallback"/>
+            <xs:element ref="tns:pausable"/>
             <xs:element ref="tns:pipeline"/>
             <xs:element ref="tns:policy"/>
             <xs:element ref="tns:pollEnrich"/>
@@ -9326,6 +9349,7 @@ failure. If this option is enabled then its considered 
handled as well.
             <xs:element ref="tns:onCompletion"/>
             <xs:element ref="tns:onException"/>
             <xs:element ref="tns:onFallback"/>
+            <xs:element ref="tns:pausable"/>
             <xs:element ref="tns:pipeline"/>
             <xs:element ref="tns:policy"/>
             <xs:element ref="tns:pollEnrich"/>
@@ -9498,6 +9522,7 @@ Default value: false
             <xs:element ref="tns:onCompletion"/>
             <xs:element ref="tns:onException"/>
             <xs:element ref="tns:onFallback"/>
+            <xs:element ref="tns:pausable"/>
             <xs:element ref="tns:pipeline"/>
             <xs:element ref="tns:policy"/>
             <xs:element ref="tns:pollEnrich"/>
@@ -9601,6 +9626,22 @@ Include finding route builder from these java package 
names.
     </xs:sequence>
   </xs:complexType>
 
+  <xs:complexType name="pausableDefinition">
+    <xs:complexContent>
+      <xs:extension base="tns:noOutputDefinition">
+        <xs:sequence/>
+        <xs:attribute name="consumerListener" type="xs:string" use="required">
+          <xs:annotation>
+            <xs:documentation xml:lang="en"><![CDATA[
+Sets the consumer listener to use.
+            ]]></xs:documentation>
+          </xs:annotation>
+        </xs:attribute>
+        <xs:attribute name="untilCheck" type="xs:string" use="required"/>
+      </xs:extension>
+    </xs:complexContent>
+  </xs:complexType>
+
   <xs:complexType name="pipelineDefinition">
     <xs:complexContent>
       <xs:extension base="tns:output">
@@ -9635,6 +9676,7 @@ Include finding route builder from these java package 
names.
             <xs:element ref="tns:onCompletion"/>
             <xs:element ref="tns:onException"/>
             <xs:element ref="tns:onFallback"/>
+            <xs:element ref="tns:pausable"/>
             <xs:element ref="tns:pipeline"/>
             <xs:element ref="tns:policy"/>
             <xs:element ref="tns:pollEnrich"/>
@@ -9712,6 +9754,7 @@ Include finding route builder from these java package 
names.
             <xs:element ref="tns:onCompletion"/>
             <xs:element ref="tns:onException"/>
             <xs:element ref="tns:onFallback"/>
+            <xs:element ref="tns:pausable"/>
             <xs:element ref="tns:pipeline"/>
             <xs:element ref="tns:policy"/>
             <xs:element ref="tns:pollEnrich"/>
@@ -10256,6 +10299,7 @@ Name of property to remove.
             <xs:element ref="tns:onCompletion"/>
             <xs:element ref="tns:onException"/>
             <xs:element ref="tns:onFallback"/>
+            <xs:element ref="tns:pausable"/>
             <xs:element ref="tns:pipeline"/>
             <xs:element ref="tns:policy"/>
             <xs:element ref="tns:pollEnrich"/>
@@ -10563,6 +10607,7 @@ Reference to the routes in the xml dsl.
             <xs:element ref="tns:onCompletion"/>
             <xs:element ref="tns:onException"/>
             <xs:element ref="tns:onFallback"/>
+            <xs:element ref="tns:pausable"/>
             <xs:element ref="tns:pipeline"/>
             <xs:element ref="tns:policy"/>
             <xs:element ref="tns:pollEnrich"/>
@@ -10921,6 +10966,7 @@ compensation/completion exchange.
             <xs:element ref="tns:onCompletion"/>
             <xs:element ref="tns:onException"/>
             <xs:element ref="tns:onFallback"/>
+            <xs:element ref="tns:pausable"/>
             <xs:element ref="tns:pipeline"/>
             <xs:element ref="tns:policy"/>
             <xs:element ref="tns:pollEnrich"/>
@@ -11272,6 +11318,7 @@ Sets the comparator to use for sorting.
             <xs:element ref="tns:onCompletion"/>
             <xs:element ref="tns:onException"/>
             <xs:element ref="tns:onFallback"/>
+            <xs:element ref="tns:pausable"/>
             <xs:element ref="tns:pipeline"/>
             <xs:element ref="tns:policy"/>
             <xs:element ref="tns:pollEnrich"/>
@@ -11481,6 +11528,7 @@ individual unit of work. Default value: false
             <xs:element ref="tns:onCompletion"/>
             <xs:element ref="tns:onException"/>
             <xs:element ref="tns:onFallback"/>
+            <xs:element ref="tns:pausable"/>
             <xs:element ref="tns:pipeline"/>
             <xs:element ref="tns:policy"/>
             <xs:element ref="tns:pollEnrich"/>
@@ -11974,6 +12022,7 @@ Whether to auto startup components when toD is starting 
up. Default value: true
             <xs:element ref="tns:onCompletion"/>
             <xs:element ref="tns:onException"/>
             <xs:element ref="tns:onFallback"/>
+            <xs:element ref="tns:pausable"/>
             <xs:element ref="tns:pipeline"/>
             <xs:element ref="tns:policy"/>
             <xs:element ref="tns:pollEnrich"/>
@@ -12088,6 +12137,7 @@ Sets a reference to use for lookup the policy in the 
registry.
             <xs:element ref="tns:onCompletion"/>
             <xs:element ref="tns:onException"/>
             <xs:element ref="tns:onFallback"/>
+            <xs:element ref="tns:pausable"/>
             <xs:element ref="tns:pipeline"/>
             <xs:element ref="tns:policy"/>
             <xs:element ref="tns:pollEnrich"/>
diff --git a/components/camel-kafka/pom.xml b/components/camel-kafka/pom.xml
index 7fe17b4d13a..cac4ed03e60 100644
--- a/components/camel-kafka/pom.xml
+++ b/components/camel-kafka/pom.xml
@@ -55,6 +55,18 @@
         </dependency>
 
         <!-- test -->
+        <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest</artifactId>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.camel</groupId>
             <artifactId>camel-test-infra-kafka</artifactId>
@@ -64,6 +76,12 @@
         </dependency>
 
         <!-- Required by the admin client-->
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-direct</artifactId>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.camel</groupId>
             <artifactId>camel-test-junit5</artifactId>
diff --git 
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
 
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
index 0ac20776bdd..a9e413a5361 100644
--- 
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
+++ 
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
@@ -176,7 +176,7 @@ public class KafkaComponentConfigurer extends 
PropertyConfigurerSupport implemen
         case "securityprotocol":
         case "securityProtocol": 
getOrCreateConfiguration(target).setSecurityProtocol(property(camelContext, 
java.lang.String.class, value)); return true;
         case "seekto":
-        case "seekTo": 
getOrCreateConfiguration(target).setSeekTo(property(camelContext, 
java.lang.String.class, value)); return true;
+        case "seekTo": 
getOrCreateConfiguration(target).setSeekTo(property(camelContext, 
org.apache.camel.component.kafka.SeekPolicy.class, value)); return true;
         case "sendbufferbytes":
         case "sendBufferBytes": 
getOrCreateConfiguration(target).setSendBufferBytes(property(camelContext, 
java.lang.Integer.class, value)); return true;
         case "sessiontimeoutms":
@@ -394,7 +394,7 @@ public class KafkaComponentConfigurer extends 
PropertyConfigurerSupport implemen
         case "securityprotocol":
         case "securityProtocol": return java.lang.String.class;
         case "seekto":
-        case "seekTo": return java.lang.String.class;
+        case "seekTo": return 
org.apache.camel.component.kafka.SeekPolicy.class;
         case "sendbufferbytes":
         case "sendBufferBytes": return java.lang.Integer.class;
         case "sessiontimeoutms":
diff --git 
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
 
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
index b36f1d003ad..be6375442c4 100644
--- 
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
+++ 
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
@@ -164,7 +164,7 @@ public class KafkaEndpointConfigurer extends 
PropertyConfigurerSupport implement
         case "securityprotocol":
         case "securityProtocol": 
target.getConfiguration().setSecurityProtocol(property(camelContext, 
java.lang.String.class, value)); return true;
         case "seekto":
-        case "seekTo": 
target.getConfiguration().setSeekTo(property(camelContext, 
java.lang.String.class, value)); return true;
+        case "seekTo": 
target.getConfiguration().setSeekTo(property(camelContext, 
org.apache.camel.component.kafka.SeekPolicy.class, value)); return true;
         case "sendbufferbytes":
         case "sendBufferBytes": 
target.getConfiguration().setSendBufferBytes(property(camelContext, 
java.lang.Integer.class, value)); return true;
         case "sessiontimeoutms":
@@ -366,7 +366,7 @@ public class KafkaEndpointConfigurer extends 
PropertyConfigurerSupport implement
         case "securityprotocol":
         case "securityProtocol": return java.lang.String.class;
         case "seekto":
-        case "seekTo": return java.lang.String.class;
+        case "seekTo": return 
org.apache.camel.component.kafka.SeekPolicy.class;
         case "sendbufferbytes":
         case "sendBufferBytes": return java.lang.Integer.class;
         case "sessiontimeoutms":
diff --git 
a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
 
b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
index 190096241df..b7e0f5bb104 100644
--- 
a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
+++ 
b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
@@ -54,7 +54,7 @@
     "partitionAssignor": { "kind": "property", "displayName": "Partition 
Assignor", "group": "consumer", "label": "consumer", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": 
"org.apache.kafka.clients.consumer.RangeAssignor", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The class name of the partition assignme [...]
     "pollOnError": { "kind": "property", "displayName": "Poll On Error", 
"group": "consumer", "label": "consumer", "required": false, "type": "object", 
"javaType": "org.apache.camel.component.kafka.PollOnError", "enum": [ 
"DISCARD", "ERROR_HANDLER", "RECONNECT", "RETRY", "STOP" ], "deprecated": 
false, "autowired": false, "secret": false, "defaultValue": "ERROR_HANDLER", 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "de [...]
     "pollTimeoutMs": { "kind": "property", "displayName": "Poll Timeout Ms", 
"group": "consumer", "label": "consumer", "required": false, "type": 
"duration", "javaType": "java.lang.Long", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "5000", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The timeout used when polling the 
KafkaConsumer." },
-    "seekTo": { "kind": "property", "displayName": "Seek To", "group": 
"consumer", "label": "consumer", "required": false, "type": "string", 
"javaType": "java.lang.String", "enum": [ "beginning", "end" ], "deprecated": 
false, "autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Set if KafkaConsumer will read from beginning 
or end on startup: beginning : read from beginning  [...]
+    "seekTo": { "kind": "property", "displayName": "Seek To", "group": 
"consumer", "label": "consumer", "required": false, "type": "object", 
"javaType": "org.apache.camel.component.kafka.SeekPolicy", "enum": [ 
"BEGINNING", "END" ], "deprecated": false, "autowired": false, "secret": false, 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "Set if KafkaConsumer 
will read from beginning or end on startup: SeekP [...]
     "sessionTimeoutMs": { "kind": "property", "displayName": "Session Timeout 
Ms", "group": "consumer", "label": "consumer", "required": false, "type": 
"integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "10000", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The timeout used to detect failures when using 
Kafka's group management facilities." },
     "specificAvroReader": { "kind": "property", "displayName": "Specific Avro 
Reader", "group": "consumer", "label": "confluent,consumer", "required": false, 
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "This enables the use of a specific Avro reader 
for use with the Confluent Platf [...]
     "topicIsPattern": { "kind": "property", "displayName": "Topic Is Pattern", 
"group": "consumer", "label": "consumer", "required": false, "type": "boolean", 
"javaType": "boolean", "deprecated": false, "autowired": false, "secret": 
false, "defaultValue": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Whether the topic is a pattern (regular 
expression). This can be used to subscribe to dynamic num [...]
@@ -177,7 +177,7 @@
     "partitionAssignor": { "kind": "parameter", "displayName": "Partition 
Assignor", "group": "consumer", "label": "consumer", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": 
"org.apache.kafka.clients.consumer.RangeAssignor", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The class name of the partition assignm [...]
     "pollOnError": { "kind": "parameter", "displayName": "Poll On Error", 
"group": "consumer", "label": "consumer", "required": false, "type": "object", 
"javaType": "org.apache.camel.component.kafka.PollOnError", "enum": [ 
"DISCARD", "ERROR_HANDLER", "RECONNECT", "RETRY", "STOP" ], "deprecated": 
false, "autowired": false, "secret": false, "defaultValue": "ERROR_HANDLER", 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "d [...]
     "pollTimeoutMs": { "kind": "parameter", "displayName": "Poll Timeout Ms", 
"group": "consumer", "label": "consumer", "required": false, "type": 
"duration", "javaType": "java.lang.Long", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "5000", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The timeout used when polling the 
KafkaConsumer." },
-    "seekTo": { "kind": "parameter", "displayName": "Seek To", "group": 
"consumer", "label": "consumer", "required": false, "type": "string", 
"javaType": "java.lang.String", "enum": [ "beginning", "end" ], "deprecated": 
false, "autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Set if KafkaConsumer will read from beginning 
or end on startup: beginning : read from beginning [...]
+    "seekTo": { "kind": "parameter", "displayName": "Seek To", "group": 
"consumer", "label": "consumer", "required": false, "type": "object", 
"javaType": "org.apache.camel.component.kafka.SeekPolicy", "enum": [ 
"BEGINNING", "END" ], "deprecated": false, "autowired": false, "secret": false, 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "Set if KafkaConsumer 
will read from beginning or end on startup: Seek [...]
     "sessionTimeoutMs": { "kind": "parameter", "displayName": "Session Timeout 
Ms", "group": "consumer", "label": "consumer", "required": false, "type": 
"integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "10000", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The timeout used to detect failures when using 
Kafka's group management facilities." },
     "specificAvroReader": { "kind": "parameter", "displayName": "Specific Avro 
Reader", "group": "consumer", "label": "confluent,consumer", "required": false, 
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "This enables the use of a specific Avro reader 
for use with the Confluent Plat [...]
     "topicIsPattern": { "kind": "parameter", "displayName": "Topic Is 
Pattern", "group": "consumer", "label": "consumer", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Whether the topic is a pattern (regular 
expression). This can be used to subscribe to dynamic nu [...]
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc 
b/components/camel-kafka/src/main/docs/kafka-component.adoc
index c4621066a63..a56ef6e2013 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -376,6 +376,22 @@ This is mostly useful with aggregation's completion 
timeout strategies.
 If you want to use a custom implementation of `KafkaManualCommit` then you can 
configure a custom `KafkaManualCommitFactory`
 on the `KafkaComponent` that creates instances of your custom implementation.
 
+== Pausable Consumers
+
+The Kafka component supports pausable consumers. This type of consumer can 
pause consuming data based on
+conditions external to the component itself (such as an external system being 
unavailable).
+
+[source,java]
+----
+from("kafka:topic")
+    .pausable(new KafkaConsumerListener(), () -> canContinue())
+    .routeId("pausable-route")
+    .process(exchange -> LOG.info("Got record from Kafka: {}", 
exchange.getMessage().getBody()))
+    .to("some:destination");
+----
+
+In this example, consuming messages can pause (by calling the Kafka Consumer's 
pause method) if the result from `canContinue` is false.
+
 == Kafka Headers propagation
 
 When consuming messages from Kafka, headers will be propagated to camel 
exchange headers automatically.
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index 0452fc33031..df624832871 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -129,8 +129,8 @@ public class KafkaConfiguration implements Cloneable, 
HeaderFilterStrategyAware
     // fetch.max.wait.ms
     @UriParam(label = "consumer", defaultValue = "500")
     private Integer fetchWaitMaxMs = 500;
-    @UriParam(label = "consumer", enums = "beginning,end")
-    private String seekTo;
+    @UriParam(label = "consumer")
+    private SeekPolicy seekTo;
 
     // Consumer configuration properties
     @UriParam(label = "consumer", defaultValue = "true")
@@ -1612,15 +1612,15 @@ public class KafkaConfiguration implements Cloneable, 
HeaderFilterStrategyAware
         this.valueDeserializer = valueDeserializer;
     }
 
-    public String getSeekTo() {
+    public SeekPolicy getSeekTo() {
         return seekTo;
     }
 
     /**
-     * Set if KafkaConsumer will read from beginning or end on startup: 
beginning : read from beginning end : read from
-     * end This is replacing the earlier property seekToBeginning
+     * Set if KafkaConsumer will read from beginning or end on startup: 
SeekPolicy.BEGINNING: read from beginning.
+     * SeekPolicy.END: read from end.
      */
-    public void setSeekTo(String seekTo) {
+    public void setSeekTo(SeekPolicy seekTo) {
         this.seekTo = seekTo;
     }
 
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 107827870c0..8b65d3fae04 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -24,8 +24,11 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 
+import org.apache.camel.ConsumerListenerAware;
 import org.apache.camel.Processor;
 import org.apache.camel.ResumeAware;
+import org.apache.camel.Suspendable;
+import 
org.apache.camel.component.kafka.consumer.errorhandler.KafkaConsumerListener;
 import 
org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy;
 import org.apache.camel.health.HealthCheckAware;
 import org.apache.camel.health.HealthCheckHelper;
@@ -39,7 +42,9 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class KafkaConsumer extends DefaultConsumer implements 
ResumeAware<KafkaConsumerResumeStrategy>, HealthCheckAware {
+public class KafkaConsumer extends DefaultConsumer
+        implements ResumeAware<KafkaConsumerResumeStrategy>, HealthCheckAware, 
ConsumerListenerAware<KafkaConsumerListener>,
+        Suspendable {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(KafkaConsumer.class);
 
@@ -51,6 +56,7 @@ public class KafkaConsumer extends DefaultConsumer implements 
ResumeAware<KafkaC
     private final List<KafkaFetchRecords> tasks = new ArrayList<>();
     private volatile boolean stopOffsetRepo;
     private KafkaConsumerResumeStrategy resumeStrategy;
+    private KafkaConsumerListener consumerListener;
 
     public KafkaConsumer(KafkaEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
@@ -67,6 +73,16 @@ public class KafkaConsumer extends DefaultConsumer 
implements ResumeAware<KafkaC
         return resumeStrategy;
     }
 
+    @Override
+    public KafkaConsumerListener getConsumerListener() {
+        return consumerListener;
+    }
+
+    @Override
+    public void setConsumerListener(KafkaConsumerListener consumerListener) {
+        this.consumerListener = consumerListener;
+    }
+
     @Override
     protected void doBuild() throws Exception {
         super.doBuild();
@@ -140,7 +156,8 @@ public class KafkaConsumer extends DefaultConsumer 
implements ResumeAware<KafkaC
         BridgeExceptionHandlerToErrorHandler bridge = new 
BridgeExceptionHandlerToErrorHandler(this);
         for (int i = 0; i < endpoint.getConfiguration().getConsumersCount(); 
i++) {
             KafkaFetchRecords task = new KafkaFetchRecords(
-                    this, bridge, topic, pattern, i + "", getProps());
+                    this, bridge, topic, pattern, i + "", getProps(), 
consumerListener);
+
             executor.submit(task);
 
             tasks.add(task);
@@ -196,4 +213,24 @@ public class KafkaConsumer extends DefaultConsumer 
implements ResumeAware<KafkaC
 
         super.doStop();
     }
+
+    @Override
+    protected void doSuspend() throws Exception {
+        for (KafkaFetchRecords task : tasks) {
+            LOG.info("Pausing Kafka record fetcher task running client ID {}", 
task.getClientId());
+            task.pause();
+        }
+
+        super.doSuspend();
+    }
+
+    @Override
+    protected void doResume() throws Exception {
+        for (KafkaFetchRecords task : tasks) {
+            LOG.info("Resuming Kafka record fetcher task running client ID 
{}", task.getClientId());
+            task.resume();
+        }
+
+        super.doResume();
+    }
 }
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index 19321dede6b..0c54ea23363 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -27,6 +27,7 @@ import java.util.regex.Pattern;
 
 import org.apache.camel.component.kafka.consumer.CommitManager;
 import org.apache.camel.component.kafka.consumer.CommitManagers;
+import 
org.apache.camel.component.kafka.consumer.errorhandler.KafkaConsumerListener;
 import 
org.apache.camel.component.kafka.consumer.errorhandler.KafkaErrorStrategies;
 import 
org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy;
 import 
org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessorFacade;
@@ -64,20 +65,21 @@ public class KafkaFetchRecords implements Runnable {
     private final ReentrantLock lock = new ReentrantLock();
     private CommitManager commitManager;
     private Exception lastError;
+    private final KafkaConsumerListener consumerListener;
 
     private boolean terminated;
     private long currentBackoffInterval;
-    private boolean retry = true;
     private boolean reconnect; // must be false at init (this is the policy 
whether to reconnect)
     private boolean connected; // this is the state (connected or not)
 
     KafkaFetchRecords(KafkaConsumer kafkaConsumer,
                       BridgeExceptionHandlerToErrorHandler bridge, String 
topicName, Pattern topicPattern, String id,
-                      Properties kafkaProps) {
+                      Properties kafkaProps, KafkaConsumerListener 
consumerListener) {
         this.kafkaConsumer = kafkaConsumer;
         this.bridge = bridge;
         this.topicName = topicName;
         this.topicPattern = topicPattern;
+        this.consumerListener = consumerListener;
         this.threadId = topicName + "-" + "Thread " + id;
         this.kafkaProps = kafkaProps;
 
@@ -140,7 +142,7 @@ public class KafkaFetchRecords implements Runnable {
 
             lastError = null;
             startPolling();
-        } while ((isRetrying() || isReconnect()) && isKafkaConsumerRunnable());
+        } while ((pollExceptionStrategy.canContinue() || isReconnect()) && 
isKafkaConsumerRunnable());
 
         if (LOG.isInfoEnabled()) {
             LOG.info("Terminating KafkaConsumer thread {} receiving from {}", 
threadId, getPrintableTopic());
@@ -188,6 +190,16 @@ public class KafkaFetchRecords implements Runnable {
             commitManager
                     = CommitManagers.createCommitManager(consumer, 
kafkaConsumer, threadId, getPrintableTopic());
 
+            if (consumerListener != null) {
+                consumerListener.setConsumer(consumer);
+
+                SeekPolicy seekPolicy = 
kafkaConsumer.getEndpoint().getComponent().getConfiguration().getSeekTo();
+                if (seekPolicy == null) {
+                    seekPolicy = SeekPolicy.BEGINNING;
+                }
+
+                consumerListener.setSeekPolicy(seekPolicy);
+            }
         } catch (Exception e) {
             setConnected(false);
             // ensure this is logged so users can see the problem
@@ -240,8 +252,7 @@ public class KafkaFetchRecords implements Runnable {
         // set reconnect to false as the connection and resume is done at this 
point
         setConnected(false);
 
-        // set retry to true to continue polling
-        setRetry(true);
+        pollExceptionStrategy.reset();
     }
 
     private void subscribe() {
@@ -280,22 +291,26 @@ public class KafkaFetchRecords implements Runnable {
             }
 
             KafkaRecordProcessorFacade recordProcessorFacade = new 
KafkaRecordProcessorFacade(
-                    kafkaConsumer, lastProcessedOffset, threadId, 
commitManager);
+                    kafkaConsumer, lastProcessedOffset, threadId, 
commitManager, consumerListener);
 
             Duration pollDuration = Duration.ofMillis(pollTimeoutMs);
-            while (isKafkaConsumerRunnable() && isRetrying() && isConnected()) 
{
+            while (isKafkaConsumerRunnable() && isConnected() && 
pollExceptionStrategy.canContinue()) {
                 ConsumerRecords<Object, Object> allRecords = 
consumer.poll(pollDuration);
+                if (consumerListener != null) {
+                    if (!consumerListener.afterConsume(consumer)) {
+                        continue;
+                    }
+                }
 
                 commitManager.processAsyncCommits();
 
-                ProcessingResult result = 
recordProcessorFacade.processPolledRecords(allRecords);
+                ProcessingResult result = 
recordProcessorFacade.processPolledRecords(allRecords, consumer);
 
                 if (result.isBreakOnErrorHit()) {
                     LOG.debug("We hit an error ... setting flags to force 
reconnect");
                     // force re-connect
                     setReconnect(true);
                     setConnected(false);
-                    setRetry(false); // to close the current consumer
                 }
 
             }
@@ -334,7 +349,7 @@ public class KafkaFetchRecords implements Runnable {
             pollExceptionStrategy.handle(partitionLastOffset, e);
         } finally {
             // only close if not retry
-            if (!isRetrying()) {
+            if (!pollExceptionStrategy.canContinue()) {
                 LOG.debug("Closing consumer {}", threadId);
                 safeUnsubscribe();
                 IOHelper.close(consumer);
@@ -382,14 +397,6 @@ public class KafkaFetchRecords implements Runnable {
         return kafkaConsumer.getEndpoint().getCamelContext().isStopping() && 
!kafkaConsumer.isRunAllowed();
     }
 
-    private boolean isRetrying() {
-        return retry;
-    }
-
-    public void setRetry(boolean value) {
-        retry = value;
-    }
-
     private boolean isReconnect() {
         return reconnect;
     }
@@ -442,6 +449,10 @@ public class KafkaFetchRecords implements Runnable {
         return connected;
     }
 
+    public boolean isPaused() {
+        return !consumer.paused().isEmpty();
+    }
+
     public void setConnected(boolean connected) {
         this.connected = connected;
     }
@@ -489,7 +500,7 @@ public class KafkaFetchRecords implements Runnable {
     }
 
     boolean isRecoverable() {
-        return (isRetrying() || isReconnect()) && isKafkaConsumerRunnable();
+        return (pollExceptionStrategy.canContinue() || isReconnect()) && 
isKafkaConsumerRunnable();
     }
 
     long getCurrentRecoveryInterval() {
@@ -499,4 +510,20 @@ public class KafkaFetchRecords implements Runnable {
     public BridgeExceptionHandlerToErrorHandler getBridge() {
         return bridge;
     }
+
+    /*
+     * This is for manually pausing the consumer. This is mostly used for 
directly calling pause from Java code
+     * or via JMX
+     */
+    public void pause() {
+        consumer.pause(consumer.assignment());
+    }
+
+    /*
+     * This is for manually resuming the consumer (not to be confused with the 
Resume API). This is
+     * mostly used for directly calling resume from Java code or via JMX
+     */
+    public void resume() {
+        consumer.resume(consumer.assignment());
+    }
 }
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/PollExceptionStrategy.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/PollExceptionStrategy.java
index a2c6fef690d..019c0849724 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/PollExceptionStrategy.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/PollExceptionStrategy.java
@@ -24,6 +24,21 @@ package org.apache.camel.component.kafka;
  */
 public interface PollExceptionStrategy {
 
+    /**
+     * Reset any error flags set by a previous error condition
+     */
+    default void reset() {
+
+    }
+
+    /**
+     * This method provides an "answer" to whether the consumer can continue 
polling or not. This is specific to each
+     * polling exception strategy and must be implemented accordingly
+     * 
+     * @return true if polling should continue or false otherwise
+     */
+    boolean canContinue();
+
     /**
      * Controls how to handle the exception while polling from Kafka.
      *
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/PollExceptionStrategy.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/SeekPolicy.java
similarity index 52%
copy from 
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/PollExceptionStrategy.java
copy to 
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/SeekPolicy.java
index a2c6fef690d..2433fe89b9f 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/PollExceptionStrategy.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/SeekPolicy.java
@@ -14,21 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.camel.component.kafka;
 
 /**
- * Strategy to decide when a Kafka exception was thrown during polling, how to 
handle this. For example by re-connecting
- * and polling the same message again, by stopping the consumer (allows to 
re-balance and let another consumer try), or
- * to let Camel route the message as an exception which allows Camel error 
handling to handle the exception, or to
- * discard this message and poll the next message.
+ * BEGINNING configures the consumer to consume from the beginning of the 
topic/partition. END configures the consumer
+ * to consume from the end of the topic/partition
  */
-public interface PollExceptionStrategy {
-
-    /**
-     * Controls how to handle the exception while polling from Kafka.
-     *
-     * @param  exception the caused exception which typically would be a 
{@link org.apache.kafka.common.KafkaException}
-     * @return           how to handle the exception
-     */
-    void handle(long partitionLastOffset, Exception exception);
+public enum SeekPolicy {
+    BEGINNING,
+    END
 }
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/BridgeErrorStrategy.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/BridgeErrorStrategy.java
index 2141baf5e8d..fdc2c49b0fd 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/BridgeErrorStrategy.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/BridgeErrorStrategy.java
@@ -33,6 +33,11 @@ public class BridgeErrorStrategy implements 
PollExceptionStrategy {
         this.consumer = consumer;
     }
 
+    @Override
+    public boolean canContinue() {
+        return true;
+    }
+
     @Override
     public void handle(long partitionLastOffset, Exception exception) {
         LOG.warn("Deferring processing to the exception handler based on 
polling exception strategy");
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/DiscardErrorStrategy.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/DiscardErrorStrategy.java
index 7387b85d675..d5424003f00 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/DiscardErrorStrategy.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/DiscardErrorStrategy.java
@@ -30,6 +30,11 @@ public class DiscardErrorStrategy implements 
PollExceptionStrategy {
         this.consumer = consumer;
     }
 
+    @Override
+    public boolean canContinue() {
+        return true;
+    }
+
     @Override
     public void handle(long partitionLastOffset, Exception exception) {
         LOG.warn("Requesting the consumer to discard the message and continue 
to the next based on polling exception strategy");
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/KafkaConsumerListener.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/KafkaConsumerListener.java
new file mode 100644
index 00000000000..44f9f6337ed
--- /dev/null
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/KafkaConsumerListener.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.kafka.consumer.errorhandler;
+
+import java.util.function.Predicate;
+
+import org.apache.camel.ConsumerListener;
+import org.apache.camel.component.kafka.SeekPolicy;
+import org.apache.camel.component.kafka.consumer.support.ProcessingResult;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KafkaConsumerListener implements ConsumerListener<Object, 
ProcessingResult> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaConsumerListener.class);
+    private Consumer<?, ?> consumer;
+    private SeekPolicy seekPolicy;
+
+    private Predicate<?> afterConsumeEval;
+
+    public Consumer<?, ?> getConsumer() {
+        return consumer;
+    }
+
+    public void setConsumer(Consumer<?, ?> consumer) {
+        this.consumer = consumer;
+    }
+
+    public SeekPolicy getSeekPolicy() {
+        return seekPolicy;
+    }
+
+    public void setSeekPolicy(SeekPolicy seekPolicy) {
+        this.seekPolicy = seekPolicy;
+    }
+
+    @Override
+    public void setResumableCheck(Predicate<?> afterConsumeEval) {
+        this.afterConsumeEval = afterConsumeEval;
+    }
+
+    @Override
+    public boolean afterConsume(@SuppressWarnings("unused") Object ignored) {
+        if (afterConsumeEval.test(null)) {
+            LOG.warn("State changed, therefore resuming the consumer");
+            consumer.resume(consumer.assignment());
+
+            return true;
+        }
+
+        LOG.warn("The consumer is not yet resumable");
+        return false;
+    }
+
+    @Override
+    public boolean afterProcess(ProcessingResult result) {
+        if (result.isFailed()) {
+            LOG.warn("Pausing consumer due to error on the last processing");
+            consumer.pause(consumer.assignment());
+
+            if (seekPolicy == SeekPolicy.BEGINNING) {
+                LOG.debug("Seeking from the beginning of topic");
+                consumer.seekToBeginning(consumer.assignment());
+            } else if (seekPolicy == SeekPolicy.END) {
+                LOG.debug("Seeking from the end of the topic");
+                consumer.seekToEnd(consumer.assignment());
+            }
+
+            return false;
+        }
+
+        return true;
+    }
+}
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/KafkaErrorStrategies.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/KafkaErrorStrategies.java
index b6b9f4acea9..3541e08dbce 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/KafkaErrorStrategies.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/KafkaErrorStrategies.java
@@ -41,7 +41,7 @@ public final class KafkaErrorStrategies {
 
         switch (onError) {
             case RETRY:
-                return new RetryErrorStrategy(recordFetcher);
+                return new RetryErrorStrategy();
             case RECONNECT:
                 return new ReconnectErrorStrategy(recordFetcher);
             case ERROR_HANDLER:
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/ReconnectErrorStrategy.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/ReconnectErrorStrategy.java
index ddac560720e..0d54cbe0706 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/ReconnectErrorStrategy.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/ReconnectErrorStrategy.java
@@ -26,10 +26,22 @@ public class ReconnectErrorStrategy implements 
PollExceptionStrategy {
     private static final Logger LOG = 
LoggerFactory.getLogger(ReconnectErrorStrategy.class);
     private KafkaFetchRecords recordFetcher;
 
+    private boolean retry = true;
+
     public ReconnectErrorStrategy(KafkaFetchRecords recordFetcher) {
         this.recordFetcher = recordFetcher;
     }
 
+    @Override
+    public void reset() {
+        retry = true;
+    }
+
+    @Override
+    public boolean canContinue() {
+        return retry;
+    }
+
     @Override
     public void handle(long partitionLastOffset, Exception exception) {
         LOG.warn("Requesting the consumer to re-connect on the next run based 
on polling exception strategy");
@@ -39,6 +51,6 @@ public class ReconnectErrorStrategy implements 
PollExceptionStrategy {
         recordFetcher.setConnected(false);
 
         // to close the current consumer
-        recordFetcher.setRetry(false);
+        retry = false;
     }
 }
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/RetryErrorStrategy.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/RetryErrorStrategy.java
index 46ddcb761a8..3df6f43c18d 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/RetryErrorStrategy.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/RetryErrorStrategy.java
@@ -17,24 +17,20 @@
 
 package org.apache.camel.component.kafka.consumer.errorhandler;
 
-import org.apache.camel.component.kafka.KafkaFetchRecords;
 import org.apache.camel.component.kafka.PollExceptionStrategy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class RetryErrorStrategy implements PollExceptionStrategy {
     private static final Logger LOG = 
LoggerFactory.getLogger(RetryErrorStrategy.class);
-    private KafkaFetchRecords recordFetcher;
 
-    public RetryErrorStrategy(KafkaFetchRecords recordFetcher) {
-        this.recordFetcher = recordFetcher;
+    @Override
+    public boolean canContinue() {
+        return true;
     }
 
     @Override
     public void handle(long partitionLastOffset, Exception exception) {
         LOG.warn("Requesting the consumer to retry polling the same message 
based on polling exception strategy");
-
-        // consumer retry the same message again
-        recordFetcher.setRetry(true);
     }
 }
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/StopErrorStrategy.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/StopErrorStrategy.java
index b330a368173..6258ad3ac63 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/StopErrorStrategy.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/StopErrorStrategy.java
@@ -25,17 +25,28 @@ import org.slf4j.LoggerFactory;
 public class StopErrorStrategy implements PollExceptionStrategy {
     private static final Logger LOG = 
LoggerFactory.getLogger(StopErrorStrategy.class);
     private KafkaFetchRecords recordFetcher;
+    private boolean retry = true;
 
     public StopErrorStrategy(KafkaFetchRecords recordFetcher) {
         this.recordFetcher = recordFetcher;
     }
 
+    @Override
+    public void reset() {
+        retry = true;
+    }
+
+    @Override
+    public boolean canContinue() {
+        return retry;
+    }
+
     @Override
     public void handle(long partitionLastOffset, Exception exception) {
         // stop and terminate consumer
         LOG.warn("Requesting the consumer to stop based on polling exception 
strategy");
 
-        recordFetcher.setRetry(false);
+        retry = false;
         recordFetcher.setConnected(false);
     }
 }
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
index df288d78388..723cd0f4e96 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
@@ -113,10 +113,10 @@ public class KafkaRecordProcessor {
             boolean breakOnErrorExit = processException(exchange, partition, 
lastResult.getPartitionLastOffset(),
                     exceptionHandler);
 
-            return new ProcessingResult(breakOnErrorExit, 
lastResult.getPartitionLastOffset());
+            return new ProcessingResult(breakOnErrorExit, 
lastResult.getPartitionLastOffset(), true);
         }
 
-        return new ProcessingResult(false, record.offset());
+        return new ProcessingResult(false, record.offset(), 
exchange.getException() != null);
     }
 
     private boolean processException(
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
index 10d893ef15f..0ced3260246 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
@@ -25,6 +25,8 @@ import java.util.Set;
 import org.apache.camel.Exchange;
 import org.apache.camel.component.kafka.KafkaConsumer;
 import org.apache.camel.component.kafka.consumer.CommitManager;
+import 
org.apache.camel.component.kafka.consumer.errorhandler.KafkaConsumerListener;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.TopicPartition;
@@ -41,23 +43,24 @@ public class KafkaRecordProcessorFacade {
     private final String threadId;
     private final KafkaRecordProcessor kafkaRecordProcessor;
     private final CommitManager commitManager;
+    private final KafkaConsumerListener consumerListener;
 
     public KafkaRecordProcessorFacade(KafkaConsumer camelKafkaConsumer, 
Map<String, Long> lastProcessedOffset, String threadId,
-                                      CommitManager commitManager) {
+                                      CommitManager commitManager, 
KafkaConsumerListener consumerListener) {
         this.camelKafkaConsumer = camelKafkaConsumer;
         this.lastProcessedOffset = lastProcessedOffset;
         this.threadId = threadId;
         this.commitManager = commitManager;
 
         kafkaRecordProcessor = buildKafkaRecordProcessor(commitManager);
-
+        this.consumerListener = consumerListener;
     }
 
     private boolean isStopping() {
         return camelKafkaConsumer.isStopping();
     }
 
-    public ProcessingResult processPolledRecords(ConsumerRecords<Object, 
Object> allRecords) {
+    public ProcessingResult processPolledRecords(ConsumerRecords<Object, 
Object> allRecords, Consumer<?, ?> consumer) {
         logRecords(allRecords);
 
         Set<TopicPartition> partitions = allRecords.partitions();
@@ -79,6 +82,12 @@ public class KafkaRecordProcessorFacade {
 
                 lastResult = processRecord(partition, 
partitionIterator.hasNext(), recordIterator.hasNext(), lastResult,
                         kafkaRecordProcessor, record);
+
+                if (consumerListener != null) {
+                    if (!consumerListener.afterProcess(lastResult)) {
+                        return lastResult;
+                    }
+                }
             }
 
             if (!lastResult.isBreakOnErrorHit()) {
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java
index 0845bcab343..36f0c69c8b2 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java
@@ -20,14 +20,17 @@ package org.apache.camel.component.kafka.consumer.support;
 import org.apache.camel.component.kafka.consumer.AbstractCommitManager;
 
 public final class ProcessingResult {
-    private static final ProcessingResult UNPROCESSED_RESULT = new 
ProcessingResult(false, AbstractCommitManager.START_OFFSET);
+    private static final ProcessingResult UNPROCESSED_RESULT
+            = new ProcessingResult(false, AbstractCommitManager.START_OFFSET, 
false);
 
     private final boolean breakOnErrorHit;
     private final long partitionLastOffset;
+    private final boolean failed;
 
-    ProcessingResult(boolean breakOnErrorHit, long partitionLastOffset) {
+    ProcessingResult(boolean breakOnErrorHit, long partitionLastOffset, 
boolean failed) {
         this.breakOnErrorHit = breakOnErrorHit;
         this.partitionLastOffset = partitionLastOffset;
+        this.failed = failed;
     }
 
     public boolean isBreakOnErrorHit() {
@@ -38,6 +41,10 @@ public final class ProcessingResult {
         return partitionLastOffset;
     }
 
+    public boolean isFailed() {
+        return failed;
+    }
+
     public static ProcessingResult newUnprocessed() {
         return UNPROCESSED_RESULT;
     }
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java
index c4fdb6d12b9..3784b4c5c15 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.kafka.consumer.support;
 
 import org.apache.camel.component.kafka.KafkaConfiguration;
 import org.apache.camel.component.kafka.KafkaConsumer;
+import org.apache.camel.component.kafka.SeekPolicy;
 import org.apache.camel.spi.StateRepository;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.slf4j.Logger;
@@ -65,7 +66,7 @@ public final class ResumeStrategyFactory {
     private static KafkaConsumerResumeStrategy 
builtinResumeStrategies(KafkaConfiguration configuration) {
         LOG.debug("No resume strategy was provided ... checking for built-ins 
...");
         StateRepository<String, String> offsetRepository = 
configuration.getOffsetRepository();
-        String seekTo = configuration.getSeekTo();
+        SeekPolicy seekTo = configuration.getSeekTo();
 
         if (offsetRepository != null) {
             LOG.info("Using resume from offset strategy");
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeStrategy.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeStrategy.java
index 91db91b85ed..b877763854e 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeStrategy.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeStrategy.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.kafka.consumer.support;
 
+import org.apache.camel.component.kafka.SeekPolicy;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -27,10 +28,10 @@ public class SeekPolicyKafkaConsumerResumeStrategy 
implements KafkaConsumerResum
 
     private static final Logger LOG = 
LoggerFactory.getLogger(SeekPolicyKafkaConsumerResumeStrategy.class);
 
-    private final String seekPolicy;
+    private final SeekPolicy seekPolicy;
     private Consumer<?, ?> consumer;
 
-    public SeekPolicyKafkaConsumerResumeStrategy(String seekPolicy) {
+    public SeekPolicyKafkaConsumerResumeStrategy(SeekPolicy seekPolicy) {
         this.seekPolicy = seekPolicy;
     }
 
@@ -41,10 +42,10 @@ public class SeekPolicyKafkaConsumerResumeStrategy 
implements KafkaConsumerResum
 
     @Override
     public void resume() {
-        if (seekPolicy.equals("beginning")) {
+        if (seekPolicy == SeekPolicy.BEGINNING) {
             LOG.debug("Seeking from the beginning of topic");
             consumer.seekToBeginning(consumer.assignment());
-        } else if (seekPolicy.equals("end")) {
+        } else if (seekPolicy == SeekPolicy.END) {
             LOG.debug("Seeking from the end off the topic");
             consumer.seekToEnd(consumer.assignment());
         }
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/BaseEmbeddedKafkaTestSupport.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/BaseEmbeddedKafkaTestSupport.java
index ff427f21e7f..d044dc1b708 100644
--- 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/BaseEmbeddedKafkaTestSupport.java
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/BaseEmbeddedKafkaTestSupport.java
@@ -57,7 +57,7 @@ public abstract class BaseEmbeddedKafkaTestSupport extends 
CamelTestSupport {
         }
     }
 
-    protected Properties getDefaultProperties() {
+    public static Properties getDefaultProperties(KafkaService service) {
         LOG.info("Connecting to Kafka {}", service.getBootstrapServers());
 
         Properties props = new Properties();
@@ -69,6 +69,10 @@ public abstract class BaseEmbeddedKafkaTestSupport extends 
CamelTestSupport {
         return props;
     }
 
+    protected Properties getDefaultProperties() {
+        return getDefaultProperties(service);
+    }
+
     @Override
     protected CamelContext createCamelContext() throws Exception {
         CamelContext context = super.createCamelContext();
@@ -87,6 +91,10 @@ public abstract class BaseEmbeddedKafkaTestSupport extends 
CamelTestSupport {
     }
 
     private static AdminClient createAdminClient() {
+        return createAdminClient(service);
+    }
+
+    public static AdminClient createAdminClient(KafkaService service) {
         final Properties properties = new Properties();
         properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
service.getBootstrapServers());
 
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java
index 6a3d4af2303..46e304d0613 100644
--- 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java
@@ -28,6 +28,7 @@ import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.kafka.KafkaConstants;
 import org.apache.camel.component.kafka.KafkaEndpoint;
 import org.apache.camel.component.kafka.MockConsumerInterceptor;
+import org.apache.camel.component.kafka.SeekPolicy;
 import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderDeserializer;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -165,7 +166,7 @@ public class KafkaConsumerFullIT extends 
BaseEmbeddedKafkaTestSupport {
         context.getRouteController().stopRoute("full-it");
 
         KafkaEndpoint kafkaEndpoint = (KafkaEndpoint) from;
-        kafkaEndpoint.getConfiguration().setSeekTo("beginning");
+        kafkaEndpoint.getConfiguration().setSeekTo(SeekPolicy.BEGINNING);
 
         context.getRouteController().startRoute("full-it");
 
@@ -193,7 +194,7 @@ public class KafkaConsumerFullIT extends 
BaseEmbeddedKafkaTestSupport {
         context.getRouteController().stopRoute("full-it");
 
         KafkaEndpoint kafkaEndpoint = (KafkaEndpoint) from;
-        kafkaEndpoint.getConfiguration().setSeekTo("end");
+        kafkaEndpoint.getConfiguration().setSeekTo(SeekPolicy.END);
 
         context.getRouteController().startRoute("full-it");
 
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/pause/KafkaPausableConsumerIT.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/pause/KafkaPausableConsumerIT.java
new file mode 100644
index 00000000000..7ed6fdeb57f
--- /dev/null
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/pause/KafkaPausableConsumerIT.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration.pause;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAdder;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.component.kafka.MockConsumerInterceptor;
+import 
org.apache.camel.component.kafka.consumer.errorhandler.KafkaConsumerListener;
+import org.apache.camel.component.kafka.consumer.support.ProcessingResult;
+import 
org.apache.camel.component.kafka.integration.BaseEmbeddedKafkaTestSupport;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+public class KafkaPausableConsumerIT extends BaseEmbeddedKafkaTestSupport {
+    // Just a wrapper for us to check if the expected methods are being called
+    private static class TestListener extends KafkaConsumerListener {
+        boolean afterConsumeCalled;
+        boolean afterProcessCalled;
+
+        @Override
+        public boolean afterConsume(Object ignored) {
+            afterConsumeCalled = true;
+            return super.afterConsume(ignored);
+        }
+
+        @Override
+        public boolean afterProcess(ProcessingResult result) {
+            afterProcessCalled = true;
+            return super.afterProcess(result);
+        }
+    }
+
+    public static final String SOURCE_TOPIC = "pause-source";
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaPausableConsumerIT.class);
+
+    private static final int RETRY_COUNT = 10;
+    private LongAdder count = new LongAdder();
+
+    @EndpointInject("kafka:" + SOURCE_TOPIC
+                    + 
"?groupId=group1&autoOffsetReset=earliest&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+                    + 
"&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+                    + 
"&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
+    private Endpoint from;
+
+    @EndpointInject("direct:intermediate")
+    private Endpoint intermediate;
+
+    @EndpointInject("mock:result")
+    private MockEndpoint to;
+
+    private org.apache.kafka.clients.producer.KafkaProducer<String, String> 
producer;
+
+    private ScheduledExecutorService executorService = 
Executors.newSingleThreadScheduledExecutor();
+
+    private TestListener testConsumerListener = new TestListener();
+
+    @BeforeEach
+    public void before() {
+        Properties props = 
BaseEmbeddedKafkaTestSupport.getDefaultProperties(service);
+        producer = new 
org.apache.kafka.clients.producer.KafkaProducer<>(props);
+        MockConsumerInterceptor.recordsCaptured.clear();
+
+        executorService.scheduleAtFixedRate(this::increment, 5, 1, 
TimeUnit.SECONDS);
+    }
+
+    @AfterEach
+    public void after() {
+        if (producer != null) {
+            producer.close();
+        }
+        // clean all test topics
+        BaseEmbeddedKafkaTestSupport.createAdminClient(service)
+                .deleteTopics(Collections.singletonList(SOURCE_TOPIC)).all();
+
+        executorService.shutdownNow();
+    }
+
+    private boolean canContinue() {
+        // First one should go through ...
+        if (count.intValue() <= 1) {
+            return true;
+        }
+
+        if (count.intValue() >= RETRY_COUNT) {
+            return true;
+        }
+
+        return false;
+    }
+
+    public void increment() {
+        count.increment();
+    }
+
+    public int getCount() {
+        return count.intValue();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() {
+                from(from)
+                        .pausable(testConsumerListener, o -> canContinue())
+                        .routeId("pausable-it")
+                        .process(exchange -> LOG.info("Got record from Kafka: 
{}", exchange.getMessage().getBody()))
+                        .to(intermediate);
+
+                from(intermediate)
+                        .process(exchange -> {
+                            LOG.info("Got record on the intermediate 
processor: {}", exchange.getMessage().getBody());
+
+                            if (getCount() <= RETRY_COUNT) {
+                                throw new RuntimeCamelException("Error");
+                            }
+                        })
+                        .to(to);
+            }
+        };
+    }
+
+    @Test
+    @Timeout(value = 1, unit = TimeUnit.MINUTES)
+    public void kafkaMessageIsConsumedByCamel() throws InterruptedException {
+        String propagatedHeaderKey = "PropagatedCustomHeader";
+        byte[] propagatedHeaderValue = "propagated header value".getBytes();
+        String skippedHeaderKey = "CamelSkippedHeader";
+
+        // Although all messages will be sent more than once due to the exception, 
only 5 messages should reach the final
+        // destination, because sending them on the first few tries should fail
+        to.expectedMessageCount(5);
+        to.expectedBodiesReceivedInAnyOrder("message-0", "message-1", 
"message-2", "message-3", "message-4");
+
+        // The LAST_RECORD_BEFORE_COMMIT header should not be configured on any
+        // exchange because autoCommitEnable=true
+        
to.expectedHeaderValuesReceivedInAnyOrder(KafkaConstants.LAST_RECORD_BEFORE_COMMIT,
 null, null, null, null, null);
+        to.expectedHeaderReceived(propagatedHeaderKey, propagatedHeaderValue);
+
+        for (int k = 0; k < 5; k++) {
+            String msg = "message-" + k;
+            ProducerRecord<String, String> data = new 
ProducerRecord<>(SOURCE_TOPIC, "1", msg);
+            data.headers().add(new RecordHeader("CamelSkippedHeader", "skipped 
header value".getBytes()));
+            data.headers().add(new RecordHeader(propagatedHeaderKey, 
propagatedHeaderValue));
+            producer.send(data);
+        }
+
+        await().atMost(30, TimeUnit.SECONDS).untilAdder(count, 
greaterThan(10L));
+
+        assertTrue(testConsumerListener.afterConsumeCalled, "The afterConsume 
method should have been called");
+        assertTrue(testConsumerListener.afterProcessCalled, "The afterProcess 
method should have been called");
+
+        to.assertIsSatisfied();
+        assertEquals(5, to.getExchanges().size(), "Did not receive the 
expected amount of messages");
+
+        Map<String, Object> headers = 
to.getExchanges().get(0).getIn().getHeaders();
+        assertFalse(headers.containsKey(skippedHeaderKey), "Should not receive 
skipped header");
+        assertTrue(headers.containsKey(propagatedHeaderKey), "Should receive 
propagated header");
+    }
+}
diff --git a/components/camel-kafka/src/test/resources/log4j2.properties 
b/components/camel-kafka/src/test/resources/log4j2.properties
index 61f9e8e29b4..d1ad676c533 100644
--- a/components/camel-kafka/src/test/resources/log4j2.properties
+++ b/components/camel-kafka/src/test/resources/log4j2.properties
@@ -41,7 +41,7 @@ logger.resume.name=org.apache.camel.processor.resume.kafka
 logger.resume.level=INFO
 
 logger.kafka.name=org.apache.kafka
-logger.kafka.level=INFO
+logger.kafka.level=WARN
 
 # To verify the ResumableCompletion onFailure
 # logger.resume-processor.name=org.apache.camel.processor.resume
diff --git 
a/core/camel-api/src/main/java/org/apache/camel/ConsumerListener.java 
b/core/camel-api/src/main/java/org/apache/camel/ConsumerListener.java
new file mode 100644
index 00000000000..d92808e5d2a
--- /dev/null
+++ b/core/camel-api/src/main/java/org/apache/camel/ConsumerListener.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel;
+
+import java.util.function.Predicate;
+
+/**
+ * An interface for listening to consumer events and allowing proxying between a 
consumer predicate and the Camel
+ * component. The purpose of the consumer predicate is to evaluate 
whether the consumption (from the internal Camel
+ * consumer) can continue or should be put on pause.
+ */
+public interface ConsumerListener<C, P> {
+
+    /**
+     * This sets the predicate responsible for evaluating whether the 
processing can resume or not. Such a predicate
+     * should return true if the consumption can resume, or false otherwise. 
The exact point of when the predicate is
+     * called is dependent on the component, and it may be called on either 
one of the available events. Implementations
+     * should not assume the predicate to be called at any specific point.
+     */
+    void setResumableCheck(Predicate<?> afterConsumeEval);
+
+    /**
+     * This is an event that runs after data consumption.
+     * 
+     * @param  consumePayload the resume payload if any
+     * @return                true if the consumer should continue processing or false 
otherwise.
+     */
+    boolean afterConsume(C consumePayload);
+
+    /**
+     * This is an event that runs after data processing.
+     * 
+     * @param  processingPayload the resume payload if any
+     * @return                   true if the consumer should continue or false 
otherwise.
+     */
+    boolean afterProcess(P processingPayload);
+}
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/PollExceptionStrategy.java
 b/core/camel-api/src/main/java/org/apache/camel/ConsumerListenerAware.java
similarity index 51%
copy from 
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/PollExceptionStrategy.java
copy to core/camel-api/src/main/java/org/apache/camel/ConsumerListenerAware.java
index a2c6fef690d..bbd24c59e47 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/PollExceptionStrategy.java
+++ b/core/camel-api/src/main/java/org/apache/camel/ConsumerListenerAware.java
@@ -14,21 +14,26 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.component.kafka;
+
+package org.apache.camel;
 
 /**
- * Strategy to decide when a Kafka exception was thrown during polling, how to 
handle this. For example by re-connecting
- * and polling the same message again, by stopping the consumer (allows to 
re-balance and let another consumer try), or
- * to let Camel route the message as an exception which allows Camel error 
handling to handle the exception, or to
- * discard this message and poll the next message.
+ * An interface to represent an object which wishes to support listening for 
consumer events using the
+ * {@link ConsumerListener}.
  */
-public interface PollExceptionStrategy {
+public interface ConsumerListenerAware<T extends ConsumerListener<?, ?>> {
+
+    /**
+     * Injects the {@link ConsumerListener} instance into the object
+     *
+     * @param consumerListener the consumer listener instance
+     */
+    void setConsumerListener(T consumerListener);
 
     /**
-     * Controls how to handle the exception while polling from Kafka.
+     * Gets the {@link ConsumerListener} instance
      *
-     * @param  exception the caused exception which typically would be a 
{@link org.apache.kafka.common.KafkaException}
-     * @return           how to handle the exception
+     * @return the consumer listener instance
      */
-    void handle(long partitionLastOffset, Exception exception);
+    T getConsumerListener();
 }
diff --git a/core/camel-api/src/main/java/org/apache/camel/Route.java 
b/core/camel-api/src/main/java/org/apache/camel/Route.java
index 2d4e0cda76b..0e6f90882c5 100644
--- a/core/camel-api/src/main/java/org/apache/camel/Route.java
+++ b/core/camel-api/src/main/java/org/apache/camel/Route.java
@@ -378,4 +378,9 @@ public interface Route extends RuntimeConfiguration {
      */
     void setResumeStrategy(ResumeStrategy resumeStrategy);
 
+    /**
+     * Sets the consumer listener for the route
+     */
+    void setConsumerListener(ConsumerListener<?, ?> consumerListener);
+
 }
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java
index 5c39c860bea..8a8fc6a10d5 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java
@@ -28,6 +28,8 @@ import java.util.concurrent.ConcurrentMap;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Consumer;
+import org.apache.camel.ConsumerListener;
+import org.apache.camel.ConsumerListenerAware;
 import org.apache.camel.Endpoint;
 import org.apache.camel.ErrorHandlerFactory;
 import org.apache.camel.NamedNode;
@@ -90,6 +92,7 @@ public class DefaultRoute extends ServiceSupport implements 
Route {
     private final Map<String, Processor> onCompletions = new HashMap<>();
     private final Map<String, Processor> onExceptions = new HashMap<>();
     private ResumeStrategy resumeStrategy;
+    private ConsumerListener<?, ?> consumerListener;
 
     // camel-core-model
     @Deprecated
@@ -634,6 +637,10 @@ public class DefaultRoute extends ServiceSupport 
implements Route {
             if (consumer instanceof ResumeAware) {
                 ((ResumeAware) consumer).setResumeStrategy(resumeStrategy);
             }
+
+            if (consumer instanceof ConsumerListenerAware) {
+                ((ConsumerListenerAware) 
consumer).setConsumerListener(consumerListener);
+            }
         }
         if (processor instanceof Service) {
             services.add((Service) processor);
@@ -711,4 +718,9 @@ public class DefaultRoute extends ServiceSupport implements 
Route {
     public void setResumeStrategy(ResumeStrategy resumeStrategy) {
         this.resumeStrategy = resumeStrategy;
     }
+
+    @Override
+    public void setConsumerListener(ConsumerListener<?, ?> consumerListener) {
+        this.consumerListener = consumerListener;
+    }
 }
diff --git 
a/core/camel-core-engine/src/main/docs/modules/eips/examples/json/pausable.json 
b/core/camel-core-engine/src/main/docs/modules/eips/examples/json/pausable.json
new file mode 120000
index 00000000000..f174daeaac5
--- /dev/null
+++ 
b/core/camel-core-engine/src/main/docs/modules/eips/examples/json/pausable.json
@@ -0,0 +1 @@
+../../../../../../../../camel-core-model/src/generated/resources/org/apache/camel/model/pausable.json
\ No newline at end of file
diff --git 
a/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc
 
b/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc
index 214157b9105..606a1f813b5 100644
--- 
a/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc
+++ 
b/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc
@@ -2,7 +2,7 @@
 :doctitle: Resume Strategies
 :shortname: resume
 :description: Provide strategies to allow consuming data from specific offsets
-:since:
+:since: 3.16.0
 :supportlevel: Experimental
 
 The resume strategies allow users to implement strategies that point the 
consumer part of the routes to the last point of consumption. This allows Camel 
to skip reading and processing data that has already been consumed.
@@ -191,3 +191,29 @@ 
from("file:{{input.dir}}?noop=true&fileName={{input.file}}")
 ----
 
 *Reason*: the `skip` method in the Reader will skip characters, whereas the 
same method on the InputStream will skip bytes.
+
+
+== Pausable Consumers API
+
+The Pausable consumers API is a subset of the resume API that provides pause 
and resume features for supported components.
+With this API it is possible to implement logic that controls the behavior of 
the consumer based on conditions that are
+external to the component. For instance, it makes it possible to pause the 
consumer if an external system becomes unavailable.
+
+Currently, support for pausable consumers is available for the following 
components:
+
+* xref:components::kafka-component.adoc[camel-kafka]
+
+To use the API, you need an instance of a consumer listener along with a 
predicate that tests whether consumption should continue.
+
+* `org.apache.camel.ConsumerListener` - the consumer listener interface. Camel 
already comes with pre-built consumer listeners, but users in need of more 
complex behaviors can create their own listeners.
+* a predicate that returns true if data consumption should resume or false if 
consumption should be put on pause
+
+Usage example:
+
+[source,java]
+----
+from(from)
+    .pausable(new KafkaConsumerListener(), o -> canContinue())
+    .process(exchange -> LOG.info("Received an exchange: {}", 
exchange.getMessage().getBody()))
+    .to(destination);
+----
diff --git 
a/core/camel-core-model/src/generated/resources/META-INF/services/org/apache/camel/model.properties
 
b/core/camel-core-model/src/generated/resources/META-INF/services/org/apache/camel/model.properties
index aaf9b075198..84d6b4b872a 100644
--- 
a/core/camel-core-model/src/generated/resources/META-INF/services/org/apache/camel/model.properties
+++ 
b/core/camel-core-model/src/generated/resources/META-INF/services/org/apache/camel/model.properties
@@ -108,6 +108,7 @@ packageScan
 param
 passThroughServiceFilter
 patch
+pausable
 pgp
 pipeline
 policy
diff --git 
a/core/camel-core-model/src/generated/resources/org/apache/camel/model/jaxb.index
 
b/core/camel-core-model/src/generated/resources/org/apache/camel/model/jaxb.index
index 4271e558cb8..6fb29c0d233 100644
--- 
a/core/camel-core-model/src/generated/resources/org/apache/camel/model/jaxb.index
+++ 
b/core/camel-core-model/src/generated/resources/org/apache/camel/model/jaxb.index
@@ -45,6 +45,7 @@ OtherwiseDefinition
 OutputDefinition
 OutputTypeDefinition
 PackageScanDefinition
+PausableDefinition
 PipelineDefinition
 PolicyDefinition
 PollEnrichDefinition
diff --git 
a/core/camel-core-model/src/generated/resources/org/apache/camel/model/pausable.json
 
b/core/camel-core-model/src/generated/resources/org/apache/camel/model/pausable.json
new file mode 100644
index 00000000000..88b088805ab
--- /dev/null
+++ 
b/core/camel-core-model/src/generated/resources/org/apache/camel/model/pausable.json
@@ -0,0 +1,20 @@
+{
+  "model": {
+    "kind": "model",
+    "name": "pausable",
+    "title": "Pausable",
+    "description": "Pausable EIP to support pausing the consumer until a given 
predicate allows it to resume.",
+    "deprecated": false,
+    "label": "eip,routing",
+    "javaType": "org.apache.camel.model.PausableDefinition",
+    "abstract": false,
+    "input": true,
+    "output": false
+  },
+  "properties": {
+    "consumerListener": { "kind": "attribute", "displayName": "Consumer 
Listener", "required": true, "type": "object", "javaType": 
"org.apache.camel.ConsumerListener", "deprecated": false, "autowired": false, 
"secret": false, "description": "Sets the consumer listener to use" },
+    "untilCheck": { "kind": "attribute", "displayName": "Until Check", 
"required": true, "type": "object", "javaType": "java.util.function.Predicate", 
"deprecated": false, "autowired": false, "secret": false },
+    "id": { "kind": "attribute", "displayName": "Id", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "description": "Sets the id of this node" 
},
+    "description": { "kind": "element", "displayName": "Description", 
"required": false, "type": "object", "javaType": 
"org.apache.camel.model.DescriptionDefinition", "deprecated": false, 
"autowired": false, "secret": false, "description": "Sets the description of 
this node" }
+  }
+}
diff --git 
a/core/camel-core-model/src/main/java/org/apache/camel/model/PausableDefinition.java
 
b/core/camel-core-model/src/main/java/org/apache/camel/model/PausableDefinition.java
new file mode 100644
index 00000000000..d6f1bdfcaf2
--- /dev/null
+++ 
b/core/camel-core-model/src/main/java/org/apache/camel/model/PausableDefinition.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.model;
+
+import java.util.function.Predicate;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.XmlTransient;
+
+import org.apache.camel.ConsumerListener;
+import org.apache.camel.spi.Metadata;
+
+/**
+ * Pausable EIP to support pausing the consumer until a given predicate allows it to resume.
+ */
+@Metadata(label = "eip,routing")
+@XmlRootElement(name = "pausable")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class PausableDefinition extends NoOutputDefinition<PausableDefinition> 
{
+
+    @XmlTransient
+    private ConsumerListener<?, ?> consumerListenerBean;
+
+    @XmlAttribute(required = true)
+    @Metadata(required = true, javaType = "org.apache.camel.ConsumerListener")
+    private String consumerListener;
+
+    @XmlTransient
+    private Predicate<?> untilCheckBean;
+
+    @XmlAttribute(required = true)
+    @Metadata(required = true, javaType = "java.util.function.Predicate")
+    private String untilCheck;
+
+    @Override
+    public String getShortName() {
+        return "pausable";
+    }
+
+    @Override
+    public String getLabel() {
+        return "pausable";
+    }
+
+    public ConsumerListener<?, ?> getConsumerListenerBean() {
+        return consumerListenerBean;
+    }
+
+    public void setConsumerListener(ConsumerListener<?, ?> 
consumerListenerBean) {
+        this.consumerListenerBean = consumerListenerBean;
+    }
+
+    public String getConsumerListener() {
+        return consumerListener;
+    }
+
+    public void setConsumerListener(String consumerListener) {
+        this.consumerListener = consumerListener;
+    }
+
+    public Predicate<?> getUntilCheckBean() {
+        return untilCheckBean;
+    }
+
+    public void setUntilCheck(Predicate<?> untilCheckBean) {
+        this.untilCheckBean = untilCheckBean;
+    }
+
+    public String getUntilCheck() {
+        return untilCheck;
+    }
+
+    public void setUntilCheck(String untilCheck) {
+        this.untilCheck = untilCheck;
+    }
+
+    // Fluent API
+    // 
-------------------------------------------------------------------------
+
+    /**
+     * Sets the consumer listener to use
+     */
+    public PausableDefinition consumerListener(String consumerListenerRef) {
+        setConsumerListener(consumerListenerRef);
+
+        return this;
+    }
+
+    /**
+     * Sets the consumer listener to use
+     */
+    public PausableDefinition consumerListener(ConsumerListener<?, ?> 
consumerListener) {
+        setConsumerListener(consumerListener);
+
+        return this;
+    }
+}
diff --git 
a/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java
 
b/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java
index 68ba1c10dec..b3802477243 100644
--- 
a/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java
+++ 
b/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java
@@ -37,6 +37,7 @@ import org.apache.camel.AggregationStrategy;
 import org.apache.camel.BeanScope;
 import org.apache.camel.CamelContext;
 import org.apache.camel.CamelContextAware;
+import org.apache.camel.ConsumerListener;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
@@ -3841,6 +3842,63 @@ public abstract class ProcessorDefinition<Type extends 
ProcessorDefinition<Type>
         return asType();
     }
 
+    /**
+     * This enables pausable consumers, which allows the consumer to pause 
work until a certain condition allows it to
+     * resume operation
+     *
+     * @return the builder
+     */
+    public PausableDefinition pausable() {
+        PausableDefinition answer = new PausableDefinition();
+        addOutput(answer);
+        return answer;
+    }
+
+    /**
+     * This enables pausable consumers, which allows the consumer to pause 
work until a certain condition allows it to
+     * resume operation
+     *
+     * @param  consumerListener the consumer listener to use for consumer events
+     * @param  untilCheck       predicate that returns true if consumption should resume
+     * @return                  the builder
+     */
+    public Type pausable(ConsumerListener consumerListener, 
java.util.function.Predicate<?> untilCheck) {
+        PausableDefinition answer = new PausableDefinition();
+        answer.setConsumerListener(consumerListener);
+        answer.setUntilCheck(untilCheck);
+        addOutput(answer);
+        return asType();
+    }
+
+    /**
+     * This enables pausable consumers, which allows the consumer to pause 
work until a certain condition allows it to
+     * resume operation
+     *
+     * @param  consumerListenerRef the reference to the consumer listener to use
+     * @return                     the builder
+     */
+    public Type pausable(String consumerListenerRef, 
java.util.function.Predicate<?> untilCheck) {
+        PausableDefinition answer = new PausableDefinition();
+        answer.setConsumerListener(consumerListenerRef);
+        answer.setUntilCheck(untilCheck);
+        addOutput(answer);
+        return asType();
+    }
+
+    /**
+     * This enables pausable consumers, which allows the consumer to pause 
work until a certain condition allows it to
+     * resume operation
+     *
+     * @param  consumerListenerRef the reference to the consumer listener to use
+     * @return                     the builder
+     */
+    public Type pausable(String consumerListenerRef, String untilCheck) {
+        PausableDefinition answer = new PausableDefinition();
+        answer.setConsumerListener(consumerListenerRef);
+        answer.setUntilCheck(untilCheck);
+        addOutput(answer);
+        return asType();
+    }
+
     // Properties
     // 
-------------------------------------------------------------------------
 
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PausableProcessor.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PausableProcessor.java
new file mode 100644
index 00000000000..85954c3b6f7
--- /dev/null
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PausableProcessor.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.processor;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.ConsumerListener;
+import org.apache.camel.Exchange;
+import org.apache.camel.Navigate;
+import org.apache.camel.Processor;
+import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.RouteIdAware;
+import org.apache.camel.support.AsyncProcessorConverterHelper;
+import org.apache.camel.support.AsyncProcessorSupport;
+
+public class PausableProcessor extends AsyncProcessorSupport
+        implements Navigate<Processor>, CamelContextAware, IdAware, 
RouteIdAware {
+
+    private final ConsumerListener<?, ?> consumerListener;
+    private final AsyncProcessor processor;
+    private CamelContext camelContext;
+    private String id;
+    private String routeId;
+
+    public PausableProcessor(ConsumerListener<?, ?> consumerListener, 
Processor processor) {
+        this.consumerListener = consumerListener;
+        this.processor = AsyncProcessorConverterHelper.convert(processor);
+    }
+
+    @Override
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        return processor.process(exchange, callback);
+    }
+
+    @Override
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    @Override
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    @Override
+    public List<Processor> next() {
+        if (!hasNext()) {
+            return null;
+        }
+        List<Processor> answer = new ArrayList<>(1);
+        answer.add(processor);
+        return answer;
+    }
+
+    @Override
+    public boolean hasNext() {
+        return false;
+    }
+
+    @Override
+    public String getId() {
+        return id;
+    }
+
+    @Override
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    @Override
+    public String getRouteId() {
+        return routeId;
+    }
+
+    @Override
+    public void setRouteId(String routeId) {
+        this.routeId = routeId;
+    }
+}
diff --git 
a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/PausableReifier.java
 
b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/PausableReifier.java
new file mode 100644
index 00000000000..0626fcb05e5
--- /dev/null
+++ 
b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/PausableReifier.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.reifier;
+
+import java.util.function.Predicate;
+
+import org.apache.camel.ConsumerListener;
+import org.apache.camel.Processor;
+import org.apache.camel.Route;
+import org.apache.camel.model.PausableDefinition;
+import org.apache.camel.model.ProcessorDefinition;
+import org.apache.camel.processor.PausableProcessor;
+import org.apache.camel.util.ObjectHelper;
+
+public class PausableReifier extends ProcessorReifier<PausableDefinition> {
+
+    public PausableReifier(Route route, ProcessorDefinition<?> definition) {
+        super(route, PausableDefinition.class.cast(definition));
+    }
+
+    @Override
+    public Processor createProcessor() throws Exception {
+        Processor childProcessor = createChildProcessor(false);
+
+        ConsumerListener<?, ?> consumerListener = resolveConsumerListener();
+        ObjectHelper.notNull(consumerListener, "consumerListener");
+
+        route.setConsumerListener(consumerListener);
+
+        return new PausableProcessor(consumerListener, childProcessor);
+    }
+
+    protected ConsumerListener<?, ?> resolveConsumerListener() {
+        ConsumerListener<?, ?> consumerListener = 
definition.getConsumerListenerBean();
+        if (consumerListener == null) {
+            String ref = definition.getConsumerListener();
+
+            consumerListener = mandatoryLookup(ref, ConsumerListener.class);
+        }
+
+        Predicate<?> supplier = resolveUntilCheck();
+        consumerListener.setResumableCheck(supplier);
+        return consumerListener;
+    }
+
+    protected Predicate<?> resolveUntilCheck() {
+        Predicate<?> supplier = definition.getUntilCheckBean();
+        if (supplier == null) {
+            String ref = definition.getUntilCheck();
+
+            supplier = mandatoryLookup(ref, Predicate.class);
+        }
+
+        return supplier;
+    }
+}
diff --git 
a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java
 
b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java
index 55d3710f521..f375336c62c 100644
--- 
a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java
+++ 
b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java
@@ -66,6 +66,7 @@ import org.apache.camel.model.OnExceptionDefinition;
 import org.apache.camel.model.OnFallbackDefinition;
 import org.apache.camel.model.OptionalIdentifiedDefinition;
 import org.apache.camel.model.OtherwiseDefinition;
+import org.apache.camel.model.PausableDefinition;
 import org.apache.camel.model.PipelineDefinition;
 import org.apache.camel.model.PolicyDefinition;
 import org.apache.camel.model.PollEnrichDefinition;
@@ -310,6 +311,8 @@ public abstract class ProcessorReifier<T extends 
ProcessorDefinition<?>> extends
             return new WhenReifier(route, definition);
         } else if (definition instanceof ResumableDefinition) {
             return new ResumableReifier(route, definition);
+        } else if (definition instanceof PausableDefinition) {
+            return new PausableReifier(route, definition);
         }
         return null;
     }
diff --git 
a/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java 
b/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java
index 24df3c4b80c..78c3d65df2a 100644
--- 
a/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java
+++ 
b/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java
@@ -718,6 +718,16 @@ public class ModelParser extends BaseParser {
             return true;
         }, noValueHandler());
     }
+    protected PausableDefinition doParsePausableDefinition() throws 
IOException, XmlPullParserException {
+        return doParse(new PausableDefinition(), (def, key, val) -> {
+            switch (key) {
+                case "consumerListener": def.setConsumerListener(val); break;
+                case "untilCheck": def.setUntilCheck(val); break;
+                default: return 
processorDefinitionAttributeHandler().accept(def, key, val);
+            }
+            return true;
+        }, optionalIdentifiedDefinitionElementHandler(), noValueHandler());
+    }
     protected PipelineDefinition doParsePipelineDefinition() throws 
IOException, XmlPullParserException {
         return doParse(new PipelineDefinition(),
             processorDefinitionAttributeHandler(), 
outputDefinitionElementHandler(), noValueHandler());
@@ -3237,6 +3247,7 @@ public class ModelParser extends BaseParser {
             case "onCompletion": return doParseOnCompletionDefinition();
             case "onException": return doParseOnExceptionDefinition();
             case "onFallback": return doParseOnFallbackDefinition();
+            case "pausable": return doParsePausableDefinition();
             case "pipeline": return doParsePipelineDefinition();
             case "policy": return doParsePolicyDefinition();
             case "pollEnrich": return doParsePollEnrichDefinition();
diff --git 
a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
 
b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
index 73c1fa96cb4..6b98abddd24 100644
--- 
a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
+++ 
b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
@@ -655,17 +655,20 @@ public interface KafkaComponentBuilderFactory {
         }
         /**
          * Set if KafkaConsumer will read from beginning or end on startup:
-         * beginning : read from beginning end : read from end This is 
replacing
-         * the earlier property seekToBeginning.
+         * SeekPolicy.BEGINNING: read from beginning. SeekPolicy.END: read from
+         * end.
          * 
-         * The option is a: &lt;code&gt;java.lang.String&lt;/code&gt; type.
+         * The option is a:
+         * &lt;code&gt;org.apache.camel.component.kafka.SeekPolicy&lt;/code&gt;
+         * type.
          * 
          * Group: consumer
          * 
          * @param seekTo the value to set
          * @return the dsl builder
          */
-        default KafkaComponentBuilder seekTo(java.lang.String seekTo) {
+        default KafkaComponentBuilder seekTo(
+                org.apache.camel.component.kafka.SeekPolicy seekTo) {
             doSetProperty("seekTo", seekTo);
             return this;
         }
@@ -2079,7 +2082,7 @@ public interface KafkaComponentBuilderFactory {
             case "partitionAssignor": 
getOrCreateConfiguration((KafkaComponent) 
component).setPartitionAssignor((java.lang.String) value); return true;
             case "pollOnError": getOrCreateConfiguration((KafkaComponent) 
component).setPollOnError((org.apache.camel.component.kafka.PollOnError) 
value); return true;
             case "pollTimeoutMs": getOrCreateConfiguration((KafkaComponent) 
component).setPollTimeoutMs((java.lang.Long) value); return true;
-            case "seekTo": getOrCreateConfiguration((KafkaComponent) 
component).setSeekTo((java.lang.String) value); return true;
+            case "seekTo": getOrCreateConfiguration((KafkaComponent) 
component).setSeekTo((org.apache.camel.component.kafka.SeekPolicy) value); 
return true;
             case "sessionTimeoutMs": getOrCreateConfiguration((KafkaComponent) 
component).setSessionTimeoutMs((java.lang.Integer) value); return true;
             case "specificAvroReader": 
getOrCreateConfiguration((KafkaComponent) 
component).setSpecificAvroReader((boolean) value); return true;
             case "topicIsPattern": getOrCreateConfiguration((KafkaComponent) 
component).setTopicIsPattern((boolean) value); return true;
diff --git 
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
 
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
index 647d8f04d81..31ce4492994 100644
--- 
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
+++ 
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
@@ -1119,10 +1119,31 @@ public interface KafkaEndpointBuilderFactory {
         }
         /**
          * Set if KafkaConsumer will read from beginning or end on startup:
-         * beginning : read from beginning end : read from end This is 
replacing
-         * the earlier property seekToBeginning.
+         * SeekPolicy.BEGINNING: read from beginning. SeekPolicy.END: read from
+         * end.
          * 
-         * The option is a: &lt;code&gt;java.lang.String&lt;/code&gt; type.
+         * The option is a:
+         * &lt;code&gt;org.apache.camel.component.kafka.SeekPolicy&lt;/code&gt;
+         * type.
+         * 
+         * Group: consumer
+         * 
+         * @param seekTo the value to set
+         * @return the dsl builder
+         */
+        default KafkaEndpointConsumerBuilder seekTo(
+                org.apache.camel.component.kafka.SeekPolicy seekTo) {
+            doSetProperty("seekTo", seekTo);
+            return this;
+        }
+        /**
+         * Set if KafkaConsumer will read from beginning or end on startup:
+         * SeekPolicy.BEGINNING: read from beginning. SeekPolicy.END: read from
+         * end.
+         * 
+         * The option will be converted to a
+         * &lt;code&gt;org.apache.camel.component.kafka.SeekPolicy&lt;/code&gt;
+         * type.
          * 
          * Group: consumer
          * 
diff --git 
a/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java
 
b/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java
index 311c9bac119..4aa3b14b129 100644
--- 
a/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java
+++ 
b/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java
@@ -48,6 +48,7 @@ import org.apache.camel.model.OtherwiseDefinition;
 import org.apache.camel.model.OutputDefinition;
 import org.apache.camel.model.OutputTypeDefinition;
 import org.apache.camel.model.PackageScanDefinition;
+import org.apache.camel.model.PausableDefinition;
 import org.apache.camel.model.PipelineDefinition;
 import org.apache.camel.model.PolicyDefinition;
 import org.apache.camel.model.PollEnrichDefinition;
@@ -9798,6 +9799,65 @@ public final class ModelDeserializers extends 
YamlDeserializerSupport {
         }
     }
 
+    @YamlType(
+            nodes = "pausable",
+            types = org.apache.camel.model.PausableDefinition.class,
+            order = 
org.apache.camel.dsl.yaml.common.YamlDeserializerResolver.ORDER_LOWEST - 1,
+            properties = {
+                    @YamlProperty(name = "consumer-listener", type = "string", 
required = true),
+                    @YamlProperty(name = "description", type = "string"),
+                    @YamlProperty(name = "id", type = "string"),
+                    @YamlProperty(name = "inherit-error-handler", type = 
"boolean"),
+                    @YamlProperty(name = "until-check", type = "string", 
required = true)
+            }
+    )
+    public static class PausableDefinitionDeserializer extends 
YamlDeserializerBase<PausableDefinition> {
+        public PausableDefinitionDeserializer() {
+            super(PausableDefinition.class);
+        }
+
+        @Override
+        protected PausableDefinition newInstance() {
+            return new PausableDefinition();
+        }
+
+        @Override
+        protected boolean setProperty(PausableDefinition target, String 
propertyKey,
+                String propertyName, Node node) {
+            switch(propertyKey) {
+                case "consumer-listener": {
+                    String val = asText(node);
+                    target.setConsumerListener(val);
+                    break;
+                }
+                case "inherit-error-handler": {
+                    String val = asText(node);
+                    
target.setInheritErrorHandler(java.lang.Boolean.valueOf(val));
+                    break;
+                }
+                case "until-check": {
+                    String val = asText(node);
+                    target.setUntilCheck(val);
+                    break;
+                }
+                case "id": {
+                    String val = asText(node);
+                    target.setId(val);
+                    break;
+                }
+                case "description": {
+                    org.apache.camel.model.DescriptionDefinition val = 
asType(node, org.apache.camel.model.DescriptionDefinition.class);
+                    target.setDescription(val);
+                    break;
+                }
+                default: {
+                    return false;
+                }
+            }
+            return true;
+        }
+    }
+
     @YamlType(
             nodes = "pipeline",
             types = org.apache.camel.model.PipelineDefinition.class,
diff --git 
a/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializersResolver.java
 
b/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializersResolver.java
index bd7b148de91..77675d01f58 100644
--- 
a/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializersResolver.java
+++ 
b/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializersResolver.java
@@ -286,6 +286,8 @@ public final class ModelDeserializersResolver implements 
YamlDeserializerResolve
             case 
"org.apache.camel.model.cloud.PassThroughServiceCallServiceFilterConfiguration":
 return new 
ModelDeserializers.PassThroughServiceCallServiceFilterConfigurationDeserializer();
             case "patch": return new 
ModelDeserializers.PatchDefinitionDeserializer();
             case "org.apache.camel.model.rest.PatchDefinition": return new 
ModelDeserializers.PatchDefinitionDeserializer();
+            case "pausable": return new 
ModelDeserializers.PausableDefinitionDeserializer();
+            case "org.apache.camel.model.PausableDefinition": return new 
ModelDeserializers.PausableDefinitionDeserializer();
             case "pipeline": return new 
ModelDeserializers.PipelineDefinitionDeserializer();
             case "org.apache.camel.model.PipelineDefinition": return new 
ModelDeserializers.PipelineDefinitionDeserializer();
             case "policy": return new 
ModelDeserializers.PolicyDefinitionDeserializer();
diff --git 
a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/camel-yaml-dsl.json 
b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/camel-yaml-dsl.json
index 817d2942861..24e65a97bef 100644
--- 
a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/camel-yaml-dsl.json
+++ 
b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/camel-yaml-dsl.json
@@ -131,6 +131,9 @@
           "otherwise" : {
             "$ref" : 
"#/items/definitions/org.apache.camel.model.OtherwiseDefinition"
           },
+          "pausable" : {
+            "$ref" : 
"#/items/definitions/org.apache.camel.model.PausableDefinition"
+          },
           "pipeline" : {
             "$ref" : 
"#/items/definitions/org.apache.camel.model.PipelineDefinition"
           },
@@ -1766,6 +1769,27 @@
           }
         }
       },
+      "org.apache.camel.model.PausableDefinition" : {
+        "type" : "object",
+        "properties" : {
+          "consumer-listener" : {
+            "type" : "string"
+          },
+          "description" : {
+            "type" : "string"
+          },
+          "id" : {
+            "type" : "string"
+          },
+          "inherit-error-handler" : {
+            "type" : "boolean"
+          },
+          "until-check" : {
+            "type" : "string"
+          }
+        },
+        "required" : [ "consumer-listener", "until-check" ]
+      },
       "org.apache.camel.model.PipelineDefinition" : {
         "type" : "object",
         "properties" : {
diff --git 
a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/camelYamlDsl.json 
b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/camelYamlDsl.json
index f5c77af2d0e..ac348b5ee5f 100644
--- 
a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/camelYamlDsl.json
+++ 
b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/camelYamlDsl.json
@@ -89,6 +89,9 @@
           "otherwise" : {
             "$ref" : 
"#/items/definitions/org.apache.camel.model.OtherwiseDefinition"
           },
+          "pausable" : {
+            "$ref" : 
"#/items/definitions/org.apache.camel.model.PausableDefinition"
+          },
           "pipeline" : {
             "$ref" : 
"#/items/definitions/org.apache.camel.model.PipelineDefinition"
           },
@@ -1670,6 +1673,27 @@
           }
         }
       },
+      "org.apache.camel.model.PausableDefinition" : {
+        "type" : "object",
+        "properties" : {
+          "consumerListener" : {
+            "type" : "string"
+          },
+          "description" : {
+            "type" : "string"
+          },
+          "id" : {
+            "type" : "string"
+          },
+          "inheritErrorHandler" : {
+            "type" : "boolean"
+          },
+          "untilCheck" : {
+            "type" : "string"
+          }
+        },
+        "required" : [ "consumerListener", "untilCheck" ]
+      },
       "org.apache.camel.model.PipelineDefinition" : {
         "type" : "object",
         "properties" : {

Reply via email to