This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 497072a5644 KAFKA-19869: Mark streams groups records and APIs stable
(#20841)
497072a5644 is described below
commit 497072a5644d26debc6471e6315177ec694f944a
Author: Lucas Brutschy <[email protected]>
AuthorDate: Fri Nov 14 13:42:32 2025 +0100
KAFKA-19869: Mark streams groups records and APIs stable (#20841)
With 4.2, we are making records and APIs stable, guaranteeing no
backwards-incompatible changes.
---
.../internals/DescribeStreamsGroupsHandler.java | 2 +-
.../StreamsGroupHeartbeatRequestManager.java | 2 +-
.../requests/StreamsGroupDescribeRequest.java | 6 +---
.../requests/StreamsGroupHeartbeatRequest.java | 6 +---
.../message/StreamsGroupDescribeRequest.json | 4 ---
.../message/StreamsGroupHeartbeatRequest.json | 4 ---
.../scala/unit/kafka/server/KafkaApisTest.scala | 36 +++++++++++-----------
.../scala/unit/kafka/server/RequestQuotaTest.scala | 4 +--
.../StreamsGroupCurrentMemberAssignmentKey.json | 1 -
.../StreamsGroupCurrentMemberAssignmentValue.json | 1 -
.../message/StreamsGroupMemberMetadataKey.json | 1 -
.../message/StreamsGroupMemberMetadataValue.json | 1 -
.../common/message/StreamsGroupMetadataKey.json | 1 -
.../common/message/StreamsGroupMetadataValue.json | 1 -
.../StreamsGroupTargetAssignmentMemberKey.json | 1 -
.../StreamsGroupTargetAssignmentMemberValue.json | 1 -
.../StreamsGroupTargetAssignmentMetadataKey.json | 1 -
.../StreamsGroupTargetAssignmentMetadataValue.json | 1 -
.../common/message/StreamsGroupTopologyKey.json | 1 -
.../common/message/StreamsGroupTopologyValue.json | 1 -
.../apache/kafka/server/common/StreamsVersion.java | 8 ++---
.../kafka/server/metrics/NodeMetricsTest.java | 7 ++---
.../org/apache/kafka/streams/StreamsConfig.java | 2 --
.../apache/kafka/streams/StreamsConfigTest.java | 14 ---------
24 files changed, 29 insertions(+), 78 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeStreamsGroupsHandler.java
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeStreamsGroupsHandler.java
index 9c037d7dd46..490ef89d4e2 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeStreamsGroupsHandler.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeStreamsGroupsHandler.java
@@ -92,7 +92,7 @@ public class DescribeStreamsGroupsHandler extends
AdminApiHandler.Batched<Coordi
StreamsGroupDescribeRequestData data = new
StreamsGroupDescribeRequestData()
.setGroupIds(groupIds)
.setIncludeAuthorizedOperations(includeAuthorizedOperations);
- return new StreamsGroupDescribeRequest.Builder(data, true);
+ return new StreamsGroupDescribeRequest.Builder(data);
}
@Override
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
index eee77b7ae9a..c45e3da1b5e 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
@@ -506,7 +506,7 @@ public class StreamsGroupHeartbeatRequestManager implements
RequestManager {
private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final
long currentTimeMs) {
NetworkClientDelegate.UnsentRequest request = new
NetworkClientDelegate.UnsentRequest(
- new
StreamsGroupHeartbeatRequest.Builder(this.heartbeatState.buildRequestData(),
true),
+ new
StreamsGroupHeartbeatRequest.Builder(this.heartbeatState.buildRequestData()),
coordinatorRequestManager.coordinator()
);
heartbeatRequestState.onSendAttempt(currentTimeMs);
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeRequest.java
index bf7f1576cb0..d4ea35bb64c 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeRequest.java
@@ -32,11 +32,7 @@ public class StreamsGroupDescribeRequest extends
AbstractRequest {
private final StreamsGroupDescribeRequestData data;
public Builder(StreamsGroupDescribeRequestData data) {
- this(data, false);
- }
-
- public Builder(StreamsGroupDescribeRequestData data, boolean
enableUnstableLastVersion) {
- super(ApiKeys.STREAMS_GROUP_DESCRIBE, enableUnstableLastVersion);
+ super(ApiKeys.STREAMS_GROUP_DESCRIBE);
this.data = data;
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupHeartbeatRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupHeartbeatRequest.java
index ad6e8414636..26d1592f040 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupHeartbeatRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupHeartbeatRequest.java
@@ -39,11 +39,7 @@ public class StreamsGroupHeartbeatRequest extends
AbstractRequest {
private final StreamsGroupHeartbeatRequestData data;
public Builder(StreamsGroupHeartbeatRequestData data) {
- this(data, false);
- }
-
- public Builder(StreamsGroupHeartbeatRequestData data, boolean
enableUnstableLastVersion) {
- super(ApiKeys.STREAMS_GROUP_HEARTBEAT, enableUnstableLastVersion);
+ super(ApiKeys.STREAMS_GROUP_HEARTBEAT);
this.data = data;
}
diff --git
a/clients/src/main/resources/common/message/StreamsGroupDescribeRequest.json
b/clients/src/main/resources/common/message/StreamsGroupDescribeRequest.json
index e3dad6fa8ec..6e36479043a 100644
--- a/clients/src/main/resources/common/message/StreamsGroupDescribeRequest.json
+++ b/clients/src/main/resources/common/message/StreamsGroupDescribeRequest.json
@@ -20,10 +20,6 @@
"name": "StreamsGroupDescribeRequest",
"validVersions": "0",
"flexibleVersions": "0+",
- // The StreamsGroupDescribeRequest API is added as part of KIP-1071 and is
still under
- // development. Hence, the API is not exposed by default by brokers unless
- // explicitly enabled.
- "latestVersionUnstable": true,
"fields": [
{ "name": "GroupIds", "type": "[]string", "versions": "0+", "entityType":
"groupId",
"about": "The ids of the groups to describe" },
diff --git
a/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json
b/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json
index 8b63e037fc0..a2cba46e763 100644
---
a/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json
+++
b/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json
@@ -20,10 +20,6 @@
"name": "StreamsGroupHeartbeatRequest",
"validVersions": "0",
"flexibleVersions": "0+",
- // The StreamsGroupHeartbeatRequest API is added as part of KIP-1071 and is
still under
- // development. Hence, the API is not exposed by default by brokers unless
- // explicitly enabled.
- "latestVersionUnstable": true,
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0+", "entityType":
"groupId",
"about": "The group identifier." },
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 284f92b6136..de9daf773bc 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -10828,7 +10828,7 @@ class KafkaApisTest extends Logging {
def testStreamsGroupHeartbeatReturnsUnsupportedVersion(): Unit = {
val streamsGroupHeartbeatRequest = new
StreamsGroupHeartbeatRequestData().setGroupId("group")
- val requestChannelRequest = buildRequest(new
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest,
true).build())
+ val requestChannelRequest = buildRequest(new
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest).build())
metadataCache = {
val cache = new KRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_1)
val delta = new MetadataDelta(MetadataImage.EMPTY)
@@ -10858,7 +10858,7 @@ class KafkaApisTest extends Logging {
val streamsGroupHeartbeatRequest = new
StreamsGroupHeartbeatRequestData().setGroupId("group")
- val requestChannelRequest = buildRequest(new
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest,
true).build())
+ val requestChannelRequest = buildRequest(new
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest).build())
val future = new CompletableFuture[StreamsGroupHeartbeatResult]()
when(groupCoordinator.streamsGroupHeartbeat(
@@ -10907,7 +10907,7 @@ class KafkaApisTest extends Logging {
)
)
- val requestChannelRequest = buildRequest(new
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest,
true).build())
+ val requestChannelRequest = buildRequest(new
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest).build())
val authorizer: Authorizer = mock(classOf[Authorizer])
val acls = Map(
@@ -10956,7 +10956,7 @@ class KafkaApisTest extends Logging {
val streamsGroupHeartbeatRequest = new
StreamsGroupHeartbeatRequestData().setGroupId("group")
- val requestChannelRequest = buildRequest(new
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest,
true).build())
+ val requestChannelRequest = buildRequest(new
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest).build())
val future = new CompletableFuture[StreamsGroupHeartbeatResult]()
when(groupCoordinator.streamsGroupHeartbeat(
@@ -10981,7 +10981,7 @@ class KafkaApisTest extends Logging {
val streamsGroupHeartbeatRequest = new
StreamsGroupHeartbeatRequestData().setGroupId("group")
- val requestChannelRequest = buildRequest(new
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest,
true).build())
+ val requestChannelRequest = buildRequest(new
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest).build())
val authorizer: Authorizer = mock(classOf[Authorizer])
when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
@@ -11022,7 +11022,7 @@ class KafkaApisTest extends Logging {
)
)
- val requestChannelRequest = buildRequest(new
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest,
true).build())
+ val requestChannelRequest = buildRequest(new
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest).build())
val authorizer: Authorizer = mock(classOf[Authorizer])
val acls = Map(
@@ -11057,7 +11057,7 @@ class KafkaApisTest extends Logging {
val streamsGroupHeartbeatRequest = new
StreamsGroupHeartbeatRequestData().setGroupId("group")
- val requestChannelRequest = buildRequest(new
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest,
true).build())
+ val requestChannelRequest = buildRequest(new
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest).build())
kafkaApis = createKafkaApis(
overrideProperties =
Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG ->
"classic,consumer")
@@ -11078,7 +11078,7 @@ class KafkaApisTest extends Logging {
val streamsGroupHeartbeatRequest = new
StreamsGroupHeartbeatRequestData().setGroupId("group")
- val requestChannelRequest = buildRequest(new
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest,
true).build())
+ val requestChannelRequest = buildRequest(new
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest).build())
kafkaApis = createKafkaApis()
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
@@ -11108,7 +11108,7 @@ class KafkaApisTest extends Logging {
)
)
- val requestChannelRequest = buildRequest(new
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest,
true).build())
+ val requestChannelRequest = buildRequest(new
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest).build())
kafkaApis = createKafkaApis()
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
@@ -11138,7 +11138,7 @@ class KafkaApisTest extends Logging {
)
)
- val requestChannelRequest = buildRequest(new
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest,
true).build())
+ val requestChannelRequest = buildRequest(new
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest).build())
kafkaApis = createKafkaApis()
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
@@ -11158,7 +11158,7 @@ class KafkaApisTest extends Logging {
val streamsGroupHeartbeatRequest = new
StreamsGroupHeartbeatRequestData().setGroupId("group")
- val requestChannelRequest = buildRequest(new
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest,
true).build())
+ val requestChannelRequest = buildRequest(new
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest).build())
val future = new CompletableFuture[StreamsGroupHeartbeatResult]()
when(groupCoordinator.streamsGroupHeartbeat(
@@ -11189,7 +11189,7 @@ class KafkaApisTest extends Logging {
val streamsGroupHeartbeatRequest = new
StreamsGroupHeartbeatRequestData().setGroupId("group")
- val requestChannelRequest = buildRequest(new
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest,
true).build())
+ val requestChannelRequest = buildRequest(new
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest).build())
val future = new CompletableFuture[StreamsGroupHeartbeatResult]()
when(groupCoordinator.streamsGroupHeartbeat(
@@ -11245,7 +11245,7 @@ class KafkaApisTest extends Logging {
when(metadataCache.features()).thenReturn(features)
val streamsGroupHeartbeatRequest = new
StreamsGroupHeartbeatRequestData().setGroupId("group")
- val requestChannelRequest = buildRequest(new
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest,
true).build())
+ val requestChannelRequest = buildRequest(new
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest).build())
val future = new CompletableFuture[StreamsGroupHeartbeatResult]()
when(groupCoordinator.streamsGroupHeartbeat(
@@ -11476,7 +11476,7 @@ class KafkaApisTest extends Logging {
val streamsGroupDescribeRequestData = new StreamsGroupDescribeRequestData()
.setIncludeAuthorizedOperations(includeAuthorizedOperations)
streamsGroupDescribeRequestData.groupIds.addAll(groupIds)
- val requestChannelRequest = buildRequest(new
StreamsGroupDescribeRequest.Builder(streamsGroupDescribeRequestData,
true).build())
+ val requestChannelRequest = buildRequest(new
StreamsGroupDescribeRequest.Builder(streamsGroupDescribeRequestData).build())
val future = new
CompletableFuture[util.List[StreamsGroupDescribeResponseData.DescribedGroup]]()
when(groupCoordinator.streamsGroupDescribe(
@@ -11549,7 +11549,7 @@ class KafkaApisTest extends Logging {
val groupId = "group0"
val streamsGroupDescribeRequestData = new StreamsGroupDescribeRequestData()
streamsGroupDescribeRequestData.groupIds.add(groupId)
- val requestChannelRequest = buildRequest(new
StreamsGroupDescribeRequest.Builder(streamsGroupDescribeRequestData,
true).build())
+ val requestChannelRequest = buildRequest(new
StreamsGroupDescribeRequest.Builder(streamsGroupDescribeRequestData).build())
val errorCode = Errors.UNSUPPORTED_VERSION.code
val expectedDescribedGroup = new
StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupId).setErrorCode(errorCode)
@@ -11582,7 +11582,7 @@ class KafkaApisTest extends Logging {
val streamsGroupDescribeRequestData = new StreamsGroupDescribeRequestData()
streamsGroupDescribeRequestData.groupIds.add("group-id")
- val requestChannelRequest = buildRequest(new
StreamsGroupDescribeRequest.Builder(streamsGroupDescribeRequestData,
true).build())
+ val requestChannelRequest = buildRequest(new
StreamsGroupDescribeRequest.Builder(streamsGroupDescribeRequestData).build())
val authorizer: Authorizer = mock(classOf[Authorizer])
when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
@@ -11613,7 +11613,7 @@ class KafkaApisTest extends Logging {
val streamsGroupDescribeRequestData = new StreamsGroupDescribeRequestData()
streamsGroupDescribeRequestData.groupIds.add("group-id")
- val requestChannelRequest = buildRequest(new
StreamsGroupDescribeRequest.Builder(streamsGroupDescribeRequestData,
true).build())
+ val requestChannelRequest = buildRequest(new
StreamsGroupDescribeRequest.Builder(streamsGroupDescribeRequestData).build())
val future = new
CompletableFuture[util.List[StreamsGroupDescribeResponseData.DescribedGroup]]()
when(groupCoordinator.streamsGroupDescribe(
@@ -11645,7 +11645,7 @@ class KafkaApisTest extends Logging {
val streamsGroupDescribeRequestData = new StreamsGroupDescribeRequestData()
.setIncludeAuthorizedOperations(includeAuthorizedOperations)
streamsGroupDescribeRequestData.groupIds.addAll(groupIds)
- val requestChannelRequest = buildRequest(new
StreamsGroupDescribeRequest.Builder(streamsGroupDescribeRequestData,
true).build())
+ val requestChannelRequest = buildRequest(new
StreamsGroupDescribeRequest.Builder(streamsGroupDescribeRequestData).build())
val authorizer: Authorizer = mock(classOf[Authorizer])
val acls = Map(
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index e188889556b..ae892a219f6 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -746,10 +746,10 @@ class RequestQuotaTest extends BaseRequestTest {
new ReadShareGroupStateSummaryRequest.Builder(new
ReadShareGroupStateSummaryRequestData())
case ApiKeys.STREAMS_GROUP_HEARTBEAT =>
- new StreamsGroupHeartbeatRequest.Builder(new
StreamsGroupHeartbeatRequestData(), true)
+ new StreamsGroupHeartbeatRequest.Builder(new
StreamsGroupHeartbeatRequestData())
case ApiKeys.STREAMS_GROUP_DESCRIBE =>
- new StreamsGroupDescribeRequest.Builder(new
StreamsGroupDescribeRequestData(), true)
+ new StreamsGroupDescribeRequest.Builder(new
StreamsGroupDescribeRequestData())
case ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS =>
new DescribeShareGroupOffsetsRequest.Builder(new
DescribeShareGroupOffsetsRequestData())
diff --git
a/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentKey.json
b/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentKey.json
index 236d66de03c..077939ceb50 100644
---
a/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentKey.json
+++
b/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentKey.json
@@ -13,7 +13,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-// The streams rebalance protocol is in development. This schema is subject to
non-backwards-compatible changes.
{
"apiKey": 22,
"type": "coordinator-key",
diff --git
a/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentValue.json
b/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentValue.json
index 6de638a3af7..57c94a9e6f3 100644
---
a/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentValue.json
+++
b/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentValue.json
@@ -13,7 +13,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-// The streams rebalance protocol is in development. This schema is subject to
non-backwards-compatible changes.
{
"apiKey": 22,
"type": "coordinator-value",
diff --git
a/group-coordinator/src/main/resources/common/message/StreamsGroupMemberMetadataKey.json
b/group-coordinator/src/main/resources/common/message/StreamsGroupMemberMetadataKey.json
index ae1fbc8d1a7..4bc6fe16bdf 100644
---
a/group-coordinator/src/main/resources/common/message/StreamsGroupMemberMetadataKey.json
+++
b/group-coordinator/src/main/resources/common/message/StreamsGroupMemberMetadataKey.json
@@ -13,7 +13,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-// The streams rebalance protocol is in development. This schema is subject to
non-backwards-compatible changes.
{
"apiKey": 19,
"type": "coordinator-key",
diff --git
a/group-coordinator/src/main/resources/common/message/StreamsGroupMemberMetadataValue.json
b/group-coordinator/src/main/resources/common/message/StreamsGroupMemberMetadataValue.json
index 1ecc047f17a..b4f874027a9 100644
---
a/group-coordinator/src/main/resources/common/message/StreamsGroupMemberMetadataValue.json
+++
b/group-coordinator/src/main/resources/common/message/StreamsGroupMemberMetadataValue.json
@@ -13,7 +13,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-// The streams rebalance protocol is in development. This schema is subject to
non-backwards-compatible changes.
{
"apiKey": 19,
"type": "coordinator-value",
diff --git
a/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataKey.json
b/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataKey.json
index 3d583ebb66e..04cb21e0895 100644
---
a/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataKey.json
+++
b/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataKey.json
@@ -13,7 +13,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-// The streams rebalance protocol is in development. This schema is subject to
non-backwards-compatible changes.
{
"apiKey": 17,
"type": "coordinator-key",
diff --git
a/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataValue.json
b/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataValue.json
index d7bd377daa9..b6bd535d6b2 100644
---
a/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataValue.json
+++
b/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataValue.json
@@ -13,7 +13,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-// The streams rebalance protocol is in development. This schema is subject to
non-backwards-compatible changes.
{
"apiKey": 17,
"type": "coordinator-value",
diff --git
a/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberKey.json
b/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberKey.json
index 7563f01fade..a5e6b1b5804 100644
---
a/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberKey.json
+++
b/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberKey.json
@@ -13,7 +13,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-// The streams rebalance protocol is in development. This schema is subject to
non-backwards-compatible changes.
{
"apiKey": 21,
"type": "coordinator-key",
diff --git
a/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberValue.json
b/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberValue.json
index c96dd608c7f..a97890ae167 100644
---
a/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberValue.json
+++
b/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberValue.json
@@ -13,7 +13,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-// The streams rebalance protocol is in development. This schema is subject to
non-backwards-compatible changes.
{
"apiKey": 21,
"type": "coordinator-value",
diff --git
a/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMetadataKey.json
b/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMetadataKey.json
index 22fb861083a..2ebb02b0233 100644
---
a/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMetadataKey.json
+++
b/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMetadataKey.json
@@ -13,7 +13,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-// The streams rebalance protocol is in development. This schema is subject to
non-backwards-compatible changes.
{
"apiKey": 20,
"type": "coordinator-key",
diff --git
a/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMetadataValue.json
b/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMetadataValue.json
index b9de317cbde..86143bd5a83 100644
---
a/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMetadataValue.json
+++
b/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMetadataValue.json
@@ -13,7 +13,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-// The streams rebalance protocol is in development. This schema is subject to
non-backwards-compatible changes.
{
"apiKey": 20,
"type": "coordinator-value",
diff --git
a/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyKey.json
b/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyKey.json
index ac2b8d5932a..17197849323 100644
---
a/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyKey.json
+++
b/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyKey.json
@@ -13,7 +13,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-// The streams rebalance protocol is in development. This schema is subject to
non-backwards-compatible changes.
{
"apiKey": 23,
"type": "coordinator-key",
diff --git
a/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyValue.json
b/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyValue.json
index 26ac1ff6675..b05bd40607d 100644
---
a/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyValue.json
+++
b/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyValue.json
@@ -13,7 +13,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-// The streams rebalance protocol is in development. This schema is subject to
non-backwards-compatible changes.
{
"apiKey": 23,
"type": "coordinator-value",
diff --git
a/server-common/src/main/java/org/apache/kafka/server/common/StreamsVersion.java
b/server-common/src/main/java/org/apache/kafka/server/common/StreamsVersion.java
index cf910c660dc..84425cac401 100644
---
a/server-common/src/main/java/org/apache/kafka/server/common/StreamsVersion.java
+++
b/server-common/src/main/java/org/apache/kafka/server/common/StreamsVersion.java
@@ -24,16 +24,12 @@ public enum StreamsVersion implements FeatureVersion {
SV_0(0, MetadataVersion.MINIMUM_VERSION, Map.of()),
// Version 1 enables "streams" groups (KIP-1071).
- // Using metadata version IBP_4_2_IV1 disables it by default in AK 4.1
release, and enables it by default in AK 4.2 release.
- // - in AK 4.1, this can be enabled as "early access [unstable]"
- // - in AK 4.2, it is planned to go GA (cf `LATEST_PRODUCTION`)
SV_1(1, MetadataVersion.IBP_4_2_IV1, Map.of());
public static final String FEATURE_NAME = "streams.version";
- // Mark "streams" group as unstable in AK 4.1 release
- // Needs to be updated to SV_1 in AK 4.2, to mark as stable
- public static final StreamsVersion LATEST_PRODUCTION = SV_0;
+ // Mark "streams" group as stable in production
+ public static final StreamsVersion LATEST_PRODUCTION = SV_1;
private final short featureLevel;
private final MetadataVersion bootstrapMetadataVersion;
diff --git
a/server/src/test/java/org/apache/kafka/server/metrics/NodeMetricsTest.java
b/server/src/test/java/org/apache/kafka/server/metrics/NodeMetricsTest.java
index 6aef5d5af09..54db719c2d3 100644
--- a/server/src/test/java/org/apache/kafka/server/metrics/NodeMetricsTest.java
+++ b/server/src/test/java/org/apache/kafka/server/metrics/NodeMetricsTest.java
@@ -53,14 +53,13 @@ public class NodeMetricsTest {
new MetricName("maximum-supported-level", expectedGroup, "",
Map.of("feature-name", "eligible-leader-replicas-version")),
new MetricName("minimum-supported-level", expectedGroup, "",
Map.of("feature-name", "eligible-leader-replicas-version")),
new MetricName("maximum-supported-level", expectedGroup, "",
Map.of("feature-name", "share-version")),
- new MetricName("minimum-supported-level", expectedGroup, "",
Map.of("feature-name", "share-version"))
- );
-
- Set<MetricName> unstableFeatureMetrics = Set.of(
+ new MetricName("minimum-supported-level", expectedGroup, "",
Map.of("feature-name", "share-version")),
new MetricName("maximum-supported-level", expectedGroup, "",
Map.of("feature-name", "streams-version")),
new MetricName("minimum-supported-level", expectedGroup, "",
Map.of("feature-name", "streams-version"))
);
+ Set<MetricName> unstableFeatureMetrics = Set.of();
+
Set<MetricName> expectedMetrics = enableUnstableVersions
? Stream.concat(stableFeatureMetrics.stream(),
unstableFeatureMetrics.stream()).collect(Collectors.toSet())
: stableFeatureMetrics;
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 0c37d8b2ce4..1e04c1fd418 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -1500,8 +1500,6 @@ public class StreamsConfig extends AbstractConfig {
private void verifyStreamsProtocolCompatibility(final boolean doLog) {
if (doLog && isStreamsProtocolEnabled()) {
- log.warn("The streams rebalance protocol is still in development
and should not be used in production. "
- + "Please set group.protocol=classic (default) in all
production use cases.");
final Map<String, Object> mainConsumerConfigs =
getMainConsumerConfigs("dummy", "dummy", -1);
final String instanceId = (String)
mainConsumerConfigs.get(CommonClientConfigs.GROUP_INSTANCE_ID_CONFIG);
if (instanceId != null && !instanceId.isEmpty()) {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index ca4048ee9c6..d9fbef48929 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -1687,20 +1687,6 @@ public class StreamsConfigTest {
assertTrue(streamsConfig.isStreamsProtocolEnabled());
}
- @Test
- public void shouldLogWarningWhenStreamsProtocolIsUsed() {
- try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(StreamsConfig.class)) {
- appender.setClassLogger(StreamsConfig.class, Level.WARN);
- props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, "streams");
-
- new StreamsConfig(props);
-
- assertTrue(appender.getMessages().stream()
- .anyMatch(msg -> msg.contains("The streams rebalance protocol
is still in development and should " +
- "not be used in production. Please set
group.protocol=classic (default) in all production use cases.")));
- }
- }
-
@Test
public void shouldLogWarningWhenWarmupReplicasSetWithStreamsProtocol() {
try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(StreamsConfig.class)) {