This is an automated email from the ASF dual-hosted git repository.
jpoth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-spring-boot.git
The following commit(s) were added to refs/heads/main by this push:
new e44242f [CAMEL-17805] add tests in camel-kafka-starter (#476)
e44242f is described below
commit e44242f9eef226050c10a0ad3a91c8af805c7e35
Author: John Poth <[email protected]>
AuthorDate: Mon Mar 21 12:16:41 2022 +0100
[CAMEL-17805] add tests in camel-kafka-starter (#476)
* [CAMEL-17805] add tests in camel-kafka-starter
* Regen
---
components-starter/camel-kafka-starter/pom.xml | 125 +++++
.../integration/BaseEmbeddedKafkaTestSupport.java | 108 ++++
.../integration/CustomHeaderDeserializer.java | 39 ++
.../KafkaConsumerAsyncManualCommitIT.java | 160 ++++++
.../integration/KafkaConsumerBatchSizeIT.java | 116 +++++
.../kafka/integration/KafkaConsumerFullIT.java | 230 +++++++++
.../integration/KafkaConsumerHealthCheckIT.java | 211 ++++++++
.../integration/KafkaConsumerIdempotentIT.java | 102 ++++
.../KafkaConsumerIdempotentTestSupport.java | 66 +++
...kaConsumerIdempotentWithCustomSerializerIT.java | 97 ++++
.../KafkaConsumerIdempotentWithProcessorIT.java | 104 ++++
.../KafkaConsumerLastRecordHeaderIT.java | 113 +++++
.../integration/KafkaConsumerManualCommitIT.java | 180 +++++++
.../integration/KafkaConsumerRebalanceIT.java | 120 +++++
.../integration/KafkaConsumerTopicIsPatternIT.java | 113 +++++
.../KafkaConsumerWithResumeRouteStrategyIT.java | 250 +++++++++
.../kafka/integration/KafkaProducerFullIT.java | 561 +++++++++++++++++++++
.../component/kafka/integration/KafkaToDIT.java | 71 +++
.../kafka/integration/MockConsumerInterceptor.java | 54 ++
.../kafka/integration/MockProducerInterceptor.java | 49 ++
.../OffsetStateRepositoryConfiguration.java | 35 ++
.../integration/ResumeStrategyConfiguration.java | 35 ++
.../kafka/clients/consumer/KafkaConsumerTest.java | 62 +++
23 files changed, 3001 insertions(+)
diff --git a/components-starter/camel-kafka-starter/pom.xml
b/components-starter/camel-kafka-starter/pom.xml
index 7a55a4a..368c4d6 100644
--- a/components-starter/camel-kafka-starter/pom.xml
+++ b/components-starter/camel-kafka-starter/pom.xml
@@ -52,6 +52,31 @@
</exclusions>
-->
</dependency>
+ <!-- test -->
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-test-infra-kafka</artifactId>
+ <version>${camel-version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-test-spring-junit5</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-spring-xml</artifactId>
+ </exclusion>
+ </exclusions>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-test</artifactId>
+ <version>${spring-boot-version}</version>
+ <scope>test</scope>
+ </dependency>
<!--START OF GENERATED CODE-->
<dependency>
<groupId>org.apache.camel.springboot</groupId>
@@ -59,4 +84,104 @@
</dependency>
<!--END OF GENERATED CODE-->
</dependencies>
+ <build>
+ <pluginManagement>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <configuration>
+ <reuseForks>true</reuseForks>
+
<forkedProcessTimeoutInSeconds>${camel.failsafe.forkTimeout}</forkedProcessTimeoutInSeconds>
+ <redirectTestOutputToFile>false</redirectTestOutputToFile>
+ <systemPropertyVariables>
+ <visibleassertions.silence>true</visibleassertions.silence>
+ </systemPropertyVariables>
+ </configuration>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ </build>
+ <profiles>
+ <!-- activate integration test if the docker socket file is accessible -->
+ <profile>
+ <id>kafka-integration-tests-docker-file</id>
+ <activation>
+ <file>
+ <exists>/var/run/docker.sock</exists>
+ </file>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>integration-test</id>
+ <phase>verify</phase>
+ <goals>
+ <goal>integration-test</goal>
+ <goal>verify</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>kafka-3</id>
+ <configuration>
+ <systemPropertyVariables>
+
<kafka.instance.type>local-kafka3-container</kafka.instance.type>
+ </systemPropertyVariables>
+ <reportNameSuffix>kafka-3</reportNameSuffix>
+ </configuration>
+ <goals>
+ <goal>integration-test</goal>
+ <goal>verify</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ <!-- activate integration test if the DOCKER_HOST env var is set -->
+ <profile>
+ <id>kafka-integration-tests-docker-env</id>
+ <activation>
+ <property>
+ <name>env.DOCKER_HOST</name>
+ </property>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>integration-test</id>
+ <phase>verify</phase>
+ <goals>
+ <goal>integration-test</goal>
+ <goal>verify</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>kafka-3</id>
+ <configuration>
+ <systemPropertyVariables>
+
<kafka.instance.type>local-kafka3-container</kafka.instance.type>
+ </systemPropertyVariables>
+ <reportNameSuffix>kafka-3</reportNameSuffix>
+ </configuration>
+ <goals>
+ <goal>integration-test</goal>
+ <goal>verify</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
</project>
diff --git
a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/BaseEmbeddedKafkaTestSupport.java
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/BaseEmbeddedKafkaTestSupport.java
new file mode 100644
index 0000000..7b83738
--- /dev/null
+++
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/BaseEmbeddedKafkaTestSupport.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.util.Properties;
+import org.apache.camel.CamelContext;
+import org.apache.camel.component.kafka.KafkaComponent;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.spring.boot.CamelContextConfiguration;
+import org.apache.camel.test.infra.kafka.services.KafkaService;
+import org.apache.camel.test.infra.kafka.services.KafkaServiceFactory;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.KafkaAdminClient;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+public abstract class BaseEmbeddedKafkaTestSupport {
+
+ @RegisterExtension
+ public static KafkaService service = KafkaServiceFactory.createService();
+
+ @Autowired
+ protected CamelContext context;
+
+ protected static AdminClient kafkaAdminClient;
+
+ private static final Logger LOG =
LoggerFactory.getLogger(BaseEmbeddedKafkaTestSupport.class);
+
+ @BeforeAll
+ public static void beforeClass() {
+ LOG.info("### Embedded Kafka cluster broker list: " +
service.getBootstrapServers());
+ System.setProperty("bootstrapServers", service.getBootstrapServers());
+ }
+
+ @BeforeEach
+ public void setKafkaAdminClient() {
+ if (kafkaAdminClient == null) {
+ kafkaAdminClient = createAdminClient();
+ }
+ }
+
+ protected Properties getDefaultProperties() {
+ LOG.info("Connecting to Kafka {}", service.getBootstrapServers());
+
+ Properties props = new Properties();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
service.getBootstrapServers());
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
KafkaConstants.KAFKA_DEFAULT_SERIALIZER);
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
KafkaConstants.KAFKA_DEFAULT_SERIALIZER);
+ props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
KafkaConstants.KAFKA_DEFAULT_PARTITIONER);
+ props.put(ProducerConfig.ACKS_CONFIG, "1");
+ return props;
+ }
+
+ protected static String getBootstrapServers() {
+ return service.getBootstrapServers();
+ }
+
+ private static AdminClient createAdminClient() {
+ final Properties properties = new Properties();
+ properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
service.getBootstrapServers());
+
+ return KafkaAdminClient.create(properties);
+ }
+
+ @Configuration
+ public class DefaulKafkaComponent {
+ @Bean
+ CamelContextConfiguration contextConfiguration() {
+ return new CamelContextConfiguration() {
+ @Override
+ public void beforeApplicationStart(CamelContext context) {
+ context.getPropertiesComponent().setLocation("ref:prop");
+
+ KafkaComponent kafka = new KafkaComponent(context);
+ kafka.init();
+
kafka.getConfiguration().setBrokers(BaseEmbeddedKafkaTestSupport.service.getBootstrapServers());
+ context.addComponent("kafka", kafka);
+ }
+
+ @Override
+ public void afterApplicationStart(CamelContext camelContext) {
+ //do nothing here
+ }
+ };
+ }
+ }
+}
diff --git
a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/CustomHeaderDeserializer.java
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/CustomHeaderDeserializer.java
new file mode 100644
index 0000000..e3be099
--- /dev/null
+++
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/CustomHeaderDeserializer.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.kafka.integration;
+
+import java.math.BigInteger;
+import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CustomHeaderDeserializer extends DefaultKafkaHeaderDeserializer {
+ private static final Logger LOG =
LoggerFactory.getLogger(CustomHeaderDeserializer.class);
+
+ @Override
+ public Object deserialize(String key, byte[] value) {
+ if (key.equals("id")) {
+ BigInteger bi = new BigInteger(value);
+ LOG.debug("Converted the header {} to {} via custom serializer",
key, bi.longValue());
+
+ return String.valueOf(bi.longValue());
+ } else {
+ return super.deserialize(key, value);
+ }
+ }
+}
diff --git
a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java
new file mode 100644
index 0000000..e4d2e1d
--- /dev/null
+++
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.util.Collections;
+import java.util.Properties;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.AggregationStrategies;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.KafkaConstants;
+import
org.apache.camel.component.kafka.consumer.DefaultKafkaManualAsyncCommitFactory;
+import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
+import org.apache.camel.component.kafka.consumer.KafkaManualCommitFactory;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.RepeatedTest;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
+@CamelSpringBootTest
+@SpringBootTest(
+ classes = {
+ CamelAutoConfiguration.class,
+ BaseEmbeddedKafkaTestSupport.DefaulKafkaComponent.class,
+ KafkaConsumerAsyncManualCommitIT.class,
+ KafkaConsumerAsyncManualCommitIT.TestConfiguration.class,
+ }
+)
+public class KafkaConsumerAsyncManualCommitIT extends
BaseEmbeddedKafkaTestSupport {
+
+ public static final String TOPIC = "testManualCommitTest";
+
+ private final String from = "kafka:" + TOPIC
+ +
"?groupId=group1&sessionTimeoutMs=30000&autoCommitEnable=false"
+ +
"&allowManualCommit=true&autoOffsetReset=earliest&kafkaManualCommitFactory=#testFactory";
+
+
+ @EndpointInject("mock:result")
+ private MockEndpoint to;
+
+ @EndpointInject("mock:resultBar")
+ private MockEndpoint toBar;
+
+ private org.apache.kafka.clients.producer.KafkaProducer<String, String>
producer;
+
+ @BeforeEach
+ public void before() {
+ Properties props = getDefaultProperties();
+ producer = new
org.apache.kafka.clients.producer.KafkaProducer<>(props);
+ }
+
+ @AfterEach
+ public void after() {
+ if (producer != null) {
+ producer.close();
+ }
+ // clean all test topics
+ kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC));
+ }
+
+ @RepeatedTest(4)
+ public void kafkaManualCommit() throws Exception {
+ to.expectedMessageCount(5);
+ to.expectedBodiesReceivedInAnyOrder("message-0", "message-1",
"message-2", "message-3", "message-4");
+ // The LAST_RECORD_BEFORE_COMMIT header should include a value as we
use
+ // manual commit
+
to.allMessages().header(KafkaConstants.LAST_RECORD_BEFORE_COMMIT).isNotNull();
+
+ for (int k = 0; k < 5; k++) {
+ String msg = "message-" + k;
+ ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC,
"1", msg);
+ producer.send(data);
+ }
+
+ to.assertIsSatisfied(3000);
+
+ to.reset();
+
+ // Second step: We shut down our route, we expect nothing will be
recovered by our route
+ context.getRouteController().stopRoute("foo");
+ to.expectedMessageCount(0);
+
+ // Third step: While our route is stopped, we send 3 records more to
Kafka test topic
+ for (int k = 5; k < 8; k++) {
+ String msg = "message-" + k;
+ ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC,
"1", msg);
+ producer.send(data);
+ }
+
+ to.assertIsSatisfied(3000);
+
+ to.reset();
+
+ // Fourth step: We start again our route, since we have been
committing the offsets from the first step,
+ // we will expect to consume from the latest committed offset e.g from
offset 5
+ context.getRouteController().startRoute("foo");
+ to.expectedMessageCount(3);
+ to.expectedBodiesReceivedInAnyOrder("message-5", "message-6",
"message-7");
+
+ to.assertIsSatisfied(3000);
+ }
+
+ @Configuration
+ public class TestConfiguration {
+ @Bean("testFactory")
+ public KafkaManualCommitFactory createKafkaManualCommitFactory (){
+ return new DefaultKafkaManualAsyncCommitFactory();
+ }
+ @Bean
+ public RouteBuilder routeBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ from(from).routeId("foo").to("direct:aggregate");
+ // With sync manual commit, this would throw a concurrent
modification exception
+ // It can be used in aggregator with completion
timeout/interval for instance
+ // WARN: records from one partition must be processed by
one unique thread
+ from("direct:aggregate").routeId("aggregate").to(to)
+ .aggregate()
+ .constant(true)
+ .completionTimeout(1)
+
.aggregationStrategy(AggregationStrategies.groupedExchange())
+ .split().body()
+ .process(e -> {
+ KafkaManualCommit manual =
e.getMessage().getBody(Exchange.class)
+
.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
+ assertNotNull(manual);
+ manual.commit();
+ });
+ from(from).routeId("bar").autoStartup(false).to(toBar);
+ }
+ };
+ }
+ }
+
+}
diff --git
a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerBatchSizeIT.java
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerBatchSizeIT.java
new file mode 100644
index 0000000..c446b20
--- /dev/null
+++
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerBatchSizeIT.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.util.Collections;
+import java.util.Properties;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+
+@DirtiesContext
+@CamelSpringBootTest
+@SpringBootTest(
+ classes = {
+ CamelAutoConfiguration.class,
+ BaseEmbeddedKafkaTestSupport.DefaulKafkaComponent.class,
+ KafkaConsumerBatchSizeIT.class,
+ KafkaConsumerBatchSizeIT.TestConfiguration.class,
+ }
+)
+public class KafkaConsumerBatchSizeIT extends BaseEmbeddedKafkaTestSupport {
+
+ public static final String TOPIC = "test-batch";
+
+ private final String from = "kafka:" + TOPIC +
"?autoOffsetReset=earliest&autoCommitEnable=false&consumersCount=1";
+
+
+ @EndpointInject("mock:result")
+ private MockEndpoint to;
+
+ private org.apache.kafka.clients.producer.KafkaProducer<String, String>
producer;
+
+ @BeforeEach
+ public void before() {
+ Properties props = getDefaultProperties();
+ producer = new
org.apache.kafka.clients.producer.KafkaProducer<>(props);
+ }
+
+ @AfterEach
+ public void after() {
+ if (producer != null) {
+ producer.close();
+ }
+ // clean all test topics
+ kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC));
+ }
+
+ @Test
+ public void kafkaMessagesIsConsumedByCamel() throws Exception {
+
+ // First 2 must not be committed since batch size is 3
+ to.expectedBodiesReceivedInAnyOrder("m1", "m2");
+ for (int k = 1; k <= 2; k++) {
+ String msg = "m" + k;
+ ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC,
"1", msg);
+ producer.send(data);
+ }
+
+ to.assertIsSatisfied();
+
+ to.reset();
+
+ to.expectedBodiesReceivedInAnyOrder("m3", "m4", "m5", "m6", "m7",
"m8", "m9", "m10");
+
+        // Restart the endpoint
+ context.getRouteController().stopRoute("foo");
+ context.getRouteController().startRoute("foo");
+
+ // Second route must wake up and consume all from scratch and commit 9
+ // consumed
+ for (int k = 3; k <= 10; k++) {
+ String msg = "m" + k;
+ ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC,
"1", msg);
+ producer.send(data);
+ }
+
+ to.assertIsSatisfied();
+ }
+
+ @Configuration
+ public class TestConfiguration {
+ @Bean
+ public RouteBuilder routeBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ from(from).routeId("foo").to(to).setId("First");
+ }
+ };
+ }
+ }
+}
diff --git
a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java
new file mode 100644
index 0000000..ca96589
--- /dev/null
+++
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.StreamSupport;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.component.kafka.KafkaEndpoint;
+import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderDeserializer;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+
+import static org.apache.camel.test.junit5.TestSupport.assertIsInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+@DirtiesContext
+@CamelSpringBootTest
+@SpringBootTest(
+ classes = {
+ CamelAutoConfiguration.class,
+ BaseEmbeddedKafkaTestSupport.DefaulKafkaComponent.class,
+ KafkaConsumerFullIT.class,
+ KafkaConsumerFullIT.TestConfiguration.class,
+ }
+)
+public class KafkaConsumerFullIT extends BaseEmbeddedKafkaTestSupport {
+ public static final String TOPIC = "test-full";
+
+ private static final Logger LOG =
LoggerFactory.getLogger(KafkaConsumerFullIT.class);
+
+ private final String from = "kafka:" + TOPIC
+ +
"?groupId=group1&autoOffsetReset=earliest&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer&"
+ +
"valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+ +
"&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.integration.MockConsumerInterceptor";
+
+
+ @EndpointInject("mock:result")
+ private MockEndpoint to;
+
+ private org.apache.kafka.clients.producer.KafkaProducer<String, String>
producer;
+
+ @BeforeEach
+ public void before() {
+ Properties props = getDefaultProperties();
+ producer = new
org.apache.kafka.clients.producer.KafkaProducer<>(props);
+ MockConsumerInterceptor.recordsCaptured.clear();
+ }
+
+ @AfterEach
+ public void after() {
+ if (producer != null) {
+ producer.close();
+ }
+ // clean all test topics
+ kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC)).all();
+ to.reset();
+ }
+
+ @Order(3)
+ @Test
+ public void kafkaMessageIsConsumedByCamel() throws InterruptedException {
+ String propagatedHeaderKey = "PropagatedCustomHeader";
+ byte[] propagatedHeaderValue = "propagated header value".getBytes();
+ String skippedHeaderKey = "CamelSkippedHeader";
+ to.expectedMessageCount(5);
+ to.expectedBodiesReceivedInAnyOrder("message-0", "message-1",
"message-2", "message-3", "message-4");
+ // The LAST_RECORD_BEFORE_COMMIT header should not be configured on any
+ // exchange because autoCommitEnable=true
+
to.expectedHeaderValuesReceivedInAnyOrder(KafkaConstants.LAST_RECORD_BEFORE_COMMIT,
null, null, null, null, null);
+ to.expectedHeaderReceived(propagatedHeaderKey, propagatedHeaderValue);
+
+ for (int k = 0; k < 5; k++) {
+ String msg = "message-" + k;
+ ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC,
"1", msg);
+ data.headers().add(new RecordHeader("CamelSkippedHeader", "skipped
header value".getBytes()));
+ data.headers().add(new RecordHeader(propagatedHeaderKey,
propagatedHeaderValue));
+ producer.send(data);
+ }
+
+ to.assertIsSatisfied(3000);
+
+ assertEquals(5,
StreamSupport.stream(MockConsumerInterceptor.recordsCaptured.get(0).records(TOPIC).spliterator(),
false)
+ .count());
+
+ Map<String, Object> headers =
to.getExchanges().get(0).getIn().getHeaders();
+ assertFalse(headers.containsKey(skippedHeaderKey), "Should not receive
skipped header");
+ assertTrue(headers.containsKey(propagatedHeaderKey), "Should receive
propagated header");
+ }
+
+ @Order(2)
+ @Test
+ public void kafkaRecordSpecificHeadersAreNotOverwritten() throws
InterruptedException {
+ String propagatedHeaderKey = KafkaConstants.TOPIC;
+ byte[] propagatedHeaderValue = "propagated incorrect topic".getBytes();
+ to.expectedHeaderReceived(KafkaConstants.TOPIC, TOPIC);
+
+ ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, "1",
"message");
+ data.headers().add(new RecordHeader(propagatedHeaderKey,
propagatedHeaderValue));
+ producer.send(data);
+
+ to.assertIsSatisfied(3000);
+
+ Map<String, Object> headers =
to.getExchanges().get(0).getIn().getHeaders();
+ assertTrue(headers.containsKey(KafkaConstants.TOPIC), "Should receive
KafkaEndpoint populated kafka.TOPIC header");
+ assertEquals(TOPIC, headers.get(KafkaConstants.TOPIC), "Topic name
received");
+ }
+
+ @Test
+ @Order(1)
+ public void kafkaMessageIsConsumedByCamelSeekedToBeginning() throws
Exception {
+ to.expectedMessageCount(5);
+ to.expectedBodiesReceivedInAnyOrder("message-0", "message-1",
"message-2", "message-3", "message-4");
+ for (int k = 0; k < 5; k++) {
+ String msg = "message-" + k;
+ ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC,
"1", msg);
+ producer.send(data);
+ }
+ to.assertIsSatisfied(3000);
+
+ to.reset();
+
+ to.expectedMessageCount(5);
+
+ to.expectedBodiesReceivedInAnyOrder("message-0", "message-1",
"message-2", "message-3", "message-4");
+
+        // Restart the endpoint
+ context.getRouteController().stopRoute("full-it");
+
+ KafkaEndpoint kafkaEndpoint = (KafkaEndpoint)
context.getEndpoint(from);
+ kafkaEndpoint.getConfiguration().setSeekTo("beginning");
+
+ context.getRouteController().startRoute("full-it");
+
+        // As we set seekTo to beginning we should re-consume all messages
+ to.assertIsSatisfied(3000);
+ }
+
+ @Order(4)
+ @Test
+ public void kafkaMessageIsConsumedByCamelSeekedToEnd() throws Exception {
+ to.expectedMessageCount(5);
+ to.expectedBodiesReceivedInAnyOrder("message-0", "message-1",
"message-2", "message-3", "message-4");
+ for (int k = 0; k < 5; k++) {
+ String msg = "message-" + k;
+ ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC,
"1", msg);
+ producer.send(data);
+ }
+ to.assertIsSatisfied(3000);
+
+ to.reset();
+
+ to.expectedMessageCount(0);
+
+ // Restart endpoint,
+ context.getRouteController().stopRoute("full-it");
+
+ KafkaEndpoint kafkaEndpoint = (KafkaEndpoint)
context.getEndpoint(from);
+ kafkaEndpoint.getConfiguration().setSeekTo("end");
+
+ context.getRouteController().startRoute("full-it");
+
+ to.assertIsSatisfied(3000);
+ }
+
+ @Order(5)
+ @Test
+ public void headerDeserializerCouldBeOverridden() {
+ KafkaEndpoint kafkaEndpoint
+ =
context.getEndpoint("kafka:random_topic?headerDeserializer=#myHeaderDeserializer",
KafkaEndpoint.class);
+ assertIsInstanceOf(MyKafkaHeaderDeserializer.class,
kafkaEndpoint.getConfiguration().getHeaderDeserializer());
+ }
+
+ @Configuration
+ public class TestConfiguration {
+ @Bean
+ public RouteBuilder routeBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ from(from).process(exchange -> LOG.trace("Captured on the
processor: {}", exchange.getMessage().getBody()))
+ .routeId("full-it").to(to);
+ }
+ };
+ }
+
+ @Bean("myHeaderDeserializer")
+ public MyKafkaHeaderDeserializer createMyKafkaHeaderDeserializer(){
+ return new MyKafkaHeaderDeserializer();
+ }
+ }
+
+ private static class MyKafkaHeaderDeserializer extends
DefaultKafkaHeaderDeserializer {
+ }
+}
diff --git
a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerHealthCheckIT.java
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerHealthCheckIT.java
new file mode 100644
index 0000000..31a383c
--- /dev/null
+++
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerHealthCheckIT.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.StreamSupport;
+import org.apache.camel.CamelContext;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderDeserializer;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.health.HealthCheck;
+import org.apache.camel.health.HealthCheckHelper;
+import org.apache.camel.health.HealthCheckRegistry;
+import org.apache.camel.impl.health.DefaultHealthCheckRegistry;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.spring.boot.CamelContextConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.testcontainers.shaded.org.awaitility.Awaitility.await;
+
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+@DirtiesContext
+@CamelSpringBootTest
+@SpringBootTest(
+ classes = {
+ CamelAutoConfiguration.class,
+ BaseEmbeddedKafkaTestSupport.DefaulKafkaComponent.class,
+ KafkaConsumerHealthCheckIT.class,
+ KafkaConsumerHealthCheckIT.TestConfiguration.class,
+ }
+)
/**
 * Verifies the Camel Kafka consumer health checks: liveness/readiness are UP while the
 * broker is running and messages flow, and readiness reports DOWN (with topic and route
 * details) after the embedded Kafka service is shut down.
 */
public class KafkaConsumerHealthCheckIT extends BaseEmbeddedKafkaTestSupport {
    public static final String TOPIC = "test-health";

    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerHealthCheckIT.class);

    // Consumer endpoint under test: string key/value deserializers, auto-commit on,
    // plus the mock interceptor so the test can observe polled records.
    private final String from = "kafka:" + TOPIC
            + "?groupId=group1&autoOffsetReset=earliest&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer&"
            + "valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
            + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.integration.MockConsumerInterceptor";

    @EndpointInject("mock:result")
    private MockEndpoint to;

    // Plain Kafka producer used to feed the topic the route consumes from.
    private org.apache.kafka.clients.producer.KafkaProducer<String, String> producer;

    @BeforeEach
    public void before() {
        Properties props = getDefaultProperties();
        producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
        // interceptor state is static — reset it so counts reflect only this test
        MockConsumerInterceptor.recordsCaptured.clear();
    }

    @AfterEach
    public void after() {
        if (producer != null) {
            producer.close();
        }
        // clean all test topics
        kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC)).all();
        to.reset();
    }

    /**
     * Installs the context/routes/consumers health checks into the CamelContext before
     * it starts, since they are not auto-registered in this test setup.
     */
    @Bean("healthRegistry")
    CamelContextConfiguration contextConfiguration() {
        return new CamelContextConfiguration() {
            @Override
            public void beforeApplicationStart(CamelContext context) {
                // install health check manually (yes a bit cumbersome)
                HealthCheckRegistry registry = new DefaultHealthCheckRegistry();
                registry.setCamelContext(context);
                Object hc = registry.resolveById("context");
                registry.register(hc);
                hc = registry.resolveById("routes");
                registry.register(hc);
                hc = registry.resolveById("consumers");
                registry.register(hc);
                context.setExtension(HealthCheckRegistry.class, registry);
            }

            @Override
            public void afterApplicationStart(CamelContext camelContext) {
                //do nothing here
            }
        };
    }

    /**
     * End-to-end scenario; steps are order-dependent: check UP, consume 5 records with
     * header propagation/skipping assertions, shut the broker down, then check that
     * liveness stays UP while readiness turns DOWN with the expected message/details.
     */
    @Order(1)
    @Test
    public void kafkaConsumerHealthCheck() throws InterruptedException {
        // health-check liveness should be UP
        Collection<HealthCheck.Result> res = HealthCheckHelper.invokeLiveness(context);
        boolean up = res.stream().allMatch(r -> r.getState().equals(HealthCheck.State.UP));
        Assertions.assertTrue(up, "liveness check");

        // health-check readiness should be ready
        await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> {
            Collection<HealthCheck.Result> res2 = HealthCheckHelper.invokeReadiness(context);
            boolean up2 = res2.stream().allMatch(r -> r.getState().equals(HealthCheck.State.UP));
            Assertions.assertTrue(up2, "readiness check");
        });

        String propagatedHeaderKey = "PropagatedCustomHeader";
        byte[] propagatedHeaderValue = "propagated header value".getBytes();
        String skippedHeaderKey = "CamelSkippedHeader";
        to.expectedMessageCount(5);
        to.expectedBodiesReceivedInAnyOrder("message-0", "message-1", "message-2", "message-3", "message-4");
        // The LAST_RECORD_BEFORE_COMMIT header should not be configured on any
        // exchange because autoCommitEnable=true
        to.expectedHeaderValuesReceivedInAnyOrder(KafkaConstants.LAST_RECORD_BEFORE_COMMIT, null, null, null, null, null);
        to.expectedHeaderReceived(propagatedHeaderKey, propagatedHeaderValue);

        for (int k = 0; k < 5; k++) {
            String msg = "message-" + k;
            ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, "1", msg);
            // Camel* headers are filtered by the default header filter strategy; the
            // custom header must survive the trip
            data.headers().add(new RecordHeader("CamelSkippedHeader", "skipped header value".getBytes()));
            data.headers().add(new RecordHeader(propagatedHeaderKey, propagatedHeaderValue));
            producer.send(data);
        }

        to.assertIsSatisfied(3000);

        // the interceptor must have seen exactly the 5 records we produced
        assertEquals(5, StreamSupport.stream(MockConsumerInterceptor.recordsCaptured.get(0).records(TOPIC).spliterator(), false)
                .count());

        Map<String, Object> headers = to.getExchanges().get(0).getIn().getHeaders();
        assertFalse(headers.containsKey(skippedHeaderKey), "Should not receive skipped header");
        assertTrue(headers.containsKey(propagatedHeaderKey), "Should receive propagated header");

        // and shutdown kafka which will make readiness report as DOWN
        service.shutdown();

        // health-check liveness should be UP
        res = HealthCheckHelper.invokeLiveness(context);
        up = res.stream().allMatch(r -> r.getState().equals(HealthCheck.State.UP));
        Assertions.assertTrue(up, "liveness check");
        // but health-check readiness should NOT be ready
        await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> {
            Collection<HealthCheck.Result> res2 = HealthCheckHelper.invoke(context);
            Optional<HealthCheck.Result> down
                    = res2.stream().filter(r -> r.getState().equals(HealthCheck.State.DOWN)).findFirst();
            Assertions.assertTrue(down.isPresent());
            String msg = down.get().getMessage().get();
            Assertions.assertEquals("KafkaConsumer is not ready (recovery in progress using 5s intervals).", msg);
            Map<String, Object> map = down.get().getDetails();
            Assertions.assertEquals(TOPIC, map.get("topic"));
            Assertions.assertEquals("test-health-it", map.get("route.id"));
        });
    }

    @Configuration
    public class TestConfiguration {
        // Route under test: consume from the Kafka URI, trace, forward to mock:result.
        @Bean
        public RouteBuilder routeBuilder() {
            return new RouteBuilder() {
                @Override
                public void configure() {
                    from(from).process(exchange -> LOG.trace("Captured on the processor: {}", exchange.getMessage().getBody()))
                            .routeId("test-health-it").to(to);
                }
            };
        }

        // Bean name matches headerDeserializer=#myHeaderDeserializer lookups.
        @Bean("myHeaderDeserializer")
        public MyKafkaHeaderDeserializer createMyKafkaHeaderDeserializer(){
            return new MyKafkaHeaderDeserializer();
        }
    }

    // Marker subclass used only for instanceof assertions in related tests.
    private static class MyKafkaHeaderDeserializer extends DefaultKafkaHeaderDeserializer {
    }
}
diff --git
a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentIT.java
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentIT.java
new file mode 100644
index 0000000..5e20875
--- /dev/null
+++
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentIT.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.util.Arrays;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+
+import static
org.apache.camel.component.kafka.serde.KafkaSerdeHelper.numericHeader;
+
+@EnabledIfSystemProperty(named = "enable.kafka.consumer.idempotency.tests",
matches = "true")
+@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
+@CamelSpringBootTest
+@SpringBootTest(
+ classes = {
+ CamelAutoConfiguration.class,
+ BaseEmbeddedKafkaTestSupport.DefaulKafkaComponent.class,
+ KafkaConsumerIdempotentIT.class,
+ KafkaConsumerIdempotentIT.TestConfiguration.class,
+ }
+)
+public class KafkaConsumerIdempotentIT extends
KafkaConsumerIdempotentTestSupport {
+
+ public static final String TOPIC = "idempt";
+
+ private final String from = "kafka:" + TOPIC
+ + "?groupId=group2&autoOffsetReset=earliest"
+ +
"&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+ +
"&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+ +
"&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true"
+ +
"&interceptorClasses=org.apache.camel.component.kafka.integration.MockConsumerInterceptor";
+
+ @EndpointInject("mock:result")
+ private MockEndpoint to;
+
+ private int size = 200;
+
+ @BeforeEach
+ public void before() {
+ kafkaAdminClient.deleteTopics(Arrays.asList(TOPIC,
"TEST_IDEMPOTENT")).all();
+ doSend(size, TOPIC);
+ }
+
+ @AfterEach
+ public void after() {
+ kafkaAdminClient.deleteTopics(Arrays.asList(TOPIC,
"TEST_IDEMPOTENT")).all();
+ }
+
+ @Test
+ @DisplayName("Numeric headers is consumable when using idempotent
(CAMEL-16914)")
+ public void kafkaIdempotentMessageIsConsumedByCamel() throws
InterruptedException {
+ doRun(to, size);
+ }
+
+ @Configuration
+ public class TestConfiguration {
+ @Bean
+ public RouteBuilder routeBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ from(from).routeId("foo")
+ .idempotentConsumer(numericHeader("id"))
+ .idempotentRepository("kafkaIdempotentRepository")
+ .to(to);
+ }
+ };
+ }
+
+ @Bean("kafkaIdempotentRepository")
+ public KafkaIdempotentRepository createKafkaIdempotentRepository(){
+ return new KafkaIdempotentRepository("TEST_IDEMPOTENT",
getBootstrapServers());
+ }
+ }
+}
diff --git
a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentTestSupport.java
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentTestSupport.java
new file mode 100644
index 0000000..dc56323
--- /dev/null
+++
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentTestSupport.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.kafka.integration;
+
+import java.math.BigInteger;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.camel.Exchange;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.internals.RecordHeader;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public abstract class KafkaConsumerIdempotentTestSupport extends
BaseEmbeddedKafkaTestSupport {
+
+ protected void doSend(int size, String topic) {
+ Properties props = getDefaultProperties();
+ org.apache.kafka.clients.producer.KafkaProducer<String, String>
producer
+ = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
+
+ try {
+ for (int k = 0; k < size; k++) {
+ String msg = "message-" + k;
+ ProducerRecord<String, String> data = new
ProducerRecord<>(topic, String.valueOf(k), msg);
+
+ data.headers().add(new RecordHeader("id",
BigInteger.valueOf(k).toByteArray()));
+ producer.send(data);
+ }
+ } finally {
+ if (producer != null) {
+ producer.close();
+ }
+ }
+ }
+
+ protected void doRun(MockEndpoint mockEndpoint, int size) throws
InterruptedException {
+ mockEndpoint.expectedMessageCount(size);
+
+ List<Exchange> exchangeList = mockEndpoint.getReceivedExchanges();
+
+ mockEndpoint.assertIsSatisfied(10000);
+
+ assertEquals(size, exchangeList.size());
+
+ Map<String, Object> headers =
mockEndpoint.getExchanges().get(0).getIn().getHeaders();
+ assertTrue(headers.containsKey("id"), "0");
+ }
+}
diff --git
a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithCustomSerializerIT.java
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithCustomSerializerIT.java
new file mode 100644
index 0000000..c9a96b1
--- /dev/null
+++
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithCustomSerializerIT.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.util.Arrays;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
+@CamelSpringBootTest
+@SpringBootTest(
+ classes = {
+ CamelAutoConfiguration.class,
+ BaseEmbeddedKafkaTestSupport.DefaulKafkaComponent.class,
+ KafkaConsumerIdempotentWithCustomSerializerIT.class,
+
KafkaConsumerIdempotentWithCustomSerializerIT.TestConfiguration.class,
+ }
+)
+public class KafkaConsumerIdempotentWithCustomSerializerIT extends
KafkaConsumerIdempotentTestSupport {
+
+ public static final String TOPIC = "idempt2";
+
+ private final String from = "kafka:" + TOPIC
+ + "?groupId=group2&autoOffsetReset=earliest"
+ +
"&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+ +
"&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+ +
"&headerDeserializer=#class:org.apache.camel.component.kafka.integration.CustomHeaderDeserializer"
+ +
"&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true"
+ +
"&interceptorClasses=org.apache.camel.component.kafka.integration.MockConsumerInterceptor";
+
+ @EndpointInject("mock:result")
+ private MockEndpoint to;
+
+ private int size = 200;
+
+ @BeforeEach
+ public void before() {
+ kafkaAdminClient.deleteTopics(Arrays.asList(TOPIC,
"TEST_IDEMPOTENT")).all();
+ doSend(size, TOPIC);
+ }
+
+ @AfterEach
+ public void after() {
+ kafkaAdminClient.deleteTopics(Arrays.asList(TOPIC,
"TEST_IDEMPOTENT")).all();
+ }
+
+ @Test
+ public void kafkaMessageIsConsumedByCamel() throws InterruptedException {
+ doRun(to, size);
+ }
+
+ @Configuration
+ public class TestConfiguration {
+ @Bean
+ public RouteBuilder routeBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ from(from).routeId("foo")
+ .idempotentConsumer(header("id"))
+ .idempotentRepository("kafkaIdempotentRepository")
+ .to(to);
+ }
+ };
+ }
+
+ @Bean("kafkaIdempotentRepository")
+ public KafkaIdempotentRepository createKafkaIdempotentRepository(){
+ return new KafkaIdempotentRepository("TEST_IDEMPOTENT",
getBootstrapServers());
+ }
+ }
+}
diff --git
a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithProcessorIT.java
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithProcessorIT.java
new file mode 100644
index 0000000..d41756b
--- /dev/null
+++
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithProcessorIT.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.math.BigInteger;
+import java.util.Arrays;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
+@CamelSpringBootTest
+@SpringBootTest(
+ classes = {
+ CamelAutoConfiguration.class,
+ BaseEmbeddedKafkaTestSupport.DefaulKafkaComponent.class,
+ KafkaConsumerIdempotentWithProcessorIT.class,
+ KafkaConsumerIdempotentWithProcessorIT.TestConfiguration.class,
+ }
+)
/**
 * Verifies idempotent consumption when a processor first decodes the BigInteger byte[]
 * "id" header into a string before the idempotent-consumer check runs.
 */
public class KafkaConsumerIdempotentWithProcessorIT extends KafkaConsumerIdempotentTestSupport {
    public static final String TOPIC = "testidemp3";

    // Consumer endpoint under test: string key/value deserializers, auto-commit on,
    // mock interceptor attached.
    private final String from = "kafka:" + TOPIC
            + "?groupId=group2&autoOffsetReset=earliest"
            + "&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
            + "&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
            + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true"
            + "&interceptorClasses=org.apache.camel.component.kafka.integration.MockConsumerInterceptor";

    @EndpointInject("mock:resulti")
    private MockEndpoint to;

    // number of records produced before the test and expected at the mock
    private int size = 200;

    @BeforeEach
    public void before() {
        // start clean, then preload the topic via the shared helper
        kafkaAdminClient.deleteTopics(Arrays.asList(TOPIC, "TEST_IDEMPOTENT")).all();
        doSend(size, TOPIC);
    }

    @AfterEach
    public void after() {
        // clean all test topics
        kafkaAdminClient.deleteTopics(Arrays.asList(TOPIC, "TEST_IDEMPOTENT")).all();
    }

    @Test
    public void kafkaMessageIsConsumedByCamel() throws InterruptedException {
        doRun(to, size);
    }

    @Configuration
    public class TestConfiguration {
        @Bean
        public RouteBuilder routeBuilder() {
            return new RouteBuilder() {
                @Override
                public void configure() {
                    from(from).routeId("idemp-with-prop")
                            // decode the BigInteger byte[] "id" header into its decimal
                            // string form so the idempotent check below compares strings
                            .process(exchange -> {
                                byte[] id = exchange.getIn().getHeader("id", byte[].class);

                                BigInteger bi = new BigInteger(id);

                                exchange.getIn().setHeader("id", String.valueOf(bi.longValue()));
                            })
                            .idempotentConsumer(header("id"))
                            .idempotentRepository("kafkaIdempotentRepository")
                            .to(to);
                }
            };
        }

        // Kafka-backed idempotent repository persisted in the TEST_IDEMPOTENT topic.
        @Bean("kafkaIdempotentRepository")
        public KafkaIdempotentRepository createKafkaIdempotentRepository(){
            return new KafkaIdempotentRepository("TEST_IDEMPOTENT", getBootstrapServers());
        }
    }
}
diff --git
a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerLastRecordHeaderIT.java
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerLastRecordHeaderIT.java
new file mode 100644
index 0000000..2d97cc9
--- /dev/null
+++
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerLastRecordHeaderIT.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
+@CamelSpringBootTest
+@SpringBootTest(
+ classes = {
+ CamelAutoConfiguration.class,
+ BaseEmbeddedKafkaTestSupport.DefaulKafkaComponent.class,
+ KafkaConsumerLastRecordHeaderIT.class,
+ KafkaConsumerLastRecordHeaderIT.TestConfiguration.class,
+ }
+)
+public class KafkaConsumerLastRecordHeaderIT extends
BaseEmbeddedKafkaTestSupport {
+ private static final String TOPIC = "last-record";
+
+ @EndpointInject("mock:result")
+ private MockEndpoint result;
+
+ private org.apache.kafka.clients.producer.KafkaProducer<String, String>
producer;
+
+ @BeforeEach
+ public void before() {
+ Properties props = getDefaultProperties();
+ producer = new
org.apache.kafka.clients.producer.KafkaProducer<>(props);
+ }
+
+ @AfterEach
+ public void after() {
+ if (producer != null) {
+ producer.close();
+ }
+ // clean all test topics
+ kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC));
+ }
+
+ /**
+ * When consuming data with autoCommitEnable=false Then the
LAST_RECORD_BEFORE_COMMIT header must be always defined
+ * And it should be true only for the last one
+ */
+ @Test
+ public void shouldStartFromBeginningWithEmptyOffsetRepository() throws
InterruptedException {
+ result.expectedMessageCount(5);
+ result.expectedBodiesReceived("message-0", "message-1", "message-2",
"message-3", "message-4");
+
+ for (int i = 0; i < 5; i++) {
+ producer.send(new ProducerRecord<>(TOPIC, "1", "message-" + i));
+ }
+
+ result.assertIsSatisfied(3000);
+
+ List<Exchange> exchanges = result.getExchanges();
+ for (int i = 0; i < exchanges.size(); i++) {
+ Boolean header =
exchanges.get(i).getIn().getHeader(KafkaConstants.LAST_RECORD_BEFORE_COMMIT,
Boolean.class);
+ assertNotNull(header, "Header not set for #" + i);
+ assertEquals(header, i == exchanges.size() - 1, "Header invalid
for #" + i);
+ // as long as the partitions count is 1 on topic:
+ header =
exchanges.get(i).getIn().getHeader(KafkaConstants.LAST_POLL_RECORD,
Boolean.class);
+ assertNotNull(header, "Last record header not set for #" + i);
+ assertEquals(header, i == exchanges.size() - 1, "Last record
header invalid for #" + i);
+ }
+ }
+
+ @Configuration
+ public class TestConfiguration {
+ @Bean
+ public RouteBuilder routeBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ from("kafka:" + TOPIC +
"?groupId=A&autoOffsetReset=earliest&autoCommitEnable=false").to("mock:result");
+ }
+ };
+ }
+ }
+}
diff --git
a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerManualCommitIT.java
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerManualCommitIT.java
new file mode 100644
index 0000000..9730286
--- /dev/null
+++
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerManualCommitIT.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.util.Collections;
+import java.util.Properties;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.RepeatedTest;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
+@CamelSpringBootTest
+@SpringBootTest(
+ classes = {
+ CamelAutoConfiguration.class,
+ BaseEmbeddedKafkaTestSupport.DefaulKafkaComponent.class,
+ KafkaConsumerManualCommitIT.class,
+ KafkaConsumerManualCommitIT.TestConfiguration.class,
+ }
+)
+public class KafkaConsumerManualCommitIT extends BaseEmbeddedKafkaTestSupport {
+
+ public static final String TOPIC = "testManualCommitTest";
+
+ private final String from = "kafka:" + TOPIC
+ + "?groupId=group1&sessionTimeoutMs=30000&autoCommitEnable=false"
+ + "&allowManualCommit=true&autoOffsetReset=earliest";
+
+ @EndpointInject("mock:result")
+ private MockEndpoint to;
+
+ @EndpointInject("mock:resultBar")
+ private MockEndpoint toBar;
+
+ private org.apache.kafka.clients.producer.KafkaProducer<String, String>
producer;
+
+ @BeforeEach
+ public void before() {
+ Properties props = getDefaultProperties();
+ producer = new
org.apache.kafka.clients.producer.KafkaProducer<>(props);
+ }
+
+ @AfterEach
+ public void after() {
+ if (producer != null) {
+ producer.close();
+ }
+ // clean all test topics
+ kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC));
+ }
+
+ @RepeatedTest(4)
+ public void kafkaAutoCommitDisabledDuringRebalance() throws Exception {
+ to.expectedMessageCount(1);
+ String firstMessage = "message-0";
+ to.expectedBodiesReceivedInAnyOrder(firstMessage);
+
+ ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, "1",
firstMessage);
+ producer.send(data);
+
+ to.assertIsSatisfied(3000);
+
+ to.reset();
+
+ context.getRouteController().stopRoute("foo");
+ to.expectedMessageCount(0);
+
+ String secondMessage = "message-1";
+ data = new ProducerRecord<>(TOPIC, "1", secondMessage);
+ producer.send(data);
+
+ to.assertIsSatisfied(3000);
+
+ to.reset();
+
+ // start a new route in order to rebalance kafka
+ context.getRouteController().startRoute("bar");
+ toBar.expectedMessageCount(1);
+
+ toBar.assertIsSatisfied();
+
+ context.getRouteController().stopRoute("bar");
+
+ // The route bar is not committing the offset, so by restarting foo,
last 3 items will be processed
+ context.getRouteController().startRoute("foo");
+ to.expectedMessageCount(1);
+ to.expectedBodiesReceivedInAnyOrder("message-1");
+
+ to.assertIsSatisfied(3000);
+ }
+
+ @RepeatedTest(4)
+ public void kafkaManualCommit() throws Exception {
+ to.expectedMessageCount(5);
+ to.expectedBodiesReceivedInAnyOrder("message-0", "message-1",
"message-2", "message-3", "message-4");
+ // The LAST_RECORD_BEFORE_COMMIT header should include a value as we
use
+ // manual commit
+
to.allMessages().header(KafkaConstants.LAST_RECORD_BEFORE_COMMIT).isNotNull();
+
+ for (int k = 0; k < 5; k++) {
+ String msg = "message-" + k;
+ ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC,
"1", msg);
+ producer.send(data);
+ }
+
+ to.assertIsSatisfied(3000);
+
+ to.reset();
+
+ // Second step: We shut down our route, we expect nothing will be
recovered by our route
+ context.getRouteController().stopRoute("foo");
+ to.expectedMessageCount(0);
+
+ // Third step: While our route is stopped, we send 3 records more to
Kafka test topic
+ for (int k = 5; k < 8; k++) {
+ String msg = "message-" + k;
+ ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC,
"1", msg);
+ producer.send(data);
+ }
+
+ to.assertIsSatisfied(3000);
+
+ to.reset();
+
+ // Fourth step: We start again our route, since we have been
committing the offsets from the first step,
+ // we will expect to consume from the latest committed offset e.g from
offset 5
+ context.getRouteController().startRoute("foo");
+ to.expectedMessageCount(3);
+ to.expectedBodiesReceivedInAnyOrder("message-5", "message-6",
"message-7");
+
+ to.assertIsSatisfied(3000);
+ }
+
+ @Configuration
+ public class TestConfiguration {
+ @Bean
+ public RouteBuilder routeBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ from(from).routeId("foo").to(to).process(e -> {
+ KafkaManualCommit manual =
e.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
+ assertNotNull(manual);
+ manual.commit();
+ });
+ from(from).routeId("bar").autoStartup(false).to(toBar);
+ }
+ };
+ }
+ }
+
+}
diff --git
a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerRebalanceIT.java
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerRebalanceIT.java
new file mode 100644
index 0000000..5437132
--- /dev/null
+++
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerRebalanceIT.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spi.StateRepository;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+ classes = {
+ CamelAutoConfiguration.class,
+ BaseEmbeddedKafkaTestSupport.DefaulKafkaComponent.class,
+ KafkaConsumerRebalanceIT.class,
+ OffsetStateRepositoryConfiguration.class,
+ KafkaConsumerRebalanceIT.TestConfiguration.class,
+ }
+)
+public class KafkaConsumerRebalanceIT extends BaseEmbeddedKafkaTestSupport {
+ private static final String TOPIC = "offset-rebalance";
+
+ @EndpointInject("mock:result")
+ private MockEndpoint result;
+
+ @Autowired
+ private CountDownLatch messagesLatch;
+
+ @Test
+ public void offsetGetStateMustHaveBeenCalledTwice() throws Exception {
+ boolean offsetGetStateCalled = messagesLatch.await(30000,
TimeUnit.MILLISECONDS);
+ // The getState should most likely be called during the partition
assignment
+ assertTrue(offsetGetStateCalled, "StateRepository.getState should have
been called for topic " + TOPIC
+ + ". Remaining count : " +
messagesLatch.getCount());
+ }
+
+ @AfterEach
+ public void after() {
+ // clean all test topics
+ kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC));
+ }
+
+ @Configuration
+ public class TestConfiguration {
+ @Bean
+ public RouteBuilder routeBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ from("kafka:" + TOPIC + "?groupId=" + TOPIC + "_GROUP" +
"&autoCommitIntervalMs=1000"
+ + "&autoOffsetReset=latest" + "&consumersCount=1"
+ +
"&offsetRepository=#offset").routeId("consumer-rebalance-route").to("mock:result");
+ }
+ };
+ }
+ }
+
+ public static class OffsetStateRepository implements
StateRepository<String, String> {
+ private static final Logger LOG =
LoggerFactory.getLogger(OffsetStateRepository.class);
+ CountDownLatch messagesLatch;
+
+ public OffsetStateRepository(CountDownLatch messagesLatch) {
+ this.messagesLatch = messagesLatch;
+ }
+
+ @Override
+ public void start() {
+ }
+
+ @Override
+ public void stop() {
+ }
+
+ @Override
+ public String getState(String key) {
+ LOG.debug("Getting the state for {} from topic {}", key, TOPIC);
+ if (key.contains(TOPIC)) {
+ LOG.trace("Topic matches, counting down");
+ messagesLatch.countDown();
+ }
+
+ return "-1";
+ }
+
+ @Override
+ public void setState(String key, String value) {
+ }
+ }
+}
diff --git
a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerTopicIsPatternIT.java
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerTopicIsPatternIT.java
new file mode 100644
index 0000000..9285ffa
--- /dev/null
+++
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerTopicIsPatternIT.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.util.Collections;
+import java.util.Properties;
+import java.util.stream.StreamSupport;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
+@CamelSpringBootTest
+@SpringBootTest(
+ classes = {
+ CamelAutoConfiguration.class,
+ BaseEmbeddedKafkaTestSupport.DefaulKafkaComponent.class,
+ KafkaConsumerTopicIsPatternIT.class,
+ KafkaConsumerTopicIsPatternIT.TestConfiguration.class,
+ }
+)
+public class KafkaConsumerTopicIsPatternIT extends
BaseEmbeddedKafkaTestSupport {
+
+ public static final String TOPIC = "vess123d";
+ public static final String TOPIC_PATTERN = "v.*d";
+
+ private final String from = "kafka:" + TOPIC_PATTERN +
"?topicIsPattern=true&groupId=group1&autoOffsetReset=earliest"
+ +
"&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.integration.MockConsumerInterceptor";
+
+ @EndpointInject("mock:result")
+ private MockEndpoint to;
+
+ private org.apache.kafka.clients.producer.KafkaProducer<String, String>
producer;
+
+ @BeforeEach
+ public void before() {
+ Properties props = getDefaultProperties();
+ producer = new
org.apache.kafka.clients.producer.KafkaProducer<>(props);
+ MockConsumerInterceptor.recordsCaptured.clear();
+
+ }
+
+ @AfterEach
+ public void after() {
+ if (producer != null) {
+ producer.close();
+ }
+ // clean all test topics
+ kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC));
+ }
+
+ @Test
+ public void kafkaTopicIsPattern() throws Exception {
+ to.expectedMessageCount(5);
+ to.expectedBodiesReceivedInAnyOrder("message-0", "message-1",
"message-2", "message-3", "message-4");
+ to.allMessages().header(KafkaConstants.TOPIC).isEqualTo(TOPIC);
+ // The LAST_RECORD_BEFORE_COMMIT header should not be configured on any
+ // exchange because autoCommitEnable=true
+
to.expectedHeaderValuesReceivedInAnyOrder(KafkaConstants.LAST_RECORD_BEFORE_COMMIT,
null, null, null, null, null);
+
+ for (int k = 0; k < 5; k++) {
+ String msg = "message-" + k;
+ ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC,
"1", msg);
+ producer.send(data);
+ }
+
+ to.assertIsSatisfied(3000);
+
+ assertEquals(5,
StreamSupport.stream(MockConsumerInterceptor.recordsCaptured.get(0).records(TOPIC).spliterator(),
false)
+ .count());
+ }
+
+ @Configuration
+ public class TestConfiguration {
+ @Bean
+ public RouteBuilder routeBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ from(from).to(to);
+ }
+ };
+ }
+ }
+
+}
diff --git
a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java
new file mode 100644
index 0000000..05f9d7d
--- /dev/null
+++
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.Offset;
+import org.apache.camel.Resumable;
+import org.apache.camel.Service;
+import org.apache.camel.UpdatableConsumerResumeStrategy;
+import org.apache.camel.builder.RouteBuilder;
+import
org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy;
+import org.apache.camel.component.kafka.consumer.support.KafkaResumable;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.resume.Resumables;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
+@CamelSpringBootTest
+@SpringBootTest(
+ classes = {
+ CamelAutoConfiguration.class,
+ BaseEmbeddedKafkaTestSupport.DefaulKafkaComponent.class,
+ KafkaConsumerWithResumeRouteStrategyIT.class,
+ KafkaConsumerWithResumeRouteStrategyIT.TestConfiguration.class,
+ ResumeStrategyConfiguration.class,
+ }
+)
+public class KafkaConsumerWithResumeRouteStrategyIT extends
BaseEmbeddedKafkaTestSupport {
+ private static final Logger LOG =
LoggerFactory.getLogger(KafkaConsumerWithResumeRouteStrategyIT.class);
+ private static final String TOPIC = "resumable-route-tp";
+ private static final int RANDOM_VALUE =
ThreadLocalRandom.current().nextInt(1, 1000);
+
+ @EndpointInject("mock:result")
+ private MockEndpoint result;
+
+ @Autowired
+ @Qualifier("resumeStrategy")
+ private TestKafkaConsumerResumeStrategy resumeStrategy;
+
+ @Autowired
+ private CountDownLatch messagesLatch;
+ private KafkaProducer<Object, Object> producer;
+
+ static class TestKafkaConsumerResumeStrategy
+ implements KafkaConsumerResumeStrategy,
+ UpdatableConsumerResumeStrategy<String, Integer, Resumable<String,
Integer>>, Service {
+ private final CountDownLatch messagesLatch;
+ private boolean resumeCalled;
+ private boolean consumerIsNull = true;
+ private boolean startCalled;
+ private boolean offsetNull = true;
+ private boolean offsetAddressableNull = true;
+ private boolean offsetAddressableEmpty = true;
+ private boolean offsetValueNull = true;
+ private boolean offsetValueEmpty = true;
+ private int lastOffset;
+
+ public TestKafkaConsumerResumeStrategy(CountDownLatch messagesLatch) {
+ this.messagesLatch = messagesLatch;
+ }
+
+ @Override
+ public void setConsumer(Consumer<?, ?> consumer) {
+ if (consumer != null) {
+ consumerIsNull = false;
+ }
+ }
+
+ @Override
+ public void resume(KafkaResumable resumable) {
+ resumeCalled = true;
+
+ }
+
+ @Override
+ public void resume() {
+ resumeCalled = true;
+ }
+
+ public boolean isResumeCalled() {
+ return resumeCalled;
+ }
+
+ public boolean isConsumerIsNull() {
+ return consumerIsNull;
+ }
+
+ @Override
+ public void start() {
+ LOG.warn("Start was called");
+ startCalled = true;
+ }
+
+ @Override
+ public void init() {
+ LOG.warn("Init was called");
+ }
+
+ public boolean isStartCalled() {
+ return startCalled;
+ }
+
+ @Override
+ public void updateLastOffset(Resumable<String, Integer> offset) {
+ try {
+ if (offset != null) {
+ offsetNull = false;
+
+ String addressable = offset.getAddressable();
+ if (addressable != null) {
+ offsetAddressableNull = false;
+ offsetAddressableEmpty = addressable.isEmpty() ||
addressable.isBlank();
+
+ }
+
+ Offset<Integer> offsetValue = offset.getLastOffset();
+ if (offsetValue != null) {
+ offsetValueNull = false;
+
+ if (offsetValue.offset() != null) {
+ offsetValueEmpty = false;
+ lastOffset = offsetValue.offset();
+ }
+ }
+ }
+ } finally {
+ messagesLatch.countDown();
+ }
+ }
+
+ public boolean isOffsetNull() {
+ return offsetNull;
+ }
+
+ public boolean isOffsetAddressableNull() {
+ return offsetAddressableNull;
+ }
+
+ public boolean isOffsetValueNull() {
+ return offsetValueNull;
+ }
+
+ public boolean isOffsetAddressableEmpty() {
+ return offsetAddressableEmpty;
+ }
+
+ public boolean isOffsetValueEmpty() {
+ return offsetValueEmpty;
+ }
+ }
+
+ @BeforeEach
+ public void before() {
+ Properties props = getDefaultProperties();
+
+ for (int i = 0; i < 10; i++) {
+ producer = new
org.apache.kafka.clients.producer.KafkaProducer<>(props);
+ producer.send(new ProducerRecord<>(TOPIC, String.valueOf(i)));
+ }
+ }
+
+ @Test
+ // @Timeout(value = 30)
+ public void testOffsetIsBeingChecked() throws InterruptedException {
+ assertTrue(messagesLatch.await(100, TimeUnit.SECONDS), "The resume was
not called");
+
+ assertTrue(resumeStrategy.isResumeCalled(),
+ "The resume strategy should have been called when the
partition was assigned");
+ assertFalse(resumeStrategy.isConsumerIsNull(),
+ "The consumer passed to the strategy should not be null");
+ assertTrue(resumeStrategy.isStartCalled(),
+ "The resume strategy should have been started");
+ assertFalse(resumeStrategy.isOffsetNull(),
+ "The offset should not be null");
+ assertFalse(resumeStrategy.isOffsetAddressableNull(),
+ "The offset addressable should not be null");
+ assertFalse(resumeStrategy.isOffsetAddressableEmpty(),
+ "The offset addressable should not be empty");
+ assertFalse(resumeStrategy.isOffsetValueNull(),
+ "The offset value should not be null");
+ assertFalse(resumeStrategy.isOffsetValueEmpty(),
+ "The offset value should not be empty");
+ assertEquals(RANDOM_VALUE, resumeStrategy.lastOffset, "the offsets
don't match");
+ }
+
+ @AfterEach
+ public void after() {
+ kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC));
+ }
+
+ @Configuration
+ public class TestConfiguration {
+ @Bean
+ public RouteBuilder routeBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ from("kafka:" + TOPIC + "?groupId=" + TOPIC +
"_GROUP&autoCommitIntervalMs=1000"
+ + "&autoOffsetReset=earliest&consumersCount=1")
+ .routeId("resume-strategy-route")
+ .setHeader(Exchange.OFFSET,
+ constant(Resumables.of("key",
RANDOM_VALUE)))
+ .resumable().resumeStrategy("resumeStrategy")
+ .to("mock:result");
+ }
+ };
+ }
+
+
+ }
+}
diff --git
a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaProducerFullIT.java
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaProducerFullIT.java
new file mode 100644
index 0000000..43803d9
--- /dev/null
+++
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaProducerFullIT.java
@@ -0,0 +1,561 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.StreamSupport;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.component.kafka.KafkaEndpoint;
+import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderSerializer;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.support.DefaultHeaderFilterStrategy;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+
+import static org.apache.camel.test.junit5.TestSupport.assertIsInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+ classes = {
+ CamelAutoConfiguration.class,
+ BaseEmbeddedKafkaTestSupport.DefaulKafkaComponent.class,
+ KafkaProducerFullIT.class,
+ KafkaProducerFullIT.TestConfiguration.class,
+ }
+)
+public class KafkaProducerFullIT extends BaseEmbeddedKafkaTestSupport {
+
+ private static final String TOPIC_STRINGS = "test";
+ private static final String TOPIC_INTERCEPTED = "test";
+ private static final String TOPIC_STRINGS_IN_HEADER = "testHeader";
+ private static final String TOPIC_BYTES = "testBytes";
+ private static final String TOPIC_BYTES_IN_HEADER = "testBytesHeader";
+ private static final String GROUP_BYTES = "groupStrings";
+ private static final String TOPIC_PROPAGATED_HEADERS =
"testPropagatedHeaders";
+ private static final String TOPIC_NO_RECORD_SPECIFIC_HEADERS =
"noRecordSpecificHeaders";
+
+ private static KafkaConsumer<String, String> stringsConsumerConn;
+ private static KafkaConsumer<byte[], byte[]> bytesConsumerConn;
+
+ private final String toStrings = "kafka:" + TOPIC_STRINGS +
"?requestRequiredAcks=-1";
+
+ private final String toStrings2 = "kafka:" + TOPIC_STRINGS +
"?requestRequiredAcks=-1&partitionKey=0";
+
+ private final String toStringsWithInterceptor = "kafka:" +
TOPIC_INTERCEPTED + "?requestRequiredAcks=-1"
+ +
"&interceptorClasses=org.apache.camel.component.kafka.integration.MockProducerInterceptor";
+
+ @EndpointInject("mock:kafkaAck")
+ private MockEndpoint mockEndpoint;
+
+ private final String toBytes = "kafka:" + TOPIC_BYTES +
"?requestRequiredAcks=-1"
+ +
"&valueSerializer=org.apache.kafka.common.serialization.ByteArraySerializer&"
+ +
"keySerializer=org.apache.kafka.common.serialization.ByteArraySerializer";
+
+ private final String toPropagatedHeaders = "kafka:" +
TOPIC_PROPAGATED_HEADERS + "?requestRequiredAcks=-1";
+
+ private final String toNoRecordSpecificHeaders = "kafka:" +
TOPIC_NO_RECORD_SPECIFIC_HEADERS + "?requestRequiredAcks=-1";
+
+ @Produce("direct:startStrings")
+ private ProducerTemplate stringsTemplate;
+
+ @Produce("direct:startStrings2")
+ private ProducerTemplate stringsTemplate2;
+
+ @Produce("direct:startBytes")
+ private ProducerTemplate bytesTemplate;
+
+ @Produce("direct:startTraced")
+ private ProducerTemplate interceptedTemplate;
+
+ @Produce("direct:propagatedHeaders")
+ private ProducerTemplate propagatedHeadersTemplate;
+
+ @Produce("direct:noRecordSpecificHeaders")
+ private ProducerTemplate noRecordSpecificHeadersTemplate;
+
+ @BeforeAll
+ public static void before() {
+ stringsConsumerConn = createStringKafkaConsumer("DemoConsumer");
+ bytesConsumerConn = createByteKafkaConsumer(GROUP_BYTES);
+ }
+
+ @AfterAll
+ public void after() {
+ // clean all test topics
+ final List<String> topics = new ArrayList<>();
+ topics.add(TOPIC_BYTES);
+ topics.add(TOPIC_INTERCEPTED);
+ topics.add(TOPIC_PROPAGATED_HEADERS);
+ topics.add(TOPIC_STRINGS);
+
+ kafkaAdminClient.deleteTopics(topics);
+ mockEndpoint.reset();
+ }
+
+ @Test
+ public void producedStringMessageIsReceivedByKafka() throws
InterruptedException {
+ int messageInTopic = 10;
+ int messageInOtherTopic = 5;
+
+ CountDownLatch messagesLatch = new CountDownLatch(messageInTopic +
messageInOtherTopic);
+
+ sendMessagesInRoute(messageInTopic, stringsTemplate, "IT test
message", KafkaConstants.PARTITION_KEY, "0");
+ sendMessagesInRoute(messageInOtherTopic, stringsTemplate, "IT test
message in other topic",
+ KafkaConstants.PARTITION_KEY, "0", KafkaConstants.TOPIC,
+ TOPIC_STRINGS_IN_HEADER);
+
+ createKafkaMessageConsumer(stringsConsumerConn, TOPIC_STRINGS,
TOPIC_STRINGS_IN_HEADER, messagesLatch);
+
+ boolean allMessagesReceived = messagesLatch.await(200,
TimeUnit.MILLISECONDS);
+
+ assertTrue(allMessagesReceived,
+ "Not all messages were published to the kafka topics. Not
received: " + messagesLatch.getCount());
+
+ List<Exchange> exchangeList = mockEndpoint.getExchanges();
+ assertEquals(15, exchangeList.size(), "Fifteen Exchanges are
expected");
+ for (Exchange exchange : exchangeList) {
+ @SuppressWarnings("unchecked")
+ List<RecordMetadata> recordMetaData1
+ = (List<RecordMetadata>)
(exchange.getIn().getHeader(KafkaConstants.KAFKA_RECORDMETA));
+ assertEquals(1, recordMetaData1.size(), "One RecordMetadata is
expected.");
+ assertTrue(recordMetaData1.get(0).offset() >= 0, "Offset is
positive");
+ assertTrue(recordMetaData1.get(0).topic().startsWith("test"),
"Topic Name start with 'test'");
+ }
+ }
+
    @Test
    public void producedString2MessageIsReceivedByKafka() throws InterruptedException {
        // Same as producedStringMessageIsReceivedByKafka but the first batch is sent
        // with no headers at all via the stringsTemplate2 endpoint.
        int messageInTopic = 10;
        int messageInOtherTopic = 5;

        CountDownLatch messagesLatch = new CountDownLatch(messageInTopic + messageInOtherTopic);

        // (String[]) null disambiguates the varargs overload: "no headers".
        sendMessagesInRoute(messageInTopic, stringsTemplate2, "IT test message", (String[]) null);
        sendMessagesInRoute(messageInOtherTopic, stringsTemplate2, "IT test message in other topic",
                KafkaConstants.PARTITION_KEY, "0", KafkaConstants.TOPIC, TOPIC_STRINGS_IN_HEADER);

        // Blocks until all 15 records have been polled from the two topics.
        createKafkaMessageConsumer(stringsConsumerConn, TOPIC_STRINGS, TOPIC_STRINGS_IN_HEADER, messagesLatch);

        boolean allMessagesReceived = messagesLatch.await(200, TimeUnit.MILLISECONDS);

        assertTrue(allMessagesReceived,
                "Not all messages were published to the kafka topics. Not received: " + messagesLatch.getCount());

        // Each exchange should expose exactly one RecordMetadata in KAFKA_RECORDMETA.
        List<Exchange> exchangeList = mockEndpoint.getExchanges();
        assertEquals(15, exchangeList.size(), "Fifteen Exchanges are expected");
        for (Exchange exchange : exchangeList) {
            @SuppressWarnings("unchecked")
            List<RecordMetadata> recordMetaData1
                    = (List<RecordMetadata>) (exchange.getIn().getHeader(KafkaConstants.KAFKA_RECORDMETA));
            assertEquals(1, recordMetaData1.size(), "One RecordMetadata is expected.");
            assertTrue(recordMetaData1.get(0).offset() >= 0, "Offset is positive");
            assertTrue(recordMetaData1.get(0).topic().startsWith("test"), "Topic Name start with 'test'");
        }
    }
+
    @Test
    public void producedStringMessageIsIntercepted() throws InterruptedException {
        // Produces through the intercepted endpoint and verifies that
        // MockProducerInterceptor saw every record.
        int messageInTopic = 10;
        int messageInOtherTopic = 5;

        CountDownLatch messagesLatch = new CountDownLatch(messageInTopic + messageInOtherTopic);

        sendMessagesInRoute(messageInTopic, interceptedTemplate, "IT test message", KafkaConstants.PARTITION_KEY, "0");
        sendMessagesInRoute(messageInOtherTopic, interceptedTemplate, "IT test message in other topic",
                KafkaConstants.PARTITION_KEY, "0", KafkaConstants.TOPIC, TOPIC_STRINGS_IN_HEADER);
        // Blocks until all records were polled back from the two topics.
        createKafkaMessageConsumer(stringsConsumerConn, TOPIC_INTERCEPTED, TOPIC_STRINGS_IN_HEADER, messagesLatch);

        boolean allMessagesReceived = messagesLatch.await(200, TimeUnit.MILLISECONDS);

        assertTrue(allMessagesReceived,
                "Not all messages were published to the kafka topics. Not received: " + messagesLatch.getCount());

        // The producer interceptor captures each outgoing record in its static list.
        Assertions.assertEquals(messageInTopic + messageInOtherTopic, MockProducerInterceptor.recordsCaptured.size());
    }
+
    @Test
    public void producedStringCollectionMessageIsReceivedByKafka() throws InterruptedException {
        // Sends each batch as a single exchange whose body is a List<String>; the
        // producer fans it out into one Kafka record per element, so one exchange
        // should carry one RecordMetadata per element.
        int messageInTopic = 10;
        int messageInOtherTopic = 5;

        CountDownLatch messagesLatch = new CountDownLatch(messageInTopic + messageInOtherTopic);

        List<String> msgs = new ArrayList<>();
        for (int x = 0; x < messageInTopic; x++) {
            msgs.add("Message " + x);
        }

        // One exchange carrying a 10-element list body.
        sendMessagesInRoute(1, stringsTemplate, msgs, KafkaConstants.PARTITION_KEY, "0");
        msgs = new ArrayList<>();
        for (int x = 0; x < messageInOtherTopic; x++) {
            msgs.add("Other Message " + x);
        }
        // One exchange carrying a 5-element list body, redirected to the header topic.
        sendMessagesInRoute(1, stringsTemplate, msgs, KafkaConstants.PARTITION_KEY, "0", KafkaConstants.TOPIC,
                TOPIC_STRINGS_IN_HEADER);

        // Blocks until all 15 individual records were polled back.
        createKafkaMessageConsumer(stringsConsumerConn, TOPIC_STRINGS, TOPIC_STRINGS_IN_HEADER, messagesLatch);

        boolean allMessagesReceived = messagesLatch.await(200, TimeUnit.MILLISECONDS);

        assertTrue(allMessagesReceived,
                "Not all messages were published to the kafka topics. Not received: " + messagesLatch.getCount());
        List<Exchange> exchangeList = mockEndpoint.getExchanges();
        assertEquals(2, exchangeList.size(), "Two Exchanges are expected");
        Exchange e1 = exchangeList.get(0);
        @SuppressWarnings("unchecked")
        List<RecordMetadata> recordMetaData1 = (List<RecordMetadata>) (e1.getIn().getHeader(KafkaConstants.KAFKA_RECORDMETA));
        assertEquals(10, recordMetaData1.size(), "Ten RecordMetadata is expected.");
        for (RecordMetadata recordMeta : recordMetaData1) {
            assertTrue(recordMeta.offset() >= 0, "Offset is positive");
            assertTrue(recordMeta.topic().startsWith("test"), "Topic Name start with 'test'");
        }
        Exchange e2 = exchangeList.get(1);
        @SuppressWarnings("unchecked")
        List<RecordMetadata> recordMetaData2 = (List<RecordMetadata>) (e2.getIn().getHeader(KafkaConstants.KAFKA_RECORDMETA));
        assertEquals(5, recordMetaData2.size(), "Five RecordMetadata is expected.");
        for (RecordMetadata recordMeta : recordMetaData2) {
            assertTrue(recordMeta.offset() >= 0, "Offset is positive");
            assertTrue(recordMeta.topic().startsWith("test"), "Topic Name start with 'test'");
        }
    }
+
    @Test
    public void producedBytesMessageIsReceivedByKafka() throws InterruptedException {
        // byte[] variant: bodies and partition-key header values are raw bytes and a
        // byte-array consumer is used to read them back.
        int messageInTopic = 10;
        int messageInOtherTopic = 5;

        CountDownLatch messagesLatch = new CountDownLatch(messageInTopic + messageInOtherTopic);

        Map<String, Object> inTopicHeaders = new HashMap<>();
        inTopicHeaders.put(KafkaConstants.PARTITION_KEY, "1".getBytes());
        sendMessagesInRoute(messageInTopic, bytesTemplate, "IT test message".getBytes(), inTopicHeaders);

        Map<String, Object> otherTopicHeaders = new HashMap<>();
        otherTopicHeaders.put(KafkaConstants.PARTITION_KEY, "1".getBytes());
        otherTopicHeaders.put(KafkaConstants.TOPIC, TOPIC_BYTES_IN_HEADER);
        sendMessagesInRoute(messageInOtherTopic, bytesTemplate, "IT test message in other topic".getBytes(), otherTopicHeaders);

        // Blocks until all 15 records were polled back from the two byte topics.
        createKafkaBytesMessageConsumer(bytesConsumerConn, TOPIC_BYTES, TOPIC_BYTES_IN_HEADER, messagesLatch);

        boolean allMessagesReceived = messagesLatch.await(200, TimeUnit.MILLISECONDS);

        assertTrue(allMessagesReceived,
                "Not all messages were published to the kafka topics. Not received: " + messagesLatch.getCount());

        // One RecordMetadata per exchange, with a valid offset and a test topic name.
        List<Exchange> exchangeList = mockEndpoint.getExchanges();
        assertEquals(15, exchangeList.size(), "Fifteen Exchanges are expected");
        for (Exchange exchange : exchangeList) {
            @SuppressWarnings("unchecked")
            List<RecordMetadata> recordMetaData1
                    = (List<RecordMetadata>) (exchange.getIn().getHeader(KafkaConstants.KAFKA_RECORDMETA));
            assertEquals(1, recordMetaData1.size(), "One RecordMetadata is expected.");
            assertTrue(recordMetaData1.get(0).offset() >= 0, "Offset is positive");
            assertTrue(recordMetaData1.get(0).topic().startsWith("test"), "Topic Name start with 'test'");
        }
    }
+
    @Test
    public void propagatedHeaderIsReceivedByKafka() throws Exception {
        // Verifies that Camel headers of each supported primitive type are propagated
        // as Kafka record headers, while null values, arbitrary Objects and
        // Camel-internal ("CamelFilteredHeader") headers are filtered out.
        String propagatedStringHeaderKey = "PROPAGATED_STRING_HEADER";
        String propagatedStringHeaderValue = "propagated string header value";

        String propagatedIntegerHeaderKey = "PROPAGATED_INTEGER_HEADER";
        Integer propagatedIntegerHeaderValue = 54545;

        String propagatedLongHeaderKey = "PROPAGATED_LONG_HEADER";
        Long propagatedLongHeaderValue = 5454545454545L;

        String propagatedDoubleHeaderKey = "PROPAGATED_DOUBLE_HEADER";
        Double propagatedDoubleHeaderValue = 43434.545D;

        String propagatedBytesHeaderKey = "PROPAGATED_BYTES_HEADER";
        byte[] propagatedBytesHeaderValue = new byte[] { 121, 34, 34, 54, 5, 3, 54, -34 };

        String propagatedBooleanHeaderKey = "PROPAGATED_BOOLEAN_HEADER";
        Boolean propagatedBooleanHeaderValue = Boolean.TRUE;

        Map<String, Object> camelHeaders = new HashMap<>();
        camelHeaders.put(propagatedStringHeaderKey, propagatedStringHeaderValue);
        camelHeaders.put(propagatedIntegerHeaderKey, propagatedIntegerHeaderValue);
        camelHeaders.put(propagatedLongHeaderKey, propagatedLongHeaderValue);
        camelHeaders.put(propagatedDoubleHeaderKey, propagatedDoubleHeaderValue);
        camelHeaders.put(propagatedBytesHeaderKey, propagatedBytesHeaderValue);
        camelHeaders.put(propagatedBooleanHeaderKey, propagatedBooleanHeaderValue);

        // These three must NOT show up as Kafka headers.
        camelHeaders.put("CustomObjectHeader", new Object());
        camelHeaders.put("CustomNullObjectHeader", null);
        camelHeaders.put("CamelFilteredHeader", "CamelFilteredHeader value");

        CountDownLatch messagesLatch = new CountDownLatch(1);
        propagatedHeadersTemplate.sendBodyAndHeaders("Some test message", camelHeaders);

        // pollForRecords polls on a background thread; await gives it up to 10s.
        List<ConsumerRecord<String, String>> records = pollForRecords(createStringKafkaConsumer("propagatedHeaderConsumer"),
                TOPIC_PROPAGATED_HEADERS, messagesLatch);
        boolean allMessagesReceived = messagesLatch.await(10_000, TimeUnit.MILLISECONDS);

        assertTrue(allMessagesReceived,
                "Not all messages were published to the kafka topics. Not received: " + messagesLatch.getCount());

        ConsumerRecord<String, String> record = records.get(0);
        Headers headers = record.headers();
        assertNotNull(headers, "Kafka Headers should not be null.");
        // we have 6 headers
        assertEquals(6, headers.toArray().length, "6 propagated header is expected.");
        // String and boolean values travel as UTF bytes; numeric values travel as
        // big-endian byte buffers and are decoded with ByteBuffer accordingly.
        assertEquals(propagatedStringHeaderValue, new String(getHeaderValue(propagatedStringHeaderKey, headers)),
                "Propagated string value received");
        assertEquals(propagatedIntegerHeaderValue,
                Integer.valueOf(ByteBuffer.wrap(getHeaderValue(propagatedIntegerHeaderKey, headers)).getInt()),
                "Propagated integer value received");
        assertEquals(propagatedLongHeaderValue,
                Long.valueOf(ByteBuffer.wrap(getHeaderValue(propagatedLongHeaderKey, headers)).getLong()),
                "Propagated long value received");
        assertEquals(propagatedDoubleHeaderValue,
                Double.valueOf(ByteBuffer.wrap(getHeaderValue(propagatedDoubleHeaderKey, headers)).getDouble()),
                "Propagated double value received");
        assertArrayEquals(propagatedBytesHeaderValue, getHeaderValue(propagatedBytesHeaderKey, headers),
                "Propagated byte array value received");
        assertEquals(propagatedBooleanHeaderValue,
                Boolean.valueOf(new String(getHeaderValue(propagatedBooleanHeaderKey, headers))),
                "Propagated boolean value received");
    }
+
    @Test
    public void recordSpecificHeaderIsNotReceivedByKafka() throws Exception {
        // Record-specific Camel headers (here KafkaConstants.TOPIC) steer the
        // producer but must not be propagated onto the Kafka record itself.
        String propagatedStringHeaderKey = KafkaConstants.TOPIC;
        String propagatedStringHeaderValue = "source topic";

        Map<String, Object> camelHeaders = new HashMap<>();
        camelHeaders.put(propagatedStringHeaderKey, propagatedStringHeaderValue);

        CountDownLatch messagesLatch = new CountDownLatch(1);
        noRecordSpecificHeadersTemplate.sendBodyAndHeaders("Some test message", camelHeaders);

        // Background polling; give it up to 10s to see the single record.
        List<ConsumerRecord<String, String>> records = pollForRecords(
                createStringKafkaConsumer("noRecordSpecificHeadersConsumer"), TOPIC_NO_RECORD_SPECIFIC_HEADERS, messagesLatch);
        boolean allMessagesReceived = messagesLatch.await(10_000, TimeUnit.MILLISECONDS);

        assertTrue(allMessagesReceived,
                "Not all messages were published to the kafka topics. Not received: " + messagesLatch.getCount());

        ConsumerRecord<String, String> record = records.get(0);
        Headers headers = record.headers();
        assertNotNull(headers, "Kafka Headers should not be null.");
        // we have 0 headers
        assertEquals(0, headers.toArray().length, "0 propagated headers are expected");
    }
+
+ @Test
+ public void headerFilterStrategyCouldBeOverridden() {
+ KafkaEndpoint kafkaEndpoint
+ =
context.getEndpoint("kafka:TOPIC_PROPAGATED_HEADERS?headerFilterStrategy=#myStrategy",
KafkaEndpoint.class);
+ assertIsInstanceOf(MyHeaderFilterStrategy.class,
kafkaEndpoint.getConfiguration().getHeaderFilterStrategy());
+ }
+
+ @Test
+ public void headerSerializerCouldBeOverridden() {
+ KafkaEndpoint kafkaEndpoint = context
+
.getEndpoint("kafka:TOPIC_PROPAGATED_HEADERS?headerSerializer=#myHeaderSerializer",
KafkaEndpoint.class);
+ assertIsInstanceOf(MyKafkaHeadersSerializer.class,
kafkaEndpoint.getConfiguration().getHeaderSerializer());
+ }
+
    // Spring configuration for this test. Deliberately a non-static inner class:
    // the route builder references outer-instance fields (toStrings, mockEndpoint,
    // etc.), so it needs the enclosing test instance.
    @Configuration
    public class TestConfiguration {
        @Bean
        public RouteBuilder routeBuilder() {
            return new RouteBuilder() {
                @Override
                public void configure() {
                    // One direct: entry point per producer endpoint under test; every
                    // route forwards the produced exchange to the shared mock endpoint
                    // so tests can inspect KAFKA_RECORDMETA.
                    from("direct:startStrings").to(toStrings).to(mockEndpoint);

                    from("direct:startStrings2").to(toStrings2).to(mockEndpoint);

                    from("direct:startBytes").to(toBytes).to(mockEndpoint);

                    from("direct:startTraced").to(toStringsWithInterceptor).to(mockEndpoint);

                    from("direct:propagatedHeaders").to(toPropagatedHeaders).to(mockEndpoint);

                    from("direct:noRecordSpecificHeaders").to(toNoRecordSpecificHeaders).to(mockEndpoint);
                }
            };
        }

        // Bean referenced from endpoint URIs as #myStrategy.
        @Bean("myStrategy")
        public MyHeaderFilterStrategy createMyHeaderFilterStrategy() {
            return new MyHeaderFilterStrategy();
        }

        // Bean referenced from endpoint URIs as #myHeaderSerializer.
        @Bean("myHeaderSerializer")
        public MyKafkaHeadersSerializer createMyKafkaHeadersSerializer() {
            return new MyKafkaHeadersSerializer();
        }
    }
+
+ private byte[] getHeaderValue(String headerKey, Headers headers) {
+ Header foundHeader = StreamSupport.stream(headers.spliterator(),
false).filter(header -> header.key().equals(headerKey))
+ .findFirst().orElse(null);
+ assertNotNull(foundHeader, "Header should be sent");
+ return foundHeader.value();
+ }
+
+ private static KafkaConsumer<String, String>
createStringKafkaConsumer(final String groupId) {
+ Properties stringsProps = new Properties();
+
+
stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
getBootstrapServers());
+
stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG,
groupId);
+
stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
"true");
+
stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
"1000");
+
stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,
"30000");
+
stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+ "org.apache.kafka.common.serialization.StringDeserializer");
+
stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+ "org.apache.kafka.common.serialization.StringDeserializer");
+
stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest");
+
+ return new KafkaConsumer<>(stringsProps);
+ }
+
+ private static KafkaConsumer<byte[], byte[]> createByteKafkaConsumer(final
String groupId) {
+ Properties stringsProps = new Properties();
+
+
stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
getBootstrapServers());
+
stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG,
groupId);
+
stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
"true");
+
stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
"1000");
+
stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,
"30000");
+
stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+ "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+
stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+ "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+
stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest");
+
+ return new KafkaConsumer<>(stringsProps);
+ }
+
+ private List<ConsumerRecord<String, String>> pollForRecords(
+ KafkaConsumer<String, String> consumerConn, String topic,
CountDownLatch messagesLatch) {
+
+ List<ConsumerRecord<String, String>> consumedRecords = new
ArrayList<>();
+ consumerConn.subscribe(Collections.singletonList(topic));
+
+ new Thread(() -> {
+ while (messagesLatch.getCount() != 0) {
+ for (ConsumerRecord<String, String> record :
consumerConn.poll(Duration.ofMillis(100))) {
+ consumedRecords.add(record);
+ messagesLatch.countDown();
+ }
+ }
+ }).start();
+
+ return consumedRecords;
+ }
+
+ private void createKafkaMessageConsumer(
+ KafkaConsumer<String, String> consumerConn, String topic, String
topicInHeader, CountDownLatch messagesLatch) {
+
+ consumerConn.subscribe(Arrays.asList(topic, topicInHeader));
+ boolean run = true;
+
+ while (run) {
+ ConsumerRecords<String, String> records =
consumerConn.poll(Duration.ofMillis(100));
+ for (int i = 0; i < records.count(); i++) {
+ messagesLatch.countDown();
+ if (messagesLatch.getCount() == 0) {
+ run = false;
+ }
+ }
+ }
+
+ }
+
+ private void createKafkaBytesMessageConsumer(
+ KafkaConsumer<byte[], byte[]> consumerConn, String topic, String
topicInHeader, CountDownLatch messagesLatch) {
+
+ consumerConn.subscribe(Arrays.asList(topic, topicInHeader));
+ boolean run = true;
+
+ while (run) {
+ ConsumerRecords<byte[], byte[]> records =
consumerConn.poll(Duration.ofMillis(100));
+ for (int i = 0; i < records.count(); i++) {
+ messagesLatch.countDown();
+ if (messagesLatch.getCount() == 0) {
+ run = false;
+ }
+ }
+ }
+
+ }
+
+ private void sendMessagesInRoute(int messages, ProducerTemplate template,
Object bodyOther, String... headersWithValue) {
+ Map<String, Object> headerMap = new HashMap<>();
+ if (headersWithValue != null) {
+ for (int i = 0; i < headersWithValue.length; i = i + 2) {
+ headerMap.put(headersWithValue[i], headersWithValue[i + 1]);
+ }
+ }
+ sendMessagesInRoute(messages, template, bodyOther, headerMap);
+ }
+
+ private void sendMessagesInRoute(int messages, ProducerTemplate template,
Object bodyOther, Map<String, Object> headerMap) {
+ for (int k = 0; k < messages; k++) {
+ template.sendBodyAndHeaders(bodyOther, headerMap);
+ }
+ }
+
    // Marker subclass used only to verify that headerFilterStrategy=#myStrategy
    // resolves to this custom type on the endpoint.
    private static class MyHeaderFilterStrategy extends DefaultHeaderFilterStrategy {
    }
+
    // Marker subclass used only to verify that headerSerializer=#myHeaderSerializer
    // resolves to this custom type on the endpoint.
    private static class MyKafkaHeadersSerializer extends DefaultKafkaHeaderSerializer {
    }
+
+}
diff --git
a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaToDIT.java
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaToDIT.java
new file mode 100644
index 0000000..6706a81
--- /dev/null
+++
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaToDIT.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
/**
 * Verifies that a dynamic {@code toD("kafka://...")} route reuses a single Kafka
 * endpoint for different destination headers instead of creating one endpoint per
 * resolved URI.
 */
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
@CamelSpringBootTest
@SpringBootTest(
        classes = {
                CamelAutoConfiguration.class,
                BaseEmbeddedKafkaTestSupport.DefaulKafkaComponent.class,
                KafkaToDIT.class,
                KafkaToDIT.TestConfiguration.class,
        }
)
public class KafkaToDIT extends BaseEmbeddedKafkaTestSupport {

    @Autowired
    ProducerTemplate template;

    @Test
    public void testToD() {
        // Two sends resolving to two different dynamic topics (bar, beer).
        template.sendBodyAndHeader("direct:start", "Hello bar", "where", "bar");
        template.sendBodyAndHeader("direct:start", "Hello beer", "where", "beer");

        // there should only be one kafka endpoint
        long count = context.getEndpoints().stream().filter(e -> e.getEndpointUri().startsWith("kafka:")).count();
        assertEquals(1, count, "There should only be 1 kafka endpoint");
    }

    @Configuration
    public class TestConfiguration {
        @Bean
        public RouteBuilder routeBuilder() {
            return new RouteBuilder() {
                @Override
                public void configure() {
                    // route message dynamic using toD
                    from("direct:start").toD("kafka://${header.where}");
                }
            };
        }
    }

}
diff --git
a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/MockConsumerInterceptor.java
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/MockConsumerInterceptor.java
new file mode 100644
index 0000000..1914f2e
--- /dev/null
+++
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/MockConsumerInterceptor.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.util.ArrayList;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerInterceptor;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
 * Test interceptor that records every {@code ConsumerRecords} batch passed through
 * {@link #onConsume} into the static {@link #recordsCaptured} list so tests can
 * assert on what was consumed.
 */
public class MockConsumerInterceptor implements ConsumerInterceptor<String, String> {
    // Shared across all interceptor instances; tests read it after consumption.
    // NOTE(review): a plain ArrayList mutated from the consumer thread and read
    // from the test thread — assumes single-threaded access per test; confirm.
    public static ArrayList<ConsumerRecords<String, String>> recordsCaptured = new ArrayList<>();

    private static final Logger LOG = LoggerFactory.getLogger(MockConsumerInterceptor.class);

    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> consumerRecords) {
        consumerRecords.forEach(r -> LOG.trace("Captured on mock: {}", r.value()));
        recordsCaptured.add(consumerRecords);
        return consumerRecords;
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> map) {
        // noop
    }

    @Override
    public void close() {
        // noop
    }

    @Override
    public void configure(Map<String, ?> map) {
        // noop
    }
}
diff --git
a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/MockProducerInterceptor.java
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/MockProducerInterceptor.java
new file mode 100644
index 0000000..329f324
--- /dev/null
+++
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/MockProducerInterceptor.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.util.ArrayList;
+import java.util.Map;
+import org.apache.kafka.clients.producer.ProducerInterceptor;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+
/**
 * Test interceptor that records every outgoing {@code ProducerRecord} passed
 * through {@link #onSend} into the static {@link #recordsCaptured} list so tests
 * can assert on what was produced.
 */
public class MockProducerInterceptor implements ProducerInterceptor<String, String> {

    // Shared across all interceptor instances; tests read it after producing.
    // NOTE(review): plain ArrayList mutated from the producer's sender path and
    // read from the test thread — assumes single-threaded access per test; confirm.
    public static ArrayList<ProducerRecord<String, String>> recordsCaptured = new ArrayList<>();

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
        recordsCaptured.add(producerRecord);
        return producerRecord;
    }

    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
        // noop
    }

    @Override
    public void close() {
        // noop
    }

    @Override
    public void configure(Map<String, ?> map) {
        // noop
    }
}
diff --git
a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/OffsetStateRepositoryConfiguration.java
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/OffsetStateRepositoryConfiguration.java
new file mode 100644
index 0000000..1025c7c
--- /dev/null
+++
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/OffsetStateRepositoryConfiguration.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.util.concurrent.CountDownLatch;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
/**
 * Registers the offset state repository used by KafkaConsumerRebalanceIT,
 * wired to a shared one-shot latch the test awaits.
 */
@Configuration
public class OffsetStateRepositoryConfiguration {

    // Referenced from endpoint URIs as #offset.
    @Bean("offset")
    public KafkaConsumerRebalanceIT.OffsetStateRepository createOffsetStateRepository(CountDownLatch messagesLatch) {
        return new KafkaConsumerRebalanceIT.OffsetStateRepository(messagesLatch);
    }

    // Latch shared between the repository and the test (count of 1).
    @Bean
    public CountDownLatch createCountDownLatch() {
        return new CountDownLatch(1);
    }
}
diff --git
a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/ResumeStrategyConfiguration.java
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/ResumeStrategyConfiguration.java
new file mode 100644
index 0000000..e9ea18c
--- /dev/null
+++
b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/ResumeStrategyConfiguration.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.util.concurrent.CountDownLatch;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
/**
 * Registers the resume strategy used by KafkaConsumerWithResumeRouteStrategyIT,
 * wired to a shared one-shot latch the test awaits.
 */
@Configuration
public class ResumeStrategyConfiguration {

    // Referenced from endpoint URIs as #resumeStrategy.
    @Bean("resumeStrategy")
    public KafkaConsumerWithResumeRouteStrategyIT.TestKafkaConsumerResumeStrategy createTestKafkaConsumerResumeStrategy(CountDownLatch messagesLatch) {
        return new KafkaConsumerWithResumeRouteStrategyIT.TestKafkaConsumerResumeStrategy(messagesLatch);
    }

    // Latch shared between the strategy and the test (count of 1).
    @Bean
    public CountDownLatch createCountDownLatch() {
        return new CountDownLatch(1);
    }
}
diff --git
a/components-starter/camel-kafka-starter/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
b/components-starter/camel-kafka-starter/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
new file mode 100644
index 0000000..99befb6
--- /dev/null
+++
b/components-starter/camel-kafka-starter/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import java.time.Duration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.hamcrest.core.IsNot;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.springframework.boot.test.context.SpringBootTest;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsNull.nullValue;
+import static org.mockito.Mockito.when;
+
+
/**
 * Sanity checks that a (mocked) KafkaConsumer.poll never yields null results:
 * an empty poll returns ConsumerRecords.empty() with non-null partitions.
 */
@ExtendWith(MockitoExtension.class)
@CamelSpringBootTest
@SpringBootTest(
        classes = {
                KafkaConsumerTest.class,
        }
)
public class KafkaConsumerTest {
    // NOTE(review): these tests exercise only the Mockito stub below; the
    // Spring/Camel annotations appear unnecessary for this class — confirm
    // whether bootstrapping a context here is intentional.

    @Mock
    private KafkaConsumer<Object, Object> kafkaConsumer;

    @BeforeEach
    public void init() {
        // Stub poll(1s) so both tests observe an empty-but-non-null result.
        when(kafkaConsumer.poll(Duration.ofSeconds(1))).thenReturn(ConsumerRecords.empty());
    }

    @Test
    public void testPollGivenReturnsEmptyConsumerRecordShouldNotBeNull() {
        ConsumerRecords<Object, Object> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
        assertThat(consumerRecords, IsNot.not(nullValue()));
    }

    @Test
    public void testPollGivenReturnsEmptyPartitionsShouldNotBeNull() {
        ConsumerRecords<Object, Object> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
        assertThat(consumerRecords.partitions(), IsNot.not(nullValue()));
    }
}