This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.2 by this push:
     new 9596c45a068 KAFKA-20183: Share consumer group fails when group ID 
contains colon character (#21471)
9596c45a068 is described below

commit 9596c45a0682a42f1c81845306b701e6813df585
Author: Federico Valeri <[email protected]>
AuthorDate: Mon Feb 16 11:11:23 2026 +0100

    KAFKA-20183: Share consumer group fails when group ID contains colon 
character (#21471)
    
    
    Reviewers: Mickael Maison <[email protected]>, Andrew Schofield
     <[email protected]>, Sushant Mahajan <[email protected]>
---
 .../kafka/server/share/SharePartitionKey.java      |  57 +++++----
 .../kafka/server/share/SharePartitionKeyTest.java  | 129 +++++++++++++++++++++
 2 files changed, 162 insertions(+), 24 deletions(-)

diff --git 
a/server-common/src/main/java/org/apache/kafka/server/share/SharePartitionKey.java
 
b/server-common/src/main/java/org/apache/kafka/server/share/SharePartitionKey.java
index 89f555b564e..8565064a597 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/share/SharePartitionKey.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/share/SharePartitionKey.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 
+import java.util.Arrays;
 import java.util.Objects;
 
 /**
@@ -67,52 +68,60 @@ public class SharePartitionKey {
 
 
     /**
-     * Returns a SharePartitionKey from input string of format - 
groupId:topicId:partition
+     * Returns a SharePartitionKey from input string of format - 
groupId:topicId:partition.
+     * The key is parsed from the right: partition is the last segment, 
topicId is
+     * the second-to-last, and groupId is everything before (which may contain 
colons).
+     *
      * @param key - String in format groupId:topicId:partition
      * @return object representing SharePartitionKey
      * @throws IllegalArgumentException if the key is empty or has invalid 
format
      */
     public static SharePartitionKey getInstance(String key) {
-        validate(key);
-        String[] tokens = key.split(":");
-        return new SharePartitionKey(
-                tokens[0].trim(),
-                Uuid.fromString(tokens[1]),
-                Integer.parseInt(tokens[2])
-        );
-    }
-
-    /**
-     * Validates whether the String argument has a valid SharePartitionKey 
format - groupId:topicId:partition
-     * @param key - String in format groupId:topicId:partition
-     * @throws IllegalArgumentException if the key is empty or has invalid 
format
-     */
-    public static void validate(String key) {
         Objects.requireNonNull(key, "Share partition key cannot be null");
         if (key.isEmpty()) {
             throw new IllegalArgumentException("Share partition key cannot be 
empty");
         }
 
         String[] tokens = key.split(":");
-        if (tokens.length != 3) {
-            throw new IllegalArgumentException("Invalid key format: expected - 
groupId:topicId:partition, found -  " + key);
+        if (tokens.length < 3) {
+            throw new IllegalArgumentException("Invalid key format: expected - 
groupId:topicId:partition, found - " + key);
         }
 
-        if (tokens[0].trim().isEmpty()) {
-            throw new IllegalArgumentException("GroupId must be alphanumeric 
string");
+        int last = tokens.length - 1;
+        String partitionStr = tokens[last];
+        String topicIdStr = tokens[last - 1];
+        String groupId = String.join(":", Arrays.copyOfRange(tokens, 0, last - 
1));
+
+        if (groupId.trim().isEmpty()) {
+            throw new IllegalArgumentException("GroupId must not be empty");
         }
 
+        Uuid topicId;
         try {
-            Uuid.fromString(tokens[1]);
+            topicId = Uuid.fromString(topicIdStr);
         } catch (Exception e) {
-            throw new IllegalArgumentException("Invalid topic ID: " + 
tokens[1], e);
+            throw new IllegalArgumentException("Invalid topic ID: " + 
topicIdStr, e);
         }
 
+        int partition;
         try {
-            Integer.parseInt(tokens[2]);
+            partition = Integer.parseInt(partitionStr);
         } catch (Exception e) {
-            throw new IllegalArgumentException("Invalid partition: " + 
tokens[2], e);
+            throw new IllegalArgumentException("Invalid partition: " + 
partitionStr, e);
         }
+
+        return new SharePartitionKey(groupId, topicId, partition);
+    }
+
+    /**
+     * Validates whether the String argument has a valid SharePartitionKey 
format - groupId:topicId:partition.
+     * The key is parsed from the right since groupId may contain colons.
+     *
+     * @param key - String in format groupId:topicId:partition
+     * @throws IllegalArgumentException if the key is empty or has invalid 
format
+     */
+    public static void validate(String key) {
+        getInstance(key);
     }
 
     public static SharePartitionKey getInstance(String groupId, Uuid topicId, 
int partition) {
diff --git 
a/server-common/src/test/java/org/apache/kafka/server/share/SharePartitionKeyTest.java
 
b/server-common/src/test/java/org/apache/kafka/server/share/SharePartitionKeyTest.java
new file mode 100644
index 00000000000..b8c13d624bc
--- /dev/null
+++ 
b/server-common/src/test/java/org/apache/kafka/server/share/SharePartitionKeyTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.share;
+
+import org.apache.kafka.common.Uuid;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class SharePartitionKeyTest {
+    @Test
+    public void testGetInstanceFromSimpleKey() {
+        Uuid topicId = Uuid.randomUuid();
+        String key = "my-group:" + topicId + ":3";
+        SharePartitionKey spk = SharePartitionKey.getInstance(key);
+        assertEquals("my-group", spk.groupId());
+        assertEquals(topicId, spk.topicId());
+        assertEquals(3, spk.partition());
+    }
+
+    @Test
+    public void testGetInstanceFromKeyWithColonInGroupId() {
+        Uuid topicId = Uuid.randomUuid();
+        String key = "abc:de:" + topicId + ":0";
+        SharePartitionKey spk = SharePartitionKey.getInstance(key);
+        assertEquals("abc:de", spk.groupId());
+        assertEquals(topicId, spk.topicId());
+        assertEquals(0, spk.partition());
+    }
+
+    @Test
+    public void testGetInstanceFromKeyWithMultipleColonsInGroupId() {
+        Uuid topicId = Uuid.randomUuid();
+        String key = "a:b:c:d:" + topicId + ":5";
+        SharePartitionKey spk = SharePartitionKey.getInstance(key);
+        assertEquals("a:b:c:d", spk.groupId());
+        assertEquals(topicId, spk.topicId());
+        assertEquals(5, spk.partition());
+    }
+
+    @Test
+    public void testGetInstanceFromKeyWithGroupIdStartingWithColon() {
+        Uuid topicId = Uuid.randomUuid();
+        String key = ":mygroup:" + topicId + ":2";
+        SharePartitionKey spk = SharePartitionKey.getInstance(key);
+        assertEquals(":mygroup", spk.groupId());
+        assertEquals(topicId, spk.topicId());
+        assertEquals(2, spk.partition());
+    }
+
+    @Test
+    public void testGetInstanceFromKeyWithGroupIdEndingWithColon() {
+        Uuid topicId = Uuid.randomUuid();
+        String key = "mygroup::" + topicId + ":4";
+        SharePartitionKey spk = SharePartitionKey.getInstance(key);
+        assertEquals("mygroup:", spk.groupId());
+        assertEquals(topicId, spk.topicId());
+        assertEquals(4, spk.partition());
+    }
+
+    @Test
+    public void testValidateSimpleKey() {
+        Uuid topicId = Uuid.randomUuid();
+        SharePartitionKey.validate("group:" + topicId + ":0");
+    }
+
+    @Test
+    public void testValidateKeyWithColonInGroupId() {
+        Uuid topicId = Uuid.randomUuid();
+        SharePartitionKey.validate("abc:dd:" + topicId + ":0");
+    }
+
+    @Test
+    public void testValidateRejectsNull() {
+        assertThrows(NullPointerException.class, () -> 
SharePartitionKey.validate(null));
+    }
+
+    @Test
+    public void testValidateRejectsEmpty() {
+        assertThrows(IllegalArgumentException.class, () -> 
SharePartitionKey.validate(""));
+    }
+
+    @Test
+    public void testValidateRejectsTooFewSegments() {
+        assertThrows(IllegalArgumentException.class, () -> 
SharePartitionKey.validate("onlytwo:parts"));
+    }
+
+    @Test
+    public void testValidateRejectsEmptyGroupId() {
+        assertThrows(IllegalArgumentException.class, () -> 
SharePartitionKey.validate(":" + Uuid.randomUuid() + ":0"));
+    }
+
+    @Test
+    public void testValidateRejectsInvalidTopicId() {
+        assertThrows(IllegalArgumentException.class, () -> 
SharePartitionKey.validate("group:not-a-uuid:0"));
+    }
+
+    @Test
+    public void testValidateRejectsInvalidPartition() {
+        assertThrows(IllegalArgumentException.class, () -> 
SharePartitionKey.validate("group:" + Uuid.randomUuid() + ":abc"));
+    }
+
+    @Test
+    public void testRoundTripWithCoordinatorKey() {
+        Uuid topicId = Uuid.randomUuid();
+        String groupId = "my:group:with:colons";
+        String coordinatorKey = SharePartitionKey.asCoordinatorKey(groupId, 
topicId, 3);
+        SharePartitionKey spk = SharePartitionKey.getInstance(coordinatorKey);
+        assertEquals(groupId, spk.groupId());
+        assertEquals(topicId, spk.topicId());
+        assertEquals(3, spk.partition());
+    }
+}

Reply via email to