This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch kip1071
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit fd4600b9b621603a0f443f7a284bfbd8fbb35640
Author: Lucas Brutschy <[email protected]>
AuthorDate: Mon Jun 3 17:23:43 2024 +0200

    Implement InitStreamsApp RPC in the group coordinator
    
    Implement init streams app call in group coordinator without creating 
internal topics.
    
    Create topology metadata record
    Verify existence of all required internal topics
    
    See https://github.com/lucasbru/kafka/pull/15
---
 .../group/GroupCoordinatorAdapter.scala            |  11 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |  28 +++++
 .../group/GroupCoordinatorAdapterTest.scala        |  18 ++-
 .../scala/unit/kafka/server/KafkaApisTest.scala    |  62 +++++++++
 .../kafka/coordinator/group/GroupCoordinator.java  |  16 +++
 .../group/GroupCoordinatorRecordHelpers.java       |  42 +++++++
 .../coordinator/group/GroupCoordinatorService.java |  31 +++++
 .../coordinator/group/GroupCoordinatorShard.java   |  18 +++
 .../coordinator/group/GroupMetadataManager.java    | 116 +++++++++++++++++
 .../group/GroupCoordinatorRecordHelpersTest.java   |  82 ++++++++++++
 .../group/GroupCoordinatorServiceTest.java         | 115 +++++++++++++++++
 .../group/GroupCoordinatorShardTest.java           |  34 +++++
 .../group/GroupMetadataManagerTest.java            | 140 +++++++++++++++++++++
 .../group/GroupMetadataManagerTestContext.java     |  42 +++++++
 14 files changed, 753 insertions(+), 2 deletions(-)

diff --git 
a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala 
b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
index 01756550569..47955e727a0 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
@@ -20,7 +20,7 @@ import kafka.common.OffsetAndMetadata
 import kafka.server.{KafkaConfig, ReplicaManager}
 import kafka.utils.Implicits.MapExtensionMethods
 import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
-import org.apache.kafka.common.message.{ConsumerGroupDescribeResponseData, 
ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, 
DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, 
HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, 
LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, 
ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, 
OffsetDeleteRequestData, OffsetDeleteResponseData, Offset [...]
+import org.apache.kafka.common.message.{ConsumerGroupDescribeResponseData, 
ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, 
DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, 
HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, 
LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, 
ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, 
OffsetDeleteRequestData, OffsetDeleteResponseData, Offset [...]
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.RecordBatch
@@ -78,6 +78,15 @@ private[group] class GroupCoordinatorAdapter(
     ))
   }
 
+  override def streamsInitialize(
+                                   context: RequestContext,
+                                   request: StreamsInitializeRequestData
+                                 ): 
CompletableFuture[StreamsInitializeResponseData] = {
+    FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
+      s"The old group coordinator does not support 
${ApiKeys.STREAMS_INITIALIZE.name} API."
+    ))
+  }
+
   override def shareGroupHeartbeat(
     context: RequestContext,
     request: ShareGroupHeartbeatRequestData
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 2acface315d..acad87c7c66 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -275,6 +275,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.WRITE_SHARE_GROUP_STATE => 
handleWriteShareGroupStateRequest(request)
         case ApiKeys.DELETE_SHARE_GROUP_STATE => 
handleDeleteShareGroupStateRequest(request)
         case ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY => 
handleReadShareGroupStateSummaryRequest(request)
+        case ApiKeys.STREAMS_INITIALIZE => 
handleStreamsInitialize(request).exceptionally(handleError)
         case _ => throw new IllegalStateException(s"No handler for request api 
key ${request.header.apiKey}")
       }
     } catch {
@@ -3865,6 +3866,33 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   }
 
+  def handleStreamsInitialize(request: RequestChannel.Request): 
CompletableFuture[Unit] = {
+    val streamsInitializeRequest = request.body[StreamsInitializeRequest]
+
+    // TODO: Check ACLs on CREATE TOPIC & DESCRIBE_CONFIGS
+
+    if (!config.isNewGroupCoordinatorEnabled) {
+      // The API is not supported by the "old" group coordinator (the 
default). If the
+      // new one is not enabled, we fail directly here.
+      requestHelper.sendMaybeThrottle(request, 
streamsInitializeRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+      CompletableFuture.completedFuture[Unit](())
+    } else if (!authHelper.authorize(request.context, READ, GROUP, 
streamsInitializeRequest.data.groupId)) {
+      requestHelper.sendMaybeThrottle(request, 
streamsInitializeRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
+      CompletableFuture.completedFuture[Unit](())
+    } else {
+      groupCoordinator.streamsInitialize(
+        request.context,
+        streamsInitializeRequest.data,
+      ).handle[Unit] { (response, exception) =>
+        if (exception != null) {
+          requestHelper.sendMaybeThrottle(request, 
streamsInitializeRequest.getErrorResponse(exception))
+        } else {
+          requestHelper.sendMaybeThrottle(request, new 
StreamsInitializeResponse(response))
+        }
+      }
+    }
+  }
+
   def handleGetTelemetrySubscriptionsRequest(request: RequestChannel.Request): 
Unit = {
     val subscriptionRequest = request.body[GetTelemetrySubscriptionsRequest]
 
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
index d88ee68abcf..0b4db0f3245 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
@@ -20,7 +20,7 @@ import kafka.common.OffsetAndMetadata
 import 
kafka.coordinator.group.GroupCoordinatorConcurrencyTest.{JoinGroupCallback, 
SyncGroupCallback}
 import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
 import org.apache.kafka.common.errors.{InvalidGroupIdException, 
UnsupportedVersionException}
-import org.apache.kafka.common.message.{ConsumerGroupHeartbeatRequestData, 
DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, 
HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, 
LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, 
ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, 
OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, 
OffsetFetchResponseData, ShareGroupHeartbeatRequestDa [...]
+import org.apache.kafka.common.message.{ConsumerGroupHeartbeatRequestData, 
DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, 
HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, 
LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, 
ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, 
OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, 
OffsetFetchResponseData, ShareGroupHeartbeatRequestDa [...]
 import 
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol
 import 
org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember
 import 
org.apache.kafka.common.message.OffsetDeleteRequestData.{OffsetDeleteRequestPartition,
 OffsetDeleteRequestTopic, OffsetDeleteRequestTopicCollection}
@@ -78,6 +78,22 @@ class GroupCoordinatorAdapterTest {
     assertFutureThrows(future, classOf[UnsupportedVersionException])
   }
 
+  @Test
+  def testStreamsInitialize(): Unit = {
+    val groupCoordinator = mock(classOf[GroupCoordinator])
+    val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM)
+
+    val ctx = makeContext(ApiKeys.STREAMS_INITIALIZE, 
ApiKeys.STREAMS_INITIALIZE.latestVersion)
+    val request = new StreamsInitializeRequestData()
+      .setGroupId("group")
+
+    val future = adapter.streamsInitialize(ctx, request)
+
+    assertTrue(future.isDone)
+    assertTrue(future.isCompletedExceptionally)
+    assertFutureThrows(future, classOf[UnsupportedVersionException])
+  }
+
   @Test
   def testJoinShareGroup(): Unit = {
     val groupCoordinator = mock(classOf[GroupCoordinator])
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index b978389743b..4b5d6ae84d5 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -11175,6 +11175,68 @@ class KafkaApisTest extends Logging {
     val response = 
verifyNoThrottling[ConsumerGroupHeartbeatResponse](requestChannelRequest)
     assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, 
response.data.errorCode)
   }
+  @Test
+  def testStreamsInitializeRequest(): Unit = {
+    val streamsInitializeRequest = new StreamsInitializeRequestData().setGroupId("group")
+
+    val requestChannelRequest = buildRequest(new StreamsInitializeRequest.Builder(streamsInitializeRequest, true).build())
+
+    val future = new CompletableFuture[StreamsInitializeResponseData]()
+    when(groupCoordinator.streamsInitialize(
+      requestChannelRequest.context,
+      streamsInitializeRequest
+    )).thenReturn(future)
+    kafkaApis = createKafkaApis(overrideProperties = Map(
+      GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true"
+    ))
+    kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching)
+
+    val streamsInitializeResponse = new StreamsInitializeResponseData()
+    // Completing the coordinator future must yield the response sent back to the client.
+    future.complete(streamsInitializeResponse)
+    val response = verifyNoThrottling[StreamsInitializeResponse](requestChannelRequest)
+    assertEquals(streamsInitializeResponse, response.data)
+  }
+
+  @Test
+  def testStreamsInitializeRequestFutureFailed(): Unit = {
+    val streamsInitializeRequest = new 
StreamsInitializeRequestData().setGroupId("group")
+
+    val requestChannelRequest = buildRequest(new 
StreamsInitializeRequest.Builder(streamsInitializeRequest, true).build())
+
+    val future = new CompletableFuture[StreamsInitializeResponseData]()
+    when(groupCoordinator.streamsInitialize(
+      requestChannelRequest.context,
+      streamsInitializeRequest
+    )).thenReturn(future)
+    kafkaApis = createKafkaApis(overrideProperties = Map(
+      GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true"
+    ))
+    kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching)
+
+    future.completeExceptionally(Errors.FENCED_MEMBER_EPOCH.exception)
+    val response = 
verifyNoThrottling[StreamsInitializeResponse](requestChannelRequest)
+    assertEquals(Errors.FENCED_MEMBER_EPOCH.code, response.data.errorCode)
+  }
+
+  @Test
+  def testStreamsInitializeRequestAuthorizationFailed(): Unit = {
+    val streamsInitializeRequest = new 
StreamsInitializeRequestData().setGroupId("group")
+
+    val requestChannelRequest = buildRequest(new 
StreamsInitializeRequest.Builder(streamsInitializeRequest, true).build())
+
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+    when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
+      .thenReturn(Seq(AuthorizationResult.DENIED).asJava)
+    kafkaApis = createKafkaApis(
+      authorizer = Some(authorizer),
+      overrideProperties = 
Map(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true")
+    )
+    kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching)
+
+    val response = 
verifyNoThrottling[StreamsInitializeResponse](requestChannelRequest)
+    assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, 
response.data.errorCode)
+  }
 
   @ParameterizedTest
   @ValueSource(booleans = Array(true, false))
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
index 87efb530dc8..f4db71e65f6 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
@@ -39,6 +39,8 @@ import 
org.apache.kafka.common.message.OffsetFetchResponseData;
 import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.StreamsInitializeRequestData;
+import org.apache.kafka.common.message.StreamsInitializeResponseData;
 import org.apache.kafka.common.message.SyncGroupRequestData;
 import org.apache.kafka.common.message.SyncGroupResponseData;
 import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
@@ -82,6 +84,20 @@ public interface GroupCoordinator {
         ConsumerGroupHeartbeatRequestData request
     );
 
+    /**
+     * Initialize a Streams Group.
+     *
+     * @param context           The request context.
+     * @param request           The StreamsInitializeRequest data.
+     *
+     * @return  A future yielding the response.
+     *          The error code(s) of the response are set to indicate the 
error(s) occurred during the execution.
+     */
+    CompletableFuture<StreamsInitializeResponseData> streamsInitialize(
+        RequestContext context,
+        StreamsInitializeRequestData request
+    );
+
     /**
      * Heartbeat to a Share Group.
      *
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java
index 67364470b0f..4424c161a3b 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.coordinator.group;
 
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.StreamsInitializeRequestData;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.requests.OffsetCommitRequest;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
@@ -49,6 +50,8 @@ import 
org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMe
 import 
org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberValue;
 import 
org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataKey;
 import 
org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
 import org.apache.kafka.coordinator.group.modern.TopicMetadata;
 import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember;
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupMember;
@@ -60,11 +63,13 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * This class contains helper methods to create records stored in
  * the __consumer_offsets topic.
  */
+@SuppressWarnings("ClassDataAbstractionCoupling")
 public class GroupCoordinatorRecordHelpers {
     private GroupCoordinatorRecordHelpers() {}
 
@@ -908,4 +913,41 @@ public class GroupCoordinatorRecordHelpers {
         );
         return topics;
     }
+
+    /**
+     * Creates a StreamsGroupTopology record.
+     *
+     * @param groupId                   The streams group id.
+     * @param subtopologies             The subtopologies in the new topology.
+     * @return The record.
+     */
+    public static CoordinatorRecord newStreamsGroupTopologyRecord(String 
groupId,
+                                                                  
List<StreamsInitializeRequestData.Subtopology> subtopologies) {
+        StreamsGroupTopologyValue value = new StreamsGroupTopologyValue();
+        subtopologies.forEach(subtopology -> {
+            List<StreamsGroupTopologyValue.TopicInfo> repartitionSourceTopics 
= subtopology.repartitionSourceTopics().stream()
+                .map(topicInfo -> {
+                    List<StreamsGroupTopologyValue.TopicConfig> topicConfigs = 
topicInfo.topicConfigs().stream()
+                        .map(config -> new 
StreamsGroupTopologyValue.TopicConfig().setKey(config.key()).setValue(config.value()))
+                        .collect(Collectors.toList());
+                    return new 
StreamsGroupTopologyValue.TopicInfo().setName(topicInfo.name()).setTopicConfigs(topicConfigs)
+                        .setPartitions(topicInfo.partitions());
+                }).collect(Collectors.toList());
+
+            List<StreamsGroupTopologyValue.TopicInfo> stateChangelogTopics = 
subtopology.stateChangelogTopics().stream().map(topicInfo -> {
+                List<StreamsGroupTopologyValue.TopicConfig> topicConfigs = 
topicInfo.topicConfigs().stream()
+                    .map(config -> new 
StreamsGroupTopologyValue.TopicConfig().setKey(config.key()).setValue(config.value()))
+                    .collect(Collectors.toList());
+                return new 
StreamsGroupTopologyValue.TopicInfo().setName(topicInfo.name()).setTopicConfigs(topicConfigs);
+            }).collect(Collectors.toList());
+
+            value.topology().add(new 
StreamsGroupTopologyValue.Subtopology().setSubtopology(subtopology.subtopology())
+                
.setSourceTopics(subtopology.sourceTopics()).setSinkTopics(subtopology.sinkTopics())
+                
.setRepartitionSourceTopics(repartitionSourceTopics).setStateChangelogTopics(stateChangelogTopics));
+        });
+
+        return new CoordinatorRecord(new ApiMessageAndVersion(new 
StreamsGroupTopologyKey().setGroupId(groupId), (short) 15),
+            new ApiMessageAndVersion(value, (short) 0));
+    }
+
 }
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index 9ac43783be6..28a2b73ef1a 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -44,6 +44,8 @@ import 
org.apache.kafka.common.message.ShareGroupDescribeResponseData;
 import 
org.apache.kafka.common.message.ShareGroupDescribeResponseData.DescribedGroup;
 import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.StreamsInitializeRequestData;
+import org.apache.kafka.common.message.StreamsInitializeResponseData;
 import org.apache.kafka.common.message.SyncGroupRequestData;
 import org.apache.kafka.common.message.SyncGroupResponseData;
 import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
@@ -342,6 +344,35 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
         ));
     }
 
+    /**
+     * See {@link GroupCoordinator#streamsInitialize(RequestContext, 
StreamsInitializeRequestData)}.
+     */
+    @Override
+    public CompletableFuture<StreamsInitializeResponseData> streamsInitialize(
+        RequestContext context,
+        StreamsInitializeRequestData request
+    ) {
+        if (!isActive.get()) {
+            return CompletableFuture.completedFuture(new 
StreamsInitializeResponseData()
+                .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
+            );
+        }
+
+        return runtime.scheduleWriteOperation(
+            "streams-group-initialize",
+            topicPartitionFor(request.groupId()),
+            Duration.ofMillis(config.offsetCommitTimeoutMs),
+            coordinator -> coordinator.streamsInitialize(context, request)
+        ).exceptionally(exception -> handleOperationException(
+            "streams-group-initialize",
+            request,
+            exception,
+            (error, message) -> new StreamsInitializeResponseData()
+                .setErrorCode(error.code())
+                .setErrorMessage(message)
+        ));
+    }
+
     /**
      * See {@link GroupCoordinator#shareGroupHeartbeat(RequestContext, 
ShareGroupHeartbeatRequestData)}.
      */
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index c6eced2158e..98965c28c1b 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -39,6 +39,8 @@ import 
org.apache.kafka.common.message.OffsetFetchResponseData;
 import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.StreamsInitializeRequestData;
+import org.apache.kafka.common.message.StreamsInitializeResponseData;
 import org.apache.kafka.common.message.SyncGroupRequestData;
 import org.apache.kafka.common.message.SyncGroupResponseData;
 import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
@@ -333,6 +335,22 @@ public class GroupCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
         return groupMetadataManager.consumerGroupHeartbeat(context, request);
     }
 
+    /**
+     * Handles a StreamsInitialize request.
+     *
+     * @param context The request context.
+     * @param request The actual StreamsInitialize request.
+     *
+     * @return A Result containing the StreamsInitialize response and
+     *         a list of records to update the state machine.
+     */
+    public CoordinatorResult<StreamsInitializeResponseData, CoordinatorRecord> 
streamsInitialize(
+        RequestContext context,
+        StreamsInitializeRequestData request
+    ) {
+        return groupMetadataManager.streamsInitialize(context, request);
+    }
+
     /**
      * Handles a ShareGroupHeartbeat request.
      *
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index ecf7efaad49..812b87aaaac 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -52,6 +52,8 @@ import org.apache.kafka.common.message.ListGroupsResponseData;
 import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.StreamsInitializeRequestData;
+import org.apache.kafka.common.message.StreamsInitializeResponseData;
 import org.apache.kafka.common.message.SyncGroupRequestData;
 import org.apache.kafka.common.message.SyncGroupResponseData;
 import org.apache.kafka.common.protocol.Errors;
@@ -99,6 +101,8 @@ import 
org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMe
 import 
org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberValue;
 import 
org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataKey;
 import 
org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
 import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
 import org.apache.kafka.coordinator.group.modern.Assignment;
 import org.apache.kafka.coordinator.group.modern.MemberState;
@@ -140,6 +144,7 @@ import java.util.stream.Stream;
 import static 
org.apache.kafka.common.protocol.Errors.COORDINATOR_NOT_AVAILABLE;
 import static org.apache.kafka.common.protocol.Errors.ILLEGAL_GENERATION;
 import static org.apache.kafka.common.protocol.Errors.NOT_COORDINATOR;
+import static org.apache.kafka.common.protocol.Errors.STREAMS_INVALID_TOPOLOGY;
 import static org.apache.kafka.common.protocol.Errors.UNKNOWN_SERVER_ERROR;
 import static 
org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
 import static 
org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH;
@@ -161,6 +166,7 @@ import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.n
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupMemberSubscriptionTombstoneRecord;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupSubscriptionMetadataRecord;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentTombstoneRecord;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newStreamsGroupTopologyRecord;
 import static org.apache.kafka.coordinator.group.Utils.assignmentToString;
 import static org.apache.kafka.coordinator.group.Utils.ofSentinel;
 import static 
org.apache.kafka.coordinator.group.Utils.toConsumerProtocolAssignment;
@@ -1332,7 +1338,29 @@ public class GroupMetadataManager {
             throw new InvalidRequestException("MemberEpoch is invalid.");
         }
     }
+    
+    /**
+     * Validates a StreamsInitialize request before it is processed.
+     *
+     * @param request The request to validate.
+     *
+     * @throws InvalidRequestException if the group id is empty or the topology
+     *                                 contains no subtopology.
+     */
+    private void throwIfStreamsInitializeRequestIsInvalid(
+        StreamsInitializeRequestData request
+    ) throws InvalidRequestException {
+        throwIfEmptyString(request.groupId(), "GroupId can't be empty.");
+
+        if (request.topology().isEmpty()) {
+            throw new InvalidRequestException("There must be at least one subtopology.");
+        }
+
+        // TODO: Check invariants
+    }
+    
 
+
     /**
      * Verifies that the partitions currently owned by the member (the ones 
set in the
      * request) matches the ones that the member should own. It matches if the 
consumer
@@ -1864,6 +1892,54 @@ public class GroupMetadataManager {
         return new CoordinatorResult<>(records, response);
     }
 
+
+    /**
+     * Handles the initialization of the topology information on the broker 
side, that will be reused by all members of the group.
+     *
+     * @param groupId       The group id from the request.
+     * @param subtopologies The list of subtopologies
+     * @return A Result containing the StreamsInitialize response and a list 
of records to update the state machine.
+     */
+    private CoordinatorResult<StreamsInitializeResponseData, 
CoordinatorRecord> streamsInitialize(String groupId,
+                                                                               
                   List<StreamsInitializeRequestData.Subtopology> subtopologies)
+        throws ApiException {
+        final List<CoordinatorRecord> records = new ArrayList<>();
+
+        // TODO: Throw if group does not exist or is not a streams group. Needs a model of the streams group,
+        //       similar to: final StreamsGroup group = getOrMaybeCreateStreamsGroup(groupId, createIfNotExists, records);
+        //                   throwIfNull(group);
+
+        // TODO: For the POC, only check if internal topics exist
+        Set<String> missingTopics = new HashSet<>();
+        for (StreamsInitializeRequestData.Subtopology subtopology : 
subtopologies) {
+            for (StreamsInitializeRequestData.TopicInfo topic : 
subtopology.stateChangelogTopics()) {
+                if (metadataImage.topics().getTopic(topic.name()) == null) {
+                    missingTopics.add(topic.name());
+                }
+            }
+            for (StreamsInitializeRequestData.TopicInfo topic : 
subtopology.repartitionSourceTopics()) {
+                if (metadataImage.topics().getTopic(topic.name()) == null) {
+                    missingTopics.add(topic.name());
+                }
+            }
+        }
+        if (!missingTopics.isEmpty()) {
+            StreamsInitializeResponseData response =
+                new StreamsInitializeResponseData()
+                    .setErrorCode(STREAMS_INVALID_TOPOLOGY.code())
+                    .setErrorMessage("Internal topics " + String.join(", ", 
missingTopics) + " do not exist.");
+
+            return new CoordinatorResult<>(records, response);
+        } else {
+            records.add(newStreamsGroupTopologyRecord(groupId, subtopologies));
+
+            StreamsInitializeResponseData response = new 
StreamsInitializeResponseData();
+
+            return new CoordinatorResult<>(records, response);
+        }
+
+    }
+
     /**
      * Handle a JoinGroupRequest to a ConsumerGroup.
      *
@@ -3171,6 +3247,46 @@ public class GroupMetadataManager {
         }
     }
 
+    /**
+     * Handles a StreamsInitialize request.
+     *
+     * @param context The request context.
+     * @param request The actual StreamsInitialize request.
+     *
+     * @return A Result containing the StreamsInitialize response and
+     *         a list of records to update the state machine.
+     */
+    public CoordinatorResult<StreamsInitializeResponseData, CoordinatorRecord> 
streamsInitialize(
+        RequestContext context,
+        StreamsInitializeRequestData request
+    ) throws ApiException {
+        throwIfStreamsInitializeRequestIsInvalid(request);
+
+        return streamsInitialize(
+            request.groupId(),
+            request.topology()
+        );
+    }
+
+    /**
+     * Replays StreamsGroupTopologyKey/Value to update the hard state of
+     * the streams group.
+     *
+     * @param key   A StreamsGroupTopologyKey key.
+     * @param value A StreamsGroupTopologyValue record.
+     */
+    public void replay(
+        StreamsGroupTopologyKey key,
+        StreamsGroupTopologyValue value
+    ) {
+
+        // TODO: Insert the topology information to the in-memory 
representation. Needs the notion
+        //       of a Streams group
+//        String groupId = key.groupId();
+//        StreamsGroup streamsGroup = 
getOrMaybeCreatePersistedConsumerGroup(groupId, value != null);
+//        streamsGroup.setTopology(value);
+    }
+
     /**
      * Handles a ShareGroupHeartbeat request.
      *
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java
index 38e2b9d68cf..7152dea8ffe 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.coordinator.group;
 import org.apache.kafka.common.Uuid;
 import 
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol;
 import 
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection;
+import org.apache.kafka.common.message.StreamsInitializeRequestData;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
@@ -43,6 +44,8 @@ import 
org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
 import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
 import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
 import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
 import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
 import org.apache.kafka.coordinator.group.modern.MemberState;
 import org.apache.kafka.coordinator.group.modern.TopicMetadata;
@@ -87,6 +90,7 @@ import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.n
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupEpochTombstoneRecord;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newStreamsGroupTopologyRecord;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.Mockito.mock;
@@ -213,6 +217,84 @@ public class GroupCoordinatorRecordHelpersTest {
         ));
     }
 
+    /**
+     * Checks that newStreamsGroupTopologyRecord turns the request-side
+     * subtopology list into an equivalent StreamsGroupTopologyValue, wrapped
+     * in a record whose key is a StreamsGroupTopologyKey for the group.
+     */
+    @Test
+    public void testNewStreamsGroupTopologyRecord() {
+        // Input: one subtopology as it arrives in a StreamsInitialize request.
+        List<StreamsInitializeRequestData.Subtopology> topology =
+            Collections.singletonList(new 
StreamsInitializeRequestData.Subtopology()
+                .setSubtopology("subtopology-id")
+                .setSinkTopics(Collections.singletonList("foo"))
+                .setSourceTopics(Collections.singletonList("bar"))
+                .setRepartitionSourceTopics(
+                    Collections.singletonList(
+                        new StreamsInitializeRequestData.TopicInfo()
+                            .setName("repartition")
+                            .setPartitions(4)
+                            .setTopicConfigs(Collections.singletonList(
+                                new StreamsInitializeRequestData.TopicConfig()
+                                    .setKey("config-name1")
+                                    .setValue("config-value1")
+                            ))
+                    )
+                )
+                .setStateChangelogTopics(
+                    Collections.singletonList(
+                        new StreamsInitializeRequestData.TopicInfo()
+                            .setName("changelog")
+                            .setTopicConfigs(Collections.singletonList(
+                                new StreamsInitializeRequestData.TopicConfig()
+                                    .setKey("config-name2")
+                                    .setValue("config-value2")
+                            ))
+                    )
+                )
+            );
+
+        // Expected: the same structure expressed with the record-value types.
+        List<StreamsGroupTopologyValue.Subtopology> expectedTopology =
+            Collections.singletonList(new 
StreamsGroupTopologyValue.Subtopology()
+                .setSubtopology("subtopology-id")
+                .setSinkTopics(Collections.singletonList("foo"))
+                .setSourceTopics(Collections.singletonList("bar"))
+                .setRepartitionSourceTopics(
+                    Collections.singletonList(
+                        new StreamsGroupTopologyValue.TopicInfo()
+                            .setName("repartition")
+                            .setPartitions(4)
+                            .setTopicConfigs(Collections.singletonList(
+                                new StreamsGroupTopologyValue.TopicConfig()
+                                    .setKey("config-name1")
+                                    .setValue("config-value1")
+                            ))
+                    )
+                )
+                .setStateChangelogTopics(
+                    Collections.singletonList(
+                        new StreamsGroupTopologyValue.TopicInfo()
+                            .setName("changelog")
+                            .setTopicConfigs(Collections.singletonList(
+                                new StreamsGroupTopologyValue.TopicConfig()
+                                    .setKey("config-name2")
+                                    .setValue("config-value2")
+                            ))
+                    )
+                )
+            );
+
+        // The key is expected at version 15 and the value at version 0.
+        CoordinatorRecord expectedRecord = new CoordinatorRecord(
+            new ApiMessageAndVersion(
+                new StreamsGroupTopologyKey()
+                    .setGroupId("group-id"),
+                (short) 15),
+            new ApiMessageAndVersion(
+                new StreamsGroupTopologyValue()
+                    .setTopology(expectedTopology),
+                (short) 0));
+
+        assertEquals(expectedRecord, newStreamsGroupTopologyRecord(
+            "group-id",
+            topology
+        ));
+    }
+
     @Test
     public void 
testEmptyPartitionMetadataWhenRacksUnavailableGroupSubscriptionMetadataRecord() 
{
         Uuid fooTopicId = Uuid.randomUuid();
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index 1056d80dd38..c3d13d28857 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -30,6 +30,7 @@ import 
org.apache.kafka.common.errors.NotLeaderOrFollowerException;
 import org.apache.kafka.common.errors.RebalanceInProgressException;
 import org.apache.kafka.common.errors.RecordBatchTooLargeException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.StreamsInvalidTopologyException;
 import org.apache.kafka.common.errors.UnknownMemberIdException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
@@ -52,6 +53,8 @@ import 
org.apache.kafka.common.message.OffsetFetchResponseData;
 import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.StreamsInitializeRequestData;
+import org.apache.kafka.common.message.StreamsInitializeResponseData;
 import org.apache.kafka.common.message.SyncGroupRequestData;
 import org.apache.kafka.common.message.SyncGroupResponseData;
 import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
@@ -254,6 +257,118 @@ public class GroupCoordinatorServiceTest {
         );
     }
 
+    /**
+     * Checks that streamsInitialize answers with COORDINATOR_NOT_AVAILABLE
+     * when the service was constructed but startup() was never called.
+     */
+    @Test
+    public void testStreamsInitializeWhenNotStarted() throws 
ExecutionException, InterruptedException {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime,
+            new GroupCoordinatorMetrics()
+        );
+
+        StreamsInitializeRequestData request = new 
StreamsInitializeRequestData()
+            .setGroupId("foo");
+
+        // No service.startup(...) here on purpose.
+        CompletableFuture<StreamsInitializeResponseData> future = 
service.streamsInitialize(
+            requestContext(ApiKeys.STREAMS_INITIALIZE),
+            request
+        );
+
+        assertEquals(
+            new StreamsInitializeResponseData()
+                .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()),
+            future.get()
+        );
+    }
+
+    /**
+     * Checks the success path: once the service is started, the response
+     * produced by the runtime's write operation is returned to the caller.
+     */
+    @Test
+    public void testStreamsInitialize() throws ExecutionException, 
InterruptedException, TimeoutException {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime,
+            new GroupCoordinatorMetrics()
+        );
+
+        StreamsInitializeRequestData request = new 
StreamsInitializeRequestData()
+            .setGroupId("foo");
+
+        service.startup(() -> 1);
+
+        // The stub only matches a write named "streams-group-initialize" on
+        // __consumer_offsets-0 with a 5000 ms timeout.
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("streams-group-initialize"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.eq(Duration.ofMillis(5000)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(
+            new StreamsInitializeResponseData()
+        ));
+
+        CompletableFuture<StreamsInitializeResponseData> future = 
service.streamsInitialize(
+            requestContext(ApiKeys.STREAMS_INITIALIZE),
+            request
+        );
+
+        assertEquals(new StreamsInitializeResponseData(), future.get(5, 
TimeUnit.SECONDS));
+    }
+
+    /**
+     * Argument source for testStreamsInitializeWithException. Each entry is
+     * (exception failing the write, expected error code, expected error
+     * message); a null message means the response carries no error message.
+     */
+    private static Stream<Arguments> 
testStreamsInitializeWithExceptionSource() {
+        return Stream.of(
+            Arguments.arguments(new UnknownTopicOrPartitionException(), 
Errors.COORDINATOR_NOT_AVAILABLE.code(), null),
+            Arguments.arguments(new NotEnoughReplicasException(), 
Errors.COORDINATOR_NOT_AVAILABLE.code(), null),
+            Arguments.arguments(new 
org.apache.kafka.common.errors.TimeoutException(), 
Errors.COORDINATOR_NOT_AVAILABLE.code(), null),
+            Arguments.arguments(new NotLeaderOrFollowerException(), 
Errors.NOT_COORDINATOR.code(), null),
+            Arguments.arguments(new KafkaStorageException(), 
Errors.NOT_COORDINATOR.code(), null),
+            Arguments.arguments(new RecordTooLargeException(), 
Errors.UNKNOWN_SERVER_ERROR.code(), null),
+            Arguments.arguments(new RecordBatchTooLargeException(), 
Errors.UNKNOWN_SERVER_ERROR.code(), null),
+            Arguments.arguments(new InvalidFetchSizeException(""), 
Errors.UNKNOWN_SERVER_ERROR.code(), null),
+            Arguments.arguments(new InvalidRequestException("Invalid"), 
Errors.INVALID_REQUEST.code(), "Invalid"),
+            Arguments.arguments(new 
StreamsInvalidTopologyException("Invalid"), 
Errors.STREAMS_INVALID_TOPOLOGY.code(), "Invalid")
+        );
+    }
+
+    /**
+     * Checks that a write operation failing with the given exception is
+     * reported to the caller as a response carrying the expected error code
+     * and error message, rather than as a failed future.
+     *
+     * @param exception            The exception the mocked runtime fails with.
+     * @param expectedErrorCode    The error code expected in the response.
+     * @param expectedErrorMessage The error message expected in the response.
+     */
+    @ParameterizedTest
+    @MethodSource("testStreamsInitializeWithExceptionSource")
+    public void testStreamsInitializeWithException(
+        Throwable exception,
+        short expectedErrorCode,
+        String expectedErrorMessage
+    ) throws ExecutionException, InterruptedException, TimeoutException {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime,
+            new GroupCoordinatorMetrics()
+        );
+
+        StreamsInitializeRequestData request = new 
StreamsInitializeRequestData()
+            .setGroupId("foo");
+
+        service.startup(() -> 1);
+
+        // The scheduled write completes exceptionally with the parameter.
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("streams-group-initialize"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.eq(Duration.ofMillis(5000)),
+            ArgumentMatchers.any()
+        )).thenReturn(FutureUtils.failedFuture(exception));
+
+        CompletableFuture<StreamsInitializeResponseData> future = 
service.streamsInitialize(
+            requestContext(ApiKeys.STREAMS_INITIALIZE),
+            request
+        );
+
+        assertEquals(
+            new StreamsInitializeResponseData()
+                .setErrorCode(expectedErrorCode)
+                .setErrorMessage(expectedErrorMessage),
+            future.get(5, TimeUnit.SECONDS)
+        );
+    }
+
     @Test
     public void testPartitionFor() {
         CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
index 9cf3de6f821..65fb545f641 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
@@ -24,6 +24,8 @@ import 
org.apache.kafka.common.message.OffsetCommitRequestData;
 import org.apache.kafka.common.message.OffsetCommitResponseData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.StreamsInitializeRequestData;
+import org.apache.kafka.common.message.StreamsInitializeResponseData;
 import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
 import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
@@ -126,6 +128,38 @@ public class GroupCoordinatorShardTest {
         assertEquals(result, coordinator.consumerGroupHeartbeat(context, 
request));
     }
 
+    /**
+     * Checks that GroupCoordinatorShard.streamsInitialize returns exactly the
+     * result produced by GroupMetadataManager.streamsInitialize for the same
+     * context and request.
+     */
+    @Test
+    public void testStreamsInitialize() {
+        GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
+        OffsetMetadataManager offsetMetadataManager = 
mock(OffsetMetadataManager.class);
+        CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
+        CoordinatorMetricsShard metricsShard = 
mock(CoordinatorMetricsShard.class);
+        GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            new LogContext(),
+            groupMetadataManager,
+            offsetMetadataManager,
+            Time.SYSTEM,
+            new MockCoordinatorTimer<>(Time.SYSTEM),
+            mock(GroupCoordinatorConfig.class),
+            coordinatorMetrics,
+            metricsShard
+        );
+
+        RequestContext context = requestContext(ApiKeys.STREAMS_INITIALIZE);
+        StreamsInitializeRequestData request = new 
StreamsInitializeRequestData();
+        CoordinatorResult<StreamsInitializeResponseData, CoordinatorRecord> 
result = new CoordinatorResult<>(
+            Collections.emptyList(),
+            new StreamsInitializeResponseData()
+        );
+
+        // The mocked manager hands back the canned result above.
+        when(groupMetadataManager.streamsInitialize(
+            context,
+            request
+        )).thenReturn(result);
+
+        assertEquals(result, coordinator.streamsInitialize(context, request));
+    }
+
     @Test
     public void testCommitOffset() {
         GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 653662b859b..3a574798241 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -53,6 +53,11 @@ import 
org.apache.kafka.common.message.ListGroupsResponseData;
 import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.StreamsInitializeRequestData;
+import 
org.apache.kafka.common.message.StreamsInitializeRequestData.Subtopology;
+import 
org.apache.kafka.common.message.StreamsInitializeRequestData.TopicConfig;
+import org.apache.kafka.common.message.StreamsInitializeRequestData.TopicInfo;
+import org.apache.kafka.common.message.StreamsInitializeResponseData;
 import org.apache.kafka.common.message.SyncGroupRequestData;
 import 
org.apache.kafka.common.message.SyncGroupRequestData.SyncGroupRequestAssignment;
 import org.apache.kafka.common.message.SyncGroupResponseData;
@@ -14745,4 +14750,139 @@ public class GroupMetadataManagerTest {
         assertEquals(expectedSuccessCount, successCount);
         return memberIds;
     }
+
+    /**
+     * Checks the success path of topology initialization: when both internal
+     * topics named by the topology ("repartition" and "changelog") exist in
+     * the metadata image, the response carries no error and a single
+     * streams group topology record is produced for the group.
+     */
+    @Test
+    public void testTopologyInitialization() {
+        String groupId = "fooup";
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "repartition";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "changelog";
+
+        // Both internal topics referenced by the topology are registered.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .addRacks()
+                .build())
+            .build();
+
+        assertThrows(GroupIdNotFoundException.class, () ->
+            context.groupMetadataManager.consumerGroup(groupId));
+
+        final List<Subtopology> topology = Collections.singletonList(
+            new Subtopology()
+                .setSubtopology("subtopology-id")
+                .setSinkTopics(Collections.singletonList("foo"))
+                .setSourceTopics(Collections.singletonList("bar"))
+                .setRepartitionSourceTopics(
+                    Collections.singletonList(
+                        new TopicInfo()
+                            .setName("repartition")
+                            .setPartitions(4)
+                            .setTopicConfigs(Collections.singletonList(
+                                new TopicConfig()
+                                    .setKey("config-name1")
+                                    .setValue("config-value1")
+                            ))
+                    )
+                )
+                .setStateChangelogTopics(
+                    Collections.singletonList(
+                        new TopicInfo()
+                            .setName("changelog")
+                            .setTopicConfigs(Collections.singletonList(
+                                new TopicConfig()
+                                    .setKey("config-name2")
+                                    .setValue("config-value2")
+                            ))
+                    )
+                )
+        );
+        CoordinatorResult<StreamsInitializeResponseData, CoordinatorRecord> 
result =
+            context.streamsInitialize(
+                new StreamsInitializeRequestData()
+                    .setGroupId(groupId)
+                    .setTopology(topology)
+            );
+
+        assertEquals(
+            new StreamsInitializeResponseData(),
+            result.response()
+        );
+
+        // Use the helper class name this commit uses everywhere else.
+        List<CoordinatorRecord> expectedRecords = Arrays.asList(
+            GroupCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(
+                groupId,
+                topology
+            )
+        );
+
+        assertRecordsEquals(expectedRecords, result.records());
+    }
+
+    /**
+     * Checks the failure path of topology initialization: the topology names
+     * a "changelog" state changelog topic that is absent from the metadata
+     * image, so the response carries STREAMS_INVALID_TOPOLOGY and no records
+     * are produced.
+     */
+    @Test
+    public void testTopologyInitializationMissingInternalTopics() {
+        String groupId = "fooup";
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "repartition";
+
+        // Only "repartition" is registered; "changelog" is left out.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addRacks()
+                .build())
+            .build();
+
+        assertThrows(GroupIdNotFoundException.class, () ->
+            context.groupMetadataManager.consumerGroup(groupId));
+
+        final List<Subtopology> topology = Collections.singletonList(
+            new Subtopology()
+                .setSubtopology("subtopology-id")
+                .setSinkTopics(Collections.singletonList("foo"))
+                .setSourceTopics(Collections.singletonList("bar"))
+                .setRepartitionSourceTopics(
+                    Collections.singletonList(
+                        new TopicInfo()
+                            .setName("repartition")
+                            .setPartitions(4)
+                            .setTopicConfigs(Collections.singletonList(
+                                new TopicConfig()
+                                    .setKey("config-name1")
+                                    .setValue("config-value1")
+                            ))
+                    )
+                )
+                .setStateChangelogTopics(
+                    Collections.singletonList(
+                        new TopicInfo()
+                            .setName("changelog")
+                            .setTopicConfigs(Collections.singletonList(
+                                new TopicConfig()
+                                    .setKey("config-name2")
+                                    .setValue("config-value2")
+                            ))
+                    )
+                )
+        );
+        CoordinatorResult<StreamsInitializeResponseData, CoordinatorRecord> 
result =
+            context.streamsInitialize(
+                new StreamsInitializeRequestData()
+                    .setGroupId(groupId)
+                    .setTopology(topology)
+            );
+
+        assertEquals(
+            new StreamsInitializeResponseData()
+                .setErrorCode(Errors.STREAMS_INVALID_TOPOLOGY.code())
+                .setErrorMessage("Internal topics changelog do not exist."),
+            result.response()
+        );
+
+        assertTrue(result.records().isEmpty());
+
+    }
 }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
index b9ba9410497..a2dde4cc33c 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
@@ -36,6 +36,8 @@ import org.apache.kafka.common.message.ListGroupsResponseData;
 import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.StreamsInitializeRequestData;
+import org.apache.kafka.common.message.StreamsInitializeResponseData;
 import org.apache.kafka.common.message.SyncGroupRequestData;
 import org.apache.kafka.common.message.SyncGroupResponseData;
 import org.apache.kafka.common.network.ClientInformation;
@@ -81,6 +83,8 @@ import 
org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMe
 import 
org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberValue;
 import 
org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataKey;
 import 
org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
 import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
 import org.apache.kafka.coordinator.group.modern.MemberState;
 import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup;
@@ -657,6 +661,37 @@ public class GroupMetadataManagerTestContext {
         return result;
     }
 
+
+    /**
+     * Test helper that sends a StreamsInitialize request to the
+     * GroupMetadataManager under test.
+     *
+     * It builds a RequestContext for STREAMS_INITIALIZE at its latest version,
+     * calls groupMetadataManager.streamsInitialize, and replays the returned
+     * records through this context when the result asks for it.
+     *
+     * @param request The StreamsInitialize request to send.
+     *
+     * @return The result returned by the GroupMetadataManager.
+     */
+    public CoordinatorResult<StreamsInitializeResponseData, CoordinatorRecord> 
streamsInitialize(
+        StreamsInitializeRequestData request
+    ) {
+        RequestContext context = new RequestContext(
+            new RequestHeader(
+                ApiKeys.STREAMS_INITIALIZE,
+                ApiKeys.STREAMS_INITIALIZE.latestVersion(),
+                "client",
+                0
+            ),
+            "1",
+            InetAddress.getLoopbackAddress(),
+            KafkaPrincipal.ANONYMOUS,
+            ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
+            SecurityProtocol.PLAINTEXT,
+            ClientInformation.EMPTY,
+            false
+        );
+
+        CoordinatorResult<StreamsInitializeResponseData, CoordinatorRecord> 
result = groupMetadataManager.streamsInitialize(
+            context,
+            request
+        );
+
+        // Apply the generated records to the manager only when requested.
+        if (result.replayRecords()) {
+            result.records().forEach(this::replay);
+        }
+        return result;
+    }
+    
     public List<MockCoordinatorTimer.ExpiredTimeout<Void, CoordinatorRecord>> 
sleep(long ms) {
         time.sleep(ms);
         List<MockCoordinatorTimer.ExpiredTimeout<Void, CoordinatorRecord>> 
timeouts = timer.poll();
@@ -1550,6 +1585,13 @@ public class GroupMetadataManagerTestContext {
                 );
                 break;
 
+            case StreamsGroupTopologyKey.HIGHEST_SUPPORTED_VERSION:
+                groupMetadataManager.replay(
+                    (StreamsGroupTopologyKey) key.message(),
+                    (StreamsGroupTopologyValue) messageOrNull(value)
+                );
+                break;
+                
             default:
                 throw new IllegalStateException("Received an unknown record 
type " + key.version()
                     + " in " + record);

Reply via email to