This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch 22811 in repository https://gitbox.apache.org/repos/asf/camel.git
commit b331eced8ddd64d172002bec2340af04b5475dff Author: Andrea Cosentino <[email protected]> AuthorDate: Fri Feb 27 09:14:52 2026 +0100 CAMEL-22811 - camel-kafka: Support Kafka 4.0 KIP-848 consumer rebalance protocol (group.protocol=consumer) Signed-off-by: Andrea Cosentino <[email protected]> --- .../org/apache/camel/catalog/components/kafka.json | 4 +- .../org/apache/camel/component/kafka/kafka.json | 4 +- .../camel/component/kafka/KafkaConfiguration.java | 15 ++- .../integration/KafkaConsumerGroupProtocolIT.java | 149 +++++++++++++++++++++ .../ROOT/pages/camel-4x-upgrade-guide-4_19.adoc | 13 ++ .../dsl/KafkaComponentBuilderFactory.java | 5 +- .../endpoint/dsl/KafkaEndpointBuilderFactory.java | 5 +- .../test/infra/kafka/common/KafkaProperties.java | 3 +- .../services/ContainerLocalKafkaInfraService.java | 2 +- 9 files changed, 188 insertions(+), 12 deletions(-) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json index 51a4fd91ec8c..457e0e224993 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json @@ -50,7 +50,7 @@ "fetchWaitMaxMs": { "index": 23, "kind": "property", "displayName": "Fetch Wait Max Ms", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 500, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The maximum amount of time the server will block before answering the fetch [...] 
"groupId": { "index": 24, "kind": "property", "displayName": "Group Id", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "A string that uniquely identifies the group of consumer processes to which this consumer belongs. By setting the s [...] "groupInstanceId": { "index": 25, "kind": "property", "displayName": "Group Instance Id", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "A unique identifier of the consumer instance provided by the end user. Only non-empty strings are [...] - "groupProtocol": { "index": 26, "kind": "property", "displayName": "Group Protocol", "group": "consumer", "label": "consumer", "required": false, "type": "enum", "javaType": "java.lang.String", "enum": [ "classic", "consumer" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "classic", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The consumer group protocol to use. The cl [...] + "groupProtocol": { "index": 26, "kind": "property", "displayName": "Group Protocol", "group": "consumer", "label": "consumer", "required": false, "type": "enum", "javaType": "java.lang.String", "enum": [ "classic", "consumer" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "classic", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The consumer group protocol to use. The cl [...] 
"groupRemoteAssignor": { "index": 27, "kind": "property", "displayName": "Group Remote Assignor", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The name of the server-side assignor to use when group.protocol is set to consumer. If no [...] "headerDeserializer": { "index": 28, "kind": "property", "displayName": "Header Deserializer", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "To use a custom KafkaHeaderDeserializer to des [...] "heartbeatIntervalMs": { "index": 29, "kind": "property", "displayName": "Heartbeat Interval Ms", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 3000, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The expected time between heartbeats to the consumer coordinator [...] @@ -195,7 +195,7 @@ "fetchWaitMaxMs": { "index": 22, "kind": "parameter", "displayName": "Fetch Wait Max Ms", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 500, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The maximum amount of time the server will block before answering the fetc [...] 
"groupId": { "index": 23, "kind": "parameter", "displayName": "Group Id", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "A string that uniquely identifies the group of consumer processes to which this consumer belongs. By setting the [...] "groupInstanceId": { "index": 24, "kind": "parameter", "displayName": "Group Instance Id", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "A unique identifier of the consumer instance provided by the end user. Only non-empty strings ar [...] - "groupProtocol": { "index": 25, "kind": "parameter", "displayName": "Group Protocol", "group": "consumer", "label": "consumer", "required": false, "type": "enum", "javaType": "java.lang.String", "enum": [ "classic", "consumer" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "classic", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The consumer group protocol to use. The c [...] + "groupProtocol": { "index": 25, "kind": "parameter", "displayName": "Group Protocol", "group": "consumer", "label": "consumer", "required": false, "type": "enum", "javaType": "java.lang.String", "enum": [ "classic", "consumer" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "classic", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The consumer group protocol to use. The c [...] 
"groupRemoteAssignor": { "index": 26, "kind": "parameter", "displayName": "Group Remote Assignor", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The name of the server-side assignor to use when group.protocol is set to consumer. If n [...] "headerDeserializer": { "index": 27, "kind": "parameter", "displayName": "Header Deserializer", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "To use a custom KafkaHeaderDeserializer to de [...] "heartbeatIntervalMs": { "index": 28, "kind": "parameter", "displayName": "Heartbeat Interval Ms", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 3000, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The expected time between heartbeats to the consumer coordinator [...] 
diff --git a/components/camel-kafka/src/generated/resources/META-INF/org/apache/camel/component/kafka/kafka.json b/components/camel-kafka/src/generated/resources/META-INF/org/apache/camel/component/kafka/kafka.json index 51a4fd91ec8c..457e0e224993 100644 --- a/components/camel-kafka/src/generated/resources/META-INF/org/apache/camel/component/kafka/kafka.json +++ b/components/camel-kafka/src/generated/resources/META-INF/org/apache/camel/component/kafka/kafka.json @@ -50,7 +50,7 @@ "fetchWaitMaxMs": { "index": 23, "kind": "property", "displayName": "Fetch Wait Max Ms", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 500, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The maximum amount of time the server will block before answering the fetch [...] "groupId": { "index": 24, "kind": "property", "displayName": "Group Id", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "A string that uniquely identifies the group of consumer processes to which this consumer belongs. By setting the s [...] "groupInstanceId": { "index": 25, "kind": "property", "displayName": "Group Instance Id", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "A unique identifier of the consumer instance provided by the end user. Only non-empty strings are [...] 
- "groupProtocol": { "index": 26, "kind": "property", "displayName": "Group Protocol", "group": "consumer", "label": "consumer", "required": false, "type": "enum", "javaType": "java.lang.String", "enum": [ "classic", "consumer" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "classic", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The consumer group protocol to use. The cl [...] + "groupProtocol": { "index": 26, "kind": "property", "displayName": "Group Protocol", "group": "consumer", "label": "consumer", "required": false, "type": "enum", "javaType": "java.lang.String", "enum": [ "classic", "consumer" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "classic", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The consumer group protocol to use. The cl [...] "groupRemoteAssignor": { "index": 27, "kind": "property", "displayName": "Group Remote Assignor", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The name of the server-side assignor to use when group.protocol is set to consumer. If no [...] "headerDeserializer": { "index": 28, "kind": "property", "displayName": "Header Deserializer", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "To use a custom KafkaHeaderDeserializer to des [...] 
"heartbeatIntervalMs": { "index": 29, "kind": "property", "displayName": "Heartbeat Interval Ms", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 3000, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The expected time between heartbeats to the consumer coordinator [...] @@ -195,7 +195,7 @@ "fetchWaitMaxMs": { "index": 22, "kind": "parameter", "displayName": "Fetch Wait Max Ms", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 500, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The maximum amount of time the server will block before answering the fetc [...] "groupId": { "index": 23, "kind": "parameter", "displayName": "Group Id", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "A string that uniquely identifies the group of consumer processes to which this consumer belongs. By setting the [...] "groupInstanceId": { "index": 24, "kind": "parameter", "displayName": "Group Instance Id", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "A unique identifier of the consumer instance provided by the end user. Only non-empty strings ar [...] 
- "groupProtocol": { "index": 25, "kind": "parameter", "displayName": "Group Protocol", "group": "consumer", "label": "consumer", "required": false, "type": "enum", "javaType": "java.lang.String", "enum": [ "classic", "consumer" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "classic", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The consumer group protocol to use. The c [...] + "groupProtocol": { "index": 25, "kind": "parameter", "displayName": "Group Protocol", "group": "consumer", "label": "consumer", "required": false, "type": "enum", "javaType": "java.lang.String", "enum": [ "classic", "consumer" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "classic", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The consumer group protocol to use. The c [...] "groupRemoteAssignor": { "index": 26, "kind": "parameter", "displayName": "Group Remote Assignor", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The name of the server-side assignor to use when group.protocol is set to consumer. If n [...] "headerDeserializer": { "index": 27, "kind": "parameter", "displayName": "Header Deserializer", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "To use a custom KafkaHeaderDeserializer to de [...] 
"heartbeatIntervalMs": { "index": 28, "kind": "parameter", "displayName": "Heartbeat Interval Ms", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 3000, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The expected time between heartbeats to the consumer coordinator [...] diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java index dc275a1703b8..f7d9c196aee0 100755 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java @@ -566,16 +566,22 @@ public class KafkaConfiguration implements Cloneable, HeaderFilterStrategyAware addPropertyIfNotEmpty(props, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, getValueDeserializer()); addPropertyIfNotEmpty(props, ConsumerConfig.FETCH_MIN_BYTES_CONFIG, getFetchMinBytes()); addPropertyIfNotEmpty(props, ConsumerConfig.FETCH_MAX_BYTES_CONFIG, getFetchMaxBytes()); - addPropertyIfNotEmpty(props, ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, getHeartbeatIntervalMs()); + // classic-only properties: skip when using the new consumer protocol (KIP-848) + boolean classicProtocol = !"consumer".equals(getGroupProtocol()); + if (classicProtocol) { + addPropertyIfNotEmpty(props, ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, getHeartbeatIntervalMs()); + addPropertyIfNotEmpty(props, ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, getSessionTimeoutMs()); + } addPropertyIfNotEmpty(props, ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, getMaxPartitionFetchBytes()); - addPropertyIfNotEmpty(props, ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 
getSessionTimeoutMs()); addPropertyIfNotEmpty(props, ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, getMaxPollIntervalMs()); addPropertyIfNotEmpty(props, ConsumerConfig.MAX_POLL_RECORDS_CONFIG, getMaxPollRecords()); addPropertyIfNotEmpty(props, ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, getInterceptorClasses()); addPropertyIfNotEmpty(props, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, getAutoOffsetReset()); addPropertyIfNotEmpty(props, ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, getConnectionMaxIdleMs()); addPropertyIfNotEmpty(props, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, getAutoCommitEnable()); - addPropertyIfNotEmpty(props, ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, getPartitionAssignor()); + if (classicProtocol) { + addPropertyIfNotEmpty(props, ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, getPartitionAssignor()); + } addPropertyIfNotEmpty(props, ConsumerConfig.GROUP_PROTOCOL_CONFIG, getGroupProtocol()); addPropertyIfNotEmpty(props, ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, getGroupRemoteAssignor()); addPropertyIfNotEmpty(props, ConsumerConfig.RECEIVE_BUFFER_CONFIG, getReceiveBufferBytes()); @@ -1916,6 +1922,9 @@ public class KafkaConfiguration implements Cloneable, HeaderFilterStrategyAware * The consumer group protocol to use. The "classic" protocol uses the traditional partition assignment and * rebalancing mechanism. The "consumer" protocol enables the new KIP-848 consumer rebalance protocol which provides * faster and more efficient rebalancing. + * + * When set to "consumer", classic-only properties (heartbeatIntervalMs, sessionTimeoutMs, partitionAssignor) are + * automatically excluded from the consumer configuration. 
*/ public void setGroupProtocol(String groupProtocol) { this.groupProtocol = groupProtocol;
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerGroupProtocolIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerGroupProtocolIT.java
new file mode 100644
index 000000000000..7838be0c2678
--- /dev/null
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerGroupProtocolIT.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.StreamSupport;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.MockConsumerInterceptor;
+import org.apache.camel.component.kafka.integration.common.KafkaAdminUtil;
+import org.apache.camel.component.kafka.integration.common.KafkaTestUtil;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.infra.core.annotations.RouteFixture;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.FeatureMetadata;
+import org.apache.kafka.clients.admin.FinalizedVersionRange;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assumptions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Integration test for the KIP-848 consumer rebalance protocol (group.protocol=consumer). Verifies that Camel's Kafka
+ * consumer works correctly with the new consumer protocol, which does not use classic-only properties like
+ * heartbeat.interval.ms, session.timeout.ms, and partition.assignment.strategy.
+ *
+ * This test requires a Kafka 4.0+ broker with the new group coordinator enabled (group.version >= 1).
+ */
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class KafkaConsumerGroupProtocolIT extends BaseKafkaTestSupport {
+    public static final String TOPIC = "test-group-protocol-" + Uuid.randomUuid();
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerGroupProtocolIT.class);
+
+    private static final String FROM_URI = "kafka:" + TOPIC
+                                           + "?groupId=KafkaConsumerGroupProtocolIT&autoOffsetReset=earliest"
+                                           + "&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+                                           + "&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+                                           + "&autoCommitIntervalMs=1000&pollTimeoutMs=1000&autoCommitEnable=true"
+                                           + "&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor"
+                                           + "&groupProtocol=consumer";
+
+    private org.apache.kafka.clients.producer.KafkaProducer<String, String> producer;
+
+    /**
+     * Aborts the tests of this class unless the broker reports a finalized group.version level of at least 1. A
+     * failure to query the broker features aborts them as well instead of failing the run.
+     */
+    @BeforeAll
+    static void checkConsumerProtocolSupport() {
+        FinalizedVersionRange groupVersion = null;
+        try (AdminClient adminClient = KafkaAdminUtil.createAdminClient(service)) {
+            FeatureMetadata metadata = adminClient.describeFeatures().featureMetadata().get(10, TimeUnit.SECONDS);
+            Map<String, FinalizedVersionRange> finalizedFeatures = metadata.finalizedFeatures();
+            groupVersion = finalizedFeatures.get("group.version");
+        } catch (Exception e) {
+            if (e instanceof InterruptedException) {
+                // keep the interrupt visible to the caller before aborting
+                Thread.currentThread().interrupt();
+            }
+            Assumptions.assumeTrue(false,
+                    "Could not determine broker feature support: " + e.getMessage());
+        }
+        // evaluated outside the try block so this abort is not caught above and re-reported as a lookup failure
+        Assumptions.assumeTrue(
+                groupVersion != null && groupVersion.maxVersionLevel() >= 1,
+                "Broker does not support the consumer group protocol (KIP-848), requires Kafka 4.0+ with group.version >= 1");
+    }
+
+    /**
+     * Creates a new producer from {@code getDefaultProperties()} and clears the records previously captured by
+     * {@link MockConsumerInterceptor}.
+     */
+    @BeforeEach
+    public void before() {
+        Properties props = getDefaultProperties();
+        producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
+        MockConsumerInterceptor.recordsCaptured.clear();
+    }
+
+    /** Closes the producer created for the test, if any. */
+    @AfterEach
+    public void after() {
+        if (producer != null) {
+            producer.close();
+        }
+    }
+
+    /** Registers the route under test on the given context. */
+    @RouteFixture
+    @Override
+    public void createRouteBuilder(CamelContext context) throws Exception {
+        context.addRoutes(createRouteBuilder());
+    }
+
+    /** Builds the route that consumes from {@link #FROM_URI} and forwards every exchange to the mock result endpoint. */
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from(FROM_URI)
+                        .process(exchange -> LOG.trace("Captured on the processor: {}", exchange.getMessage().getBody()))
+                        .routeId("group-protocol-it").to(KafkaTestUtil.MOCK_RESULT);
+            }
+        };
+    }
+
+    /**
+     * Sends five records carrying a "Camel"-prefixed header and a custom header, then checks that all bodies reach
+     * the mock endpoint, that the interceptor captured five records, and that only the custom header is propagated.
+     */
+    @Test
+    public void kafkaMessageIsConsumedWithConsumerProtocol() throws InterruptedException {
+        String propagatedHeaderKey = "PropagatedCustomHeader";
+        byte[] propagatedHeaderValue = "propagated header value".getBytes(StandardCharsets.UTF_8);
+        String skippedHeaderKey = "CamelSkippedHeader";
+
+        MockEndpoint to = contextExtension.getMockEndpoint(KafkaTestUtil.MOCK_RESULT);
+
+        to.expectedMessageCount(5);
+        to.expectedBodiesReceivedInAnyOrder("message-0", "message-1", "message-2", "message-3", "message-4");
+        to.expectedHeaderReceived(propagatedHeaderKey, propagatedHeaderValue);
+
+        for (int k = 0; k < 5; k++) {
+            String msg = "message-" + k;
+            ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, "1", msg);
+            data.headers().add(new RecordHeader(skippedHeaderKey, "skipped header value".getBytes(StandardCharsets.UTF_8)));
+            data.headers().add(new RecordHeader(propagatedHeaderKey, propagatedHeaderValue));
+            producer.send(data);
+        }
+
+        to.assertIsSatisfied(3000);
+
+        assertEquals(5, MockConsumerInterceptor.recordsCaptured.stream()
+                .flatMap(i -> StreamSupport.stream(i.records(TOPIC).spliterator(), false)).count());
+
+        Map<String, Object> headers = to.getExchanges().get(0).getIn().getHeaders();
+        assertFalse(headers.containsKey(skippedHeaderKey), "Should not receive skipped header");
+        assertTrue(headers.containsKey(propagatedHeaderKey), "Should receive propagated header");
+    }
+}
diff --git a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_19.adoc b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_19.adoc index
3d9244a9d699..1587daec1855 100644 --- a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_19.adoc +++ b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_19.adoc @@ -123,6 +123,19 @@ Google recommends migrating your Pub/Sub Lite workloads to either: * **Google Cloud Pub/Sub** → use the `camel-google-pubsub` component * **Google Cloud Managed Service for Apache Kafka** → use the `camel-kafka` component +=== camel-kafka + +The Kafka client library has been upgraded from 3.9.x to 4.2.0. This is a major upgrade with the following notable changes: + +* The `lingerMs` option default value has changed from `0` to `5` to align with the Kafka 4.x client default for `linger.ms`. +* Two new consumer options have been added: `groupProtocol` (default `classic`; allowed values: `classic` and `consumer`) and `groupRemoteAssignor`. These support the KIP-848 consumer rebalance protocol introduced in Kafka 4.0. When `groupProtocol` is set to `consumer`, the classic-only properties (`heartbeatIntervalMs`, `sessionTimeoutMs`, `partitionAssignor`) are automatically excluded from the consumer configuration. +* The `DefaultPartitioner` and `UniformStickyPartitioner` classes have been removed from the Kafka client and are no longer available. If you were explicitly setting `partitioner` to one of these classes, remove that setting, as Kafka 4.x uses the default partitioner automatically. + +The test infrastructure for Kafka has been updated: + +* The `local-kafka3-container` mapping has been renamed to `local-kafka-container`. If you were using `-Dkafka.instance.type=local-kafka3-container`, update to `-Dkafka.instance.type=local-kafka-container`. +* The Strimzi test service now runs in KRaft mode (no ZooKeeper). The `ZookeeperContainer` class has been removed. 
+ === camel-mail When configured a custom `IdempotentRepository` on `camel-mail` endpoint, then Camel will now auto-start diff --git a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java index 78eba1c22b1c..71f9c7d38a36 100644 --- a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java +++ b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java @@ -590,7 +590,10 @@ public interface KafkaComponentBuilderFactory { * The consumer group protocol to use. The classic protocol uses the * traditional partition assignment and rebalancing mechanism. The * consumer protocol enables the new KIP-848 consumer rebalance protocol - * which provides faster and more efficient rebalancing. + * which provides faster and more efficient rebalancing. When set to + * consumer, classic-only properties (heartbeatIntervalMs, + * sessionTimeoutMs, partitionAssignor) are automatically excluded from + * the consumer configuration. * * The option is a: <code>java.lang.String</code> type. * diff --git a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java index 2b5c7b2978f9..9dc46314a149 100644 --- a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java +++ b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java @@ -871,7 +871,10 @@ public interface KafkaEndpointBuilderFactory { * The consumer group protocol to use. The classic protocol uses the * traditional partition assignment and rebalancing mechanism. 
The * consumer protocol enables the new KIP-848 consumer rebalance protocol - * which provides faster and more efficient rebalancing. + * which provides faster and more efficient rebalancing. When set to + * consumer, classic-only properties (heartbeatIntervalMs, + * sessionTimeoutMs, partitionAssignor) are automatically excluded from + * the consumer configuration. * * The option is a: <code>java.lang.String</code> type. * diff --git a/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/common/KafkaProperties.java b/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/common/KafkaProperties.java index 9b6881ee970a..31114c890770 100644 --- a/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/common/KafkaProperties.java +++ b/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/common/KafkaProperties.java @@ -21,8 +21,7 @@ public final class KafkaProperties { public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; public static final String KAFKA_ZOOKEEPER_ADDRESS = "kafka.zookeeper.address"; public static final String CONFLUENT_CONTAINER = "confluent.container.image"; - public static final String KAFKA_CONTAINER = "kafka.container"; - public static final String KAFKA_CONTAINER_IMAGE = "kafka.container.image"; + public static final String KAFKA_CONTAINER = "kafka.container.image"; public static final String REDPANDA_CONTAINER = "redpanda.container.image"; public static final String STRIMZI_CONTAINER = "strimzi.container.image"; diff --git a/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/services/ContainerLocalKafkaInfraService.java b/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/services/ContainerLocalKafkaInfraService.java index 71e6b883faad..80fc7143d1f8 100644 --- 
a/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/services/ContainerLocalKafkaInfraService.java +++ b/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/services/ContainerLocalKafkaInfraService.java @@ -33,7 +33,7 @@ import org.testcontainers.utility.DockerImageName; public class ContainerLocalKafkaInfraService implements KafkaInfraService, ContainerService<KafkaContainer> { public static final String KAFKA_IMAGE_NAME = LocalPropertyResolver.getProperty( ContainerLocalKafkaInfraService.class, - KafkaProperties.KAFKA_CONTAINER_IMAGE); + KafkaProperties.KAFKA_CONTAINER); private static final Logger LOG = LoggerFactory.getLogger(ContainerLocalKafkaInfraService.class); protected KafkaContainer kafka;
