This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch kip1071
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 4c64b358d6279d50dc3173ba24f21adf5e31f012
Author: Lucas Brutschy <[email protected]>
AuthorDate: Fri Jun 7 15:32:08 2024 +0200

    Resolve conflict from 11/25 rebase Basic heartbeat RPC handler
    
    See https://github.com/lucasbru/kafka/pull/18
---
 .../group/GroupCoordinatorAdapter.scala            |  11 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |  26 +++++
 .../group/GroupCoordinatorAdapterTest.scala        |  18 +++-
 .../scala/unit/kafka/server/KafkaApisTest.scala    |  68 ++++++++++++-
 .../kafka/coordinator/group/GroupCoordinator.java  |  16 +++
 .../coordinator/group/GroupCoordinatorService.java |  31 ++++++
 .../coordinator/group/GroupCoordinatorShard.java   |  18 ++++
 .../coordinator/group/GroupMetadataManager.java    | 102 ++++++++++++++++++-
 .../coordinator/group/streams/Assignment.java      |   6 +-
 .../group/GroupCoordinatorServiceTest.java         | 113 +++++++++++++++++++++
 .../group/GroupCoordinatorShardTest.java           |  34 +++++++
 .../group/GroupMetadataManagerTest.java            |  94 +++++++++++++++++
 .../group/GroupMetadataManagerTestContext.java     |  32 ++++++
 .../coordinator/group/streams/AssignmentTest.java  |  18 ++--
 14 files changed, 569 insertions(+), 18 deletions(-)

diff --git 
a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala 
b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
index d45221d1e71..f7b456699d0 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
@@ -18,7 +18,7 @@ package kafka.coordinator.group
 
 import kafka.server.{KafkaConfig, ReplicaManager}
 import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
-import org.apache.kafka.common.message.{ConsumerGroupDescribeResponseData, 
ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, 
DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, 
HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, 
LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, 
ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, 
OffsetDeleteRequestData, OffsetDeleteResponseData, Offset [...]
+import org.apache.kafka.common.message.{ConsumerGroupDescribeResponseData, 
ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, 
DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, 
HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, 
LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, 
ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, 
OffsetDeleteRequestData, OffsetDeleteResponseData, Offset [...]
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.RecordBatch
@@ -86,6 +86,15 @@ private[group] class GroupCoordinatorAdapter(
     ))
   }
 
+  override def streamsHeartbeat(
+                                 context: RequestContext,
+                                 request: StreamsHeartbeatRequestData
+                               ): 
CompletableFuture[StreamsHeartbeatResponseData] = {
+    FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
+      s"The old group coordinator does not support 
${ApiKeys.STREAMS_HEARTBEAT.name} API."
+    ))
+  }
+
   override def shareGroupHeartbeat(
     context: RequestContext,
     request: ShareGroupHeartbeatRequestData
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 046f1570707..3bd368bf1e5 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -277,6 +277,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.DELETE_SHARE_GROUP_STATE => 
handleDeleteShareGroupStateRequest(request)
         case ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY => 
handleReadShareGroupStateSummaryRequest(request)
         case ApiKeys.STREAMS_INITIALIZE => 
handleStreamsInitialize(request).exceptionally(handleError)
+        case ApiKeys.STREAMS_HEARTBEAT => 
handleStreamsHeartbeat(request).exceptionally(handleError)
         case _ => throw new IllegalStateException(s"No handler for request api 
key ${request.header.apiKey}")
       }
     } catch {
@@ -3911,6 +3912,31 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  def handleStreamsHeartbeat(request: RequestChannel.Request): 
CompletableFuture[Unit] = {
+    val streamsHeartbeatRequest = request.body[StreamsHeartbeatRequest]
+
+    if (!config.isNewGroupCoordinatorEnabled) {
+      // The API is not supported by the "old" group coordinator (the 
default). If the
+      // new one is not enabled, we fail directly here.
+      requestHelper.sendMaybeThrottle(request, 
streamsHeartbeatRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+      CompletableFuture.completedFuture[Unit](())
+    } else if (!authHelper.authorize(request.context, READ, GROUP, 
streamsHeartbeatRequest.data.groupId)) {
+      requestHelper.sendMaybeThrottle(request, 
streamsHeartbeatRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
+      CompletableFuture.completedFuture[Unit](())
+    } else {
+      groupCoordinator.streamsHeartbeat(
+        request.context,
+        streamsHeartbeatRequest.data,
+      ).handle[Unit] { (response, exception) =>
+        if (exception != null) {
+          requestHelper.sendMaybeThrottle(request, 
streamsHeartbeatRequest.getErrorResponse(exception))
+        } else {
+          requestHelper.sendMaybeThrottle(request, new 
StreamsHeartbeatResponse(response))
+        }
+      }
+    }
+  }
+
   def handleGetTelemetrySubscriptionsRequest(request: RequestChannel.Request): 
Unit = {
     val subscriptionRequest = request.body[GetTelemetrySubscriptionsRequest]
 
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
index 0d57eea15f9..03fcf3ff21d 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
@@ -19,7 +19,7 @@ package kafka.coordinator.group
 import 
kafka.coordinator.group.GroupCoordinatorConcurrencyTest.{JoinGroupCallback, 
SyncGroupCallback}
 import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
 import org.apache.kafka.common.errors.{InvalidGroupIdException, 
UnsupportedVersionException}
-import org.apache.kafka.common.message.{ConsumerGroupHeartbeatRequestData, 
DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, 
HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, 
LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, 
ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, 
OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, 
OffsetFetchResponseData, ShareGroupHeartbeatRequestDa [...]
+import org.apache.kafka.common.message.{ConsumerGroupHeartbeatRequestData, 
DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, 
HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, 
LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, 
ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, 
OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, 
OffsetFetchResponseData, ShareGroupHeartbeatRequestDa [...]
 import 
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol
 import 
org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember
 import 
org.apache.kafka.common.message.OffsetDeleteRequestData.{OffsetDeleteRequestPartition,
 OffsetDeleteRequestTopic, OffsetDeleteRequestTopicCollection}
@@ -94,6 +94,22 @@ class GroupCoordinatorAdapterTest {
     assertFutureThrows(future, classOf[UnsupportedVersionException])
   }
 
+  @Test
+  def testStreamsHeartbeat(): Unit = {
+    val groupCoordinator = mock(classOf[GroupCoordinator])
+    val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM)
+
+    val ctx = makeContext(ApiKeys.STREAMS_HEARTBEAT, 
ApiKeys.STREAMS_HEARTBEAT.latestVersion)
+    val request = new StreamsHeartbeatRequestData()
+      .setGroupId("group")
+
+    val future = adapter.streamsHeartbeat(ctx, request)
+
+    assertTrue(future.isDone)
+    assertTrue(future.isCompletedExceptionally)
+    assertFutureThrows(future, classOf[UnsupportedVersionException])
+  }
+
   @Test
   def testJoinShareGroup(): Unit = {
     val groupCoordinator = mock(classOf[GroupCoordinator])
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 4c0f3421e3f..c0bfa5d0f31 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -11233,7 +11233,8 @@ class KafkaApisTest extends Logging {
     val response = 
verifyNoThrottling[ConsumerGroupHeartbeatResponse](requestChannelRequest)
     assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, 
response.data.errorCode)
   }
-  
+
+  @Test
   def testStreamsInitializeRequest(): Unit = {
     val streamsInitializeRequest = new 
StreamsInitializeRequestData().setGroupId("group")
 
@@ -11296,6 +11297,71 @@ class KafkaApisTest extends Logging {
     assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, 
response.data.errorCode)
   }
 
+
+  @Test
+  def testStreamsHeartbeatRequest(): Unit = {
+    val streamsHeartbeatRequest = new 
StreamsHeartbeatRequestData().setGroupId("group")
+
+    val requestChannelRequest = buildRequest(new 
StreamsHeartbeatRequest.Builder(streamsHeartbeatRequest, true).build())
+
+    val future = new CompletableFuture[StreamsHeartbeatResponseData]()
+    when(groupCoordinator.streamsHeartbeat(
+      requestChannelRequest.context,
+      streamsHeartbeatRequest
+    )).thenReturn(future)
+    kafkaApis = createKafkaApis(overrideProperties = Map(
+      GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true"
+    ))
+    kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching)
+
+    val streamsHeartbeatResponse = new StreamsHeartbeatResponseData()
+      .setMemberId("member")
+
+    future.complete(streamsHeartbeatResponse)
+    val response = 
verifyNoThrottling[StreamsHeartbeatResponse](requestChannelRequest)
+    assertEquals(streamsHeartbeatResponse, response.data)
+  }
+
+  @Test
+  def testStreamsHeartbeatRequestFutureFailed(): Unit = {
+    val streamsHeartbeatRequest = new 
StreamsHeartbeatRequestData().setGroupId("group")
+
+    val requestChannelRequest = buildRequest(new 
StreamsHeartbeatRequest.Builder(streamsHeartbeatRequest, true).build())
+
+    val future = new CompletableFuture[StreamsHeartbeatResponseData]()
+    when(groupCoordinator.streamsHeartbeat(
+      requestChannelRequest.context,
+      streamsHeartbeatRequest
+    )).thenReturn(future)
+    kafkaApis = createKafkaApis(overrideProperties = Map(
+      GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true"
+    ))
+    kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching)
+
+    future.completeExceptionally(Errors.FENCED_MEMBER_EPOCH.exception)
+    val response = 
verifyNoThrottling[StreamsHeartbeatResponse](requestChannelRequest)
+    assertEquals(Errors.FENCED_MEMBER_EPOCH.code, response.data.errorCode)
+  }
+
+  @Test
+  def testStreamsHeartbeatRequestAuthorizationFailed(): Unit = {
+    val streamsHeartbeatRequest = new 
StreamsHeartbeatRequestData().setGroupId("group")
+
+    val requestChannelRequest = buildRequest(new 
StreamsHeartbeatRequest.Builder(streamsHeartbeatRequest, true).build())
+
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+    when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
+      .thenReturn(Seq(AuthorizationResult.DENIED).asJava)
+    kafkaApis = createKafkaApis(
+      authorizer = Some(authorizer),
+      overrideProperties = 
Map(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true")
+    )
+    kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching)
+
+    val response = 
verifyNoThrottling[StreamsHeartbeatResponse](requestChannelRequest)
+    assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, 
response.data.errorCode)
+  }
+
   @ParameterizedTest
   @ValueSource(booleans = Array(true, false))
   def testConsumerGroupDescribe(includeAuthorizedOperations: Boolean): Unit = {
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
index 8a5032451b1..900a789f2c0 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
@@ -39,6 +39,8 @@ import 
org.apache.kafka.common.message.OffsetFetchResponseData;
 import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.StreamsHeartbeatRequestData;
+import org.apache.kafka.common.message.StreamsHeartbeatResponseData;
 import org.apache.kafka.common.message.StreamsInitializeRequestData;
 import org.apache.kafka.common.message.StreamsInitializeResponseData;
 import org.apache.kafka.common.message.SyncGroupRequestData;
@@ -98,6 +100,20 @@ public interface GroupCoordinator {
         StreamsInitializeRequestData request
     );
 
+    /**
+     * Heartbeat to a Streams Group.
+     *
+     * @param context           The request context.
+     * @param request           The StreamsHeartbeatRequestData data.
+     *
+     * @return  A future yielding the response.
+     *          The error code(s) of the response are set to indicate the 
error(s) occurred during the execution.
+     */
+    CompletableFuture<StreamsHeartbeatResponseData> streamsHeartbeat(
+        RequestContext context,
+        StreamsHeartbeatRequestData request
+    );
+
     /**
      * Heartbeat to a Share Group.
      *
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index 2a3eef88a04..9f180c5798a 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -44,6 +44,8 @@ import 
org.apache.kafka.common.message.ShareGroupDescribeResponseData;
 import 
org.apache.kafka.common.message.ShareGroupDescribeResponseData.DescribedGroup;
 import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.StreamsHeartbeatRequestData;
+import org.apache.kafka.common.message.StreamsHeartbeatResponseData;
 import org.apache.kafka.common.message.StreamsInitializeRequestData;
 import org.apache.kafka.common.message.StreamsInitializeResponseData;
 import org.apache.kafka.common.message.SyncGroupRequestData;
@@ -375,6 +377,35 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
         ));
     }
 
+    /**
+     * See {@link GroupCoordinator#streamsHeartbeat(RequestContext, 
StreamsHeartbeatRequestData)}.
+     */
+    @Override
+    public CompletableFuture<StreamsHeartbeatResponseData> streamsHeartbeat(
+        RequestContext context,
+        StreamsHeartbeatRequestData request
+    ) {
+        if (!isActive.get()) {
+            return CompletableFuture.completedFuture(new 
StreamsHeartbeatResponseData()
+                .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
+            );
+        }
+
+        return runtime.scheduleWriteOperation(
+            "streams-heartbeat",
+            topicPartitionFor(request.groupId()),
+            Duration.ofMillis(config.offsetCommitTimeoutMs),
+            coordinator -> coordinator.streamsHeartbeat(context, request)
+        ).exceptionally(exception -> handleOperationException(
+            "streams-heartbeat",
+            request,
+            exception,
+            (error, message) -> new StreamsHeartbeatResponseData()
+                .setErrorCode(error.code())
+                .setErrorMessage(message)
+        ));
+    }
+
     /**
      * See {@link GroupCoordinator#shareGroupHeartbeat(RequestContext, 
ShareGroupHeartbeatRequestData)}.
      */
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index b002f96b60b..2eed6f395e3 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -39,6 +39,8 @@ import 
org.apache.kafka.common.message.OffsetFetchResponseData;
 import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.StreamsHeartbeatRequestData;
+import org.apache.kafka.common.message.StreamsHeartbeatResponseData;
 import org.apache.kafka.common.message.StreamsInitializeRequestData;
 import org.apache.kafka.common.message.StreamsInitializeResponseData;
 import org.apache.kafka.common.message.SyncGroupRequestData;
@@ -379,6 +381,22 @@ public class GroupCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
         return groupMetadataManager.streamsInitialize(context, request);
     }
 
+    /**
+     * Handles a StreamsHeartbeat request.
+     *
+     * @param context The request context.
+     * @param request The actual StreamsHeartbeat request.
+     *
+     * @return A Result containing the StreamsHeartbeat response and
+     *         a list of records to update the state machine.
+     */
+    public CoordinatorResult<StreamsHeartbeatResponseData, CoordinatorRecord> 
streamsHeartbeat(
+        RequestContext context,
+        StreamsHeartbeatRequestData request
+    ) {
+        return groupMetadataManager.streamsHeartbeat(context, request);
+    }
+
     /**
      * Handles a ShareGroupHeartbeat request.
      *
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index c7cf02f7974..f95b9693ca8 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -53,6 +53,8 @@ import org.apache.kafka.common.message.ListGroupsResponseData;
 import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.StreamsHeartbeatRequestData;
+import org.apache.kafka.common.message.StreamsHeartbeatResponseData;
 import org.apache.kafka.common.message.StreamsInitializeRequestData;
 import org.apache.kafka.common.message.StreamsInitializeResponseData;
 import org.apache.kafka.common.message.SyncGroupRequestData;
@@ -1427,6 +1429,52 @@ public class GroupMetadataManager {
     
 
     /**
+    /**
+     * Validates the request.
+     *
+     * @param request The request to validate.
+     *
+     * @throws InvalidRequestException if the request is not valid.
+     * @throws UnsupportedAssignorException if the assignor is not supported.
+     */
+    private void throwIfStreamsHeartbeatRequestIsInvalid(
+        StreamsHeartbeatRequestData request
+    ) throws InvalidRequestException, UnsupportedAssignorException {
+        throwIfEmptyString(request.groupId(), "GroupId can't be empty.");
+        throwIfEmptyString(request.instanceId(), "InstanceId can't be empty.");
+        throwIfEmptyString(request.rackId(), "RackId can't be empty.");
+
+        if (request.memberEpoch() > 0 || request.memberEpoch() == 
LEAVE_GROUP_MEMBER_EPOCH) {
+            throwIfEmptyString(request.memberId(), "MemberId can't be empty.");
+        } else if (request.memberEpoch() == 0) {
+            if (request.rebalanceTimeoutMs() == -1) {
+                throw new InvalidRequestException("RebalanceTimeoutMs must be 
provided in first request.");
+            }
+            if (request.activeTasks() == null || 
!request.activeTasks().isEmpty()) {
+                throw new InvalidRequestException("ActiveTasks must be empty 
when (re-)joining.");
+            }
+            if (request.standbyTasks() == null || 
!request.standbyTasks().isEmpty()) {
+                throw new InvalidRequestException("StandbyTasks must be empty 
when (re-)joining.");
+            }
+            if (request.warmupTasks() == null || 
!request.warmupTasks().isEmpty()) {
+                throw new InvalidRequestException("WarmupTasks must be empty 
when (re-)joining.");
+            }
+            // TODO: check that active, standby and warmup do not intersect
+        } else if (request.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
+            throwIfEmptyString(request.memberId(), "MemberId can't be empty.");
+            throwIfNull(request.instanceId(), "InstanceId can't be null.");
+        } else {
+            throw new InvalidRequestException("MemberEpoch is invalid.");
+        }
+
+        // TODO: Check that task assignor exists.
+//        if (request.assignor() != null && 
!assignors.containsKey(request.assignor())) {
+//            throw new UnsupportedAssignorException("Assignor " + 
request.assignor()
+//                + " is not supported. Supported assignors: " + 
String.join(", ", assignors.keySet())
+//                + ".");
+//        }
+    }
+
     /**
      * Verifies that the partitions currently owned by the member (the ones 
set in the
      * request) matches the ones that the member should own. It matches if the 
consumer
@@ -3640,12 +3688,12 @@ public class GroupMetadataManager {
     }
 
     /**
-     * Handles a ConsumerGroupHeartbeat request.
+     * Handles a StreamsInitialize request.
      *
      * @param context The request context.
-     * @param request The actual ConsumerGroupHeartbeat request.
+     * @param request The actual StreamsInitialize request.
      *
-     * @return A Result containing the ConsumerGroupHeartbeat response and
+     * @return A Result containing the StreamsInitialize response and
      *         a list of records to update the state machine.
      */
     public CoordinatorResult<StreamsInitializeResponseData, CoordinatorRecord> 
streamsInitialize(
@@ -3660,6 +3708,54 @@ public class GroupMetadataManager {
         );
     }
 
+    /**
+     * Handles a StreamsHeartbeat request.
+     *
+     * @param context The request context.
+     * @param request The actual StreamsHeartbeat request.
+     *
+     * @return A Result containing the StreamsHeartbeat response and
+     *         a list of records to update the state machine.
+     */
+    public CoordinatorResult<StreamsHeartbeatResponseData, CoordinatorRecord> 
streamsHeartbeat(
+        RequestContext context,
+        StreamsHeartbeatRequestData request
+    ) throws ApiException {
+        throwIfStreamsHeartbeatRequestIsInvalid(request);
+
+        return new CoordinatorResult<>(
+            Collections.emptyList(),
+            new StreamsHeartbeatResponseData()
+                .setErrorCode(Errors.NONE.code())
+        );
+
+//        if (request.memberEpoch() == LEAVE_GROUP_MEMBER_EPOCH || 
request.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
+//            // TODO: -1 means that the member wants to leave the group.
+//            // -2 means that a static member wants to leave the group.
+//            return consumerGroupLeave(
+//                request.groupId(),
+//                request.instanceId(),
+//                request.memberId(),
+//                request.memberEpoch()
+//            );
+//        } else {
+//            // TODO: Otherwise, it is a regular heartbeat.
+//            return consumerGroupHeartbeat(
+//                request.groupId(),
+//                request.memberId(),
+//                request.memberEpoch(),
+//                request.instanceId(),
+//                request.rackId(),
+//                request.rebalanceTimeoutMs(),
+//                context.clientId(),
+//                context.clientAddress.toString(),
+//                request.subscribedTopicNames(),
+//                request.serverAssignor(),
+//                request.topicPartitions()
+//            );
+//        }
+    }
+
     /**
      * Replays StreamsGroupTopologyKey/Value to update the hard state of
      * the streams group.
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/Assignment.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/Assignment.java
index 735ee414785..54e4cbc63a4 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/Assignment.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/Assignment.java
@@ -103,19 +103,19 @@ public class Assignment {
         return new Assignment(
             record.activeTasks().stream()
                 .collect(Collectors.toMap(
-                    
StreamsGroupTargetAssignmentMemberValue.TaskId::subtopology,
+                    
StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopology,
                     taskId -> new HashSet<>(taskId.partitions())
                     )
                 ),
             record.standbyTasks().stream()
                 .collect(Collectors.toMap(
-                        
StreamsGroupTargetAssignmentMemberValue.TaskId::subtopology,
+                        
StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopology,
                         taskId -> new HashSet<>(taskId.partitions())
                     )
                 ),
             record.warmupTasks().stream()
                 .collect(Collectors.toMap(
-                        
StreamsGroupTargetAssignmentMemberValue.TaskId::subtopology,
+                        
StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopology,
                         taskId -> new HashSet<>(taskId.partitions())
                     )
                 )
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index 96273e368e1..fa19409b208 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -54,6 +54,8 @@ import 
org.apache.kafka.common.message.OffsetFetchResponseData;
 import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.StreamsHeartbeatRequestData;
+import org.apache.kafka.common.message.StreamsHeartbeatResponseData;
 import org.apache.kafka.common.message.StreamsInitializeRequestData;
 import org.apache.kafka.common.message.StreamsInitializeResponseData;
 import org.apache.kafka.common.message.SyncGroupRequestData;
@@ -375,6 +377,117 @@ public class GroupCoordinatorServiceTest {
         );
     }
 
+    @Test
+    public void testStreamsHeartbeatWhenNotStarted() throws 
ExecutionException, InterruptedException {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime,
+            new GroupCoordinatorMetrics()
+        );
+
+        StreamsHeartbeatRequestData request = new StreamsHeartbeatRequestData()
+            .setGroupId("foo");
+
+        CompletableFuture<StreamsHeartbeatResponseData> future = 
service.streamsHeartbeat(
+            requestContext(ApiKeys.STREAMS_HEARTBEAT),
+            request
+        );
+
+        assertEquals(
+            new StreamsHeartbeatResponseData()
+                .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()),
+            future.get()
+        );
+    }
+
+    @Test
+    public void testStreamsHeartbeat() throws ExecutionException, 
InterruptedException, TimeoutException {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime,
+            new GroupCoordinatorMetrics()
+        );
+
+        StreamsHeartbeatRequestData request = new StreamsHeartbeatRequestData()
+            .setGroupId("foo");
+
+        service.startup(() -> 1);
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("streams-heartbeat"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.eq(Duration.ofMillis(5000)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(
+            new StreamsHeartbeatResponseData()
+        ));
+
+        CompletableFuture<StreamsHeartbeatResponseData> future = 
service.streamsHeartbeat(
+            requestContext(ApiKeys.STREAMS_HEARTBEAT),
+            request
+        );
+
+        assertEquals(new StreamsHeartbeatResponseData(), future.get(5, 
TimeUnit.SECONDS));
+    }
+
+    private static Stream<Arguments> testStreamsHeartbeatWithExceptionSource() 
{
+        return Stream.of(
+            Arguments.arguments(new UnknownTopicOrPartitionException(), 
Errors.COORDINATOR_NOT_AVAILABLE.code(), null),
+            Arguments.arguments(new NotEnoughReplicasException(), 
Errors.COORDINATOR_NOT_AVAILABLE.code(), null),
+            Arguments.arguments(new 
org.apache.kafka.common.errors.TimeoutException(), 
Errors.COORDINATOR_NOT_AVAILABLE.code(), null),
+            Arguments.arguments(new NotLeaderOrFollowerException(), 
Errors.NOT_COORDINATOR.code(), null),
+            Arguments.arguments(new KafkaStorageException(), 
Errors.NOT_COORDINATOR.code(), null),
+            Arguments.arguments(new RecordTooLargeException(), 
Errors.UNKNOWN_SERVER_ERROR.code(), null),
+            Arguments.arguments(new RecordBatchTooLargeException(), 
Errors.UNKNOWN_SERVER_ERROR.code(), null),
+            Arguments.arguments(new InvalidFetchSizeException(""), 
Errors.UNKNOWN_SERVER_ERROR.code(), null),
+            Arguments.arguments(new InvalidRequestException("Invalid"), 
Errors.INVALID_REQUEST.code(), "Invalid")
+        );
+    }
+
+    @ParameterizedTest
+    @MethodSource("testStreamsHeartbeatWithExceptionSource")
+    public void testStreamsHeartbeatWithException(
+        Throwable exception,
+        short expectedErrorCode,
+        String expectedErrorMessage
+    ) throws ExecutionException, InterruptedException, TimeoutException {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime,
+            new GroupCoordinatorMetrics()
+        );
+
+        StreamsHeartbeatRequestData request = new StreamsHeartbeatRequestData()
+            .setGroupId("foo");
+
+        service.startup(() -> 1);
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("streams-heartbeat"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.eq(Duration.ofMillis(5000)),
+            ArgumentMatchers.any()
+        )).thenReturn(FutureUtils.failedFuture(exception));
+
+        CompletableFuture<StreamsHeartbeatResponseData> future = 
service.streamsHeartbeat(
+            requestContext(ApiKeys.STREAMS_HEARTBEAT),
+            request
+        );
+
+        assertEquals(
+            new StreamsHeartbeatResponseData()
+                .setErrorCode(expectedErrorCode)
+                .setErrorMessage(expectedErrorMessage),
+            future.get(5, TimeUnit.SECONDS)
+        );
+    }
+    
     @Test
     public void testPartitionFor() {
         CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
index 33756fe53ac..da7fcc52de5 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
@@ -24,6 +24,8 @@ import org.apache.kafka.common.message.OffsetCommitRequestData;
 import org.apache.kafka.common.message.OffsetCommitResponseData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.StreamsHeartbeatRequestData;
+import org.apache.kafka.common.message.StreamsHeartbeatResponseData;
 import org.apache.kafka.common.message.StreamsInitializeRequestData;
 import org.apache.kafka.common.message.StreamsInitializeResponseData;
 import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
@@ -165,6 +167,38 @@ public class GroupCoordinatorShardTest {
         assertEquals(result, coordinator.streamsInitialize(context, request));
     }
 
+    @Test
+    public void testStreamsHeartbeat() {
+        GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
+        OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
+        CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
+        CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
+        GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            new LogContext(),
+            groupMetadataManager,
+            offsetMetadataManager,
+            Time.SYSTEM,
+            new MockCoordinatorTimer<>(Time.SYSTEM),
+            mock(GroupCoordinatorConfig.class),
+            coordinatorMetrics,
+            metricsShard
+        );
+
+        RequestContext context = requestContext(ApiKeys.STREAMS_HEARTBEAT);
+        StreamsHeartbeatRequestData request = new StreamsHeartbeatRequestData();
+        CoordinatorResult<StreamsHeartbeatResponseData, CoordinatorRecord> result = new CoordinatorResult<>(
+            Collections.emptyList(),
+            new StreamsHeartbeatResponseData()
+        );
+
+        when(groupMetadataManager.streamsHeartbeat(
+            context,
+            request
+        )).thenReturn(result);
+
+        assertEquals(result, coordinator.streamsHeartbeat(context, request));
+    }
+    
     @Test
     public void testCommitOffset() {
         GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index c9f2d80ca2b..48afadb6ce7 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -54,6 +54,7 @@ import org.apache.kafka.common.message.ListGroupsResponseData;
 import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.StreamsHeartbeatRequestData;
 import org.apache.kafka.common.message.StreamsInitializeRequestData;
 import org.apache.kafka.common.message.StreamsInitializeRequestData.Subtopology;
 import org.apache.kafka.common.message.StreamsInitializeRequestData.TopicConfig;
@@ -261,6 +262,99 @@ public class GroupMetadataManagerTest {
         assertEquals("InstanceId can't be null.", ex.getMessage());
     }
 
+
+    @Test
+    public void testStreamsRequestValidation() {
+//        TODO MockTaskAssignor assignor = new MockTaskAssignor("range");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+//            .withAssignors(Collections.singletonList(assignor))
+            .build();
+        Exception ex;
+
+        // GroupId must be present in all requests.
+        ex = assertThrows(InvalidRequestException.class, () -> context.streamsHeartbeat(
+            new StreamsHeartbeatRequestData()));
+        assertEquals("GroupId can't be empty.", ex.getMessage());
+
+        // GroupId can't be all whitespaces.
+        ex = assertThrows(InvalidRequestException.class, () -> context.streamsHeartbeat(
+            new StreamsHeartbeatRequestData()
+                .setGroupId("   ")));
+        assertEquals("GroupId can't be empty.", ex.getMessage());
+
+        // RebalanceTimeoutMs must be present in the first request (epoch == 0).
+        ex = assertThrows(InvalidRequestException.class, () -> context.streamsHeartbeat(
+            new StreamsHeartbeatRequestData()
+                .setGroupId("foo")
+                .setMemberEpoch(0)));
+        assertEquals("RebalanceTimeoutMs must be provided in first request.", ex.getMessage());
+
+        // ActiveTasks must be present and empty in the first request (epoch == 0).
+        ex = assertThrows(InvalidRequestException.class, () -> context.streamsHeartbeat(
+            new StreamsHeartbeatRequestData()
+                .setGroupId("foo")
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(5000)
+                .setStandbyTasks(Collections.emptyList())
+                .setWarmupTasks(Collections.emptyList())));
+        assertEquals("ActiveTasks must be empty when (re-)joining.", ex.getMessage());
+
+        // StandbyTasks must be present and empty in the first request (epoch == 0).
+        ex = assertThrows(InvalidRequestException.class, () -> context.streamsHeartbeat(
+            new StreamsHeartbeatRequestData()
+                .setGroupId("foo")
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(5000)
+                .setActiveTasks(Collections.emptyList())
+                .setWarmupTasks(Collections.emptyList())));
+        assertEquals("StandbyTasks must be empty when (re-)joining.", ex.getMessage());
+
+        // WarmupTasks must be present and empty in the first request (epoch == 0).
+        ex = assertThrows(InvalidRequestException.class, () -> context.streamsHeartbeat(
+            new StreamsHeartbeatRequestData()
+                .setGroupId("foo")
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(5000)
+                .setActiveTasks(Collections.emptyList())
+                .setStandbyTasks(Collections.emptyList())));
+        assertEquals("WarmupTasks must be empty when (re-)joining.", ex.getMessage());
+
+        // MemberId must be non-empty in all requests except for the first one where it
+        // could be empty (epoch != 0).
+        ex = assertThrows(InvalidRequestException.class, () -> context.streamsHeartbeat(
+            new StreamsHeartbeatRequestData()
+                .setGroupId("foo")
+                .setMemberEpoch(1)));
+        assertEquals("MemberId can't be empty.", ex.getMessage());
+
+        // InstanceId must be non-empty if provided in all requests.
+        ex = assertThrows(InvalidRequestException.class, () -> context.streamsHeartbeat(
+            new StreamsHeartbeatRequestData()
+                .setGroupId("foo")
+                .setMemberId(Uuid.randomUuid().toString())
+                .setMemberEpoch(1)
+                .setInstanceId("")));
+        assertEquals("InstanceId can't be empty.", ex.getMessage());
+
+        // RackId must be non-empty if provided in all requests.
+        ex = assertThrows(InvalidRequestException.class, () -> context.streamsHeartbeat(
+            new StreamsHeartbeatRequestData()
+                .setGroupId("foo")
+                .setMemberId(Uuid.randomUuid().toString())
+                .setMemberEpoch(1)
+                .setRackId("")));
+        assertEquals("RackId can't be empty.", ex.getMessage());
+
+//       TODO: // ServerAssignor must exist if provided in all requests.
+//        ex = assertThrows(UnsupportedAssignorException.class, () -> context.streamsHeartbeat(
+//            new StreamsHeartbeatRequestData()
+//                .setGroupId("foo")
+//                .setMemberId(Uuid.randomUuid().toString())
+//                .setMemberEpoch(1)
+//                .setServerAssignor("bar")));
+//        assertEquals("ServerAssignor bar is not supported. Supported assignors: range.", ex.getMessage());
+    }
+
     @Test
     public void testConsumerHeartbeatRegexValidation() {
         String memberId = Uuid.randomUuid().toString();
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
index bdcc0c9e012..e2e663466b3 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
@@ -36,6 +36,8 @@ import org.apache.kafka.common.message.ListGroupsResponseData;
 import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.StreamsHeartbeatRequestData;
+import org.apache.kafka.common.message.StreamsHeartbeatResponseData;
 import org.apache.kafka.common.message.StreamsInitializeRequestData;
 import org.apache.kafka.common.message.StreamsInitializeResponseData;
 import org.apache.kafka.common.message.SyncGroupRequestData;
@@ -708,6 +710,36 @@ public class GroupMetadataManagerTestContext {
         return result;
     }
 
+    public CoordinatorResult<StreamsHeartbeatResponseData, CoordinatorRecord> streamsHeartbeat(
+        StreamsHeartbeatRequestData request
+    ) {
+        RequestContext context = new RequestContext(
+            new RequestHeader(
+                ApiKeys.STREAMS_HEARTBEAT,
+                ApiKeys.STREAMS_HEARTBEAT.latestVersion(),
+                "client",
+                0
+            ),
+            "1",
+            InetAddress.getLoopbackAddress(),
+            KafkaPrincipal.ANONYMOUS,
+            ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
+            SecurityProtocol.PLAINTEXT,
+            ClientInformation.EMPTY,
+            false
+        );
+
+        CoordinatorResult<StreamsHeartbeatResponseData, CoordinatorRecord> result = groupMetadataManager.streamsHeartbeat(
+            context,
+            request
+        );
+
+        if (result.replayRecords()) {
+            result.records().forEach(this::replay);
+        }
+        return result;
+    }
+
     public List<MockCoordinatorTimer.ExpiredTimeout<Void, CoordinatorRecord>> sleep(long ms) {
         time.sleep(ms);
         List<MockCoordinatorTimer.ExpiredTimeout<Void, CoordinatorRecord>> timeouts = timer.poll();
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/AssignmentTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/AssignmentTest.java
index baff38bceaf..49ba49753bc 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/AssignmentTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/AssignmentTest.java
@@ -62,25 +62,25 @@ public class AssignmentTest {
     public void testFromTargetAssignmentRecord() {
         String subtopology1 = "subtopology1";
         String subtopology2 = "subtopology2";
-        List<StreamsGroupTargetAssignmentMemberValue.TaskId> activeTasks = new ArrayList<>();
-        activeTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskId()
+        List<StreamsGroupTargetAssignmentMemberValue.TaskIds> activeTasks = new ArrayList<>();
+        activeTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskIds()
             .setSubtopology(subtopology1)
             .setPartitions(Arrays.asList(1, 2, 3)));
-        activeTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskId()
+        activeTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskIds()
             .setSubtopology(subtopology2)
             .setPartitions(Arrays.asList(4, 5, 6)));
-        List<StreamsGroupTargetAssignmentMemberValue.TaskId> standbyTasks = new ArrayList<>();
-        standbyTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskId()
+        List<StreamsGroupTargetAssignmentMemberValue.TaskIds> standbyTasks = new ArrayList<>();
+        standbyTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskIds()
             .setSubtopology(subtopology1)
             .setPartitions(Arrays.asList(7, 8, 9)));
-        standbyTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskId()
+        standbyTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskIds()
             .setSubtopology(subtopology2)
             .setPartitions(Arrays.asList(1, 2, 3)));
-        List<StreamsGroupTargetAssignmentMemberValue.TaskId> warmupTasks = new ArrayList<>();
-        warmupTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskId()
+        List<StreamsGroupTargetAssignmentMemberValue.TaskIds> warmupTasks = new ArrayList<>();
+        warmupTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskIds()
             .setSubtopology(subtopology1)
             .setPartitions(Arrays.asList(4, 5, 6)));
-        warmupTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskId()
+        warmupTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskIds()
             .setSubtopology(subtopology2)
             .setPartitions(Arrays.asList(7, 8, 9)));
 


Reply via email to