This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch kip1071
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit c1a597b211e1583ec9a12020b3c45e4d7030cc36
Author: Lucas Brutschy <[email protected]>
AuthorDate: Mon Jun 3 17:23:43 2024 +0200

    Resolve merge conflict from 11/25 trunk rebase; implement InitStreamsApp RPC in the group coordinator
    
    Implement the init streams app call in the group coordinator without creating internal topics.
    
    Create topology metadata record
    Verify existence of all required internal topics
    
    See https://github.com/lucasbru/kafka/pull/15
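    
    For orientation, the request flow added here is: KafkaApis routes the new
    STREAMS_INITIALIZE RPC to the new group coordinator, GroupMetadataManager
    validates the request, checks every changelog and repartition topic named
    in the topology against the metadata image, and only writes a
    StreamsGroupTopology record when all of them exist. A minimal sketch of
    that existence check, with a hypothetical Subtopology record standing in
    for StreamsInitializeRequestData.Subtopology and a plain Set standing in
    for the metadata image:
    
        import java.util.HashSet;
        import java.util.List;
        import java.util.Set;
        
        public class TopicExistenceCheckSketch {
        
            // Hypothetical stand-in for StreamsInitializeRequestData.Subtopology;
            // only the two internal-topic name lists matter for this check.
            record Subtopology(List<String> stateChangelogTopics,
                               List<String> repartitionSourceTopics) {}
        
            // Collects every internal topic referenced by the topology that is
            // not present in existingTopics (the real code consults the broker's
            // metadata image instead of a Set).
            static Set<String> missingInternalTopics(List<Subtopology> topology,
                                                     Set<String> existingTopics) {
                Set<String> missing = new HashSet<>();
                for (Subtopology subtopology : topology) {
                    for (String topic : subtopology.stateChangelogTopics()) {
                        if (!existingTopics.contains(topic)) missing.add(topic);
                    }
                    for (String topic : subtopology.repartitionSourceTopics()) {
                        if (!existingTopics.contains(topic)) missing.add(topic);
                    }
                }
                return missing;
            }
        
            public static void main(String[] args) {
                List<Subtopology> topology = List.of(
                    new Subtopology(List.of("changelog"), List.of("repartition")));
                // Prints [changelog]; in that case the coordinator answers with
                // STREAMS_INVALID_TOPOLOGY instead of writing the record.
                System.out.println(missingInternalTopics(topology, Set.of("repartition")));
            }
        }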
---
 .../group/GroupCoordinatorAdapter.scala            |  11 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |  28 +++++
 .../group/GroupCoordinatorAdapterTest.scala        |  18 ++-
 .../scala/unit/kafka/server/KafkaApisTest.scala    |  62 +++++++++
 .../kafka/coordinator/group/GroupCoordinator.java  |  16 +++
 .../group/GroupCoordinatorRecordHelpers.java       |  42 +++++++
 .../coordinator/group/GroupCoordinatorService.java |  31 +++++
 .../coordinator/group/GroupCoordinatorShard.java   |  18 +++
 .../coordinator/group/GroupMetadataManager.java    | 116 +++++++++++++++++
 .../group/GroupCoordinatorRecordHelpersTest.java   |  82 ++++++++++++
 .../group/GroupCoordinatorServiceTest.java         | 115 +++++++++++++++++
 .../group/GroupCoordinatorShardTest.java           |  34 +++++
 .../group/GroupMetadataManagerTest.java            | 140 +++++++++++++++++++++
 .../group/GroupMetadataManagerTestContext.java     |  42 +++++++
 14 files changed, 753 insertions(+), 2 deletions(-)
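
For context, the new coordinator handlers below all follow the runtime's
write-path shape: a handler returns the response together with the records to
append to __consumer_offsets, and the response is only sent once those records
are committed. A rough sketch of that shape, with SimpleResult as a
hypothetical stand-in for the real CoordinatorResult:

    import java.util.List;
    
    public class WritePathSketch {
    
        // Hypothetical stand-in for the runtime's CoordinatorResult: a write
        // operation yields both the records to append and the response to
        // return once the records are durable.
        record SimpleResult<R, T>(List<R> records, T response) {}
    
        // Mirrors the two outcomes of GroupMetadataManager#streamsInitialize:
        // an error response with no records, or a success response paired
        // with the topology record to persist.
        static SimpleResult<String, String> initialize(boolean topicsMissing) {
            if (topicsMissing) {
                return new SimpleResult<>(List.of(), "STREAMS_INVALID_TOPOLOGY");
            }
            return new SimpleResult<>(List.of("StreamsGroupTopologyRecord"), "NONE");
        }
    
        public static void main(String[] args) {
            System.out.println(initialize(true));   // no records appended
            System.out.println(initialize(false));  // topology record appended
        }
    }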

diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
index 970d283953e..d45221d1e71 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
@@ -18,7 +18,7 @@ package kafka.coordinator.group
 
 import kafka.server.{KafkaConfig, ReplicaManager}
 import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
-import org.apache.kafka.common.message.{ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, Offset [...]
+import org.apache.kafka.common.message.{ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, Offset [...]
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.RecordBatch
@@ -77,6 +77,15 @@ private[group] class GroupCoordinatorAdapter(
     ))
   }
 
+  override def streamsInitialize(
+    context: RequestContext,
+    request: StreamsInitializeRequestData
+  ): CompletableFuture[StreamsInitializeResponseData] = {
+    FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
+      s"The old group coordinator does not support ${ApiKeys.STREAMS_INITIALIZE.name} API."
+    ))
+  }
+
   override def shareGroupHeartbeat(
     context: RequestContext,
     request: ShareGroupHeartbeatRequestData
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 21942229d87..046f1570707 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -276,6 +276,7 @@ class KafkaApis(val requestChannel: RequestChannel,
        case ApiKeys.WRITE_SHARE_GROUP_STATE => handleWriteShareGroupStateRequest(request)
        case ApiKeys.DELETE_SHARE_GROUP_STATE => handleDeleteShareGroupStateRequest(request)
        case ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY => handleReadShareGroupStateSummaryRequest(request)
+        case ApiKeys.STREAMS_INITIALIZE => handleStreamsInitialize(request).exceptionally(handleError)
        case _ => throw new IllegalStateException(s"No handler for request api key ${request.header.apiKey}")
       }
     } catch {
@@ -3883,6 +3884,33 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   }
 
+  def handleStreamsInitialize(request: RequestChannel.Request): CompletableFuture[Unit] = {
+    val streamsInitializeRequest = request.body[StreamsInitializeRequest]
+
+    // TODO: Check ACLs on CREATE TOPIC & DESCRIBE_CONFIGS
+
+    if (!config.isNewGroupCoordinatorEnabled) {
+      // The API is not supported by the "old" group coordinator (the default). If the
+      // new one is not enabled, we fail directly here.
+      requestHelper.sendMaybeThrottle(request, streamsInitializeRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+      CompletableFuture.completedFuture[Unit](())
+    } else if (!authHelper.authorize(request.context, READ, GROUP, streamsInitializeRequest.data.groupId)) {
+      requestHelper.sendMaybeThrottle(request, streamsInitializeRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
+      CompletableFuture.completedFuture[Unit](())
+    } else {
+      groupCoordinator.streamsInitialize(
+        request.context,
+        streamsInitializeRequest.data
+      ).handle[Unit] { (response, exception) =>
+        if (exception != null) {
+          requestHelper.sendMaybeThrottle(request, streamsInitializeRequest.getErrorResponse(exception))
+        } else {
+          requestHelper.sendMaybeThrottle(request, new StreamsInitializeResponse(response))
+        }
+      }
+    }
+  }
+
   def handleGetTelemetrySubscriptionsRequest(request: RequestChannel.Request): Unit = {
     val subscriptionRequest = request.body[GetTelemetrySubscriptionsRequest]
 
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
index 7a9de453740..0d57eea15f9 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
@@ -19,7 +19,7 @@ package kafka.coordinator.group
 import kafka.coordinator.group.GroupCoordinatorConcurrencyTest.{JoinGroupCallback, SyncGroupCallback}
 import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
 import org.apache.kafka.common.errors.{InvalidGroupIdException, UnsupportedVersionException}
-import org.apache.kafka.common.message.{ConsumerGroupHeartbeatRequestData, DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, ShareGroupHeartbeatRequestDa [...]
+import org.apache.kafka.common.message.{ConsumerGroupHeartbeatRequestData, DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, ShareGroupHeartbeatRequestDa [...]
 import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol
 import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember
 import org.apache.kafka.common.message.OffsetDeleteRequestData.{OffsetDeleteRequestPartition, OffsetDeleteRequestTopic, OffsetDeleteRequestTopicCollection}
@@ -78,6 +78,22 @@ class GroupCoordinatorAdapterTest {
     assertFutureThrows(future, classOf[UnsupportedVersionException])
   }
 
+  @Test
+  def testStreamsInitialize(): Unit = {
+    val groupCoordinator = mock(classOf[GroupCoordinator])
+    val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM)
+
+    val ctx = makeContext(ApiKeys.STREAMS_INITIALIZE, ApiKeys.STREAMS_INITIALIZE.latestVersion)
+    val request = new StreamsInitializeRequestData()
+      .setGroupId("group")
+
+    val future = adapter.streamsInitialize(ctx, request)
+
+    assertTrue(future.isDone)
+    assertTrue(future.isCompletedExceptionally)
+    assertFutureThrows(future, classOf[UnsupportedVersionException])
+  }
+
   @Test
   def testJoinShareGroup(): Unit = {
     val groupCoordinator = mock(classOf[GroupCoordinator])
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 5771a17d5ea..4c0f3421e3f 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -11233,6 +11233,69 @@ class KafkaApisTest extends Logging {
     val response = verifyNoThrottling[ConsumerGroupHeartbeatResponse](requestChannelRequest)
     assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, response.data.errorCode)
   }
+
+  @Test
+  def testStreamsInitializeRequest(): Unit = {
+    val streamsInitializeRequest = new StreamsInitializeRequestData().setGroupId("group")
+
+    val requestChannelRequest = buildRequest(new StreamsInitializeRequest.Builder(streamsInitializeRequest, true).build())
+
+    val future = new CompletableFuture[StreamsInitializeResponseData]()
+    when(groupCoordinator.streamsInitialize(
+      requestChannelRequest.context,
+      streamsInitializeRequest
+    )).thenReturn(future)
+    kafkaApis = createKafkaApis(overrideProperties = Map(
+      GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true"
+    ))
+    kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching)
+
+    val streamsInitializeResponse = new StreamsInitializeResponseData()
+
+    future.complete(streamsInitializeResponse)
+    val response = verifyNoThrottling[StreamsInitializeResponse](requestChannelRequest)
+    assertEquals(streamsInitializeResponse, response.data)
+  }
+
+  @Test
+  def testStreamsInitializeRequestFutureFailed(): Unit = {
+    val streamsInitializeRequest = new StreamsInitializeRequestData().setGroupId("group")
+
+    val requestChannelRequest = buildRequest(new StreamsInitializeRequest.Builder(streamsInitializeRequest, true).build())
+
+    val future = new CompletableFuture[StreamsInitializeResponseData]()
+    when(groupCoordinator.streamsInitialize(
+      requestChannelRequest.context,
+      streamsInitializeRequest
+    )).thenReturn(future)
+    kafkaApis = createKafkaApis(overrideProperties = Map(
+      GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true"
+    ))
+    kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching)
+
+    future.completeExceptionally(Errors.FENCED_MEMBER_EPOCH.exception)
+    val response = verifyNoThrottling[StreamsInitializeResponse](requestChannelRequest)
+    assertEquals(Errors.FENCED_MEMBER_EPOCH.code, response.data.errorCode)
+  }
+
+  @Test
+  def testStreamsInitializeRequestAuthorizationFailed(): Unit = {
+    val streamsInitializeRequest = new StreamsInitializeRequestData().setGroupId("group")
+
+    val requestChannelRequest = buildRequest(new StreamsInitializeRequest.Builder(streamsInitializeRequest, true).build())
+
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+    when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
+      .thenReturn(Seq(AuthorizationResult.DENIED).asJava)
+    kafkaApis = createKafkaApis(
+      authorizer = Some(authorizer),
+      overrideProperties = Map(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true")
+    )
+    kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching)
+
+    val response = verifyNoThrottling[StreamsInitializeResponse](requestChannelRequest)
+    assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, response.data.errorCode)
+  }
 
   @ParameterizedTest
   @ValueSource(booleans = Array(true, false))
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
index 223cff9720e..8a5032451b1 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
@@ -39,6 +39,8 @@ import org.apache.kafka.common.message.OffsetFetchResponseData;
 import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.StreamsInitializeRequestData;
+import org.apache.kafka.common.message.StreamsInitializeResponseData;
 import org.apache.kafka.common.message.SyncGroupRequestData;
 import org.apache.kafka.common.message.SyncGroupResponseData;
 import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
@@ -82,6 +84,20 @@ public interface GroupCoordinator {
         ConsumerGroupHeartbeatRequestData request
     );
 
+    /**
+     * Initialize a Streams Group.
+     *
+     * @param context           The request context.
+     * @param request           The StreamsInitializeRequest data.
+     *
+     * @return  A future yielding the response.
+     *          The error code(s) of the response are set to indicate the error(s) that occurred during the execution.
+     */
+    CompletableFuture<StreamsInitializeResponseData> streamsInitialize(
+        RequestContext context,
+        StreamsInitializeRequestData request
+    );
+
     /**
      * Heartbeat to a Share Group.
      *
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java
index 05cf4d5bd3a..98cf7f937e8 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.coordinator.group;
 
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.StreamsInitializeRequestData;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.requests.OffsetCommitRequest;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
@@ -51,6 +52,8 @@ import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMe
 import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberValue;
 import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataKey;
 import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
 import org.apache.kafka.coordinator.group.modern.TopicMetadata;
 import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember;
 import org.apache.kafka.coordinator.group.modern.consumer.ResolvedRegularExpression;
@@ -63,11 +66,13 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * This class contains helper methods to create records stored in
  * the __consumer_offsets topic.
  */
+@SuppressWarnings("ClassDataAbstractionCoupling")
 public class GroupCoordinatorRecordHelpers {
     private GroupCoordinatorRecordHelpers() {}
 
@@ -937,4 +942,41 @@ public class GroupCoordinatorRecordHelpers {
         );
         return topics;
     }
+
+    /**
+     * Creates a StreamsGroupTopology record.
+     *
+     * @param groupId                   The streams group id.
+     * @param subtopologies             The subtopologies in the new topology.
+     * @return The record.
+     */
+    public static CoordinatorRecord newStreamsGroupTopologyRecord(String groupId,
+                                                                  List<StreamsInitializeRequestData.Subtopology> subtopologies) {
+        StreamsGroupTopologyValue value = new StreamsGroupTopologyValue();
+        subtopologies.forEach(subtopology -> {
+            List<StreamsGroupTopologyValue.TopicInfo> repartitionSourceTopics = subtopology.repartitionSourceTopics().stream()
+                .map(topicInfo -> {
+                    List<StreamsGroupTopologyValue.TopicConfig> topicConfigs = topicInfo.topicConfigs().stream()
+                        .map(config -> new StreamsGroupTopologyValue.TopicConfig().setKey(config.key()).setValue(config.value()))
+                        .collect(Collectors.toList());
+                    return new StreamsGroupTopologyValue.TopicInfo().setName(topicInfo.name()).setTopicConfigs(topicConfigs)
+                        .setPartitions(topicInfo.partitions());
+                }).collect(Collectors.toList());
+
+            List<StreamsGroupTopologyValue.TopicInfo> stateChangelogTopics = subtopology.stateChangelogTopics().stream().map(topicInfo -> {
+                List<StreamsGroupTopologyValue.TopicConfig> topicConfigs = topicInfo.topicConfigs().stream()
+                    .map(config -> new StreamsGroupTopologyValue.TopicConfig().setKey(config.key()).setValue(config.value()))
+                    .collect(Collectors.toList());
+                return new StreamsGroupTopologyValue.TopicInfo().setName(topicInfo.name()).setTopicConfigs(topicConfigs);
+            }).collect(Collectors.toList());
+
+            value.topology().add(new StreamsGroupTopologyValue.Subtopology().setSubtopology(subtopology.subtopology())
+                .setSourceTopics(subtopology.sourceTopics()).setSinkTopics(subtopology.sinkTopics())
+                .setRepartitionSourceTopics(repartitionSourceTopics).setStateChangelogTopics(stateChangelogTopics));
+        });
+
+        return new CoordinatorRecord(new ApiMessageAndVersion(new StreamsGroupTopologyKey().setGroupId(groupId), (short) 15),
+            new ApiMessageAndVersion(value, (short) 0));
+    }
+
 }
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index 96d0253a09e..2a3eef88a04 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -44,6 +44,8 @@ import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
 import org.apache.kafka.common.message.ShareGroupDescribeResponseData.DescribedGroup;
 import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.StreamsInitializeRequestData;
+import org.apache.kafka.common.message.StreamsInitializeResponseData;
 import org.apache.kafka.common.message.SyncGroupRequestData;
 import org.apache.kafka.common.message.SyncGroupResponseData;
 import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
@@ -344,6 +346,35 @@ public class GroupCoordinatorService implements GroupCoordinator {
         ));
     }
 
+    /**
+     * See {@link GroupCoordinator#streamsInitialize(RequestContext, StreamsInitializeRequestData)}.
+     */
+    @Override
+    public CompletableFuture<StreamsInitializeResponseData> streamsInitialize(
+        RequestContext context,
+        StreamsInitializeRequestData request
+    ) {
+        if (!isActive.get()) {
+            return CompletableFuture.completedFuture(new StreamsInitializeResponseData()
+                .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
+            );
+        }
+
+        return runtime.scheduleWriteOperation(
+            "streams-group-initialize",
+            topicPartitionFor(request.groupId()),
+            Duration.ofMillis(config.offsetCommitTimeoutMs),
+            coordinator -> coordinator.streamsInitialize(context, request)
+        ).exceptionally(exception -> handleOperationException(
+            "streams-group-initialize",
+            request,
+            exception,
+            (error, message) -> new StreamsInitializeResponseData()
+                .setErrorCode(error.code())
+                .setErrorMessage(message)
+        ));
+    }
+
     /**
      * See {@link GroupCoordinator#shareGroupHeartbeat(RequestContext, ShareGroupHeartbeatRequestData)}.
      */
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index 099755710b9..b002f96b60b 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -39,6 +39,8 @@ import org.apache.kafka.common.message.OffsetFetchResponseData;
 import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.StreamsInitializeRequestData;
+import org.apache.kafka.common.message.StreamsInitializeResponseData;
 import org.apache.kafka.common.message.SyncGroupRequestData;
 import org.apache.kafka.common.message.SyncGroupResponseData;
 import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
@@ -361,6 +363,22 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord
         return groupMetadataManager.consumerGroupHeartbeat(context, request);
     }
 
+    /**
+     * Handles a StreamsInitialize request.
+     *
+     * @param context The request context.
+     * @param request The actual StreamsInitialize request.
+     *
+     * @return A Result containing the StreamsInitialize response and
+     *         a list of records to update the state machine.
+     */
+    public CoordinatorResult<StreamsInitializeResponseData, CoordinatorRecord> streamsInitialize(
+        RequestContext context,
+        StreamsInitializeRequestData request
+    ) {
+        return groupMetadataManager.streamsInitialize(context, request);
+    }
+
     /**
      * Handles a ShareGroupHeartbeat request.
      *
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 5a7e98ea3cc..c7cf02f7974 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -53,6 +53,8 @@ import org.apache.kafka.common.message.ListGroupsResponseData;
 import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.StreamsInitializeRequestData;
+import org.apache.kafka.common.message.StreamsInitializeResponseData;
 import org.apache.kafka.common.message.SyncGroupRequestData;
 import org.apache.kafka.common.message.SyncGroupResponseData;
 import org.apache.kafka.common.protocol.Errors;
@@ -103,6 +105,8 @@ import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMe
 import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberValue;
 import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataKey;
 import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
 import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
 import org.apache.kafka.coordinator.group.modern.Assignment;
 import org.apache.kafka.coordinator.group.modern.MemberState;
@@ -150,6 +154,7 @@ import java.util.stream.Stream;
 import static org.apache.kafka.common.protocol.Errors.COORDINATOR_NOT_AVAILABLE;
 import static org.apache.kafka.common.protocol.Errors.ILLEGAL_GENERATION;
 import static org.apache.kafka.common.protocol.Errors.NOT_COORDINATOR;
+import static org.apache.kafka.common.protocol.Errors.STREAMS_INVALID_TOPOLOGY;
 import static org.apache.kafka.common.protocol.Errors.UNKNOWN_SERVER_ERROR;
 import static org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.CONSUMER_GENERATED_MEMBER_ID_REQUIRED_VERSION;
 import static org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
@@ -172,6 +177,7 @@ import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.n
 import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupMemberSubscriptionTombstoneRecord;
 import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupSubscriptionMetadataRecord;
 import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentTombstoneRecord;
+import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newStreamsGroupTopologyRecord;
 import static org.apache.kafka.coordinator.group.Utils.assignmentToString;
 import static org.apache.kafka.coordinator.group.Utils.ofSentinel;
 import static org.apache.kafka.coordinator.group.Utils.toConsumerProtocolAssignment;
@@ -1398,7 +1404,28 @@ public class GroupMetadataManager {
             throw new InvalidRequestException("MemberEpoch is invalid.");
         }
     }
+
+    /**
+     * Validates the request.
+     *
+     * @param request The request to validate.
+     *
+     * @throws InvalidRequestException if the request is not valid.
+     * @throws UnsupportedAssignorException if the assignor is not supported.
+     */
+    private void throwIfStreamsInitializeRequestIsInvalid(
+        StreamsInitializeRequestData request
+    ) throws InvalidRequestException, UnsupportedAssignorException {
+        throwIfEmptyString(request.groupId(), "GroupId can't be empty.");
+
+        if (request.topology().isEmpty()) {
+            throw new InvalidRequestException("There must be at least one subtopology.");
+        }
+
+        // TODO: Check invariants
+    }
+
 
     /**
      * Verifies that the partitions currently owned by the member (the ones set in the
      * request) matches the ones that the member should own. It matches if the consumer
@@ -1963,6 +1990,54 @@
         return new CoordinatorResult<>(records, response);
     }
 
+
+    /**
+     * Handles the initialization of the topology information on the broker side, which will be reused by all members of the group.
+     *
+     * @param groupId       The group id from the request.
+     * @param subtopologies The list of subtopologies.
+     * @return A Result containing the StreamsInitialize response and a list of records to update the state machine.
+     */
+    private CoordinatorResult<StreamsInitializeResponseData, CoordinatorRecord> streamsInitialize(String groupId,
+                                                                                                  List<StreamsInitializeRequestData.Subtopology> subtopologies)
+        throws ApiException {
+        final List<CoordinatorRecord> records = new ArrayList<>();
+
+        // TODO: Throw if group does not exist or is not a streams group. Needs a model similar to
+        //       final StreamsGroup group = getOrMaybeCreateStreamsGroup(groupId, createIfNotExists, records);
+        //       throwIfNull(group);
+
+        // TODO: For the POC, only check if internal topics exist
+        Set<String> missingTopics = new HashSet<>();
+        for (StreamsInitializeRequestData.Subtopology subtopology : subtopologies) {
+            for (StreamsInitializeRequestData.TopicInfo topic : subtopology.stateChangelogTopics()) {
+                if (metadataImage.topics().getTopic(topic.name()) == null) {
+                    missingTopics.add(topic.name());
+                }
+            }
+            for (StreamsInitializeRequestData.TopicInfo topic : subtopology.repartitionSourceTopics()) {
+                if (metadataImage.topics().getTopic(topic.name()) == null) {
+                    missingTopics.add(topic.name());
+                }
+            }
+        }
+        if (!missingTopics.isEmpty()) {
+            StreamsInitializeResponseData response =
+                new StreamsInitializeResponseData()
+                    .setErrorCode(STREAMS_INVALID_TOPOLOGY.code())
+                    .setErrorMessage("Internal topics " + String.join(", ", missingTopics) + " do not exist.");
+
+            return new CoordinatorResult<>(records, response);
+        } else {
+            records.add(newStreamsGroupTopologyRecord(groupId, subtopologies));
+
+            StreamsInitializeResponseData response = new StreamsInitializeResponseData();
+
+            return new CoordinatorResult<>(records, response);
+        }
+
+    }
+
     /**
      * Handle a JoinGroupRequest to a ConsumerGroup.
      *
@@ -3563,6 +3638,46 @@
         }
     }
 
+    /**
+     * Handles a StreamsInitialize request.
+     *
+     * @param context The request context.
+     * @param request The actual StreamsInitialize request.
+     *
+     * @return A Result containing the StreamsInitialize response and
+     *         a list of records to update the state machine.
+     */
+    public CoordinatorResult<StreamsInitializeResponseData, CoordinatorRecord> streamsInitialize(
+        RequestContext context,
+        StreamsInitializeRequestData request
+    ) throws ApiException {
+        throwIfStreamsInitializeRequestIsInvalid(request);
+
+        return streamsInitialize(
+            request.groupId(),
+            request.topology()
+        );
+    }
+
+    /**
+     * Replays StreamsGroupTopologyKey/Value to update the hard state of
+     * the streams group.
+     *
+     * @param key   A StreamsGroupTopologyKey key.
+     * @param value A StreamsGroupTopologyValue record.
+     */
+    public void replay(
+        StreamsGroupTopologyKey key,
+        StreamsGroupTopologyValue value
+    ) {
+
+        // TODO: Insert the topology information into the in-memory representation. Needs the notion
+        //       of a Streams group
+//        String groupId = key.groupId();
+//        StreamsGroup streamsGroup = getOrMaybeCreatePersistedConsumerGroup(groupId, value != null);
+//        streamsGroup.setTopology(value);
+    }
+
     /**
      * Handles a ShareGroupHeartbeat request.
      *
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java
index 812ee093c2c..480c1576440 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.coordinator.group;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol;
 import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection;
+import org.apache.kafka.common.message.StreamsInitializeRequestData;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
@@ -45,6 +46,8 @@ import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
 import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
 import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
 import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
 import org.apache.kafka.coordinator.group.modern.MemberState;
 import org.apache.kafka.coordinator.group.modern.TopicMetadata;
 import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember;
@@ -89,6 +92,7 @@ import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.n
 import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord;
 import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord;
 import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupEpochTombstoneRecord;
+import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newStreamsGroupTopologyRecord;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
@@ -214,6 +218,84 @@ public class GroupCoordinatorRecordHelpersTest {
         ));
     }
 
+    @Test
+    public void testNewStreamsGroupTopologyRecord() {
+        List<StreamsInitializeRequestData.Subtopology> topology =
+            Collections.singletonList(new StreamsInitializeRequestData.Subtopology()
+                .setSubtopology("subtopology-id")
+                .setSinkTopics(Collections.singletonList("foo"))
+                .setSourceTopics(Collections.singletonList("bar"))
+                .setRepartitionSourceTopics(
+                    Collections.singletonList(
+                        new StreamsInitializeRequestData.TopicInfo()
+                            .setName("repartition")
+                            .setPartitions(4)
+                            .setTopicConfigs(Collections.singletonList(
+                                new StreamsInitializeRequestData.TopicConfig()
+                                    .setKey("config-name1")
+                                    .setValue("config-value1")
+                            ))
+                    )
+                )
+                .setStateChangelogTopics(
+                    Collections.singletonList(
+                        new StreamsInitializeRequestData.TopicInfo()
+                            .setName("changelog")
+                            .setTopicConfigs(Collections.singletonList(
+                                new StreamsInitializeRequestData.TopicConfig()
+                                    .setKey("config-name2")
+                                    .setValue("config-value2")
+                            ))
+                    )
+                )
+            );
+
+        List<StreamsGroupTopologyValue.Subtopology> expectedTopology =
+            Collections.singletonList(new StreamsGroupTopologyValue.Subtopology()
+                .setSubtopology("subtopology-id")
+                .setSinkTopics(Collections.singletonList("foo"))
+                .setSourceTopics(Collections.singletonList("bar"))
+                .setRepartitionSourceTopics(
+                    Collections.singletonList(
+                        new StreamsGroupTopologyValue.TopicInfo()
+                            .setName("repartition")
+                            .setPartitions(4)
+                            .setTopicConfigs(Collections.singletonList(
+                                new StreamsGroupTopologyValue.TopicConfig()
+                                    .setKey("config-name1")
+                                    .setValue("config-value1")
+                            ))
+                    )
+                )
+                .setStateChangelogTopics(
+                    Collections.singletonList(
+                        new StreamsGroupTopologyValue.TopicInfo()
+                            .setName("changelog")
+                            .setTopicConfigs(Collections.singletonList(
+                                new StreamsGroupTopologyValue.TopicConfig()
+                                    .setKey("config-name2")
+                                    .setValue("config-value2")
+                            ))
+                    )
+                )
+            );
+
+        CoordinatorRecord expectedRecord = new CoordinatorRecord(
+            new ApiMessageAndVersion(
+                new StreamsGroupTopologyKey()
+                    .setGroupId("group-id"),
+                (short) 15),
+            new ApiMessageAndVersion(
+                new StreamsGroupTopologyValue()
+                    .setTopology(expectedTopology),
+                (short) 0));
+
+        assertEquals(expectedRecord, newStreamsGroupTopologyRecord(
+            "group-id",
+            topology
+        ));
+    }
+
     @Test
     public void testEmptyPartitionMetadataWhenRacksUnavailableGroupSubscriptionMetadataRecord() {
         Uuid fooTopicId = Uuid.randomUuid();
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index bccfa57397c..96273e368e1 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
 import org.apache.kafka.common.errors.RebalanceInProgressException;
 import org.apache.kafka.common.errors.RecordBatchTooLargeException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.StreamsInvalidTopologyException;
 import org.apache.kafka.common.errors.UnknownMemberIdException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.internals.Topic;
@@ -53,6 +54,8 @@ import org.apache.kafka.common.message.OffsetFetchResponseData;
 import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.StreamsInitializeRequestData;
+import org.apache.kafka.common.message.StreamsInitializeResponseData;
 import org.apache.kafka.common.message.SyncGroupRequestData;
 import org.apache.kafka.common.message.SyncGroupResponseData;
 import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
@@ -260,6 +263,118 @@ public class GroupCoordinatorServiceTest {
         );
     }
 
+    @Test
+    public void testStreamsInitializeWhenNotStarted() throws ExecutionException, InterruptedException {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime,
+            new GroupCoordinatorMetrics()
+        );
+
+        StreamsInitializeRequestData request = new StreamsInitializeRequestData()
+            .setGroupId("foo");
+
+        CompletableFuture<StreamsInitializeResponseData> future = service.streamsInitialize(
+            requestContext(ApiKeys.STREAMS_INITIALIZE),
+            request
+        );
+
+        assertEquals(
+            new StreamsInitializeResponseData()
+                .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()),
+            future.get()
+        );
+    }
+
+    @Test
+    public void testStreamsInitialize() throws ExecutionException, InterruptedException, TimeoutException {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime,
+            new GroupCoordinatorMetrics()
+        );
+
+        StreamsInitializeRequestData request = new StreamsInitializeRequestData()
+            .setGroupId("foo");
+
+        service.startup(() -> 1);
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("streams-group-initialize"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.eq(Duration.ofMillis(5000)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(
+            new StreamsInitializeResponseData()
+        ));
+
+        CompletableFuture<StreamsInitializeResponseData> future = service.streamsInitialize(
+            requestContext(ApiKeys.STREAMS_INITIALIZE),
+            request
+        );
+
+        assertEquals(new StreamsInitializeResponseData(), future.get(5, TimeUnit.SECONDS));
+    }
+
+    private static Stream<Arguments> testStreamsInitializeWithExceptionSource() {
+        return Stream.of(
+            Arguments.arguments(new UnknownTopicOrPartitionException(), Errors.COORDINATOR_NOT_AVAILABLE.code(), null),
+            Arguments.arguments(new NotEnoughReplicasException(), Errors.COORDINATOR_NOT_AVAILABLE.code(), null),
+            Arguments.arguments(new org.apache.kafka.common.errors.TimeoutException(), Errors.COORDINATOR_NOT_AVAILABLE.code(), null),
+            Arguments.arguments(new NotLeaderOrFollowerException(), Errors.NOT_COORDINATOR.code(), null),
+            Arguments.arguments(new KafkaStorageException(), Errors.NOT_COORDINATOR.code(), null),
+            Arguments.arguments(new RecordTooLargeException(), Errors.UNKNOWN_SERVER_ERROR.code(), null),
+            Arguments.arguments(new RecordBatchTooLargeException(), Errors.UNKNOWN_SERVER_ERROR.code(), null),
+            Arguments.arguments(new InvalidFetchSizeException(""), Errors.UNKNOWN_SERVER_ERROR.code(), null),
+            Arguments.arguments(new InvalidRequestException("Invalid"), Errors.INVALID_REQUEST.code(), "Invalid"),
+            Arguments.arguments(new StreamsInvalidTopologyException("Invalid"), Errors.STREAMS_INVALID_TOPOLOGY.code(), "Invalid")
+        );
+    }
+
+    @ParameterizedTest
+    @MethodSource("testStreamsInitializeWithExceptionSource")
+    public void testStreamsInitializeWithException(
+        Throwable exception,
+        short expectedErrorCode,
+        String expectedErrorMessage
+    ) throws ExecutionException, InterruptedException, TimeoutException {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime,
+            new GroupCoordinatorMetrics()
+        );
+
+        StreamsInitializeRequestData request = new StreamsInitializeRequestData()
+            .setGroupId("foo");
+
+        service.startup(() -> 1);
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("streams-group-initialize"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.eq(Duration.ofMillis(5000)),
+            ArgumentMatchers.any()
+        )).thenReturn(FutureUtils.failedFuture(exception));
+
+        CompletableFuture<StreamsInitializeResponseData> future = service.streamsInitialize(
+            requestContext(ApiKeys.STREAMS_INITIALIZE),
+            request
+        );
+
+        assertEquals(
+            new StreamsInitializeResponseData()
+                .setErrorCode(expectedErrorCode)
+                .setErrorMessage(expectedErrorMessage),
+            future.get(5, TimeUnit.SECONDS)
+        );
+    }
+
     @Test
     public void testPartitionFor() {
         CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
index cb68771c8a5..33756fe53ac 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
@@ -24,6 +24,8 @@ import org.apache.kafka.common.message.OffsetCommitRequestData;
 import org.apache.kafka.common.message.OffsetCommitResponseData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.StreamsInitializeRequestData;
+import org.apache.kafka.common.message.StreamsInitializeResponseData;
 import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
 import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
@@ -131,6 +133,38 @@ public class GroupCoordinatorShardTest {
         assertEquals(result, coordinator.consumerGroupHeartbeat(context, request));
     }
 
+    @Test
+    public void testStreamsInitialize() {
+        GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
+        OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
+        CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
+        CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
+        GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            new LogContext(),
+            groupMetadataManager,
+            offsetMetadataManager,
+            Time.SYSTEM,
+            new MockCoordinatorTimer<>(Time.SYSTEM),
+            mock(GroupCoordinatorConfig.class),
+            coordinatorMetrics,
+            metricsShard
+        );
+
+        RequestContext context = requestContext(ApiKeys.STREAMS_INITIALIZE);
+        StreamsInitializeRequestData request = new StreamsInitializeRequestData();
+        CoordinatorResult<StreamsInitializeResponseData, CoordinatorRecord> result = new CoordinatorResult<>(
+            Collections.emptyList(),
+            new StreamsInitializeResponseData()
+        );
+
+        when(groupMetadataManager.streamsInitialize(
+            context,
+            request
+        )).thenReturn(result);
+
+        assertEquals(result, coordinator.streamsInitialize(context, request));
+    }
+
     @Test
     public void testCommitOffset() {
         GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index c5bb26d4ba7..c9f2d80ca2b 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -54,6 +54,11 @@ import org.apache.kafka.common.message.ListGroupsResponseData;
 import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.StreamsInitializeRequestData;
+import org.apache.kafka.common.message.StreamsInitializeRequestData.Subtopology;
+import org.apache.kafka.common.message.StreamsInitializeRequestData.TopicConfig;
+import org.apache.kafka.common.message.StreamsInitializeRequestData.TopicInfo;
+import org.apache.kafka.common.message.StreamsInitializeResponseData;
 import org.apache.kafka.common.message.SyncGroupRequestData;
 import org.apache.kafka.common.message.SyncGroupRequestData.SyncGroupRequestAssignment;
 import org.apache.kafka.common.message.SyncGroupResponseData;
@@ -15757,4 +15762,139 @@ public class GroupMetadataManagerTest {
         assertEquals(expectedSuccessCount, successCount);
         return memberIds;
     }
+
+    @Test
+    public void testTopologyInitialization() {
+        String groupId = "fooup";
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "repartition";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "changelog";
+
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .addRacks()
+                .build())
+            .build();
+
+        assertThrows(GroupIdNotFoundException.class, () ->
+            context.groupMetadataManager.consumerGroup(groupId));
+
+        final List<Subtopology> topology = Collections.singletonList(
+            new Subtopology()
+                .setSubtopology("subtopology-id")
+                .setSinkTopics(Collections.singletonList("foo"))
+                .setSourceTopics(Collections.singletonList("bar"))
+                .setRepartitionSourceTopics(
+                    Collections.singletonList(
+                        new TopicInfo()
+                            .setName("repartition")
+                            .setPartitions(4)
+                            .setTopicConfigs(Collections.singletonList(
+                                new TopicConfig()
+                                    .setKey("config-name1")
+                                    .setValue("config-value1")
+                            ))
+                    )
+                )
+                .setStateChangelogTopics(
+                    Collections.singletonList(
+                        new TopicInfo()
+                            .setName("changelog")
+                            .setTopicConfigs(Collections.singletonList(
+                                new TopicConfig()
+                                    .setKey("config-name2")
+                                    .setValue("config-value2")
+                            ))
+                    )
+                )
+        );
+        CoordinatorResult<StreamsInitializeResponseData, CoordinatorRecord> result =
+            context.streamsInitialize(
+                new StreamsInitializeRequestData()
+                    .setGroupId(groupId)
+                    .setTopology(topology)
+            );
+
+        assertEquals(
+            new StreamsInitializeResponseData(),
+            result.response()
+        );
+
+        List<CoordinatorRecord> expectedRecords = Arrays.asList(
+            GroupCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(
+                groupId,
+                topology
+            )
+        );
+
+        assertRecordsEquals(expectedRecords, result.records());
+    }
+
+    @Test
+    public void testTopologyInitializationMissingInternalTopics() {
+        String groupId = "fooup";
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "repartition";
+
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addRacks()
+                .build())
+            .build();
+
+        assertThrows(GroupIdNotFoundException.class, () ->
+            context.groupMetadataManager.consumerGroup(groupId));
+
+        final List<Subtopology> topology = Collections.singletonList(
+            new Subtopology()
+                .setSubtopology("subtopology-id")
+                .setSinkTopics(Collections.singletonList("foo"))
+                .setSourceTopics(Collections.singletonList("bar"))
+                .setRepartitionSourceTopics(
+                    Collections.singletonList(
+                        new TopicInfo()
+                            .setName("repartition")
+                            .setPartitions(4)
+                            .setTopicConfigs(Collections.singletonList(
+                                new TopicConfig()
+                                    .setKey("config-name1")
+                                    .setValue("config-value1")
+                            ))
+                    )
+                )
+                .setStateChangelogTopics(
+                    Collections.singletonList(
+                        new TopicInfo()
+                            .setName("changelog")
+                            .setTopicConfigs(Collections.singletonList(
+                                new TopicConfig()
+                                    .setKey("config-name2")
+                                    .setValue("config-value2")
+                            ))
+                    )
+                )
+        );
+        CoordinatorResult<StreamsInitializeResponseData, CoordinatorRecord> result =
+            context.streamsInitialize(
+                new StreamsInitializeRequestData()
+                    .setGroupId(groupId)
+                    .setTopology(topology)
+            );
+
+        assertEquals(
+            new StreamsInitializeResponseData()
+                .setErrorCode(Errors.STREAMS_INVALID_TOPOLOGY.code())
+                .setErrorMessage("Internal topics changelog do not exist."),
+            result.response()
+        );
+
+        assertTrue(result.records().isEmpty());
+
+    }
 }
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
index 4230934a499..bdcc0c9e012 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
@@ -36,6 +36,8 @@ import org.apache.kafka.common.message.ListGroupsResponseData;
 import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.StreamsInitializeRequestData;
+import org.apache.kafka.common.message.StreamsInitializeResponseData;
 import org.apache.kafka.common.message.SyncGroupRequestData;
 import org.apache.kafka.common.message.SyncGroupResponseData;
 import org.apache.kafka.common.network.ClientInformation;
@@ -84,6 +86,8 @@ import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMe
 import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberValue;
 import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataKey;
 import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
 import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
 import org.apache.kafka.coordinator.group.modern.MemberState;
 import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup;
@@ -673,6 +677,37 @@ public class GroupMetadataManagerTestContext {
         return result;
     }
 
+
+    public CoordinatorResult<StreamsInitializeResponseData, CoordinatorRecord> streamsInitialize(
+        StreamsInitializeRequestData request
+    ) {
+        RequestContext context = new RequestContext(
+            new RequestHeader(
+                ApiKeys.STREAMS_INITIALIZE,
+                ApiKeys.STREAMS_INITIALIZE.latestVersion(),
+                "client",
+                0
+            ),
+            "1",
+            InetAddress.getLoopbackAddress(),
+            KafkaPrincipal.ANONYMOUS,
+            ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
+            SecurityProtocol.PLAINTEXT,
+            ClientInformation.EMPTY,
+            false
+        );
+
+        CoordinatorResult<StreamsInitializeResponseData, CoordinatorRecord> result = groupMetadataManager.streamsInitialize(
+            context,
+            request
+        );
+
+        if (result.replayRecords()) {
+            result.records().forEach(this::replay);
+        }
+        return result;
+    }
+
     public List<MockCoordinatorTimer.ExpiredTimeout<Void, CoordinatorRecord>> sleep(long ms) {
         time.sleep(ms);
         List<MockCoordinatorTimer.ExpiredTimeout<Void, CoordinatorRecord>> timeouts = timer.poll();
@@ -1583,6 +1618,13 @@ public class GroupMetadataManagerTestContext {
                 );
                 break;
 
+            case StreamsGroupTopologyKey.HIGHEST_SUPPORTED_VERSION:
+                groupMetadataManager.replay(
+                    (StreamsGroupTopologyKey) key.message(),
+                    (StreamsGroupTopologyValue) messageOrNull(value)
+                );
+                break;
+
             default:
                 throw new IllegalStateException("Received an unknown record 
type " + key.version()
                     + " in " + record);
