This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch camel-3.x in repository https://gitbox.apache.org/repos/asf/camel.git
commit cfc5a88c10cf19fa4baa74f0f793751cf73bb894 Author: Andrea Cosentino <[email protected]> AuthorDate: Wed Apr 5 10:53:21 2023 +0200 CAMEL-19180 - Kafka Idempotent Repository does not give the user control over a randomized group id if the kafka broker requires the id to be in a specified form Signed-off-by: Andrea Cosentino <[email protected]> --- .../kafka/KafkaIdempotentRepository.java | 47 ++++++++++++- .../KafkaConsumerIdempotentGroupIdIT.java | 80 ++++++++++++++++++++++ 2 files changed, 125 insertions(+), 2 deletions(-) diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java index 8ae44575332..89debcfaeb9 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java @@ -82,6 +82,8 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot // configurable private String topic; private String bootstrapServers; + + private String groupId = null; private Properties producerConfig; private Properties consumerConfig; private int maxCacheSize = DEFAULT_MAXIMUM_CACHE_SIZE; @@ -113,6 +115,10 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot this(topic, bootstrapServers, DEFAULT_MAXIMUM_CACHE_SIZE, DEFAULT_POLL_DURATION_MS); } + public KafkaIdempotentRepository(String topic, String bootstrapServers, String groupId) { + this(topic, bootstrapServers, DEFAULT_MAXIMUM_CACHE_SIZE, DEFAULT_POLL_DURATION_MS, groupId); + } + public KafkaIdempotentRepository(String topic, String bootstrapServers, int maxCacheSize, int pollDurationMs) { this.topic = topic; this.bootstrapServers = bootstrapServers; @@ -124,6 +130,10 @@ public class KafkaIdempotentRepository 
extends ServiceSupport implements Idempot this(topic, consumerConfig, producerConfig, DEFAULT_MAXIMUM_CACHE_SIZE, DEFAULT_POLL_DURATION_MS); } + public KafkaIdempotentRepository(String topic, Properties consumerConfig, Properties producerConfig, String groupId) { + this(topic, consumerConfig, producerConfig, DEFAULT_MAXIMUM_CACHE_SIZE, DEFAULT_POLL_DURATION_MS, groupId); + } + public KafkaIdempotentRepository(String topic, Properties consumerConfig, Properties producerConfig, int maxCacheSize, int pollDurationMs) { this.topic = topic; @@ -133,6 +143,24 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot this.pollDurationMs = pollDurationMs; } + public KafkaIdempotentRepository(String topic, String bootstrapServers, int maxCacheSize, int pollDurationMs, String groupId) { + this.topic = topic; + this.bootstrapServers = bootstrapServers; + this.maxCacheSize = maxCacheSize; + this.pollDurationMs = pollDurationMs; + this.groupId = groupId; + } + + public KafkaIdempotentRepository(String topic, Properties consumerConfig, Properties producerConfig, int maxCacheSize, + int pollDurationMs, String groupId) { + this.topic = topic; + this.consumerConfig = consumerConfig; + this.producerConfig = producerConfig; + this.maxCacheSize = maxCacheSize; + this.pollDurationMs = pollDurationMs; + this.groupId = groupId; + } + public String getTopic() { return topic; } @@ -250,6 +278,19 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot this.pollDurationMs = pollDurationMs; } + public String getGroupId() { + return groupId; + } + + /** + * Sets the group id of the Kafka consumer. + * + * @param groupId The group id to use for the Kafka consumer; a random one is generated if empty. 
+ */ + public void setGroupId(String groupId) { + this.groupId = groupId; + } + @Override public void setCamelContext(CamelContext camelContext) { this.camelContext = camelContext; @@ -284,8 +325,10 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot ObjectHelper.notNull(producerConfig, "producerConfig"); // each consumer instance must have control over its own offset, so - // assign a groupID at random - String groupId = UUID.randomUUID().toString(); + // assign a groupID at random if not specified + if (ObjectHelper.isEmpty(groupId)) { + groupId = UUID.randomUUID().toString(); + } log.debug("Creating consumer with {}[{}]", ConsumerConfig.GROUP_ID_CONFIG, groupId); consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentGroupIdIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentGroupIdIT.java new file mode 100644 index 00000000000..09937f7542b --- /dev/null +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentGroupIdIT.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kafka.integration; + +import org.apache.camel.BindToRegistry; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.kafka.integration.common.KafkaTestUtil; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.DisabledIfSystemProperty; + +import java.util.Arrays; + +import static org.apache.camel.component.kafka.serde.KafkaSerdeHelper.numericHeader; + +@DisabledIfSystemProperty(named = "enable.kafka.consumer.idempotency.tests", matches = "false") +public class KafkaConsumerIdempotentGroupIdIT extends KafkaConsumerIdempotentTestSupport { + + public static final String TOPIC = "idempt"; + + private int size = 200; + + @BindToRegistry("kafkaIdempotentRepository") + private KafkaIdempotentRepository testIdempotent = new KafkaIdempotentRepository("TEST_IDEMPOTENT", getBootstrapServers(), "test_1"); + + @BeforeEach + public void before() { + kafkaAdminClient.deleteTopics(Arrays.asList(TOPIC, "TEST_IDEMPOTENT")).all(); + doSend(size, TOPIC); + } + + @AfterEach + public void after() { + kafkaAdminClient.deleteTopics(Arrays.asList(TOPIC, "TEST_IDEMPOTENT")).all(); + } + + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + + @Override + public void configure() { + from("kafka:" + TOPIC + + "?groupId=KafkaConsumerIdempotentIT&autoOffsetReset=earliest" + + "&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer" + + "&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer" + + "&autoCommitIntervalMs=1000&pollTimeoutMs=1000&autoCommitEnable=true" + + 
"&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor").routeId("foo") + .idempotentConsumer(numericHeader("id")) + .idempotentRepository("kafkaIdempotentRepository") + .to(KafkaTestUtil.MOCK_RESULT); + } + }; + } + + @Test + @DisplayName("Numeric headers is consumable when using idempotent (CAMEL-16914)") + void kafkaIdempotentMessageIsConsumedByCamel() { + MockEndpoint to = contextExtension.getMockEndpoint(KafkaTestUtil.MOCK_RESULT); + + doRun(to, size); + } +}
