This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 497072a5644 KAFKA-19869: Mark streams groups records and APIs stable 
(#20841)
497072a5644 is described below

commit 497072a5644d26debc6471e6315177ec694f944a
Author: Lucas Brutschy <[email protected]>
AuthorDate: Fri Nov 14 13:42:32 2025 +0100

    KAFKA-19869: Mark streams groups records and APIs stable (#20841)
    
    With 4.2, we are making the streams group records and APIs
    (KIP-1071) stable, guaranteeing no backwards-incompatible changes
    to them.
---
 .../internals/DescribeStreamsGroupsHandler.java    |  2 +-
 .../StreamsGroupHeartbeatRequestManager.java       |  2 +-
 .../requests/StreamsGroupDescribeRequest.java      |  6 +---
 .../requests/StreamsGroupHeartbeatRequest.java     |  6 +---
 .../message/StreamsGroupDescribeRequest.json       |  4 ---
 .../message/StreamsGroupHeartbeatRequest.json      |  4 ---
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 36 +++++++++++-----------
 .../scala/unit/kafka/server/RequestQuotaTest.scala |  4 +--
 .../StreamsGroupCurrentMemberAssignmentKey.json    |  1 -
 .../StreamsGroupCurrentMemberAssignmentValue.json  |  1 -
 .../message/StreamsGroupMemberMetadataKey.json     |  1 -
 .../message/StreamsGroupMemberMetadataValue.json   |  1 -
 .../common/message/StreamsGroupMetadataKey.json    |  1 -
 .../common/message/StreamsGroupMetadataValue.json  |  1 -
 .../StreamsGroupTargetAssignmentMemberKey.json     |  1 -
 .../StreamsGroupTargetAssignmentMemberValue.json   |  1 -
 .../StreamsGroupTargetAssignmentMetadataKey.json   |  1 -
 .../StreamsGroupTargetAssignmentMetadataValue.json |  1 -
 .../common/message/StreamsGroupTopologyKey.json    |  1 -
 .../common/message/StreamsGroupTopologyValue.json  |  1 -
 .../apache/kafka/server/common/StreamsVersion.java |  8 ++---
 .../kafka/server/metrics/NodeMetricsTest.java      |  7 ++---
 .../org/apache/kafka/streams/StreamsConfig.java    |  2 --
 .../apache/kafka/streams/StreamsConfigTest.java    | 14 ---------
 24 files changed, 29 insertions(+), 78 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeStreamsGroupsHandler.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeStreamsGroupsHandler.java
index 9c037d7dd46..490ef89d4e2 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeStreamsGroupsHandler.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeStreamsGroupsHandler.java
@@ -92,7 +92,7 @@ public class DescribeStreamsGroupsHandler extends 
AdminApiHandler.Batched<Coordi
         StreamsGroupDescribeRequestData data = new 
StreamsGroupDescribeRequestData()
             .setGroupIds(groupIds)
             .setIncludeAuthorizedOperations(includeAuthorizedOperations);
-        return new StreamsGroupDescribeRequest.Builder(data, true);
+        return new StreamsGroupDescribeRequest.Builder(data);
     }
 
     @Override
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
index eee77b7ae9a..c45e3da1b5e 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
@@ -506,7 +506,7 @@ public class StreamsGroupHeartbeatRequestManager implements 
RequestManager {
 
     private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final 
long currentTimeMs) {
         NetworkClientDelegate.UnsentRequest request = new 
NetworkClientDelegate.UnsentRequest(
-            new 
StreamsGroupHeartbeatRequest.Builder(this.heartbeatState.buildRequestData(), 
true),
+            new 
StreamsGroupHeartbeatRequest.Builder(this.heartbeatState.buildRequestData()),
             coordinatorRequestManager.coordinator()
         );
         heartbeatRequestState.onSendAttempt(currentTimeMs);
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeRequest.java
index bf7f1576cb0..d4ea35bb64c 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeRequest.java
@@ -32,11 +32,7 @@ public class StreamsGroupDescribeRequest extends 
AbstractRequest {
         private final StreamsGroupDescribeRequestData data;
 
         public Builder(StreamsGroupDescribeRequestData data) {
-            this(data, false);
-        }
-
-        public Builder(StreamsGroupDescribeRequestData data, boolean 
enableUnstableLastVersion) {
-            super(ApiKeys.STREAMS_GROUP_DESCRIBE, enableUnstableLastVersion);
+            super(ApiKeys.STREAMS_GROUP_DESCRIBE);
             this.data = data;
         }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupHeartbeatRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupHeartbeatRequest.java
index ad6e8414636..26d1592f040 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupHeartbeatRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupHeartbeatRequest.java
@@ -39,11 +39,7 @@ public class StreamsGroupHeartbeatRequest extends 
AbstractRequest {
         private final StreamsGroupHeartbeatRequestData data;
 
         public Builder(StreamsGroupHeartbeatRequestData data) {
-            this(data, false);
-        }
-
-        public Builder(StreamsGroupHeartbeatRequestData data, boolean 
enableUnstableLastVersion) {
-            super(ApiKeys.STREAMS_GROUP_HEARTBEAT, enableUnstableLastVersion);
+            super(ApiKeys.STREAMS_GROUP_HEARTBEAT);
             this.data = data;
         }
 
diff --git 
a/clients/src/main/resources/common/message/StreamsGroupDescribeRequest.json 
b/clients/src/main/resources/common/message/StreamsGroupDescribeRequest.json
index e3dad6fa8ec..6e36479043a 100644
--- a/clients/src/main/resources/common/message/StreamsGroupDescribeRequest.json
+++ b/clients/src/main/resources/common/message/StreamsGroupDescribeRequest.json
@@ -20,10 +20,6 @@
   "name": "StreamsGroupDescribeRequest",
   "validVersions": "0",
   "flexibleVersions": "0+",
-  // The StreamsGroupDescribeRequest API is added as part of KIP-1071 and is 
still under
-  // development. Hence, the API is not exposed by default by brokers unless
-  // explicitly enabled.
-  "latestVersionUnstable": true,
   "fields": [
     { "name": "GroupIds", "type": "[]string", "versions": "0+", "entityType": 
"groupId",
       "about": "The ids of the groups to describe" },
diff --git 
a/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json 
b/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json
index 8b63e037fc0..a2cba46e763 100644
--- 
a/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json
+++ 
b/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json
@@ -20,10 +20,6 @@
   "name": "StreamsGroupHeartbeatRequest",
   "validVersions": "0",
   "flexibleVersions": "0+",
-  // The StreamsGroupHeartbeatRequest API is added as part of KIP-1071 and is 
still under
-  // development. Hence, the API is not exposed by default by brokers unless
-  // explicitly enabled.
-  "latestVersionUnstable": true,
   "fields": [
     { "name": "GroupId", "type": "string", "versions": "0+", "entityType": 
"groupId",
       "about": "The group identifier." },
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 284f92b6136..de9daf773bc 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -10828,7 +10828,7 @@ class KafkaApisTest extends Logging {
   def testStreamsGroupHeartbeatReturnsUnsupportedVersion(): Unit = {
     val streamsGroupHeartbeatRequest = new 
StreamsGroupHeartbeatRequestData().setGroupId("group")
 
-    val requestChannelRequest = buildRequest(new 
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest, 
true).build())
+    val requestChannelRequest = buildRequest(new 
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest).build())
     metadataCache = {
       val cache = new KRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_1)
       val delta = new MetadataDelta(MetadataImage.EMPTY)
@@ -10858,7 +10858,7 @@ class KafkaApisTest extends Logging {
 
     val streamsGroupHeartbeatRequest = new 
StreamsGroupHeartbeatRequestData().setGroupId("group")
 
-    val requestChannelRequest = buildRequest(new 
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest, 
true).build())
+    val requestChannelRequest = buildRequest(new 
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest).build())
 
     val future = new CompletableFuture[StreamsGroupHeartbeatResult]()
     when(groupCoordinator.streamsGroupHeartbeat(
@@ -10907,7 +10907,7 @@ class KafkaApisTest extends Logging {
         )
     )
 
-    val requestChannelRequest = buildRequest(new 
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest, 
true).build())
+    val requestChannelRequest = buildRequest(new 
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest).build())
 
     val authorizer: Authorizer = mock(classOf[Authorizer])
     val acls = Map(
@@ -10956,7 +10956,7 @@ class KafkaApisTest extends Logging {
 
     val streamsGroupHeartbeatRequest = new 
StreamsGroupHeartbeatRequestData().setGroupId("group")
 
-    val requestChannelRequest = buildRequest(new 
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest, 
true).build())
+    val requestChannelRequest = buildRequest(new 
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest).build())
 
     val future = new CompletableFuture[StreamsGroupHeartbeatResult]()
     when(groupCoordinator.streamsGroupHeartbeat(
@@ -10981,7 +10981,7 @@ class KafkaApisTest extends Logging {
 
     val streamsGroupHeartbeatRequest = new 
StreamsGroupHeartbeatRequestData().setGroupId("group")
 
-    val requestChannelRequest = buildRequest(new 
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest, 
true).build())
+    val requestChannelRequest = buildRequest(new 
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest).build())
 
     val authorizer: Authorizer = mock(classOf[Authorizer])
     when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
@@ -11022,7 +11022,7 @@ class KafkaApisTest extends Logging {
         )
     )
 
-    val requestChannelRequest = buildRequest(new 
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest, 
true).build())
+    val requestChannelRequest = buildRequest(new 
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest).build())
 
     val authorizer: Authorizer = mock(classOf[Authorizer])
     val acls = Map(
@@ -11057,7 +11057,7 @@ class KafkaApisTest extends Logging {
 
     val streamsGroupHeartbeatRequest = new 
StreamsGroupHeartbeatRequestData().setGroupId("group")
 
-    val requestChannelRequest = buildRequest(new 
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest, 
true).build())
+    val requestChannelRequest = buildRequest(new 
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest).build())
 
     kafkaApis = createKafkaApis(
       overrideProperties = 
Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> 
"classic,consumer")
@@ -11078,7 +11078,7 @@ class KafkaApisTest extends Logging {
 
     val streamsGroupHeartbeatRequest = new 
StreamsGroupHeartbeatRequestData().setGroupId("group")
 
-    val requestChannelRequest = buildRequest(new 
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest, 
true).build())
+    val requestChannelRequest = buildRequest(new 
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest).build())
 
     kafkaApis = createKafkaApis()
     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
@@ -11108,7 +11108,7 @@ class KafkaApisTest extends Logging {
         )
     )
 
-    val requestChannelRequest = buildRequest(new 
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest, 
true).build())
+    val requestChannelRequest = buildRequest(new 
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest).build())
 
     kafkaApis = createKafkaApis()
     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
@@ -11138,7 +11138,7 @@ class KafkaApisTest extends Logging {
         )
     )
 
-    val requestChannelRequest = buildRequest(new 
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest, 
true).build())
+    val requestChannelRequest = buildRequest(new 
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest).build())
 
     kafkaApis = createKafkaApis()
     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
@@ -11158,7 +11158,7 @@ class KafkaApisTest extends Logging {
 
     val streamsGroupHeartbeatRequest = new 
StreamsGroupHeartbeatRequestData().setGroupId("group")
 
-    val requestChannelRequest = buildRequest(new 
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest, 
true).build())
+    val requestChannelRequest = buildRequest(new 
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest).build())
 
     val future = new CompletableFuture[StreamsGroupHeartbeatResult]()
     when(groupCoordinator.streamsGroupHeartbeat(
@@ -11189,7 +11189,7 @@ class KafkaApisTest extends Logging {
 
     val streamsGroupHeartbeatRequest = new 
StreamsGroupHeartbeatRequestData().setGroupId("group")
 
-    val requestChannelRequest = buildRequest(new 
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest, 
true).build())
+    val requestChannelRequest = buildRequest(new 
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest).build())
 
     val future = new CompletableFuture[StreamsGroupHeartbeatResult]()
     when(groupCoordinator.streamsGroupHeartbeat(
@@ -11245,7 +11245,7 @@ class KafkaApisTest extends Logging {
     when(metadataCache.features()).thenReturn(features)
 
     val streamsGroupHeartbeatRequest = new 
StreamsGroupHeartbeatRequestData().setGroupId("group")
-    val requestChannelRequest = buildRequest(new 
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest, 
true).build())
+    val requestChannelRequest = buildRequest(new 
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest).build())
 
     val future = new CompletableFuture[StreamsGroupHeartbeatResult]()
     when(groupCoordinator.streamsGroupHeartbeat(
@@ -11476,7 +11476,7 @@ class KafkaApisTest extends Logging {
     val streamsGroupDescribeRequestData = new StreamsGroupDescribeRequestData()
       .setIncludeAuthorizedOperations(includeAuthorizedOperations)
     streamsGroupDescribeRequestData.groupIds.addAll(groupIds)
-    val requestChannelRequest = buildRequest(new 
StreamsGroupDescribeRequest.Builder(streamsGroupDescribeRequestData, 
true).build())
+    val requestChannelRequest = buildRequest(new 
StreamsGroupDescribeRequest.Builder(streamsGroupDescribeRequestData).build())
 
     val future = new 
CompletableFuture[util.List[StreamsGroupDescribeResponseData.DescribedGroup]]()
     when(groupCoordinator.streamsGroupDescribe(
@@ -11549,7 +11549,7 @@ class KafkaApisTest extends Logging {
     val groupId = "group0"
     val streamsGroupDescribeRequestData = new StreamsGroupDescribeRequestData()
     streamsGroupDescribeRequestData.groupIds.add(groupId)
-    val requestChannelRequest = buildRequest(new 
StreamsGroupDescribeRequest.Builder(streamsGroupDescribeRequestData, 
true).build())
+    val requestChannelRequest = buildRequest(new 
StreamsGroupDescribeRequest.Builder(streamsGroupDescribeRequestData).build())
 
     val errorCode = Errors.UNSUPPORTED_VERSION.code
     val expectedDescribedGroup = new 
StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupId).setErrorCode(errorCode)
@@ -11582,7 +11582,7 @@ class KafkaApisTest extends Logging {
 
     val streamsGroupDescribeRequestData = new StreamsGroupDescribeRequestData()
     streamsGroupDescribeRequestData.groupIds.add("group-id")
-    val requestChannelRequest = buildRequest(new 
StreamsGroupDescribeRequest.Builder(streamsGroupDescribeRequestData, 
true).build())
+    val requestChannelRequest = buildRequest(new 
StreamsGroupDescribeRequest.Builder(streamsGroupDescribeRequestData).build())
 
     val authorizer: Authorizer = mock(classOf[Authorizer])
     when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
@@ -11613,7 +11613,7 @@ class KafkaApisTest extends Logging {
 
     val streamsGroupDescribeRequestData = new StreamsGroupDescribeRequestData()
     streamsGroupDescribeRequestData.groupIds.add("group-id")
-    val requestChannelRequest = buildRequest(new 
StreamsGroupDescribeRequest.Builder(streamsGroupDescribeRequestData, 
true).build())
+    val requestChannelRequest = buildRequest(new 
StreamsGroupDescribeRequest.Builder(streamsGroupDescribeRequestData).build())
 
     val future = new 
CompletableFuture[util.List[StreamsGroupDescribeResponseData.DescribedGroup]]()
     when(groupCoordinator.streamsGroupDescribe(
@@ -11645,7 +11645,7 @@ class KafkaApisTest extends Logging {
     val streamsGroupDescribeRequestData = new StreamsGroupDescribeRequestData()
       .setIncludeAuthorizedOperations(includeAuthorizedOperations)
     streamsGroupDescribeRequestData.groupIds.addAll(groupIds)
-    val requestChannelRequest = buildRequest(new 
StreamsGroupDescribeRequest.Builder(streamsGroupDescribeRequestData, 
true).build())
+    val requestChannelRequest = buildRequest(new 
StreamsGroupDescribeRequest.Builder(streamsGroupDescribeRequestData).build())
 
     val authorizer: Authorizer = mock(classOf[Authorizer])
     val acls = Map(
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala 
b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index e188889556b..ae892a219f6 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -746,10 +746,10 @@ class RequestQuotaTest extends BaseRequestTest {
           new ReadShareGroupStateSummaryRequest.Builder(new 
ReadShareGroupStateSummaryRequestData())
           
         case ApiKeys.STREAMS_GROUP_HEARTBEAT =>
-          new StreamsGroupHeartbeatRequest.Builder(new 
StreamsGroupHeartbeatRequestData(), true)
+          new StreamsGroupHeartbeatRequest.Builder(new 
StreamsGroupHeartbeatRequestData())
 
         case ApiKeys.STREAMS_GROUP_DESCRIBE =>
-          new StreamsGroupDescribeRequest.Builder(new 
StreamsGroupDescribeRequestData(), true)
+          new StreamsGroupDescribeRequest.Builder(new 
StreamsGroupDescribeRequestData())
 
         case ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS =>
           new DescribeShareGroupOffsetsRequest.Builder(new 
DescribeShareGroupOffsetsRequestData())
diff --git 
a/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentKey.json
 
b/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentKey.json
index 236d66de03c..077939ceb50 100644
--- 
a/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentKey.json
+++ 
b/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentKey.json
@@ -13,7 +13,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-// The streams rebalance protocol is in development. This schema is subject to 
non-backwards-compatible changes.
 {
   "apiKey": 22,
   "type": "coordinator-key",
diff --git 
a/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentValue.json
 
b/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentValue.json
index 6de638a3af7..57c94a9e6f3 100644
--- 
a/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentValue.json
+++ 
b/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentValue.json
@@ -13,7 +13,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-// The streams rebalance protocol is in development. This schema is subject to 
non-backwards-compatible changes.
 {
   "apiKey": 22,
   "type": "coordinator-value",
diff --git 
a/group-coordinator/src/main/resources/common/message/StreamsGroupMemberMetadataKey.json
 
b/group-coordinator/src/main/resources/common/message/StreamsGroupMemberMetadataKey.json
index ae1fbc8d1a7..4bc6fe16bdf 100644
--- 
a/group-coordinator/src/main/resources/common/message/StreamsGroupMemberMetadataKey.json
+++ 
b/group-coordinator/src/main/resources/common/message/StreamsGroupMemberMetadataKey.json
@@ -13,7 +13,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-// The streams rebalance protocol is in development. This schema is subject to 
non-backwards-compatible changes.
 {
   "apiKey": 19,
   "type": "coordinator-key",
diff --git 
a/group-coordinator/src/main/resources/common/message/StreamsGroupMemberMetadataValue.json
 
b/group-coordinator/src/main/resources/common/message/StreamsGroupMemberMetadataValue.json
index 1ecc047f17a..b4f874027a9 100644
--- 
a/group-coordinator/src/main/resources/common/message/StreamsGroupMemberMetadataValue.json
+++ 
b/group-coordinator/src/main/resources/common/message/StreamsGroupMemberMetadataValue.json
@@ -13,7 +13,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-// The streams rebalance protocol is in development. This schema is subject to 
non-backwards-compatible changes.
 {
   "apiKey": 19,
   "type": "coordinator-value",
diff --git 
a/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataKey.json
 
b/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataKey.json
index 3d583ebb66e..04cb21e0895 100644
--- 
a/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataKey.json
+++ 
b/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataKey.json
@@ -13,7 +13,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-// The streams rebalance protocol is in development. This schema is subject to 
non-backwards-compatible changes.
 {
   "apiKey": 17,
   "type": "coordinator-key",
diff --git 
a/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataValue.json
 
b/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataValue.json
index d7bd377daa9..b6bd535d6b2 100644
--- 
a/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataValue.json
+++ 
b/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataValue.json
@@ -13,7 +13,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-// The streams rebalance protocol is in development. This schema is subject to 
non-backwards-compatible changes.
 {
   "apiKey": 17,
   "type": "coordinator-value",
diff --git 
a/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberKey.json
 
b/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberKey.json
index 7563f01fade..a5e6b1b5804 100644
--- 
a/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberKey.json
+++ 
b/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberKey.json
@@ -13,7 +13,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-// The streams rebalance protocol is in development. This schema is subject to 
non-backwards-compatible changes.
 {
   "apiKey": 21,
   "type": "coordinator-key",
diff --git 
a/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberValue.json
 
b/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberValue.json
index c96dd608c7f..a97890ae167 100644
--- 
a/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberValue.json
+++ 
b/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberValue.json
@@ -13,7 +13,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-// The streams rebalance protocol is in development. This schema is subject to 
non-backwards-compatible changes.
 {
   "apiKey": 21,
   "type": "coordinator-value",
diff --git 
a/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMetadataKey.json
 
b/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMetadataKey.json
index 22fb861083a..2ebb02b0233 100644
--- 
a/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMetadataKey.json
+++ 
b/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMetadataKey.json
@@ -13,7 +13,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-// The streams rebalance protocol is in development. This schema is subject to 
non-backwards-compatible changes.
 {
   "apiKey": 20,
   "type": "coordinator-key",
diff --git 
a/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMetadataValue.json
 
b/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMetadataValue.json
index b9de317cbde..86143bd5a83 100644
--- 
a/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMetadataValue.json
+++ 
b/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMetadataValue.json
@@ -13,7 +13,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-// The streams rebalance protocol is in development. This schema is subject to 
non-backwards-compatible changes.
 {
   "apiKey": 20,
   "type": "coordinator-value",
diff --git 
a/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyKey.json
 
b/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyKey.json
index ac2b8d5932a..17197849323 100644
--- 
a/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyKey.json
+++ 
b/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyKey.json
@@ -13,7 +13,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-// The streams rebalance protocol is in development. This schema is subject to 
non-backwards-compatible changes.
 {
   "apiKey": 23,
   "type": "coordinator-key",
diff --git 
a/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyValue.json
 
b/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyValue.json
index 26ac1ff6675..b05bd40607d 100644
--- 
a/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyValue.json
+++ 
b/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyValue.json
@@ -13,7 +13,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-// The streams rebalance protocol is in development. This schema is subject to 
non-backwards-compatible changes.
 {
   "apiKey": 23,
   "type": "coordinator-value",
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/common/StreamsVersion.java
 
b/server-common/src/main/java/org/apache/kafka/server/common/StreamsVersion.java
index cf910c660dc..84425cac401 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/common/StreamsVersion.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/common/StreamsVersion.java
@@ -24,16 +24,12 @@ public enum StreamsVersion implements FeatureVersion {
     SV_0(0, MetadataVersion.MINIMUM_VERSION, Map.of()),
 
     // Version 1 enables "streams" groups (KIP-1071).
-    // Using metadata version IBP_4_2_IV1 disables it by default in AK 4.1 
release, and enables it by default in AK 4.2 release.
-    //  - in AK 4.1, this can be enabled as "early access [unstable]"
-    //  - in AK 4.2, it is planned to go GA (cf `LATEST_PRODUCTION`)
     SV_1(1, MetadataVersion.IBP_4_2_IV1, Map.of());
 
     public static final String FEATURE_NAME = "streams.version";
 
-    // Mark "streams" group as unstable in AK 4.1 release
-    // Needs to be updated to SV_1 in AK 4.2, to mark as stable
-    public static final StreamsVersion LATEST_PRODUCTION = SV_0;
+    // Mark "streams" group as stable in production
+    public static final StreamsVersion LATEST_PRODUCTION = SV_1;
 
     private final short featureLevel;
     private final MetadataVersion bootstrapMetadataVersion;
diff --git 
a/server/src/test/java/org/apache/kafka/server/metrics/NodeMetricsTest.java 
b/server/src/test/java/org/apache/kafka/server/metrics/NodeMetricsTest.java
index 6aef5d5af09..54db719c2d3 100644
--- a/server/src/test/java/org/apache/kafka/server/metrics/NodeMetricsTest.java
+++ b/server/src/test/java/org/apache/kafka/server/metrics/NodeMetricsTest.java
@@ -53,14 +53,13 @@ public class NodeMetricsTest {
             new MetricName("maximum-supported-level", expectedGroup, "", 
Map.of("feature-name", "eligible-leader-replicas-version")),
             new MetricName("minimum-supported-level", expectedGroup, "", 
Map.of("feature-name", "eligible-leader-replicas-version")),
             new MetricName("maximum-supported-level", expectedGroup, "", 
Map.of("feature-name", "share-version")),
-            new MetricName("minimum-supported-level", expectedGroup, "", 
Map.of("feature-name", "share-version"))
-        );
-
-        Set<MetricName> unstableFeatureMetrics = Set.of(
+            new MetricName("minimum-supported-level", expectedGroup, "", 
Map.of("feature-name", "share-version")),
             new MetricName("maximum-supported-level", expectedGroup, "", 
Map.of("feature-name", "streams-version")),
             new MetricName("minimum-supported-level", expectedGroup, "", 
Map.of("feature-name", "streams-version"))
         );
 
+        Set<MetricName> unstableFeatureMetrics = Set.of();
+
         Set<MetricName> expectedMetrics = enableUnstableVersions
             ? Stream.concat(stableFeatureMetrics.stream(), 
unstableFeatureMetrics.stream()).collect(Collectors.toSet())
             : stableFeatureMetrics;
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 0c37d8b2ce4..1e04c1fd418 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -1500,8 +1500,6 @@ public class StreamsConfig extends AbstractConfig {
 
     private void verifyStreamsProtocolCompatibility(final boolean doLog) {
         if (doLog && isStreamsProtocolEnabled()) {
-            log.warn("The streams rebalance protocol is still in development 
and should not be used in production. "
-                + "Please set group.protocol=classic (default) in all 
production use cases.");
             final Map<String, Object> mainConsumerConfigs = 
getMainConsumerConfigs("dummy", "dummy", -1);
             final String instanceId = (String) 
mainConsumerConfigs.get(CommonClientConfigs.GROUP_INSTANCE_ID_CONFIG);
             if (instanceId != null && !instanceId.isEmpty()) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java 
b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index ca4048ee9c6..d9fbef48929 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -1687,20 +1687,6 @@ public class StreamsConfigTest {
         assertTrue(streamsConfig.isStreamsProtocolEnabled());
     }
 
-    @Test
-    public void shouldLogWarningWhenStreamsProtocolIsUsed() {
-        try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister(StreamsConfig.class)) {
-            appender.setClassLogger(StreamsConfig.class, Level.WARN);
-            props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, "streams");
-
-            new StreamsConfig(props);
-
-            assertTrue(appender.getMessages().stream()
-                .anyMatch(msg -> msg.contains("The streams rebalance protocol 
is still in development and should " +
-                    "not be used in production. Please set 
group.protocol=classic (default) in all production use cases.")));
-        }
-    }
-
     @Test
     public void shouldLogWarningWhenWarmupReplicasSetWithStreamsProtocol() {
         try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister(StreamsConfig.class)) {

Reply via email to