This is an automated email from the ASF dual-hosted git repository. lucasbru pushed a commit to branch 4.1 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.1 by this push: new aa0d1f50003 MINOR: Reject requests using unsupported features in KIP-1071 (#20031) aa0d1f50003 is described below commit aa0d1f500039c57892518f6ca012cb85385d52e6 Author: Lucas Brutschy <lbruts...@confluent.io> AuthorDate: Wed Jun 25 14:48:56 2025 +0200 MINOR: Reject requests using unsupported features in KIP-1071 (#20031) KIP-1071 does not currently support all features planned in the KIP. We should reject any requests that are using features that are currently not implemented. Reviewers: Chia-Ping Tsai <chia7...@gmail.com>, Matthias J. Sax <matth...@confluent.io>, Bill Bejeck <b...@confluent.io> --- .../coordinator/group/GroupCoordinatorService.java | 22 +++++ .../org/apache/kafka/coordinator/group/Utils.java | 16 ++++ .../group/GroupCoordinatorServiceTest.java | 105 +++++++++++++++++---- 3 files changed, 124 insertions(+), 19 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java index 94ef75c846e..ab7ede49cfe 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java @@ -141,6 +141,7 @@ import static org.apache.kafka.coordinator.common.runtime.CoordinatorOperationEx import static org.apache.kafka.coordinator.group.Utils.throwIfEmptyString; import static org.apache.kafka.coordinator.group.Utils.throwIfNotEmptyCollection; import static org.apache.kafka.coordinator.group.Utils.throwIfNotNull; +import static org.apache.kafka.coordinator.group.Utils.throwIfNotNullOrEmpty; import static org.apache.kafka.coordinator.group.Utils.throwIfNull; /** @@ -540,6 +541,26 @@ public class GroupCoordinatorService implements GroupCoordinator { } } + /** + * Validates the request. Specifically, throws if any not-yet-supported features are used. + * + * @param request The request to validate. + * @throws InvalidRequestException if the request is not valid. + */ + private static void throwIfStreamsGroupHeartbeatRequestIsUsingUnsupportedFeatures( + StreamsGroupHeartbeatRequestData request + ) throws InvalidRequestException { + throwIfNotNull(request.instanceId(), "Static membership is not yet supported."); + throwIfNotNull(request.taskOffsets(), "TaskOffsets are not supported yet."); + throwIfNotNull(request.taskEndOffsets(), "TaskEndOffsets are not supported yet."); + throwIfNotNullOrEmpty(request.warmupTasks(), "WarmupTasks are not supported yet."); + if (request.topology() != null) { + for (StreamsGroupHeartbeatRequestData.Subtopology subtopology : request.topology().subtopologies()) { + throwIfNotEmptyCollection(subtopology.sourceTopicRegex(), "Regular expressions for source topics are not supported yet."); + } + } + } + /** * See * {@link GroupCoordinator#streamsGroupHeartbeat(AuthorizableRequestContext, StreamsGroupHeartbeatRequestData)}. @@ -559,6 +580,7 @@ public class GroupCoordinatorService implements GroupCoordinator { } try { + throwIfStreamsGroupHeartbeatRequestIsUsingUnsupportedFeatures(request); throwIfStreamsGroupHeartbeatRequestIsInvalid(request); } catch (Throwable ex) { ApiError apiError = ApiError.fromThrowable(ex); diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java index d614123d2a7..02b0ed28e6e 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java @@ -280,6 +280,22 @@ public class Utils { } } + /** + * Throws an InvalidRequestException if the value is not null and non-empty. + * + * @param value The value. + * @param error The error message. + * @throws InvalidRequestException + */ + static void throwIfNotNullOrEmpty( + Collection<?> value, + String error + ) throws InvalidRequestException { + if (value != null && !value.isEmpty()) { + throw new InvalidRequestException(error); + } + } + /** * Throws an InvalidRequestException if the value is non-null. * diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java index 1c54abaa40a..01c87696053 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java @@ -574,6 +574,91 @@ public class GroupCoordinatorServiceTest { future.get(5, TimeUnit.SECONDS) ); } + @Test + public void testStreamsGroupHeartbeatFailsForUnsupportedFeatures() throws Exception { + + GroupCoordinatorService service = new GroupCoordinatorServiceBuilder() + .setConfig(createConfig()) + .setRuntime(mockRuntime()) + .build(true); + + AuthorizableRequestContext context = mock(AuthorizableRequestContext.class); + when(context.requestVersion()).thenReturn((int) ApiKeys.STREAMS_GROUP_HEARTBEAT.latestVersion()); + + assertEquals( + new StreamsGroupHeartbeatResult( + new StreamsGroupHeartbeatResponseData() + .setErrorCode(Errors.INVALID_REQUEST.code()) + .setErrorMessage("Static membership is not yet supported."), + Map.of() + ), + service.streamsGroupHeartbeat( + context, + new StreamsGroupHeartbeatRequestData() + .setInstanceId(Uuid.randomUuid().toString()) + ).get(5, TimeUnit.SECONDS) + ); + + assertEquals( + new StreamsGroupHeartbeatResult( + new StreamsGroupHeartbeatResponseData() + .setErrorCode(Errors.INVALID_REQUEST.code()) + .setErrorMessage("TaskOffsets are not supported yet."), + Map.of() + ), + service.streamsGroupHeartbeat( + context, + new StreamsGroupHeartbeatRequestData() + .setTaskOffsets(List.of(new StreamsGroupHeartbeatRequestData.TaskOffset())) + ).get(5, TimeUnit.SECONDS) + ); + + assertEquals( + new StreamsGroupHeartbeatResult( + new StreamsGroupHeartbeatResponseData() + .setErrorCode(Errors.INVALID_REQUEST.code()) + .setErrorMessage("TaskEndOffsets are not supported yet."), + Map.of() + ), + service.streamsGroupHeartbeat( + context, + new StreamsGroupHeartbeatRequestData() + .setTaskEndOffsets(List.of(new StreamsGroupHeartbeatRequestData.TaskOffset())) + ).get(5, TimeUnit.SECONDS) + ); + + assertEquals( + new StreamsGroupHeartbeatResult( + new StreamsGroupHeartbeatResponseData() + .setErrorCode(Errors.INVALID_REQUEST.code()) + .setErrorMessage("WarmupTasks are not supported yet."), + Map.of() + ), + service.streamsGroupHeartbeat( + context, + new StreamsGroupHeartbeatRequestData() + .setWarmupTasks(List.of(new StreamsGroupHeartbeatRequestData.TaskIds())) + ).get(5, TimeUnit.SECONDS) + ); + + assertEquals( + new StreamsGroupHeartbeatResult( + new StreamsGroupHeartbeatResponseData() + .setErrorCode(Errors.INVALID_REQUEST.code()) + .setErrorMessage("Regular expressions for source topics are not supported yet."), + Map.of() + ), + service.streamsGroupHeartbeat( + context, + new StreamsGroupHeartbeatRequestData() + .setTopology(new StreamsGroupHeartbeatRequestData.Topology() + .setSubtopologies(List.of(new StreamsGroupHeartbeatRequestData.Subtopology() + .setSourceTopicRegex(List.of("foo.*")) + )) + ) + ).get(5, TimeUnit.SECONDS) + ); + } @SuppressWarnings("MethodLength") @Test @@ -584,7 +669,7 @@ public class GroupCoordinatorServiceTest { .build(true); AuthorizableRequestContext context = mock(AuthorizableRequestContext.class); - when(context.requestVersion()).thenReturn((int) ApiKeys.SHARE_GROUP_HEARTBEAT.latestVersion()); + when(context.requestVersion()).thenReturn((int) ApiKeys.STREAMS_GROUP_HEARTBEAT.latestVersion()); String memberId = Uuid.randomUuid().toString(); @@ -743,24 +828,6 @@ public class GroupCoordinatorServiceTest { ).get(5, TimeUnit.SECONDS) ); - // InstanceId must be non-empty if provided in all requests. - assertEquals( - new StreamsGroupHeartbeatResult( - new StreamsGroupHeartbeatResponseData() - .setErrorCode(Errors.INVALID_REQUEST.code()) - .setErrorMessage("InstanceId can't be empty."), - Map.of() - ), - service.streamsGroupHeartbeat( - context, - new StreamsGroupHeartbeatRequestData() - .setGroupId("foo") - .setMemberId(memberId) - .setMemberEpoch(1) - .setInstanceId("") - ).get(5, TimeUnit.SECONDS) - ); - // RackId must be non-empty if provided in all requests. assertEquals( new StreamsGroupHeartbeatResult(