This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.1 by this push:
     new aa0d1f50003 MINOR: Reject requests using unsupported features in 
KIP-1071 (#20031)
aa0d1f50003 is described below

commit aa0d1f500039c57892518f6ca012cb85385d52e6
Author: Lucas Brutschy <lbruts...@confluent.io>
AuthorDate: Wed Jun 25 14:48:56 2025 +0200

    MINOR: Reject requests using unsupported features in KIP-1071 (#20031)
    
    The current implementation of KIP-1071 does not yet support all
    features planned in the KIP. We should reject any request that uses
    a feature that is not implemented yet.
    
    Reviewers: Chia-Ping Tsai <chia7...@gmail.com>, Matthias J. Sax
     <matth...@confluent.io>, Bill Bejeck <b...@confluent.io>
---
 .../coordinator/group/GroupCoordinatorService.java |  22 +++++
 .../org/apache/kafka/coordinator/group/Utils.java  |  16 ++++
 .../group/GroupCoordinatorServiceTest.java         | 105 +++++++++++++++++----
 3 files changed, 124 insertions(+), 19 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index 94ef75c846e..ab7ede49cfe 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -141,6 +141,7 @@ import static 
org.apache.kafka.coordinator.common.runtime.CoordinatorOperationEx
 import static org.apache.kafka.coordinator.group.Utils.throwIfEmptyString;
 import static 
org.apache.kafka.coordinator.group.Utils.throwIfNotEmptyCollection;
 import static org.apache.kafka.coordinator.group.Utils.throwIfNotNull;
+import static org.apache.kafka.coordinator.group.Utils.throwIfNotNullOrEmpty;
 import static org.apache.kafka.coordinator.group.Utils.throwIfNull;
 
 /**
@@ -540,6 +541,26 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
         }
     }
 
+    /**
+     * Validates the request. Specifically, throws if any not-yet-supported 
features are used.
+     *
+     * @param request The request to validate.
+     * @throws InvalidRequestException if the request is not valid.
+     */
+    private static void 
throwIfStreamsGroupHeartbeatRequestIsUsingUnsupportedFeatures(
+        StreamsGroupHeartbeatRequestData request
+    ) throws InvalidRequestException {
+        throwIfNotNull(request.instanceId(), "Static membership is not yet 
supported.");
+        throwIfNotNull(request.taskOffsets(), "TaskOffsets are not supported 
yet.");
+        throwIfNotNull(request.taskEndOffsets(), "TaskEndOffsets are not 
supported yet.");
+        throwIfNotNullOrEmpty(request.warmupTasks(), "WarmupTasks are not 
supported yet.");
+        if (request.topology() != null) {
+            for (StreamsGroupHeartbeatRequestData.Subtopology subtopology : 
request.topology().subtopologies()) {
+                throwIfNotEmptyCollection(subtopology.sourceTopicRegex(), 
"Regular expressions for source topics are not supported yet.");
+            }
+        }
+    }
+
     /**
      * See
      * {@link 
GroupCoordinator#streamsGroupHeartbeat(AuthorizableRequestContext, 
StreamsGroupHeartbeatRequestData)}.
@@ -559,6 +580,7 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
         }
 
         try {
+            
throwIfStreamsGroupHeartbeatRequestIsUsingUnsupportedFeatures(request);
             throwIfStreamsGroupHeartbeatRequestIsInvalid(request);
         } catch (Throwable ex) {
             ApiError apiError = ApiError.fromThrowable(ex);
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java
index d614123d2a7..02b0ed28e6e 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java
@@ -280,6 +280,22 @@ public class Utils {
         }
     }
 
+    /**
+     * Throws an InvalidRequestException if the value is not null and 
non-empty.
+     *
+     * @param value The value.
+     * @param error The error message.
+     * @throws InvalidRequestException
+     */
+    static void throwIfNotNullOrEmpty(
+        Collection<?> value,
+        String error
+    ) throws InvalidRequestException {
+        if (value != null && !value.isEmpty()) {
+            throw new InvalidRequestException(error);
+        }
+    }
+
     /**
      * Throws an InvalidRequestException if the value is non-null.
      *
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index 1c54abaa40a..01c87696053 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -574,6 +574,91 @@ public class GroupCoordinatorServiceTest {
             future.get(5, TimeUnit.SECONDS)
         );
     }
+    @Test
+    public void testStreamsGroupHeartbeatFailsForUnsupportedFeatures() throws 
Exception {
+
+        GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+            .setConfig(createConfig())
+            .setRuntime(mockRuntime())
+            .build(true);
+
+        AuthorizableRequestContext context = 
mock(AuthorizableRequestContext.class);
+        when(context.requestVersion()).thenReturn((int) 
ApiKeys.STREAMS_GROUP_HEARTBEAT.latestVersion());
+
+        assertEquals(
+            new StreamsGroupHeartbeatResult(
+                new StreamsGroupHeartbeatResponseData()
+                    .setErrorCode(Errors.INVALID_REQUEST.code())
+                    .setErrorMessage("Static membership is not yet 
supported."),
+                Map.of()
+            ),
+            service.streamsGroupHeartbeat(
+                context,
+                new StreamsGroupHeartbeatRequestData()
+                    .setInstanceId(Uuid.randomUuid().toString())
+            ).get(5, TimeUnit.SECONDS)
+        );
+
+        assertEquals(
+            new StreamsGroupHeartbeatResult(
+                new StreamsGroupHeartbeatResponseData()
+                    .setErrorCode(Errors.INVALID_REQUEST.code())
+                    .setErrorMessage("TaskOffsets are not supported yet."),
+                Map.of()
+            ),
+            service.streamsGroupHeartbeat(
+                context,
+                new StreamsGroupHeartbeatRequestData()
+                    .setTaskOffsets(List.of(new 
StreamsGroupHeartbeatRequestData.TaskOffset()))
+            ).get(5, TimeUnit.SECONDS)
+        );
+
+        assertEquals(
+            new StreamsGroupHeartbeatResult(
+                new StreamsGroupHeartbeatResponseData()
+                    .setErrorCode(Errors.INVALID_REQUEST.code())
+                    .setErrorMessage("TaskEndOffsets are not supported yet."),
+                Map.of()
+            ),
+            service.streamsGroupHeartbeat(
+                context,
+                new StreamsGroupHeartbeatRequestData()
+                    .setTaskEndOffsets(List.of(new 
StreamsGroupHeartbeatRequestData.TaskOffset()))
+            ).get(5, TimeUnit.SECONDS)
+        );
+
+        assertEquals(
+            new StreamsGroupHeartbeatResult(
+                new StreamsGroupHeartbeatResponseData()
+                    .setErrorCode(Errors.INVALID_REQUEST.code())
+                    .setErrorMessage("WarmupTasks are not supported yet."),
+                Map.of()
+            ),
+            service.streamsGroupHeartbeat(
+                context,
+                new StreamsGroupHeartbeatRequestData()
+                    .setWarmupTasks(List.of(new 
StreamsGroupHeartbeatRequestData.TaskIds()))
+            ).get(5, TimeUnit.SECONDS)
+        );
+
+        assertEquals(
+            new StreamsGroupHeartbeatResult(
+                new StreamsGroupHeartbeatResponseData()
+                    .setErrorCode(Errors.INVALID_REQUEST.code())
+                    .setErrorMessage("Regular expressions for source topics 
are not supported yet."),
+                Map.of()
+            ),
+            service.streamsGroupHeartbeat(
+                context,
+                new StreamsGroupHeartbeatRequestData()
+                    .setTopology(new 
StreamsGroupHeartbeatRequestData.Topology()
+                        .setSubtopologies(List.of(new 
StreamsGroupHeartbeatRequestData.Subtopology()
+                            .setSourceTopicRegex(List.of("foo.*"))
+                        ))
+                    )
+            ).get(5, TimeUnit.SECONDS)
+        );
+    }
 
     @SuppressWarnings("MethodLength")
     @Test
@@ -584,7 +669,7 @@ public class GroupCoordinatorServiceTest {
             .build(true);
 
         AuthorizableRequestContext context = 
mock(AuthorizableRequestContext.class);
-        when(context.requestVersion()).thenReturn((int) 
ApiKeys.SHARE_GROUP_HEARTBEAT.latestVersion());
+        when(context.requestVersion()).thenReturn((int) 
ApiKeys.STREAMS_GROUP_HEARTBEAT.latestVersion());
 
         String memberId = Uuid.randomUuid().toString();
 
@@ -743,24 +828,6 @@ public class GroupCoordinatorServiceTest {
             ).get(5, TimeUnit.SECONDS)
         );
 
-        // InstanceId must be non-empty if provided in all requests.
-        assertEquals(
-            new StreamsGroupHeartbeatResult(
-                new StreamsGroupHeartbeatResponseData()
-                    .setErrorCode(Errors.INVALID_REQUEST.code())
-                    .setErrorMessage("InstanceId can't be empty."),
-                Map.of()
-            ),
-            service.streamsGroupHeartbeat(
-                context,
-                new StreamsGroupHeartbeatRequestData()
-                    .setGroupId("foo")
-                    .setMemberId(memberId)
-                    .setMemberEpoch(1)
-                    .setInstanceId("")
-            ).get(5, TimeUnit.SECONDS)
-        );
-
         // RackId must be non-empty if provided in all requests.
         assertEquals(
             new StreamsGroupHeartbeatResult(

Reply via email to