This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 6455194c3e0c CAMEL-22811 - camel-kafka: Support Kafka 4.0 KIP-848
consumer rebalance protocol (group.protocol=consumer) (#21626)
6455194c3e0c is described below
commit 6455194c3e0caf7e5bdfa53a60eab854ff3c6355
Author: Andrea Cosentino <[email protected]>
AuthorDate: Mon Mar 2 10:21:45 2026 +0100
CAMEL-22811 - camel-kafka: Support Kafka 4.0 KIP-848 consumer rebalance
protocol (group.protocol=consumer) (#21626)
Signed-off-by: Andrea Cosentino <[email protected]>
---
.../org/apache/camel/catalog/components/kafka.json | 4 +-
.../org/apache/camel/component/kafka/kafka.json | 4 +-
.../camel/component/kafka/KafkaConfiguration.java | 15 ++-
.../integration/KafkaConsumerGroupProtocolIT.java | 149 +++++++++++++++++++++
.../ROOT/pages/camel-4x-upgrade-guide-4_19.adoc | 13 ++
.../dsl/KafkaComponentBuilderFactory.java | 5 +-
.../endpoint/dsl/KafkaEndpointBuilderFactory.java | 5 +-
.../test/infra/kafka/common/KafkaProperties.java | 3 +-
.../services/ContainerLocalKafkaInfraService.java | 2 +-
9 files changed, 188 insertions(+), 12 deletions(-)
diff --git
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
index 51a4fd91ec8c..457e0e224993 100644
---
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
+++
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
@@ -50,7 +50,7 @@
"fetchWaitMaxMs": { "index": 23, "kind": "property", "displayName": "Fetch
Wait Max Ms", "group": "consumer", "label": "consumer", "required": false,
"type": "integer", "javaType": "java.lang.Integer", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": 500, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "The maximum amount of time the server will
block before answering the fetch [...]
"groupId": { "index": 24, "kind": "property", "displayName": "Group Id",
"group": "consumer", "label": "consumer", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": false, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "A string that uniquely identifies the group of
consumer processes to which this consumer belongs. By setting the s [...]
"groupInstanceId": { "index": 25, "kind": "property", "displayName":
"Group Instance Id", "group": "consumer", "label": "consumer", "required":
false, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "A unique identifier of the consumer instance
provided by the end user. Only non-empty strings are [...]
- "groupProtocol": { "index": 26, "kind": "property", "displayName": "Group
Protocol", "group": "consumer", "label": "consumer", "required": false, "type":
"enum", "javaType": "java.lang.String", "enum": [ "classic", "consumer" ],
"deprecated": false, "autowired": false, "secret": false, "defaultValue":
"classic", "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "The consumer group protocol to use. The cl
[...]
+ "groupProtocol": { "index": 26, "kind": "property", "displayName": "Group
Protocol", "group": "consumer", "label": "consumer", "required": false, "type":
"enum", "javaType": "java.lang.String", "enum": [ "classic", "consumer" ],
"deprecated": false, "autowired": false, "secret": false, "defaultValue":
"classic", "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "The consumer group protocol to use. The cl
[...]
"groupRemoteAssignor": { "index": 27, "kind": "property", "displayName":
"Group Remote Assignor", "group": "consumer", "label": "consumer", "required":
false, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "The name of the server-side assignor to use
when group.protocol is set to consumer. If no [...]
"headerDeserializer": { "index": 28, "kind": "property", "displayName":
"Header Deserializer", "group": "consumer", "label": "consumer", "required":
false, "type": "object", "javaType":
"org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer", "deprecated":
false, "autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "To use a custom KafkaHeaderDeserializer to des
[...]
"heartbeatIntervalMs": { "index": 29, "kind": "property", "displayName":
"Heartbeat Interval Ms", "group": "consumer", "label": "consumer", "required":
false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": 3000,
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration",
"configurationField": "configuration", "description": "The expected time
between heartbeats to the consumer coordinator [...]
@@ -195,7 +195,7 @@
"fetchWaitMaxMs": { "index": 22, "kind": "parameter", "displayName":
"Fetch Wait Max Ms", "group": "consumer", "label": "consumer", "required":
false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": 500, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "The maximum amount of time the server will
block before answering the fetc [...]
"groupId": { "index": 23, "kind": "parameter", "displayName": "Group Id",
"group": "consumer", "label": "consumer", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": false, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "A string that uniquely identifies the group of
consumer processes to which this consumer belongs. By setting the [...]
"groupInstanceId": { "index": 24, "kind": "parameter", "displayName":
"Group Instance Id", "group": "consumer", "label": "consumer", "required":
false, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "A unique identifier of the consumer instance
provided by the end user. Only non-empty strings ar [...]
- "groupProtocol": { "index": 25, "kind": "parameter", "displayName": "Group
Protocol", "group": "consumer", "label": "consumer", "required": false, "type":
"enum", "javaType": "java.lang.String", "enum": [ "classic", "consumer" ],
"deprecated": false, "autowired": false, "secret": false, "defaultValue":
"classic", "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "The consumer group protocol to use. The c [...]
+ "groupProtocol": { "index": 25, "kind": "parameter", "displayName": "Group
Protocol", "group": "consumer", "label": "consumer", "required": false, "type":
"enum", "javaType": "java.lang.String", "enum": [ "classic", "consumer" ],
"deprecated": false, "autowired": false, "secret": false, "defaultValue":
"classic", "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "The consumer group protocol to use. The c [...]
"groupRemoteAssignor": { "index": 26, "kind": "parameter", "displayName":
"Group Remote Assignor", "group": "consumer", "label": "consumer", "required":
false, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "The name of the server-side assignor to use
when group.protocol is set to consumer. If n [...]
"headerDeserializer": { "index": 27, "kind": "parameter", "displayName":
"Header Deserializer", "group": "consumer", "label": "consumer", "required":
false, "type": "object", "javaType":
"org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer", "deprecated":
false, "autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "To use a custom KafkaHeaderDeserializer to de
[...]
"heartbeatIntervalMs": { "index": 28, "kind": "parameter", "displayName":
"Heartbeat Interval Ms", "group": "consumer", "label": "consumer", "required":
false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": 3000,
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration",
"configurationField": "configuration", "description": "The expected time
between heartbeats to the consumer coordinator [...]
diff --git
a/components/camel-kafka/src/generated/resources/META-INF/org/apache/camel/component/kafka/kafka.json
b/components/camel-kafka/src/generated/resources/META-INF/org/apache/camel/component/kafka/kafka.json
index 51a4fd91ec8c..457e0e224993 100644
---
a/components/camel-kafka/src/generated/resources/META-INF/org/apache/camel/component/kafka/kafka.json
+++
b/components/camel-kafka/src/generated/resources/META-INF/org/apache/camel/component/kafka/kafka.json
@@ -50,7 +50,7 @@
"fetchWaitMaxMs": { "index": 23, "kind": "property", "displayName": "Fetch
Wait Max Ms", "group": "consumer", "label": "consumer", "required": false,
"type": "integer", "javaType": "java.lang.Integer", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": 500, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "The maximum amount of time the server will
block before answering the fetch [...]
"groupId": { "index": 24, "kind": "property", "displayName": "Group Id",
"group": "consumer", "label": "consumer", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": false, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "A string that uniquely identifies the group of
consumer processes to which this consumer belongs. By setting the s [...]
"groupInstanceId": { "index": 25, "kind": "property", "displayName":
"Group Instance Id", "group": "consumer", "label": "consumer", "required":
false, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "A unique identifier of the consumer instance
provided by the end user. Only non-empty strings are [...]
- "groupProtocol": { "index": 26, "kind": "property", "displayName": "Group
Protocol", "group": "consumer", "label": "consumer", "required": false, "type":
"enum", "javaType": "java.lang.String", "enum": [ "classic", "consumer" ],
"deprecated": false, "autowired": false, "secret": false, "defaultValue":
"classic", "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "The consumer group protocol to use. The cl
[...]
+ "groupProtocol": { "index": 26, "kind": "property", "displayName": "Group
Protocol", "group": "consumer", "label": "consumer", "required": false, "type":
"enum", "javaType": "java.lang.String", "enum": [ "classic", "consumer" ],
"deprecated": false, "autowired": false, "secret": false, "defaultValue":
"classic", "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "The consumer group protocol to use. The cl
[...]
"groupRemoteAssignor": { "index": 27, "kind": "property", "displayName":
"Group Remote Assignor", "group": "consumer", "label": "consumer", "required":
false, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "The name of the server-side assignor to use
when group.protocol is set to consumer. If no [...]
"headerDeserializer": { "index": 28, "kind": "property", "displayName":
"Header Deserializer", "group": "consumer", "label": "consumer", "required":
false, "type": "object", "javaType":
"org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer", "deprecated":
false, "autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "To use a custom KafkaHeaderDeserializer to des
[...]
"heartbeatIntervalMs": { "index": 29, "kind": "property", "displayName":
"Heartbeat Interval Ms", "group": "consumer", "label": "consumer", "required":
false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": 3000,
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration",
"configurationField": "configuration", "description": "The expected time
between heartbeats to the consumer coordinator [...]
@@ -195,7 +195,7 @@
"fetchWaitMaxMs": { "index": 22, "kind": "parameter", "displayName":
"Fetch Wait Max Ms", "group": "consumer", "label": "consumer", "required":
false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": 500, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "The maximum amount of time the server will
block before answering the fetc [...]
"groupId": { "index": 23, "kind": "parameter", "displayName": "Group Id",
"group": "consumer", "label": "consumer", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": false, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "A string that uniquely identifies the group of
consumer processes to which this consumer belongs. By setting the [...]
"groupInstanceId": { "index": 24, "kind": "parameter", "displayName":
"Group Instance Id", "group": "consumer", "label": "consumer", "required":
false, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "A unique identifier of the consumer instance
provided by the end user. Only non-empty strings ar [...]
- "groupProtocol": { "index": 25, "kind": "parameter", "displayName": "Group
Protocol", "group": "consumer", "label": "consumer", "required": false, "type":
"enum", "javaType": "java.lang.String", "enum": [ "classic", "consumer" ],
"deprecated": false, "autowired": false, "secret": false, "defaultValue":
"classic", "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "The consumer group protocol to use. The c [...]
+ "groupProtocol": { "index": 25, "kind": "parameter", "displayName": "Group
Protocol", "group": "consumer", "label": "consumer", "required": false, "type":
"enum", "javaType": "java.lang.String", "enum": [ "classic", "consumer" ],
"deprecated": false, "autowired": false, "secret": false, "defaultValue":
"classic", "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "The consumer group protocol to use. The c [...]
"groupRemoteAssignor": { "index": 26, "kind": "parameter", "displayName":
"Group Remote Assignor", "group": "consumer", "label": "consumer", "required":
false, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "The name of the server-side assignor to use
when group.protocol is set to consumer. If n [...]
"headerDeserializer": { "index": 27, "kind": "parameter", "displayName":
"Header Deserializer", "group": "consumer", "label": "consumer", "required":
false, "type": "object", "javaType":
"org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer", "deprecated":
false, "autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "To use a custom KafkaHeaderDeserializer to de
[...]
"heartbeatIntervalMs": { "index": 28, "kind": "parameter", "displayName":
"Heartbeat Interval Ms", "group": "consumer", "label": "consumer", "required":
false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": 3000,
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration",
"configurationField": "configuration", "description": "The expected time
between heartbeats to the consumer coordinator [...]
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index dc275a1703b8..f7d9c196aee0 100755
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -566,16 +566,22 @@ public class KafkaConfiguration implements Cloneable,
HeaderFilterStrategyAware
addPropertyIfNotEmpty(props,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, getValueDeserializer());
addPropertyIfNotEmpty(props, ConsumerConfig.FETCH_MIN_BYTES_CONFIG,
getFetchMinBytes());
addPropertyIfNotEmpty(props, ConsumerConfig.FETCH_MAX_BYTES_CONFIG,
getFetchMaxBytes());
- addPropertyIfNotEmpty(props,
ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, getHeartbeatIntervalMs());
+ // classic-only properties: skip when using the new consumer protocol
(KIP-848)
+ boolean classicProtocol = !"consumer".equals(getGroupProtocol());
+ if (classicProtocol) {
+ addPropertyIfNotEmpty(props,
ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, getHeartbeatIntervalMs());
+ addPropertyIfNotEmpty(props,
ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, getSessionTimeoutMs());
+ }
addPropertyIfNotEmpty(props,
ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, getMaxPartitionFetchBytes());
- addPropertyIfNotEmpty(props, ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,
getSessionTimeoutMs());
addPropertyIfNotEmpty(props,
ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, getMaxPollIntervalMs());
addPropertyIfNotEmpty(props, ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
getMaxPollRecords());
addPropertyIfNotEmpty(props,
ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, getInterceptorClasses());
addPropertyIfNotEmpty(props, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
getAutoOffsetReset());
addPropertyIfNotEmpty(props,
ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, getConnectionMaxIdleMs());
addPropertyIfNotEmpty(props, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
getAutoCommitEnable());
- addPropertyIfNotEmpty(props,
ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, getPartitionAssignor());
+ if (classicProtocol) {
+ addPropertyIfNotEmpty(props,
ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, getPartitionAssignor());
+ }
addPropertyIfNotEmpty(props, ConsumerConfig.GROUP_PROTOCOL_CONFIG,
getGroupProtocol());
addPropertyIfNotEmpty(props,
ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, getGroupRemoteAssignor());
addPropertyIfNotEmpty(props, ConsumerConfig.RECEIVE_BUFFER_CONFIG,
getReceiveBufferBytes());
@@ -1916,6 +1922,9 @@ public class KafkaConfiguration implements Cloneable,
HeaderFilterStrategyAware
* The consumer group protocol to use. The "classic" protocol uses the
traditional partition assignment and
* rebalancing mechanism. The "consumer" protocol enables the new KIP-848
consumer rebalance protocol which provides
* faster and more efficient rebalancing.
+ *
+ * When set to "consumer", classic-only properties (heartbeatIntervalMs,
sessionTimeoutMs, partitionAssignor) are
+ * automatically excluded from the consumer configuration.
*/
public void setGroupProtocol(String groupProtocol) {
this.groupProtocol = groupProtocol;
diff --git
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerGroupProtocolIT.java
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerGroupProtocolIT.java
new file mode 100644
index 000000000000..7838be0c2678
--- /dev/null
+++
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerGroupProtocolIT.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.StreamSupport;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.MockConsumerInterceptor;
+import org.apache.camel.component.kafka.integration.common.KafkaAdminUtil;
+import org.apache.camel.component.kafka.integration.common.KafkaTestUtil;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.infra.core.annotations.RouteFixture;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.FeatureMetadata;
+import org.apache.kafka.clients.admin.FinalizedVersionRange;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assumptions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Integration test for the KIP-848 consumer rebalance protocol
(group.protocol=consumer). Verifies that Camel's Kafka
+ * consumer works correctly with the new consumer protocol, which does not use
classic-only properties like
+ * heartbeat.interval.ms, session.timeout.ms, and
partition.assignment.strategy.
+ *
+ * This test requires a Kafka 4.0+ broker with the new group coordinator
enabled (group.version >= 1).
+ */
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class KafkaConsumerGroupProtocolIT extends BaseKafkaTestSupport {
+ public static final String TOPIC = "test-group-protocol-" +
Uuid.randomUuid();
+
+ private static final Logger LOG =
LoggerFactory.getLogger(KafkaConsumerGroupProtocolIT.class);
+
+ private static final String FROM_URI = "kafka:" + TOPIC
+ +
"?groupId=KafkaConsumerGroupProtocolIT&autoOffsetReset=earliest"
+ +
"&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+ +
"&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+ +
"&autoCommitIntervalMs=1000&pollTimeoutMs=1000&autoCommitEnable=true"
+ +
"&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor"
+ + "&groupProtocol=consumer";
+
+ private org.apache.kafka.clients.producer.KafkaProducer<String, String>
producer;
+
+ @BeforeAll
+ static void checkConsumerProtocolSupport() {
+ try (AdminClient adminClient =
KafkaAdminUtil.createAdminClient(service)) {
+ FeatureMetadata metadata =
adminClient.describeFeatures().featureMetadata().get(10, TimeUnit.SECONDS);
+ Map<String, FinalizedVersionRange> finalizedFeatures =
metadata.finalizedFeatures();
+ FinalizedVersionRange groupVersion =
finalizedFeatures.get("group.version");
+ Assumptions.assumeTrue(
+ groupVersion != null && groupVersion.maxVersionLevel() >=
1,
+ "Broker does not support the consumer group protocol
(KIP-848), requires Kafka 4.0+ with group.version >= 1");
+ } catch (Exception e) {
+ Assumptions.assumeTrue(false,
+ "Could not determine broker feature support: " +
e.getMessage());
+ }
+ }
+
+ @BeforeEach
+ public void before() {
+ Properties props = getDefaultProperties();
+ producer = new
org.apache.kafka.clients.producer.KafkaProducer<>(props);
+ MockConsumerInterceptor.recordsCaptured.clear();
+ }
+
+ @AfterEach
+ public void after() {
+ if (producer != null) {
+ producer.close();
+ }
+ }
+
+ @RouteFixture
+ @Override
+ public void createRouteBuilder(CamelContext context) throws Exception {
+ context.addRoutes(createRouteBuilder());
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ from(FROM_URI)
+ .process(exchange -> LOG.trace("Captured on the
processor: {}", exchange.getMessage().getBody()))
+
.routeId("group-protocol-it").to(KafkaTestUtil.MOCK_RESULT);
+ }
+ };
+ }
+
+ @Test
+ public void kafkaMessageIsConsumedWithConsumerProtocol() throws
InterruptedException {
+ String propagatedHeaderKey = "PropagatedCustomHeader";
+ byte[] propagatedHeaderValue = "propagated header value".getBytes();
+ String skippedHeaderKey = "CamelSkippedHeader";
+
+ MockEndpoint to =
contextExtension.getMockEndpoint(KafkaTestUtil.MOCK_RESULT);
+
+ to.expectedMessageCount(5);
+ to.expectedBodiesReceivedInAnyOrder("message-0", "message-1",
"message-2", "message-3", "message-4");
+ to.expectedHeaderReceived(propagatedHeaderKey, propagatedHeaderValue);
+
+ for (int k = 0; k < 5; k++) {
+ String msg = "message-" + k;
+ ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC,
"1", msg);
+ data.headers().add(new RecordHeader("CamelSkippedHeader", "skipped
header value".getBytes()));
+ data.headers().add(new RecordHeader(propagatedHeaderKey,
propagatedHeaderValue));
+ producer.send(data);
+ }
+
+ to.assertIsSatisfied(3000);
+
+ assertEquals(5, MockConsumerInterceptor.recordsCaptured.stream()
+ .flatMap(i ->
StreamSupport.stream(i.records(TOPIC).spliterator(), false)).count());
+
+ java.util.Map<String, Object> headers =
to.getExchanges().get(0).getIn().getHeaders();
+ assertFalse(headers.containsKey(skippedHeaderKey), "Should not receive
skipped header");
+ assertTrue(headers.containsKey(propagatedHeaderKey), "Should receive
propagated header");
+ }
+}
diff --git
a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_19.adoc
b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_19.adoc
index 631eb45d7153..a6a9eeca11bf 100644
--- a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_19.adoc
+++ b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_19.adoc
@@ -249,6 +249,19 @@ Google recommends migrating your Pub/Sub Lite workloads to
either:
* **Google Cloud Pub/Sub** → use the `camel-google-pubsub` component
* **Google Cloud Managed Service for Apache Kafka** → use the `camel-kafka`
component
+=== camel-kafka
+
+The Kafka client library has been upgraded from 3.9.x to 4.2.0. This is a
major upgrade with the following notable changes:
+
+* The `lingerMs` option default value has changed from `0` to `5` to align
with the Kafka 4.x client default for `linger.ms`.
+* Two new consumer options have been added: `groupProtocol` (default
`classic`, enums: `classic`, `consumer`) and `groupRemoteAssignor`. These
support the KIP-848 consumer rebalance protocol introduced in Kafka 4.0. When
`groupProtocol` is set to `consumer`, the classic-only properties
(`heartbeatIntervalMs`, `sessionTimeoutMs`, `partitionAssignor`) are
automatically excluded from the consumer configuration.
+* The removed `DefaultPartitioner` and `UniformStickyPartitioner` classes are
no longer available. If you were explicitly setting `partitioner` to one of
these classes, remove the configuration as Kafka 4.x uses the default
partitioner automatically.
+
+The test infrastructure for Kafka has been updated:
+
+* The `local-kafka3-container` mapping has been renamed to
`local-kafka-container`. If you were using
`-Dkafka.instance.type=local-kafka3-container`, update to
`-Dkafka.instance.type=local-kafka-container`.
+* The Strimzi test service now runs in KRaft mode (no ZooKeeper). The
`ZookeeperContainer` class has been removed.
+
=== camel-json-patch
The `camel-json-patch` is now deprecated - the library it uses is not active
maintained and this module does not work with Jackon 3.
diff --git
a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
index 78eba1c22b1c..71f9c7d38a36 100644
---
a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
+++
b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
@@ -590,7 +590,10 @@ public interface KafkaComponentBuilderFactory {
* The consumer group protocol to use. The classic protocol uses the
* traditional partition assignment and rebalancing mechanism. The
* consumer protocol enables the new KIP-848 consumer rebalance
protocol
- * which provides faster and more efficient rebalancing.
+ * which provides faster and more efficient rebalancing. When set to
+ * consumer, classic-only properties (heartbeatIntervalMs,
+ * sessionTimeoutMs, partitionAssignor) are automatically excluded from
+ * the consumer configuration.
*
* The option is a: <code>java.lang.String</code> type.
*
diff --git
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
index 2b5c7b2978f9..9dc46314a149 100644
---
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
+++
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
@@ -871,7 +871,10 @@ public interface KafkaEndpointBuilderFactory {
* The consumer group protocol to use. The classic protocol uses the
* traditional partition assignment and rebalancing mechanism. The
* consumer protocol enables the new KIP-848 consumer rebalance
protocol
- * which provides faster and more efficient rebalancing.
+ * which provides faster and more efficient rebalancing. When set to
+ * consumer, classic-only properties (heartbeatIntervalMs,
+ * sessionTimeoutMs, partitionAssignor) are automatically excluded from
+ * the consumer configuration.
*
* The option is a: <code>java.lang.String</code> type.
*
diff --git
a/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/common/KafkaProperties.java
b/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/common/KafkaProperties.java
index 9b6881ee970a..31114c890770 100644
---
a/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/common/KafkaProperties.java
+++
b/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/common/KafkaProperties.java
@@ -21,8 +21,7 @@ public final class KafkaProperties {
public static final String KAFKA_BOOTSTRAP_SERVERS =
"kafka.bootstrap.servers";
public static final String KAFKA_ZOOKEEPER_ADDRESS =
"kafka.zookeeper.address";
public static final String CONFLUENT_CONTAINER =
"confluent.container.image";
- public static final String KAFKA_CONTAINER = "kafka.container";
- public static final String KAFKA_CONTAINER_IMAGE = "kafka.container.image";
+ public static final String KAFKA_CONTAINER = "kafka.container.image";
public static final String REDPANDA_CONTAINER = "redpanda.container.image";
public static final String STRIMZI_CONTAINER = "strimzi.container.image";
diff --git
a/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/services/ContainerLocalKafkaInfraService.java
b/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/services/ContainerLocalKafkaInfraService.java
index 71e6b883faad..80fc7143d1f8 100644
---
a/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/services/ContainerLocalKafkaInfraService.java
+++
b/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/services/ContainerLocalKafkaInfraService.java
@@ -33,7 +33,7 @@ import org.testcontainers.utility.DockerImageName;
public class ContainerLocalKafkaInfraService implements KafkaInfraService,
ContainerService<KafkaContainer> {
public static final String KAFKA_IMAGE_NAME =
LocalPropertyResolver.getProperty(
ContainerLocalKafkaInfraService.class,
- KafkaProperties.KAFKA_CONTAINER_IMAGE);
+ KafkaProperties.KAFKA_CONTAINER);
private static final Logger LOG =
LoggerFactory.getLogger(ContainerLocalKafkaInfraService.class);
protected KafkaContainer kafka;