This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch kip1071
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 6c36a43fe9d6f9bc69a279ba418f094900e3321a
Author: Bruno Cadonna <[email protected]>
AuthorDate: Tue Jul 16 16:02:52 2024 +0200

    Resolve conflicts for 11/25 trunk rebase - Rebased on AK trunk 2024-07-16
---
 .../org/apache/kafka/common/protocol/ApiKeys.java  |  2 -
 .../requests/StreamsInstallAssignmentRequest.java  | 77 -----------------
 .../requests/StreamsInstallAssignmentResponse.java | 75 -----------------
 .../requests/StreamsPrepareAssignmentRequest.java  | 77 -----------------
 .../requests/StreamsPrepareAssignmentResponse.java | 75 -----------------
 .../message/StreamsGroupDescribeRequest.json       |  2 +-
 .../message/StreamsGroupDescribeResponse.json      |  2 +-
 .../common/message/StreamsHeartbeatRequest.json    |  2 +-
 .../common/message/StreamsHeartbeatResponse.json   |  2 +-
 .../common/message/StreamsInitializeRequest.json   |  2 +-
 .../common/message/StreamsInitializeResponse.json  |  2 +-
 .../message/StreamsInstallAssignmentRequest.json   | 39 ---------
 .../message/StreamsInstallAssignmentResponse.json  | 26 ------
 .../message/StreamsPrepareAssignmentRequest.json   | 16 ----
 .../message/StreamsPrepareAssignmentResponse.json  | 98 ----------------------
 .../kafka/common/requests/RequestResponseTest.java | 35 ++++----
 .../group/GroupCoordinatorRecordHelpers.java       | 73 ++++------------
 .../coordinator/group/GroupCoordinatorService.java |  4 +-
 .../coordinator/group/GroupCoordinatorShard.java   | 28 ++++---
 .../coordinator/group/GroupMetadataManager.java    | 64 ++++++++++++++
 .../coordinator/group/streams/StreamsGroup.java    |  3 +-
 .../StreamsGroupCurrentMemberAssignmentKey.json    |  6 +-
 .../message/StreamsGroupMemberMetadataKey.json     |  6 +-
 .../common/message/StreamsGroupMetadataKey.json    |  4 +-
 .../message/StreamsGroupPartitionMetadataKey.json  |  4 +-
 .../StreamsGroupTargetAssignmentMemberKey.json     |  6 +-
 .../StreamsGroupTargetAssignmentMetadataKey.json   |  4 +-
 .../common/message/StreamsGroupTopologyKey.json    |  4 +-
 .../group/GroupCoordinatorRecordHelpersTest.java   |  2 +-
 .../group/streams/StreamsGroupTest.java            | 17 ++--
 .../SmokeTestDriverIntegrationTest.java            |  3 +-
 31 files changed, 151 insertions(+), 609 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 34ed496e1ec..35548b52ebf 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -133,8 +133,6 @@ public enum ApiKeys {
     READ_SHARE_GROUP_STATE_SUMMARY(ApiMessageType.READ_SHARE_GROUP_STATE_SUMMARY, true),
     STREAMS_HEARTBEAT(ApiMessageType.STREAMS_HEARTBEAT),
     STREAMS_INITIALIZE(ApiMessageType.STREAMS_INITIALIZE),
-    STREAMS_INSTALL_ASSIGNMENT(ApiMessageType.STREAMS_INSTALL_ASSIGNMENT),
-    STREAMS_PREPARE_ASSIGNMENT(ApiMessageType.STREAMS_PREPARE_ASSIGNMENT),
     STREAMS_GROUP_DESCRIBE(ApiMessageType.STREAMS_GROUP_DESCRIBE);
     
 
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StreamsInstallAssignmentRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/StreamsInstallAssignmentRequest.java
deleted file mode 100644
index 7063ec61676..00000000000
--- a/clients/src/main/java/org/apache/kafka/common/requests/StreamsInstallAssignmentRequest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.requests;
-
-import org.apache.kafka.common.message.StreamsInstallAssignmentRequestData;
-import org.apache.kafka.common.message.StreamsInstallAssignmentResponseData;
-import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.ByteBufferAccessor;
-import org.apache.kafka.common.protocol.Errors;
-
-import java.nio.ByteBuffer;
-
-public class StreamsInstallAssignmentRequest extends AbstractRequest  {
-
-    public static class Builder extends 
AbstractRequest.Builder<StreamsInstallAssignmentRequest> {
-        private final StreamsInstallAssignmentRequestData data;
-
-        public Builder(StreamsInstallAssignmentRequestData data) {
-            this(data, false);
-        }
-
-        public Builder(StreamsInstallAssignmentRequestData data, boolean 
enableUnstableLastVersion) {
-            super(ApiKeys.STREAMS_INSTALL_ASSIGNMENT, 
enableUnstableLastVersion);
-            this.data = data;
-        }
-
-        @Override
-        public StreamsInstallAssignmentRequest build(short version) {
-            return new StreamsInstallAssignmentRequest(data, version);
-        }
-
-        @Override
-        public String toString() {
-            return data.toString();
-        }
-    }
-
-    private final StreamsInstallAssignmentRequestData data;
-
-    public StreamsInstallAssignmentRequest(StreamsInstallAssignmentRequestData 
data, short version) {
-        super(ApiKeys.STREAMS_INSTALL_ASSIGNMENT, version);
-        this.data = data;
-    }
-
-    @Override
-    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
-        return new StreamsInstallAssignmentResponse(
-            new StreamsInstallAssignmentResponseData()
-                .setThrottleTimeMs(throttleTimeMs)
-                .setErrorCode(Errors.forException(e).code())
-        );
-    }
-
-    @Override
-    public StreamsInstallAssignmentRequestData data() {
-        return data;
-    }
-
-    public static StreamsInstallAssignmentRequest parse(ByteBuffer buffer, 
short version) {
-        return new StreamsInstallAssignmentRequest(new 
StreamsInstallAssignmentRequestData(
-            new ByteBufferAccessor(buffer), version), version);
-    }
-}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StreamsInstallAssignmentResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/StreamsInstallAssignmentResponse.java
deleted file mode 100644
index cc389760577..00000000000
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/StreamsInstallAssignmentResponse.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.requests;
-
-import org.apache.kafka.common.message.StreamsInstallAssignmentResponseData;
-import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.ByteBufferAccessor;
-import org.apache.kafka.common.protocol.Errors;
-
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.Map;
-
-/**
- * Possible error codes.
- *
- * - {@link Errors#GROUP_AUTHORIZATION_FAILED}
- * - {@link Errors#NOT_COORDINATOR}
- * - {@link Errors#COORDINATOR_NOT_AVAILABLE}
- * - {@link Errors#COORDINATOR_LOAD_IN_PROGRESS}
- * - {@link Errors#INVALID_REQUEST}
- * - {@link Errors#INVALID_GROUP_ID}
- * - {@link Errors#GROUP_ID_NOT_FOUND}
- * - {@link Errors#UNKNOWN_MEMBER_ID}
- * - {@link Errors#STALE_MEMBER_EPOCH}
- * - {@link Errors#STREAMS_INVALID_ASSIGNMENT}
- */
-public class StreamsInstallAssignmentResponse extends AbstractResponse {
-
-    private final StreamsInstallAssignmentResponseData data;
-
-    public 
StreamsInstallAssignmentResponse(StreamsInstallAssignmentResponseData data) {
-        super(ApiKeys.STREAMS_INSTALL_ASSIGNMENT);
-        this.data = data;
-    }
-
-    @Override
-    public StreamsInstallAssignmentResponseData data() {
-        return data;
-    }
-
-    @Override
-    public Map<Errors, Integer> errorCounts() {
-        return Collections.singletonMap(Errors.forCode(data.errorCode()), 1);
-    }
-
-    @Override
-    public int throttleTimeMs() {
-        return data.throttleTimeMs();
-    }
-
-    @Override
-    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
-        data.setThrottleTimeMs(throttleTimeMs);
-    }
-
-    public static StreamsInstallAssignmentResponse parse(ByteBuffer buffer, 
short version) {
-        return new StreamsInstallAssignmentResponse(new 
StreamsInstallAssignmentResponseData(
-            new ByteBufferAccessor(buffer), version));
-    }
-}
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/StreamsPrepareAssignmentRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/StreamsPrepareAssignmentRequest.java
deleted file mode 100644
index adeef79393b..00000000000
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/StreamsPrepareAssignmentRequest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.requests;
-
-import org.apache.kafka.common.message.StreamsPrepareAssignmentRequestData;
-import org.apache.kafka.common.message.StreamsPrepareAssignmentResponseData;
-import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.ByteBufferAccessor;
-import org.apache.kafka.common.protocol.Errors;
-
-import java.nio.ByteBuffer;
-
-public class StreamsPrepareAssignmentRequest extends AbstractRequest  {
-
-    public static class Builder extends 
AbstractRequest.Builder<StreamsPrepareAssignmentRequest> {
-        private final StreamsPrepareAssignmentRequestData data;
-
-        public Builder(StreamsPrepareAssignmentRequestData data) {
-            this(data, false);
-        }
-
-        public Builder(StreamsPrepareAssignmentRequestData data, boolean 
enableUnstableLastVersion) {
-            super(ApiKeys.STREAMS_PREPARE_ASSIGNMENT, 
enableUnstableLastVersion);
-            this.data = data;
-        }
-
-        @Override
-        public StreamsPrepareAssignmentRequest build(short version) {
-            return new StreamsPrepareAssignmentRequest(data, version);
-        }
-
-        @Override
-        public String toString() {
-            return data.toString();
-        }
-    }
-
-    private final StreamsPrepareAssignmentRequestData data;
-
-    public StreamsPrepareAssignmentRequest(StreamsPrepareAssignmentRequestData 
data, short version) {
-        super(ApiKeys.STREAMS_PREPARE_ASSIGNMENT, version);
-        this.data = data;
-    }
-
-    @Override
-    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
-        return new StreamsPrepareAssignmentResponse(
-            new StreamsPrepareAssignmentResponseData()
-                .setThrottleTimeMs(throttleTimeMs)
-                .setErrorCode(Errors.forException(e).code())
-        );
-    }
-
-    @Override
-    public StreamsPrepareAssignmentRequestData data() {
-        return data;
-    }
-
-    public static StreamsPrepareAssignmentRequest parse(ByteBuffer buffer, 
short version) {
-        return new StreamsPrepareAssignmentRequest(new 
StreamsPrepareAssignmentRequestData(
-            new ByteBufferAccessor(buffer), version), version);
-    }
-}
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/StreamsPrepareAssignmentResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/StreamsPrepareAssignmentResponse.java
deleted file mode 100644
index f0c478ec81b..00000000000
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/StreamsPrepareAssignmentResponse.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.requests;
-
-import org.apache.kafka.common.message.StreamsPrepareAssignmentResponseData;
-import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.ByteBufferAccessor;
-import org.apache.kafka.common.protocol.Errors;
-
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.Map;
-
-/**
- * Possible error codes.
- *
- * - {@link Errors#GROUP_AUTHORIZATION_FAILED}
- * - {@link Errors#NOT_COORDINATOR}
- * - {@link Errors#COORDINATOR_NOT_AVAILABLE}
- * - {@link Errors#COORDINATOR_LOAD_IN_PROGRESS}
- * - {@link Errors#INVALID_REQUEST}
- * - {@link Errors#INVALID_GROUP_ID}
- * - {@link Errors#GROUP_ID_NOT_FOUND}
- * - {@link Errors#UNKNOWN_MEMBER_ID}
- * - {@link Errors#STALE_MEMBER_EPOCH}
- */
-public class StreamsPrepareAssignmentResponse extends AbstractResponse {
-
-    private final StreamsPrepareAssignmentResponseData data;
-
-    public 
StreamsPrepareAssignmentResponse(StreamsPrepareAssignmentResponseData data) {
-        super(ApiKeys.STREAMS_PREPARE_ASSIGNMENT);
-        this.data = data;
-    }
-
-    @Override
-    public StreamsPrepareAssignmentResponseData data() {
-        return data;
-    }
-
-    @Override
-    public Map<Errors, Integer> errorCounts() {
-        return Collections.singletonMap(Errors.forCode(data.errorCode()), 1);
-    }
-
-    @Override
-    public int throttleTimeMs() {
-        return data.throttleTimeMs();
-    }
-
-    @Override
-    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
-        data.setThrottleTimeMs(throttleTimeMs);
-    }
-
-    public static StreamsPrepareAssignmentResponse parse(ByteBuffer buffer, 
short version) {
-        return new StreamsPrepareAssignmentResponse(new 
StreamsPrepareAssignmentResponseData(
-            new ByteBufferAccessor(buffer), version));
-    }
-    
-}
diff --git 
a/clients/src/main/resources/common/message/StreamsGroupDescribeRequest.json 
b/clients/src/main/resources/common/message/StreamsGroupDescribeRequest.json
index 426e9dd81ed..2c828455a67 100644
--- a/clients/src/main/resources/common/message/StreamsGroupDescribeRequest.json
+++ b/clients/src/main/resources/common/message/StreamsGroupDescribeRequest.json
@@ -14,7 +14,7 @@
 // limitations under the License.
 
 {
-  "apiKey": 80,
+  "apiKey": 90,
   "type": "request",
   "listeners": ["zkBroker", "broker"],
   "name": "StreamsGroupDescribeRequest",
diff --git 
a/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json 
b/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json
index 3080631d66d..ae4b3f8ea35 100644
--- 
a/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json
+++ 
b/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json
@@ -14,7 +14,7 @@
 // limitations under the License.
 
 {
-  "apiKey": 80,
+  "apiKey": 90,
   "type": "response",
   "name": "StreamsGroupDescribeResponse",
   "validVersions": "0",
diff --git 
a/clients/src/main/resources/common/message/StreamsHeartbeatRequest.json 
b/clients/src/main/resources/common/message/StreamsHeartbeatRequest.json
index 2ac6ce2e6e9..4121133c73c 100644
--- a/clients/src/main/resources/common/message/StreamsHeartbeatRequest.json
+++ b/clients/src/main/resources/common/message/StreamsHeartbeatRequest.json
@@ -14,7 +14,7 @@
 // limitations under the License.
 
 {
-  "apiKey": 76,
+  "apiKey": 89,
   "type": "request",
   "listeners": ["zkBroker", "broker"],
   "name": "StreamsHeartbeatRequest",
diff --git 
a/clients/src/main/resources/common/message/StreamsHeartbeatResponse.json 
b/clients/src/main/resources/common/message/StreamsHeartbeatResponse.json
index 2a3e7f1ea80..9cbbdee2a58 100644
--- a/clients/src/main/resources/common/message/StreamsHeartbeatResponse.json
+++ b/clients/src/main/resources/common/message/StreamsHeartbeatResponse.json
@@ -14,7 +14,7 @@
 // limitations under the License.
 
 {
-  "apiKey": 76,
+  "apiKey": 89,
   "type": "response",
   "name": "StreamsHeartbeatResponse",
   "validVersions": "0",
diff --git 
a/clients/src/main/resources/common/message/StreamsInitializeRequest.json 
b/clients/src/main/resources/common/message/StreamsInitializeRequest.json
index 5acc772321c..b93fd3d7a7f 100644
--- a/clients/src/main/resources/common/message/StreamsInitializeRequest.json
+++ b/clients/src/main/resources/common/message/StreamsInitializeRequest.json
@@ -14,7 +14,7 @@
 // limitations under the License.
 
 {
-  "apiKey": 77,
+  "apiKey": 88,
   "type": "request",
   "listeners": ["zkBroker", "broker"],
   "name": "StreamsInitializeRequest",
diff --git 
a/clients/src/main/resources/common/message/StreamsInitializeResponse.json 
b/clients/src/main/resources/common/message/StreamsInitializeResponse.json
index ff73577303e..2b4be5f778d 100644
--- a/clients/src/main/resources/common/message/StreamsInitializeResponse.json
+++ b/clients/src/main/resources/common/message/StreamsInitializeResponse.json
@@ -14,7 +14,7 @@
 // limitations under the License.
 
 {
-  "apiKey": 77,
+  "apiKey": 88,
   "type": "response",
   "name": "StreamsInitializeResponse",
   "validVersions": "0",
diff --git 
a/clients/src/main/resources/common/message/StreamsInstallAssignmentRequest.json
 
b/clients/src/main/resources/common/message/StreamsInstallAssignmentRequest.json
deleted file mode 100644
index 847bb9bb215..00000000000
--- 
a/clients/src/main/resources/common/message/StreamsInstallAssignmentRequest.json
+++ /dev/null
@@ -1,39 +0,0 @@
-{
-  "apiKey": 78,
-  "type": "request",
-  "listeners": ["zkBroker", "broker"],
-  "name": "StreamsInstallAssignmentRequest",
-  "validVersions": "0",
-  "flexibleVersions": "0+",
-  "fields": [
-    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": 
"groupId",
-      "about": "The group identifier." },
-    { "name": "MemberId", "type": "string", "versions": "0+",
-      "about": "The member id assigned by the group coordinator." },
-    { "name": "MemberEpoch", "type": "int32", "versions": "0+",
-      "about": "The member epoch." },
-    { "name": "GroupEpoch", "type": "int32", "versions": "0+",
-      "about": "The group epoch." },
-    { "name": "Error", "type": "int8", "versions": "0+",
-      "about": "The assignment error; or zero if the assignment is 
successful." },
-    { "name": "Members", "type": "[]Member", "versions": "0+",
-      "about": "The members.", "fields": [
-      { "name": "MemberId", "type": "string", "versions": "0+",
-        "about": "The member ID." },
-      { "name": "ActiveTasks", "type": "[]TaskIds", "versions": "0+",
-        "about": "Local assignment for this consumer." },
-      { "name": "StandbyTasks", "type": "[]TaskIds", "versions": "0+",
-        "about": "Local standby tasks for this consumer." },
-      { "name": "WarmupTasks", "type": "[]TaskIds", "versions": "0+",
-        "about": "Local warming up tasks for this consumer." }
-    ]}
-  ],
-  "commonStructs": [
-    { "name": "TaskIds", "versions": "0+", "fields": [
-      { "name": "Subtopology", "type": "string", "versions": "0+",
-        "about": "subtopology ID" },
-      { "name": "Partitions", "type": "[]int32", "versions": "0+",
-        "about": "partitions" }
-    ]}
-  ]
-}
\ No newline at end of file
diff --git 
a/clients/src/main/resources/common/message/StreamsInstallAssignmentResponse.json
 
b/clients/src/main/resources/common/message/StreamsInstallAssignmentResponse.json
deleted file mode 100644
index b1fe8e00644..00000000000
--- 
a/clients/src/main/resources/common/message/StreamsInstallAssignmentResponse.json
+++ /dev/null
@@ -1,26 +0,0 @@
-{
-  "apiKey": 78,
-  "type": "response",
-  "name": "StreamsInstallAssignmentResponse",
-  "validVersions": "0",
-  "flexibleVersions": "0+",
-  // Supported errors:
-  // - GROUP_AUTHORIZATION_FAILED
-  // - NOT_COORDINATOR
-  // - COORDINATOR_NOT_AVAILABLE
-  // - COORDINATOR_LOAD_IN_PROGRESS
-  // - INVALID_REQUEST
-  // - INVALID_GROUP_ID
-  // - GROUP_ID_NOT_FOUND
-  // - UNKNOWN_MEMBER_ID
-  // - STALE_MEMBER_EPOCH
-  // - STREAMS_INVALID_ASSIGNMENT
-  "fields": [
-    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
-      "about": "The duration in milliseconds for which the request was 
throttled due to a quota violation, or zero if the request did not violate any 
quota." },
-    { "name": "ErrorCode", "type": "int16", "versions": "0+",
-      "about": "The top-level error code, or 0 if there was no error" },
-    { "name": "ErrorMessage", "type": "string", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
-      "about": "The top-level error message, or null if there was no error." }
-  ]
-}
diff --git 
a/clients/src/main/resources/common/message/StreamsPrepareAssignmentRequest.json
 
b/clients/src/main/resources/common/message/StreamsPrepareAssignmentRequest.json
deleted file mode 100644
index 12b9098940c..00000000000
--- 
a/clients/src/main/resources/common/message/StreamsPrepareAssignmentRequest.json
+++ /dev/null
@@ -1,16 +0,0 @@
-{
-  "apiKey": 79,
-  "type": "request",
-  "listeners": ["zkBroker", "broker"],
-  "name": "StreamsPrepareAssignmentRequest",
-  "validVersions": "0",
-  "flexibleVersions": "0+",
-  "fields": [
-    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": 
"groupId",
-      "about": "The group identifier." },
-    { "name": "MemberId", "type": "string", "versions": "0+",
-      "about": "The member id assigned by the group coordinator." },
-    { "name": "MemberEpoch", "type": "int32", "versions": "0+",
-      "about": "The member epoch." }
-  ]
-}
\ No newline at end of file
diff --git 
a/clients/src/main/resources/common/message/StreamsPrepareAssignmentResponse.json
 
b/clients/src/main/resources/common/message/StreamsPrepareAssignmentResponse.json
deleted file mode 100644
index 1d9622caa9e..00000000000
--- 
a/clients/src/main/resources/common/message/StreamsPrepareAssignmentResponse.json
+++ /dev/null
@@ -1,98 +0,0 @@
-{
-  "apiKey": 79,
-  "type": "response",
-  "name": "StreamsPrepareAssignmentResponse",
-  "validVersions": "0",
-  "flexibleVersions": "0+",
-  // Supported errors:
-  // - GROUP_AUTHORIZATION_FAILED
-  // - NOT_COORDINATOR
-  // - COORDINATOR_NOT_AVAILABLE
-  // - COORDINATOR_LOAD_IN_PROGRESS
-  // - INVALID_REQUEST
-  // - INVALID_GROUP_ID
-  // - GROUP_ID_NOT_FOUND
-  // - UNKNOWN_MEMBER_ID
-  // - STALE_MEMBER_EPOCH
-  "fields": [
-    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
-      "about": "The duration in milliseconds for which the request was 
throttled due to a quota violation, or zero if the request did not violate any 
quota." },
-    { "name": "ErrorCode", "type": "int16", "versions": "0+",
-      "about": "The top-level error code, or 0 if there was no error" },
-    { "name": "ErrorMessage", "type": "string", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
-      "about": "The top-level error message, or null if there was no error." },
-    { "name": "GroupEpoch", "type": "int32", "versions": "0+",
-      "about": "The group epoch." },
-    { "name": "AssignorName", "type": "string", "versions": "0+",
-      "about": "The selected assignor." },
-    { "name": "Members", "type": "[]Member", "versions": "0+",
-      "about": "The members.", "fields": [
-      { "name": "MemberId", "type": "string", "versions": "0+",
-        "about": "The member ID." },
-      { "name": "MemberEpoch", "type": "int32", "versions": "0+",
-        "about": "The member epoch." },
-      { "name": "InstanceId", "type": "string", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
-        "about": "The member instance ID." },
-      { "name": "RackId", "type": "string", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
-        "about": "The rack ID." },
-
-      { "name": "TopologyHash", "type": "bytes", "versions": "0+",
-        "about": "The hash of the topology." },
-
-      { "name": "ActiveTasks", "type": "[]TaskIds", "versions": "0+",
-        "about": "Target active tasks for this consumer." },
-      { "name": "StandbyTasks", "type": "[]TaskIds", "versions": "0+",
-        "about": "Target standby tasks for this consumer." },
-      { "name": "WarmupTasks", "type": "[]TaskIds", "versions": "0+",
-        "about": "Target warming up tasks for this consumer. " },
-
-      { "name": "ProcessId", "type": "uuid", "versions": "0+",
-        "about": "Identity of the streams instance that may have multiple 
consumers. " },
-      { "name": "ClientTags", "type": "[]KeyValue", "versions": "0+",
-        "about": "Used for rack-aware assignment algorithm." },
-      { "name": "TaskOffsetUpdateReason", "type":  "int8", "versions": "0+", 
"default": 0,
-        "about": "Reason for updating task offset. 0 if no update, 1 if task 
catches up with acceptable.recovery.lag, 2 if task falls behind 
acceptable.recovery.lag, 3 if a standby task is added or removed, 4 if triggerd 
by a time-interval" },
-      { "name": "TaskOffset", "type": "[]TaskOffset", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
-        "about": "Cumulative offsets for assigned and dormant standby tasks. 
Only updatable when TaskOffsetUpdateReason is not zero. Null if unchanged since 
last heartbeat.",
-        "fields": [
-          { "name": "Subtopology", "type": "string", "versions": "0+",
-            "about": "A unique identifier of the subtopology." },
-          { "name": "Partition", "type": "int32", "versions": "0+",
-            "about": "The partition of the input topics." },
-          { "name": "Offset", "type": "int64", "versions": "0+",
-            "about": "The cumulative offset for the task" }
-        ]
-      },
-      { "name": "UserData", "type": "bytes", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
-        "about": "Opaque user data to be passed to the client-side assignor. 
Null if unchanged since last heartbeat." },
-      { "name": "AssignmentConfigs", "type": "[]KeyValue", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
-        "about": "Generic array of assignment configuration strings."}
-    ]},
-    { "name": "Topics", "type": "[]TopicMetadata", "versions": "0+",
-      "about": "The topic-partition metadata.",
-      "fields": [
-        { "name": "TopicId", "type": "uuid", "versions": "0+",
-          "about": "The topic ID." },
-        { "name": "TopicName", "type": "string", "versions": "0+",
-          "about": "The topic name." },
-        { "name": "NumPartitions", "type": "int32", "versions": "0+",
-          "about": "The number of partitions." }
-      ]}
-  ],
-  "commonStructs": [
-    { "name": "TaskIds", "versions": "0+", "fields": [
-      { "name": "Subtopology", "type": "string", "versions": "0+",
-        "about": "subtopology ID" },
-      { "name": "Partitions", "type": "[]int32", "versions": "0+",
-        "about": "partitions" }
-    ]},
-    { "name": "KeyValue", "versions": "0+",
-      "fields": [
-        { "name": "Key", "type": "string", "versions": "0+",
-          "about": "key of the config" },
-        { "name": "Value", "type": "string", "versions": "0+",
-          "about": "value of the config" }
-      ]
-    }
-  ]
-}
\ No newline at end of file
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index da4ae96af47..1772de05e85 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -238,10 +238,6 @@ import 
org.apache.kafka.common.message.StreamsHeartbeatRequestData;
 import org.apache.kafka.common.message.StreamsHeartbeatResponseData;
 import org.apache.kafka.common.message.StreamsInitializeRequestData;
 import org.apache.kafka.common.message.StreamsInitializeResponseData;
-import org.apache.kafka.common.message.StreamsInstallAssignmentRequestData;
-import org.apache.kafka.common.message.StreamsInstallAssignmentResponseData;
-import org.apache.kafka.common.message.StreamsPrepareAssignmentRequestData;
-import org.apache.kafka.common.message.StreamsPrepareAssignmentResponseData;
 import org.apache.kafka.common.message.SyncGroupRequestData;
 import 
org.apache.kafka.common.message.SyncGroupRequestData.SyncGroupRequestAssignment;
 import org.apache.kafka.common.message.SyncGroupResponseData;
@@ -4027,13 +4023,13 @@ public class RequestResponseTest {
         return new ReadShareGroupStateSummaryResponse(data);
     }
 
-    private AbstractRequest createStreamsPrepareAssignmentRequest(final short 
version) {
-        return new StreamsPrepareAssignmentRequest.Builder(new 
StreamsPrepareAssignmentRequestData()).build(version);
-    }
-
-    private AbstractRequest createStreamsInstallAssignmentRequest(final short 
version) {
-        return new StreamsInstallAssignmentRequest.Builder(new 
StreamsInstallAssignmentRequestData()).build(version);
-    }
+//    private AbstractRequest createStreamsPrepareAssignmentRequest(final 
short version) {
+//        return new StreamsPrepareAssignmentRequest.Builder(new 
StreamsPrepareAssignmentRequestData()).build(version);
+//    }
+//
+//    private AbstractRequest createStreamsInstallAssignmentRequest(final 
short version) {
+//        return new StreamsInstallAssignmentRequest.Builder(new 
StreamsInstallAssignmentRequestData()).build(version);
+//    }
 
     private AbstractRequest createStreamsInitializeRequest(final short 
version) {
         return new StreamsInitializeRequest.Builder(new 
StreamsInitializeRequestData()).build(version);
@@ -4043,20 +4039,21 @@ public class RequestResponseTest {
         return new StreamsHeartbeatRequest.Builder(new 
StreamsHeartbeatRequestData()).build(version);
     }
 
-    private AbstractResponse createStreamsPrepareAssignmentResponse() {
-        return new StreamsPrepareAssignmentResponse(new 
StreamsPrepareAssignmentResponseData());
-    }
-
-    private AbstractResponse createStreamsInstallAssignmentResponse() {
-        return new StreamsInstallAssignmentResponse(new 
StreamsInstallAssignmentResponseData());
-    }
-
+//    private AbstractResponse createStreamsPrepareAssignmentResponse() {
+//        return new StreamsPrepareAssignmentResponse(new 
StreamsPrepareAssignmentResponseData());
+//    }
+//
+//    private AbstractResponse createStreamsInstallAssignmentResponse() {
+//        return new StreamsInstallAssignmentResponse(new 
StreamsInstallAssignmentResponseData());
+//    }
+//
     private AbstractResponse createStreamsInitializeResponse() {
         return new StreamsInitializeResponse(new 
StreamsInitializeResponseData());
     }
 
     private AbstractResponse createStreamsHeartbeatResponse() {
         return new StreamsHeartbeatResponse(new 
StreamsHeartbeatResponseData());
+    }
 
     @Test
     public void testInvalidSaslHandShakeRequest() {
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java
index 842df9d7cbb..78c17bb0863 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java
@@ -155,7 +155,7 @@ public class GroupCoordinatorRecordHelpers {
                 new StreamsGroupMemberMetadataKey()
                     .setGroupId(groupId)
                     .setMemberId(member.memberId()),
-                (short) 11
+                (short) 14
             ),
             new ApiMessageAndVersion(
                 new StreamsGroupMemberMetadataValue()
@@ -200,7 +200,7 @@ public class GroupCoordinatorRecordHelpers {
                 new StreamsGroupMemberMetadataKey()
                     .setGroupId(groupId)
                     .setMemberId(memberId),
-                (short) 11
+                (short) 14
             ),
             null // Tombstone.
         );
@@ -293,7 +293,7 @@ public class GroupCoordinatorRecordHelpers {
             new ApiMessageAndVersion(
                 new StreamsGroupPartitionMetadataKey()
                     .setGroupId(groupId),
-                (short) 10
+                (short) 13
             ),
             new ApiMessageAndVersion(
                 value,
@@ -315,7 +315,7 @@ public class GroupCoordinatorRecordHelpers {
             new ApiMessageAndVersion(
                 new StreamsGroupPartitionMetadataKey()
                     .setGroupId(groupId),
-                (short) 10
+                (short) 13
             ),
             null // Tombstone.
         );
@@ -346,25 +346,6 @@ public class GroupCoordinatorRecordHelpers {
         );
     }
 
-    /**
-     * Creates a ConsumerGroupMetadata tombstone.
-     *
-     * @param groupId The consumer group id.
-     * @return The record.
-     */
-    public static CoordinatorRecord newGroupEpochTombstoneRecord(
-        String groupId
-    ) {
-        return new CoordinatorRecord(
-            new ApiMessageAndVersion(
-                new ConsumerGroupMetadataKey()
-                    .setGroupId(groupId),
-                (short) 3
-            ),
-            null // Tombstone.
-        );
-    }
-
     public static CoordinatorRecord newStreamsGroupEpochRecord(
         String groupId,
         int newGroupEpoch
@@ -373,7 +354,7 @@ public class GroupCoordinatorRecordHelpers {
             new ApiMessageAndVersion(
                 new StreamsGroupMetadataKey()
                     .setGroupId(groupId),
-                (short) 9
+                (short) 12
             ),
             new ApiMessageAndVersion(
                 new StreamsGroupMetadataValue()
@@ -396,16 +377,16 @@ public class GroupCoordinatorRecordHelpers {
             new ApiMessageAndVersion(
                 new StreamsGroupMetadataKey()
                     .setGroupId(groupId),
-                (short) 9
+                (short) 12
             ),
             null // Tombstone.
         );
     }
 
     /**
-     * Creates a StreamsGroupMetadata tombstone.
+     * Creates a ConsumerGroupMetadata tombstone.
      *
-     * @param groupId The streams group id.
+     * @param groupId The consumer group id.
      * @return The record.
      */
     public static CoordinatorRecord newConsumerGroupEpochTombstoneRecord(
@@ -519,7 +500,7 @@ public class GroupCoordinatorRecordHelpers {
                 new StreamsGroupTargetAssignmentMemberKey()
                     .setGroupId(groupId)
                     .setMemberId(memberId),
-                (short) 13
+                (short) 16
             ),
             new ApiMessageAndVersion(
                 new StreamsGroupTargetAssignmentMemberValue()
@@ -547,7 +528,7 @@ public class GroupCoordinatorRecordHelpers {
                 new StreamsGroupTargetAssignmentMemberKey()
                     .setGroupId(groupId)
                     .setMemberId(memberId),
-                (short) 13
+                (short) 16
             ),
             null // Tombstone.
         );
@@ -606,7 +587,7 @@ public class GroupCoordinatorRecordHelpers {
             new ApiMessageAndVersion(
                 new StreamsGroupTargetAssignmentMetadataKey()
                     .setGroupId(groupId),
-                (short) 12
+                (short) 15
             ),
             new ApiMessageAndVersion(
                 new StreamsGroupTargetAssignmentMetadataValue()
@@ -629,7 +610,7 @@ public class GroupCoordinatorRecordHelpers {
             new ApiMessageAndVersion(
                 new StreamsGroupTargetAssignmentMetadataKey()
                     .setGroupId(groupId),
-                (short) 12
+                (short) 15
             ),
             null // Tombstone.
         );
@@ -665,28 +646,6 @@ public class GroupCoordinatorRecordHelpers {
         );
     }
 
-    /**
-     * Creates a ConsumerGroupCurrentMemberAssignment tombstone.
-     *
-     * @param groupId  The consumer group id.
-     * @param memberId The consumer group member id.
-     * @return The record.
-     */
-    public static CoordinatorRecord newCurrentAssignmentTombstoneRecord(
-        String groupId,
-        String memberId
-    ) {
-        return new CoordinatorRecord(
-            new ApiMessageAndVersion(
-                new ConsumerGroupCurrentMemberAssignmentKey()
-                    .setGroupId(groupId)
-                    .setMemberId(memberId),
-                (short) 8
-            ),
-            null // Tombstone
-        );
-    }
-
     public static CoordinatorRecord newStreamsCurrentAssignmentRecord(
         String groupId,
         StreamsGroupMember member
@@ -696,7 +655,7 @@ public class GroupCoordinatorRecordHelpers {
                 new StreamsGroupCurrentMemberAssignmentKey()
                     .setGroupId(groupId)
                     .setMemberId(member.memberId()),
-                (short) 14
+                (short) 17
             ),
             new ApiMessageAndVersion(
                 new StreamsGroupCurrentMemberAssignmentValue()
@@ -783,7 +742,7 @@ public class GroupCoordinatorRecordHelpers {
                 new StreamsGroupCurrentMemberAssignmentKey()
                     .setGroupId(groupId)
                     .setMemberId(memberId),
-                (short) 14
+                (short) 17
             ),
             null // Tombstone
         );
@@ -1356,7 +1315,7 @@ public class GroupCoordinatorRecordHelpers {
                 
.setRepartitionSourceTopics(repartitionSourceTopics).setStateChangelogTopics(stateChangelogTopics));
         });
 
-        return new CoordinatorRecord(new ApiMessageAndVersion(new 
StreamsGroupTopologyKey().setGroupId(groupId), (short) 15),
+        return new CoordinatorRecord(new ApiMessageAndVersion(new 
StreamsGroupTopologyKey().setGroupId(groupId), (short) 18),
             new ApiMessageAndVersion(value, (short) 0));
     }
 
@@ -1373,7 +1332,7 @@ public class GroupCoordinatorRecordHelpers {
             new ApiMessageAndVersion(
                 new StreamsGroupTopologyKey()
                     .setGroupId(groupId),
-                (short) 15
+                (short) 18
             ),
             null // Tombstone
         );
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index 9f180c5798a..20042ebfb06 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -365,7 +365,7 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
         return runtime.scheduleWriteOperation(
             "streams-group-initialize",
             topicPartitionFor(request.groupId()),
-            Duration.ofMillis(config.offsetCommitTimeoutMs),
+            Duration.ofMillis(config.offsetCommitTimeoutMs()),
             coordinator -> coordinator.streamsInitialize(context, request)
         ).exceptionally(exception -> handleOperationException(
             "streams-group-initialize",
@@ -394,7 +394,7 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
         return runtime.scheduleWriteOperation(
             "streams-heartbeat",
             topicPartitionFor(request.groupId()),
-            Duration.ofMillis(config.offsetCommitTimeoutMs),
+            Duration.ofMillis(config.offsetCommitTimeoutMs()),
             coordinator -> coordinator.streamsHeartbeat(context, request)
         ).exceptionally(exception -> handleOperationException(
             "streams-heartbeat",
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index 2d8e5229230..aa0a18d0bff 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -122,6 +122,8 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.kafka.coordinator.group.Utils.messageOrNull;
+
 /**
  * The group coordinator shard is a replicated state machine that manages the metadata of all
  * classic and consumer groups. It holds the hard and the soft state of the groups. This class
@@ -254,9 +256,9 @@ public class GroupCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
                 
.withShareGroupHeartbeatInterval(config.shareGroupHeartbeatIntervalMs())
                // TODO: Do we need separate configs for streams groups?
                 .withStreamsGroupAssignors(Collections.singletonList(new 
MockAssignor()))
-                .withStreamsGroupMaxSize(config.consumerGroupMaxSize)
-                
.withStreamsGroupSessionTimeout(config.consumerGroupSessionTimeoutMs)
-                
.withStreamsGroupHeartbeatInterval(config.consumerGroupHeartbeatIntervalMs)
+                .withStreamsGroupMaxSize(config.consumerGroupMaxSize())
+                
.withStreamsGroupSessionTimeout(config.consumerGroupSessionTimeoutMs())
+                
.withStreamsGroupHeartbeatInterval(config.consumerGroupHeartbeatIntervalMs())
                 .withGroupCoordinatorMetricsShard(metricsShard)
                 .build();
 
@@ -851,56 +853,56 @@ public class GroupCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
                     offset,
                     producerId,
                     (OffsetCommitKey) key.message(),
-                    (OffsetCommitValue) Utils.messageOrNull(value)
+                    (OffsetCommitValue) messageOrNull(value)
                 );
                 break;
 
             case 2:
                 groupMetadataManager.replay(
                     (GroupMetadataKey) key.message(),
-                    (GroupMetadataValue) Utils.messageOrNull(value)
+                    (GroupMetadataValue) messageOrNull(value)
                 );
                 break;
 
             case 3:
                 groupMetadataManager.replay(
                     (ConsumerGroupMetadataKey) key.message(),
-                    (ConsumerGroupMetadataValue) Utils.messageOrNull(value)
+                    (ConsumerGroupMetadataValue) messageOrNull(value)
                 );
                 break;
 
             case 4:
                 groupMetadataManager.replay(
                     (ConsumerGroupPartitionMetadataKey) key.message(),
-                    (ConsumerGroupPartitionMetadataValue) 
Utils.messageOrNull(value)
+                    (ConsumerGroupPartitionMetadataValue) messageOrNull(value)
                 );
                 break;
 
             case 5:
                 groupMetadataManager.replay(
                     (ConsumerGroupMemberMetadataKey) key.message(),
-                    (ConsumerGroupMemberMetadataValue) 
Utils.messageOrNull(value)
+                    (ConsumerGroupMemberMetadataValue) messageOrNull(value)
                 );
                 break;
 
             case 6:
                 groupMetadataManager.replay(
                     (ConsumerGroupTargetAssignmentMetadataKey) key.message(),
-                    (ConsumerGroupTargetAssignmentMetadataValue) 
Utils.messageOrNull(value)
+                    (ConsumerGroupTargetAssignmentMetadataValue) 
messageOrNull(value)
                 );
                 break;
 
             case 7:
                 groupMetadataManager.replay(
                     (ConsumerGroupTargetAssignmentMemberKey) key.message(),
-                    (ConsumerGroupTargetAssignmentMemberValue) 
Utils.messageOrNull(value)
+                    (ConsumerGroupTargetAssignmentMemberValue) 
messageOrNull(value)
                 );
                 break;
 
             case 8:
                 groupMetadataManager.replay(
                     (ConsumerGroupCurrentMemberAssignmentKey) key.message(),
-                    (ConsumerGroupCurrentMemberAssignmentValue) 
Utils.messageOrNull(value)
+                    (ConsumerGroupCurrentMemberAssignmentValue) 
messageOrNull(value)
                 );
                 break;
 
@@ -914,14 +916,14 @@ public class GroupCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
             case 10:
                 groupMetadataManager.replay(
                     (ShareGroupMemberMetadataKey) key.message(),
-                    (ShareGroupMemberMetadataValue) Utils.messageOrNull(value)
+                    (ShareGroupMemberMetadataValue) messageOrNull(value)
                 );
                 break;
 
             case 11:
                 groupMetadataManager.replay(
                     (ShareGroupMetadataKey) key.message(),
-                    (ShareGroupMetadataValue) Utils.messageOrNull(value)
+                    (ShareGroupMetadataValue) messageOrNull(value)
                 );
                 break;
 
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 5997591b812..09dd21913e6 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -5238,6 +5238,69 @@ public class GroupMetadataManager {
         }
     }
 
+    /**
+     * Replays StreamsGroupMetadataKey/Value to update the hard state of
+     * the Streams group. It updates the group epoch of the Streams
+     * group or deletes the Streams group.
+     *
+     * @param key   A StreamsGroupMetadataKey key.
+     * @param value A StreamsGroupMetadataValue record.
+     */
+    public void replay(
+        StreamsGroupMetadataKey key,
+        StreamsGroupMetadataValue value
+    ) {
+        String groupId = key.groupId();
+
+        if (value != null) {
+            StreamsGroup streamsGroup = 
getOrMaybeCreatePersistedStreamsGroup(groupId, true);
+            streamsGroup.setGroupEpoch(value.epoch());
+        } else {
+            StreamsGroup streamsGroup = 
getOrMaybeCreatePersistedStreamsGroup(groupId, false);
+            if (!streamsGroup.members().isEmpty()) {
+                throw new IllegalStateException("Received a tombstone record 
to delete group " + groupId
+                    + " but the group still has " + 
streamsGroup.members().size() + " members.");
+            }
+            if (!streamsGroup.targetAssignment().isEmpty()) {
+                throw new IllegalStateException("Received a tombstone record 
to delete group " + groupId
+                    + " but the target assignment still has " + 
streamsGroup.targetAssignment().size()
+                    + " members.");
+            }
+            if (streamsGroup.assignmentEpoch() != -1) {
+                throw new IllegalStateException("Received a tombstone record 
to delete group " + groupId
+                    + " but did not receive 
StreamsGroupTargetAssignmentMetadataValue tombstone.");
+            }
+            removeGroup(groupId);
+        }
+
+    }
+
+    /**
+     * Replays StreamsGroupPartitionMetadataKey/Value to update the hard state of
+     * the streams group. It updates the subscription metadata of the streams
+     * group.
+     *
+     * @param key   A StreamsGroupPartitionMetadataKey key.
+     * @param value A StreamsGroupPartitionMetadataValue record.
+     */
+    public void replay(
+        StreamsGroupPartitionMetadataKey key,
+        StreamsGroupPartitionMetadataValue value
+    ) {
+        String groupId = key.groupId();
+        StreamsGroup streamsGroup = 
getOrMaybeCreatePersistedStreamsGroup(groupId, false);
+
+        if (value != null) {
+            Map<String, 
org.apache.kafka.coordinator.group.streams.TopicMetadata> subscriptionMetadata 
= new HashMap<>();
+            value.topics().forEach(topicMetadata -> {
+                subscriptionMetadata.put(topicMetadata.topicName(), 
org.apache.kafka.coordinator.group.streams.TopicMetadata.fromRecord(topicMetadata));
+            });
+            streamsGroup.setSubscriptionMetadata(subscriptionMetadata);
+        } else {
+            streamsGroup.setSubscriptionMetadata(Collections.emptyMap());
+        }
+    }
+
     /**
      * Replays ShareGroupMemberMetadataKey/Value to update the hard state of
      * the share group. It updates the subscription part of the member or
@@ -5299,6 +5362,7 @@ public class GroupMetadataManager {
             if (!shareGroup.members().isEmpty()) {
                 throw new IllegalStateException("Received a tombstone record 
to delete group " + groupId
                     + " but the group still has " + 
shareGroup.members().size() + " members.");
+            }
             if (!shareGroup.targetAssignment().isEmpty()) {
                 throw new IllegalStateException("Received a tombstone record 
to delete group " + groupId
                     + " but the target assignment still has " + 
shareGroup.targetAssignment().size()
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
index f83305e3e1a..7d3465c7ad2 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
@@ -811,7 +811,8 @@ public class StreamsGroup implements Group {
         String memberId,
         String groupInstanceId,
         int memberEpoch,
-        boolean isTransactional
+        boolean isTransactional,
+        short apiVersion
     ) throws UnknownMemberIdException, StaleMemberEpochException {
         // When the member epoch is -1, the request comes from either the admin client
         // or a consumer which does not use the group management facility. In this case,
diff --git 
a/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentKey.json
 
b/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentKey.json
index 30f775fa1f8..771f7324a9e 100644
--- 
a/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentKey.json
+++ 
b/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentKey.json
@@ -17,12 +17,12 @@
 {
   "type": "data",
   "name": "StreamsGroupCurrentMemberAssignmentKey",
-  "validVersions": "14",
+  "validVersions": "17",
   "flexibleVersions": "none",
   "fields": [
-    { "name": "GroupId", "type": "string", "versions": "14",
+    { "name": "GroupId", "type": "string", "versions": "17",
       "about": "The group id." },
-    { "name": "MemberId", "type": "string", "versions": "14",
+    { "name": "MemberId", "type": "string", "versions": "17",
       "about": "The member id." }
   ]
 }
diff --git 
a/group-coordinator/src/main/resources/common/message/StreamsGroupMemberMetadataKey.json
 
b/group-coordinator/src/main/resources/common/message/StreamsGroupMemberMetadataKey.json
index 0834b3e41ee..bac9ac247cf 100644
--- 
a/group-coordinator/src/main/resources/common/message/StreamsGroupMemberMetadataKey.json
+++ 
b/group-coordinator/src/main/resources/common/message/StreamsGroupMemberMetadataKey.json
@@ -17,12 +17,12 @@
 {
   "type": "data",
   "name": "StreamsGroupMemberMetadataKey",
-  "validVersions": "11",
+  "validVersions": "14",
   "flexibleVersions": "none",
   "fields": [
-    { "name": "GroupId", "type": "string", "versions": "11",
+    { "name": "GroupId", "type": "string", "versions": "14",
       "about": "The group id." },
-    { "name": "MemberId", "type": "string", "versions": "11",
+    { "name": "MemberId", "type": "string", "versions": "14",
       "about": "The member id." }
   ]
 }
diff --git 
a/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataKey.json
 
b/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataKey.json
index 8f287c05652..26121bf2ba2 100644
--- 
a/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataKey.json
+++ 
b/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataKey.json
@@ -17,10 +17,10 @@
 {
   "type": "data",
   "name": "StreamsGroupMetadataKey",
-  "validVersions": "9",
+  "validVersions": "12",
   "flexibleVersions": "none",
   "fields": [
-    { "name": "GroupId", "type": "string", "versions": "9",
+    { "name": "GroupId", "type": "string", "versions": "12",
       "about": "The group id." }
   ]
 }
diff --git 
a/group-coordinator/src/main/resources/common/message/StreamsGroupPartitionMetadataKey.json
 
b/group-coordinator/src/main/resources/common/message/StreamsGroupPartitionMetadataKey.json
index 954e5fd1182..546a8f80535 100644
--- 
a/group-coordinator/src/main/resources/common/message/StreamsGroupPartitionMetadataKey.json
+++ 
b/group-coordinator/src/main/resources/common/message/StreamsGroupPartitionMetadataKey.json
@@ -17,10 +17,10 @@
 {
   "type": "data",
   "name": "StreamsGroupPartitionMetadataKey",
-  "validVersions": "10",
+  "validVersions": "13",
   "flexibleVersions": "none",
   "fields": [
-    { "name": "GroupId", "type": "string", "versions": "10",
+    { "name": "GroupId", "type": "string", "versions": "13",
       "about": "The group id." }
   ]
 }
diff --git 
a/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberKey.json
 
b/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberKey.json
index 128bf44bba7..4fc8231ec3d 100644
--- 
a/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberKey.json
+++ 
b/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberKey.json
@@ -17,12 +17,12 @@
 {
   "type": "data",
   "name": "StreamsGroupTargetAssignmentMemberKey",
-  "validVersions": "13",
+  "validVersions": "16",
   "flexibleVersions": "none",
   "fields": [
-    { "name": "GroupId", "type": "string", "versions": "13",
+    { "name": "GroupId", "type": "string", "versions": "16",
       "about": "The group id." },
-    { "name": "MemberId", "type": "string", "versions": "13",
+    { "name": "MemberId", "type": "string", "versions": "16",
       "about": "The member id." }
   ]
 }
diff --git 
a/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMetadataKey.json
 
b/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMetadataKey.json
index b583b3224b4..02b40f727c4 100644
--- 
a/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMetadataKey.json
+++ 
b/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMetadataKey.json
@@ -17,10 +17,10 @@
 {
   "type": "data",
   "name": "StreamsGroupTargetAssignmentMetadataKey",
-  "validVersions": "12",
+  "validVersions": "15",
   "flexibleVersions": "none",
   "fields": [
-    { "name": "GroupId", "type": "string", "versions": "12",
+    { "name": "GroupId", "type": "string", "versions": "15",
       "about": "The group id." }
   ]
 }
diff --git 
a/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyKey.json
 
b/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyKey.json
index 11873d4403c..261b755cd51 100644
--- 
a/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyKey.json
+++ 
b/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyKey.json
@@ -17,10 +17,10 @@
 {
   "type": "data",
   "name": "StreamsGroupTopologyKey",
-  "validVersions": "15",
+  "validVersions": "18",
   "flexibleVersions": "none",
   "fields": [
-    { "name": "GroupId", "type": "string", "versions": "15",
+    { "name": "GroupId", "type": "string", "versions": "18",
       "about": "The group id." }
   ]
 }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java
index 480c1576440..d6258e13db0 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java
@@ -284,7 +284,7 @@ public class GroupCoordinatorRecordHelpersTest {
             new ApiMessageAndVersion(
                 new StreamsGroupTopologyKey()
                     .setGroupId("group-id"),
-                (short) 15),
+                (short) 18),
             new ApiMessageAndVersion(
                 new StreamsGroupTopologyValue()
                     .setTopology(expectedTopology),
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
index 4dd727b8050..68fddc0654f 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
@@ -20,8 +20,10 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.GroupNotEmptyException;
 import org.apache.kafka.common.errors.StaleMemberEpochException;
 import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
 import org.apache.kafka.coordinator.group.OffsetAndMetadata;
 import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
 import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
@@ -605,31 +607,32 @@ public class StreamsGroupTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {false, true})
-    public void testValidateOffsetCommit(boolean isTransactional) {
+    @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+    public void testValidateOffsetCommit(short version) {
+        boolean isTransactional = false;
         StreamsGroup group = createStreamsGroup("group-foo");
 
         // Simulate a call from the admin client without member id and member epoch.
         // This should pass only if the group is empty.
-        group.validateOffsetCommit("", "", -1, isTransactional);
+        group.validateOffsetCommit("", "", -1, isTransactional, version);
 
         // The member does not exist.
         assertThrows(UnknownMemberIdException.class, () ->
-            group.validateOffsetCommit("member-id", null, 0, isTransactional));
+            group.validateOffsetCommit("member-id", null, 0, isTransactional, 
version));
 
         // Create a member.
         group.updateMember(new 
StreamsGroupMember.Builder("member-id").build());
 
         // A call from the admin client should fail as the group is not empty.
         assertThrows(UnknownMemberIdException.class, () ->
-            group.validateOffsetCommit("", "", -1, isTransactional));
+            group.validateOffsetCommit("", "", -1, isTransactional, version));
 
         // The member epoch is stale.
         assertThrows(StaleMemberEpochException.class, () ->
-            group.validateOffsetCommit("member-id", "", 10, isTransactional));
+            group.validateOffsetCommit("member-id", "", 10, isTransactional, 
version));
 
         // This should succeed.
-        group.validateOffsetCommit("member-id", "", 0, isTransactional);
+        group.validateOffsetCommit("member-id", "", 0, isTransactional, 
version);
     }
 
     @Test
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
index a747f0aa2a8..a2cb052e7f2 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
@@ -105,7 +105,8 @@ public class SmokeTestDriverIntegrationTest extends 
IntegrationTestHarness {
     // We set 2 timeout condition to fail the test before passing the verification:
     // (1) 10 min timeout, (2) 30 tries of polling without getting any data
     @ParameterizedTest
-    @CsvSource({"false, false", "true, false", "true, true"})
+    @CsvSource({"false, false, true"})
+//    @CsvSource({"false, false", "true, false", "true, true"})
     public void shouldWorkWithRebalance(
         final boolean stateUpdaterEnabled,
         final boolean processingThreadsEnabled,

Reply via email to