This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch kip1071
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 4c64b358d6279d50dc3173ba24f21adf5e31f012
Author: Lucas Brutschy <[email protected]>
AuthorDate: Fri Jun 7 15:32:08 2024 +0200

    Resolve conflict from 11/25 rebase

    Basic heartbeat RPC handler

    See https://github.com/lucasbru/kafka/pull/18
---
 .../group/GroupCoordinatorAdapter.scala            |  11 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |  26 +++++
 .../group/GroupCoordinatorAdapterTest.scala        |  18 +++-
 .../scala/unit/kafka/server/KafkaApisTest.scala    |  68 ++++++++++++-
 .../kafka/coordinator/group/GroupCoordinator.java  |  16 +++
 .../coordinator/group/GroupCoordinatorService.java |  31 ++++++
 .../coordinator/group/GroupCoordinatorShard.java   |  18 ++++
 .../coordinator/group/GroupMetadataManager.java    | 102 ++++++++++++++++++-
 .../coordinator/group/streams/Assignment.java      |   6 +-
 .../group/GroupCoordinatorServiceTest.java         | 113 +++++++++++++++++++++
 .../group/GroupCoordinatorShardTest.java           |  34 +++++++
 .../group/GroupMetadataManagerTest.java            |  94 +++++++++++++++++
 .../group/GroupMetadataManagerTestContext.java     |  32 ++++++
 .../coordinator/group/streams/AssignmentTest.java  |  18 ++--
 14 files changed, 569 insertions(+), 18 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
index d45221d1e71..f7b456699d0 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
@@ -18,7 +18,7 @@ package kafka.coordinator.group
 
 import kafka.server.{KafkaConfig, ReplicaManager}
 import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
-import org.apache.kafka.common.message.{ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, Offset [...]
+import org.apache.kafka.common.message.{ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, Offset [...]
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.RecordBatch
@@ -86,6 +86,15 @@ private[group] class GroupCoordinatorAdapter(
     ))
   }
 
+  override def streamsHeartbeat(
+    context: RequestContext,
+    request: StreamsHeartbeatRequestData
+  ): CompletableFuture[StreamsHeartbeatResponseData] = {
+    FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
+      s"The old group coordinator does not support ${ApiKeys.STREAMS_HEARTBEAT.name} API."
+    ))
+  }
+
   override def shareGroupHeartbeat(
     context: RequestContext,
     request: ShareGroupHeartbeatRequestData

diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 046f1570707..3bd368bf1e5 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -277,6 +277,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.DELETE_SHARE_GROUP_STATE => handleDeleteShareGroupStateRequest(request)
         case ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY => handleReadShareGroupStateSummaryRequest(request)
         case ApiKeys.STREAMS_INITIALIZE => handleStreamsInitialize(request).exceptionally(handleError)
+        case ApiKeys.STREAMS_HEARTBEAT => handleStreamsHeartbeat(request).exceptionally(handleError)
         case _ => throw new IllegalStateException(s"No handler for request api key ${request.header.apiKey}")
       }
     } catch {
@@ -3911,6 +3912,31 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  def handleStreamsHeartbeat(request: RequestChannel.Request): CompletableFuture[Unit] = {
+    val streamsHeartbeatRequest = request.body[StreamsHeartbeatRequest]
+
+    if (!config.isNewGroupCoordinatorEnabled) {
+      // The API is not supported by the "old" group coordinator (the default). If the
+      // new one is not enabled, we fail directly here.
+      requestHelper.sendMaybeThrottle(request, streamsHeartbeatRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+      CompletableFuture.completedFuture[Unit](())
+    } else if (!authHelper.authorize(request.context, READ, GROUP, streamsHeartbeatRequest.data.groupId)) {
+      requestHelper.sendMaybeThrottle(request, streamsHeartbeatRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
+      CompletableFuture.completedFuture[Unit](())
+    } else {
+      groupCoordinator.streamsHeartbeat(
+        request.context,
+        streamsHeartbeatRequest.data,
+      ).handle[Unit] { (response, exception) =>
+        if (exception != null) {
+          requestHelper.sendMaybeThrottle(request, streamsHeartbeatRequest.getErrorResponse(exception))
+        } else {
+          requestHelper.sendMaybeThrottle(request, new StreamsHeartbeatResponse(response))
+        }
+      }
+    }
+  }
+
   def handleGetTelemetrySubscriptionsRequest(request: RequestChannel.Request): Unit = {
     val subscriptionRequest = request.body[GetTelemetrySubscriptionsRequest]

diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
index 0d57eea15f9..03fcf3ff21d 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
@@ -19,7 +19,7 @@ package kafka.coordinator.group
 import kafka.coordinator.group.GroupCoordinatorConcurrencyTest.{JoinGroupCallback, SyncGroupCallback}
 import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
 import org.apache.kafka.common.errors.{InvalidGroupIdException, UnsupportedVersionException}
-import org.apache.kafka.common.message.{ConsumerGroupHeartbeatRequestData, DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, ShareGroupHeartbeatRequestDa [...]
+import org.apache.kafka.common.message.{ConsumerGroupHeartbeatRequestData, DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, ShareGroupHeartbeatRequestDa [...]
 import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol
 import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember
 import org.apache.kafka.common.message.OffsetDeleteRequestData.{OffsetDeleteRequestPartition, OffsetDeleteRequestTopic, OffsetDeleteRequestTopicCollection}
@@ -94,6 +94,22 @@ class GroupCoordinatorAdapterTest {
     assertFutureThrows(future, classOf[UnsupportedVersionException])
   }
 
+  @Test
+  def testStreamsHeartbeat(): Unit = {
+    val groupCoordinator = mock(classOf[GroupCoordinator])
+    val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM)
+
+    val ctx = makeContext(ApiKeys.STREAMS_HEARTBEAT, ApiKeys.STREAMS_HEARTBEAT.latestVersion)
+    val request = new StreamsHeartbeatRequestData()
+      .setGroupId("group")
+
+    val future = adapter.streamsHeartbeat(ctx, request)
+
+    assertTrue(future.isDone)
+    assertTrue(future.isCompletedExceptionally)
+    assertFutureThrows(future, classOf[UnsupportedVersionException])
+  }
+
   @Test
   def testJoinShareGroup(): Unit = {
     val groupCoordinator = mock(classOf[GroupCoordinator])

diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 4c0f3421e3f..c0bfa5d0f31 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -11233,7 +11233,8 @@ class KafkaApisTest extends Logging {
     val response = verifyNoThrottling[ConsumerGroupHeartbeatResponse](requestChannelRequest)
     assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, response.data.errorCode)
   }
-
+
+  @Test
   def testStreamsInitializeRequest(): Unit = {
     val streamsInitializeRequest = new StreamsInitializeRequestData().setGroupId("group")
 
@@ -11296,6 +11297,71 @@ class KafkaApisTest extends Logging {
     assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, response.data.errorCode)
   }
 
+
+  @Test
+  def testStreamsHeartbeatRequest(): Unit = {
+    val streamsHeartbeatRequest = new StreamsHeartbeatRequestData().setGroupId("group")
+
+    val requestChannelRequest = buildRequest(new StreamsHeartbeatRequest.Builder(streamsHeartbeatRequest, true).build())
+
+    val future = new CompletableFuture[StreamsHeartbeatResponseData]()
+    when(groupCoordinator.streamsHeartbeat(
+      requestChannelRequest.context,
+      streamsHeartbeatRequest
+    )).thenReturn(future)
+    kafkaApis = createKafkaApis(overrideProperties = Map(
+      GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true"
+    ))
+    kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching)
+
+    val streamsHeartbeatResponse = new StreamsHeartbeatResponseData()
+      .setMemberId("member")
+
+    future.complete(streamsHeartbeatResponse)
+    val response = verifyNoThrottling[StreamsHeartbeatResponse](requestChannelRequest)
+    assertEquals(streamsHeartbeatResponse, response.data)
+  }
+
+  @Test
+  def testStreamsHeartbeatRequestFutureFailed(): Unit = {
+    val streamsHeartbeatRequest = new StreamsHeartbeatRequestData().setGroupId("group")
+
+    val requestChannelRequest = buildRequest(new StreamsHeartbeatRequest.Builder(streamsHeartbeatRequest, true).build())
+
+    val future = new CompletableFuture[StreamsHeartbeatResponseData]()
+    when(groupCoordinator.streamsHeartbeat(
+      requestChannelRequest.context,
+      streamsHeartbeatRequest
+    )).thenReturn(future)
+    kafkaApis = createKafkaApis(overrideProperties = Map(
+      GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true"
+    ))
+    kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching)
+
+    future.completeExceptionally(Errors.FENCED_MEMBER_EPOCH.exception)
+    val response = verifyNoThrottling[StreamsHeartbeatResponse](requestChannelRequest)
+    assertEquals(Errors.FENCED_MEMBER_EPOCH.code, response.data.errorCode)
+  }
+
+  @Test
+  def testStreamsHeartbeatRequestAuthorizationFailed(): Unit = {
+    val streamsHeartbeatRequest = new StreamsHeartbeatRequestData().setGroupId("group")
+
+    val requestChannelRequest = buildRequest(new StreamsHeartbeatRequest.Builder(streamsHeartbeatRequest, true).build())
+
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+    when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
+      .thenReturn(Seq(AuthorizationResult.DENIED).asJava)
+    kafkaApis = createKafkaApis(
+      authorizer = Some(authorizer),
+      overrideProperties = Map(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true")
+    )
+    kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching)
+
+    val response = verifyNoThrottling[StreamsHeartbeatResponse](requestChannelRequest)
+    assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, response.data.errorCode)
+  }
+
   @ParameterizedTest
   @ValueSource(booleans = Array(true, false))
   def testConsumerGroupDescribe(includeAuthorizedOperations: Boolean): Unit = {

diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
index 8a5032451b1..900a789f2c0 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
@@ -39,6 +39,8 @@ import org.apache.kafka.common.message.OffsetFetchResponseData;
 import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.StreamsHeartbeatRequestData;
+import org.apache.kafka.common.message.StreamsHeartbeatResponseData;
 import org.apache.kafka.common.message.StreamsInitializeRequestData;
 import org.apache.kafka.common.message.StreamsInitializeResponseData;
 import org.apache.kafka.common.message.SyncGroupRequestData;
@@ -98,6 +100,20 @@ public interface GroupCoordinator {
         StreamsInitializeRequestData request
     );
 
+    /**
+     * Heartbeat to a Streams Group.
+     *
+     * @param context The request context.
+     * @param request The StreamsHeartbeatRequestData data.
+     *
+     * @return A future yielding the response.
+     *         The error code(s) of the response are set to indicate the error(s) that occurred during the execution.
+     */
+    CompletableFuture<StreamsHeartbeatResponseData> streamsHeartbeat(
+        RequestContext context,
+        StreamsHeartbeatRequestData request
+    );
+
     /**
      * Heartbeat to a Share Group.
      *

diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index 2a3eef88a04..9f180c5798a 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -44,6 +44,8 @@ import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
 import org.apache.kafka.common.message.ShareGroupDescribeResponseData.DescribedGroup;
 import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.StreamsHeartbeatRequestData;
+import org.apache.kafka.common.message.StreamsHeartbeatResponseData;
 import org.apache.kafka.common.message.StreamsInitializeRequestData;
 import org.apache.kafka.common.message.StreamsInitializeResponseData;
 import org.apache.kafka.common.message.SyncGroupRequestData;
@@ -375,6 +377,35 @@ public class GroupCoordinatorService implements GroupCoordinator {
         ));
     }
 
+    /**
+     * See {@link GroupCoordinator#streamsHeartbeat(RequestContext, StreamsHeartbeatRequestData)}.
+     */
+    @Override
+    public CompletableFuture<StreamsHeartbeatResponseData> streamsHeartbeat(
+        RequestContext context,
+        StreamsHeartbeatRequestData request
+    ) {
+        if (!isActive.get()) {
+            return CompletableFuture.completedFuture(new StreamsHeartbeatResponseData()
+                .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
+            );
+        }
+
+        return runtime.scheduleWriteOperation(
+            "streams-heartbeat",
+            topicPartitionFor(request.groupId()),
+            Duration.ofMillis(config.offsetCommitTimeoutMs),
+            coordinator -> coordinator.streamsHeartbeat(context, request)
+        ).exceptionally(exception -> handleOperationException(
+            "streams-heartbeat",
+            request,
+            exception,
+            (error, message) -> new StreamsHeartbeatResponseData()
+                .setErrorCode(error.code())
+                .setErrorMessage(message)
+        ));
+    }
+
     /**
      * See {@link GroupCoordinator#shareGroupHeartbeat(RequestContext, ShareGroupHeartbeatRequestData)}.
      */

diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index b002f96b60b..2eed6f395e3 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -39,6 +39,8 @@ import org.apache.kafka.common.message.OffsetFetchResponseData;
 import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.StreamsHeartbeatRequestData;
+import org.apache.kafka.common.message.StreamsHeartbeatResponseData;
 import org.apache.kafka.common.message.StreamsInitializeRequestData;
 import org.apache.kafka.common.message.StreamsInitializeResponseData;
 import org.apache.kafka.common.message.SyncGroupRequestData;
@@ -379,6 +381,22 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord
         return groupMetadataManager.streamsInitialize(context, request);
     }
 
+    /**
+     * Handles a StreamsHeartbeat request.
+     *
+     * @param context The request context.
+     * @param request The actual StreamsHeartbeat request.
+     *
+     * @return A Result containing the StreamsHeartbeat response and
+     *         a list of records to update the state machine.
+     */
+    public CoordinatorResult<StreamsHeartbeatResponseData, CoordinatorRecord> streamsHeartbeat(
+        RequestContext context,
+        StreamsHeartbeatRequestData request
+    ) {
+        return groupMetadataManager.streamsHeartbeat(context, request);
+    }
+
     /**
      * Handles a ShareGroupHeartbeat request.
      *

diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index c7cf02f7974..f95b9693ca8 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -53,6 +53,8 @@ import org.apache.kafka.common.message.ListGroupsResponseData;
 import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.StreamsHeartbeatRequestData;
+import org.apache.kafka.common.message.StreamsHeartbeatResponseData;
 import org.apache.kafka.common.message.StreamsInitializeRequestData;
 import org.apache.kafka.common.message.StreamsInitializeResponseData;
 import org.apache.kafka.common.message.SyncGroupRequestData;
@@ -1427,6 +1429,52 @@
     }
 
+    /**
+     * Validates the request.
+     *
+     * @param request The request to validate.
+     *
+     * @throws InvalidRequestException if the request is not valid.
+     * @throws UnsupportedAssignorException if the assignor is not supported.
+     */
+    private void throwIfStreamsHeartbeatRequestIsInvalid(
+        StreamsHeartbeatRequestData request
+    ) throws InvalidRequestException, UnsupportedAssignorException {
+        throwIfEmptyString(request.groupId(), "GroupId can't be empty.");
+        throwIfEmptyString(request.instanceId(), "InstanceId can't be empty.");
+        throwIfEmptyString(request.rackId(), "RackId can't be empty.");
+
+        if (request.memberEpoch() > 0 || request.memberEpoch() == LEAVE_GROUP_MEMBER_EPOCH) {
+            throwIfEmptyString(request.memberId(), "MemberId can't be empty.");
+        } else if (request.memberEpoch() == 0) {
+            if (request.rebalanceTimeoutMs() == -1) {
+                throw new InvalidRequestException("RebalanceTimeoutMs must be provided in first request.");
+            }
+            if (request.activeTasks() == null || !request.activeTasks().isEmpty()) {
+                throw new InvalidRequestException("ActiveTasks must be empty when (re-)joining.");
+            }
+            if (request.standbyTasks() == null || !request.standbyTasks().isEmpty()) {
+                throw new InvalidRequestException("StandbyTasks must be empty when (re-)joining.");
+            }
+            if (request.warmupTasks() == null || !request.warmupTasks().isEmpty()) {
+                throw new InvalidRequestException("WarmupTasks must be empty when (re-)joining.");
+            }
+            // TODO: check that active, standby and warmup do not intersect
+        } else if (request.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
+            throwIfEmptyString(request.memberId(), "MemberId can't be empty.");
+            throwIfNull(request.instanceId(), "InstanceId can't be null.");
+        } else {
+            throw new InvalidRequestException("MemberEpoch is invalid.");
+        }
+
+        // TODO: Check that task assignor exists.
+//        if (request.assignor() != null && !assignors.containsKey(request.assignor())) {
+//            throw new UnsupportedAssignorException("Assignor " + request.assignor()
+//                + " is not supported. Supported assignors: " + String.join(", ", assignors.keySet())
+//                + ".");
+//        }
+    }
+
     /**
      * Verifies that the partitions currently owned by the member (the ones set in the
      * request) matches the ones that the member should own. It matches if the consumer
@@ -3640,12 +3688,12 @@
     }
 
     /**
-     * Handles a ConsumerGroupHeartbeat request.
+     * Handles a StreamsInitialize request.
      *
      * @param context The request context.
-     * @param request The actual ConsumerGroupHeartbeat request.
+     * @param request The actual StreamsInitialize request.
      *
-     * @return A Result containing the ConsumerGroupHeartbeat response and
+     * @return A Result containing the StreamsInitialize response and
      *         a list of records to update the state machine.
      */
     public CoordinatorResult<StreamsInitializeResponseData, CoordinatorRecord> streamsInitialize(
@@ -3660,6 +3708,54 @@
         );
     }
 
+    /**
+     * Handles a StreamsHeartbeat request.
+     *
+     * @param context The request context.
+     * @param request The actual StreamsHeartbeat request.
+     *
+     * @return A Result containing the StreamsHeartbeat response and
+     *         a list of records to update the state machine.
+     */
+    public CoordinatorResult<StreamsHeartbeatResponseData, CoordinatorRecord> streamsHeartbeat(
+        RequestContext context,
+        StreamsHeartbeatRequestData request
+    ) throws ApiException {
+        throwIfStreamsHeartbeatRequestIsInvalid(request);
+
+        return new CoordinatorResult<>(
+            Collections.emptyList(),
+            new StreamsHeartbeatResponseData()
+                .setErrorCode(Errors.NONE.code())
+        );
+
+//        if (request.memberEpoch() == LEAVE_GROUP_MEMBER_EPOCH || request.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
+//            // TODO: -1 means that the member wants to leave the group.
+//            // -2 means that a static member wants to leave the group.
+//            return consumerGroupLeave(
+//                request.groupId(),
+//                request.instanceId(),
+//                request.memberId(),
+//                request.memberEpoch()
+//            );
+//        } else {
+//            // TODO: Otherwise, it is a regular heartbeat.
+//            return consumerGroupHeartbeat(
+//                request.groupId(),
+//                request.memberId(),
+//                request.memberEpoch(),
+//                request.instanceId(),
+//                request.rackId(),
+//                request.rebalanceTimeoutMs(),
+//                context.clientId(),
+//                context.clientAddress.toString(),
+//                request.subscribedTopicNames(),
+//                request.serverAssignor(),
+//                request.topicPartitions()
+//            );
+//        }
+    }
+
     /**
      * Replays StreamsGroupTopologyKey/Value to update the hard state of
      * the streams group.
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/Assignment.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/Assignment.java
index 735ee414785..54e4cbc63a4 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/Assignment.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/Assignment.java
@@ -103,19 +103,19 @@ public class Assignment {
         return new Assignment(
             record.activeTasks().stream()
                 .collect(Collectors.toMap(
-                    StreamsGroupTargetAssignmentMemberValue.TaskId::subtopology,
+                    StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopology,
                     taskId -> new HashSet<>(taskId.partitions())
                 )
             ),
             record.standbyTasks().stream()
                 .collect(Collectors.toMap(
-                    StreamsGroupTargetAssignmentMemberValue.TaskId::subtopology,
+                    StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopology,
                     taskId -> new HashSet<>(taskId.partitions())
                 )
             ),
             record.warmupTasks().stream()
                 .collect(Collectors.toMap(
-                    StreamsGroupTargetAssignmentMemberValue.TaskId::subtopology,
+                    StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopology,
                     taskId -> new HashSet<>(taskId.partitions())
                 )
             )

diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index 96273e368e1..fa19409b208 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -54,6 +54,8 @@ import org.apache.kafka.common.message.OffsetFetchResponseData;
 import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.StreamsHeartbeatRequestData;
+import org.apache.kafka.common.message.StreamsHeartbeatResponseData;
 import org.apache.kafka.common.message.StreamsInitializeRequestData;
 import org.apache.kafka.common.message.StreamsInitializeResponseData;
 import org.apache.kafka.common.message.SyncGroupRequestData;
@@ -375,6 +377,117 @@ public class GroupCoordinatorServiceTest {
         );
     }
 
+    @Test
+    public void testStreamsHeartbeatWhenNotStarted() throws ExecutionException, InterruptedException {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime,
+            new GroupCoordinatorMetrics()
+        );
+
+        StreamsHeartbeatRequestData request = new StreamsHeartbeatRequestData()
+            .setGroupId("foo");
+
+        CompletableFuture<StreamsHeartbeatResponseData> future = service.streamsHeartbeat(
+            requestContext(ApiKeys.STREAMS_HEARTBEAT),
+            request
+        );
+
+        assertEquals(
+            new StreamsHeartbeatResponseData()
+                .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()),
+            future.get()
+        );
+    }
+
+    @Test
+    public void testStreamsHeartbeat() throws ExecutionException, InterruptedException, TimeoutException {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime,
+            new GroupCoordinatorMetrics()
+        );
+
+        StreamsHeartbeatRequestData request = new StreamsHeartbeatRequestData()
+            .setGroupId("foo");
+
+        service.startup(() -> 1);
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("streams-heartbeat"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.eq(Duration.ofMillis(5000)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(
+            new StreamsHeartbeatResponseData()
+        ));
+
+        CompletableFuture<StreamsHeartbeatResponseData> future = service.streamsHeartbeat(
+            requestContext(ApiKeys.STREAMS_HEARTBEAT),
+            request
+        );
+
+        assertEquals(new StreamsHeartbeatResponseData(), future.get(5, TimeUnit.SECONDS));
+    }
+
+    private static Stream<Arguments> testStreamsHeartbeatWithExceptionSource() {
+        return Stream.of(
+            Arguments.arguments(new UnknownTopicOrPartitionException(), Errors.COORDINATOR_NOT_AVAILABLE.code(), null),
+            Arguments.arguments(new NotEnoughReplicasException(), Errors.COORDINATOR_NOT_AVAILABLE.code(), null),
+            Arguments.arguments(new org.apache.kafka.common.errors.TimeoutException(), Errors.COORDINATOR_NOT_AVAILABLE.code(), null),
+            Arguments.arguments(new NotLeaderOrFollowerException(), Errors.NOT_COORDINATOR.code(), null),
+            Arguments.arguments(new KafkaStorageException(), Errors.NOT_COORDINATOR.code(), null),
+            Arguments.arguments(new RecordTooLargeException(), Errors.UNKNOWN_SERVER_ERROR.code(), null),
+            Arguments.arguments(new RecordBatchTooLargeException(), Errors.UNKNOWN_SERVER_ERROR.code(), null),
+            Arguments.arguments(new InvalidFetchSizeException(""), Errors.UNKNOWN_SERVER_ERROR.code(), null),
+            Arguments.arguments(new InvalidRequestException("Invalid"), Errors.INVALID_REQUEST.code(), "Invalid")
+        );
+    }
+
+    @ParameterizedTest
+    @MethodSource("testStreamsHeartbeatWithExceptionSource")
+    public void testStreamsHeartbeatWithException(
+        Throwable exception,
+        short expectedErrorCode,
+        String expectedErrorMessage
+    ) throws ExecutionException, InterruptedException, TimeoutException {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime,
+            new GroupCoordinatorMetrics()
+        );
+
+        StreamsHeartbeatRequestData request = new StreamsHeartbeatRequestData()
+            .setGroupId("foo");
+
+        service.startup(() -> 1);
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("streams-heartbeat"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.eq(Duration.ofMillis(5000)),
+            ArgumentMatchers.any()
+        )).thenReturn(FutureUtils.failedFuture(exception));
+
+        CompletableFuture<StreamsHeartbeatResponseData> future = service.streamsHeartbeat(
+            requestContext(ApiKeys.STREAMS_HEARTBEAT),
+            request
+        );
+
+        assertEquals(
+            new StreamsHeartbeatResponseData()
+                .setErrorCode(expectedErrorCode)
+                .setErrorMessage(expectedErrorMessage),
+            future.get(5, TimeUnit.SECONDS)
+        );
+    }
+
     @Test
     public void testPartitionFor() {
         CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();

diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
index 33756fe53ac..da7fcc52de5 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
@@ -24,6 +24,8 @@ import org.apache.kafka.common.message.OffsetCommitRequestData;
 import org.apache.kafka.common.message.OffsetCommitResponseData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.StreamsHeartbeatRequestData;
+import org.apache.kafka.common.message.StreamsHeartbeatResponseData;
 import org.apache.kafka.common.message.StreamsInitializeRequestData;
 import org.apache.kafka.common.message.StreamsInitializeResponseData;
 import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
@@ -165,6 +167,38 @@ public class GroupCoordinatorShardTest {
         assertEquals(result, coordinator.streamsInitialize(context, request));
     }
 
+    @Test
+    public void testStreamsHeartbeat() {
+        GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
+        OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
+        CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
+        CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
+        GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            new LogContext(),
+            groupMetadataManager,
+            offsetMetadataManager,
+            Time.SYSTEM,
+            new MockCoordinatorTimer<>(Time.SYSTEM),
+            mock(GroupCoordinatorConfig.class),
+            coordinatorMetrics,
+            metricsShard
+        );
+
+        RequestContext context = requestContext(ApiKeys.STREAMS_HEARTBEAT);
+        StreamsHeartbeatRequestData request = new StreamsHeartbeatRequestData();
+        CoordinatorResult<StreamsHeartbeatResponseData, CoordinatorRecord> result = new CoordinatorResult<>(
+            Collections.emptyList(),
+            new StreamsHeartbeatResponseData()
+        );
+
+        when(groupMetadataManager.streamsHeartbeat(
+            context,
+            request
+        )).thenReturn(result);
+
+        assertEquals(result, coordinator.streamsHeartbeat(context, request));
+    }
+
     @Test
     public void testCommitOffset() {
         GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);

diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index c9f2d80ca2b..48afadb6ce7 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -54,6 +54,7 @@ import org.apache.kafka.common.message.ListGroupsResponseData;
 import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.StreamsHeartbeatRequestData;
 import org.apache.kafka.common.message.StreamsInitializeRequestData;
 import org.apache.kafka.common.message.StreamsInitializeRequestData.Subtopology;
 import org.apache.kafka.common.message.StreamsInitializeRequestData.TopicConfig;
@@ -261,6 +262,99 @@ public class GroupMetadataManagerTest {
         assertEquals("InstanceId can't be null.", ex.getMessage());
     }
 
+
+    @Test
+    public void testStreamsRequestValidation() {
+//        TODO MockTaskAssignor assignor = new MockTaskAssignor("range");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+//            .withAssignors(Collections.singletonList(assignor))
+            .build();
+        Exception ex;
+
+        // GroupId must be present in all requests.
+        ex = assertThrows(InvalidRequestException.class, () -> context.streamsHeartbeat(
+            new StreamsHeartbeatRequestData()));
+        assertEquals("GroupId can't be empty.", ex.getMessage());
+
+        // GroupId can't be all whitespaces.
+        ex = assertThrows(InvalidRequestException.class, () -> context.streamsHeartbeat(
+            new StreamsHeartbeatRequestData()
+                .setGroupId(" ")));
+        assertEquals("GroupId can't be empty.", ex.getMessage());
+
+        // RebalanceTimeoutMs must be present in the first request (epoch == 0).
+        ex = assertThrows(InvalidRequestException.class, () -> context.streamsHeartbeat(
+            new StreamsHeartbeatRequestData()
+                .setGroupId("foo")
+                .setMemberEpoch(0)));
+        assertEquals("RebalanceTimeoutMs must be provided in first request.", ex.getMessage());
+
+        // ActiveTasks must be present and empty in the first request (epoch == 0).
+        ex = assertThrows(InvalidRequestException.class, () -> context.streamsHeartbeat(
+            new StreamsHeartbeatRequestData()
+                .setGroupId("foo")
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(5000)
+                .setStandbyTasks(Collections.emptyList())
+                .setWarmupTasks(Collections.emptyList())));
+        assertEquals("ActiveTasks must be empty when (re-)joining.", ex.getMessage());
+
+        // StandbyTasks must be present and empty in the first request (epoch == 0).
+        ex = assertThrows(InvalidRequestException.class, () -> context.streamsHeartbeat(
+            new StreamsHeartbeatRequestData()
+                .setGroupId("foo")
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(5000)
+                .setActiveTasks(Collections.emptyList())
+                .setWarmupTasks(Collections.emptyList())));
+        assertEquals("StandbyTasks must be empty when (re-)joining.", ex.getMessage());
+
+        // WarmupTasks must be present and empty in the first request (epoch == 0).
+        ex = assertThrows(InvalidRequestException.class, () -> context.streamsHeartbeat(
+            new StreamsHeartbeatRequestData()
+                .setGroupId("foo")
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(5000)
+                .setActiveTasks(Collections.emptyList())
+                .setStandbyTasks(Collections.emptyList())));
+        assertEquals("WarmupTasks must be empty when (re-)joining.", ex.getMessage());
+
+        // MemberId must be non-empty in all requests except for the first one where it
+        // could be empty (epoch != 0).
+        ex = assertThrows(InvalidRequestException.class, () -> context.streamsHeartbeat(
+            new StreamsHeartbeatRequestData()
+                .setGroupId("foo")
+                .setMemberEpoch(1)));
+        assertEquals("MemberId can't be empty.", ex.getMessage());
+
+        // InstanceId must be non-empty if provided in all requests.
+        ex = assertThrows(InvalidRequestException.class, () -> context.streamsHeartbeat(
+            new StreamsHeartbeatRequestData()
+                .setGroupId("foo")
+                .setMemberId(Uuid.randomUuid().toString())
+                .setMemberEpoch(1)
+                .setInstanceId("")));
+        assertEquals("InstanceId can't be empty.", ex.getMessage());
+
+        // RackId must be non-empty if provided in all requests.
+        ex = assertThrows(InvalidRequestException.class, () -> context.streamsHeartbeat(
+            new StreamsHeartbeatRequestData()
+                .setGroupId("foo")
+                .setMemberId(Uuid.randomUuid().toString())
+                .setMemberEpoch(1)
+                .setRackId("")));
+        assertEquals("RackId can't be empty.", ex.getMessage());
+
+//        TODO: // ServerAssignor must exist if provided in all requests.
+//        ex = assertThrows(UnsupportedAssignorException.class, () -> context.streamsHeartbeat(
+//            new StreamsHeartbeatRequestData()
+//                .setGroupId("foo")
+//                .setMemberId(Uuid.randomUuid().toString())
+//                .setMemberEpoch(1)
+//                .setServerAssignor("bar")));
Supported assignors: range.", ex.getMessage()); + } + @Test public void testConsumerHeartbeatRegexValidation() { String memberId = Uuid.randomUuid().toString(); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java index bdcc0c9e012..e2e663466b3 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java @@ -36,6 +36,8 @@ import org.apache.kafka.common.message.ListGroupsResponseData; import org.apache.kafka.common.message.ShareGroupDescribeResponseData; import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData; import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData; +import org.apache.kafka.common.message.StreamsHeartbeatRequestData; +import org.apache.kafka.common.message.StreamsHeartbeatResponseData; import org.apache.kafka.common.message.StreamsInitializeRequestData; import org.apache.kafka.common.message.StreamsInitializeResponseData; import org.apache.kafka.common.message.SyncGroupRequestData; @@ -708,6 +710,36 @@ public class GroupMetadataManagerTestContext { return result; } + public CoordinatorResult<StreamsHeartbeatResponseData, CoordinatorRecord> streamsHeartbeat( + StreamsHeartbeatRequestData request + ) { + RequestContext context = new RequestContext( + new RequestHeader( + ApiKeys.STREAMS_HEARTBEAT, + ApiKeys.STREAMS_HEARTBEAT.latestVersion(), + "client", + 0 + ), + "1", + InetAddress.getLoopbackAddress(), + KafkaPrincipal.ANONYMOUS, + ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), + SecurityProtocol.PLAINTEXT, + ClientInformation.EMPTY, + false + ); + + CoordinatorResult<StreamsHeartbeatResponseData, CoordinatorRecord> result = groupMetadataManager.streamsHeartbeat( + context, + request + ); + + if (result.replayRecords()) { + result.records().forEach(this::replay); + } + return result; + } + public List<MockCoordinatorTimer.ExpiredTimeout<Void, CoordinatorRecord>> sleep(long ms) { time.sleep(ms); List<MockCoordinatorTimer.ExpiredTimeout<Void, CoordinatorRecord>> timeouts = timer.poll(); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/AssignmentTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/AssignmentTest.java index baff38bceaf..49ba49753bc 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/AssignmentTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/AssignmentTest.java @@ -62,25 +62,25 @@ public class AssignmentTest { public void testFromTargetAssignmentRecord() { String subtopology1 = "subtopology1"; String subtopology2 = "subtopology2"; - List<StreamsGroupTargetAssignmentMemberValue.TaskId> activeTasks = new ArrayList<>(); - activeTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskId() + List<StreamsGroupTargetAssignmentMemberValue.TaskIds> activeTasks = new ArrayList<>(); + activeTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskIds() .setSubtopology(subtopology1) .setPartitions(Arrays.asList(1, 2, 3))); - activeTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskId() + activeTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskIds() .setSubtopology(subtopology2) .setPartitions(Arrays.asList(4, 5, 6))); - 
-        List<StreamsGroupTargetAssignmentMemberValue.TaskId> standbyTasks = new ArrayList<>();
-        standbyTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskId()
+        List<StreamsGroupTargetAssignmentMemberValue.TaskIds> standbyTasks = new ArrayList<>();
+        standbyTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskIds()
             .setSubtopology(subtopology1)
             .setPartitions(Arrays.asList(7, 8, 9)));
-        standbyTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskId()
+        standbyTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskIds()
             .setSubtopology(subtopology2)
             .setPartitions(Arrays.asList(1, 2, 3)));
-        List<StreamsGroupTargetAssignmentMemberValue.TaskId> warmupTasks = new ArrayList<>();
-        warmupTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskId()
+        List<StreamsGroupTargetAssignmentMemberValue.TaskIds> warmupTasks = new ArrayList<>();
+        warmupTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskIds()
             .setSubtopology(subtopology1)
             .setPartitions(Arrays.asList(4, 5, 6)));
-        warmupTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskId()
+        warmupTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskIds()
             .setSubtopology(subtopology2)
             .setPartitions(Arrays.asList(7, 8, 9)));
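
For readers tracing the new validation path in GroupMetadataManager: below is a minimal sketch of a first heartbeat request that would pass throwIfStreamsHeartbeatRequestIsInvalid as implemented above. It assumes the StreamsHeartbeatRequestData schema used on this branch; the example class name and the literal values are illustrative only and not part of the commit.

    // Illustrative sketch, not part of the commit. A first heartbeat
    // (memberEpoch == 0) must carry a rebalance timeout and present-but-empty
    // task lists; the memberId may be empty only on this first request.
    import java.util.Collections;

    import org.apache.kafka.common.message.StreamsHeartbeatRequestData;

    public class StreamsHeartbeatExample {

        public static StreamsHeartbeatRequestData firstHeartbeat() {
            return new StreamsHeartbeatRequestData()
                .setGroupId("my-streams-app")             // must be non-empty
                .setMemberEpoch(0)                        // 0 = (re-)joining the group
                .setRebalanceTimeoutMs(60000)             // required in the first request
                .setActiveTasks(Collections.emptyList())  // must be empty when (re-)joining
                .setStandbyTasks(Collections.emptyList())
                .setWarmupTasks(Collections.emptyList());
        }
    }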
