This is an automated email from the ASF dual-hosted git repository.

squah-confluent pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f7dbf0bf3bb KAFKA-20570: Catch RuntimeException in ConsumerProtocol 
deserialization in group coordinator (#22264)
f7dbf0bf3bb is described below

commit f7dbf0bf3bbbf7d0e035f72b3370d347361e53ae
Author: Dongnuo Lyu <[email protected]>
AuthorDate: Thu May 21 11:40:32 2026 -0400

    KAFKA-20570: Catch RuntimeException in ConsumerProtocol deserialization in 
group coordinator (#22264)
    
    The `deserialize*` methods in `ConsumerProtocol` only catch
    `BufferUnderflowException` and re-wrap it as `SchemaException`.
    Malformed bytes can also surface as other `RuntimeException`s (e.g.
    `IllegalArgumentException` from negative array lengths in
    `ByteBufferAccessor`), which escape these methods. Callers that only
    guard against `SchemaException` then propagate the failure, which can
    destabilize the group coordinator.
    
      ### Changes
    
      In `ConsumerProtocol`, broaden the `catch` clause from
    `BufferUnderflowException` to `RuntimeException` in all four
    deserialization entry points, re-wrapping as `SchemaException`:
    
      - `deserializeSubscription`
      - `deserializeConsumerProtocolSubscription`
      - `deserializeAssignment`
      - `deserializeConsumerProtocolAssignment`
    
    Reviewers: Sean Quah <[email protected]>, David Jacot
     <[email protected]>
---
 .../consumer/internals/ConsumerProtocol.java       |  16 +--
 .../consumer/internals/ConsumerProtocolTest.java   |  85 +++++++++++++
 .../group/GroupMetadataManagerTest.java            | 140 +++++++++++++++++++++
 .../group/classic/ClassicGroupTest.java            |  31 +++++
 4 files changed, 264 insertions(+), 8 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
index aa9d0cc40e4..b334824e8e3 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
@@ -116,8 +116,8 @@ public class ConsumerProtocol {
                 ownedPartitions,
                 data.generationId(),
                 data.rackId() == null || data.rackId().isEmpty() ? 
Optional.empty() : Optional.of(data.rackId()));
-        } catch (BufferUnderflowException e) {
-            throw new SchemaException("Buffer underflow while parsing consumer 
protocol's subscription", e);
+        } catch (RuntimeException e) {
+            throw new SchemaException("Malformed consumer protocol 
subscription", e);
         }
     }
 
@@ -133,8 +133,8 @@ public class ConsumerProtocol {
 
         try {
             return new ConsumerProtocolSubscription(new 
ByteBufferAccessor(buffer), version);
-        } catch (BufferUnderflowException e) {
-            throw new SchemaException("Buffer underflow while parsing consumer 
protocol's subscription", e);
+        } catch (RuntimeException e) {
+            throw new SchemaException("Malformed consumer protocol 
subscription", e);
         }
     }
 
@@ -186,8 +186,8 @@ public class ConsumerProtocol {
             return new Assignment(
                 assignedPartitions,
                 data.userData() != null ? data.userData().duplicate() : null);
-        } catch (BufferUnderflowException e) {
-            throw new SchemaException("Buffer underflow while parsing consumer 
protocol's assignment", e);
+        } catch (RuntimeException e) {
+            throw new SchemaException("Malformed consumer protocol 
assignment", e);
         }
     }
 
@@ -203,8 +203,8 @@ public class ConsumerProtocol {
 
         try {
             return new ConsumerProtocolAssignment(new 
ByteBufferAccessor(buffer), version);
-        } catch (BufferUnderflowException e) {
-            throw new SchemaException("Buffer underflow while parsing consumer 
protocol's assignment", e);
+        } catch (RuntimeException e) {
+            throw new SchemaException("Malformed consumer protocol 
assignment", e);
         }
     }
 
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
index 3f32a22b364..9d1ff526626 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
@@ -24,6 +24,7 @@ import 
org.apache.kafka.common.message.ConsumerProtocolSubscription;
 import org.apache.kafka.common.protocol.types.ArrayOf;
 import org.apache.kafka.common.protocol.types.Field;
 import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.SchemaException;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.protocol.types.Type;
 
@@ -39,9 +40,11 @@ import java.util.Optional;
 import java.util.Set;
 
 import static 
org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.DEFAULT_GENERATION;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class ConsumerProtocolTest {
@@ -356,4 +359,86 @@ public class ConsumerProtocolTest {
 
         return buffer;
     }
+
+    @Test
+    public void 
deserializeSubscriptionThrowsSchemaExceptionForEveryTruncation() {
+        Subscription subscription = new Subscription(
+            Arrays.asList("foo", "bar"),
+            ByteBuffer.wrap(new byte[]{0x01, 0x02}),
+            Arrays.asList(new TopicPartition("foo", 0), new 
TopicPartition("bar", 1)),
+            DEFAULT_GENERATION,
+            Optional.of("rack"));
+        ByteBuffer serialized = 
ConsumerProtocol.serializeSubscription(subscription,
+            ConsumerProtocolSubscription.HIGHEST_SUPPORTED_VERSION);
+        byte[] serializedBytes = new byte[serialized.remaining()];
+        serialized.duplicate().get(serializedBytes);
+
+        for (int len = 0; len < serializedBytes.length; len++) {
+            byte[] truncated = Arrays.copyOf(serializedBytes, len);
+            assertThrows(SchemaException.class,
+                () -> 
ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(truncated)),
+                "Expected SchemaException for subscription truncated to length 
" + len);
+        }
+        assertDoesNotThrow(() -> 
ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(serializedBytes)));
+    }
+
+    @Test
+    public void deserializeAssignmentThrowsSchemaExceptionForEveryTruncation() 
{
+        Assignment assignment = new Assignment(
+            Arrays.asList(new TopicPartition("foo", 0), new 
TopicPartition("bar", 1)),
+            ByteBuffer.wrap(new byte[]{0x01, 0x02}));
+        ByteBuffer serialized = 
ConsumerProtocol.serializeAssignment(assignment,
+            ConsumerProtocolAssignment.HIGHEST_SUPPORTED_VERSION);
+        byte[] serializedBytes = new byte[serialized.remaining()];
+        serialized.duplicate().get(serializedBytes);
+
+        for (int len = 0; len < serializedBytes.length; len++) {
+            byte[] truncated = Arrays.copyOf(serializedBytes, len);
+            assertThrows(SchemaException.class,
+                () -> 
ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(truncated)),
+                "Expected SchemaException for assignment truncated to length " 
+ len);
+        }
+        assertDoesNotThrow(() -> 
ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(serializedBytes)));
+    }
+
+    @Test
+    public void 
deserializeConsumerProtocolSubscriptionThrowsSchemaExceptionForEveryTruncation()
 {
+        Subscription subscription = new Subscription(
+            Arrays.asList("foo", "bar"),
+            ByteBuffer.wrap(new byte[]{0x01, 0x02}),
+            Arrays.asList(new TopicPartition("foo", 0), new 
TopicPartition("bar", 1)),
+            DEFAULT_GENERATION,
+            Optional.of("rack"));
+        ByteBuffer serialized = 
ConsumerProtocol.serializeSubscription(subscription,
+            ConsumerProtocolSubscription.HIGHEST_SUPPORTED_VERSION);
+        byte[] serializedBytes = new byte[serialized.remaining()];
+        serialized.duplicate().get(serializedBytes);
+
+        for (int len = 0; len < serializedBytes.length; len++) {
+            byte[] truncated = Arrays.copyOf(serializedBytes, len);
+            assertThrows(SchemaException.class,
+                () -> 
ConsumerProtocol.deserializeConsumerProtocolSubscription(ByteBuffer.wrap(truncated)),
+                "Expected SchemaException for ConsumerProtocolSubscription 
truncated to length " + len);
+        }
+        assertDoesNotThrow(() -> 
ConsumerProtocol.deserializeConsumerProtocolSubscription(ByteBuffer.wrap(serializedBytes)));
+    }
+
+    @Test
+    public void 
deserializeConsumerProtocolAssignmentThrowsSchemaExceptionForEveryTruncation() {
+        Assignment assignment = new Assignment(
+            Arrays.asList(new TopicPartition("foo", 0), new 
TopicPartition("bar", 1)),
+            ByteBuffer.wrap(new byte[]{0x01, 0x02}));
+        ByteBuffer serialized = 
ConsumerProtocol.serializeAssignment(assignment,
+            ConsumerProtocolAssignment.HIGHEST_SUPPORTED_VERSION);
+        byte[] serializedBytes = new byte[serialized.remaining()];
+        serialized.duplicate().get(serializedBytes);
+
+        for (int len = 0; len < serializedBytes.length; len++) {
+            byte[] truncated = Arrays.copyOf(serializedBytes, len);
+            assertThrows(SchemaException.class,
+                () -> 
ConsumerProtocol.deserializeConsumerProtocolAssignment(ByteBuffer.wrap(truncated)),
+                "Expected SchemaException for ConsumerProtocolAssignment 
truncated to length " + len);
+        }
+        assertDoesNotThrow(() -> 
ConsumerProtocol.deserializeConsumerProtocolAssignment(ByteBuffer.wrap(serializedBytes)));
+    }
 }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index d208e09760b..30ef688d8a5 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -12172,6 +12172,146 @@ public class GroupMetadataManagerTest {
         }
     }
 
+    @Test
+    public void 
testConsumerGroupHeartbeatWithStableClassicGroupFailsOnMalformedProtocol() {
+        String groupId = "group-id";
+        String memberId1 = "member-id-1";
+        String memberId2 = "member-id-2";
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+
+        CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 1)
+            .addRacks()
+            .buildCoordinatorMetadataImage();
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, 
ConsumerGroupMigrationPolicy.UPGRADE.toString())
+            
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, 
List.of(assignor))
+            .withMetadataImage(metadataImage)
+            .build();
+
+        // Throws RuntimeException when read
+        byte[] poisonMetadata = new byte[]{
+            0, 1,                                              // version 
(int16) = 1
+            (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF // topics array 
length (int32) = -1
+        };
+
+        JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols = 
new JoinGroupRequestData.JoinGroupRequestProtocolCollection(1);
+        protocols.add(new JoinGroupRequestData.JoinGroupRequestProtocol()
+            .setName("range")
+            .setMetadata(poisonMetadata));
+
+        Map<String, byte[]> assignments = Map.of(
+            memberId1,
+            Utils.toArray(ConsumerProtocol.serializeAssignment(
+                new ConsumerPartitionAssignor.Assignment(List.of(new 
TopicPartition(fooTopicName, 0)))))
+        );
+
+        ClassicGroup group = context.createClassicGroup(groupId);
+        group.setProtocolName(Optional.of("range"));
+        group.add(
+            new ClassicGroupMember(
+                memberId1,
+                Optional.empty(),
+                "client-id",
+                "client-host",
+                10000,
+                5000,
+                "consumer",
+                protocols,
+                assignments.get(memberId1)
+            )
+        );
+
+        group.transitionTo(PREPARING_REBALANCE);
+        group.transitionTo(COMPLETING_REBALANCE);
+        group.transitionTo(STABLE);
+
+        
context.replay(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(group, 
assignments));
+        context.commit();
+
+        // A new member 2 with the new protocol joins the classic group, 
triggering the upgrade.
+        ConsumerGroupHeartbeatRequestData request = new 
ConsumerGroupHeartbeatRequestData()
+            .setGroupId(groupId)
+            .setMemberId(memberId2)
+            .setRebalanceTimeoutMs(5000)
+            .setServerAssignor("range")
+            .setSubscribedTopicNames(List.of(fooTopicName))
+            .setTopicPartitions(List.of());
+
+        Exception ex = assertThrows(GroupIdNotFoundException.class,
+            () -> context.consumerGroupHeartbeat(request));
+        assertEquals(
+            "Cannot upgrade classic group group-id to consumer group because 
the embedded consumer protocol is malformed.",
+            ex.getMessage()
+        );
+    }
+
+    @Test
+    public void 
testClassicGroupJoinToConsumerGroupFailsOnMalformedSubscriptionMetadata() {
+        String groupId = "group-id";
+        String existingMemberId = Uuid.randomUuid().toString();
+        String newMemberId = Uuid.randomUuid().toString();
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 1)
+            .addRacks()
+            .buildCoordinatorMetadataImage();
+
+        ConsumerGroupMember existingMember = new 
ConsumerGroupMember.Builder(existingMemberId)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setSubscribedTopicNames(List.of(fooTopicName))
+            .setServerAssignorName("range")
+            .setRebalanceTimeoutMs(45000)
+            .setAssignedPartitions(toAssignmentWithEpochs(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0)), 10))
+            .build();
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, 
ConsumerGroupMigrationPolicy.UPGRADE.toString())
+            
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(new 
MockPartitionAssignor("range")))
+            .withMetadataImage(metadataImage)
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(existingMember)
+                .withAssignment(existingMemberId, 
mkAssignment(mkTopicAssignment(fooTopicId, 0)))
+                .withAssignmentEpoch(10)
+                .withMetadataHash(computeGroupHash(Map.of(
+                    fooTopicName, computeTopicHash(fooTopicName, metadataImage)
+                ))))
+            .build();
+
+        // Throws RuntimeException when read.
+        byte[] poisonMetadata = new byte[]{
+            0, 1,                                              // version 
(int16) = 1
+            (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF // topics array 
length (int32) = -1
+        };
+        JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols = 
new JoinGroupRequestData.JoinGroupRequestProtocolCollection(1);
+        protocols.add(new JoinGroupRequestData.JoinGroupRequestProtocol()
+            .setName("range")
+            .setMetadata(poisonMetadata));
+
+        JoinGroupRequestData joinRequest = new JoinGroupRequestData()
+            .setGroupId(groupId)
+            .setMemberId(newMemberId)
+            .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+            .setProtocols(protocols)
+            .setSessionTimeoutMs(5000)
+            .setRebalanceTimeoutMs(45000);
+
+        IllegalStateException ex = assertThrows(IllegalStateException.class,
+            () -> context.sendClassicGroupJoin(joinRequest));
+        assertEquals("Malformed embedded consumer protocol in subscription 
deserialization.", ex.getMessage());
+    }
+
     @Test
     public void 
testConsumerGroupHeartbeatToClassicGroupFromExistingStaticMember() {
         String groupId = "group-id";
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java
index 2fad4d9b3a9..2696bdd2c51 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java
@@ -1264,6 +1264,37 @@ public class ClassicGroupTest {
         assertTrue(group.isSubscribedToTopic("topic"));
     }
 
+    @Test
+    public void testComputeSubscribedTopicsHandlesMalformedMemberMetadata() {
+        ClassicGroup group = new ClassicGroup(logContext, "groupId", EMPTY, 
Time.SYSTEM);
+
+        JoinGroupRequestProtocolCollection protocols = new 
JoinGroupRequestProtocolCollection();
+        protocols.add(new JoinGroupRequestProtocol()
+            .setName("range")
+            .setMetadata(new byte[]{
+                0, 1,                                              // version 
(int16) = 1
+                (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF // topics 
array length (int32) = -1
+            }));
+
+        ClassicGroupMember poisonMember = new ClassicGroupMember(
+            "poisonMember",
+            Optional.empty(),
+            clientId,
+            clientHost,
+            rebalanceTimeoutMs,
+            sessionTimeoutMs,
+            "consumer",
+            protocols
+        );
+
+        group.add(poisonMember);
+        group.transitionTo(PREPARING_REBALANCE);
+        group.initNextGeneration();
+
+        // RuntimeException should not propagate; falls through to 
Optional.empty().
+        assertEquals(Optional.empty(), group.computeSubscribedTopics());
+    }
+
     @Test
     public void testIsInStates() {
         ClassicGroup group = new ClassicGroup(new LogContext(), "groupId", 
EMPTY, Time.SYSTEM);

Reply via email to