This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch 22811
in repository https://gitbox.apache.org/repos/asf/camel.git

commit b331eced8ddd64d172002bec2340af04b5475dff
Author: Andrea Cosentino <[email protected]>
AuthorDate: Fri Feb 27 09:14:52 2026 +0100

    CAMEL-22811 - camel-kafka: Support Kafka 4.0 KIP-848 consumer rebalance 
protocol (group.protocol=consumer)
    
    Signed-off-by: Andrea Cosentino <[email protected]>
---
 .../org/apache/camel/catalog/components/kafka.json |   4 +-
 .../org/apache/camel/component/kafka/kafka.json    |   4 +-
 .../camel/component/kafka/KafkaConfiguration.java  |  15 ++-
 .../integration/KafkaConsumerGroupProtocolIT.java  | 149 +++++++++++++++++++++
 .../ROOT/pages/camel-4x-upgrade-guide-4_19.adoc    |  13 ++
 .../dsl/KafkaComponentBuilderFactory.java          |   5 +-
 .../endpoint/dsl/KafkaEndpointBuilderFactory.java  |   5 +-
 .../test/infra/kafka/common/KafkaProperties.java   |   3 +-
 .../services/ContainerLocalKafkaInfraService.java  |   2 +-
 9 files changed, 188 insertions(+), 12 deletions(-)

diff --git 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
index 51a4fd91ec8c..457e0e224993 100644
--- 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
+++ 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
@@ -50,7 +50,7 @@
     "fetchWaitMaxMs": { "index": 23, "kind": "property", "displayName": "Fetch 
Wait Max Ms", "group": "consumer", "label": "consumer", "required": false, 
"type": "integer", "javaType": "java.lang.Integer", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": 500, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The maximum amount of time the server will 
block before answering the fetch [...]
     "groupId": { "index": 24, "kind": "property", "displayName": "Group Id", 
"group": "consumer", "label": "consumer", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "A string that uniquely identifies the group of 
consumer processes to which this consumer belongs. By setting the s [...]
     "groupInstanceId": { "index": 25, "kind": "property", "displayName": 
"Group Instance Id", "group": "consumer", "label": "consumer", "required": 
false, "type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "A unique identifier of the consumer instance 
provided by the end user. Only non-empty strings are [...]
-    "groupProtocol": { "index": 26, "kind": "property", "displayName": "Group 
Protocol", "group": "consumer", "label": "consumer", "required": false, "type": 
"enum", "javaType": "java.lang.String", "enum": [ "classic", "consumer" ], 
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 
"classic", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The consumer group protocol to use. The cl 
[...]
+    "groupProtocol": { "index": 26, "kind": "property", "displayName": "Group 
Protocol", "group": "consumer", "label": "consumer", "required": false, "type": 
"enum", "javaType": "java.lang.String", "enum": [ "classic", "consumer" ], 
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 
"classic", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The consumer group protocol to use. The cl 
[...]
     "groupRemoteAssignor": { "index": 27, "kind": "property", "displayName": 
"Group Remote Assignor", "group": "consumer", "label": "consumer", "required": 
false, "type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The name of the server-side assignor to use 
when group.protocol is set to consumer. If no [...]
     "headerDeserializer": { "index": 28, "kind": "property", "displayName": 
"Header Deserializer", "group": "consumer", "label": "consumer", "required": 
false, "type": "object", "javaType": 
"org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer", "deprecated": 
false, "autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "To use a custom KafkaHeaderDeserializer to des 
[...]
     "heartbeatIntervalMs": { "index": 29, "kind": "property", "displayName": 
"Heartbeat Interval Ms", "group": "consumer", "label": "consumer", "required": 
false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": 3000, 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "The expected time 
between heartbeats to the consumer coordinator  [...]
@@ -195,7 +195,7 @@
     "fetchWaitMaxMs": { "index": 22, "kind": "parameter", "displayName": 
"Fetch Wait Max Ms", "group": "consumer", "label": "consumer", "required": 
false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": 500, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The maximum amount of time the server will 
block before answering the fetc [...]
     "groupId": { "index": 23, "kind": "parameter", "displayName": "Group Id", 
"group": "consumer", "label": "consumer", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "A string that uniquely identifies the group of 
consumer processes to which this consumer belongs. By setting the  [...]
     "groupInstanceId": { "index": 24, "kind": "parameter", "displayName": 
"Group Instance Id", "group": "consumer", "label": "consumer", "required": 
false, "type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "A unique identifier of the consumer instance 
provided by the end user. Only non-empty strings ar [...]
-    "groupProtocol": { "index": 25, "kind": "parameter", "displayName": "Group 
Protocol", "group": "consumer", "label": "consumer", "required": false, "type": 
"enum", "javaType": "java.lang.String", "enum": [ "classic", "consumer" ], 
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 
"classic", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The consumer group protocol to use. The c [...]
+    "groupProtocol": { "index": 25, "kind": "parameter", "displayName": "Group 
Protocol", "group": "consumer", "label": "consumer", "required": false, "type": 
"enum", "javaType": "java.lang.String", "enum": [ "classic", "consumer" ], 
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 
"classic", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The consumer group protocol to use. The c [...]
     "groupRemoteAssignor": { "index": 26, "kind": "parameter", "displayName": 
"Group Remote Assignor", "group": "consumer", "label": "consumer", "required": 
false, "type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The name of the server-side assignor to use 
when group.protocol is set to consumer. If n [...]
     "headerDeserializer": { "index": 27, "kind": "parameter", "displayName": 
"Header Deserializer", "group": "consumer", "label": "consumer", "required": 
false, "type": "object", "javaType": 
"org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer", "deprecated": 
false, "autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "To use a custom KafkaHeaderDeserializer to de 
[...]
     "heartbeatIntervalMs": { "index": 28, "kind": "parameter", "displayName": 
"Heartbeat Interval Ms", "group": "consumer", "label": "consumer", "required": 
false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": 3000, 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "The expected time 
between heartbeats to the consumer coordinator [...]
diff --git 
a/components/camel-kafka/src/generated/resources/META-INF/org/apache/camel/component/kafka/kafka.json
 
b/components/camel-kafka/src/generated/resources/META-INF/org/apache/camel/component/kafka/kafka.json
index 51a4fd91ec8c..457e0e224993 100644
--- 
a/components/camel-kafka/src/generated/resources/META-INF/org/apache/camel/component/kafka/kafka.json
+++ 
b/components/camel-kafka/src/generated/resources/META-INF/org/apache/camel/component/kafka/kafka.json
@@ -50,7 +50,7 @@
     "fetchWaitMaxMs": { "index": 23, "kind": "property", "displayName": "Fetch 
Wait Max Ms", "group": "consumer", "label": "consumer", "required": false, 
"type": "integer", "javaType": "java.lang.Integer", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": 500, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The maximum amount of time the server will 
block before answering the fetch [...]
     "groupId": { "index": 24, "kind": "property", "displayName": "Group Id", 
"group": "consumer", "label": "consumer", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "A string that uniquely identifies the group of 
consumer processes to which this consumer belongs. By setting the s [...]
     "groupInstanceId": { "index": 25, "kind": "property", "displayName": 
"Group Instance Id", "group": "consumer", "label": "consumer", "required": 
false, "type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "A unique identifier of the consumer instance 
provided by the end user. Only non-empty strings are [...]
-    "groupProtocol": { "index": 26, "kind": "property", "displayName": "Group 
Protocol", "group": "consumer", "label": "consumer", "required": false, "type": 
"enum", "javaType": "java.lang.String", "enum": [ "classic", "consumer" ], 
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 
"classic", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The consumer group protocol to use. The cl 
[...]
+    "groupProtocol": { "index": 26, "kind": "property", "displayName": "Group 
Protocol", "group": "consumer", "label": "consumer", "required": false, "type": 
"enum", "javaType": "java.lang.String", "enum": [ "classic", "consumer" ], 
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 
"classic", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The consumer group protocol to use. The cl 
[...]
     "groupRemoteAssignor": { "index": 27, "kind": "property", "displayName": 
"Group Remote Assignor", "group": "consumer", "label": "consumer", "required": 
false, "type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The name of the server-side assignor to use 
when group.protocol is set to consumer. If no [...]
     "headerDeserializer": { "index": 28, "kind": "property", "displayName": 
"Header Deserializer", "group": "consumer", "label": "consumer", "required": 
false, "type": "object", "javaType": 
"org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer", "deprecated": 
false, "autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "To use a custom KafkaHeaderDeserializer to des 
[...]
     "heartbeatIntervalMs": { "index": 29, "kind": "property", "displayName": 
"Heartbeat Interval Ms", "group": "consumer", "label": "consumer", "required": 
false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": 3000, 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "The expected time 
between heartbeats to the consumer coordinator  [...]
@@ -195,7 +195,7 @@
     "fetchWaitMaxMs": { "index": 22, "kind": "parameter", "displayName": 
"Fetch Wait Max Ms", "group": "consumer", "label": "consumer", "required": 
false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": 500, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The maximum amount of time the server will 
block before answering the fetc [...]
     "groupId": { "index": 23, "kind": "parameter", "displayName": "Group Id", 
"group": "consumer", "label": "consumer", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "A string that uniquely identifies the group of 
consumer processes to which this consumer belongs. By setting the  [...]
     "groupInstanceId": { "index": 24, "kind": "parameter", "displayName": 
"Group Instance Id", "group": "consumer", "label": "consumer", "required": 
false, "type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "A unique identifier of the consumer instance 
provided by the end user. Only non-empty strings ar [...]
-    "groupProtocol": { "index": 25, "kind": "parameter", "displayName": "Group 
Protocol", "group": "consumer", "label": "consumer", "required": false, "type": 
"enum", "javaType": "java.lang.String", "enum": [ "classic", "consumer" ], 
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 
"classic", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The consumer group protocol to use. The c [...]
+    "groupProtocol": { "index": 25, "kind": "parameter", "displayName": "Group 
Protocol", "group": "consumer", "label": "consumer", "required": false, "type": 
"enum", "javaType": "java.lang.String", "enum": [ "classic", "consumer" ], 
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 
"classic", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The consumer group protocol to use. The c [...]
     "groupRemoteAssignor": { "index": 26, "kind": "parameter", "displayName": 
"Group Remote Assignor", "group": "consumer", "label": "consumer", "required": 
false, "type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The name of the server-side assignor to use 
when group.protocol is set to consumer. If n [...]
     "headerDeserializer": { "index": 27, "kind": "parameter", "displayName": 
"Header Deserializer", "group": "consumer", "label": "consumer", "required": 
false, "type": "object", "javaType": 
"org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer", "deprecated": 
false, "autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "To use a custom KafkaHeaderDeserializer to de 
[...]
     "heartbeatIntervalMs": { "index": 28, "kind": "parameter", "displayName": 
"Heartbeat Interval Ms", "group": "consumer", "label": "consumer", "required": 
false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": 3000, 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "The expected time 
between heartbeats to the consumer coordinator [...]
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index dc275a1703b8..f7d9c196aee0 100755
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -566,16 +566,22 @@ public class KafkaConfiguration implements Cloneable, 
HeaderFilterStrategyAware
         addPropertyIfNotEmpty(props, 
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, getValueDeserializer());
         addPropertyIfNotEmpty(props, ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 
getFetchMinBytes());
         addPropertyIfNotEmpty(props, ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 
getFetchMaxBytes());
-        addPropertyIfNotEmpty(props, 
ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, getHeartbeatIntervalMs());
+        // classic-only properties: skip when using the new consumer protocol 
(KIP-848)
+        boolean classicProtocol = !"consumer".equals(getGroupProtocol());
+        if (classicProtocol) {
+            addPropertyIfNotEmpty(props, 
ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, getHeartbeatIntervalMs());
+            addPropertyIfNotEmpty(props, 
ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, getSessionTimeoutMs());
+        }
         addPropertyIfNotEmpty(props, 
ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, getMaxPartitionFetchBytes());
-        addPropertyIfNotEmpty(props, ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 
getSessionTimeoutMs());
         addPropertyIfNotEmpty(props, 
ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, getMaxPollIntervalMs());
         addPropertyIfNotEmpty(props, ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 
getMaxPollRecords());
         addPropertyIfNotEmpty(props, 
ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, getInterceptorClasses());
         addPropertyIfNotEmpty(props, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
getAutoOffsetReset());
         addPropertyIfNotEmpty(props, 
ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, getConnectionMaxIdleMs());
         addPropertyIfNotEmpty(props, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
getAutoCommitEnable());
-        addPropertyIfNotEmpty(props, 
ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, getPartitionAssignor());
+        if (classicProtocol) {
+            addPropertyIfNotEmpty(props, 
ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, getPartitionAssignor());
+        }
         addPropertyIfNotEmpty(props, ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
getGroupProtocol());
         addPropertyIfNotEmpty(props, 
ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, getGroupRemoteAssignor());
         addPropertyIfNotEmpty(props, ConsumerConfig.RECEIVE_BUFFER_CONFIG, 
getReceiveBufferBytes());
@@ -1916,6 +1922,9 @@ public class KafkaConfiguration implements Cloneable, 
HeaderFilterStrategyAware
      * The consumer group protocol to use. The "classic" protocol uses the 
traditional partition assignment and
      * rebalancing mechanism. The "consumer" protocol enables the new KIP-848 
consumer rebalance protocol which provides
      * faster and more efficient rebalancing.
+     *
+     * When set to "consumer", classic-only properties (heartbeatIntervalMs, 
sessionTimeoutMs, partitionAssignor) are
+     * automatically excluded from the consumer configuration.
      */
     public void setGroupProtocol(String groupProtocol) {
         this.groupProtocol = groupProtocol;
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerGroupProtocolIT.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerGroupProtocolIT.java
new file mode 100644
index 000000000000..7838be0c2678
--- /dev/null
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerGroupProtocolIT.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.StreamSupport;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.MockConsumerInterceptor;
+import org.apache.camel.component.kafka.integration.common.KafkaAdminUtil;
+import org.apache.camel.component.kafka.integration.common.KafkaTestUtil;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.infra.core.annotations.RouteFixture;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.FeatureMetadata;
+import org.apache.kafka.clients.admin.FinalizedVersionRange;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assumptions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Integration test for the KIP-848 consumer rebalance protocol 
(group.protocol=consumer). Verifies that Camel's Kafka
+ * consumer works correctly with the new consumer protocol, which does not use 
classic-only properties like
+ * heartbeat.interval.ms, session.timeout.ms, and 
partition.assignment.strategy.
+ *
+ * This test requires a Kafka 4.0+ broker with the new group coordinator 
enabled (group.version >= 1).
+ */
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class KafkaConsumerGroupProtocolIT extends BaseKafkaTestSupport {
+    public static final String TOPIC = "test-group-protocol-" + 
Uuid.randomUuid();
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaConsumerGroupProtocolIT.class);
+
+    private static final String FROM_URI = "kafka:" + TOPIC
+                                           + 
"?groupId=KafkaConsumerGroupProtocolIT&autoOffsetReset=earliest"
+                                           + 
"&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+                                           + 
"&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+                                           + 
"&autoCommitIntervalMs=1000&pollTimeoutMs=1000&autoCommitEnable=true"
+                                           + 
"&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor"
+                                           + "&groupProtocol=consumer";
+
+    private org.apache.kafka.clients.producer.KafkaProducer<String, String> 
producer;
+
+    @BeforeAll
+    static void checkConsumerProtocolSupport() {
+        try (AdminClient adminClient = 
KafkaAdminUtil.createAdminClient(service)) {
+            FeatureMetadata metadata = 
adminClient.describeFeatures().featureMetadata().get(10, TimeUnit.SECONDS);
+            Map<String, FinalizedVersionRange> finalizedFeatures = 
metadata.finalizedFeatures();
+            FinalizedVersionRange groupVersion = 
finalizedFeatures.get("group.version");
+            Assumptions.assumeTrue(
+                    groupVersion != null && groupVersion.maxVersionLevel() >= 
1,
+                    "Broker does not support the consumer group protocol 
(KIP-848), requires Kafka 4.0+ with group.version >= 1");
+        } catch (Exception e) {
+            Assumptions.assumeTrue(false,
+                    "Could not determine broker feature support: " + 
e.getMessage());
+        }
+    }
+
+    @BeforeEach
+    public void before() {
+        Properties props = getDefaultProperties();
+        producer = new 
org.apache.kafka.clients.producer.KafkaProducer<>(props);
+        MockConsumerInterceptor.recordsCaptured.clear();
+    }
+
+    @AfterEach
+    public void after() {
+        if (producer != null) {
+            producer.close();
+        }
+    }
+
+    @RouteFixture
+    @Override
+    public void createRouteBuilder(CamelContext context) throws Exception {
+        context.addRoutes(createRouteBuilder());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from(FROM_URI)
+                        .process(exchange -> LOG.trace("Captured on the 
processor: {}", exchange.getMessage().getBody()))
+                        
.routeId("group-protocol-it").to(KafkaTestUtil.MOCK_RESULT);
+            }
+        };
+    }
+
+    @Test
+    public void kafkaMessageIsConsumedWithConsumerProtocol() throws 
InterruptedException {
+        String propagatedHeaderKey = "PropagatedCustomHeader";
+        byte[] propagatedHeaderValue = "propagated header value".getBytes();
+        String skippedHeaderKey = "CamelSkippedHeader";
+
+        MockEndpoint to = 
contextExtension.getMockEndpoint(KafkaTestUtil.MOCK_RESULT);
+
+        to.expectedMessageCount(5);
+        to.expectedBodiesReceivedInAnyOrder("message-0", "message-1", 
"message-2", "message-3", "message-4");
+        to.expectedHeaderReceived(propagatedHeaderKey, propagatedHeaderValue);
+
+        for (int k = 0; k < 5; k++) {
+            String msg = "message-" + k;
+            ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, 
"1", msg);
+            data.headers().add(new RecordHeader("CamelSkippedHeader", "skipped 
header value".getBytes()));
+            data.headers().add(new RecordHeader(propagatedHeaderKey, 
propagatedHeaderValue));
+            producer.send(data);
+        }
+
+        to.assertIsSatisfied(3000);
+
+        assertEquals(5, MockConsumerInterceptor.recordsCaptured.stream()
+                .flatMap(i -> 
StreamSupport.stream(i.records(TOPIC).spliterator(), false)).count());
+
+        java.util.Map<String, Object> headers = 
to.getExchanges().get(0).getIn().getHeaders();
+        assertFalse(headers.containsKey(skippedHeaderKey), "Should not receive 
skipped header");
+        assertTrue(headers.containsKey(propagatedHeaderKey), "Should receive 
propagated header");
+    }
+}
diff --git 
a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_19.adoc 
b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_19.adoc
index 3d9244a9d699..1587daec1855 100644
--- a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_19.adoc
+++ b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_19.adoc
@@ -123,6 +123,19 @@ Google recommends migrating your Pub/Sub Lite workloads to 
either:
 * **Google Cloud Pub/Sub** → use the `camel-google-pubsub` component
 * **Google Cloud Managed Service for Apache Kafka** → use the `camel-kafka` 
component
 
+=== camel-kafka
+
+The Kafka client library has been upgraded from 3.9.x to 4.2.0. This is a 
major upgrade with the following notable changes:
+
+* The `lingerMs` option default value has changed from `0` to `5` to align 
with the Kafka 4.x client default for `linger.ms`.
+* Two new consumer options have been added: `groupProtocol` (default 
`classic`, allowed values: `classic`, `consumer`) and `groupRemoteAssignor`. 
These support the KIP-848 consumer rebalance protocol introduced in Kafka 4.0. 
When `groupProtocol` is set to `consumer`, the classic-only properties 
(`heartbeatIntervalMs`, `sessionTimeoutMs`, `partitionAssignor`) are 
automatically excluded from the consumer configuration.
+* The `DefaultPartitioner` and `UniformStickyPartitioner` classes have been 
removed from the Kafka client. If you were explicitly setting `partitioner` to 
one of these classes, remove that setting, as Kafka 4.x uses the default 
partitioner automatically.
+
+The test infrastructure for Kafka has been updated:
+
+* The `local-kafka3-container` mapping has been renamed to 
`local-kafka-container`. If you were using 
`-Dkafka.instance.type=local-kafka3-container`, update to 
`-Dkafka.instance.type=local-kafka-container`.
+* The Strimzi test service now runs in KRaft mode (no ZooKeeper). The 
`ZookeeperContainer` class has been removed.
+
 === camel-mail
 
 When configured a custom `IdempotentRepository` on `camel-mail` endpoint, then 
Camel will now auto-start
diff --git 
a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
 
b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
index 78eba1c22b1c..71f9c7d38a36 100644
--- 
a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
+++ 
b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
@@ -590,7 +590,10 @@ public interface KafkaComponentBuilderFactory {
          * The consumer group protocol to use. The classic protocol uses the
          * traditional partition assignment and rebalancing mechanism. The
          * consumer protocol enables the new KIP-848 consumer rebalance 
protocol
-         * which provides faster and more efficient rebalancing.
+         * which provides faster and more efficient rebalancing. When set to
+         * consumer, classic-only properties (heartbeatIntervalMs,
+         * sessionTimeoutMs, partitionAssignor) are automatically excluded from
+         * the consumer configuration.
          * 
          * The option is a: &lt;code&gt;java.lang.String&lt;/code&gt; type.
          * 
diff --git 
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
 
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
index 2b5c7b2978f9..9dc46314a149 100644
--- 
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
+++ 
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
@@ -871,7 +871,10 @@ public interface KafkaEndpointBuilderFactory {
          * The consumer group protocol to use. The classic protocol uses the
          * traditional partition assignment and rebalancing mechanism. The
          * consumer protocol enables the new KIP-848 consumer rebalance 
protocol
-         * which provides faster and more efficient rebalancing.
+         * which provides faster and more efficient rebalancing. When set to
+         * consumer, classic-only properties (heartbeatIntervalMs,
+         * sessionTimeoutMs, partitionAssignor) are automatically excluded from
+         * the consumer configuration.
          * 
          * The option is a: <code>java.lang.String</code> type.
          * 
diff --git 
a/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/common/KafkaProperties.java
 
b/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/common/KafkaProperties.java
index 9b6881ee970a..31114c890770 100644
--- 
a/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/common/KafkaProperties.java
+++ 
b/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/common/KafkaProperties.java
@@ -21,8 +21,7 @@ public final class KafkaProperties {
     public static final String KAFKA_BOOTSTRAP_SERVERS = 
"kafka.bootstrap.servers";
     public static final String KAFKA_ZOOKEEPER_ADDRESS = 
"kafka.zookeeper.address";
     public static final String CONFLUENT_CONTAINER = 
"confluent.container.image";
-    public static final String KAFKA_CONTAINER = "kafka.container";
-    public static final String KAFKA_CONTAINER_IMAGE = "kafka.container.image";
+    public static final String KAFKA_CONTAINER = "kafka.container.image";
     public static final String REDPANDA_CONTAINER = "redpanda.container.image";
     public static final String STRIMZI_CONTAINER = "strimzi.container.image";
 
diff --git 
a/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/services/ContainerLocalKafkaInfraService.java
 
b/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/services/ContainerLocalKafkaInfraService.java
index 71e6b883faad..80fc7143d1f8 100644
--- 
a/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/services/ContainerLocalKafkaInfraService.java
+++ 
b/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/services/ContainerLocalKafkaInfraService.java
@@ -33,7 +33,7 @@ import org.testcontainers.utility.DockerImageName;
 public class ContainerLocalKafkaInfraService implements KafkaInfraService, 
ContainerService<KafkaContainer> {
     public static final String KAFKA_IMAGE_NAME = 
LocalPropertyResolver.getProperty(
             ContainerLocalKafkaInfraService.class,
-            KafkaProperties.KAFKA_CONTAINER_IMAGE);
+            KafkaProperties.KAFKA_CONTAINER);
 
     private static final Logger LOG = 
LoggerFactory.getLogger(ContainerLocalKafkaInfraService.class);
     protected KafkaContainer kafka;


Reply via email to