This is an automated email from the ASF dual-hosted git repository.
squah-confluent pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f7dbf0bf3bb KAFKA-20570: Catch RuntimeException in ConsumerProtocol
deserialization in group coordinator (#22264)
f7dbf0bf3bb is described below
commit f7dbf0bf3bbbf7d0e035f72b3370d347361e53ae
Author: Dongnuo Lyu <[email protected]>
AuthorDate: Thu May 21 11:40:32 2026 -0400
KAFKA-20570: Catch RuntimeException in ConsumerProtocol deserialization in
group coordinator (#22264)
The `deserialize*` methods in `ConsumerProtocol` only catch
`BufferUnderflowException` and re-wrap it as `SchemaException`.
Malformed bytes can also surface as other `RuntimeException`s (e.g.
`IllegalArgumentException` from negative array lengths in
`ByteBufferAccessor`), which escape these methods. Callers that only
guard against `SchemaException` then propagate the failure, which can
destabilize the group coordinator.
### Changes
In `ConsumerProtocol`, broaden the `catch` clause from
`BufferUnderflowException` to `RuntimeException` in all four
deserialization entry points, re-wrapping as `SchemaException`:
- `deserializeSubscription`
- `deserializeConsumerProtocolSubscription`
- `deserializeAssignment`
- `deserializeConsumerProtocolAssignment`
Reviewers: Sean Quah <[email protected]>, David Jacot
<[email protected]>
---
.../consumer/internals/ConsumerProtocol.java | 16 +--
.../consumer/internals/ConsumerProtocolTest.java | 85 +++++++++++++
.../group/GroupMetadataManagerTest.java | 140 +++++++++++++++++++++
.../group/classic/ClassicGroupTest.java | 31 +++++
4 files changed, 264 insertions(+), 8 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
index aa9d0cc40e4..b334824e8e3 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
@@ -116,8 +116,8 @@ public class ConsumerProtocol {
ownedPartitions,
data.generationId(),
data.rackId() == null || data.rackId().isEmpty() ?
Optional.empty() : Optional.of(data.rackId()));
- } catch (BufferUnderflowException e) {
- throw new SchemaException("Buffer underflow while parsing consumer
protocol's subscription", e);
+ } catch (RuntimeException e) {
+ throw new SchemaException("Malformed consumer protocol
subscription", e);
}
}
@@ -133,8 +133,8 @@ public class ConsumerProtocol {
try {
return new ConsumerProtocolSubscription(new
ByteBufferAccessor(buffer), version);
- } catch (BufferUnderflowException e) {
- throw new SchemaException("Buffer underflow while parsing consumer
protocol's subscription", e);
+ } catch (RuntimeException e) {
+ throw new SchemaException("Malformed consumer protocol
subscription", e);
}
}
@@ -186,8 +186,8 @@ public class ConsumerProtocol {
return new Assignment(
assignedPartitions,
data.userData() != null ? data.userData().duplicate() : null);
- } catch (BufferUnderflowException e) {
- throw new SchemaException("Buffer underflow while parsing consumer
protocol's assignment", e);
+ } catch (RuntimeException e) {
+ throw new SchemaException("Malformed consumer protocol
assignment", e);
}
}
@@ -203,8 +203,8 @@ public class ConsumerProtocol {
try {
return new ConsumerProtocolAssignment(new
ByteBufferAccessor(buffer), version);
- } catch (BufferUnderflowException e) {
- throw new SchemaException("Buffer underflow while parsing consumer
protocol's assignment", e);
+ } catch (RuntimeException e) {
+ throw new SchemaException("Malformed consumer protocol
assignment", e);
}
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
index 3f32a22b364..9d1ff526626 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
@@ -24,6 +24,7 @@ import
org.apache.kafka.common.message.ConsumerProtocolSubscription;
import org.apache.kafka.common.protocol.types.ArrayOf;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.protocol.types.Type;
@@ -39,9 +40,11 @@ import java.util.Optional;
import java.util.Set;
import static
org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.DEFAULT_GENERATION;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class ConsumerProtocolTest {
@@ -356,4 +359,86 @@ public class ConsumerProtocolTest {
return buffer;
}
+
+ @Test
+ public void
deserializeSubscriptionThrowsSchemaExceptionForEveryTruncation() {
+ Subscription subscription = new Subscription(
+ Arrays.asList("foo", "bar"),
+ ByteBuffer.wrap(new byte[]{0x01, 0x02}),
+ Arrays.asList(new TopicPartition("foo", 0), new
TopicPartition("bar", 1)),
+ DEFAULT_GENERATION,
+ Optional.of("rack"));
+ ByteBuffer serialized =
ConsumerProtocol.serializeSubscription(subscription,
+ ConsumerProtocolSubscription.HIGHEST_SUPPORTED_VERSION);
+ byte[] serializedBytes = new byte[serialized.remaining()];
+ serialized.duplicate().get(serializedBytes);
+
+ for (int len = 0; len < serializedBytes.length; len++) {
+ byte[] truncated = Arrays.copyOf(serializedBytes, len);
+ assertThrows(SchemaException.class,
+ () ->
ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(truncated)),
+ "Expected SchemaException for subscription truncated to length
" + len);
+ }
+ assertDoesNotThrow(() ->
ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(serializedBytes)));
+ }
+
+ @Test
+ public void deserializeAssignmentThrowsSchemaExceptionForEveryTruncation()
{
+ Assignment assignment = new Assignment(
+ Arrays.asList(new TopicPartition("foo", 0), new
TopicPartition("bar", 1)),
+ ByteBuffer.wrap(new byte[]{0x01, 0x02}));
+ ByteBuffer serialized =
ConsumerProtocol.serializeAssignment(assignment,
+ ConsumerProtocolAssignment.HIGHEST_SUPPORTED_VERSION);
+ byte[] serializedBytes = new byte[serialized.remaining()];
+ serialized.duplicate().get(serializedBytes);
+
+ for (int len = 0; len < serializedBytes.length; len++) {
+ byte[] truncated = Arrays.copyOf(serializedBytes, len);
+ assertThrows(SchemaException.class,
+ () ->
ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(truncated)),
+ "Expected SchemaException for assignment truncated to length "
+ len);
+ }
+ assertDoesNotThrow(() ->
ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(serializedBytes)));
+ }
+
+ @Test
+ public void
deserializeConsumerProtocolSubscriptionThrowsSchemaExceptionForEveryTruncation()
{
+ Subscription subscription = new Subscription(
+ Arrays.asList("foo", "bar"),
+ ByteBuffer.wrap(new byte[]{0x01, 0x02}),
+ Arrays.asList(new TopicPartition("foo", 0), new
TopicPartition("bar", 1)),
+ DEFAULT_GENERATION,
+ Optional.of("rack"));
+ ByteBuffer serialized =
ConsumerProtocol.serializeSubscription(subscription,
+ ConsumerProtocolSubscription.HIGHEST_SUPPORTED_VERSION);
+ byte[] serializedBytes = new byte[serialized.remaining()];
+ serialized.duplicate().get(serializedBytes);
+
+ for (int len = 0; len < serializedBytes.length; len++) {
+ byte[] truncated = Arrays.copyOf(serializedBytes, len);
+ assertThrows(SchemaException.class,
+ () ->
ConsumerProtocol.deserializeConsumerProtocolSubscription(ByteBuffer.wrap(truncated)),
+ "Expected SchemaException for ConsumerProtocolSubscription
truncated to length " + len);
+ }
+ assertDoesNotThrow(() ->
ConsumerProtocol.deserializeConsumerProtocolSubscription(ByteBuffer.wrap(serializedBytes)));
+ }
+
+ @Test
+ public void
deserializeConsumerProtocolAssignmentThrowsSchemaExceptionForEveryTruncation() {
+ Assignment assignment = new Assignment(
+ Arrays.asList(new TopicPartition("foo", 0), new
TopicPartition("bar", 1)),
+ ByteBuffer.wrap(new byte[]{0x01, 0x02}));
+ ByteBuffer serialized =
ConsumerProtocol.serializeAssignment(assignment,
+ ConsumerProtocolAssignment.HIGHEST_SUPPORTED_VERSION);
+ byte[] serializedBytes = new byte[serialized.remaining()];
+ serialized.duplicate().get(serializedBytes);
+
+ for (int len = 0; len < serializedBytes.length; len++) {
+ byte[] truncated = Arrays.copyOf(serializedBytes, len);
+ assertThrows(SchemaException.class,
+ () ->
ConsumerProtocol.deserializeConsumerProtocolAssignment(ByteBuffer.wrap(truncated)),
+ "Expected SchemaException for ConsumerProtocolAssignment
truncated to length " + len);
+ }
+ assertDoesNotThrow(() ->
ConsumerProtocol.deserializeConsumerProtocolAssignment(ByteBuffer.wrap(serializedBytes)));
+ }
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index d208e09760b..30ef688d8a5 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -12172,6 +12172,146 @@ public class GroupMetadataManagerTest {
}
}
+ @Test
+ public void
testConsumerGroupHeartbeatWithStableClassicGroupFailsOnMalformedProtocol() {
+ String groupId = "group-id";
+ String memberId1 = "member-id-1";
+ String memberId2 = "member-id-2";
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 1)
+ .addRacks()
+ .buildCoordinatorMetadataImage();
+
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG,
ConsumerGroupMigrationPolicy.UPGRADE.toString())
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG,
List.of(assignor))
+ .withMetadataImage(metadataImage)
+ .build();
+
+ // Throws RuntimeException when read
+ byte[] poisonMetadata = new byte[]{
+ 0, 1, // version
(int16) = 1
+ (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF // topics array
length (int32) = -1
+ };
+
+ JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols =
new JoinGroupRequestData.JoinGroupRequestProtocolCollection(1);
+ protocols.add(new JoinGroupRequestData.JoinGroupRequestProtocol()
+ .setName("range")
+ .setMetadata(poisonMetadata));
+
+ Map<String, byte[]> assignments = Map.of(
+ memberId1,
+ Utils.toArray(ConsumerProtocol.serializeAssignment(
+ new ConsumerPartitionAssignor.Assignment(List.of(new
TopicPartition(fooTopicName, 0)))))
+ );
+
+ ClassicGroup group = context.createClassicGroup(groupId);
+ group.setProtocolName(Optional.of("range"));
+ group.add(
+ new ClassicGroupMember(
+ memberId1,
+ Optional.empty(),
+ "client-id",
+ "client-host",
+ 10000,
+ 5000,
+ "consumer",
+ protocols,
+ assignments.get(memberId1)
+ )
+ );
+
+ group.transitionTo(PREPARING_REBALANCE);
+ group.transitionTo(COMPLETING_REBALANCE);
+ group.transitionTo(STABLE);
+
+
context.replay(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(group,
assignments));
+ context.commit();
+
+ // A new member 2 with the new protocol joins the classic group,
triggering the upgrade.
+ ConsumerGroupHeartbeatRequestData request = new
ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId2)
+ .setRebalanceTimeoutMs(5000)
+ .setServerAssignor("range")
+ .setSubscribedTopicNames(List.of(fooTopicName))
+ .setTopicPartitions(List.of());
+
+ Exception ex = assertThrows(GroupIdNotFoundException.class,
+ () -> context.consumerGroupHeartbeat(request));
+ assertEquals(
+ "Cannot upgrade classic group group-id to consumer group because
the embedded consumer protocol is malformed.",
+ ex.getMessage()
+ );
+ }
+
+ @Test
+ public void
testClassicGroupJoinToConsumerGroupFailsOnMalformedSubscriptionMetadata() {
+ String groupId = "group-id";
+ String existingMemberId = Uuid.randomUuid().toString();
+ String newMemberId = Uuid.randomUuid().toString();
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 1)
+ .addRacks()
+ .buildCoordinatorMetadataImage();
+
+ ConsumerGroupMember existingMember = new
ConsumerGroupMember.Builder(existingMemberId)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(9)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setSubscribedTopicNames(List.of(fooTopicName))
+ .setServerAssignorName("range")
+ .setRebalanceTimeoutMs(45000)
+ .setAssignedPartitions(toAssignmentWithEpochs(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0)), 10))
+ .build();
+
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG,
ConsumerGroupMigrationPolicy.UPGRADE.toString())
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(new
MockPartitionAssignor("range")))
+ .withMetadataImage(metadataImage)
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withMember(existingMember)
+ .withAssignment(existingMemberId,
mkAssignment(mkTopicAssignment(fooTopicId, 0)))
+ .withAssignmentEpoch(10)
+ .withMetadataHash(computeGroupHash(Map.of(
+ fooTopicName, computeTopicHash(fooTopicName, metadataImage)
+ ))))
+ .build();
+
+ // Throws RuntimeException when read.
+ byte[] poisonMetadata = new byte[]{
+ 0, 1, // version
(int16) = 1
+ (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF // topics array
length (int32) = -1
+ };
+ JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols =
new JoinGroupRequestData.JoinGroupRequestProtocolCollection(1);
+ protocols.add(new JoinGroupRequestData.JoinGroupRequestProtocol()
+ .setName("range")
+ .setMetadata(poisonMetadata));
+
+ JoinGroupRequestData joinRequest = new JoinGroupRequestData()
+ .setGroupId(groupId)
+ .setMemberId(newMemberId)
+ .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+ .setProtocols(protocols)
+ .setSessionTimeoutMs(5000)
+ .setRebalanceTimeoutMs(45000);
+
+ IllegalStateException ex = assertThrows(IllegalStateException.class,
+ () -> context.sendClassicGroupJoin(joinRequest));
+ assertEquals("Malformed embedded consumer protocol in subscription
deserialization.", ex.getMessage());
+ }
+
@Test
public void
testConsumerGroupHeartbeatToClassicGroupFromExistingStaticMember() {
String groupId = "group-id";
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java
index 2fad4d9b3a9..2696bdd2c51 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java
@@ -1264,6 +1264,37 @@ public class ClassicGroupTest {
assertTrue(group.isSubscribedToTopic("topic"));
}
+ @Test
+ public void testComputeSubscribedTopicsHandlesMalformedMemberMetadata() {
+ ClassicGroup group = new ClassicGroup(logContext, "groupId", EMPTY,
Time.SYSTEM);
+
+ JoinGroupRequestProtocolCollection protocols = new
JoinGroupRequestProtocolCollection();
+ protocols.add(new JoinGroupRequestProtocol()
+ .setName("range")
+ .setMetadata(new byte[]{
+ 0, 1, // version
(int16) = 1
+ (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF // topics
array length (int32) = -1
+ }));
+
+ ClassicGroupMember poisonMember = new ClassicGroupMember(
+ "poisonMember",
+ Optional.empty(),
+ clientId,
+ clientHost,
+ rebalanceTimeoutMs,
+ sessionTimeoutMs,
+ "consumer",
+ protocols
+ );
+
+ group.add(poisonMember);
+ group.transitionTo(PREPARING_REBALANCE);
+ group.initNextGeneration();
+
+ // RuntimeException should not propagate; falls through to
Optional.empty().
+ assertEquals(Optional.empty(), group.computeSubscribedTopics());
+ }
+
@Test
public void testIsInStates() {
ClassicGroup group = new ClassicGroup(new LogContext(), "groupId",
EMPTY, Time.SYSTEM);