This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.2 by this push:
new 9596c45a068 KAFKA-20183: Share consumer group fails when group ID
contains colon character (#21471)
9596c45a068 is described below
commit 9596c45a0682a42f1c81845306b701e6813df585
Author: Federico Valeri <[email protected]>
AuthorDate: Mon Feb 16 11:11:23 2026 +0100
KAFKA-20183: Share consumer group fails when group ID contains colon
character (#21471)
Reviewers: Mickael Maison <[email protected]>, Andrew Schofield
<[email protected]>, Sushant Mahajan <[email protected]>
---
.../kafka/server/share/SharePartitionKey.java | 57 +++++----
.../kafka/server/share/SharePartitionKeyTest.java | 129 +++++++++++++++++++++
2 files changed, 162 insertions(+), 24 deletions(-)
diff --git a/server-common/src/main/java/org/apache/kafka/server/share/SharePartitionKey.java b/server-common/src/main/java/org/apache/kafka/server/share/SharePartitionKey.java
index 89f555b564e..8565064a597 100644
--- a/server-common/src/main/java/org/apache/kafka/server/share/SharePartitionKey.java
+++ b/server-common/src/main/java/org/apache/kafka/server/share/SharePartitionKey.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
+import java.util.Arrays;
import java.util.Objects;
/**
@@ -67,52 +68,60 @@ public class SharePartitionKey {
/**
- * Returns a SharePartitionKey from input string of format - groupId:topicId:partition
+ * Returns a SharePartitionKey from input string of format - groupId:topicId:partition.
+ * The key is parsed from the right: partition is the last segment, topicId is
+ * the second-to-last, and groupId is everything before (which may contain colons).
+ *
* @param key - String in format groupId:topicId:partition
* @return object representing SharePartitionKey
* @throws IllegalArgumentException if the key is empty or has invalid format
*/
public static SharePartitionKey getInstance(String key) {
- validate(key);
- String[] tokens = key.split(":");
- return new SharePartitionKey(
- tokens[0].trim(),
- Uuid.fromString(tokens[1]),
- Integer.parseInt(tokens[2])
- );
- }
-
- /**
- * Validates whether the String argument has a valid SharePartitionKey format - groupId:topicId:partition
- * @param key - String in format groupId:topicId:partition
- * @throws IllegalArgumentException if the key is empty or has invalid format
- */
- public static void validate(String key) {
Objects.requireNonNull(key, "Share partition key cannot be null");
if (key.isEmpty()) {
throw new IllegalArgumentException("Share partition key cannot be empty");
}
String[] tokens = key.split(":");
- if (tokens.length != 3) {
- throw new IllegalArgumentException("Invalid key format: expected - groupId:topicId:partition, found - " + key);
+ if (tokens.length < 3) {
+ throw new IllegalArgumentException("Invalid key format: expected - groupId:topicId:partition, found - " + key);
}
- if (tokens[0].trim().isEmpty()) {
- throw new IllegalArgumentException("GroupId must be alphanumeric string");
+ int last = tokens.length - 1;
+ String partitionStr = tokens[last];
+ String topicIdStr = tokens[last - 1];
+ String groupId = String.join(":", Arrays.copyOfRange(tokens, 0, last - 1));
+
+ if (groupId.trim().isEmpty()) {
+ throw new IllegalArgumentException("GroupId must not be empty");
}
+ Uuid topicId;
try {
- Uuid.fromString(tokens[1]);
+ topicId = Uuid.fromString(topicIdStr);
} catch (Exception e) {
- throw new IllegalArgumentException("Invalid topic ID: " + tokens[1], e);
+ throw new IllegalArgumentException("Invalid topic ID: " + topicIdStr, e);
}
+ int partition;
try {
- Integer.parseInt(tokens[2]);
+ partition = Integer.parseInt(partitionStr);
} catch (Exception e) {
- throw new IllegalArgumentException("Invalid partition: " + tokens[2], e);
+ throw new IllegalArgumentException("Invalid partition: " + partitionStr, e);
}
+
+ return new SharePartitionKey(groupId, topicId, partition);
+ }
+
+ /**
+ * Validates whether the String argument has a valid SharePartitionKey format - groupId:topicId:partition.
+ * The key is parsed from the right since groupId may contain colons.
+ *
+ * @param key - String in format groupId:topicId:partition
+ * @throws IllegalArgumentException if the key is empty or has invalid format
+ */
+ public static void validate(String key) {
+ getInstance(key);
}
public static SharePartitionKey getInstance(String groupId, Uuid topicId, int partition) {
diff --git a/server-common/src/test/java/org/apache/kafka/server/share/SharePartitionKeyTest.java b/server-common/src/test/java/org/apache/kafka/server/share/SharePartitionKeyTest.java
new file mode 100644
index 00000000000..b8c13d624bc
--- /dev/null
+++ b/server-common/src/test/java/org/apache/kafka/server/share/SharePartitionKeyTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.share;
+
+import org.apache.kafka.common.Uuid;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class SharePartitionKeyTest {
+ @Test
+ public void testGetInstanceFromSimpleKey() {
+ Uuid topicId = Uuid.randomUuid();
+ String key = "my-group:" + topicId + ":3";
+ SharePartitionKey spk = SharePartitionKey.getInstance(key);
+ assertEquals("my-group", spk.groupId());
+ assertEquals(topicId, spk.topicId());
+ assertEquals(3, spk.partition());
+ }
+
+ @Test
+ public void testGetInstanceFromKeyWithColonInGroupId() {
+ Uuid topicId = Uuid.randomUuid();
+ String key = "abc:de:" + topicId + ":0";
+ SharePartitionKey spk = SharePartitionKey.getInstance(key);
+ assertEquals("abc:de", spk.groupId());
+ assertEquals(topicId, spk.topicId());
+ assertEquals(0, spk.partition());
+ }
+
+ @Test
+ public void testGetInstanceFromKeyWithMultipleColonsInGroupId() {
+ Uuid topicId = Uuid.randomUuid();
+ String key = "a:b:c:d:" + topicId + ":5";
+ SharePartitionKey spk = SharePartitionKey.getInstance(key);
+ assertEquals("a:b:c:d", spk.groupId());
+ assertEquals(topicId, spk.topicId());
+ assertEquals(5, spk.partition());
+ }
+
+ @Test
+ public void testGetInstanceFromKeyWithGroupIdStartingWithColon() {
+ Uuid topicId = Uuid.randomUuid();
+ String key = ":mygroup:" + topicId + ":2";
+ SharePartitionKey spk = SharePartitionKey.getInstance(key);
+ assertEquals(":mygroup", spk.groupId());
+ assertEquals(topicId, spk.topicId());
+ assertEquals(2, spk.partition());
+ }
+
+ @Test
+ public void testGetInstanceFromKeyWithGroupIdEndingWithColon() {
+ Uuid topicId = Uuid.randomUuid();
+ String key = "mygroup::" + topicId + ":4";
+ SharePartitionKey spk = SharePartitionKey.getInstance(key);
+ assertEquals("mygroup:", spk.groupId());
+ assertEquals(topicId, spk.topicId());
+ assertEquals(4, spk.partition());
+ }
+
+ @Test
+ public void testValidateSimpleKey() {
+ Uuid topicId = Uuid.randomUuid();
+ SharePartitionKey.validate("group:" + topicId + ":0");
+ }
+
+ @Test
+ public void testValidateKeyWithColonInGroupId() {
+ Uuid topicId = Uuid.randomUuid();
+ SharePartitionKey.validate("abc:dd:" + topicId + ":0");
+ }
+
+ @Test
+ public void testValidateRejectsNull() {
+ assertThrows(NullPointerException.class, () -> SharePartitionKey.validate(null));
+ }
+
+ @Test
+ public void testValidateRejectsEmpty() {
+ assertThrows(IllegalArgumentException.class, () -> SharePartitionKey.validate(""));
+ }
+
+ @Test
+ public void testValidateRejectsTooFewSegments() {
+ assertThrows(IllegalArgumentException.class, () -> SharePartitionKey.validate("onlytwo:parts"));
+ }
+
+ @Test
+ public void testValidateRejectsEmptyGroupId() {
+ assertThrows(IllegalArgumentException.class, () -> SharePartitionKey.validate(":" + Uuid.randomUuid() + ":0"));
+ }
+
+ @Test
+ public void testValidateRejectsInvalidTopicId() {
+ assertThrows(IllegalArgumentException.class, () -> SharePartitionKey.validate("group:not-a-uuid:0"));
+ }
+
+ @Test
+ public void testValidateRejectsInvalidPartition() {
+ assertThrows(IllegalArgumentException.class, () -> SharePartitionKey.validate("group:" + Uuid.randomUuid() + ":abc"));
+ }
+
+ @Test
+ public void testRoundTripWithCoordinatorKey() {
+ Uuid topicId = Uuid.randomUuid();
+ String groupId = "my:group:with:colons";
+ String coordinatorKey = SharePartitionKey.asCoordinatorKey(groupId, topicId, 3);
+ SharePartitionKey spk = SharePartitionKey.getInstance(coordinatorKey);
+ assertEquals(groupId, spk.groupId());
+ assertEquals(topicId, spk.topicId());
+ assertEquals(3, spk.partition());
+ }
+}