This is an automated email from the ASF dual-hosted git repository. cadonna pushed a commit to branch kip1071 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 93b72dba05cc682f849fb00110f15c1413fee563 Author: Bruno Cadonna <[email protected]> AuthorDate: Fri Aug 30 11:14:10 2024 +0200 Rename streamsHeartbeatX and streamsInitializeX to streamsGroupX --- .../consumer/internals/RequestManagers.java | 28 +++++------ .../StreamsGroupHeartbeatRequestManager.java | 38 +++++++-------- ...a => StreamsGroupInitializeRequestManager.java} | 14 +++--- .../StreamsGroupHeartbeatRequestManagerTest.java | 6 +-- .../StreamsGroupInitializeRequestManagerTest.java | 16 +++---- .../group/GroupCoordinatorAdapter.scala | 4 +- core/src/main/scala/kafka/server/KafkaApis.scala | 38 +++++++-------- .../group/GroupCoordinatorAdapterTest.scala | 8 ++-- .../scala/unit/kafka/server/KafkaApisTest.scala | 8 ++-- .../kafka/coordinator/group/GroupCoordinator.java | 8 ++-- .../coordinator/group/GroupCoordinatorService.java | 12 ++--- .../coordinator/group/GroupCoordinatorShard.java | 21 ++++----- .../coordinator/group/GroupMetadataManager.java | 54 +++++++++++----------- .../group/streams/CurrentAssignmentBuilder.java | 14 +++--- .../group/GroupCoordinatorServiceTest.java | 32 ++++++------- .../group/GroupCoordinatorShardTest.java | 12 ++--- .../group/GroupMetadataManagerTest.java | 38 +++++++-------- .../group/GroupMetadataManagerTestContext.java | 8 ++-- 18 files changed, 179 insertions(+), 180 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java index 7cf4a3e67f6..4f3ec1b2398 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java @@ -58,8 +58,8 @@ public class RequestManagers implements Closeable { public final TopicMetadataRequestManager topicMetadataRequestManager; public final FetchRequestManager fetchRequestManager; public final Optional<ShareConsumeRequestManager> shareConsumeRequestManager; - public final Optional<StreamsGroupHeartbeatRequestManager> streamsHeartbeatRequestManager; - public final Optional<StreamsInitializeRequestManager> streamsInitializeRequestManager; + public final Optional<StreamsGroupHeartbeatRequestManager> streamsGroupHeartbeatRequestManager; + public final Optional<StreamsGroupInitializeRequestManager> streamsGroupInitializeRequestManager; private final List<Optional<? extends RequestManager>> entries; private final IdempotentCloser closer = new IdempotentCloser(); @@ -71,8 +71,8 @@ public class RequestManagers implements Closeable { Optional<CommitRequestManager> commitRequestManager, Optional<ConsumerHeartbeatRequestManager> heartbeatRequestManager, Optional<ConsumerMembershipManager> membershipManager, - Optional<StreamsGroupHeartbeatRequestManager> streamsHeartbeatRequestManager, - Optional<StreamsInitializeRequestManager> streamsInitializeRequestManager) { + Optional<StreamsGroupHeartbeatRequestManager> streamsGroupHeartbeatRequestManager, + Optional<StreamsGroupInitializeRequestManager> streamsGroupInitializeRequestManager) { this.log = logContext.logger(RequestManagers.class); this.offsetsRequestManager = requireNonNull(offsetsRequestManager, "OffsetsRequestManager cannot be null"); this.coordinatorRequestManager = coordinatorRequestManager; @@ -84,8 +84,8 @@ public class RequestManagers implements Closeable { this.shareHeartbeatRequestManager = Optional.empty(); this.consumerMembershipManager = membershipManager; this.shareMembershipManager = Optional.empty(); - this.streamsHeartbeatRequestManager = streamsHeartbeatRequestManager; - this.streamsInitializeRequestManager = streamsInitializeRequestManager; + this.streamsGroupHeartbeatRequestManager = streamsGroupHeartbeatRequestManager; + this.streamsGroupInitializeRequestManager = streamsGroupInitializeRequestManager; List<Optional<? extends RequestManager>> list = new ArrayList<>(); list.add(coordinatorRequestManager); @@ -95,8 +95,8 @@ public class RequestManagers implements Closeable { list.add(Optional.of(offsetsRequestManager)); list.add(Optional.of(topicMetadataRequestManager)); list.add(Optional.of(fetchRequestManager)); - list.add(streamsHeartbeatRequestManager); - list.add(streamsInitializeRequestManager); + list.add(streamsGroupHeartbeatRequestManager); + list.add(streamsGroupInitializeRequestManager); entries = Collections.unmodifiableList(list); } @@ -113,8 +113,8 @@ public class RequestManagers implements Closeable { this.shareHeartbeatRequestManager = shareHeartbeatRequestManager; this.consumerMembershipManager = Optional.empty(); this.shareMembershipManager = shareMembershipManager; - this.streamsHeartbeatRequestManager = Optional.empty(); - this.streamsInitializeRequestManager = Optional.empty(); + this.streamsGroupHeartbeatRequestManager = Optional.empty(); + this.streamsGroupInitializeRequestManager = Optional.empty(); this.offsetsRequestManager = null; this.topicMetadataRequestManager = null; this.fetchRequestManager = null; @@ -199,7 +199,7 @@ public class RequestManagers implements Closeable { CoordinatorRequestManager coordinator = null; CommitRequestManager commitRequestManager = null; StreamsGroupHeartbeatRequestManager streamsGroupHeartbeatRequestManager = null; - StreamsInitializeRequestManager streamsInitializeRequestManager = null; + StreamsGroupInitializeRequestManager streamsGroupInitializeRequestManager = null; if (groupRebalanceConfig != null && groupRebalanceConfig.groupId != null) { Optional<String> serverAssignor = Optional.ofNullable(config.getString(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG)); @@ -245,7 +245,7 @@ public class RequestManagers implements Closeable { membershipManager.registerStateListener(applicationThreadMemberStateListener); if (streamsInstanceMetadata.isPresent()) { - streamsInitializeRequestManager = new StreamsInitializeRequestManager( + streamsGroupInitializeRequestManager = new StreamsGroupInitializeRequestManager( logContext, groupRebalanceConfig.groupId, streamsInstanceMetadata.get(), @@ -255,7 +255,7 @@ public class RequestManagers implements Closeable { time, config, coordinator, - streamsInitializeRequestManager, + streamsGroupInitializeRequestManager, membershipManager, backgroundEventHandler, metrics, @@ -297,7 +297,7 @@ public class RequestManagers implements Closeable { Optional.ofNullable(heartbeatRequestManager), Optional.ofNullable(membershipManager), Optional.ofNullable(streamsGroupHeartbeatRequestManager), - Optional.ofNullable(streamsInitializeRequestManager) + Optional.ofNullable(streamsGroupInitializeRequestManager) ); } }; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java index 2b3b7c8f248..ba0a289f187 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java @@ -63,7 +63,7 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager { private final ConsumerMembershipManager membershipManager; - private final StreamsInitializeRequestManager streamsInitializeRequestManager; + private final StreamsGroupInitializeRequestManager streamsGroupInitializeRequestManager; private final BackgroundEventHandler backgroundEventHandler; @@ -82,7 +82,7 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager { final Time time, final ConsumerConfig config, final CoordinatorRequestManager coordinatorRequestManager, - final StreamsInitializeRequestManager streamsInitializeRequestManager, + final StreamsGroupInitializeRequestManager streamsGroupInitializeRequestManager, final ConsumerMembershipManager membershipManager, final BackgroundEventHandler backgroundEventHandler, final Metrics metrics, @@ -92,7 +92,7 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager { this.coordinatorRequestManager = coordinatorRequestManager; this.logger = logContext.logger(getClass()); this.membershipManager = membershipManager; - this.streamsInitializeRequestManager = streamsInitializeRequestManager; + this.streamsGroupInitializeRequestManager = streamsGroupInitializeRequestManager; this.backgroundEventHandler = backgroundEventHandler; int maxPollIntervalMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); @@ -189,12 +189,12 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager { Errors error = Errors.forCode(((StreamsGroupHeartbeatResponse) response.responseBody()).data().errorCode()); if (error == Errors.NONE) { - logger.debug("StreamsHeartbeat responded successfully: {}", response); + logger.debug("StreamsGroupHeartbeat responded successfully: {}", response); } else { - logger.error("StreamsHeartbeat failed because of {}: {}", error, response); + logger.error("StreamsGroupHeartbeat failed because of {}: {}", error, response); } } else { - logger.error("StreamsHeartbeat failed because of unexpected exception.", exception); + logger.error("StreamsGroupHeartbeat failed because of unexpected exception.", exception); } }); } @@ -203,13 +203,13 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager { this.heartbeatRequestState.onFailedAttempt(responseTimeMs); this.heartbeatState.reset(); if (exception instanceof RetriableException) { - String message = String.format("StreamsHeartbeatRequest failed because of the retriable exception. " + + String message = String.format("StreamsGroupHeartbeatRequest failed because of the retriable exception. " + "Will retry in %s ms: %s", heartbeatRequestState.remainingBackoffMs(responseTimeMs), exception.getMessage()); logger.debug(message); } else { - logger.error("StreamsHeartbeatRequest failed due to fatal error", exception); + logger.error("StreamsGroupHeartbeatRequest failed due to fatal error", exception); handleFatalFailure(exception); } } @@ -230,7 +230,7 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager { heartbeatRequestState.resetTimer(); if (data.shouldInitializeTopology()) { - streamsInitializeRequestManager.initialize(); + streamsGroupInitializeRequestManager.initialize(); } if (data.partitionsByUserEndpoint() != null) { streamsInterface.partitionsByHost.set(convertHostInfoMap(data)); @@ -340,7 +340,7 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager { switch (error) { case NOT_COORDINATOR: // the manager should retry immediately when the coordinator node becomes available again - message = String.format("StreamsHeartbeatRequest failed because the group coordinator %s is incorrect. " + + message = String.format("StreamsGroupHeartbeatRequest failed because the group coordinator %s is incorrect. " + "Will attempt to find the coordinator again and retry", coordinatorRequestManager.coordinator()); logInfo(message, response, currentTimeMs); @@ -350,7 +350,7 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager { break; case COORDINATOR_NOT_AVAILABLE: - message = String.format("StreamsHeartbeatRequest failed because the group coordinator %s is not available. " + + message = String.format("StreamsGroupHeartbeatRequest failed because the group coordinator %s is not available. " + "Will attempt to find the coordinator again and retry", coordinatorRequestManager.coordinator()); logInfo(message, response, currentTimeMs); @@ -361,7 +361,7 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager { case COORDINATOR_LOAD_IN_PROGRESS: // the manager will backoff and retry - message = String.format("StreamsHeartbeatRequest failed because the group coordinator %s is still loading." + + message = String.format("StreamsGroupHeartbeatRequest failed because the group coordinator %s is still loading." + "Will retry", coordinatorRequestManager.coordinator()); logInfo(message, response, currentTimeMs); @@ -370,12 +370,12 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager { case GROUP_AUTHORIZATION_FAILED: GroupAuthorizationException exception = GroupAuthorizationException.forGroupId(membershipManager.groupId()); - logger.error("StreamsHeartbeatRequest failed due to group authorization failure: {}", exception.getMessage()); + logger.error("StreamsGroupHeartbeatRequest failed due to group authorization failure: {}", exception.getMessage()); handleFatalFailure(error.exception(exception.getMessage())); break; case UNRELEASED_INSTANCE_ID: - logger.error("StreamsHeartbeatRequest failed due to the instance id {} was not released: {}", + logger.error("StreamsGroupHeartbeatRequest failed due to the instance id {} was not released: {}", membershipManager.groupInstanceId().orElse("null"), errorMessage); handleFatalFailure(Errors.UNRELEASED_INSTANCE_ID.exception(errorMessage)); break; @@ -384,12 +384,12 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager { case GROUP_MAX_SIZE_REACHED: case UNSUPPORTED_ASSIGNOR: case UNSUPPORTED_VERSION: - logger.error("StreamsHeartbeatRequest failed due to error: {}", error); + logger.error("StreamsGroupHeartbeatRequest failed due to error: {}", error); handleFatalFailure(error.exception(errorMessage)); break; case FENCED_MEMBER_EPOCH: - message = String.format("StreamsHeartbeatRequest failed for member %s because epoch %s is fenced.", + message = String.format("StreamsGroupHeartbeatRequest failed for member %s because epoch %s is fenced.", membershipManager.memberId(), membershipManager.memberEpoch()); logInfo(message, response, currentTimeMs); membershipManager.transitionToFenced(); @@ -398,7 +398,7 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager { break; case UNKNOWN_MEMBER_ID: - message = String.format("StreamsHeartbeatRequest failed because member %s is unknown.", + message = String.format("StreamsGroupHeartbeatRequest failed because member %s is unknown.", membershipManager.memberId()); logInfo(message, response, currentTimeMs); membershipManager.transitionToFenced(); @@ -408,7 +408,7 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager { default: // If the manager receives an unknown error - there could be a bug in the code or a new error code - logger.error("StreamsHeartbeatRequest failed due to unexpected error: {}", error); + logger.error("StreamsGroupHeartbeatRequest failed due to unexpected error: {}", error); handleFatalFailure(error.exception(errorMessage)); break; } @@ -604,7 +604,7 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager { .collect(Collectors.toList()); } - // Fields of StreamsHeartbeatRequest sent in the most recent request + // Fields of StreamsGroupHeartbeatRequest sent in the most recent request static class SentFields { private int rebalanceTimeoutMs = -1; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsInitializeRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManager.java similarity index 92% rename from clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsInitializeRequestManager.java rename to clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManager.java index c0d2d9f9124..e7f763bd969 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsInitializeRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManager.java @@ -30,7 +30,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; -public class StreamsInitializeRequestManager implements RequestManager { +public class StreamsGroupInitializeRequestManager implements RequestManager { private final Logger logger; private final String groupId; @@ -40,10 +40,10 @@ public class StreamsInitializeRequestManager implements RequestManager { private Optional<NetworkClientDelegate.UnsentRequest> unsentRequest = Optional.empty(); - StreamsInitializeRequestManager(final LogContext logContext, - final String groupId, - final StreamsAssignmentInterface streamsAssignmentInterface, - final CoordinatorRequestManager coordinatorRequestManager) { + StreamsGroupInitializeRequestManager(final LogContext logContext, + final String groupId, + final StreamsAssignmentInterface streamsAssignmentInterface, + final CoordinatorRequestManager coordinatorRequestManager) { this.logger = logContext.logger(getClass()); this.groupId = groupId; this.streamsAssignmentInterface = streamsAssignmentInterface; @@ -70,11 +70,11 @@ public class StreamsInitializeRequestManager implements RequestManager { streamsGroupInitializeRequestData.setGroupId(groupId); final List<StreamsGroupInitializeRequestData.Subtopology> topology = getTopologyFromStreams(); streamsGroupInitializeRequestData.setTopology(topology); - final StreamsGroupInitializeRequest.Builder streamsInitializeRequestBuilder = new StreamsGroupInitializeRequest.Builder( + final StreamsGroupInitializeRequest.Builder streamsGroupInitializeRequestBuilder = new StreamsGroupInitializeRequest.Builder( streamsGroupInitializeRequestData ); return new NetworkClientDelegate.UnsentRequest( - streamsInitializeRequestBuilder, + streamsGroupInitializeRequestBuilder, coordinatorRequestManager.coordinator() ); } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java index aed1b4dafd7..b9fdb4c7722 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java @@ -86,7 +86,7 @@ class StreamsGroupHeartbeatRequestManagerTest { private CoordinatorRequestManager coordinatorRequestManager; @Mock - private StreamsInitializeRequestManager streamsInitializeRequestManager; + private StreamsGroupInitializeRequestManager streamsGroupInitializeRequestManager; @Mock private ConsumerMembershipManager membershipManager; @@ -141,7 +141,7 @@ class StreamsGroupHeartbeatRequestManagerTest { time, config, coordinatorRequestManager, - streamsInitializeRequestManager, + streamsGroupInitializeRequestManager, membershipManager, backgroundEventHandler, metrics, @@ -333,7 +333,7 @@ class StreamsGroupHeartbeatRequestManagerTest { mockResponse(data); - verify(streamsInitializeRequestManager).initialize(); + verify(streamsGroupInitializeRequestManager).initialize(); } private void mockResponse(final StreamsGroupHeartbeatResponseData data) { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManagerTest.java index dd708c8d79a..48108204474 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManagerTest.java @@ -51,20 +51,20 @@ class StreamsGroupInitializeRequestManagerTest { public void shouldPollEmptyResult() { final CoordinatorRequestManager coordinatorRequestManager = mock(CoordinatorRequestManager.class); final StreamsAssignmentInterface streamsAssignmentInterface = mock(StreamsAssignmentInterface.class); - final StreamsInitializeRequestManager streamsInitializeRequestManager = new StreamsInitializeRequestManager( + final StreamsGroupInitializeRequestManager streamsGroupInitializeRequestManager = new StreamsGroupInitializeRequestManager( logContext, groupId, streamsAssignmentInterface, coordinatorRequestManager ); - final NetworkClientDelegate.PollResult pollResult = streamsInitializeRequestManager.poll(0); + final NetworkClientDelegate.PollResult pollResult = streamsGroupInitializeRequestManager.poll(0); assertEquals(NetworkClientDelegate.PollResult.EMPTY, pollResult); } @Test - public void shouldPollStreamsInitializeRequest() { + public void shouldPollStreamsGroupInitializeRequest() { final Node node = mock(Node.class); final CoordinatorRequestManager coordinatorRequestManager = mock(CoordinatorRequestManager.class); when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(node)); @@ -90,23 +90,23 @@ class StreamsGroupInitializeRequestManagerTest { when(streamsAssignmentInterface.subtopologyMap()).thenReturn( mkMap(mkEntry(subtopologyName1, subtopology1)) ); - final StreamsInitializeRequestManager streamsInitializeRequestManager = new StreamsInitializeRequestManager( + final StreamsGroupInitializeRequestManager streamsGroupInitializeRequestManager = new StreamsGroupInitializeRequestManager( logContext, groupId, streamsAssignmentInterface, coordinatorRequestManager ); - streamsInitializeRequestManager.initialize(); - final NetworkClientDelegate.PollResult pollResult = streamsInitializeRequestManager.poll(0); + streamsGroupInitializeRequestManager.initialize(); + final NetworkClientDelegate.PollResult pollResult = streamsGroupInitializeRequestManager.poll(0); assertEquals(1, pollResult.unsentRequests.size()); final NetworkClientDelegate.UnsentRequest unsentRequest = pollResult.unsentRequests.get(0); assertTrue(unsentRequest.node().isPresent()); assertEquals(node, unsentRequest.node().get()); assertEquals(ApiKeys.STREAMS_GROUP_INITIALIZE, unsentRequest.requestBuilder().apiKey()); - final StreamsGroupInitializeRequest.Builder streamsInitializeRequestBuilder = (StreamsGroupInitializeRequest.Builder) unsentRequest.requestBuilder(); - final StreamsGroupInitializeRequest streamsGroupInitializeRequest = streamsInitializeRequestBuilder.build(); + final StreamsGroupInitializeRequest.Builder streamsGroupInitializeRequestBuilder = (StreamsGroupInitializeRequest.Builder) unsentRequest.requestBuilder(); + final StreamsGroupInitializeRequest streamsGroupInitializeRequest = streamsGroupInitializeRequestBuilder.build(); final StreamsGroupInitializeRequestData streamsGroupInitializeRequestData = streamsGroupInitializeRequest.data(); assertEquals(ApiKeys.STREAMS_GROUP_INITIALIZE.id, streamsGroupInitializeRequestData.apiKey()); assertEquals(groupId, streamsGroupInitializeRequestData.groupId()); diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala index a48b2c6ffc8..ce8bde14493 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala @@ -77,7 +77,7 @@ private[group] class GroupCoordinatorAdapter( )) } - override def streamsInitialize( + override def streamsGroupInitialize( context: RequestContext, request: StreamsGroupInitializeRequestData ): CompletableFuture[StreamsGroupInitializeResponseData] = { @@ -86,7 +86,7 @@ private[group] class GroupCoordinatorAdapter( )) } - override def streamsHeartbeat( + override def streamsGroupHeartbeat( context: RequestContext, request: StreamsGroupHeartbeatRequestData ): CompletableFuture[StreamsGroupHeartbeatResponseData] = { diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index d3bfd420ace..5e2abe450c2 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -277,8 +277,8 @@ class KafkaApis(val requestChannel: RequestChannel, case ApiKeys.DELETE_SHARE_GROUP_STATE => handleDeleteShareGroupStateRequest(request) case ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY => handleReadShareGroupStateSummaryRequest(request) case ApiKeys.STREAMS_GROUP_DESCRIBE => handleStreamsGroupDescribe(request).exceptionally(handleError) - case ApiKeys.STREAMS_GROUP_INITIALIZE => handleStreamsInitialize(request).exceptionally(handleError) - case ApiKeys.STREAMS_GROUP_HEARTBEAT => handleStreamsHeartbeat(request).exceptionally(handleError) + case ApiKeys.STREAMS_GROUP_INITIALIZE => handleStreamsGroupInitialize(request).exceptionally(handleError) + case ApiKeys.STREAMS_GROUP_HEARTBEAT => handleStreamsGroupHeartbeat(request).exceptionally(handleError) case _ => throw new IllegalStateException(s"No handler for request api key ${request.header.apiKey}") } } catch { @@ -3889,27 +3889,27 @@ class KafkaApis(val requestChannel: RequestChannel, private def isStreamsGroupProtocolEnabled(): Boolean = { config.groupCoordinatorRebalanceProtocols.contains(Group.GroupType.STREAMS) } - - def handleStreamsInitialize(request: RequestChannel.Request): CompletableFuture[Unit] = { - val streamsInitializeRequest = request.body[StreamsGroupInitializeRequest] + + def handleStreamsGroupInitialize(request: RequestChannel.Request): CompletableFuture[Unit] = { + val streamsGroupInitializeRequest = request.body[StreamsGroupInitializeRequest] // TODO: Check ACLs on CREATE TOPIC & DESCRIBE_CONFIGS if (!isStreamsGroupProtocolEnabled()) { // The API is not supported by the "old" group coordinator (the default). If the // new one is not enabled, we fail directly here. - requestHelper.sendMaybeThrottle(request, streamsInitializeRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) + requestHelper.sendMaybeThrottle(request, streamsGroupInitializeRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) CompletableFuture.completedFuture[Unit](()) - } else if (!authHelper.authorize(request.context, READ, GROUP, streamsInitializeRequest.data.groupId)) { - requestHelper.sendMaybeThrottle(request, streamsInitializeRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception)) + } else if (!authHelper.authorize(request.context, READ, GROUP, streamsGroupInitializeRequest.data.groupId)) { + requestHelper.sendMaybeThrottle(request, streamsGroupInitializeRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception)) CompletableFuture.completedFuture[Unit](()) } else { - groupCoordinator.streamsInitialize( + groupCoordinator.streamsGroupInitialize( request.context, - streamsInitializeRequest.data, + streamsGroupInitializeRequest.data, ).handle[Unit] { (response, exception) => if (exception != null) { - requestHelper.sendMaybeThrottle(request, streamsInitializeRequest.getErrorResponse(exception)) + requestHelper.sendMaybeThrottle(request, streamsGroupInitializeRequest.getErrorResponse(exception)) } else { requestHelper.sendMaybeThrottle(request, new StreamsGroupInitializeResponse(response)) } @@ -3917,24 +3917,24 @@ class KafkaApis(val requestChannel: RequestChannel, } } - def handleStreamsHeartbeat(request: RequestChannel.Request): CompletableFuture[Unit] = { - val streamsHeartbeatRequest = request.body[StreamsGroupHeartbeatRequest] + def handleStreamsGroupHeartbeat(request: RequestChannel.Request): CompletableFuture[Unit] = { + val streamsGroupHeartbeatRequest = request.body[StreamsGroupHeartbeatRequest] if (!isStreamsGroupProtocolEnabled()) { // The API is not supported by the "old" group coordinator (the default). If the // new one is not enabled, we fail directly here. - requestHelper.sendMaybeThrottle(request, streamsHeartbeatRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) + requestHelper.sendMaybeThrottle(request, streamsGroupHeartbeatRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) CompletableFuture.completedFuture[Unit](()) - } else if (!authHelper.authorize(request.context, READ, GROUP, streamsHeartbeatRequest.data.groupId)) { - requestHelper.sendMaybeThrottle(request, streamsHeartbeatRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception)) + } else if (!authHelper.authorize(request.context, READ, GROUP, streamsGroupHeartbeatRequest.data.groupId)) { + requestHelper.sendMaybeThrottle(request, streamsGroupHeartbeatRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception)) CompletableFuture.completedFuture[Unit](()) } else { - groupCoordinator.streamsHeartbeat( + groupCoordinator.streamsGroupHeartbeat( request.context, - streamsHeartbeatRequest.data, + streamsGroupHeartbeatRequest.data, ).handle[Unit] { (response, exception) => if (exception != null) { - requestHelper.sendMaybeThrottle(request, streamsHeartbeatRequest.getErrorResponse(exception)) + requestHelper.sendMaybeThrottle(request, streamsGroupHeartbeatRequest.getErrorResponse(exception)) } else { requestHelper.sendMaybeThrottle(request, new StreamsGroupHeartbeatResponse(response)) } diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala index d2c2a2ac487..68959acc23a 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala @@ -79,7 +79,7 @@ class GroupCoordinatorAdapterTest { } @Test - def testStreamsInitialize(): Unit = { + def testStreamsGroupInitialize(): Unit = { val groupCoordinator = mock(classOf[GroupCoordinator]) val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM) @@ -87,7 +87,7 @@ class GroupCoordinatorAdapterTest { val request = new StreamsGroupInitializeRequestData() .setGroupId("group") - val future = adapter.streamsInitialize(ctx, request) + val future = adapter.streamsGroupInitialize(ctx, request) assertTrue(future.isDone) assertTrue(future.isCompletedExceptionally) @@ -95,7 +95,7 @@ class GroupCoordinatorAdapterTest { } @Test - def testStreamsHeartbeat(): Unit = { + def testStreamsGroupHeartbeat(): Unit = { val groupCoordinator = mock(classOf[GroupCoordinator]) val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM) @@ -103,7 +103,7 @@ class GroupCoordinatorAdapterTest { val request = new StreamsGroupHeartbeatRequestData() .setGroupId("group") - val future = adapter.streamsHeartbeat(ctx, request) + val future = adapter.streamsGroupHeartbeat(ctx, request) assertTrue(future.isDone) assertTrue(future.isCompletedExceptionally) diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 202149fe747..1241287965f 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -11243,7 +11243,7 @@ class KafkaApisTest extends Logging { val requestChannelRequest = buildRequest(new StreamsGroupInitializeRequest.Builder(streamsGroupInitializeRequest, true).build()) val future = new CompletableFuture[StreamsGroupInitializeResponseData]() - when(groupCoordinator.streamsInitialize( + when(groupCoordinator.streamsGroupInitialize( requestChannelRequest.context, streamsGroupInitializeRequest )).thenReturn(future) @@ -11269,7 +11269,7 @@ class KafkaApisTest extends Logging { val requestChannelRequest = buildRequest(new StreamsGroupInitializeRequest.Builder(streamsGroupInitializeRequest, true).build()) val future = new CompletableFuture[StreamsGroupInitializeResponseData]() - when(groupCoordinator.streamsInitialize( + when(groupCoordinator.streamsGroupInitialize( requestChannelRequest.context, streamsGroupInitializeRequest )).thenReturn(future) @@ -11316,7 +11316,7 @@ class KafkaApisTest extends Logging { val requestChannelRequest = buildRequest(new StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest, true).build()) val future = new CompletableFuture[StreamsGroupHeartbeatResponseData]() - when(groupCoordinator.streamsHeartbeat( + when(groupCoordinator.streamsGroupHeartbeat( requestChannelRequest.context, streamsGroupHeartbeatRequest )).thenReturn(future) @@ -11343,7 +11343,7 @@ class KafkaApisTest extends Logging { val requestChannelRequest = buildRequest(new StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest, true).build()) val future = new CompletableFuture[StreamsGroupHeartbeatResponseData]() - when(groupCoordinator.streamsHeartbeat( + when(groupCoordinator.streamsGroupHeartbeat( requestChannelRequest.context, streamsGroupHeartbeatRequest )).thenReturn(future) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java index cea5da08608..7c78db21761 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java @@ -91,12 +91,12 @@ public interface GroupCoordinator { * Initialize a Streams Group. * * @param context The request context. - * @param request The StreamsHeartbeatRequest data. + * @param request The StreamsGroupInitializeRequest data. * * @return A future yielding the response. * The error code(s) of the response are set to indicate the error(s) occurred during the execution. */ - CompletableFuture<StreamsGroupInitializeResponseData> streamsInitialize( + CompletableFuture<StreamsGroupInitializeResponseData> streamsGroupInitialize( RequestContext context, StreamsGroupInitializeRequestData request ); @@ -105,12 +105,12 @@ public interface GroupCoordinator { * Heartbeat to a Streams Group. * * @param context The request context. - * @param request The StreamsHeartbeatResponseData data. + * @param request The StreamsGroupHeartbeatResponseData data. * * @return A future yielding the response. * The error code(s) of the response are set to indicate the error(s) occurred during the execution. */ - CompletableFuture<StreamsGroupHeartbeatResponseData> streamsHeartbeat( + CompletableFuture<StreamsGroupHeartbeatResponseData> streamsGroupHeartbeat( RequestContext context, StreamsGroupHeartbeatRequestData request ); diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java index 5d632528961..c6631945add 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java @@ -351,10 +351,10 @@ public class GroupCoordinatorService implements GroupCoordinator { } /** - * See {@link GroupCoordinator#streamsInitialize(RequestContext, org.apache.kafka.common.message.StreamsGroupInitializeRequestData)}. + * See {@link GroupCoordinator#streamsGroupInitialize(RequestContext, org.apache.kafka.common.message.StreamsGroupInitializeRequestData)}. */ @Override - public CompletableFuture<StreamsGroupInitializeResponseData> streamsInitialize( + public CompletableFuture<StreamsGroupInitializeResponseData> streamsGroupInitialize( RequestContext context, StreamsGroupInitializeRequestData request ) { @@ -368,7 +368,7 @@ public class GroupCoordinatorService implements GroupCoordinator { "streams-group-initialize", topicPartitionFor(request.groupId()), Duration.ofMillis(config.offsetCommitTimeoutMs()), - coordinator -> coordinator.streamsInitialize(context, request) + coordinator -> coordinator.streamsGroupInitialize(context, request) ).exceptionally(exception -> handleOperationException( "streams-group-initialize", request, @@ -380,10 +380,10 @@ public class GroupCoordinatorService implements GroupCoordinator { } /** - * See {@link GroupCoordinator#streamsHeartbeat(RequestContext, org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData)}. + * See {@link GroupCoordinator#streamsGroupHeartbeat(RequestContext, org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData)}. */ @Override - public CompletableFuture<StreamsGroupHeartbeatResponseData> streamsHeartbeat( + public CompletableFuture<StreamsGroupHeartbeatResponseData> streamsGroupHeartbeat( RequestContext context, StreamsGroupHeartbeatRequestData request ) { @@ -397,7 +397,7 @@ public class GroupCoordinatorService implements GroupCoordinator { "streams-heartbeat", topicPartitionFor(request.groupId()), Duration.ofMillis(config.offsetCommitTimeoutMs()), - coordinator -> coordinator.streamsHeartbeat(context, request) + coordinator -> coordinator.streamsGroupHeartbeat(context, request) ).exceptionally(exception -> handleOperationException( "streams-heartbeat", request, diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java index cb2833f7029..225b56f58e1 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java @@ -107,7 +107,6 @@ import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey; import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics; import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard; -import org.apache.kafka.coordinator.group.taskassignor.MockAssignor; import org.apache.kafka.coordinator.group.taskassignor.StickyTaskAssignor; import org.apache.kafka.image.MetadataDelta; import org.apache.kafka.image.MetadataImage; @@ -391,35 +390,35 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord } /** - * Handles a StreamsInitialize request. + * Handles a StreamsGroupInitialize request. * * @param context The request context. - * @param request The actual StreamsInitialize request. + * @param request The actual StreamsGroupInitialize request. * - * @return A Result containing the StreamsInitialize response and + * @return A Result containing the StreamsGroupInitialize response and * a list of records to update the state machine. */ - public CoordinatorResult<StreamsGroupInitializeResponseData, CoordinatorRecord> streamsInitialize( + public CoordinatorResult<StreamsGroupInitializeResponseData, CoordinatorRecord> streamsGroupInitialize( RequestContext context, StreamsGroupInitializeRequestData request ) { - return groupMetadataManager.streamsInitialize(context, request); + return groupMetadataManager.streamsGroupInitialize(context, request); } /** - * Handles a StreamsHeartbeat request. + * Handles a StreamsGroupHeartbeat request. * * @param context The request context. - * @param request The actual StreamsHeartbeat request. + * @param request The actual StreamsGroupHeartbeat request. * - * @return A Result containing the StreamsHeartbeat response and + * @return A Result containing the StreamsGroupHeartbeat response and * a list of records to update the state machine. */ - public CoordinatorResult<StreamsGroupHeartbeatResponseData, CoordinatorRecord> streamsHeartbeat( + public CoordinatorResult<StreamsGroupHeartbeatResponseData, CoordinatorRecord> streamsGroupHeartbeat( RequestContext context, StreamsGroupHeartbeatRequestData request ) { - return groupMetadataManager.streamsHeartbeat(context, request); + return groupMetadataManager.streamsGroupHeartbeat(context, request); } /** diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 3e51e4b798a..40c9e652a1b 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -1672,7 +1672,7 @@ public class GroupMetadataManager { * @throws InvalidRequestException if the request is not valid. * @throws UnsupportedAssignorException if the assignor is not supported. */ - private void throwIfStreamsInitializeRequestIsInvalid( + private void throwIfStreamsGroupInitializeRequestIsInvalid( StreamsGroupInitializeRequestData request ) throws InvalidRequestException, UnsupportedAssignorException { throwIfEmptyString(request.groupId(), "GroupId can't be empty."); @@ -1694,7 +1694,7 @@ public class GroupMetadataManager { * @throws InvalidRequestException if the request is not valid. * @throws UnsupportedAssignorException if the assignor is not supported. */ - private void throwIfStreamsHeartbeatRequestIsInvalid( + private void throwIfStreamsGroupHeartbeatRequestIsInvalid( StreamsGroupHeartbeatRequestData request ) throws InvalidRequestException, UnsupportedAssignorException { throwIfEmptyString(request.groupId(), "GroupId can't be empty."); @@ -2243,9 +2243,9 @@ public class GroupMetadataManager { * @param ownedWarmupTasks The list of owned warmup tasks from the request or null. * @param userEndpoint * @param clientTags - * @return A Result containing the StreamsHeartbeat response and a list of records to update the state machine. + * @return A Result containing the StreamsGroupHeartbeat response and a list of records to update the state machine. */ - private CoordinatorResult<StreamsGroupHeartbeatResponseData, CoordinatorRecord> streamsHeartbeat( + private CoordinatorResult<StreamsGroupHeartbeatResponseData, CoordinatorRecord> streamsGroupHeartbeat( String groupId, String memberId, int memberEpoch, @@ -2422,15 +2422,15 @@ public class GroupMetadataManager { || hasAssignedStandbyTasksChanged(member, updatedMember) || hasAssignedWarmupTasksChanged(member, updatedMember) ) { - response.setActiveTasks(createStreamsHeartbeatResponseTaskIds(updatedMember.assignedActiveTasks())); - response.setStandbyTasks(createStreamsHeartbeatResponseTaskIds(updatedMember.assignedStandbyTasks())); - response.setWarmupTasks(createStreamsHeartbeatResponseTaskIds(updatedMember.assignedWarmupTasks())); + response.setActiveTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedActiveTasks())); + response.setStandbyTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedStandbyTasks())); + response.setWarmupTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedWarmupTasks())); } return new CoordinatorResult<>(records, response); } - private List<StreamsGroupHeartbeatResponseData.TaskIds> createStreamsHeartbeatResponseTaskIds(final Map<String, Set<Integer>> taskIds) { + private List<StreamsGroupHeartbeatResponseData.TaskIds> createStreamsGroupHeartbeatResponseTaskIds(final Map<String, Set<Integer>> taskIds) { return taskIds.entrySet().stream() .map(entry -> new StreamsGroupHeartbeatResponseData.TaskIds() .setSubtopology(entry.getKey()) @@ -2655,10 +2655,10 @@ public class GroupMetadataManager { * * @param groupId The group id from the request. * @param subtopologies The list of subtopologies - * @return A Result containing the StreamsInitialize response and a list of records to update the state machine. + * @return A Result containing the StreamsGroupInitialize response and a list of records to update the state machine. */ - private CoordinatorResult<StreamsGroupInitializeResponseData, CoordinatorRecord> streamsInitialize(String groupId, - List<StreamsGroupInitializeRequestData.Subtopology> subtopologies) + private CoordinatorResult<StreamsGroupInitializeResponseData, CoordinatorRecord> streamsGroupInitialize(String groupId, + List<StreamsGroupInitializeRequestData.Subtopology> subtopologies) throws ApiException { final List<CoordinatorRecord> records = new ArrayList<>(); @@ -3656,13 +3656,13 @@ public class GroupMetadataManager { * @param targetAssignmentEpoch The target assignment epoch. * @param targetAssignment The target assignment. * @param ownedActiveTasks The list of active tasks owned by the member. This - * is reported in the StreamsHeartbeat API and + * is reported in the StreamsGroupHeartbeat API and * it could be null if not provided. * @param ownedStandbyTasks The list of standby owned by the member. This - * is reported in the StreamsHeartbeat API and + * is reported in the StreamsGroupHeartbeat API and * it could be null if not provided. * @param ownedWarmupTasks The list of warmup tasks owned by the member. This - * is reported in the StreamsHeartbeat API and + * is reported in the StreamsGroupHeartbeat API and * it could be null if not provided. * @param records The list to accumulate any new records. * @return The received member if no changes have been made; or a new @@ -3950,7 +3950,7 @@ public class GroupMetadataManager { * @param memberId The member id from the request. * @param memberEpoch The member epoch from the request. * - * @return A Result containing the StreamsHeartbeat response and + * @return A Result containing the StreamsGroupHeartbeat response and * a list of records to update the state machine. */ private CoordinatorResult<StreamsGroupHeartbeatResponseData, CoordinatorRecord> streamsGroupLeave( @@ -4777,40 +4777,40 @@ public class GroupMetadataManager { } /** - * Handles a StreamsInitialize request. + * Handles a StreamsGroupInitialize request. * * @param context The request context. - * @param request The actual StreamsInitialize request. + * @param request The actual StreamsGroupInitialize request. * - * @return A Result containing the StreamsInitialize response and + * @return A Result containing the StreamsGroupInitialize response and * a list of records to update the state machine. */ - public CoordinatorResult<StreamsGroupInitializeResponseData, CoordinatorRecord> streamsInitialize( + public CoordinatorResult<StreamsGroupInitializeResponseData, CoordinatorRecord> streamsGroupInitialize( RequestContext context, StreamsGroupInitializeRequestData request ) throws ApiException { - throwIfStreamsInitializeRequestIsInvalid(request); + throwIfStreamsGroupInitializeRequestIsInvalid(request); - return streamsInitialize( + return streamsGroupInitialize( request.groupId(), request.topology() ); } /** - * Handles a StreamsHeartbeat request. + * Handles a StreamsGroupHeartbeat request. * * @param context The request context. - * @param request The actual StreamsHeartbeat request. + * @param request The actual StreamsGroupHeartbeat request. * - * @return A Result containing the StreamsHeartbeat response and + * @return A Result containing the StreamsGroupHeartbeat response and * a list of records to update the state machine. */ - public CoordinatorResult<StreamsGroupHeartbeatResponseData, CoordinatorRecord> streamsHeartbeat( + public CoordinatorResult<StreamsGroupHeartbeatResponseData, CoordinatorRecord> streamsGroupHeartbeat( RequestContext context, StreamsGroupHeartbeatRequestData request ) throws ApiException { - throwIfStreamsHeartbeatRequestIsInvalid(request); + throwIfStreamsGroupHeartbeatRequestIsInvalid(request); if (request.memberEpoch() == LEAVE_GROUP_MEMBER_EPOCH || request.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH) { // -2 means that a static member wants to leave the group. @@ -4821,7 +4821,7 @@ public class GroupMetadataManager { request.memberEpoch() ); } else { - return streamsHeartbeat( + return streamsGroupHeartbeat( request.groupId(), request.memberId(), request.memberEpoch(), diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java index f7690381708..4ff347c66b7 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java @@ -56,17 +56,17 @@ public class CurrentAssignmentBuilder { private BiFunction<String, Integer, Integer> currentTaskEpoch; /** - * The active tasks owned by the streams. This is directly provided by the member in the StreamsHeartbeat request. + * The active tasks owned by the streams. This is directly provided by the member in the StreamsGroupHeartbeat request. */ private List<StreamsGroupHeartbeatRequestData.TaskIds> ownedActiveTasks; /** - * The standby tasks owned by the streams. This is directly provided by the member in the StreamsHeartbeat request. + * The standby tasks owned by the streams. This is directly provided by the member in the StreamsGroupHeartbeat request. */ private List<StreamsGroupHeartbeatRequestData.TaskIds> ownedStandbyTasks; /** - * The warmup tasks owned by the streams. This is directly provided by the member in the StreamsHeartbeat request. + * The warmup tasks owned by the streams. This is directly provided by the member in the StreamsGroupHeartbeat request. */ private List<StreamsGroupHeartbeatRequestData.TaskIds> ownedWarmupTasks; @@ -110,7 +110,7 @@ public class CurrentAssignmentBuilder { } /** - * Sets the active tasks currently owned by the member. This comes directly from the last StreamsHeartbeat request. This is used to + * Sets the active tasks currently owned by the member. This comes directly from the last StreamsGroupHeartbeat request. This is used to * determine if the member has revoked the necessary tasks. * * @param ownedActiveTasks A list of topic-tasks. @@ -124,7 +124,7 @@ public class CurrentAssignmentBuilder { } /** - * Sets the standby tasks currently owned by the member. This comes directly from the last StreamsHeartbeat request. This is used to + * Sets the standby tasks currently owned by the member. This comes directly from the last StreamsGroupHeartbeat request. This is used to * determine if the member has revoked the necessary tasks. * * @param ownedStandbyTasks A list of topic-tasks. @@ -138,7 +138,7 @@ public class CurrentAssignmentBuilder { } /** - * Sets the warmup tasks currently owned by the member. This comes directly from the last StreamsHeartbeat request. This is used to + * Sets the warmup tasks currently owned by the member. This comes directly from the last StreamsGroupHeartbeat request. This is used to * determine if the member has revoked the necessary tasks. * * @param ownedWarmupTasks A list of topic-tasks. @@ -178,7 +178,7 @@ public class CurrentAssignmentBuilder { // When the member is in the UNREVOKED_TASKS state, we wait // until the member has revoked the necessary tasks. They are // considered revoked when they are not anymore reported in the - // owned tasks set in the StreamsHeartbeat API. + // owned tasks set in the StreamsGroupHeartbeat API. // If the member provides its owned tasks. We verify if it still // owns any of the revoked tasks. If it does, we cannot progress. diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java index dfd898c225c..7505aa8c791 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java @@ -267,7 +267,7 @@ public class GroupCoordinatorServiceTest { } @Test - public void testStreamsInitializeWhenNotStarted() throws ExecutionException, InterruptedException { + public void testStreamsGroupInitializeWhenNotStarted() throws ExecutionException, InterruptedException { CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime(); GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), @@ -280,7 +280,7 @@ public class GroupCoordinatorServiceTest { StreamsGroupInitializeRequestData request = new StreamsGroupInitializeRequestData() .setGroupId("foo"); - CompletableFuture<StreamsGroupInitializeResponseData> future = service.streamsInitialize( + CompletableFuture<StreamsGroupInitializeResponseData> future = service.streamsGroupInitialize( requestContext(ApiKeys.STREAMS_GROUP_INITIALIZE), request ); @@ -293,7 +293,7 @@ public class GroupCoordinatorServiceTest { } @Test - public void testStreamsInitialize() throws ExecutionException, InterruptedException, TimeoutException { + public void testStreamsGroupInitialize() throws ExecutionException, InterruptedException, TimeoutException { CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime(); GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), @@ -317,7 +317,7 @@ public class GroupCoordinatorServiceTest { new StreamsGroupInitializeResponseData() )); - CompletableFuture<StreamsGroupInitializeResponseData> future = service.streamsInitialize( + CompletableFuture<StreamsGroupInitializeResponseData> future = service.streamsGroupInitialize( requestContext(ApiKeys.STREAMS_GROUP_INITIALIZE), request ); @@ -325,7 +325,7 @@ public class GroupCoordinatorServiceTest { assertEquals(new StreamsGroupInitializeResponseData(), future.get(5, TimeUnit.SECONDS)); } - private static Stream<Arguments> testStreamsInitializeWithExceptionSource() { + private static Stream<Arguments> testStreamsGroupInitializeWithExceptionSource() { return Stream.of( Arguments.arguments(new UnknownTopicOrPartitionException(), Errors.COORDINATOR_NOT_AVAILABLE.code(), null), Arguments.arguments(new NotEnoughReplicasException(), Errors.COORDINATOR_NOT_AVAILABLE.code(), null), @@ -341,8 +341,8 @@ public class GroupCoordinatorServiceTest { } @ParameterizedTest - @MethodSource("testStreamsInitializeWithExceptionSource") - public void testStreamsInitializeWithException( + @MethodSource("testStreamsGroupInitializeWithExceptionSource") + public void testStreamsGroupInitializeWithException( Throwable exception, short expectedErrorCode, String expectedErrorMessage @@ -368,7 +368,7 @@ public class GroupCoordinatorServiceTest { ArgumentMatchers.any() )).thenReturn(FutureUtils.failedFuture(exception)); - CompletableFuture<StreamsGroupInitializeResponseData> future = service.streamsInitialize( + CompletableFuture<StreamsGroupInitializeResponseData> future = service.streamsGroupInitialize( requestContext(ApiKeys.STREAMS_GROUP_INITIALIZE), request ); @@ -382,7 +382,7 @@ public class GroupCoordinatorServiceTest { } @Test - public void testStreamsHeartbeatWhenNotStarted() throws ExecutionException, InterruptedException { + public void testStreamsGroupHeartbeatWhenNotStarted() throws ExecutionException, InterruptedException { CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime(); GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), @@ -395,7 +395,7 @@ public class GroupCoordinatorServiceTest { StreamsGroupHeartbeatRequestData request = new StreamsGroupHeartbeatRequestData() .setGroupId("foo"); - CompletableFuture<StreamsGroupHeartbeatResponseData> future = service.streamsHeartbeat( + CompletableFuture<StreamsGroupHeartbeatResponseData> future = service.streamsGroupHeartbeat( requestContext(ApiKeys.STREAMS_GROUP_HEARTBEAT), request ); @@ -408,7 +408,7 @@ public class GroupCoordinatorServiceTest { } @Test - public void testStreamsHeartbeat() throws ExecutionException, InterruptedException, TimeoutException { + public void testStreamsGroupHeartbeat() throws ExecutionException, InterruptedException, TimeoutException { CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime(); GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), @@ -432,7 +432,7 @@ public class GroupCoordinatorServiceTest { new StreamsGroupHeartbeatResponseData() )); - CompletableFuture<StreamsGroupHeartbeatResponseData> future = service.streamsHeartbeat( + CompletableFuture<StreamsGroupHeartbeatResponseData> future = service.streamsGroupHeartbeat( requestContext(ApiKeys.STREAMS_GROUP_HEARTBEAT), request ); @@ -440,7 +440,7 @@ public class GroupCoordinatorServiceTest { assertEquals(new StreamsGroupHeartbeatResponseData(), future.get(5, TimeUnit.SECONDS)); } - private static Stream<Arguments> testStreamsHeartbeatWithExceptionSource() { + private static Stream<Arguments> testStreamsGroupHeartbeatWithExceptionSource() { return Stream.of( Arguments.arguments(new UnknownTopicOrPartitionException(), Errors.COORDINATOR_NOT_AVAILABLE.code(), null), Arguments.arguments(new NotEnoughReplicasException(), Errors.COORDINATOR_NOT_AVAILABLE.code(), null), @@ -455,8 +455,8 @@ public class GroupCoordinatorServiceTest { } @ParameterizedTest - @MethodSource("testStreamsHeartbeatWithExceptionSource") - public void testStreamsHeartbeatWithException( + @MethodSource("testStreamsGroupHeartbeatWithExceptionSource") + public void testStreamsGroupHeartbeatWithException( Throwable exception, short expectedErrorCode, String expectedErrorMessage @@ -482,7 +482,7 @@ public class GroupCoordinatorServiceTest { ArgumentMatchers.any() )).thenReturn(FutureUtils.failedFuture(exception)); - CompletableFuture<StreamsGroupHeartbeatResponseData> future = service.streamsHeartbeat( + CompletableFuture<StreamsGroupHeartbeatResponseData> future = service.streamsGroupHeartbeat( requestContext(ApiKeys.STREAMS_GROUP_HEARTBEAT), request ); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java index e7fffb90c8c..263b2ab76ea 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java @@ -136,7 +136,7 @@ public class GroupCoordinatorShardTest { } @Test - public void testStreamsInitialize() { + public void testStreamsGroupInitialize() { GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class); @@ -159,16 +159,16 @@ public class GroupCoordinatorShardTest { new StreamsGroupInitializeResponseData() ); - when(groupMetadataManager.streamsInitialize( + when(groupMetadataManager.streamsGroupInitialize( context, request )).thenReturn(result); - assertEquals(result, coordinator.streamsInitialize(context, request)); + assertEquals(result, coordinator.streamsGroupInitialize(context, request)); } @Test - public void testStreamsHeartbeat() { + public void testStreamsGroupHeartbeat() { GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class); @@ -191,12 +191,12 @@ public class GroupCoordinatorShardTest { new StreamsGroupHeartbeatResponseData() ); - when(groupMetadataManager.streamsHeartbeat( + when(groupMetadataManager.streamsGroupHeartbeat( context, request )).thenReturn(result); - assertEquals(result, coordinator.streamsHeartbeat(context, request)); + assertEquals(result, coordinator.streamsGroupHeartbeat(context, request)); } @Test diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index bbbfe346539..05a94ea49a6 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -272,25 +272,25 @@ public class GroupMetadataManagerTest { Exception ex; // GroupId must be present in all requests. - ex = assertThrows(InvalidRequestException.class, () -> context.streamsHeartbeat( + ex = assertThrows(InvalidRequestException.class, () -> context.streamsGroupHeartbeat( new StreamsGroupHeartbeatRequestData())); assertEquals("GroupId can't be empty.", ex.getMessage()); // GroupId can't be all whitespaces. - ex = assertThrows(InvalidRequestException.class, () -> context.streamsHeartbeat( + ex = assertThrows(InvalidRequestException.class, () -> context.streamsGroupHeartbeat( new StreamsGroupHeartbeatRequestData() .setGroupId(" "))); assertEquals("GroupId can't be empty.", ex.getMessage()); // RebalanceTimeoutMs must be present in the first request (epoch == 0). - ex = assertThrows(InvalidRequestException.class, () -> context.streamsHeartbeat( + ex = assertThrows(InvalidRequestException.class, () -> context.streamsGroupHeartbeat( new StreamsGroupHeartbeatRequestData() .setGroupId("foo") .setMemberEpoch(0))); assertEquals("RebalanceTimeoutMs must be provided in first request.", ex.getMessage()); // ActiveTasks must be present and empty in the first request (epoch == 0). - ex = assertThrows(InvalidRequestException.class, () -> context.streamsHeartbeat( + ex = assertThrows(InvalidRequestException.class, () -> context.streamsGroupHeartbeat( new StreamsGroupHeartbeatRequestData() .setGroupId("foo") .setMemberEpoch(0) @@ -300,7 +300,7 @@ public class GroupMetadataManagerTest { assertEquals("ActiveTasks must be empty when (re-)joining.", ex.getMessage()); // StandbyTasks must be present and empty in the first request (epoch == 0). - ex = assertThrows(InvalidRequestException.class, () -> context.streamsHeartbeat( + ex = assertThrows(InvalidRequestException.class, () -> context.streamsGroupHeartbeat( new StreamsGroupHeartbeatRequestData() .setGroupId("foo") .setMemberEpoch(0) @@ -310,7 +310,7 @@ public class GroupMetadataManagerTest { assertEquals("StandbyTasks must be empty when (re-)joining.", ex.getMessage()); // WarmupTasks must be present and empty in the first request (epoch == 0). - ex = assertThrows(InvalidRequestException.class, () -> context.streamsHeartbeat( + ex = assertThrows(InvalidRequestException.class, () -> context.streamsGroupHeartbeat( new StreamsGroupHeartbeatRequestData() .setGroupId("foo") .setMemberEpoch(0) @@ -321,14 +321,14 @@ public class GroupMetadataManagerTest { // MemberId must be non-empty in all requests except for the first one where it // could be empty (epoch != 0). - ex = assertThrows(InvalidRequestException.class, () -> context.streamsHeartbeat( + ex = assertThrows(InvalidRequestException.class, () -> context.streamsGroupHeartbeat( new StreamsGroupHeartbeatRequestData() .setGroupId("foo") .setMemberEpoch(1))); assertEquals("MemberId can't be empty.", ex.getMessage()); // InstanceId must be non-empty if provided in all requests. - ex = assertThrows(InvalidRequestException.class, () -> context.streamsHeartbeat( + ex = assertThrows(InvalidRequestException.class, () -> context.streamsGroupHeartbeat( new StreamsGroupHeartbeatRequestData() .setGroupId("foo") .setMemberId(Uuid.randomUuid().toString()) @@ -337,7 +337,7 @@ public class GroupMetadataManagerTest { assertEquals("InstanceId can't be empty.", ex.getMessage()); // RackId must be non-empty if provided in all requests. - ex = assertThrows(InvalidRequestException.class, () -> context.streamsHeartbeat( + ex = assertThrows(InvalidRequestException.class, () -> context.streamsGroupHeartbeat( new StreamsGroupHeartbeatRequestData() .setGroupId("foo") .setMemberId(Uuid.randomUuid().toString()) @@ -346,8 +346,8 @@ public class GroupMetadataManagerTest { assertEquals("RackId can't be empty.", ex.getMessage()); // TODO: // ServerAssignor must exist if provided in all requests. -// ex = assertThrows(UnsupportedAssignorException.class, () -> context.streamsHeartbeat( -// new StreamsHeartbeatRequestData() +// ex = assertThrows(UnsupportedAssignorException.class, () -> context.streamsGroupHeartbeat( +// new StreamsGroupHeartbeatRequestData() // .setGroupId("foo") // .setMemberId(Uuid.randomUuid().toString()) // .setMemberEpoch(1) @@ -16016,15 +16016,15 @@ public class GroupMetadataManagerTest { // ) // ) // ); -// CoordinatorResult<StreamsInitializeResponseData, CoordinatorRecord> result = -// context.streamsInitialize( -// new StreamsInitializeRequestData() +// CoordinatorResult<StreamsGroupInitializeResponseData, CoordinatorRecord> result = +// context.streamsGroupInitialize( +// new streamsGroupInitializeRequestData() // .setGroupId(groupId) // .setTopology(topology) // ); // // assertEquals( -// new StreamsInitializeResponseData(), +// new StreamsGroupInitializeResponseData(), // result.response() // ); // @@ -16086,15 +16086,15 @@ public class GroupMetadataManagerTest { // ) // ) // ); -// CoordinatorResult<StreamsInitializeResponseData, CoordinatorRecord> result = -// context.streamsInitialize( -// new StreamsInitializeRequestData() +// CoordinatorResult<StreamsGroupInitializeResponseData, CoordinatorRecord> result = +// context.streamsGroupInitialize( +// new streamsGroupInitializeRequestData() // .setGroupId(groupId) // .setTopology(topology) // ); // // assertEquals( -// new StreamsInitializeResponseData() +// new StreamsGroupInitializeResponseData() // .setErrorCode(Errors.STREAMS_INVALID_TOPOLOGY.code()) // .setErrorMessage("Internal topics changelog do not exist."), // result.response() diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java index 45654736316..d77f5414ee9 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java @@ -727,7 +727,7 @@ public class GroupMetadataManagerTestContext { } - public CoordinatorResult<StreamsGroupInitializeResponseData, CoordinatorRecord> streamsInitialize( + public CoordinatorResult<StreamsGroupInitializeResponseData, CoordinatorRecord> streamsGroupInitialize( StreamsGroupInitializeRequestData request ) { RequestContext context = new RequestContext( @@ -746,7 +746,7 @@ public class GroupMetadataManagerTestContext { false ); - CoordinatorResult<StreamsGroupInitializeResponseData, CoordinatorRecord> result = groupMetadataManager.streamsInitialize( + CoordinatorResult<StreamsGroupInitializeResponseData, CoordinatorRecord> result = groupMetadataManager.streamsGroupInitialize( context, request ); @@ -757,7 +757,7 @@ public class GroupMetadataManagerTestContext { return result; } - public CoordinatorResult<StreamsGroupHeartbeatResponseData, CoordinatorRecord> streamsHeartbeat( + public CoordinatorResult<StreamsGroupHeartbeatResponseData, CoordinatorRecord> streamsGroupHeartbeat( StreamsGroupHeartbeatRequestData request ) { RequestContext context = new RequestContext( @@ -776,7 +776,7 @@ public class GroupMetadataManagerTestContext { false ); - CoordinatorResult<StreamsGroupHeartbeatResponseData, CoordinatorRecord> result = groupMetadataManager.streamsHeartbeat( + CoordinatorResult<StreamsGroupHeartbeatResponseData, CoordinatorRecord> result = groupMetadataManager.streamsGroupHeartbeat( context, request );
