This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch kip1071
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 93b72dba05cc682f849fb00110f15c1413fee563
Author: Bruno Cadonna <[email protected]>
AuthorDate: Fri Aug 30 11:14:10 2024 +0200

    Rename streamsHeartbeatX and streamsInitializeX to streamsGroupX
---
 .../consumer/internals/RequestManagers.java        | 28 +++++------
 .../StreamsGroupHeartbeatRequestManager.java       | 38 +++++++--------
 ...a => StreamsGroupInitializeRequestManager.java} | 14 +++---
 .../StreamsGroupHeartbeatRequestManagerTest.java   |  6 +--
 .../StreamsGroupInitializeRequestManagerTest.java  | 16 +++----
 .../group/GroupCoordinatorAdapter.scala            |  4 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   | 38 +++++++--------
 .../group/GroupCoordinatorAdapterTest.scala        |  8 ++--
 .../scala/unit/kafka/server/KafkaApisTest.scala    |  8 ++--
 .../kafka/coordinator/group/GroupCoordinator.java  |  8 ++--
 .../coordinator/group/GroupCoordinatorService.java | 12 ++---
 .../coordinator/group/GroupCoordinatorShard.java   | 21 ++++-----
 .../coordinator/group/GroupMetadataManager.java    | 54 +++++++++++-----------
 .../group/streams/CurrentAssignmentBuilder.java    | 14 +++---
 .../group/GroupCoordinatorServiceTest.java         | 32 ++++++-------
 .../group/GroupCoordinatorShardTest.java           | 12 ++---
 .../group/GroupMetadataManagerTest.java            | 38 +++++++--------
 .../group/GroupMetadataManagerTestContext.java     |  8 ++--
 18 files changed, 179 insertions(+), 180 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
index 7cf4a3e67f6..4f3ec1b2398 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
@@ -58,8 +58,8 @@ public class RequestManagers implements Closeable {
     public final TopicMetadataRequestManager topicMetadataRequestManager;
     public final FetchRequestManager fetchRequestManager;
     public final Optional<ShareConsumeRequestManager> 
shareConsumeRequestManager;
-    public final Optional<StreamsGroupHeartbeatRequestManager> 
streamsHeartbeatRequestManager;
-    public final Optional<StreamsInitializeRequestManager> 
streamsInitializeRequestManager;
+    public final Optional<StreamsGroupHeartbeatRequestManager> 
streamsGroupHeartbeatRequestManager;
+    public final Optional<StreamsGroupInitializeRequestManager> 
streamsGroupInitializeRequestManager;
     private final List<Optional<? extends RequestManager>> entries;
     private final IdempotentCloser closer = new IdempotentCloser();
 
@@ -71,8 +71,8 @@ public class RequestManagers implements Closeable {
                            Optional<CommitRequestManager> commitRequestManager,
                            Optional<ConsumerHeartbeatRequestManager> 
heartbeatRequestManager,
                            Optional<ConsumerMembershipManager> 
membershipManager,
-                           Optional<StreamsGroupHeartbeatRequestManager> 
streamsHeartbeatRequestManager,
-                           Optional<StreamsInitializeRequestManager> 
streamsInitializeRequestManager) {
+                           Optional<StreamsGroupHeartbeatRequestManager> 
streamsGroupHeartbeatRequestManager,
+                           Optional<StreamsGroupInitializeRequestManager> 
streamsGroupInitializeRequestManager) {
         this.log = logContext.logger(RequestManagers.class);
         this.offsetsRequestManager = requireNonNull(offsetsRequestManager, 
"OffsetsRequestManager cannot be null");
         this.coordinatorRequestManager = coordinatorRequestManager;
@@ -84,8 +84,8 @@ public class RequestManagers implements Closeable {
         this.shareHeartbeatRequestManager = Optional.empty();
         this.consumerMembershipManager = membershipManager;
         this.shareMembershipManager = Optional.empty();
-        this.streamsHeartbeatRequestManager = streamsHeartbeatRequestManager;
-        this.streamsInitializeRequestManager = streamsInitializeRequestManager;
+        this.streamsGroupHeartbeatRequestManager = 
streamsGroupHeartbeatRequestManager;
+        this.streamsGroupInitializeRequestManager = 
streamsGroupInitializeRequestManager;
 
         List<Optional<? extends RequestManager>> list = new ArrayList<>();
         list.add(coordinatorRequestManager);
@@ -95,8 +95,8 @@ public class RequestManagers implements Closeable {
         list.add(Optional.of(offsetsRequestManager));
         list.add(Optional.of(topicMetadataRequestManager));
         list.add(Optional.of(fetchRequestManager));
-        list.add(streamsHeartbeatRequestManager);
-        list.add(streamsInitializeRequestManager);
+        list.add(streamsGroupHeartbeatRequestManager);
+        list.add(streamsGroupInitializeRequestManager);
         entries = Collections.unmodifiableList(list);
     }
 
@@ -113,8 +113,8 @@ public class RequestManagers implements Closeable {
         this.shareHeartbeatRequestManager = shareHeartbeatRequestManager;
         this.consumerMembershipManager = Optional.empty();
         this.shareMembershipManager = shareMembershipManager;
-        this.streamsHeartbeatRequestManager = Optional.empty();
-        this.streamsInitializeRequestManager = Optional.empty();
+        this.streamsGroupHeartbeatRequestManager = Optional.empty();
+        this.streamsGroupInitializeRequestManager = Optional.empty();
         this.offsetsRequestManager = null;
         this.topicMetadataRequestManager = null;
         this.fetchRequestManager = null;
@@ -199,7 +199,7 @@ public class RequestManagers implements Closeable {
                 CoordinatorRequestManager coordinator = null;
                 CommitRequestManager commitRequestManager = null;
                 StreamsGroupHeartbeatRequestManager 
streamsGroupHeartbeatRequestManager = null;
-                StreamsInitializeRequestManager 
streamsInitializeRequestManager = null;
+                StreamsGroupInitializeRequestManager 
streamsGroupInitializeRequestManager = null;
 
                 if (groupRebalanceConfig != null && 
groupRebalanceConfig.groupId != null) {
                     Optional<String> serverAssignor = 
Optional.ofNullable(config.getString(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG));
@@ -245,7 +245,7 @@ public class RequestManagers implements Closeable {
                     
membershipManager.registerStateListener(applicationThreadMemberStateListener);
 
                     if (streamsInstanceMetadata.isPresent()) {
-                        streamsInitializeRequestManager = new 
StreamsInitializeRequestManager(
+                        streamsGroupInitializeRequestManager = new 
StreamsGroupInitializeRequestManager(
                             logContext,
                             groupRebalanceConfig.groupId,
                             streamsInstanceMetadata.get(),
@@ -255,7 +255,7 @@ public class RequestManagers implements Closeable {
                             time,
                             config,
                             coordinator,
-                            streamsInitializeRequestManager,
+                            streamsGroupInitializeRequestManager,
                             membershipManager,
                             backgroundEventHandler,
                             metrics,
@@ -297,7 +297,7 @@ public class RequestManagers implements Closeable {
                         Optional.ofNullable(heartbeatRequestManager),
                         Optional.ofNullable(membershipManager),
                         
Optional.ofNullable(streamsGroupHeartbeatRequestManager),
-                        Optional.ofNullable(streamsInitializeRequestManager)
+                        
Optional.ofNullable(streamsGroupInitializeRequestManager)
                 );
             }
         };
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
index 2b3b7c8f248..ba0a289f187 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
@@ -63,7 +63,7 @@ public class StreamsGroupHeartbeatRequestManager implements 
RequestManager {
 
     private final ConsumerMembershipManager membershipManager;
 
-    private final StreamsInitializeRequestManager 
streamsInitializeRequestManager;
+    private final StreamsGroupInitializeRequestManager 
streamsGroupInitializeRequestManager;
 
     private final BackgroundEventHandler backgroundEventHandler;
 
@@ -82,7 +82,7 @@ public class StreamsGroupHeartbeatRequestManager implements 
RequestManager {
         final Time time,
         final ConsumerConfig config,
         final CoordinatorRequestManager coordinatorRequestManager,
-        final StreamsInitializeRequestManager streamsInitializeRequestManager,
+        final StreamsGroupInitializeRequestManager 
streamsGroupInitializeRequestManager,
         final ConsumerMembershipManager membershipManager,
         final BackgroundEventHandler backgroundEventHandler,
         final Metrics metrics,
@@ -92,7 +92,7 @@ public class StreamsGroupHeartbeatRequestManager implements 
RequestManager {
         this.coordinatorRequestManager = coordinatorRequestManager;
         this.logger = logContext.logger(getClass());
         this.membershipManager = membershipManager;
-        this.streamsInitializeRequestManager = streamsInitializeRequestManager;
+        this.streamsGroupInitializeRequestManager = 
streamsGroupInitializeRequestManager;
         this.backgroundEventHandler = backgroundEventHandler;
         int maxPollIntervalMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
         long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
@@ -189,12 +189,12 @@ public class StreamsGroupHeartbeatRequestManager 
implements RequestManager {
                 Errors error =
                     Errors.forCode(((StreamsGroupHeartbeatResponse) 
response.responseBody()).data().errorCode());
                 if (error == Errors.NONE) {
-                    logger.debug("StreamsHeartbeat responded successfully: 
{}", response);
+                    logger.debug("StreamsGroupHeartbeat responded 
successfully: {}", response);
                 } else {
-                    logger.error("StreamsHeartbeat failed because of {}: {}", 
error, response);
+                    logger.error("StreamsGroupHeartbeat failed because of {}: 
{}", error, response);
                 }
             } else {
-                logger.error("StreamsHeartbeat failed because of unexpected 
exception.", exception);
+                logger.error("StreamsGroupHeartbeat failed because of 
unexpected exception.", exception);
             }
         });
     }
@@ -203,13 +203,13 @@ public class StreamsGroupHeartbeatRequestManager 
implements RequestManager {
         this.heartbeatRequestState.onFailedAttempt(responseTimeMs);
         this.heartbeatState.reset();
         if (exception instanceof RetriableException) {
-            String message = String.format("StreamsHeartbeatRequest failed 
because of the retriable exception. " +
+            String message = String.format("StreamsGroupHeartbeatRequest 
failed because of the retriable exception. " +
                     "Will retry in %s ms: %s",
                 heartbeatRequestState.remainingBackoffMs(responseTimeMs),
                 exception.getMessage());
             logger.debug(message);
         } else {
-            logger.error("StreamsHeartbeatRequest failed due to fatal error", 
exception);
+            logger.error("StreamsGroupHeartbeatRequest failed due to fatal 
error", exception);
             handleFatalFailure(exception);
         }
     }
@@ -230,7 +230,7 @@ public class StreamsGroupHeartbeatRequestManager implements 
RequestManager {
         heartbeatRequestState.resetTimer();
 
         if (data.shouldInitializeTopology()) {
-            streamsInitializeRequestManager.initialize();
+            streamsGroupInitializeRequestManager.initialize();
         }
         if (data.partitionsByUserEndpoint() != null) {
             streamsInterface.partitionsByHost.set(convertHostInfoMap(data));
@@ -340,7 +340,7 @@ public class StreamsGroupHeartbeatRequestManager implements 
RequestManager {
         switch (error) {
             case NOT_COORDINATOR:
                 // the manager should retry immediately when the coordinator 
node becomes available again
-                message = String.format("StreamsHeartbeatRequest failed 
because the group coordinator %s is incorrect. " +
+                message = String.format("StreamsGroupHeartbeatRequest failed 
because the group coordinator %s is incorrect. " +
                         "Will attempt to find the coordinator again and retry",
                     coordinatorRequestManager.coordinator());
                 logInfo(message, response, currentTimeMs);
@@ -350,7 +350,7 @@ public class StreamsGroupHeartbeatRequestManager implements 
RequestManager {
                 break;
 
             case COORDINATOR_NOT_AVAILABLE:
-                message = String.format("StreamsHeartbeatRequest failed 
because the group coordinator %s is not available. " +
+                message = String.format("StreamsGroupHeartbeatRequest failed 
because the group coordinator %s is not available. " +
                         "Will attempt to find the coordinator again and retry",
                     coordinatorRequestManager.coordinator());
                 logInfo(message, response, currentTimeMs);
@@ -361,7 +361,7 @@ public class StreamsGroupHeartbeatRequestManager implements 
RequestManager {
 
             case COORDINATOR_LOAD_IN_PROGRESS:
                 // the manager will backoff and retry
-                message = String.format("StreamsHeartbeatRequest failed 
because the group coordinator %s is still loading." +
+                message = String.format("StreamsGroupHeartbeatRequest failed 
because the group coordinator %s is still loading." +
                         "Will retry",
                     coordinatorRequestManager.coordinator());
                 logInfo(message, response, currentTimeMs);
@@ -370,12 +370,12 @@ public class StreamsGroupHeartbeatRequestManager 
implements RequestManager {
             case GROUP_AUTHORIZATION_FAILED:
                 GroupAuthorizationException exception =
                     
GroupAuthorizationException.forGroupId(membershipManager.groupId());
-                logger.error("StreamsHeartbeatRequest failed due to group 
authorization failure: {}", exception.getMessage());
+                logger.error("StreamsGroupHeartbeatRequest failed due to group 
authorization failure: {}", exception.getMessage());
                 handleFatalFailure(error.exception(exception.getMessage()));
                 break;
 
             case UNRELEASED_INSTANCE_ID:
-                logger.error("StreamsHeartbeatRequest failed due to the 
instance id {} was not released: {}",
+                logger.error("StreamsGroupHeartbeatRequest failed due to the 
instance id {} was not released: {}",
                     membershipManager.groupInstanceId().orElse("null"), 
errorMessage);
                 
handleFatalFailure(Errors.UNRELEASED_INSTANCE_ID.exception(errorMessage));
                 break;
@@ -384,12 +384,12 @@ public class StreamsGroupHeartbeatRequestManager 
implements RequestManager {
             case GROUP_MAX_SIZE_REACHED:
             case UNSUPPORTED_ASSIGNOR:
             case UNSUPPORTED_VERSION:
-                logger.error("StreamsHeartbeatRequest failed due to error: 
{}", error);
+                logger.error("StreamsGroupHeartbeatRequest failed due to 
error: {}", error);
                 handleFatalFailure(error.exception(errorMessage));
                 break;
 
             case FENCED_MEMBER_EPOCH:
-                message = String.format("StreamsHeartbeatRequest failed for 
member %s because epoch %s is fenced.",
+                message = String.format("StreamsGroupHeartbeatRequest failed 
for member %s because epoch %s is fenced.",
                     membershipManager.memberId(), 
membershipManager.memberEpoch());
                 logInfo(message, response, currentTimeMs);
                 membershipManager.transitionToFenced();
@@ -398,7 +398,7 @@ public class StreamsGroupHeartbeatRequestManager implements 
RequestManager {
                 break;
 
             case UNKNOWN_MEMBER_ID:
-                message = String.format("StreamsHeartbeatRequest failed 
because member %s is unknown.",
+                message = String.format("StreamsGroupHeartbeatRequest failed 
because member %s is unknown.",
                     membershipManager.memberId());
                 logInfo(message, response, currentTimeMs);
                 membershipManager.transitionToFenced();
@@ -408,7 +408,7 @@ public class StreamsGroupHeartbeatRequestManager implements 
RequestManager {
 
             default:
                 // If the manager receives an unknown error - there could be a 
bug in the code or a new error code
-                logger.error("StreamsHeartbeatRequest failed due to unexpected 
error: {}", error);
+                logger.error("StreamsGroupHeartbeatRequest failed due to 
unexpected error: {}", error);
                 handleFatalFailure(error.exception(errorMessage));
                 break;
         }
@@ -604,7 +604,7 @@ public class StreamsGroupHeartbeatRequestManager implements 
RequestManager {
                 .collect(Collectors.toList());
         }
 
-        // Fields of StreamsHeartbeatRequest sent in the most recent request
+        // Fields of StreamsGroupHeartbeatRequest sent in the most recent 
request
         static class SentFields {
 
             private int rebalanceTimeoutMs = -1;
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsInitializeRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManager.java
similarity index 92%
rename from 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsInitializeRequestManager.java
rename to 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManager.java
index c0d2d9f9124..e7f763bd969 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsInitializeRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManager.java
@@ -30,7 +30,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
-public class StreamsInitializeRequestManager implements RequestManager {
+public class StreamsGroupInitializeRequestManager implements RequestManager {
 
     private final Logger logger;
     private final String groupId;
@@ -40,10 +40,10 @@ public class StreamsInitializeRequestManager implements 
RequestManager {
     private Optional<NetworkClientDelegate.UnsentRequest> unsentRequest = 
Optional.empty();
 
 
-    StreamsInitializeRequestManager(final LogContext logContext,
-                                    final String groupId,
-                                    final StreamsAssignmentInterface 
streamsAssignmentInterface,
-                                    final CoordinatorRequestManager 
coordinatorRequestManager) {
+    StreamsGroupInitializeRequestManager(final LogContext logContext,
+                                         final String groupId,
+                                         final StreamsAssignmentInterface 
streamsAssignmentInterface,
+                                         final CoordinatorRequestManager 
coordinatorRequestManager) {
         this.logger = logContext.logger(getClass());
         this.groupId = groupId;
         this.streamsAssignmentInterface = streamsAssignmentInterface;
@@ -70,11 +70,11 @@ public class StreamsInitializeRequestManager implements 
RequestManager {
         streamsGroupInitializeRequestData.setGroupId(groupId);
         final List<StreamsGroupInitializeRequestData.Subtopology> topology = 
getTopologyFromStreams();
         streamsGroupInitializeRequestData.setTopology(topology);
-        final StreamsGroupInitializeRequest.Builder 
streamsInitializeRequestBuilder = new StreamsGroupInitializeRequest.Builder(
+        final StreamsGroupInitializeRequest.Builder 
streamsGroupInitializeRequestBuilder = new 
StreamsGroupInitializeRequest.Builder(
             streamsGroupInitializeRequestData
         );
         return new NetworkClientDelegate.UnsentRequest(
-            streamsInitializeRequestBuilder,
+            streamsGroupInitializeRequestBuilder,
             coordinatorRequestManager.coordinator()
         );
     }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
index aed1b4dafd7..b9fdb4c7722 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
@@ -86,7 +86,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
     private CoordinatorRequestManager coordinatorRequestManager;
 
     @Mock
-    private StreamsInitializeRequestManager streamsInitializeRequestManager;
+    private StreamsGroupInitializeRequestManager 
streamsGroupInitializeRequestManager;
 
     @Mock
     private ConsumerMembershipManager membershipManager;
@@ -141,7 +141,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
             time,
             config,
             coordinatorRequestManager,
-            streamsInitializeRequestManager,
+            streamsGroupInitializeRequestManager,
             membershipManager,
             backgroundEventHandler,
             metrics,
@@ -333,7 +333,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
 
         mockResponse(data);
 
-        verify(streamsInitializeRequestManager).initialize();
+        verify(streamsGroupInitializeRequestManager).initialize();
     }
 
     private void mockResponse(final StreamsGroupHeartbeatResponseData data) {
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManagerTest.java
index dd708c8d79a..48108204474 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManagerTest.java
@@ -51,20 +51,20 @@ class StreamsGroupInitializeRequestManagerTest {
     public void shouldPollEmptyResult() {
         final CoordinatorRequestManager coordinatorRequestManager = 
mock(CoordinatorRequestManager.class);
         final StreamsAssignmentInterface streamsAssignmentInterface = 
mock(StreamsAssignmentInterface.class);
-        final StreamsInitializeRequestManager streamsInitializeRequestManager 
= new StreamsInitializeRequestManager(
+        final StreamsGroupInitializeRequestManager 
streamsGroupInitializeRequestManager = new StreamsGroupInitializeRequestManager(
             logContext,
             groupId,
             streamsAssignmentInterface,
             coordinatorRequestManager
         );
 
-        final NetworkClientDelegate.PollResult pollResult = 
streamsInitializeRequestManager.poll(0);
+        final NetworkClientDelegate.PollResult pollResult = 
streamsGroupInitializeRequestManager.poll(0);
 
         assertEquals(NetworkClientDelegate.PollResult.EMPTY, pollResult);
     }
 
     @Test
-    public void shouldPollStreamsInitializeRequest() {
+    public void shouldPollStreamsGroupInitializeRequest() {
         final Node node = mock(Node.class);
         final CoordinatorRequestManager coordinatorRequestManager = 
mock(CoordinatorRequestManager.class);
         
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(node));
@@ -90,23 +90,23 @@ class StreamsGroupInitializeRequestManagerTest {
         when(streamsAssignmentInterface.subtopologyMap()).thenReturn(
             mkMap(mkEntry(subtopologyName1, subtopology1))
         );
-        final StreamsInitializeRequestManager streamsInitializeRequestManager 
= new StreamsInitializeRequestManager(
+        final StreamsGroupInitializeRequestManager 
streamsGroupInitializeRequestManager = new StreamsGroupInitializeRequestManager(
             logContext,
             groupId,
             streamsAssignmentInterface,
             coordinatorRequestManager
         );
 
-        streamsInitializeRequestManager.initialize();
-        final NetworkClientDelegate.PollResult pollResult = 
streamsInitializeRequestManager.poll(0);
+        streamsGroupInitializeRequestManager.initialize();
+        final NetworkClientDelegate.PollResult pollResult = 
streamsGroupInitializeRequestManager.poll(0);
 
         assertEquals(1, pollResult.unsentRequests.size());
         final NetworkClientDelegate.UnsentRequest unsentRequest = 
pollResult.unsentRequests.get(0);
         assertTrue(unsentRequest.node().isPresent());
         assertEquals(node, unsentRequest.node().get());
         assertEquals(ApiKeys.STREAMS_GROUP_INITIALIZE, 
unsentRequest.requestBuilder().apiKey());
-        final StreamsGroupInitializeRequest.Builder 
streamsInitializeRequestBuilder = (StreamsGroupInitializeRequest.Builder) 
unsentRequest.requestBuilder();
-        final StreamsGroupInitializeRequest streamsGroupInitializeRequest = 
streamsInitializeRequestBuilder.build();
+        final StreamsGroupInitializeRequest.Builder 
streamsGroupInitializeRequestBuilder = (StreamsGroupInitializeRequest.Builder) 
unsentRequest.requestBuilder();
+        final StreamsGroupInitializeRequest streamsGroupInitializeRequest = 
streamsGroupInitializeRequestBuilder.build();
         final StreamsGroupInitializeRequestData 
streamsGroupInitializeRequestData = streamsGroupInitializeRequest.data();
         assertEquals(ApiKeys.STREAMS_GROUP_INITIALIZE.id, 
streamsGroupInitializeRequestData.apiKey());
         assertEquals(groupId, streamsGroupInitializeRequestData.groupId());
diff --git 
a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala 
b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
index a48b2c6ffc8..ce8bde14493 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
@@ -77,7 +77,7 @@ private[group] class GroupCoordinatorAdapter(
     ))
   }
 
-  override def streamsInitialize(
+  override def streamsGroupInitialize(
                                    context: RequestContext,
                                    request: StreamsGroupInitializeRequestData
                                  ): 
CompletableFuture[StreamsGroupInitializeResponseData] = {
@@ -86,7 +86,7 @@ private[group] class GroupCoordinatorAdapter(
     ))
   }
 
-  override def streamsHeartbeat(
+  override def streamsGroupHeartbeat(
                                  context: RequestContext,
                                  request: StreamsGroupHeartbeatRequestData
                                ): 
CompletableFuture[StreamsGroupHeartbeatResponseData] = {
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index d3bfd420ace..5e2abe450c2 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -277,8 +277,8 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.DELETE_SHARE_GROUP_STATE => 
handleDeleteShareGroupStateRequest(request)
         case ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY => 
handleReadShareGroupStateSummaryRequest(request)
         case ApiKeys.STREAMS_GROUP_DESCRIBE => 
handleStreamsGroupDescribe(request).exceptionally(handleError)
-        case ApiKeys.STREAMS_GROUP_INITIALIZE => 
handleStreamsInitialize(request).exceptionally(handleError)
-        case ApiKeys.STREAMS_GROUP_HEARTBEAT => 
handleStreamsHeartbeat(request).exceptionally(handleError)
+        case ApiKeys.STREAMS_GROUP_INITIALIZE => 
handleStreamsGroupInitialize(request).exceptionally(handleError)
+        case ApiKeys.STREAMS_GROUP_HEARTBEAT => 
handleStreamsGroupHeartbeat(request).exceptionally(handleError)
         case _ => throw new IllegalStateException(s"No handler for request api 
key ${request.header.apiKey}")
       }
     } catch {
@@ -3889,27 +3889,27 @@ class KafkaApis(val requestChannel: RequestChannel,
   private def isStreamsGroupProtocolEnabled(): Boolean = {
     config.groupCoordinatorRebalanceProtocols.contains(Group.GroupType.STREAMS)
   }
-  
-  def handleStreamsInitialize(request: RequestChannel.Request): 
CompletableFuture[Unit] = {
-    val streamsInitializeRequest = request.body[StreamsGroupInitializeRequest]
+
+  def handleStreamsGroupInitialize(request: RequestChannel.Request): 
CompletableFuture[Unit] = {
+    val streamsGroupInitializeRequest = 
request.body[StreamsGroupInitializeRequest]
 
     // TODO: Check ACLs on CREATE TOPIC & DESCRIBE_CONFIGS
 
     if (!isStreamsGroupProtocolEnabled()) {
       // The API is not supported by the "old" group coordinator (the 
default). If the
       // new one is not enabled, we fail directly here.
-      requestHelper.sendMaybeThrottle(request, 
streamsInitializeRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+      requestHelper.sendMaybeThrottle(request, 
streamsGroupInitializeRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
       CompletableFuture.completedFuture[Unit](())
-    } else if (!authHelper.authorize(request.context, READ, GROUP, 
streamsInitializeRequest.data.groupId)) {
-      requestHelper.sendMaybeThrottle(request, 
streamsInitializeRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
+    } else if (!authHelper.authorize(request.context, READ, GROUP, 
streamsGroupInitializeRequest.data.groupId)) {
+      requestHelper.sendMaybeThrottle(request, 
streamsGroupInitializeRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
       CompletableFuture.completedFuture[Unit](())
     } else {
-      groupCoordinator.streamsInitialize(
+      groupCoordinator.streamsGroupInitialize(
         request.context,
-        streamsInitializeRequest.data,
+        streamsGroupInitializeRequest.data,
       ).handle[Unit] { (response, exception) =>
         if (exception != null) {
-          requestHelper.sendMaybeThrottle(request, 
streamsInitializeRequest.getErrorResponse(exception))
+          requestHelper.sendMaybeThrottle(request, 
streamsGroupInitializeRequest.getErrorResponse(exception))
         } else {
           requestHelper.sendMaybeThrottle(request, new 
StreamsGroupInitializeResponse(response))
         }
@@ -3917,24 +3917,24 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleStreamsHeartbeat(request: RequestChannel.Request): 
CompletableFuture[Unit] = {
-    val streamsHeartbeatRequest = request.body[StreamsGroupHeartbeatRequest]
+  def handleStreamsGroupHeartbeat(request: RequestChannel.Request): 
CompletableFuture[Unit] = {
+    val streamsGroupHeartbeatRequest = 
request.body[StreamsGroupHeartbeatRequest]
 
     if (!isStreamsGroupProtocolEnabled()) {
       // The API is not supported by the "old" group coordinator (the 
default). If the
       // new one is not enabled, we fail directly here.
-      requestHelper.sendMaybeThrottle(request, 
streamsHeartbeatRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+      requestHelper.sendMaybeThrottle(request, 
streamsGroupHeartbeatRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
       CompletableFuture.completedFuture[Unit](())
-    } else if (!authHelper.authorize(request.context, READ, GROUP, 
streamsHeartbeatRequest.data.groupId)) {
-      requestHelper.sendMaybeThrottle(request, 
streamsHeartbeatRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
+    } else if (!authHelper.authorize(request.context, READ, GROUP, 
streamsGroupHeartbeatRequest.data.groupId)) {
+      requestHelper.sendMaybeThrottle(request, 
streamsGroupHeartbeatRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
       CompletableFuture.completedFuture[Unit](())
     } else {
-      groupCoordinator.streamsHeartbeat(
+      groupCoordinator.streamsGroupHeartbeat(
         request.context,
-        streamsHeartbeatRequest.data,
+        streamsGroupHeartbeatRequest.data,
       ).handle[Unit] { (response, exception) =>
         if (exception != null) {
-          requestHelper.sendMaybeThrottle(request, 
streamsHeartbeatRequest.getErrorResponse(exception))
+          requestHelper.sendMaybeThrottle(request, 
streamsGroupHeartbeatRequest.getErrorResponse(exception))
         } else {
           requestHelper.sendMaybeThrottle(request, new 
StreamsGroupHeartbeatResponse(response))
         }
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
index d2c2a2ac487..68959acc23a 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
@@ -79,7 +79,7 @@ class GroupCoordinatorAdapterTest {
   }
 
   @Test
-  def testStreamsInitialize(): Unit = {
+  def testStreamsGroupInitialize(): Unit = {
     val groupCoordinator = mock(classOf[GroupCoordinator])
     val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM)
 
@@ -87,7 +87,7 @@ class GroupCoordinatorAdapterTest {
     val request = new StreamsGroupInitializeRequestData()
       .setGroupId("group")
 
-    val future = adapter.streamsInitialize(ctx, request)
+    val future = adapter.streamsGroupInitialize(ctx, request)
 
     assertTrue(future.isDone)
     assertTrue(future.isCompletedExceptionally)
@@ -95,7 +95,7 @@ class GroupCoordinatorAdapterTest {
   }
 
   @Test
-  def testStreamsHeartbeat(): Unit = {
+  def testStreamsGroupHeartbeat(): Unit = {
     val groupCoordinator = mock(classOf[GroupCoordinator])
     val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM)
 
@@ -103,7 +103,7 @@ class GroupCoordinatorAdapterTest {
     val request = new StreamsGroupHeartbeatRequestData()
       .setGroupId("group")
 
-    val future = adapter.streamsHeartbeat(ctx, request)
+    val future = adapter.streamsGroupHeartbeat(ctx, request)
 
     assertTrue(future.isDone)
     assertTrue(future.isCompletedExceptionally)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 202149fe747..1241287965f 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -11243,7 +11243,7 @@ class KafkaApisTest extends Logging {
     val requestChannelRequest = buildRequest(new 
StreamsGroupInitializeRequest.Builder(streamsGroupInitializeRequest, 
true).build())
 
     val future = new CompletableFuture[StreamsGroupInitializeResponseData]()
-    when(groupCoordinator.streamsInitialize(
+    when(groupCoordinator.streamsGroupInitialize(
       requestChannelRequest.context,
       streamsGroupInitializeRequest
     )).thenReturn(future)
@@ -11269,7 +11269,7 @@ class KafkaApisTest extends Logging {
     val requestChannelRequest = buildRequest(new 
StreamsGroupInitializeRequest.Builder(streamsGroupInitializeRequest, 
true).build())
 
     val future = new CompletableFuture[StreamsGroupInitializeResponseData]()
-    when(groupCoordinator.streamsInitialize(
+    when(groupCoordinator.streamsGroupInitialize(
       requestChannelRequest.context,
       streamsGroupInitializeRequest
     )).thenReturn(future)
@@ -11316,7 +11316,7 @@ class KafkaApisTest extends Logging {
     val requestChannelRequest = buildRequest(new 
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest, 
true).build())
 
     val future = new CompletableFuture[StreamsGroupHeartbeatResponseData]()
-    when(groupCoordinator.streamsHeartbeat(
+    when(groupCoordinator.streamsGroupHeartbeat(
       requestChannelRequest.context,
       streamsGroupHeartbeatRequest
     )).thenReturn(future)
@@ -11343,7 +11343,7 @@ class KafkaApisTest extends Logging {
     val requestChannelRequest = buildRequest(new 
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest, 
true).build())
 
     val future = new CompletableFuture[StreamsGroupHeartbeatResponseData]()
-    when(groupCoordinator.streamsHeartbeat(
+    when(groupCoordinator.streamsGroupHeartbeat(
       requestChannelRequest.context,
       streamsGroupHeartbeatRequest
     )).thenReturn(future)
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
index cea5da08608..7c78db21761 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
@@ -91,12 +91,12 @@ public interface GroupCoordinator {
      * Initialize a Streams Group.
      *
      * @param context           The request context.
-     * @param request           The StreamsHeartbeatRequest data.
+     * @param request           The StreamsGroupInitializeRequest data.
      *
      * @return  A future yielding the response.
      *          The error code(s) of the response are set to indicate the 
error(s) occurred during the execution.
      */
-    CompletableFuture<StreamsGroupInitializeResponseData> streamsInitialize(
+    CompletableFuture<StreamsGroupInitializeResponseData> 
streamsGroupInitialize(
         RequestContext context,
         StreamsGroupInitializeRequestData request
     );
@@ -105,12 +105,12 @@ public interface GroupCoordinator {
      * Heartbeat to a Streams Group.
      *
      * @param context           The request context.
-     * @param request           The StreamsHeartbeatResponseData data.
+     * @param request           The StreamsGroupHeartbeatResponseData data.
      *
      * @return  A future yielding the response.
      *          The error code(s) of the response are set to indicate the 
error(s) occurred during the execution.
      */
-    CompletableFuture<StreamsGroupHeartbeatResponseData> streamsHeartbeat(
+    CompletableFuture<StreamsGroupHeartbeatResponseData> streamsGroupHeartbeat(
         RequestContext context,
         StreamsGroupHeartbeatRequestData request
     );
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index 5d632528961..c6631945add 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -351,10 +351,10 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
     }
 
     /**
-     * See {@link GroupCoordinator#streamsInitialize(RequestContext, 
org.apache.kafka.common.message.StreamsGroupInitializeRequestData)}.
+     * See {@link GroupCoordinator#streamsGroupInitialize(RequestContext, 
org.apache.kafka.common.message.StreamsGroupInitializeRequestData)}.
      */
     @Override
-    public CompletableFuture<StreamsGroupInitializeResponseData> 
streamsInitialize(
+    public CompletableFuture<StreamsGroupInitializeResponseData> 
streamsGroupInitialize(
         RequestContext context,
         StreamsGroupInitializeRequestData request
     ) {
@@ -368,7 +368,7 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
             "streams-group-initialize",
             topicPartitionFor(request.groupId()),
             Duration.ofMillis(config.offsetCommitTimeoutMs()),
-            coordinator -> coordinator.streamsInitialize(context, request)
+            coordinator -> coordinator.streamsGroupInitialize(context, request)
         ).exceptionally(exception -> handleOperationException(
             "streams-group-initialize",
             request,
@@ -380,10 +380,10 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
     }
 
     /**
-     * See {@link GroupCoordinator#streamsHeartbeat(RequestContext, 
org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData)}.
+     * See {@link GroupCoordinator#streamsGroupHeartbeat(RequestContext, 
org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData)}.
      */
     @Override
-    public CompletableFuture<StreamsGroupHeartbeatResponseData> 
streamsHeartbeat(
+    public CompletableFuture<StreamsGroupHeartbeatResponseData> 
streamsGroupHeartbeat(
         RequestContext context,
         StreamsGroupHeartbeatRequestData request
     ) {
@@ -397,7 +397,7 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
             "streams-heartbeat",
             topicPartitionFor(request.groupId()),
             Duration.ofMillis(config.offsetCommitTimeoutMs()),
-            coordinator -> coordinator.streamsHeartbeat(context, request)
+            coordinator -> coordinator.streamsGroupHeartbeat(context, request)
         ).exceptionally(exception -> handleOperationException(
             "streams-heartbeat",
             request,
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index cb2833f7029..225b56f58e1 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -107,7 +107,6 @@ import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey;
 import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
 import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
 import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
-import org.apache.kafka.coordinator.group.taskassignor.MockAssignor;
 import org.apache.kafka.coordinator.group.taskassignor.StickyTaskAssignor;
 import org.apache.kafka.image.MetadataDelta;
 import org.apache.kafka.image.MetadataImage;
@@ -391,35 +390,35 @@ public class GroupCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
     }
 
     /**
-     * Handles a StreamsInitialize request.
+     * Handles a StreamsGroupInitialize request.
      *
      * @param context The request context.
-     * @param request The actual StreamsInitialize request.
+     * @param request The actual StreamsGroupInitialize request.
      *
-     * @return A Result containing the StreamsInitialize response and
+     * @return A Result containing the StreamsGroupInitialize response and
      *         a list of records to update the state machine.
      */
-    public CoordinatorResult<StreamsGroupInitializeResponseData, 
CoordinatorRecord> streamsInitialize(
+    public CoordinatorResult<StreamsGroupInitializeResponseData, 
CoordinatorRecord> streamsGroupInitialize(
         RequestContext context,
         StreamsGroupInitializeRequestData request
     ) {
-        return groupMetadataManager.streamsInitialize(context, request);
+        return groupMetadataManager.streamsGroupInitialize(context, request);
     }
 
     /**
-     * Handles a StreamsHeartbeat request.
+     * Handles a StreamsGroupHeartbeat request.
      *
      * @param context The request context.
-     * @param request The actual StreamsHeartbeat request.
+     * @param request The actual StreamsGroupHeartbeat request.
      *
-     * @return A Result containing the StreamsHeartbeat response and
+     * @return A Result containing the StreamsGroupHeartbeat response and
      *         a list of records to update the state machine.
      */
-    public CoordinatorResult<StreamsGroupHeartbeatResponseData, 
CoordinatorRecord> streamsHeartbeat(
+    public CoordinatorResult<StreamsGroupHeartbeatResponseData, 
CoordinatorRecord> streamsGroupHeartbeat(
         RequestContext context,
         StreamsGroupHeartbeatRequestData request
     ) {
-        return groupMetadataManager.streamsHeartbeat(context, request);
+        return groupMetadataManager.streamsGroupHeartbeat(context, request);
     }
 
     /**
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 3e51e4b798a..40c9e652a1b 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -1672,7 +1672,7 @@ public class GroupMetadataManager {
      * @throws InvalidRequestException if the request is not valid.
      * @throws UnsupportedAssignorException if the assignor is not supported.
      */
-    private void throwIfStreamsInitializeRequestIsInvalid(
+    private void throwIfStreamsGroupInitializeRequestIsInvalid(
         StreamsGroupInitializeRequestData request
     ) throws InvalidRequestException, UnsupportedAssignorException {
         throwIfEmptyString(request.groupId(), "GroupId can't be empty.");
@@ -1694,7 +1694,7 @@ public class GroupMetadataManager {
      * @throws InvalidRequestException if the request is not valid.
      * @throws UnsupportedAssignorException if the assignor is not supported.
      */
-    private void throwIfStreamsHeartbeatRequestIsInvalid(
+    private void throwIfStreamsGroupHeartbeatRequestIsInvalid(
         StreamsGroupHeartbeatRequestData request
     ) throws InvalidRequestException, UnsupportedAssignorException {
         throwIfEmptyString(request.groupId(), "GroupId can't be empty.");
@@ -2243,9 +2243,9 @@ public class GroupMetadataManager {
      * @param ownedWarmupTasks   The list of owned warmup tasks from the 
request or null.
      * @param userEndpoint
      * @param clientTags
-     * @return A Result containing the StreamsHeartbeat response and a list of 
records to update the state machine.
+     * @return A Result containing the StreamsGroupHeartbeat response and a 
list of records to update the state machine.
      */
-    private CoordinatorResult<StreamsGroupHeartbeatResponseData, 
CoordinatorRecord> streamsHeartbeat(
+    private CoordinatorResult<StreamsGroupHeartbeatResponseData, 
CoordinatorRecord> streamsGroupHeartbeat(
         String groupId,
         String memberId,
         int memberEpoch,
@@ -2422,15 +2422,15 @@ public class GroupMetadataManager {
             || hasAssignedStandbyTasksChanged(member, updatedMember)
             || hasAssignedWarmupTasksChanged(member, updatedMember)
         ) {
-            
response.setActiveTasks(createStreamsHeartbeatResponseTaskIds(updatedMember.assignedActiveTasks()));
-            
response.setStandbyTasks(createStreamsHeartbeatResponseTaskIds(updatedMember.assignedStandbyTasks()));
-            
response.setWarmupTasks(createStreamsHeartbeatResponseTaskIds(updatedMember.assignedWarmupTasks()));
+            
response.setActiveTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedActiveTasks()));
+            
response.setStandbyTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedStandbyTasks()));
+            
response.setWarmupTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedWarmupTasks()));
         }
 
         return new CoordinatorResult<>(records, response);
     }
 
-    private List<StreamsGroupHeartbeatResponseData.TaskIds> 
createStreamsHeartbeatResponseTaskIds(final Map<String, Set<Integer>> taskIds) {
+    private List<StreamsGroupHeartbeatResponseData.TaskIds> 
createStreamsGroupHeartbeatResponseTaskIds(final Map<String, Set<Integer>> 
taskIds) {
         return taskIds.entrySet().stream()
             .map(entry -> new StreamsGroupHeartbeatResponseData.TaskIds()
                 .setSubtopology(entry.getKey())
@@ -2655,10 +2655,10 @@ public class GroupMetadataManager {
      *
      * @param groupId       The group id from the request.
      * @param subtopologies The list of subtopologies
-     * @return A Result containing the StreamsInitialize response and a list 
of records to update the state machine.
+     * @return A Result containing the StreamsGroupInitialize response and a 
list of records to update the state machine.
      */
-    private CoordinatorResult<StreamsGroupInitializeResponseData, 
CoordinatorRecord> streamsInitialize(String groupId,
-                                                                               
                        List<StreamsGroupInitializeRequestData.Subtopology> 
subtopologies)
+    private CoordinatorResult<StreamsGroupInitializeResponseData, 
CoordinatorRecord> streamsGroupInitialize(String groupId,
+                                                                               
                             
List<StreamsGroupInitializeRequestData.Subtopology> subtopologies)
         throws ApiException {
         final List<CoordinatorRecord> records = new ArrayList<>();
 
@@ -3656,13 +3656,13 @@ public class GroupMetadataManager {
      * @param targetAssignmentEpoch The target assignment epoch.
      * @param targetAssignment      The target assignment.
      * @param ownedActiveTasks      The list of active tasks owned by the 
member. This
-     *                              is reported in the StreamsHeartbeat API and
+     *                              is reported in the StreamsGroupHeartbeat 
API and
      *                              it could be null if not provided.
      * @param ownedStandbyTasks     The list of standby owned by the member. 
This
-     *                              is reported in the StreamsHeartbeat API and
+     *                              is reported in the StreamsGroupHeartbeat 
API and
      *                              it could be null if not provided.
      * @param ownedWarmupTasks      The list of warmup tasks owned by the 
member. This
-     *                              is reported in the StreamsHeartbeat API and
+     *                              is reported in the StreamsGroupHeartbeat 
API and
      *                              it could be null if not provided.
      * @param records               The list to accumulate any new records.
      * @return The received member if no changes have been made; or a new
@@ -3950,7 +3950,7 @@ public class GroupMetadataManager {
      * @param memberId      The member id from the request.
      * @param memberEpoch   The member epoch from the request.
      *
-     * @return A Result containing the StreamsHeartbeat response and
+     * @return A Result containing the StreamsGroupHeartbeat response and
      *         a list of records to update the state machine.
      */
     private CoordinatorResult<StreamsGroupHeartbeatResponseData, 
CoordinatorRecord> streamsGroupLeave(
@@ -4777,40 +4777,40 @@ public class GroupMetadataManager {
     }
 
     /**
-     * Handles a StreamsInitialize request.
+     * Handles a StreamsGroupInitialize request.
      *
      * @param context The request context.
-     * @param request The actual StreamsInitialize request.
+     * @param request The actual StreamsGroupInitialize request.
      *
-     * @return A Result containing the StreamsInitialize response and
+     * @return A Result containing the StreamsGroupInitialize response and
      *         a list of records to update the state machine.
      */
-    public CoordinatorResult<StreamsGroupInitializeResponseData, 
CoordinatorRecord> streamsInitialize(
+    public CoordinatorResult<StreamsGroupInitializeResponseData, 
CoordinatorRecord> streamsGroupInitialize(
         RequestContext context,
         StreamsGroupInitializeRequestData request
     ) throws ApiException {
-        throwIfStreamsInitializeRequestIsInvalid(request);
+        throwIfStreamsGroupInitializeRequestIsInvalid(request);
 
-        return streamsInitialize(
+        return streamsGroupInitialize(
             request.groupId(),
             request.topology()
         );
     }
 
     /**
-     * Handles a StreamsHeartbeat request.
+     * Handles a StreamsGroupHeartbeat request.
      *
      * @param context The request context.
-     * @param request The actual StreamsHeartbeat request.
+     * @param request The actual StreamsGroupHeartbeat request.
      *
-     * @return A Result containing the StreamsHeartbeat response and
+     * @return A Result containing the StreamsGroupHeartbeat response and
      *         a list of records to update the state machine.
      */
-    public CoordinatorResult<StreamsGroupHeartbeatResponseData, 
CoordinatorRecord> streamsHeartbeat(
+    public CoordinatorResult<StreamsGroupHeartbeatResponseData, 
CoordinatorRecord> streamsGroupHeartbeat(
         RequestContext context,
         StreamsGroupHeartbeatRequestData request
     ) throws ApiException {
-        throwIfStreamsHeartbeatRequestIsInvalid(request);
+        throwIfStreamsGroupHeartbeatRequestIsInvalid(request);
 
         if (request.memberEpoch() == LEAVE_GROUP_MEMBER_EPOCH || 
request.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
             // -2 means that a static member wants to leave the group.
@@ -4821,7 +4821,7 @@ public class GroupMetadataManager {
                 request.memberEpoch()
             );
         } else {
-            return streamsHeartbeat(
+            return streamsGroupHeartbeat(
                 request.groupId(),
                 request.memberId(),
                 request.memberEpoch(),
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java
index f7690381708..4ff347c66b7 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java
@@ -56,17 +56,17 @@ public class CurrentAssignmentBuilder {
     private BiFunction<String, Integer, Integer> currentTaskEpoch;
 
     /**
-     * The active tasks owned by the streams. This is directly provided by the 
member in the StreamsHeartbeat request.
+     * The active tasks owned by the streams. This is directly provided by the 
member in the StreamsGroupHeartbeat request.
      */
     private List<StreamsGroupHeartbeatRequestData.TaskIds> ownedActiveTasks;
 
     /**
-     * The standby tasks owned by the streams. This is directly provided by 
the member in the StreamsHeartbeat request.
+     * The standby tasks owned by the streams. This is directly provided by 
the member in the StreamsGroupHeartbeat request.
      */
     private List<StreamsGroupHeartbeatRequestData.TaskIds> ownedStandbyTasks;
 
     /**
-     * The warmup tasks owned by the streams. This is directly provided by the 
member in the StreamsHeartbeat request.
+     * The warmup tasks owned by the streams. This is directly provided by the 
member in the StreamsGroupHeartbeat request.
      */
     private List<StreamsGroupHeartbeatRequestData.TaskIds> ownedWarmupTasks;
 
@@ -110,7 +110,7 @@ public class CurrentAssignmentBuilder {
     }
 
     /**
-     * Sets the active tasks currently owned by the member. This comes 
directly from the last StreamsHeartbeat request. This is used to
+     * Sets the active tasks currently owned by the member. This comes 
directly from the last StreamsGroupHeartbeat request. This is used to
      * determine if the member has revoked the necessary tasks.
      *
      * @param ownedActiveTasks A list of topic-tasks.
@@ -124,7 +124,7 @@ public class CurrentAssignmentBuilder {
     }
 
     /**
-     * Sets the standby tasks currently owned by the member. This comes 
directly from the last StreamsHeartbeat request. This is used to
+     * Sets the standby tasks currently owned by the member. This comes 
directly from the last StreamsGroupHeartbeat request. This is used to
      * determine if the member has revoked the necessary tasks.
      *
      * @param ownedStandbyTasks A list of topic-tasks.
@@ -138,7 +138,7 @@ public class CurrentAssignmentBuilder {
     }
 
     /**
-     * Sets the warmup tasks currently owned by the member. This comes 
directly from the last StreamsHeartbeat request. This is used to
+     * Sets the warmup tasks currently owned by the member. This comes 
directly from the last StreamsGroupHeartbeat request. This is used to
      * determine if the member has revoked the necessary tasks.
      *
      * @param ownedWarmupTasks A list of topic-tasks.
@@ -178,7 +178,7 @@ public class CurrentAssignmentBuilder {
                 // When the member is in the UNREVOKED_TASKS state, we wait
                 // until the member has revoked the necessary tasks. They are
                 // considered revoked when they are not anymore reported in the
-                // owned tasks set in the StreamsHeartbeat API.
+                // owned tasks set in the StreamsGroupHeartbeat API.
 
                 // If the member provides its owned tasks. We verify if it 
still
                 // owns any of the revoked tasks. If it does, we cannot 
progress.
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index dfd898c225c..7505aa8c791 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -267,7 +267,7 @@ public class GroupCoordinatorServiceTest {
     }
 
     @Test
-    public void testStreamsInitializeWhenNotStarted() throws 
ExecutionException, InterruptedException {
+    public void testStreamsGroupInitializeWhenNotStarted() throws 
ExecutionException, InterruptedException {
         CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
         GroupCoordinatorService service = new GroupCoordinatorService(
             new LogContext(),
@@ -280,7 +280,7 @@ public class GroupCoordinatorServiceTest {
         StreamsGroupInitializeRequestData request = new 
StreamsGroupInitializeRequestData()
             .setGroupId("foo");
 
-        CompletableFuture<StreamsGroupInitializeResponseData> future = 
service.streamsInitialize(
+        CompletableFuture<StreamsGroupInitializeResponseData> future = 
service.streamsGroupInitialize(
             requestContext(ApiKeys.STREAMS_GROUP_INITIALIZE),
             request
         );
@@ -293,7 +293,7 @@ public class GroupCoordinatorServiceTest {
     }
 
     @Test
-    public void testStreamsInitialize() throws ExecutionException, 
InterruptedException, TimeoutException {
+    public void testStreamsGroupInitialize() throws ExecutionException, 
InterruptedException, TimeoutException {
         CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
         GroupCoordinatorService service = new GroupCoordinatorService(
             new LogContext(),
@@ -317,7 +317,7 @@ public class GroupCoordinatorServiceTest {
             new StreamsGroupInitializeResponseData()
         ));
 
-        CompletableFuture<StreamsGroupInitializeResponseData> future = 
service.streamsInitialize(
+        CompletableFuture<StreamsGroupInitializeResponseData> future = 
service.streamsGroupInitialize(
             requestContext(ApiKeys.STREAMS_GROUP_INITIALIZE),
             request
         );
@@ -325,7 +325,7 @@ public class GroupCoordinatorServiceTest {
         assertEquals(new StreamsGroupInitializeResponseData(), future.get(5, 
TimeUnit.SECONDS));
     }
 
-    private static Stream<Arguments> 
testStreamsInitializeWithExceptionSource() {
+    private static Stream<Arguments> 
testStreamsGroupInitializeWithExceptionSource() {
         return Stream.of(
             Arguments.arguments(new UnknownTopicOrPartitionException(), 
Errors.COORDINATOR_NOT_AVAILABLE.code(), null),
             Arguments.arguments(new NotEnoughReplicasException(), 
Errors.COORDINATOR_NOT_AVAILABLE.code(), null),
@@ -341,8 +341,8 @@ public class GroupCoordinatorServiceTest {
     }
 
     @ParameterizedTest
-    @MethodSource("testStreamsInitializeWithExceptionSource")
-    public void testStreamsInitializeWithException(
+    @MethodSource("testStreamsGroupInitializeWithExceptionSource")
+    public void testStreamsGroupInitializeWithException(
         Throwable exception,
         short expectedErrorCode,
         String expectedErrorMessage
@@ -368,7 +368,7 @@ public class GroupCoordinatorServiceTest {
             ArgumentMatchers.any()
         )).thenReturn(FutureUtils.failedFuture(exception));
 
-        CompletableFuture<StreamsGroupInitializeResponseData> future = 
service.streamsInitialize(
+        CompletableFuture<StreamsGroupInitializeResponseData> future = 
service.streamsGroupInitialize(
             requestContext(ApiKeys.STREAMS_GROUP_INITIALIZE),
             request
         );
@@ -382,7 +382,7 @@ public class GroupCoordinatorServiceTest {
     }
 
     @Test
-    public void testStreamsHeartbeatWhenNotStarted() throws 
ExecutionException, InterruptedException {
+    public void testStreamsGroupHeartbeatWhenNotStarted() throws 
ExecutionException, InterruptedException {
         CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
         GroupCoordinatorService service = new GroupCoordinatorService(
             new LogContext(),
@@ -395,7 +395,7 @@ public class GroupCoordinatorServiceTest {
         StreamsGroupHeartbeatRequestData request = new 
StreamsGroupHeartbeatRequestData()
             .setGroupId("foo");
 
-        CompletableFuture<StreamsGroupHeartbeatResponseData> future = 
service.streamsHeartbeat(
+        CompletableFuture<StreamsGroupHeartbeatResponseData> future = 
service.streamsGroupHeartbeat(
             requestContext(ApiKeys.STREAMS_GROUP_HEARTBEAT),
             request
         );
@@ -408,7 +408,7 @@ public class GroupCoordinatorServiceTest {
     }
 
     @Test
-    public void testStreamsHeartbeat() throws ExecutionException, 
InterruptedException, TimeoutException {
+    public void testStreamsGroupHeartbeat() throws ExecutionException, 
InterruptedException, TimeoutException {
         CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
         GroupCoordinatorService service = new GroupCoordinatorService(
             new LogContext(),
@@ -432,7 +432,7 @@ public class GroupCoordinatorServiceTest {
             new StreamsGroupHeartbeatResponseData()
         ));
 
-        CompletableFuture<StreamsGroupHeartbeatResponseData> future = 
service.streamsHeartbeat(
+        CompletableFuture<StreamsGroupHeartbeatResponseData> future = 
service.streamsGroupHeartbeat(
             requestContext(ApiKeys.STREAMS_GROUP_HEARTBEAT),
             request
         );
@@ -440,7 +440,7 @@ public class GroupCoordinatorServiceTest {
         assertEquals(new StreamsGroupHeartbeatResponseData(), future.get(5, 
TimeUnit.SECONDS));
     }
 
-    private static Stream<Arguments> testStreamsHeartbeatWithExceptionSource() 
{
+    private static Stream<Arguments> 
testStreamsGroupHeartbeatWithExceptionSource() {
         return Stream.of(
             Arguments.arguments(new UnknownTopicOrPartitionException(), 
Errors.COORDINATOR_NOT_AVAILABLE.code(), null),
             Arguments.arguments(new NotEnoughReplicasException(), 
Errors.COORDINATOR_NOT_AVAILABLE.code(), null),
@@ -455,8 +455,8 @@ public class GroupCoordinatorServiceTest {
     }
 
     @ParameterizedTest
-    @MethodSource("testStreamsHeartbeatWithExceptionSource")
-    public void testStreamsHeartbeatWithException(
+    @MethodSource("testStreamsGroupHeartbeatWithExceptionSource")
+    public void testStreamsGroupHeartbeatWithException(
         Throwable exception,
         short expectedErrorCode,
         String expectedErrorMessage
@@ -482,7 +482,7 @@ public class GroupCoordinatorServiceTest {
             ArgumentMatchers.any()
         )).thenReturn(FutureUtils.failedFuture(exception));
 
-        CompletableFuture<StreamsGroupHeartbeatResponseData> future = 
service.streamsHeartbeat(
+        CompletableFuture<StreamsGroupHeartbeatResponseData> future = 
service.streamsGroupHeartbeat(
             requestContext(ApiKeys.STREAMS_GROUP_HEARTBEAT),
             request
         );
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
index e7fffb90c8c..263b2ab76ea 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
@@ -136,7 +136,7 @@ public class GroupCoordinatorShardTest {
     }
 
     @Test
-    public void testStreamsInitialize() {
+    public void testStreamsGroupInitialize() {
         GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
         OffsetMetadataManager offsetMetadataManager = 
mock(OffsetMetadataManager.class);
         CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
@@ -159,16 +159,16 @@ public class GroupCoordinatorShardTest {
             new StreamsGroupInitializeResponseData()
         );
 
-        when(groupMetadataManager.streamsInitialize(
+        when(groupMetadataManager.streamsGroupInitialize(
             context,
             request
         )).thenReturn(result);
 
-        assertEquals(result, coordinator.streamsInitialize(context, request));
+        assertEquals(result, coordinator.streamsGroupInitialize(context, 
request));
     }
 
     @Test
-    public void testStreamsHeartbeat() {
+    public void testStreamsGroupHeartbeat() {
         GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
         OffsetMetadataManager offsetMetadataManager = 
mock(OffsetMetadataManager.class);
         CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
@@ -191,12 +191,12 @@ public class GroupCoordinatorShardTest {
             new StreamsGroupHeartbeatResponseData()
         );
 
-        when(groupMetadataManager.streamsHeartbeat(
+        when(groupMetadataManager.streamsGroupHeartbeat(
             context,
             request
         )).thenReturn(result);
 
-        assertEquals(result, coordinator.streamsHeartbeat(context, request));
+        assertEquals(result, coordinator.streamsGroupHeartbeat(context, 
request));
     }
     
     @Test
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index bbbfe346539..05a94ea49a6 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -272,25 +272,25 @@ public class GroupMetadataManagerTest {
         Exception ex;
 
         // GroupId must be present in all requests.
-        ex = assertThrows(InvalidRequestException.class, () -> 
context.streamsHeartbeat(
+        ex = assertThrows(InvalidRequestException.class, () -> 
context.streamsGroupHeartbeat(
             new StreamsGroupHeartbeatRequestData()));
         assertEquals("GroupId can't be empty.", ex.getMessage());
 
         // GroupId can't be all whitespaces.
-        ex = assertThrows(InvalidRequestException.class, () -> 
context.streamsHeartbeat(
+        ex = assertThrows(InvalidRequestException.class, () -> 
context.streamsGroupHeartbeat(
             new StreamsGroupHeartbeatRequestData()
                 .setGroupId("   ")));
         assertEquals("GroupId can't be empty.", ex.getMessage());
 
         // RebalanceTimeoutMs must be present in the first request (epoch == 
0).
-        ex = assertThrows(InvalidRequestException.class, () -> 
context.streamsHeartbeat(
+        ex = assertThrows(InvalidRequestException.class, () -> 
context.streamsGroupHeartbeat(
             new StreamsGroupHeartbeatRequestData()
                 .setGroupId("foo")
                 .setMemberEpoch(0)));
         assertEquals("RebalanceTimeoutMs must be provided in first request.", 
ex.getMessage());
 
         // ActiveTasks must be present and empty in the first request (epoch 
== 0).
-        ex = assertThrows(InvalidRequestException.class, () -> 
context.streamsHeartbeat(
+        ex = assertThrows(InvalidRequestException.class, () -> 
context.streamsGroupHeartbeat(
             new StreamsGroupHeartbeatRequestData()
                 .setGroupId("foo")
                 .setMemberEpoch(0)
@@ -300,7 +300,7 @@ public class GroupMetadataManagerTest {
         assertEquals("ActiveTasks must be empty when (re-)joining.", 
ex.getMessage());
 
         // StandbyTasks must be present and empty in the first request (epoch 
== 0).
-        ex = assertThrows(InvalidRequestException.class, () -> 
context.streamsHeartbeat(
+        ex = assertThrows(InvalidRequestException.class, () -> 
context.streamsGroupHeartbeat(
             new StreamsGroupHeartbeatRequestData()
                 .setGroupId("foo")
                 .setMemberEpoch(0)
@@ -310,7 +310,7 @@ public class GroupMetadataManagerTest {
         assertEquals("StandbyTasks must be empty when (re-)joining.", 
ex.getMessage());
 
         // WarmupTasks must be present and empty in the first request (epoch 
== 0).
-        ex = assertThrows(InvalidRequestException.class, () -> 
context.streamsHeartbeat(
+        ex = assertThrows(InvalidRequestException.class, () -> 
context.streamsGroupHeartbeat(
             new StreamsGroupHeartbeatRequestData()
                 .setGroupId("foo")
                 .setMemberEpoch(0)
@@ -321,14 +321,14 @@ public class GroupMetadataManagerTest {
 
         // MemberId must be non-empty in all requests except for the first one 
where it
         // could be empty (epoch != 0).
-        ex = assertThrows(InvalidRequestException.class, () -> 
context.streamsHeartbeat(
+        ex = assertThrows(InvalidRequestException.class, () -> 
context.streamsGroupHeartbeat(
             new StreamsGroupHeartbeatRequestData()
                 .setGroupId("foo")
                 .setMemberEpoch(1)));
         assertEquals("MemberId can't be empty.", ex.getMessage());
 
         // InstanceId must be non-empty if provided in all requests.
-        ex = assertThrows(InvalidRequestException.class, () -> 
context.streamsHeartbeat(
+        ex = assertThrows(InvalidRequestException.class, () -> 
context.streamsGroupHeartbeat(
             new StreamsGroupHeartbeatRequestData()
                 .setGroupId("foo")
                 .setMemberId(Uuid.randomUuid().toString())
@@ -337,7 +337,7 @@ public class GroupMetadataManagerTest {
         assertEquals("InstanceId can't be empty.", ex.getMessage());
 
         // RackId must be non-empty if provided in all requests.
-        ex = assertThrows(InvalidRequestException.class, () -> 
context.streamsHeartbeat(
+        ex = assertThrows(InvalidRequestException.class, () -> 
context.streamsGroupHeartbeat(
             new StreamsGroupHeartbeatRequestData()
                 .setGroupId("foo")
                 .setMemberId(Uuid.randomUuid().toString())
@@ -346,8 +346,8 @@ public class GroupMetadataManagerTest {
         assertEquals("RackId can't be empty.", ex.getMessage());
 
 //       TODO: // ServerAssignor must exist if provided in all requests.
-//        ex = assertThrows(UnsupportedAssignorException.class, () -> 
context.streamsHeartbeat(
-//            new StreamsHeartbeatRequestData()
+//        ex = assertThrows(UnsupportedAssignorException.class, () -> 
context.streamsGroupHeartbeat(
+//            new StreamsGroupHeartbeatRequestData()
 //                .setGroupId("foo")
 //                .setMemberId(Uuid.randomUuid().toString())
 //                .setMemberEpoch(1)
@@ -16016,15 +16016,15 @@ public class GroupMetadataManagerTest {
 //                    )
 //                )
 //        );
-//        CoordinatorResult<StreamsInitializeResponseData, CoordinatorRecord> 
result =
-//            context.streamsInitialize(
-//                new StreamsInitializeRequestData()
+//        CoordinatorResult<StreamsGroupInitializeResponseData, 
CoordinatorRecord> result =
+//            context.streamsGroupInitialize(
+//                new streamsGroupInitializeRequestData()
 //                    .setGroupId(groupId)
 //                    .setTopology(topology)
 //            );
 //
 //        assertEquals(
-//            new StreamsInitializeResponseData(),
+//            new StreamsGroupInitializeResponseData(),
 //            result.response()
 //        );
 //
@@ -16086,15 +16086,15 @@ public class GroupMetadataManagerTest {
 //                    )
 //                )
 //        );
-//        CoordinatorResult<StreamsInitializeResponseData, CoordinatorRecord> 
result =
-//            context.streamsInitialize(
-//                new StreamsInitializeRequestData()
+//        CoordinatorResult<StreamsGroupInitializeResponseData, 
CoordinatorRecord> result =
+//            context.streamsGroupInitialize(
+//                new streamsGroupInitializeRequestData()
 //                    .setGroupId(groupId)
 //                    .setTopology(topology)
 //            );
 //
 //        assertEquals(
-//            new StreamsInitializeResponseData()
+//            new StreamsGroupInitializeResponseData()
 //                .setErrorCode(Errors.STREAMS_INVALID_TOPOLOGY.code())
 //                .setErrorMessage("Internal topics changelog do not exist."),
 //            result.response()
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
index 45654736316..d77f5414ee9 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
@@ -727,7 +727,7 @@ public class GroupMetadataManagerTestContext {
     }
 
 
-    public CoordinatorResult<StreamsGroupInitializeResponseData, 
CoordinatorRecord> streamsInitialize(
+    public CoordinatorResult<StreamsGroupInitializeResponseData, 
CoordinatorRecord> streamsGroupInitialize(
         StreamsGroupInitializeRequestData request
     ) {
         RequestContext context = new RequestContext(
@@ -746,7 +746,7 @@ public class GroupMetadataManagerTestContext {
             false
         );
 
-        CoordinatorResult<StreamsGroupInitializeResponseData, 
CoordinatorRecord> result = groupMetadataManager.streamsInitialize(
+        CoordinatorResult<StreamsGroupInitializeResponseData, 
CoordinatorRecord> result = groupMetadataManager.streamsGroupInitialize(
             context,
             request
         );
@@ -757,7 +757,7 @@ public class GroupMetadataManagerTestContext {
         return result;
     }
 
-    public CoordinatorResult<StreamsGroupHeartbeatResponseData, 
CoordinatorRecord> streamsHeartbeat(
+    public CoordinatorResult<StreamsGroupHeartbeatResponseData, 
CoordinatorRecord> streamsGroupHeartbeat(
         StreamsGroupHeartbeatRequestData request
     ) {
         RequestContext context = new RequestContext(
@@ -776,7 +776,7 @@ public class GroupMetadataManagerTestContext {
             false
         );
 
-        CoordinatorResult<StreamsGroupHeartbeatResponseData, 
CoordinatorRecord> result = groupMetadataManager.streamsHeartbeat(
+        CoordinatorResult<StreamsGroupHeartbeatResponseData, 
CoordinatorRecord> result = groupMetadataManager.streamsGroupHeartbeat(
             context,
             request
         );

Reply via email to