This is an automated email from the ASF dual-hosted git repository.

jpoth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-spring-boot.git


The following commit(s) were added to refs/heads/main by this push:
     new e44242f  [CAMEL-17805] add tests in camel-kafka-starter (#476)
e44242f is described below

commit e44242f9eef226050c10a0ad3a91c8af805c7e35
Author: John Poth <[email protected]>
AuthorDate: Mon Mar 21 12:16:41 2022 +0100

    [CAMEL-17805] add tests in camel-kafka-starter (#476)
    
    * [CAMEL-17805] add tests in camel-kafka-starter
    
    * Regen
---
 components-starter/camel-kafka-starter/pom.xml     | 125 +++++
 .../integration/BaseEmbeddedKafkaTestSupport.java  | 108 ++++
 .../integration/CustomHeaderDeserializer.java      |  39 ++
 .../KafkaConsumerAsyncManualCommitIT.java          | 160 ++++++
 .../integration/KafkaConsumerBatchSizeIT.java      | 116 +++++
 .../kafka/integration/KafkaConsumerFullIT.java     | 230 +++++++++
 .../integration/KafkaConsumerHealthCheckIT.java    | 211 ++++++++
 .../integration/KafkaConsumerIdempotentIT.java     | 102 ++++
 .../KafkaConsumerIdempotentTestSupport.java        |  66 +++
 ...kaConsumerIdempotentWithCustomSerializerIT.java |  97 ++++
 .../KafkaConsumerIdempotentWithProcessorIT.java    | 104 ++++
 .../KafkaConsumerLastRecordHeaderIT.java           | 113 +++++
 .../integration/KafkaConsumerManualCommitIT.java   | 180 +++++++
 .../integration/KafkaConsumerRebalanceIT.java      | 120 +++++
 .../integration/KafkaConsumerTopicIsPatternIT.java | 113 +++++
 .../KafkaConsumerWithResumeRouteStrategyIT.java    | 250 +++++++++
 .../kafka/integration/KafkaProducerFullIT.java     | 561 +++++++++++++++++++++
 .../component/kafka/integration/KafkaToDIT.java    |  71 +++
 .../kafka/integration/MockConsumerInterceptor.java |  54 ++
 .../kafka/integration/MockProducerInterceptor.java |  49 ++
 .../OffsetStateRepositoryConfiguration.java        |  35 ++
 .../integration/ResumeStrategyConfiguration.java   |  35 ++
 .../kafka/clients/consumer/KafkaConsumerTest.java  |  62 +++
 23 files changed, 3001 insertions(+)

diff --git a/components-starter/camel-kafka-starter/pom.xml b/components-starter/camel-kafka-starter/pom.xml
index 7a55a4a..368c4d6 100644
--- a/components-starter/camel-kafka-starter/pom.xml
+++ b/components-starter/camel-kafka-starter/pom.xml
@@ -52,6 +52,31 @@
       </exclusions>
       -->
     </dependency>
+    <!-- test -->
+    <dependency>
+      <groupId>org.apache.camel</groupId>
+      <artifactId>camel-test-infra-kafka</artifactId>
+      <version>${camel-version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.camel</groupId>
+      <artifactId>camel-test-spring-junit5</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.camel</groupId>
+          <artifactId>camel-spring-xml</artifactId>
+        </exclusion>
+      </exclusions>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-test</artifactId>
+      <version>${spring-boot-version}</version>
+      <scope>test</scope>
+    </dependency>
     <!--START OF GENERATED CODE-->
     <dependency>
       <groupId>org.apache.camel.springboot</groupId>
@@ -59,4 +84,104 @@
     </dependency>
     <!--END OF GENERATED CODE-->
   </dependencies>
+  <build>
+    <pluginManagement>
+      <plugins>
+        <plugin>
+          <groupId>org.apache.maven.plugins</groupId>
+          <artifactId>maven-failsafe-plugin</artifactId>
+          <configuration>
+            <reuseForks>true</reuseForks>
+            <forkedProcessTimeoutInSeconds>${camel.failsafe.forkTimeout}</forkedProcessTimeoutInSeconds>
+            <redirectTestOutputToFile>false</redirectTestOutputToFile>
+            <systemPropertyVariables>
+              <visibleassertions.silence>true</visibleassertions.silence>
+            </systemPropertyVariables>
+          </configuration>
+        </plugin>
+      </plugins>
+    </pluginManagement>
+  </build>
+  <profiles>
+    <!-- activate integration test if the docker socket file is accessible -->
+    <profile>
+      <id>kafka-integration-tests-docker-file</id>
+      <activation>
+        <file>
+          <exists>/var/run/docker.sock</exists>
+        </file>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-failsafe-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>integration-test</id>
+                <phase>verify</phase>
+                <goals>
+                  <goal>integration-test</goal>
+                  <goal>verify</goal>
+                </goals>
+              </execution>
+              <execution>
+                <id>kafka-3</id>
+                <configuration>
+                  <systemPropertyVariables>
+                    <kafka.instance.type>local-kafka3-container</kafka.instance.type>
+                  </systemPropertyVariables>
+                  <reportNameSuffix>kafka-3</reportNameSuffix>
+                </configuration>
+                <goals>
+                  <goal>integration-test</goal>
+                  <goal>verify</goal>
+                </goals>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+    <!-- activate integration test if the DOCKER_HOST env var is set -->
+    <profile>
+      <id>kafka-integration-tests-docker-env</id>
+      <activation>
+        <property>
+          <name>env.DOCKER_HOST</name>
+        </property>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-failsafe-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>integration-test</id>
+                <phase>verify</phase>
+                <goals>
+                  <goal>integration-test</goal>
+                  <goal>verify</goal>
+                </goals>
+              </execution>
+              <execution>
+                <id>kafka-3</id>
+                <configuration>
+                  <systemPropertyVariables>
+                    <kafka.instance.type>local-kafka3-container</kafka.instance.type>
+                  </systemPropertyVariables>
+                  <reportNameSuffix>kafka-3</reportNameSuffix>
+                </configuration>
+                <goals>
+                  <goal>integration-test</goal>
+                  <goal>verify</goal>
+                </goals>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
 </project>
diff --git a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/BaseEmbeddedKafkaTestSupport.java b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/BaseEmbeddedKafkaTestSupport.java
new file mode 100644
index 0000000..7b83738
--- /dev/null
+++ b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/BaseEmbeddedKafkaTestSupport.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.util.Properties;
+import org.apache.camel.CamelContext;
+import org.apache.camel.component.kafka.KafkaComponent;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.spring.boot.CamelContextConfiguration;
+import org.apache.camel.test.infra.kafka.services.KafkaService;
+import org.apache.camel.test.infra.kafka.services.KafkaServiceFactory;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.KafkaAdminClient;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * Common base class for the Kafka integration tests in this module.
+ * <p>
+ * It registers a {@link KafkaService} as a JUnit 5 extension, publishes the
+ * service's bootstrap servers through the {@code bootstrapServers} system
+ * property and lazily creates one {@link AdminClient} that is kept in a static
+ * field and reused.
+ */
+public abstract class BaseEmbeddedKafkaTestSupport {
+
+    @RegisterExtension
+    public static KafkaService service = KafkaServiceFactory.createService();
+
+    @Autowired
+    protected CamelContext context;
+
+    /** Created on first use by {@link #setKafkaAdminClient()} and then reused. */
+    protected static AdminClient kafkaAdminClient;
+
+    private static final Logger LOG = LoggerFactory.getLogger(BaseEmbeddedKafkaTestSupport.class);
+
+    /**
+     * Logs the broker list and exposes it as the {@code bootstrapServers}
+     * system property.
+     */
+    @BeforeAll
+    public static void beforeClass() {
+        // Placeholder-style logging, consistent with getDefaultProperties().
+        LOG.info("### Embedded Kafka cluster broker list: {}", service.getBootstrapServers());
+        System.setProperty("bootstrapServers", service.getBootstrapServers());
+    }
+
+    /** Creates the shared admin client the first time a test needs it. */
+    @BeforeEach
+    public void setKafkaAdminClient() {
+        if (kafkaAdminClient == null) {
+            kafkaAdminClient = createAdminClient();
+        }
+    }
+
+    /**
+     * Builds producer properties that target the test broker.
+     *
+     * @return properties with the bootstrap servers, Camel's default key/value
+     *         serializers, Camel's default partitioner and {@code acks=1}
+     */
+    protected Properties getDefaultProperties() {
+        LOG.info("Connecting to Kafka {}", service.getBootstrapServers());
+
+        Properties props = new Properties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, service.getBootstrapServers());
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_SERIALIZER);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_SERIALIZER);
+        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_PARTITIONER);
+        props.put(ProducerConfig.ACKS_CONFIG, "1");
+        return props;
+    }
+
+    /**
+     * @return the bootstrap servers reported by the registered {@link KafkaService}
+     */
+    protected static String getBootstrapServers() {
+        return service.getBootstrapServers();
+    }
+
+    /** Creates an admin client configured only with the test broker address. */
+    private static AdminClient createAdminClient() {
+        final Properties properties = new Properties();
+        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, service.getBootstrapServers());
+
+        return KafkaAdminClient.create(properties);
+    }
+
+    /**
+     * Spring configuration that installs a {@link KafkaComponent} bound to the
+     * test broker before the Camel application starts.
+     * <p>
+     * The class name (including its spelling) is referenced by the test
+     * classes' {@code @SpringBootTest(classes = ...)} lists, so it is kept as is.
+     */
+    @Configuration
+    public class DefaulKafkaComponent {
+        @Bean
+        CamelContextConfiguration contextConfiguration() {
+            return new CamelContextConfiguration() {
+                @Override
+                public void beforeApplicationStart(CamelContext context) {
+                    context.getPropertiesComponent().setLocation("ref:prop");
+
+                    // Register a "kafka" component whose brokers point at the test service.
+                    KafkaComponent kafka = new KafkaComponent(context);
+                    kafka.init();
+                    kafka.getConfiguration().setBrokers(BaseEmbeddedKafkaTestSupport.service.getBootstrapServers());
+                    context.addComponent("kafka", kafka);
+                }
+
+                @Override
+                public void afterApplicationStart(CamelContext camelContext) {
+                    //do nothing here
+                }
+            };
+        }
+    }
+}
diff --git a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/CustomHeaderDeserializer.java b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/CustomHeaderDeserializer.java
new file mode 100644
index 0000000..e3be099
--- /dev/null
+++ b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/CustomHeaderDeserializer.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.kafka.integration;
+
+import java.math.BigInteger;
+import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Header deserializer used by the tests: the {@code id} header is decoded from
+ * its {@link BigInteger} byte representation and returned as the decimal string
+ * of its {@code long} value; every other header is handled by the parent class.
+ */
+public class CustomHeaderDeserializer extends DefaultKafkaHeaderDeserializer {
+    private static final Logger LOG = LoggerFactory.getLogger(CustomHeaderDeserializer.class);
+
+    /**
+     * @param key   the header name
+     * @param value the raw header bytes
+     * @return the {@code id} header as a decimal string, otherwise whatever the
+     *         parent deserializer returns
+     */
+    @Override
+    public Object deserialize(String key, byte[] value) {
+        // Constant-first comparison so a null key falls through to the parent
+        // instead of raising a NullPointerException here.
+        if ("id".equals(key)) {
+            BigInteger bi = new BigInteger(value);
+            LOG.debug("Converted the header {} to {} via custom serializer", key, bi.longValue());
+
+            return String.valueOf(bi.longValue());
+        } else {
+            return super.deserialize(key, value);
+        }
+    }
+}
diff --git a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java
new file mode 100644
index 0000000..e4d2e1d
--- /dev/null
+++ b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.util.Collections;
+import java.util.Properties;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.AggregationStrategies;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.KafkaConstants;
+import 
org.apache.camel.component.kafka.consumer.DefaultKafkaManualAsyncCommitFactory;
+import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
+import org.apache.camel.component.kafka.consumer.KafkaManualCommitFactory;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.RepeatedTest;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/**
+ * Integration test for a Kafka consumer route that commits offsets manually
+ * through a {@link DefaultKafkaManualAsyncCommitFactory}: records consumed and
+ * committed before the route is stopped must not be delivered again once the
+ * route is started again.
+ */
+@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
+@CamelSpringBootTest
+@SpringBootTest(
+        classes = {
+                CamelAutoConfiguration.class,
+                BaseEmbeddedKafkaTestSupport.DefaulKafkaComponent.class,
+                KafkaConsumerAsyncManualCommitIT.class,
+                KafkaConsumerAsyncManualCommitIT.TestConfiguration.class,
+        }
+)
+public class KafkaConsumerAsyncManualCommitIT extends BaseEmbeddedKafkaTestSupport {
+
+    public static final String TOPIC = "testManualCommitTest";
+
+    private final String from = "kafka:" + TOPIC
+                    + "?groupId=group1&sessionTimeoutMs=30000&autoCommitEnable=false"
+                    + "&allowManualCommit=true&autoOffsetReset=earliest&kafkaManualCommitFactory=#testFactory";
+
+    @EndpointInject("mock:result")
+    private MockEndpoint to;
+
+    @EndpointInject("mock:resultBar")
+    private MockEndpoint toBar;
+
+    private org.apache.kafka.clients.producer.KafkaProducer<String, String> producer;
+
+    /** Opens a fresh producer against the test broker for every test run. */
+    @BeforeEach
+    public void before() {
+        producer = new org.apache.kafka.clients.producer.KafkaProducer<>(getDefaultProperties());
+    }
+
+    /** Closes the producer (if any) and requests deletion of the test topic. */
+    @AfterEach
+    public void after() {
+        if (producer != null) {
+            producer.close();
+        }
+        // clean all test topics
+        kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC));
+    }
+
+    @RepeatedTest(4)
+    public void kafkaManualCommit() throws Exception {
+        // Step 1: five records go through the running route. Because manual
+        // commit is in use, every message must carry a non-null
+        // LAST_RECORD_BEFORE_COMMIT header.
+        to.expectedMessageCount(5);
+        to.expectedBodiesReceivedInAnyOrder("message-0", "message-1", "message-2", "message-3", "message-4");
+        to.allMessages().header(KafkaConstants.LAST_RECORD_BEFORE_COMMIT).isNotNull();
+
+        sendMessages(0, 5);
+
+        to.assertIsSatisfied(3000);
+        to.reset();
+
+        // Step 2: with the route stopped nothing may reach the mock endpoint.
+        context.getRouteController().stopRoute("foo");
+        to.expectedMessageCount(0);
+
+        // Step 3: three more records are produced while the route is down.
+        sendMessages(5, 8);
+
+        to.assertIsSatisfied(3000);
+        to.reset();
+
+        // Step 4: after the restart, consumption continues from the offset
+        // committed in step 1, i.e. only the three new records are expected.
+        context.getRouteController().startRoute("foo");
+        to.expectedMessageCount(3);
+        to.expectedBodiesReceivedInAnyOrder("message-5", "message-6", "message-7");
+
+        to.assertIsSatisfied(3000);
+    }
+
+    /**
+     * Sends records "message-N" with key "1" to {@link #TOPIC} for every N in
+     * {@code [fromInclusive, toExclusive)}.
+     */
+    private void sendMessages(int fromInclusive, int toExclusive) {
+        for (int i = fromInclusive; i < toExclusive; i++) {
+            String body = "message-" + i;
+            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "1", body);
+            producer.send(record);
+        }
+    }
+
+    /** Supplies the manual-commit factory bean and the routes under test. */
+    @Configuration
+    public class TestConfiguration {
+
+        @Bean("testFactory")
+        public KafkaManualCommitFactory createKafkaManualCommitFactory() {
+            return new DefaultKafkaManualAsyncCommitFactory();
+        }
+
+        @Bean
+        public RouteBuilder routeBuilder() {
+            return new RouteBuilder() {
+                @Override
+                public void configure() {
+                    from(from).routeId("foo").to("direct:aggregate");
+
+                    // A synchronous manual commit would raise a concurrent
+                    // modification exception here; the asynchronous variant can be
+                    // used behind an aggregator with a completion timeout/interval.
+                    // WARN: records of one partition must be handled by a single thread.
+                    from("direct:aggregate").routeId("aggregate").to(to)
+                            .aggregate()
+                            .constant(true)
+                            .completionTimeout(1)
+                            .aggregationStrategy(AggregationStrategies.groupedExchange())
+                            .split().body()
+                            .process(split -> {
+                                // Each split body is one of the grouped exchanges.
+                                Exchange grouped = split.getMessage().getBody(Exchange.class);
+                                KafkaManualCommit manualCommit = grouped.getMessage()
+                                        .getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
+                                assertNotNull(manualCommit);
+                                manualCommit.commit();
+                            });
+
+                    from(from).routeId("bar").autoStartup(false).to(toBar);
+                }
+            };
+        }
+    }
+
+}
diff --git a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerBatchSizeIT.java b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerBatchSizeIT.java
new file mode 100644
index 0000000..c446b20
--- /dev/null
+++ b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerBatchSizeIT.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.util.Collections;
+import java.util.Properties;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+
+/**
+ * Integration test that produces records to a topic in two rounds, restarting
+ * the consuming route in between, and checks which bodies reach the mock
+ * endpoint in each round.
+ */
+@DirtiesContext
+@CamelSpringBootTest
+@SpringBootTest(
+        classes = {
+                CamelAutoConfiguration.class,
+                BaseEmbeddedKafkaTestSupport.DefaulKafkaComponent.class,
+                KafkaConsumerBatchSizeIT.class,
+                KafkaConsumerBatchSizeIT.TestConfiguration.class,
+        }
+)
+public class KafkaConsumerBatchSizeIT extends BaseEmbeddedKafkaTestSupport {
+
+    public static final String TOPIC = "test-batch";
+
+    private final String from = "kafka:" + TOPIC + "?autoOffsetReset=earliest&autoCommitEnable=false&consumersCount=1";
+
+    @EndpointInject("mock:result")
+    private MockEndpoint to;
+
+    private org.apache.kafka.clients.producer.KafkaProducer<String, String> producer;
+
+    /** Opens a fresh producer against the test broker for every test. */
+    @BeforeEach
+    public void before() {
+        producer = new org.apache.kafka.clients.producer.KafkaProducer<>(getDefaultProperties());
+    }
+
+    /** Closes the producer (if any) and requests deletion of the test topic. */
+    @AfterEach
+    public void after() {
+        if (producer != null) {
+            producer.close();
+        }
+        // clean all test topics
+        kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC));
+    }
+
+    @Test
+    public void kafkaMessagesIsConsumedByCamel() throws Exception {
+        // Round 1: m1 and m2 are produced and must arrive at the mock endpoint.
+        to.expectedBodiesReceivedInAnyOrder("m1", "m2");
+        sendMessages(1, 2);
+
+        to.assertIsSatisfied();
+        to.reset();
+
+        // Round 2: only m3..m10 are expected after the route has been restarted.
+        to.expectedBodiesReceivedInAnyOrder("m3", "m4", "m5", "m6", "m7", "m8", "m9", "m10");
+
+        // Restart the consuming route before producing the second round.
+        context.getRouteController().stopRoute("foo");
+        context.getRouteController().startRoute("foo");
+
+        sendMessages(3, 10);
+
+        to.assertIsSatisfied();
+    }
+
+    /**
+     * Sends records "mN" with key "1" to {@link #TOPIC} for every N in
+     * {@code [first, last]} (both inclusive).
+     */
+    private void sendMessages(int first, int last) {
+        for (int i = first; i <= last; i++) {
+            String body = "m" + i;
+            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "1", body);
+            producer.send(record);
+        }
+    }
+
+    /** Supplies the single consumer route ("foo") used by the test. */
+    @Configuration
+    public class TestConfiguration {
+        @Bean
+        public RouteBuilder routeBuilder() {
+            return new RouteBuilder() {
+                @Override
+                public void configure() {
+                    from(from).routeId("foo").to(to).setId("First");
+                }
+            };
+        }
+    }
+}
diff --git a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java
new file mode 100644
index 0000000..ca96589
--- /dev/null
+++ b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.StreamSupport;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.component.kafka.KafkaEndpoint;
+import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderDeserializer;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+
+import static org.apache.camel.test.junit5.TestSupport.assertIsInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Integration tests for the Kafka consumer route {@code full-it}, which reads {@link #TOPIC} and
+ * forwards every exchange to {@code mock:result}.
+ * <p>
+ * Test methods run in {@link Order} sequence. {@link #before()} creates a fresh producer and clears
+ * the records captured by {@link MockConsumerInterceptor}; {@link #after()} closes the producer,
+ * requests deletion of the topic and resets the mock endpoint.
+ */
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+@DirtiesContext
+@CamelSpringBootTest
+@SpringBootTest(
+        classes = {
+                CamelAutoConfiguration.class,
+                BaseEmbeddedKafkaTestSupport.DefaulKafkaComponent.class,
+                KafkaConsumerFullIT.class,
+                KafkaConsumerFullIT.TestConfiguration.class,
+        }
+)
+public class KafkaConsumerFullIT extends BaseEmbeddedKafkaTestSupport {
+    public static final String TOPIC = "test-full";
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerFullIT.class);
+
+    // Consumer endpoint URI of the route under test (auto-commit enabled, MockConsumerInterceptor installed).
+    private final String from = "kafka:" + TOPIC
+            + "?groupId=group1&autoOffsetReset=earliest&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer&"
+            + "valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+            + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.integration.MockConsumerInterceptor";
+
+
+    @EndpointInject("mock:result")
+    private MockEndpoint to;
+
+    private org.apache.kafka.clients.producer.KafkaProducer<String, String> producer;
+
+    /**
+     * Creates the producer used by the test and clears the interceptor's captured records.
+     */
+    @BeforeEach
+    public void before() {
+        Properties props = getDefaultProperties();
+        producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
+        MockConsumerInterceptor.recordsCaptured.clear();
+    }
+
+    /**
+     * Closes the producer, requests deletion of {@link #TOPIC} and resets the mock endpoint.
+     */
+    @AfterEach
+    public void after() {
+        if (producer != null) {
+            producer.close();
+        }
+        // clean all test topics; the future returned by all() is not awaited here
+        kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC)).all();
+        to.reset();
+    }
+
+    /**
+     * Sends five records carrying a {@code CamelSkippedHeader} and a {@code PropagatedCustomHeader}
+     * header, then checks that all five bodies arrive, that the interceptor's first captured batch
+     * holds five records and that only the propagated header is present on the first exchange.
+     *
+     * @throws InterruptedException if waiting on the mock endpoint is interrupted
+     */
+    @Order(3)
+    @Test
+    public void kafkaMessageIsConsumedByCamel() throws InterruptedException {
+        String propagatedHeaderKey = "PropagatedCustomHeader";
+        // explicit charset so the header bytes do not depend on the platform default
+        byte[] propagatedHeaderValue
+                = "propagated header value".getBytes(java.nio.charset.StandardCharsets.UTF_8);
+        String skippedHeaderKey = "CamelSkippedHeader";
+        to.expectedMessageCount(5);
+        to.expectedBodiesReceivedInAnyOrder("message-0", "message-1", "message-2", "message-3", "message-4");
+        // The LAST_RECORD_BEFORE_COMMIT header should not be configured on any
+        // exchange because autoCommitEnable=true
+        to.expectedHeaderValuesReceivedInAnyOrder(KafkaConstants.LAST_RECORD_BEFORE_COMMIT, null, null, null, null, null);
+        to.expectedHeaderReceived(propagatedHeaderKey, propagatedHeaderValue);
+
+        for (int k = 0; k < 5; k++) {
+            String msg = "message-" + k;
+            ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, "1", msg);
+            // same key that is asserted to be absent below
+            data.headers().add(new RecordHeader(skippedHeaderKey,
+                    "skipped header value".getBytes(java.nio.charset.StandardCharsets.UTF_8)));
+            data.headers().add(new RecordHeader(propagatedHeaderKey, propagatedHeaderValue));
+            producer.send(data);
+        }
+
+        to.assertIsSatisfied(3000);
+
+        // the first batch seen by the consumer interceptor is expected to contain all five records
+        assertEquals(5, StreamSupport.stream(MockConsumerInterceptor.recordsCaptured.get(0).records(TOPIC).spliterator(), false)
+                .count());
+
+        Map<String, Object> headers = to.getExchanges().get(0).getIn().getHeaders();
+        assertFalse(headers.containsKey(skippedHeaderKey), "Should not receive skipped header");
+        assertTrue(headers.containsKey(propagatedHeaderKey), "Should receive propagated header");
+    }
+
+    /**
+     * Sends one record whose Kafka header is named like {@link KafkaConstants#TOPIC} but carries a
+     * bogus value, and checks that the exchange header still equals the real topic name.
+     *
+     * @throws InterruptedException if waiting on the mock endpoint is interrupted
+     */
+    @Order(2)
+    @Test
+    public void kafkaRecordSpecificHeadersAreNotOverwritten() throws InterruptedException {
+        String propagatedHeaderKey = KafkaConstants.TOPIC;
+        byte[] propagatedHeaderValue
+                = "propagated incorrect topic".getBytes(java.nio.charset.StandardCharsets.UTF_8);
+        to.expectedHeaderReceived(KafkaConstants.TOPIC, TOPIC);
+
+        ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, "1", "message");
+        data.headers().add(new RecordHeader(propagatedHeaderKey, propagatedHeaderValue));
+        producer.send(data);
+
+        to.assertIsSatisfied(3000);
+
+        Map<String, Object> headers = to.getExchanges().get(0).getIn().getHeaders();
+        assertTrue(headers.containsKey(KafkaConstants.TOPIC), "Should receive KafkaEndpoint populated kafka.TOPIC header");
+        assertEquals(TOPIC, headers.get(KafkaConstants.TOPIC), "Topic name received");
+    }
+
+    /**
+     * Consumes five records, then restarts the route with {@code seekTo=beginning} and expects the
+     * same five bodies to be delivered again.
+     *
+     * @throws Exception if the route cannot be stopped/started or the wait is interrupted
+     */
+    @Test
+    @Order(1)
+    public void kafkaMessageIsConsumedByCamelSeekedToBeginning() throws Exception {
+        to.expectedMessageCount(5);
+        to.expectedBodiesReceivedInAnyOrder("message-0", "message-1", "message-2", "message-3", "message-4");
+        for (int k = 0; k < 5; k++) {
+            String msg = "message-" + k;
+            ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, "1", msg);
+            producer.send(data);
+        }
+        to.assertIsSatisfied(3000);
+
+        to.reset();
+
+        to.expectedMessageCount(5);
+
+        to.expectedBodiesReceivedInAnyOrder("message-0", "message-1", "message-2", "message-3", "message-4");
+
+        // Restart endpoint,
+        context.getRouteController().stopRoute("full-it");
+
+        // typed lookup, consistent with headerDeserializerCouldBeOverridden()
+        KafkaEndpoint kafkaEndpoint = context.getEndpoint(from, KafkaEndpoint.class);
+        kafkaEndpoint.getConfiguration().setSeekTo("beginning");
+
+        context.getRouteController().startRoute("full-it");
+
+        // As we set seek to beginning we should re-consume all messages
+        to.assertIsSatisfied(3000);
+    }
+
+    /**
+     * Consumes five records, then restarts the route with {@code seekTo=end} and expects no further
+     * exchange on the mock endpoint.
+     *
+     * @throws Exception if the route cannot be stopped/started or the wait is interrupted
+     */
+    @Order(4)
+    @Test
+    public void kafkaMessageIsConsumedByCamelSeekedToEnd() throws Exception {
+        to.expectedMessageCount(5);
+        to.expectedBodiesReceivedInAnyOrder("message-0", "message-1", "message-2", "message-3", "message-4");
+        for (int k = 0; k < 5; k++) {
+            String msg = "message-" + k;
+            ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, "1", msg);
+            producer.send(data);
+        }
+        to.assertIsSatisfied(3000);
+
+        to.reset();
+
+        to.expectedMessageCount(0);
+
+        // Restart endpoint,
+        context.getRouteController().stopRoute("full-it");
+
+        // typed lookup, consistent with headerDeserializerCouldBeOverridden()
+        KafkaEndpoint kafkaEndpoint = context.getEndpoint(from, KafkaEndpoint.class);
+        kafkaEndpoint.getConfiguration().setSeekTo("end");
+
+        context.getRouteController().startRoute("full-it");
+
+        to.assertIsSatisfied(3000);
+    }
+
+    /**
+     * Resolves an endpoint whose {@code headerDeserializer} option references the
+     * {@code myHeaderDeserializer} bean and checks the configured instance's type.
+     */
+    @Order(5)
+    @Test
+    public void headerDeserializerCouldBeOverridden() {
+        KafkaEndpoint kafkaEndpoint
+                = context.getEndpoint("kafka:random_topic?headerDeserializer=#myHeaderDeserializer", KafkaEndpoint.class);
+        assertIsInstanceOf(MyKafkaHeaderDeserializer.class, kafkaEndpoint.getConfiguration().getHeaderDeserializer());
+    }
+
+    /**
+     * Spring configuration providing the route under test and the custom header deserializer bean.
+     */
+    @Configuration
+    public class TestConfiguration {
+        /**
+         * Builds route {@code full-it}: consume from {@code from}, trace-log the body, send to {@code to}.
+         *
+         * @return the route builder bean
+         */
+        @Bean
+        public RouteBuilder routeBuilder() {
+            return new RouteBuilder() {
+                @Override
+                public void configure() {
+                    from(from).process(exchange -> LOG.trace("Captured on the processor: {}", exchange.getMessage().getBody()))
+                            .routeId("full-it").to(to);
+                }
+            };
+        }
+
+        /**
+         * @return the deserializer registered under the bean name {@code myHeaderDeserializer}
+         */
+        @Bean("myHeaderDeserializer")
+        public MyKafkaHeaderDeserializer createMyKafkaHeaderDeserializer(){
+            return new MyKafkaHeaderDeserializer();
+        }
+    }
+
+    /** Marker subclass used to verify that the header deserializer can be replaced. */
+    private static class MyKafkaHeaderDeserializer extends DefaultKafkaHeaderDeserializer {
+    }
+}
diff --git 
a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerHealthCheckIT.java
 
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerHealthCheckIT.java
new file mode 100644
index 0000000..31a383c
--- /dev/null
+++ 
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerHealthCheckIT.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.StreamSupport;
+import org.apache.camel.CamelContext;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderDeserializer;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.health.HealthCheck;
+import org.apache.camel.health.HealthCheckHelper;
+import org.apache.camel.health.HealthCheckRegistry;
+import org.apache.camel.impl.health.DefaultHealthCheckRegistry;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.spring.boot.CamelContextConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.testcontainers.shaded.org.awaitility.Awaitility.await;
+
+/**
+ * Integration test that checks Camel health-check results for the Kafka consumer route
+ * {@code test-health-it} while the broker is running and after {@code service.shutdown()} is called.
+ * <p>
+ * A {@link CamelContextConfiguration} bean installs a {@link DefaultHealthCheckRegistry} with the
+ * {@code context}, {@code routes} and {@code consumers} checks before the application starts.
+ */
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+@DirtiesContext
+@CamelSpringBootTest
+@SpringBootTest(
+        classes = {
+                CamelAutoConfiguration.class,
+                BaseEmbeddedKafkaTestSupport.DefaulKafkaComponent.class,
+                KafkaConsumerHealthCheckIT.class,
+                KafkaConsumerHealthCheckIT.TestConfiguration.class,
+        }
+)
+public class KafkaConsumerHealthCheckIT extends BaseEmbeddedKafkaTestSupport {
+    public static final String TOPIC = "test-health";
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerHealthCheckIT.class);
+
+    // Consumer endpoint URI of the route under test (auto-commit enabled, MockConsumerInterceptor installed).
+    private final String from = "kafka:" + TOPIC
+            + "?groupId=group1&autoOffsetReset=earliest&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer&"
+            + "valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+            + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.integration.MockConsumerInterceptor";
+
+    @EndpointInject("mock:result")
+    private MockEndpoint to;
+
+    private org.apache.kafka.clients.producer.KafkaProducer<String, String> producer;
+
+    /**
+     * Creates the producer used by the test and clears the interceptor's captured records.
+     */
+    @BeforeEach
+    public void before() {
+        Properties props = getDefaultProperties();
+        producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
+        MockConsumerInterceptor.recordsCaptured.clear();
+    }
+
+    /**
+     * Closes the producer, requests deletion of {@link #TOPIC} and resets the mock endpoint.
+     */
+    @AfterEach
+    public void after() {
+        if (producer != null) {
+            producer.close();
+        }
+        // clean all test topics; the future returned by all() is not awaited here
+        kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC)).all();
+        to.reset();
+    }
+
+    /**
+     * Registers a health-check registry on the Camel context before the application starts.
+     *
+     * @return the context configuration bean (named {@code healthRegistry})
+     */
+    @Bean("healthRegistry")
+    CamelContextConfiguration contextConfiguration() {
+        return new CamelContextConfiguration() {
+            @Override
+            public void beforeApplicationStart(CamelContext context) {
+                // install health check manually (yes a bit cumbersome)
+                HealthCheckRegistry registry = new DefaultHealthCheckRegistry();
+                registry.setCamelContext(context);
+                Object hc = registry.resolveById("context");
+                registry.register(hc);
+                hc = registry.resolveById("routes");
+                registry.register(hc);
+                hc = registry.resolveById("consumers");
+                registry.register(hc);
+                context.setExtension(HealthCheckRegistry.class, registry);
+            }
+
+            @Override
+            public void afterApplicationStart(CamelContext camelContext) {
+                //do nothing here
+            }
+        };
+    }
+
+    /**
+     * Verifies liveness and readiness are UP, consumes five records, shuts the Kafka service down
+     * and then waits until a health check reports DOWN with the expected message and details.
+     *
+     * @throws InterruptedException if waiting on the mock endpoint is interrupted
+     */
+    @Order(1)
+    @Test
+    public void kafkaConsumerHealthCheck() throws InterruptedException {
+        // health-check liveness should be UP
+        Collection<HealthCheck.Result> res = HealthCheckHelper.invokeLiveness(context);
+        boolean up = res.stream().allMatch(r -> r.getState().equals(HealthCheck.State.UP));
+        Assertions.assertTrue(up, "liveness check");
+
+        // health-check readiness should be ready
+        await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> {
+            Collection<HealthCheck.Result> res2 = HealthCheckHelper.invokeReadiness(context);
+            boolean up2 = res2.stream().allMatch(r -> r.getState().equals(HealthCheck.State.UP));
+            Assertions.assertTrue(up2, "readiness check");
+        });
+
+        String propagatedHeaderKey = "PropagatedCustomHeader";
+        // explicit charset so the header bytes do not depend on the platform default
+        byte[] propagatedHeaderValue
+                = "propagated header value".getBytes(java.nio.charset.StandardCharsets.UTF_8);
+        String skippedHeaderKey = "CamelSkippedHeader";
+        to.expectedMessageCount(5);
+        to.expectedBodiesReceivedInAnyOrder("message-0", "message-1", "message-2", "message-3", "message-4");
+        // The LAST_RECORD_BEFORE_COMMIT header should not be configured on any
+        // exchange because autoCommitEnable=true
+        to.expectedHeaderValuesReceivedInAnyOrder(KafkaConstants.LAST_RECORD_BEFORE_COMMIT, null, null, null, null, null);
+        to.expectedHeaderReceived(propagatedHeaderKey, propagatedHeaderValue);
+
+        for (int k = 0; k < 5; k++) {
+            String msg = "message-" + k;
+            ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, "1", msg);
+            // same key that is asserted to be absent below
+            data.headers().add(new RecordHeader(skippedHeaderKey,
+                    "skipped header value".getBytes(java.nio.charset.StandardCharsets.UTF_8)));
+            data.headers().add(new RecordHeader(propagatedHeaderKey, propagatedHeaderValue));
+            producer.send(data);
+        }
+
+        to.assertIsSatisfied(3000);
+
+        // the first batch seen by the consumer interceptor is expected to contain all five records
+        assertEquals(5, StreamSupport.stream(MockConsumerInterceptor.recordsCaptured.get(0).records(TOPIC).spliterator(), false)
+                .count());
+
+        Map<String, Object> headers = to.getExchanges().get(0).getIn().getHeaders();
+        assertFalse(headers.containsKey(skippedHeaderKey), "Should not receive skipped header");
+        assertTrue(headers.containsKey(propagatedHeaderKey), "Should receive propagated header");
+
+        // and shutdown kafka which will make readiness report as DOWN
+        service.shutdown();
+
+        // health-check liveness should be UP
+        res = HealthCheckHelper.invokeLiveness(context);
+        up = res.stream().allMatch(r -> r.getState().equals(HealthCheck.State.UP));
+        Assertions.assertTrue(up, "liveness check");
+        // but health-check readiness should NOT be ready
+        await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> {
+            Collection<HealthCheck.Result> res2 = HealthCheckHelper.invoke(context);
+            Optional<HealthCheck.Result> down
+                    = res2.stream().filter(r -> r.getState().equals(HealthCheck.State.DOWN)).findFirst();
+            Assertions.assertTrue(down.isPresent());
+            // orElse(null) instead of get(): a missing message then fails the assertion below
+            // rather than throwing NoSuchElementException out of the polled block
+            String msg = down.get().getMessage().orElse(null);
+            Assertions.assertEquals("KafkaConsumer is not ready (recovery in progress using 5s intervals).", msg);
+            Map<String, Object> map = down.get().getDetails();
+            Assertions.assertEquals(TOPIC, map.get("topic"));
+            Assertions.assertEquals("test-health-it", map.get("route.id"));
+        });
+    }
+
+    /**
+     * Spring configuration providing the route under test and a header deserializer bean.
+     */
+    @Configuration
+    public class TestConfiguration {
+        /**
+         * Builds route {@code test-health-it}: consume from {@code from}, trace-log the body, send to {@code to}.
+         *
+         * @return the route builder bean
+         */
+        @Bean
+        public RouteBuilder routeBuilder() {
+            return new RouteBuilder() {
+                @Override
+                public void configure() {
+                    from(from).process(exchange -> LOG.trace("Captured on the processor: {}", exchange.getMessage().getBody()))
+                            .routeId("test-health-it").to(to);
+                }
+            };
+        }
+
+        /**
+         * @return the deserializer registered under the bean name {@code myHeaderDeserializer}
+         */
+        @Bean("myHeaderDeserializer")
+        public MyKafkaHeaderDeserializer createMyKafkaHeaderDeserializer(){
+            return new MyKafkaHeaderDeserializer();
+        }
+    }
+
+    /** Plain subclass of the default header deserializer, exposed as a bean by {@link TestConfiguration}. */
+    private static class MyKafkaHeaderDeserializer extends DefaultKafkaHeaderDeserializer {
+    }
+}
diff --git 
a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentIT.java
 
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentIT.java
new file mode 100644
index 0000000..5e20875
--- /dev/null
+++ 
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentIT.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.util.Arrays;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+
+import static 
org.apache.camel.component.kafka.serde.KafkaSerdeHelper.numericHeader;
+
+/**
+ * Integration test for an idempotent consumer keyed on the numeric {@code id} header and backed by
+ * a {@link KafkaIdempotentRepository}.
+ * <p>
+ * Only enabled when the system property {@code enable.kafka.consumer.idempotency.tests} is
+ * {@code true}; the Spring context is discarded after every test method.
+ */
+@EnabledIfSystemProperty(named = "enable.kafka.consumer.idempotency.tests", matches = "true")
+@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
+@CamelSpringBootTest
+@SpringBootTest(
+        classes = {
+                CamelAutoConfiguration.class,
+                BaseEmbeddedKafkaTestSupport.DefaulKafkaComponent.class,
+                KafkaConsumerIdempotentIT.class,
+                KafkaConsumerIdempotentIT.TestConfiguration.class,
+        }
+)
+public class KafkaConsumerIdempotentIT extends KafkaConsumerIdempotentTestSupport {
+
+    public static final String TOPIC = "idempt";
+
+    // Topic handed to the idempotent repository; also removed around each test.
+    private static final String REPOSITORY_TOPIC = "TEST_IDEMPOTENT";
+
+    // Consumer endpoint URI of the route under test.
+    private final String from = "kafka:" + TOPIC
+            + "?groupId=group2&autoOffsetReset=earliest"
+            + "&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+            + "&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+            + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true"
+            + "&interceptorClasses=org.apache.camel.component.kafka.integration.MockConsumerInterceptor";
+
+    @EndpointInject("mock:result")
+    private MockEndpoint to;
+
+    // Number of records published before, and expected by, the test.
+    private int size = 200;
+
+    /**
+     * Requests deletion of both topics, then publishes {@code size} records to {@link #TOPIC}.
+     */
+    @BeforeEach
+    public void before() {
+        purgeTopics();
+        doSend(size, TOPIC);
+    }
+
+    /**
+     * Requests deletion of both topics once the test has finished.
+     */
+    @AfterEach
+    public void after() {
+        purgeTopics();
+    }
+
+    /**
+     * Delegates to {@code doRun} with the mock endpoint and the number of published records.
+     *
+     * @throws InterruptedException if waiting on the mock endpoint is interrupted
+     */
+    @Test
+    @DisplayName("Numeric headers is consumable when using idempotent (CAMEL-16914)")
+    public void kafkaIdempotentMessageIsConsumedByCamel() throws InterruptedException {
+        doRun(to, size);
+    }
+
+    // Asks the admin client to delete the data topic and the repository topic; the result is not awaited.
+    private void purgeTopics() {
+        kafkaAdminClient.deleteTopics(Arrays.asList(TOPIC, REPOSITORY_TOPIC)).all();
+    }
+
+    /**
+     * Spring configuration providing the route under test and the idempotent repository bean.
+     */
+    @Configuration
+    public class TestConfiguration {
+        /**
+         * Builds route {@code foo}: consume from {@code from}, filter through an idempotent consumer
+         * keyed on {@code numericHeader("id")}, send to {@code to}.
+         *
+         * @return the route builder bean
+         */
+        @Bean
+        public RouteBuilder routeBuilder() {
+            RouteBuilder routes = new RouteBuilder() {
+                @Override
+                public void configure() {
+                    from(from).routeId("foo")
+                            .idempotentConsumer(numericHeader("id"))
+                            .idempotentRepository("kafkaIdempotentRepository")
+                            .to(to);
+                }
+            };
+            return routes;
+        }
+
+        /**
+         * @return the repository registered under the bean name {@code kafkaIdempotentRepository}
+         */
+        @Bean("kafkaIdempotentRepository")
+        public KafkaIdempotentRepository createKafkaIdempotentRepository(){
+            return new KafkaIdempotentRepository(REPOSITORY_TOPIC, getBootstrapServers());
+        }
+    }
+}
diff --git 
a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentTestSupport.java
 
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentTestSupport.java
new file mode 100644
index 0000000..dc56323
--- /dev/null
+++ 
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentTestSupport.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.kafka.integration;
+
+import java.math.BigInteger;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.camel.Exchange;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.internals.RecordHeader;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Shared helpers for the idempotent-consumer integration tests: publishing records that carry a
+ * numeric {@code id} header, and asserting on what a mock endpoint received.
+ */
+public abstract class KafkaConsumerIdempotentTestSupport extends BaseEmbeddedKafkaTestSupport {
+
+    /**
+     * Publishes {@code size} records to {@code topic}. Record {@code k} has key {@code k}, body
+     * {@code "message-k"} and an {@code id} header holding the big-endian bytes of {@code k}.
+     *
+     * @param size  number of records to publish
+     * @param topic destination topic
+     */
+    protected void doSend(int size, String topic) {
+        Properties props = getDefaultProperties();
+
+        // try-with-resources closes the producer even when a send fails
+        try (org.apache.kafka.clients.producer.KafkaProducer<String, String> producer
+                = new org.apache.kafka.clients.producer.KafkaProducer<>(props)) {
+            for (int k = 0; k < size; k++) {
+                String msg = "message-" + k;
+                ProducerRecord<String, String> data = new ProducerRecord<>(topic, String.valueOf(k), msg);
+
+                data.headers().add(new RecordHeader("id", BigInteger.valueOf(k).toByteArray()));
+                producer.send(data);
+            }
+        }
+    }
+
+    /**
+     * Expects {@code size} exchanges on {@code mockEndpoint}, waits up to ten seconds for them and
+     * checks that the first exchange carries an {@code id} header.
+     *
+     * @param mockEndpoint endpoint the route under test delivers to
+     * @param size         expected number of exchanges
+     * @throws InterruptedException if the wait is interrupted
+     */
+    protected void doRun(MockEndpoint mockEndpoint, int size) throws InterruptedException {
+        mockEndpoint.expectedMessageCount(size);
+
+        // obtained before waiting, exactly as in the original sequence
+        List<Exchange> exchangeList = mockEndpoint.getReceivedExchanges();
+
+        mockEndpoint.assertIsSatisfied(10000);
+
+        assertEquals(size, exchangeList.size());
+
+        Map<String, Object> headers = mockEndpoint.getExchanges().get(0).getIn().getHeaders();
+        assertTrue(headers.containsKey("id"), "0");
+    }
+}
diff --git 
a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithCustomSerializerIT.java
 
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithCustomSerializerIT.java
new file mode 100644
index 0000000..c9a96b1
--- /dev/null
+++ 
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithCustomSerializerIT.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.util.Arrays;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+
+/**
+ * Integration test for an idempotent consumer keyed on the {@code id} header when the endpoint is
+ * configured with {@code CustomHeaderDeserializer} as its header deserializer.
+ * <p>
+ * The Spring context is discarded after every test method.
+ */
+@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
+@CamelSpringBootTest
+@SpringBootTest(
+        classes = {
+                CamelAutoConfiguration.class,
+                BaseEmbeddedKafkaTestSupport.DefaulKafkaComponent.class,
+                KafkaConsumerIdempotentWithCustomSerializerIT.class,
+                KafkaConsumerIdempotentWithCustomSerializerIT.TestConfiguration.class,
+        }
+)
+public class KafkaConsumerIdempotentWithCustomSerializerIT extends KafkaConsumerIdempotentTestSupport {
+
+    public static final String TOPIC = "idempt2";
+
+    // Topic handed to the idempotent repository; also removed around each test.
+    private static final String REPOSITORY_TOPIC = "TEST_IDEMPOTENT";
+
+    // Consumer endpoint URI of the route under test.
+    private final String from = "kafka:" + TOPIC
+            + "?groupId=group2&autoOffsetReset=earliest"
+            + "&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+            + "&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+            + "&headerDeserializer=#class:org.apache.camel.component.kafka.integration.CustomHeaderDeserializer"
+            + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true"
+            + "&interceptorClasses=org.apache.camel.component.kafka.integration.MockConsumerInterceptor";
+
+    @EndpointInject("mock:result")
+    private MockEndpoint to;
+
+    // Number of records published before, and expected by, the test.
+    private int size = 200;
+
+    /**
+     * Requests deletion of both topics, then publishes {@code size} records to {@link #TOPIC}.
+     */
+    @BeforeEach
+    public void before() {
+        purgeTopics();
+        doSend(size, TOPIC);
+    }
+
+    /**
+     * Requests deletion of both topics once the test has finished.
+     */
+    @AfterEach
+    public void after() {
+        purgeTopics();
+    }
+
+    /**
+     * Delegates to {@code doRun} with the mock endpoint and the number of published records.
+     *
+     * @throws InterruptedException if waiting on the mock endpoint is interrupted
+     */
+    @Test
+    public void kafkaMessageIsConsumedByCamel() throws InterruptedException {
+        doRun(to, size);
+    }
+
+    // Asks the admin client to delete the data topic and the repository topic; the result is not awaited.
+    private void purgeTopics() {
+        kafkaAdminClient.deleteTopics(Arrays.asList(TOPIC, REPOSITORY_TOPIC)).all();
+    }
+
+    /**
+     * Spring configuration providing the route under test and the idempotent repository bean.
+     */
+    @Configuration
+    public class TestConfiguration {
+        /**
+         * Builds route {@code foo}: consume from {@code from}, filter through an idempotent consumer
+         * keyed on {@code header("id")}, send to {@code to}.
+         *
+         * @return the route builder bean
+         */
+        @Bean
+        public RouteBuilder routeBuilder() {
+            RouteBuilder routes = new RouteBuilder() {
+                @Override
+                public void configure() {
+                    from(from).routeId("foo")
+                            .idempotentConsumer(header("id"))
+                            .idempotentRepository("kafkaIdempotentRepository")
+                            .to(to);
+                }
+            };
+            return routes;
+        }
+
+        /**
+         * @return the repository registered under the bean name {@code kafkaIdempotentRepository}
+         */
+        @Bean("kafkaIdempotentRepository")
+        public KafkaIdempotentRepository createKafkaIdempotentRepository(){
+            return new KafkaIdempotentRepository(REPOSITORY_TOPIC, getBootstrapServers());
+        }
+    }
+}
diff --git 
a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithProcessorIT.java
 
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithProcessorIT.java
new file mode 100644
index 0000000..d41756b
--- /dev/null
+++ 
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithProcessorIT.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.math.BigInteger;
+import java.util.Arrays;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+
+/**
+ * Integration test for a Kafka consumer route that rewrites the {@code id} header in a
+ * processor before handing the exchange to an idempotent consumer backed by a
+ * {@link KafkaIdempotentRepository}.
+ * <p>
+ * The Spring context is discarded after every test method (see {@code @DirtiesContext}).
+ */
+@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
+@CamelSpringBootTest
+@SpringBootTest(
+        classes = {
+                CamelAutoConfiguration.class,
+                BaseEmbeddedKafkaTestSupport.DefaulKafkaComponent.class,
+                KafkaConsumerIdempotentWithProcessorIT.class,
+                KafkaConsumerIdempotentWithProcessorIT.TestConfiguration.class,
+        }
+)
+public class KafkaConsumerIdempotentWithProcessorIT extends 
KafkaConsumerIdempotentTestSupport {
+    /** Topic the test records are sent to and consumed from. */
+    public static final String TOPIC = "testidemp3";
+
+    // Consumer endpoint URI. No headerDeserializer option is set here, which is
+    // presumably why the route converts the raw header bytes itself - verify.
+    private final String from = "kafka:" + TOPIC
+            + "?groupId=group2&autoOffsetReset=earliest"
+            + 
"&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+            + 
"&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+            + 
"&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true"
+            + 
"&interceptorClasses=org.apache.camel.component.kafka.integration.MockConsumerInterceptor";
+
+    // Mock endpoint the route delivers to.
+    @EndpointInject("mock:resulti")
+    private MockEndpoint to;
+
+    // Count passed to doSend and doRun.
+    private int size = 200;
+
+    /**
+     * Runs before each test: requests deletion of the test topics, then calls
+     * {@code doSend(size, TOPIC)}.
+     */
+    @BeforeEach
+    public void before() {
+        // NOTE(review): the value returned by all() is discarded, so the deletion is not
+        // awaited before doSend runs - confirm this is acceptable.
+        kafkaAdminClient.deleteTopics(Arrays.asList(TOPIC, 
"TEST_IDEMPOTENT")).all();
+        // doSend is inherited; presumably it publishes `size` records to TOPIC - verify.
+        doSend(size, TOPIC);
+    }
+
+    /** Runs after each test: requests deletion of the test topics. */
+    @AfterEach
+    public void after() {
+        // clean all test topics
+        kafkaAdminClient.deleteTopics(Arrays.asList(TOPIC, 
"TEST_IDEMPOTENT")).all();
+    }
+
+    /**
+     * Delegates to the inherited {@code doRun} with the mock endpoint and record count.
+     * NOTE(review): the assertions live in doRun, which is outside this file.
+     */
+    @Test
+    public void kafkaMessageIsConsumedByCamel() throws InterruptedException {
+        doRun(to, size);
+    }
+
+    /** Spring configuration contributing the Camel route and the idempotent repository bean. */
+    @Configuration
+    public class TestConfiguration {
+        /**
+         * Builds route {@code idemp-with-prop}: consume from Kafka, convert the {@code id}
+         * header from bytes to a decimal string, filter through the idempotent consumer
+         * keyed on that header, and forward to the mock endpoint.
+         */
+        @Bean
+        public RouteBuilder routeBuilder() {
+            return new RouteBuilder() {
+                @Override
+                public void configure() {
+                    from(from).routeId("idemp-with-prop")
+                            .process(exchange -> {
+                                // read the header as raw bytes
+                                byte[] id = exchange.getIn().getHeader("id", 
byte[].class);
+
+                                // interpret the bytes as a big-endian two's-complement integer
+                                BigInteger bi = new BigInteger(id);
+
+                                // replace the header with the decimal string of its long value
+                                exchange.getIn().setHeader("id", 
String.valueOf(bi.longValue()));
+                            })
+                            .idempotentConsumer(header("id"))
+                            .idempotentRepository("kafkaIdempotentRepository")
+                            .to(to);
+                }
+            };
+        }
+
+        /**
+         * Creates the repository bean referenced by name in the route, constructed with the
+         * {@code TEST_IDEMPOTENT} topic name and the value of {@code getBootstrapServers()}.
+         */
+        @Bean("kafkaIdempotentRepository")
+        public KafkaIdempotentRepository createKafkaIdempotentRepository(){
+            return new KafkaIdempotentRepository("TEST_IDEMPOTENT", 
getBootstrapServers());
+        }
+    }
+}
diff --git 
a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerLastRecordHeaderIT.java
 
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerLastRecordHeaderIT.java
new file mode 100644
index 0000000..2d97cc9
--- /dev/null
+++ 
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerLastRecordHeaderIT.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
+@CamelSpringBootTest
+@SpringBootTest(
+        classes = {
+                CamelAutoConfiguration.class,
+                BaseEmbeddedKafkaTestSupport.DefaulKafkaComponent.class,
+                KafkaConsumerLastRecordHeaderIT.class,
+                KafkaConsumerLastRecordHeaderIT.TestConfiguration.class,
+        }
+)
+public class KafkaConsumerLastRecordHeaderIT extends 
BaseEmbeddedKafkaTestSupport {
+    private static final String TOPIC = "last-record";
+
+    @EndpointInject("mock:result")
+    private MockEndpoint result;
+
+    private org.apache.kafka.clients.producer.KafkaProducer<String, String> 
producer;
+
+    @BeforeEach
+    public void before() {
+        Properties props = getDefaultProperties();
+        producer = new 
org.apache.kafka.clients.producer.KafkaProducer<>(props);
+    }
+
+    @AfterEach
+    public void after() {
+        if (producer != null) {
+            producer.close();
+        }
+        // clean all test topics
+        kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC));
+    }
+
+    /**
+     * When consuming data with autoCommitEnable=false Then the 
LAST_RECORD_BEFORE_COMMIT header must be always defined
+     * And it should be true only for the last one
+     */
+    @Test
+    public void shouldStartFromBeginningWithEmptyOffsetRepository() throws 
InterruptedException {
+        result.expectedMessageCount(5);
+        result.expectedBodiesReceived("message-0", "message-1", "message-2", 
"message-3", "message-4");
+
+        for (int i = 0; i < 5; i++) {
+            producer.send(new ProducerRecord<>(TOPIC, "1", "message-" + i));
+        }
+
+        result.assertIsSatisfied(3000);
+
+        List<Exchange> exchanges = result.getExchanges();
+        for (int i = 0; i < exchanges.size(); i++) {
+            Boolean header = 
exchanges.get(i).getIn().getHeader(KafkaConstants.LAST_RECORD_BEFORE_COMMIT, 
Boolean.class);
+            assertNotNull(header, "Header not set for #" + i);
+            assertEquals(header, i == exchanges.size() - 1, "Header invalid 
for #" + i);
+            // as long as the partitions count is 1 on topic:
+            header = 
exchanges.get(i).getIn().getHeader(KafkaConstants.LAST_POLL_RECORD, 
Boolean.class);
+            assertNotNull(header, "Last record header not set for #" + i);
+            assertEquals(header, i == exchanges.size() - 1, "Last record 
header invalid for #" + i);
+        }
+    }
+
+    @Configuration
+    public class TestConfiguration {
+        @Bean
+        public RouteBuilder routeBuilder() {
+            return new RouteBuilder() {
+                @Override
+                public void configure() {
+                    from("kafka:" + TOPIC + 
"?groupId=A&autoOffsetReset=earliest&autoCommitEnable=false").to("mock:result");
+                }
+            };
+        }
+    }
+}
diff --git 
a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerManualCommitIT.java
 
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerManualCommitIT.java
new file mode 100644
index 0000000..9730286
--- /dev/null
+++ 
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerManualCommitIT.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.util.Collections;
+import java.util.Properties;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.RepeatedTest;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/**
+ * Integration test for manual offset commits ({@code allowManualCommit=true},
+ * {@code autoCommitEnable=false}) on the Kafka consumer.
+ * <p>
+ * Two routes share the same endpoint URI and consumer group: {@code foo} commits each record
+ * through {@link KafkaManualCommit}; {@code bar} is not started automatically and never commits.
+ * The Spring context is discarded after every test method (see {@code @DirtiesContext}).
+ */
+@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
+@CamelSpringBootTest
+@SpringBootTest(
+        classes = {
+                CamelAutoConfiguration.class,
+                BaseEmbeddedKafkaTestSupport.DefaulKafkaComponent.class,
+                KafkaConsumerManualCommitIT.class,
+                KafkaConsumerManualCommitIT.TestConfiguration.class,
+        }
+)
+public class KafkaConsumerManualCommitIT extends BaseEmbeddedKafkaTestSupport {
+
+    public static final String TOPIC = "testManualCommitTest";
+
+    // Endpoint URI shared by both routes.
+    private final String from = "kafka:" + TOPIC
+            + "?groupId=group1&sessionTimeoutMs=30000&autoCommitEnable=false"
+            + "&allowManualCommit=true&autoOffsetReset=earliest";
+
+    // Target of route "foo".
+    @EndpointInject("mock:result")
+    private MockEndpoint to;
+
+    // Target of route "bar".
+    @EndpointInject("mock:resultBar")
+    private MockEndpoint toBar;
+
+    // Plain Kafka producer used to publish the test records.
+    private org.apache.kafka.clients.producer.KafkaProducer<String, String> 
producer;
+
+    /** Creates a fresh producer from the default properties before each test. */
+    @BeforeEach
+    public void before() {
+        Properties props = getDefaultProperties();
+        producer = new 
org.apache.kafka.clients.producer.KafkaProducer<>(props);
+    }
+
+    /** Closes the producer (if one was created) and requests deletion of the test topic. */
+    @AfterEach
+    public void after() {
+        if (producer != null) {
+            producer.close();
+        }
+        // clean all test topics
+        kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC));
+    }
+
+    /**
+     * Checks that a record consumed by the non-committing route {@code bar} is delivered
+     * again to route {@code foo} once {@code foo} is restarted.
+     */
+     @RepeatedTest(4)
+    public void kafkaAutoCommitDisabledDuringRebalance() throws Exception {
+        // First step: foo is running; one record is sent and expected on mock:result
+        to.expectedMessageCount(1);
+        String firstMessage = "message-0";
+        to.expectedBodiesReceivedInAnyOrder(firstMessage);
+
+        ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, "1", 
firstMessage);
+        producer.send(data);
+
+        to.assertIsSatisfied(3000);
+
+        to.reset();
+
+        // Second step: with foo stopped, a second record must not reach mock:result
+        context.getRouteController().stopRoute("foo");
+        to.expectedMessageCount(0);
+
+        String secondMessage = "message-1";
+        data = new ProducerRecord<>(TOPIC, "1", secondMessage);
+        producer.send(data);
+
+        to.assertIsSatisfied(3000);
+
+        to.reset();
+
+        // start a new route in order to rebalance kafka
+        context.getRouteController().startRoute("bar");
+        toBar.expectedMessageCount(1);
+
+        toBar.assertIsSatisfied();
+
+        context.getRouteController().stopRoute("bar");
+
+        // The route bar is not committing the offset, so after restarting foo the
+        // record sent while foo was stopped (message-1) is expected to be processed by foo
+        context.getRouteController().startRoute("foo");
+        to.expectedMessageCount(1);
+        to.expectedBodiesReceivedInAnyOrder("message-1");
+
+        to.assertIsSatisfied(3000);
+    }
+
+    /**
+     * Checks that after route {@code foo} has committed its records and been stopped, a
+     * restart only delivers the records sent while it was stopped.
+     */
+    @RepeatedTest(4)
+    public void kafkaManualCommit() throws Exception {
+        to.expectedMessageCount(5);
+        to.expectedBodiesReceivedInAnyOrder("message-0", "message-1", 
"message-2", "message-3", "message-4");
+        // The LAST_RECORD_BEFORE_COMMIT header should include a value as we use
+        // manual commit
+        
to.allMessages().header(KafkaConstants.LAST_RECORD_BEFORE_COMMIT).isNotNull();
+
+        for (int k = 0; k < 5; k++) {
+            String msg = "message-" + k;
+            ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, 
"1", msg);
+            producer.send(data);
+        }
+
+        to.assertIsSatisfied(3000);
+
+        to.reset();
+
+        // Second step: We shut down our route, we expect nothing will be
+        // recovered by our route
+        context.getRouteController().stopRoute("foo");
+        to.expectedMessageCount(0);
+
+        // Third step: While our route is stopped, we send 3 records more to
+        // Kafka test topic
+        for (int k = 5; k < 8; k++) {
+            String msg = "message-" + k;
+            ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, 
"1", msg);
+            producer.send(data);
+        }
+
+        to.assertIsSatisfied(3000);
+
+        to.reset();
+
+        // Fourth step: We start again our route, since we have been
+        // committing the offsets from the first step,
+        // we will expect to consume from the latest committed offset e.g from
+        // offset 5
+        context.getRouteController().startRoute("foo");
+        to.expectedMessageCount(3);
+        to.expectedBodiesReceivedInAnyOrder("message-5", "message-6", 
"message-7");
+
+        to.assertIsSatisfied(3000);
+    }
+
+    /** Spring configuration contributing the two Camel routes under test. */
+    @Configuration
+    public class TestConfiguration {
+        /**
+         * Builds route {@code foo} (delivers to {@code to}, then commits via the
+         * {@code MANUAL_COMMIT} header) and route {@code bar} (same URI, not auto-started,
+         * delivers to {@code toBar} without committing).
+         */
+        @Bean
+        public RouteBuilder routeBuilder() {
+            return new RouteBuilder() {
+                @Override
+                public void configure() {
+                    from(from).routeId("foo").to(to).process(e -> {
+                        // the commit handle is read from the MANUAL_COMMIT header
+                        KafkaManualCommit manual = 
e.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
+                        assertNotNull(manual);
+                        manual.commit();
+                    });
+                    from(from).routeId("bar").autoStartup(false).to(toBar);
+                }
+            };
+        }
+    }
+
+}
diff --git 
a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerRebalanceIT.java
 
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerRebalanceIT.java
new file mode 100644
index 0000000..5437132
--- /dev/null
+++ 
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerRebalanceIT.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spi.StateRepository;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Integration test checking that the Kafka consumer consults the configured offset
+ * {@link StateRepository} for the test topic.
+ * <p>
+ * The route references the repository as {@code #offset}; that bean and the
+ * {@link CountDownLatch} are not defined in this file (presumably they come from
+ * {@code OffsetStateRepositoryConfiguration} - verify).
+ */
+@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+        classes = {
+                CamelAutoConfiguration.class,
+                BaseEmbeddedKafkaTestSupport.DefaulKafkaComponent.class,
+                KafkaConsumerRebalanceIT.class,
+                OffsetStateRepositoryConfiguration.class,
+                KafkaConsumerRebalanceIT.TestConfiguration.class,
+        }
+)
+public class KafkaConsumerRebalanceIT extends BaseEmbeddedKafkaTestSupport {
+    private static final String TOPIC = "offset-rebalance";
+
+    // NOTE(review): injected but not referenced anywhere else in this class.
+    @EndpointInject("mock:result")
+    private MockEndpoint result;
+
+    // Latch counted down by OffsetStateRepository.getState for keys containing TOPIC.
+    @Autowired
+    private CountDownLatch messagesLatch;
+
+    /**
+     * Waits up to 30 seconds for the latch to reach zero and fails otherwise.
+     * NOTE(review): the method name says "Twice", but the latch's initial count is set
+     * outside this file - confirm it is 2.
+     */
+    @Test
+    public void offsetGetStateMustHaveBeenCalledTwice() throws Exception {
+        boolean offsetGetStateCalled = messagesLatch.await(30000, 
TimeUnit.MILLISECONDS);
+        // The getState should most likely be called during the partition
+        // assignment
+        assertTrue(offsetGetStateCalled, "StateRepository.getState should have 
been called for topic " + TOPIC
+                                         + ". Remaining count : " + 
messagesLatch.getCount());
+    }
+
+    /** Runs after each test: requests deletion of the test topic. */
+    @AfterEach
+    public void after() {
+        // clean all test topics
+        kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC));
+    }
+
+    /** Spring configuration contributing the Camel route under test. */
+    @Configuration
+    public class TestConfiguration {
+        /**
+         * Builds route {@code consumer-rebalance-route}: a single consumer on {@code TOPIC}
+         * using the {@code #offset} repository and forwarding to {@code mock:result}.
+         */
+        @Bean
+        public RouteBuilder routeBuilder() {
+            return new RouteBuilder() {
+                @Override
+                public void configure() {
+                    from("kafka:" + TOPIC + "?groupId=" + TOPIC + "_GROUP" + 
"&autoCommitIntervalMs=1000"
+                            + "&autoOffsetReset=latest" + "&consumersCount=1"
+                            + 
"&offsetRepository=#offset").routeId("consumer-rebalance-route").to("mock:result");
+                }
+            };
+        }
+    }
+
+    /**
+     * {@link StateRepository} stub that counts down a latch whenever state is requested for
+     * a key containing {@code TOPIC}. It stores nothing: {@code setState} is a no-op and
+     * {@code getState} always returns {@code "-1"}.
+     */
+    public static class OffsetStateRepository implements 
StateRepository<String, String> {
+        private static final Logger LOG = 
LoggerFactory.getLogger(OffsetStateRepository.class);
+        // Latch supplied by the creator of this repository.
+        CountDownLatch messagesLatch;
+
+        /**
+         * @param messagesLatch latch to count down on matching {@code getState} calls
+         */
+        public OffsetStateRepository(CountDownLatch messagesLatch) {
+            this.messagesLatch = messagesLatch;
+        }
+
+        /** No-op. */
+        @Override
+        public void start() {
+        }
+
+        /** No-op. */
+        @Override
+        public void stop() {
+        }
+
+        /**
+         * Counts down the latch when {@code key} contains {@code TOPIC}.
+         *
+         * @param key state key supplied by the caller
+         * @return always the string {@code "-1"}
+         */
+        @Override
+        public String getState(String key) {
+            LOG.debug("Getting the state for {} from topic {}", key, TOPIC);
+            if (key.contains(TOPIC)) {
+                LOG.trace("Topic matches, counting down");
+                messagesLatch.countDown();
+            }
+
+            return "-1";
+        }
+
+        /** No-op: the supplied state is discarded. */
+        @Override
+        public void setState(String key, String value) {
+        }
+    }
+}
diff --git 
a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerTopicIsPatternIT.java
 
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerTopicIsPatternIT.java
new file mode 100644
index 0000000..9285ffa
--- /dev/null
+++ 
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerTopicIsPatternIT.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.util.Collections;
+import java.util.Properties;
+import java.util.stream.StreamSupport;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Integration test for a Kafka consumer endpoint whose topic is given as a pattern
+ * ({@code topicIsPattern=true}): records are produced to {@code TOPIC} and consumed through
+ * an endpoint subscribed to {@code TOPIC_PATTERN}.
+ * <p>
+ * The Spring context is discarded after every test method (see {@code @DirtiesContext}).
+ */
+@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
+@CamelSpringBootTest
+@SpringBootTest(
+        classes = {
+                CamelAutoConfiguration.class,
+                BaseEmbeddedKafkaTestSupport.DefaulKafkaComponent.class,
+                KafkaConsumerTopicIsPatternIT.class,
+                KafkaConsumerTopicIsPatternIT.TestConfiguration.class,
+        }
+)
+public class KafkaConsumerTopicIsPatternIT extends 
BaseEmbeddedKafkaTestSupport {
+
+    /** Concrete topic the records are produced to. */
+    public static final String TOPIC = "vess123d";
+    /** Pattern used in the consumer endpoint URI. */
+    public static final String TOPIC_PATTERN = "v.*d";
+
+    // Consumer endpoint URI: pattern subscription, auto-commit enabled, mock interceptor installed.
+    private final String from = "kafka:" + TOPIC_PATTERN + 
"?topicIsPattern=true&groupId=group1&autoOffsetReset=earliest"
+            + 
"&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.integration.MockConsumerInterceptor";
+
+    // Mock endpoint the route delivers to.
+    @EndpointInject("mock:result")
+    private MockEndpoint to;
+
+    // Plain Kafka producer used to publish the test records.
+    private org.apache.kafka.clients.producer.KafkaProducer<String, String> 
producer;
+
+    /**
+     * Creates a fresh producer and clears the static list of records captured by
+     * {@code MockConsumerInterceptor}.
+     */
+    @BeforeEach
+    public void before() {
+        Properties props = getDefaultProperties();
+        producer = new 
org.apache.kafka.clients.producer.KafkaProducer<>(props);
+        MockConsumerInterceptor.recordsCaptured.clear();
+
+    }
+
+    /** Closes the producer (if one was created) and requests deletion of the test topic. */
+    @AfterEach
+    public void after() {
+        if (producer != null) {
+            producer.close();
+        }
+        // clean all test topics
+        kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC));
+    }
+
+    /**
+     * Sends five records to {@code TOPIC} and expects all of them on the mock endpoint with
+     * the topic header set to {@code TOPIC}, then checks the first batch captured by the
+     * interceptor holds five records for that topic.
+     */
+    @Test
+    public void kafkaTopicIsPattern() throws Exception {
+        to.expectedMessageCount(5);
+        to.expectedBodiesReceivedInAnyOrder("message-0", "message-1", 
"message-2", "message-3", "message-4");
+        to.allMessages().header(KafkaConstants.TOPIC).isEqualTo(TOPIC);
+        // The LAST_RECORD_BEFORE_COMMIT header should not be configured on any
+        // exchange because autoCommitEnable=true
+        
to.expectedHeaderValuesReceivedInAnyOrder(KafkaConstants.LAST_RECORD_BEFORE_COMMIT,
 null, null, null, null, null);
+
+        for (int k = 0; k < 5; k++) {
+            String msg = "message-" + k;
+            ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, 
"1", msg);
+            producer.send(data);
+        }
+
+        to.assertIsSatisfied(3000);
+
+        // NOTE(review): only the first captured batch (index 0) is inspected, so this
+        // assumes all five records were captured in that one batch - confirm.
+        assertEquals(5, 
StreamSupport.stream(MockConsumerInterceptor.recordsCaptured.get(0).records(TOPIC).spliterator(),
 false)
+                .count());
+    }
+
+    /** Spring configuration contributing the Camel route under test. */
+    @Configuration
+    public class TestConfiguration {
+        /** Builds a route from the pattern-based Kafka endpoint to the mock endpoint. */
+        @Bean
+        public RouteBuilder routeBuilder() {
+            return new RouteBuilder() {
+                @Override
+                public void configure() {
+                    from(from).to(to);
+                }
+            };
+        }
+    }
+
+}
diff --git 
a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java
 
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java
new file mode 100644
index 0000000..05f9d7d
--- /dev/null
+++ 
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.Offset;
+import org.apache.camel.Resumable;
+import org.apache.camel.Service;
+import org.apache.camel.UpdatableConsumerResumeStrategy;
+import org.apache.camel.builder.RouteBuilder;
+import 
org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy;
+import org.apache.camel.component.kafka.consumer.support.KafkaResumable;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.resume.Resumables;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
+@CamelSpringBootTest
+@SpringBootTest(
+        classes = {
+                CamelAutoConfiguration.class,
+                BaseEmbeddedKafkaTestSupport.DefaulKafkaComponent.class,
+                KafkaConsumerWithResumeRouteStrategyIT.class,
+                KafkaConsumerWithResumeRouteStrategyIT.TestConfiguration.class,
+                ResumeStrategyConfiguration.class,
+        }
+)
+public class KafkaConsumerWithResumeRouteStrategyIT extends 
BaseEmbeddedKafkaTestSupport {
+    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaConsumerWithResumeRouteStrategyIT.class);
+    private static final String TOPIC = "resumable-route-tp";
+    private static final int RANDOM_VALUE = 
ThreadLocalRandom.current().nextInt(1, 1000);
+
+    @EndpointInject("mock:result")
+    private MockEndpoint result;
+
+    @Autowired
+    @Qualifier("resumeStrategy")
+    private TestKafkaConsumerResumeStrategy resumeStrategy;
+
+    @Autowired
+    private CountDownLatch messagesLatch;
+    private KafkaProducer<Object, Object> producer;
+
+    static class TestKafkaConsumerResumeStrategy
+            implements KafkaConsumerResumeStrategy,
+            UpdatableConsumerResumeStrategy<String, Integer, Resumable<String, 
Integer>>, Service {
+        private final CountDownLatch messagesLatch;
+        private boolean resumeCalled;
+        private boolean consumerIsNull = true;
+        private boolean startCalled;
+        private boolean offsetNull = true;
+        private boolean offsetAddressableNull = true;
+        private boolean offsetAddressableEmpty = true;
+        private boolean offsetValueNull = true;
+        private boolean offsetValueEmpty = true;
+        private int lastOffset;
+
+        public TestKafkaConsumerResumeStrategy(CountDownLatch messagesLatch) {
+            this.messagesLatch = messagesLatch;
+        }
+
+        @Override
+        public void setConsumer(Consumer<?, ?> consumer) {
+            if (consumer != null) {
+                consumerIsNull = false;
+            }
+        }
+
+        @Override
+        public void resume(KafkaResumable resumable) {
+            resumeCalled = true;
+
+        }
+
+        @Override
+        public void resume() {
+            resumeCalled = true;
+        }
+
+        public boolean isResumeCalled() {
+            return resumeCalled;
+        }
+
+        public boolean isConsumerIsNull() {
+            return consumerIsNull;
+        }
+
+        @Override
+        public void start() {
+            LOG.warn("Start was called");
+            startCalled = true;
+        }
+
+        @Override
+        public void init() {
+            LOG.warn("Init was called");
+        }
+
+        public boolean isStartCalled() {
+            return startCalled;
+        }
+
+        @Override
+        public void updateLastOffset(Resumable<String, Integer> offset) {
+            try {
+                if (offset != null) {
+                    offsetNull = false;
+
+                    String addressable = offset.getAddressable();
+                    if (addressable != null) {
+                        offsetAddressableNull = false;
+                        offsetAddressableEmpty = addressable.isEmpty() || 
addressable.isBlank();
+
+                    }
+
+                    Offset<Integer> offsetValue = offset.getLastOffset();
+                    if (offsetValue != null) {
+                        offsetValueNull = false;
+
+                        if (offsetValue.offset() != null) {
+                            offsetValueEmpty = false;
+                            lastOffset = offsetValue.offset();
+                        }
+                    }
+                }
+            } finally {
+                messagesLatch.countDown();
+            }
+        }
+
+        public boolean isOffsetNull() {
+            return offsetNull;
+        }
+
+        public boolean isOffsetAddressableNull() {
+            return offsetAddressableNull;
+        }
+
+        public boolean isOffsetValueNull() {
+            return offsetValueNull;
+        }
+
+        public boolean isOffsetAddressableEmpty() {
+            return offsetAddressableEmpty;
+        }
+
+        public boolean isOffsetValueEmpty() {
+            return offsetValueEmpty;
+        }
+    }
+
+    /**
+     * Seeds {@code TOPIC} with ten records (values "0" to "9") before each test.
+     *
+     * A single producer is used for all records and is closed by the try-with-resources block, which
+     * also waits for the pending sends to complete. Creating a new producer per record, as was done
+     * previously, left ten unclosed producers behind for every test run.
+     */
+    @BeforeEach
+    public void before() {
+        Properties props = getDefaultProperties();
+
+        try (KafkaProducer<Object, Object> seedProducer = new KafkaProducer<>(props)) {
+            for (int i = 0; i < 10; i++) {
+                seedProducer.send(new ProducerRecord<>(TOPIC, String.valueOf(i)));
+            }
+        }
+    }
+
+    @Test
+    //    @Timeout(value = 30)
+    public void testOffsetIsBeingChecked() throws InterruptedException {
+        assertTrue(messagesLatch.await(100, TimeUnit.SECONDS), "The resume was 
not called");
+
+        assertTrue(resumeStrategy.isResumeCalled(),
+                "The resume strategy should have been called when the 
partition was assigned");
+        assertFalse(resumeStrategy.isConsumerIsNull(),
+                "The consumer passed to the strategy should not be null");
+        assertTrue(resumeStrategy.isStartCalled(),
+                "The resume strategy should have been started");
+        assertFalse(resumeStrategy.isOffsetNull(),
+                "The offset should not be null");
+        assertFalse(resumeStrategy.isOffsetAddressableNull(),
+                "The offset addressable should not be null");
+        assertFalse(resumeStrategy.isOffsetAddressableEmpty(),
+                "The offset addressable should not be empty");
+        assertFalse(resumeStrategy.isOffsetValueNull(),
+                "The offset value should not be null");
+        assertFalse(resumeStrategy.isOffsetValueEmpty(),
+                "The offset value should not be empty");
+        assertEquals(RANDOM_VALUE, resumeStrategy.lastOffset, "the offsets 
don't match");
+    }
+
+    /**
+     * Releases the producer held in the {@code producer} field (if any) and deletes the test topic
+     * after each test.
+     */
+    @AfterEach
+    public void after() {
+        // Close the producer so its resources do not outlive the test.
+        if (producer != null) {
+            producer.close();
+        }
+        // clean all test topics
+        kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC));
+    }
+
+    /** Spring configuration contributing the resumable Kafka consumer route under test. */
+    @Configuration
+    public class TestConfiguration {
+        /**
+         * Builds a route that consumes {@code TOPIC} from the earliest offset, stamps every exchange
+         * with a fixed resumable offset ({@code "key"} / {@code RANDOM_VALUE}), hands it to the
+         * {@code resumeStrategy} bean and forwards it to {@code mock:result}.
+         */
+        @Bean
+        public RouteBuilder routeBuilder() {
+            final RouteBuilder resumableRoute = new RouteBuilder() {
+                @Override
+                public void configure() {
+                    final String consumerUri = "kafka:" + TOPIC
+                            + "?groupId=" + TOPIC + "_GROUP"
+                            + "&autoCommitIntervalMs=1000"
+                            + "&autoOffsetReset=earliest"
+                            + "&consumersCount=1";
+
+                    from(consumerUri)
+                            .routeId("resume-strategy-route")
+                            .setHeader(Exchange.OFFSET, constant(Resumables.of("key", RANDOM_VALUE)))
+                            .resumable().resumeStrategy("resumeStrategy")
+                            .to("mock:result");
+                }
+            };
+            return resumableRoute;
+        }
+    }
+}
diff --git 
a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaProducerFullIT.java
 
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaProducerFullIT.java
new file mode 100644
index 0000000..43803d9
--- /dev/null
+++ 
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaProducerFullIT.java
@@ -0,0 +1,561 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.StreamSupport;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.component.kafka.KafkaEndpoint;
+import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderSerializer;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.support.DefaultHeaderFilterStrategy;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+
+import static org.apache.camel.test.junit5.TestSupport.assertIsInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+        classes = {
+                CamelAutoConfiguration.class,
+                BaseEmbeddedKafkaTestSupport.DefaulKafkaComponent.class,
+                KafkaProducerFullIT.class,
+                KafkaProducerFullIT.TestConfiguration.class,
+        }
+)
+public class KafkaProducerFullIT extends BaseEmbeddedKafkaTestSupport {
+
+    private static final String TOPIC_STRINGS = "test";
+    private static final String TOPIC_INTERCEPTED = "test"; // NOTE(review): same topic name as TOPIC_STRINGS - confirm this sharing is intended
+    private static final String TOPIC_STRINGS_IN_HEADER = "testHeader";
+    private static final String TOPIC_BYTES = "testBytes";
+    private static final String TOPIC_BYTES_IN_HEADER = "testBytesHeader";
+    private static final String GROUP_BYTES = "groupStrings"; // group id passed to createByteKafkaConsumer, despite the "Strings" value
+    private static final String TOPIC_PROPAGATED_HEADERS = 
"testPropagatedHeaders";
+    private static final String TOPIC_NO_RECORD_SPECIFIC_HEADERS = 
"noRecordSpecificHeaders";
+
+    private static KafkaConsumer<String, String> stringsConsumerConn;
+    private static KafkaConsumer<byte[], byte[]> bytesConsumerConn;
+
+    private final String toStrings = "kafka:" + TOPIC_STRINGS + 
"?requestRequiredAcks=-1";
+
+    private final String toStrings2 = "kafka:" + TOPIC_STRINGS + 
"?requestRequiredAcks=-1&partitionKey=0";
+
+    private final String toStringsWithInterceptor = "kafka:" + 
TOPIC_INTERCEPTED + "?requestRequiredAcks=-1"
+            + 
"&interceptorClasses=org.apache.camel.component.kafka.integration.MockProducerInterceptor";
+
+    @EndpointInject("mock:kafkaAck")
+    private MockEndpoint mockEndpoint;
+
+    private final String toBytes = "kafka:" + TOPIC_BYTES + 
"?requestRequiredAcks=-1"
+            + 
"&valueSerializer=org.apache.kafka.common.serialization.ByteArraySerializer&"
+            + 
"keySerializer=org.apache.kafka.common.serialization.ByteArraySerializer";
+
+    private final String toPropagatedHeaders = "kafka:" + 
TOPIC_PROPAGATED_HEADERS + "?requestRequiredAcks=-1";
+
+    private final String toNoRecordSpecificHeaders = "kafka:" + 
TOPIC_NO_RECORD_SPECIFIC_HEADERS + "?requestRequiredAcks=-1";
+
+    @Produce("direct:startStrings")
+    private ProducerTemplate stringsTemplate;
+
+    @Produce("direct:startStrings2")
+    private ProducerTemplate stringsTemplate2;
+
+    @Produce("direct:startBytes")
+    private ProducerTemplate bytesTemplate;
+
+    @Produce("direct:startTraced")
+    private ProducerTemplate interceptedTemplate;
+
+    @Produce("direct:propagatedHeaders")
+    private ProducerTemplate propagatedHeadersTemplate;
+
+    @Produce("direct:noRecordSpecificHeaders")
+    private ProducerTemplate noRecordSpecificHeadersTemplate;
+
+    /** Creates the two raw Kafka consumers shared by all tests of this class. */
+    @BeforeAll
+    public static void before() {
+        KafkaProducerFullIT.stringsConsumerConn = createStringKafkaConsumer("DemoConsumer");
+        KafkaProducerFullIT.bytesConsumerConn = createByteKafkaConsumer(GROUP_BYTES);
+    }
+
+    /**
+     * Deletes every topic the tests of this class publish to and resets the shared mock endpoint.
+     *
+     * The header-routed topics and the "no record specific headers" topic are written to by the
+     * tests as well, so they are removed here together with the primary topics.
+     */
+    // NOTE(review): JUnit 5 requires @AfterAll methods to be static unless the test class uses the
+    // per-class test instance lifecycle; this method is an instance method because it reads
+    // mockEndpoint - confirm the lifecycle is configured accordingly.
+    @AfterAll
+    public void after() {
+        // clean all test topics
+        final String[] usedTopics = {
+                TOPIC_BYTES,
+                TOPIC_BYTES_IN_HEADER,
+                TOPIC_INTERCEPTED,
+                TOPIC_NO_RECORD_SPECIFIC_HEADERS,
+                TOPIC_PROPAGATED_HEADERS,
+                TOPIC_STRINGS,
+                TOPIC_STRINGS_IN_HEADER,
+        };
+
+        final List<String> topics = new ArrayList<>();
+        for (String topic : usedTopics) {
+            // several constants may share the same topic name; request each deletion only once
+            if (!topics.contains(topic)) {
+                topics.add(topic);
+            }
+        }
+
+        kafkaAdminClient.deleteTopics(topics);
+        mockEndpoint.reset();
+    }
+
+    /**
+     * Sends ten messages to the endpoint's own topic and five to the topic named in the
+     * {@code KafkaConstants.TOPIC} header, then checks that all fifteen arrive in Kafka and that each
+     * acknowledged exchange carries exactly one record metadata entry.
+     */
+    @Test
+    public void producedStringMessageIsReceivedByKafka() throws InterruptedException {
+        final int defaultTopicCount = 10;
+        final int headerTopicCount = 5;
+        final CountDownLatch received = new CountDownLatch(defaultTopicCount + headerTopicCount);
+
+        sendMessagesInRoute(defaultTopicCount, stringsTemplate, "IT test message", KafkaConstants.PARTITION_KEY, "0");
+        sendMessagesInRoute(headerTopicCount, stringsTemplate, "IT test message in other topic",
+                KafkaConstants.PARTITION_KEY, "0", KafkaConstants.TOPIC, TOPIC_STRINGS_IN_HEADER);
+
+        createKafkaMessageConsumer(stringsConsumerConn, TOPIC_STRINGS, TOPIC_STRINGS_IN_HEADER, received);
+
+        assertTrue(received.await(200, TimeUnit.MILLISECONDS),
+                "Not all messages were published to the kafka topics. Not received: " + received.getCount());
+
+        final List<Exchange> acknowledged = mockEndpoint.getExchanges();
+        assertEquals(15, acknowledged.size(), "Fifteen Exchanges are expected");
+        for (Exchange exchange : acknowledged) {
+            @SuppressWarnings("unchecked")
+            List<RecordMetadata> metadata
+                    = (List<RecordMetadata>) (exchange.getIn().getHeader(KafkaConstants.KAFKA_RECORDMETA));
+            assertEquals(1, metadata.size(), "One RecordMetadata is expected.");
+
+            final RecordMetadata first = metadata.get(0);
+            assertTrue(first.offset() >= 0, "Offset is positive");
+            assertTrue(first.topic().startsWith("test"), "Topic Name start with 'test'");
+        }
+    }
+
+    /**
+     * Same scenario as the plain string test, but through the endpoint that configures
+     * {@code partitionKey=0} in its URI: the first ten messages are sent without any headers, the
+     * remaining five are routed to another topic via the {@code KafkaConstants.TOPIC} header.
+     */
+    @Test
+    public void producedString2MessageIsReceivedByKafka() throws InterruptedException {
+        final int defaultTopicCount = 10;
+        final int headerTopicCount = 5;
+        final CountDownLatch received = new CountDownLatch(defaultTopicCount + headerTopicCount);
+
+        // explicit null header array: no headers at all for the first batch
+        sendMessagesInRoute(defaultTopicCount, stringsTemplate2, "IT test message", (String[]) null);
+        sendMessagesInRoute(headerTopicCount, stringsTemplate2, "IT test message in other topic",
+                KafkaConstants.PARTITION_KEY, "0", KafkaConstants.TOPIC, TOPIC_STRINGS_IN_HEADER);
+
+        createKafkaMessageConsumer(stringsConsumerConn, TOPIC_STRINGS, TOPIC_STRINGS_IN_HEADER, received);
+
+        assertTrue(received.await(200, TimeUnit.MILLISECONDS),
+                "Not all messages were published to the kafka topics. Not received: " + received.getCount());
+
+        final List<Exchange> acknowledged = mockEndpoint.getExchanges();
+        assertEquals(15, acknowledged.size(), "Fifteen Exchanges are expected");
+        for (Exchange exchange : acknowledged) {
+            @SuppressWarnings("unchecked")
+            List<RecordMetadata> metadata
+                    = (List<RecordMetadata>) (exchange.getIn().getHeader(KafkaConstants.KAFKA_RECORDMETA));
+            assertEquals(1, metadata.size(), "One RecordMetadata is expected.");
+
+            final RecordMetadata first = metadata.get(0);
+            assertTrue(first.offset() >= 0, "Offset is positive");
+            assertTrue(first.topic().startsWith("test"), "Topic Name start with 'test'");
+        }
+    }
+
+    /**
+     * Sends fifteen messages through the endpoint configured with {@code MockProducerInterceptor} and
+     * checks that all of them reach Kafka and that the interceptor captured fifteen records.
+     */
+    @Test
+    public void producedStringMessageIsIntercepted() throws InterruptedException {
+        final int defaultTopicCount = 10;
+        final int headerTopicCount = 5;
+        final int expectedTotal = defaultTopicCount + headerTopicCount;
+        final CountDownLatch received = new CountDownLatch(expectedTotal);
+
+        sendMessagesInRoute(defaultTopicCount, interceptedTemplate, "IT test message", KafkaConstants.PARTITION_KEY, "0");
+        sendMessagesInRoute(headerTopicCount, interceptedTemplate, "IT test message in other topic",
+                KafkaConstants.PARTITION_KEY, "0", KafkaConstants.TOPIC, TOPIC_STRINGS_IN_HEADER);
+        createKafkaMessageConsumer(stringsConsumerConn, TOPIC_INTERCEPTED, TOPIC_STRINGS_IN_HEADER, received);
+
+        assertTrue(received.await(200, TimeUnit.MILLISECONDS),
+                "Not all messages were published to the kafka topics. Not received: " + received.getCount());
+
+        Assertions.assertEquals(expectedTotal, MockProducerInterceptor.recordsCaptured.size());
+    }
+
+    /**
+     * Sends two exchanges whose bodies are lists (ten and five strings). Each list element is expected
+     * to arrive as its own Kafka record, so the first exchange must carry ten record metadata entries
+     * and the second one five.
+     */
+    @Test
+    public void producedStringCollectionMessageIsReceivedByKafka() throws InterruptedException {
+        final int defaultTopicCount = 10;
+        final int headerTopicCount = 5;
+        final CountDownLatch received = new CountDownLatch(defaultTopicCount + headerTopicCount);
+
+        final List<String> defaultTopicBodies = new ArrayList<>();
+        for (int i = 0; i < defaultTopicCount; i++) {
+            defaultTopicBodies.add("Message " + i);
+        }
+        sendMessagesInRoute(1, stringsTemplate, defaultTopicBodies, KafkaConstants.PARTITION_KEY, "0");
+
+        final List<String> headerTopicBodies = new ArrayList<>();
+        for (int i = 0; i < headerTopicCount; i++) {
+            headerTopicBodies.add("Other Message " + i);
+        }
+        sendMessagesInRoute(1, stringsTemplate, headerTopicBodies, KafkaConstants.PARTITION_KEY, "0", KafkaConstants.TOPIC,
+                TOPIC_STRINGS_IN_HEADER);
+
+        createKafkaMessageConsumer(stringsConsumerConn, TOPIC_STRINGS, TOPIC_STRINGS_IN_HEADER, received);
+
+        assertTrue(received.await(200, TimeUnit.MILLISECONDS),
+                "Not all messages were published to the kafka topics. Not received: " + received.getCount());
+
+        final List<Exchange> acknowledged = mockEndpoint.getExchanges();
+        assertEquals(2, acknowledged.size(), "Two Exchanges are expected");
+
+        // expected metadata count per exchange, in the order the exchanges were sent
+        final int[] expectedSizes = { 10, 5 };
+        final String[] sizeMessages = { "Ten RecordMetadata is expected.", "Five RecordMetadata is expected." };
+        for (int i = 0; i < expectedSizes.length; i++) {
+            @SuppressWarnings("unchecked")
+            List<RecordMetadata> metadata = (List<RecordMetadata>) (acknowledged.get(i).getIn()
+                    .getHeader(KafkaConstants.KAFKA_RECORDMETA));
+            assertEquals(expectedSizes[i], metadata.size(), sizeMessages[i]);
+            for (RecordMetadata entry : metadata) {
+                assertTrue(entry.offset() >= 0, "Offset is positive");
+                assertTrue(entry.topic().startsWith("test"), "Topic Name start with 'test'");
+            }
+        }
+    }
+
+    /**
+     * Byte-array variant of the string test: bodies and partition keys are sent as {@code byte[]}
+     * through the endpoint configured with the byte array serializers.
+     */
+    @Test
+    public void producedBytesMessageIsReceivedByKafka() throws InterruptedException {
+        final int defaultTopicCount = 10;
+        final int headerTopicCount = 5;
+        final CountDownLatch received = new CountDownLatch(defaultTopicCount + headerTopicCount);
+
+        final Map<String, Object> defaultTopicHeaders = new HashMap<>();
+        defaultTopicHeaders.put(KafkaConstants.PARTITION_KEY, "1".getBytes());
+        sendMessagesInRoute(defaultTopicCount, bytesTemplate, "IT test message".getBytes(), defaultTopicHeaders);
+
+        final Map<String, Object> headerTopicHeaders = new HashMap<>();
+        headerTopicHeaders.put(KafkaConstants.PARTITION_KEY, "1".getBytes());
+        headerTopicHeaders.put(KafkaConstants.TOPIC, TOPIC_BYTES_IN_HEADER);
+        sendMessagesInRoute(headerTopicCount, bytesTemplate, "IT test message in other topic".getBytes(), headerTopicHeaders);
+
+        createKafkaBytesMessageConsumer(bytesConsumerConn, TOPIC_BYTES, TOPIC_BYTES_IN_HEADER, received);
+
+        assertTrue(received.await(200, TimeUnit.MILLISECONDS),
+                "Not all messages were published to the kafka topics. Not received: " + received.getCount());
+
+        final List<Exchange> acknowledged = mockEndpoint.getExchanges();
+        assertEquals(15, acknowledged.size(), "Fifteen Exchanges are expected");
+        for (Exchange exchange : acknowledged) {
+            @SuppressWarnings("unchecked")
+            List<RecordMetadata> metadata
+                    = (List<RecordMetadata>) (exchange.getIn().getHeader(KafkaConstants.KAFKA_RECORDMETA));
+            assertEquals(1, metadata.size(), "One RecordMetadata is expected.");
+
+            final RecordMetadata first = metadata.get(0);
+            assertTrue(first.offset() >= 0, "Offset is positive");
+            assertTrue(first.topic().startsWith("test"), "Topic Name start with 'test'");
+        }
+    }
+
+    @Test
+    public void propagatedHeaderIsReceivedByKafka() throws Exception {
+        String propagatedStringHeaderKey = "PROPAGATED_STRING_HEADER";
+        String propagatedStringHeaderValue = "propagated string header value";
+
+        String propagatedIntegerHeaderKey = "PROPAGATED_INTEGER_HEADER";
+        Integer propagatedIntegerHeaderValue = 54545;
+
+        String propagatedLongHeaderKey = "PROPAGATED_LONG_HEADER";
+        Long propagatedLongHeaderValue = 5454545454545L;
+
+        String propagatedDoubleHeaderKey = "PROPAGATED_DOUBLE_HEADER";
+        Double propagatedDoubleHeaderValue = 43434.545D;
+
+        String propagatedBytesHeaderKey = "PROPAGATED_BYTES_HEADER";
+        byte[] propagatedBytesHeaderValue = new byte[] { 121, 34, 34, 54, 5, 
3, 54, -34 };
+
+        String propagatedBooleanHeaderKey = "PROPAGATED_BOOLEAN_HEADER";
+        Boolean propagatedBooleanHeaderValue = Boolean.TRUE;
+
+        Map<String, Object> camelHeaders = new HashMap<>();
+        camelHeaders.put(propagatedStringHeaderKey, 
propagatedStringHeaderValue);
+        camelHeaders.put(propagatedIntegerHeaderKey, 
propagatedIntegerHeaderValue);
+        camelHeaders.put(propagatedLongHeaderKey, propagatedLongHeaderValue);
+        camelHeaders.put(propagatedDoubleHeaderKey, 
propagatedDoubleHeaderValue);
+        camelHeaders.put(propagatedBytesHeaderKey, propagatedBytesHeaderValue);
+        camelHeaders.put(propagatedBooleanHeaderKey, 
propagatedBooleanHeaderValue);
+
+        camelHeaders.put("CustomObjectHeader", new Object());
+        camelHeaders.put("CustomNullObjectHeader", null);
+        camelHeaders.put("CamelFilteredHeader", "CamelFilteredHeader value");
+
+        CountDownLatch messagesLatch = new CountDownLatch(1);
+        propagatedHeadersTemplate.sendBodyAndHeaders("Some test message", 
camelHeaders);
+
+        List<ConsumerRecord<String, String>> records = 
pollForRecords(createStringKafkaConsumer("propagatedHeaderConsumer"),
+                TOPIC_PROPAGATED_HEADERS, messagesLatch);
+        boolean allMessagesReceived = messagesLatch.await(10_000, 
TimeUnit.MILLISECONDS);
+
+        assertTrue(allMessagesReceived,
+                "Not all messages were published to the kafka topics. Not 
received: " + messagesLatch.getCount());
+
+        ConsumerRecord<String, String> record = records.get(0);
+        Headers headers = record.headers();
+        assertNotNull(headers, "Kafka Headers should not be null.");
+        // we have 6 headers
+        assertEquals(6, headers.toArray().length, "6 propagated header is 
expected.");
+        assertEquals(propagatedStringHeaderValue, new 
String(getHeaderValue(propagatedStringHeaderKey, headers)),
+                "Propagated string value received");
+        assertEquals(propagatedIntegerHeaderValue,
+                
Integer.valueOf(ByteBuffer.wrap(getHeaderValue(propagatedIntegerHeaderKey, 
headers)).getInt()),
+                "Propagated integer value received");
+        assertEquals(propagatedLongHeaderValue,
+                
Long.valueOf(ByteBuffer.wrap(getHeaderValue(propagatedLongHeaderKey, 
headers)).getLong()),
+                "Propagated long value received");
+        assertEquals(propagatedDoubleHeaderValue,
+                
Double.valueOf(ByteBuffer.wrap(getHeaderValue(propagatedDoubleHeaderKey, 
headers)).getDouble()),
+                "Propagated double value received");
+        assertArrayEquals(propagatedBytesHeaderValue, 
getHeaderValue(propagatedBytesHeaderKey, headers),
+                "Propagated byte array value received");
+        assertEquals(propagatedBooleanHeaderValue,
+                Boolean.valueOf(new 
String(getHeaderValue(propagatedBooleanHeaderKey, headers))),
+                "Propagated boolean value received");
+    }
+
+    @Test
+    public void recordSpecificHeaderIsNotReceivedByKafka() throws Exception {
+        String propagatedStringHeaderKey = KafkaConstants.TOPIC;
+        String propagatedStringHeaderValue = "source topic";
+
+        Map<String, Object> camelHeaders = new HashMap<>();
+        camelHeaders.put(propagatedStringHeaderKey, 
propagatedStringHeaderValue);
+
+        CountDownLatch messagesLatch = new CountDownLatch(1);
+        noRecordSpecificHeadersTemplate.sendBodyAndHeaders("Some test 
message", camelHeaders);
+
+        List<ConsumerRecord<String, String>> records = pollForRecords(
+                createStringKafkaConsumer("noRecordSpecificHeadersConsumer"), 
TOPIC_NO_RECORD_SPECIFIC_HEADERS, messagesLatch);
+        boolean allMessagesReceived = messagesLatch.await(10_000, 
TimeUnit.MILLISECONDS);
+
+        assertTrue(allMessagesReceived,
+                "Not all messages were published to the kafka topics. Not 
received: " + messagesLatch.getCount());
+
+        ConsumerRecord<String, String> record = records.get(0);
+        Headers headers = record.headers();
+        assertNotNull(headers, "Kafka Headers should not be null.");
+        // we have 0 headers
+        assertEquals(0, headers.toArray().length, "0 propagated headers are 
expected");
+    }
+
+    /**
+     * Resolves an endpoint whose URI references the {@code myStrategy} bean and checks that the
+     * endpoint configuration holds a {@code MyHeaderFilterStrategy}.
+     */
+    @Test
+    public void headerFilterStrategyCouldBeOverridden() {
+        final String uri = "kafka:TOPIC_PROPAGATED_HEADERS?headerFilterStrategy=#myStrategy";
+        final KafkaEndpoint kafkaEndpoint = context.getEndpoint(uri, KafkaEndpoint.class);
+
+        assertIsInstanceOf(MyHeaderFilterStrategy.class, kafkaEndpoint.getConfiguration().getHeaderFilterStrategy());
+    }
+
+    /**
+     * Resolves an endpoint whose URI references the {@code myHeaderSerializer} bean and checks that the
+     * endpoint configuration holds a {@code MyKafkaHeadersSerializer}.
+     */
+    @Test
+    public void headerSerializerCouldBeOverridden() {
+        final String uri = "kafka:TOPIC_PROPAGATED_HEADERS?headerSerializer=#myHeaderSerializer";
+        final KafkaEndpoint kafkaEndpoint = context.getEndpoint(uri, KafkaEndpoint.class);
+
+        assertIsInstanceOf(MyKafkaHeadersSerializer.class, kafkaEndpoint.getConfiguration().getHeaderSerializer());
+    }
+
+    /**
+     * Spring configuration for this test class. It contributes the Camel routes under test and the
+     * {@code myStrategy} / {@code myHeaderSerializer} beans that the endpoint URIs reference by name.
+     * The route definitions use members of the enclosing test class ({@code toStrings},
+     * {@code mockEndpoint}, ...), which is why this is an inner class.
+     */
+    @Configuration
+    public class TestConfiguration {
+
+        /**
+         * Defines one route per {@code direct:} entry point. Each route forwards to the matching
+         * {@code to...} target of the enclosing test and then to {@code mockEndpoint}.
+         *
+         * @return the route builder picked up by the Camel context
+         */
+        @Bean
+        public RouteBuilder routeBuilder() {
+            RouteBuilder routes = new RouteBuilder() {
+                @Override
+                public void configure() {
+                    from("direct:startStrings").to(toStrings).to(mockEndpoint);
+
+                    from("direct:startStrings2").to(toStrings2).to(mockEndpoint);
+
+                    from("direct:startBytes").to(toBytes).to(mockEndpoint);
+
+                    from("direct:startTraced").to(toStringsWithInterceptor).to(mockEndpoint);
+
+                    from("direct:propagatedHeaders").to(toPropagatedHeaders).to(mockEndpoint);
+
+                    from("direct:noRecordSpecificHeaders").to(toNoRecordSpecificHeaders).to(mockEndpoint);
+                }
+            };
+            return routes;
+        }
+
+        /** @return the header filter strategy registered under the bean name {@code myStrategy} */
+        @Bean("myStrategy")
+        public MyHeaderFilterStrategy createMyHeaderFilterStrategy() {
+            MyHeaderFilterStrategy strategy = new MyHeaderFilterStrategy();
+            return strategy;
+        }
+
+        /** @return the header serializer registered under the bean name {@code myHeaderSerializer} */
+        @Bean("myHeaderSerializer")
+        public MyKafkaHeadersSerializer createMyKafkaHeadersSerializer() {
+            MyKafkaHeadersSerializer serializer = new MyKafkaHeadersSerializer();
+            return serializer;
+        }
+    }
+
+    /**
+     * Returns the value of the first header whose key equals {@code headerKey}.
+     * Fails the test with "Header should be sent" when no such header exists.
+     *
+     * @param headerKey key to search for
+     * @param headers   record headers to scan in iteration order
+     * @return the raw bytes of the first matching header
+     */
+    private byte[] getHeaderValue(String headerKey, Headers headers) {
+        Header match = null;
+        for (Header candidate : headers) {
+            if (candidate.key().equals(headerKey)) {
+                match = candidate;
+                break;
+            }
+        }
+        assertNotNull(match, "Header should be sent");
+        return match.value();
+    }
+
+    /**
+     * Builds a Kafka consumer that deserializes keys and values as Strings. The consumer connects to
+     * {@code getBootstrapServers()}, auto-commits every 1000 ms, uses a 30000 ms session timeout and
+     * starts from the earliest offset. It is returned without any subscription.
+     *
+     * @param groupId consumer group id to join
+     * @return a newly created consumer
+     */
+    private static KafkaConsumer<String, String> createStringKafkaConsumer(final String groupId) {
+        final String stringDeserializer = "org.apache.kafka.common.serialization.StringDeserializer";
+        Properties config = new Properties();
+
+        // connection and group membership
+        config.put(org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers());
+        config.put(org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG, groupId);
+
+        // offset handling and liveness
+        config.put(org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+        config.put(org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
+        config.put(org.apache.kafka.clients.consumer.ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
+
+        // payload format
+        config.put(org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, stringDeserializer);
+        config.put(org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, stringDeserializer);
+
+        config.put(org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        return new KafkaConsumer<>(config);
+    }
+
+    /**
+     * Builds a Kafka consumer that deserializes keys and values as raw byte arrays. Apart from the
+     * deserializers it is configured like {@code createStringKafkaConsumer}: it connects to
+     * {@code getBootstrapServers()}, auto-commits every 1000 ms, uses a 30000 ms session timeout and
+     * starts from the earliest offset. It is returned without any subscription.
+     *
+     * @param groupId consumer group id to join
+     * @return a newly created consumer
+     */
+    private static KafkaConsumer<byte[], byte[]> createByteKafkaConsumer(final String groupId) {
+        final String byteArrayDeserializer = "org.apache.kafka.common.serialization.ByteArrayDeserializer";
+        Properties config = new Properties();
+
+        // connection and group membership
+        config.put(org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers());
+        config.put(org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG, groupId);
+
+        // offset handling and liveness
+        config.put(org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+        config.put(org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
+        config.put(org.apache.kafka.clients.consumer.ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
+
+        // payload format
+        config.put(org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, byteArrayDeserializer);
+        config.put(org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, byteArrayDeserializer);
+
+        config.put(org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        return new KafkaConsumer<>(config);
+    }
+
+    /**
+     * Subscribes {@code consumerConn} to {@code topic} and starts a background thread that keeps
+     * polling until {@code messagesLatch} reaches zero. Every received record is appended to the
+     * returned list and counts the latch down once.
+     *
+     * <p>The method returns immediately; callers are expected to await the latch before reading
+     * the list. The list is thread-safe because the poller may still be appending (for example
+     * when one poll returns more records than the latch expects) while the caller reads it.
+     *
+     * @param consumerConn  consumer to poll; after this call it is used by the poller thread
+     * @param topic         topic to subscribe to
+     * @param messagesLatch latch counted down once per consumed record
+     * @return live list that the poller thread fills with the consumed records
+     */
+    private List<ConsumerRecord<String, String>> pollForRecords(
+            KafkaConsumer<String, String> consumerConn, String topic, CountDownLatch messagesLatch) {
+
+        // Written by the poller thread and read by the test thread, so a plain ArrayList is not safe here.
+        List<ConsumerRecord<String, String>> consumedRecords = new java.util.concurrent.CopyOnWriteArrayList<>();
+        consumerConn.subscribe(Collections.singletonList(topic));
+
+        Thread poller = new Thread(() -> {
+            while (messagesLatch.getCount() != 0) {
+                for (ConsumerRecord<String, String> record : consumerConn.poll(Duration.ofMillis(100))) {
+                    consumedRecords.add(record);
+                    messagesLatch.countDown();
+                }
+            }
+        }, "kafka-test-record-poller");
+        // Daemon: if the expected records never arrive the loop never ends, and it must not keep the JVM alive.
+        poller.setDaemon(true);
+        poller.start();
+
+        return consumedRecords;
+    }
+
+    /**
+     * Subscribes {@code consumerConn} to both topics and polls on the calling thread, counting
+     * {@code messagesLatch} down once per received record. It returns only after a record has been
+     * received and the latch is at zero.
+     *
+     * @param consumerConn  String consumer to subscribe and poll
+     * @param topic         first topic to subscribe to
+     * @param topicInHeader second topic to subscribe to
+     * @param messagesLatch latch counted down once per received record
+     */
+    private void createKafkaMessageConsumer(
+            KafkaConsumer<String, String> consumerConn, String topic, String topicInHeader, CountDownLatch messagesLatch) {
+
+        consumerConn.subscribe(Arrays.asList(topic, topicInHeader));
+
+        boolean latchReleased = false;
+        do {
+            ConsumerRecords<String, String> batch = consumerConn.poll(Duration.ofMillis(100));
+            for (ConsumerRecord<String, String> ignored : batch) {
+                messagesLatch.countDown();
+                latchReleased = latchReleased || messagesLatch.getCount() == 0;
+            }
+        } while (!latchReleased);
+
+    }
+
+    /**
+     * Byte-array variant of {@code createKafkaMessageConsumer}. It subscribes {@code consumerConn} to
+     * both topics and polls on the calling thread, counting {@code messagesLatch} down once per
+     * received record. It returns only after a record has been received and the latch is at zero.
+     *
+     * @param consumerConn  byte-array consumer to subscribe and poll
+     * @param topic         first topic to subscribe to
+     * @param topicInHeader second topic to subscribe to
+     * @param messagesLatch latch counted down once per received record
+     */
+    private void createKafkaBytesMessageConsumer(
+            KafkaConsumer<byte[], byte[]> consumerConn, String topic, String topicInHeader, CountDownLatch messagesLatch) {
+
+        consumerConn.subscribe(Arrays.asList(topic, topicInHeader));
+
+        boolean latchReleased = false;
+        do {
+            ConsumerRecords<byte[], byte[]> batch = consumerConn.poll(Duration.ofMillis(100));
+            for (ConsumerRecord<byte[], byte[]> ignored : batch) {
+                messagesLatch.countDown();
+                latchReleased = latchReleased || messagesLatch.getCount() == 0;
+            }
+        } while (!latchReleased);
+
+    }
+
+    /**
+     * Convenience overload that takes the headers as a flat list of alternating names and values
+     * ({@code name1, value1, name2, value2, ...}) and delegates to the map-based overload.
+     *
+     * @param messages         number of messages to send
+     * @param template         producer template used for sending
+     * @param bodyOther        body sent with every message
+     * @param headersWithValue alternating header names and values; may be {@code null} or empty
+     * @throws IllegalArgumentException if {@code headersWithValue} has an odd number of elements
+     */
+    private void sendMessagesInRoute(int messages, ProducerTemplate template, Object bodyOther, String... headersWithValue) {
+        Map<String, Object> headerMap = new HashMap<>();
+        if (headersWithValue != null) {
+            // Reject a dangling header name up front rather than failing with an index error mid-loop.
+            if (headersWithValue.length % 2 != 0) {
+                throw new IllegalArgumentException(
+                        "headersWithValue must contain name/value pairs but has " + headersWithValue.length + " element(s)");
+            }
+            for (int i = 0; i < headersWithValue.length; i += 2) {
+                headerMap.put(headersWithValue[i], headersWithValue[i + 1]);
+            }
+        }
+        sendMessagesInRoute(messages, template, bodyOther, headerMap);
+    }
+
+    /**
+     * Sends the same body and headers {@code messages} times through {@code template}. No endpoint
+     * is passed to the template, so its default endpoint is used.
+     *
+     * @param messages  number of messages to send; nothing is sent when it is zero or negative
+     * @param template  producer template used for sending
+     * @param bodyOther body sent with every message
+     * @param headerMap headers sent with every message
+     */
+    private void sendMessagesInRoute(int messages, ProducerTemplate template, Object bodyOther, Map<String, Object> headerMap) {
+        int remaining = messages;
+        while (remaining > 0) {
+            template.sendBodyAndHeaders(bodyOther, headerMap);
+            remaining--;
+        }
+    }
+
+    /**
+     * Marker subclass with no overrides. It exists so the tests can tell that a custom header
+     * filter strategy bean, and not the default instance, ended up on the endpoint.
+     */
+    private static class MyHeaderFilterStrategy extends DefaultHeaderFilterStrategy {
+        // intentionally empty
+    }
+
+    /**
+     * Marker subclass with no overrides. It exists so the tests can tell that a custom header
+     * serializer bean, and not the default instance, ended up on the endpoint.
+     */
+    private static class MyKafkaHeadersSerializer extends DefaultKafkaHeaderSerializer {
+        // intentionally empty
+    }
+
+}
diff --git 
a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaToDIT.java
 
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaToDIT.java
new file mode 100644
index 0000000..6706a81
--- /dev/null
+++ 
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaToDIT.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Integration test for dynamic routing to Kafka with {@code toD}: messages sent to different topics
+ * (taken from the {@code where} header) must not leave one Kafka endpoint per topic in the context.
+ */
+@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
+@CamelSpringBootTest
+@SpringBootTest(
+        classes = {
+                CamelAutoConfiguration.class,
+                BaseEmbeddedKafkaTestSupport.DefaulKafkaComponent.class,
+                KafkaToDIT.class,
+                KafkaToDIT.TestConfiguration.class,
+        }
+)
+public class KafkaToDIT extends BaseEmbeddedKafkaTestSupport {
+
+    @Autowired
+    ProducerTemplate template;
+
+    /**
+     * Sends one message each for the topics {@code bar} and {@code beer} and asserts that exactly
+     * one endpoint with a {@code kafka:} URI is registered afterwards.
+     */
+    @Test
+    public void testToD() {
+        for (String where : new String[] { "bar", "beer" }) {
+            template.sendBodyAndHeader("direct:start", "Hello " + where, "where", where);
+        }
+
+        // there should only be one kafka endpoint
+        long kafkaEndpoints = context.getEndpoints().stream()
+                .map(endpoint -> endpoint.getEndpointUri())
+                .filter(uri -> uri.startsWith("kafka:"))
+                .count();
+        assertEquals(1, kafkaEndpoints, "There should only be 1 kafka endpoint");
+    }
+
+    /** Spring configuration contributing the single dynamic route exercised by this test. */
+    @Configuration
+    public class TestConfiguration {
+
+        /** @return a route builder that routes {@code direct:start} to the topic named in the header */
+        @Bean
+        public RouteBuilder routeBuilder() {
+            RouteBuilder dynamicRoute = new RouteBuilder() {
+                @Override
+                public void configure() {
+                    // route message dynamic using toD
+                    from("direct:start").toD("kafka://${header.where}");
+                }
+            };
+            return dynamicRoute;
+        }
+    }
+
+}
diff --git 
a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/MockConsumerInterceptor.java
 
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/MockConsumerInterceptor.java
new file mode 100644
index 0000000..1914f2e
--- /dev/null
+++ 
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/MockConsumerInterceptor.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.util.ArrayList;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerInterceptor;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Consumer interceptor for tests. It records every batch passed to {@link #onConsume} in the static
+ * {@link #recordsCaptured} list and hands the batch back unchanged. All other callbacks do nothing.
+ */
+public class MockConsumerInterceptor implements ConsumerInterceptor<String, String> {
+    private static final Logger LOG = LoggerFactory.getLogger(MockConsumerInterceptor.class);
+
+    /** Batches seen by {@link #onConsume}, appended in call order and shared by all instances. */
+    public static ArrayList<ConsumerRecords<String, String>> recordsCaptured = new ArrayList<>();
+
+    /**
+     * Logs each record value at trace level, stores the batch and returns it unmodified.
+     *
+     * @param batch records about to be returned to the consumer
+     * @return the same {@code batch} instance
+     */
+    @Override
+    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> batch) {
+        batch.forEach(record -> LOG.trace("Captured on mock: {}", record.value()));
+        recordsCaptured.add(batch);
+        return batch;
+    }
+
+    @Override
+    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
+        // noop
+    }
+
+    @Override
+    public void close() {
+        // noop
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+        // noop
+    }
+}
diff --git 
a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/MockProducerInterceptor.java
 
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/MockProducerInterceptor.java
new file mode 100644
index 0000000..329f324
--- /dev/null
+++ 
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/MockProducerInterceptor.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.util.ArrayList;
+import java.util.Map;
+import org.apache.kafka.clients.producer.ProducerInterceptor;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+
+/**
+ * Producer interceptor for tests. It records every record passed to {@link #onSend} in the static
+ * {@link #recordsCaptured} list and hands the record back unchanged. All other callbacks do nothing.
+ */
+public class MockProducerInterceptor implements ProducerInterceptor<String, String> {
+
+    /** Records seen by {@link #onSend}, appended in call order and shared by all instances. */
+    public static ArrayList<ProducerRecord<String, String>> recordsCaptured = new ArrayList<>();
+
+    /**
+     * Stores the outgoing record and returns it unmodified.
+     *
+     * @param outgoing record about to be sent
+     * @return the same {@code outgoing} instance
+     */
+    @Override
+    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> outgoing) {
+        recordsCaptured.add(outgoing);
+        return outgoing;
+    }
+
+    @Override
+    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
+        // noop
+    }
+
+    @Override
+    public void close() {
+        // noop
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+        // noop
+    }
+}
diff --git 
a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/OffsetStateRepositoryConfiguration.java
 
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/OffsetStateRepositoryConfiguration.java
new file mode 100644
index 0000000..1025c7c
--- /dev/null
+++ 
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/OffsetStateRepositoryConfiguration.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.util.concurrent.CountDownLatch;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * Spring configuration that supplies the {@code offset} bean, a
+ * {@code KafkaConsumerRebalanceIT.OffsetStateRepository}, together with a {@link CountDownLatch}
+ * bean of the type that repository's constructor takes.
+ */
+@Configuration
+public class OffsetStateRepositoryConfiguration {
+
+    /** @return a new latch with a count of one */
+    @Bean
+    public CountDownLatch createCountDownLatch() {
+        return new CountDownLatch(1);
+    }
+
+    /**
+     * @param messagesLatch latch handed to the repository's constructor
+     * @return the repository registered under the bean name {@code offset}
+     */
+    @Bean("offset")
+    public KafkaConsumerRebalanceIT.OffsetStateRepository createOffsetStateRepository(CountDownLatch messagesLatch) {
+        KafkaConsumerRebalanceIT.OffsetStateRepository repository
+                = new KafkaConsumerRebalanceIT.OffsetStateRepository(messagesLatch);
+        return repository;
+    }
+}
diff --git 
a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/ResumeStrategyConfiguration.java
 
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/ResumeStrategyConfiguration.java
new file mode 100644
index 0000000..e9ea18c
--- /dev/null
+++ 
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/ResumeStrategyConfiguration.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.util.concurrent.CountDownLatch;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * Spring configuration that supplies the {@code resumeStrategy} bean, a
+ * {@code KafkaConsumerWithResumeRouteStrategyIT.TestKafkaConsumerResumeStrategy}, together with a
+ * {@link CountDownLatch} bean of the type that strategy's constructor takes.
+ */
+@Configuration
+public class ResumeStrategyConfiguration {
+
+    /** @return a new latch with a count of one */
+    @Bean
+    public CountDownLatch createCountDownLatch() {
+        return new CountDownLatch(1);
+    }
+
+    /**
+     * @param messagesLatch latch handed to the strategy's constructor
+     * @return the strategy registered under the bean name {@code resumeStrategy}
+     */
+    @Bean("resumeStrategy")
+    public KafkaConsumerWithResumeRouteStrategyIT.TestKafkaConsumerResumeStrategy createTestKafkaConsumerResumeStrategy(
+            CountDownLatch messagesLatch) {
+        KafkaConsumerWithResumeRouteStrategyIT.TestKafkaConsumerResumeStrategy strategy
+                = new KafkaConsumerWithResumeRouteStrategyIT.TestKafkaConsumerResumeStrategy(messagesLatch);
+        return strategy;
+    }
+}
diff --git 
a/components-starter/camel-kafka-starter/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
 
b/components-starter/camel-kafka-starter/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
new file mode 100644
index 0000000..99befb6
--- /dev/null
+++ 
b/components-starter/camel-kafka-starter/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import java.time.Duration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.hamcrest.core.IsNot;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.springframework.boot.test.context.SpringBootTest;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsNull.nullValue;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Checks that a Mockito-mocked {@link KafkaConsumer}, stubbed to return
+ * {@link ConsumerRecords#empty()} from {@code poll}, yields non-null records and non-null partitions.
+ */
+@ExtendWith(MockitoExtension.class)
+@CamelSpringBootTest
+@SpringBootTest(
+        classes = {
+                KafkaConsumerTest.class,
+        }
+)
+public class KafkaConsumerTest {
+
+    /** Timeout used both for stubbing and for calling {@code poll}. */
+    private static final Duration POLL_TIMEOUT = Duration.ofSeconds(1);
+
+    @Mock
+    private KafkaConsumer<Object, Object> kafkaConsumer;
+
+    /** Stubs {@code poll} with the shared timeout to return an empty record set. */
+    @BeforeEach
+    public void init() {
+        when(kafkaConsumer.poll(POLL_TIMEOUT)).thenReturn(ConsumerRecords.empty());
+    }
+
+    @Test
+    public void testPollGivenReturnsEmptyConsumerRecordShouldNotBeNull() {
+        ConsumerRecords<Object, Object> polled = kafkaConsumer.poll(POLL_TIMEOUT);
+        assertThat(polled, IsNot.not(nullValue()));
+    }
+
+    @Test
+    public void testPollGivenReturnsEmptyPartitionsShouldNotBeNull() {
+        ConsumerRecords<Object, Object> polled = kafkaConsumer.poll(POLL_TIMEOUT);
+        assertThat(polled.partitions(), IsNot.not(nullValue()));
+    }
+}

Reply via email to