This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch kip1071
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit d1794f477ba791275c1e1342c8d9ca5d6da92746
Author: Lucas Brutschy <[email protected]>
AuthorDate: Wed Sep 25 16:02:39 2024 +0200

    Resolve conflict from 11/25 trunk rebase - Rebase on AK trunk 2024-09-25
---
 .../consumer/internals/RequestManagers.java        |  2 +-
 .../internals/StreamsAssignmentInterface.java      | 25 ++++++-------
 .../StreamsGroupHeartbeatRequestManager.java       |  7 ++--
 .../StreamsGroupInitializeRequestManager.java      |  2 +-
 .../events/ApplicationEventProcessor.java          |  6 +--
 .../StreamsGroupHeartbeatRequestManagerTest.java   |  3 +-
 .../kafka/api/IntegrationTestHarness.scala         |  4 ++
 .../src/test/scala/kafka/utils/TestInfoUtils.scala |  4 ++
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 20 +++++-----
 .../coordinator/group/GroupCoordinatorService.java |  9 +++--
 .../group/metrics/GroupCoordinatorMetrics.java     |  2 +
 .../streams/CoordinatorStreamsRecordHelpers.java   |  2 +-
 .../coordinator/group/streams/StreamsGroup.java    |  2 +-
 .../group/streams/TargetAssignmentBuilder.java     |  2 +-
 .../CoordinatorStreamsRecordHelpersTest.java       |  2 +-
 .../group/streams/StreamsGroupBuilder.java         |  2 +-
 .../group/streams/TargetAssignmentBuilderTest.java |  2 +-
 .../SmokeTestDriverIntegrationTest.java            | 10 ++---
 .../org/apache/kafka/streams/GroupProtocol.java    | 43 ++++++++++++++++++++++
 .../org/apache/kafka/streams/StreamsConfig.java    | 17 +++++++++
 .../streams/processor/internals/StreamThread.java  |  5 +--
 21 files changed, 121 insertions(+), 50 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
index 4f3ec1b2398..9128ee7889b 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
@@ -263,7 +263,7 @@ public class RequestManagers implements Closeable {
                             metadata
                             );
                     } else {
-                        heartbeatRequestManager = new HeartbeatRequestManager(
+                        heartbeatRequestManager = new 
ConsumerHeartbeatRequestManager(
                             logContext,
                             time,
                             config,
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java
index 567fcb5f776..1e23233f4e6 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java
@@ -37,7 +37,7 @@ import java.util.concurrent.atomic.AtomicReference;
  */
 public class StreamsAssignmentInterface {
 
-    private UUID processID;
+    private UUID processId;
 
     private Optional<HostInfo> endpoint;
 
@@ -53,8 +53,13 @@ public class StreamsAssignmentInterface {
 
     private Map<String, String> clientTags;
 
-    public UUID processID() {
-        return processID;
+    public UUID processId() {
+        return processId;
+    }
+
+    public String topologyId() {
+        // ToDo: As long as we do not compute the topology ID, let's use a 
constant one
+        return "topology-id";
     }
 
     public Optional<HostInfo> endpoint() {
@@ -73,19 +78,11 @@ public class StreamsAssignmentInterface {
         return assignmentConfiguration;
     }
 
-    // ToDo: As long as we do not compute the topology ID, let's use a 
constant one
-    public final String topologyId = "topology-id";
-
     // TODO: This needs to be used somewhere
     public Map<TaskId, Long> taskLags() {
         return taskLags;
     }
 
-    public byte[] computeTopologyHash() {
-        // TODO
-        return new byte[0];
-    }
-
     public Map<String, String> clientTags() {
         return clientTags;
     }
@@ -266,14 +263,14 @@ public class StreamsAssignmentInterface {
         }
     }
 
-    public StreamsAssignmentInterface(UUID processID,
+    public StreamsAssignmentInterface(UUID processId,
                                       Optional<HostInfo> endpoint,
                                       String assignor,
                                       Map<String, Subtopology> subtopologyMap,
                                       Map<String, Object> 
assignmentConfiguration,
                                       Map<String, String> clientTags
     ) {
-        this.processID = processID;
+        this.processId = processId;
         this.endpoint = endpoint;
         this.assignor = assignor;
         this.subtopologyMap = subtopologyMap;
@@ -286,7 +283,7 @@ public class StreamsAssignmentInterface {
     @Override
     public String toString() {
         return "StreamsAssignmentMetadata{" +
-            "processID=" + processID +
+            "processID=" + processId +
             ", endpoint='" + endpoint + '\'' +
             ", assignor='" + assignor + '\'' +
             ", subtopologyMap=" + subtopologyMap +
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
index e93b0e51c10..f41ed9d0998 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
@@ -34,6 +34,7 @@ import 
org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
 import 
org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData.Endpoint;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
 import org.apache.kafka.common.requests.StreamsGroupHeartbeatRequest;
 import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse;
 import org.apache.kafka.common.utils.LogContext;
@@ -264,7 +265,7 @@ public class StreamsGroupHeartbeatRequestManager implements 
RequestManager {
             }
         }
 
-        membershipManager.onHeartbeatSuccess(cgData);
+        membershipManager.onHeartbeatSuccess(new 
ConsumerGroupHeartbeatResponse(cgData));
     }
 
     private void setTargetAssignmentForConsumerGroup(final 
StreamsGroupHeartbeatResponseData data, final 
ConsumerGroupHeartbeatResponseData cgData) {
@@ -534,7 +535,7 @@ public class StreamsGroupHeartbeatRequestManager implements 
RequestManager {
             data.setGroupId(membershipManager.groupId());
 
             // TopologyId - always sent
-            data.setTopologyId(streamsInterface.topologyId);
+            data.setTopologyId(streamsInterface.topologyId());
 
             // MemberId - always sent, empty until it has been received from 
the coordinator
             data.setMemberId(membershipManager.memberId());
@@ -555,7 +556,7 @@ public class StreamsGroupHeartbeatRequestManager implements 
RequestManager {
 
             // Immutable -- only sent when joining
             if (joining) {
-                data.setProcessId(streamsInterface.processID().toString());
+                data.setProcessId(streamsInterface.processId().toString());
                 data.setActiveTasks(Collections.emptyList());
                 data.setStandbyTasks(Collections.emptyList());
                 data.setWarmupTasks(Collections.emptyList());
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManager.java
index 0a6a67594c1..64565f347b1 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManager.java
@@ -68,7 +68,7 @@ public class StreamsGroupInitializeRequestManager implements 
RequestManager {
     private NetworkClientDelegate.UnsentRequest makeRequest() {
         final StreamsGroupInitializeRequestData 
streamsGroupInitializeRequestData = new StreamsGroupInitializeRequestData();
         streamsGroupInitializeRequestData.setGroupId(groupId);
-        
streamsGroupInitializeRequestData.setTopologyId(streamsAssignmentInterface.topologyId);
+        
streamsGroupInitializeRequestData.setTopologyId(streamsAssignmentInterface.topologyId());
         final List<StreamsGroupInitializeRequestData.Subtopology> topology = 
getTopologyFromStreams();
         streamsGroupInitializeRequestData.setTopology(topology);
         final StreamsGroupInitializeRequest.Builder 
streamsGroupInitializeRequestBuilder = new 
StreamsGroupInitializeRequest.Builder(
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index 2a84c18e234..52aee8f20e0 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -367,8 +367,8 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
      *              the group is sent out.
      */
     private void process(final UnsubscribeEvent event) {
-        if (requestManagers.consumerHeartbeatRequestManager.isPresent()) {
-            CompletableFuture<Void> future = 
requestManagers.consumerHeartbeatRequestManager.get().membershipManager().leaveGroup();
+        if (requestManagers.consumerMembershipManager.isPresent()) {
+            CompletableFuture<Void> future = 
requestManagers.consumerMembershipManager.get().leaveGroup();
             future.whenComplete(complete(event.future()));
         } else {
             // If the consumer is not using the group management capabilities, 
we still need to clear all assignments it may have.
@@ -417,7 +417,7 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
             );
             return;
         }
-        
requestManagers.consumerHeartbeatRequestManager.get().membershipManager().consumerRebalanceListenerCallbackCompleted(event);
+        
requestManagers.consumerMembershipManager.get().consumerRebalanceListenerCallbackCompleted(event);
     }
 
     private void process(@SuppressWarnings("unused") final CommitOnCloseEvent 
event) {
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
index b9fdb4c7722..503576d8fd5 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
@@ -32,6 +32,7 @@ import 
org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
 import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.requests.StreamsGroupHeartbeatRequest;
 import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse;
@@ -290,7 +291,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
         mockResponse(data);
 
         ArgumentCaptor<ConsumerGroupHeartbeatResponseData> captor = 
ArgumentCaptor.forClass(ConsumerGroupHeartbeatResponseData.class);
-        verify(membershipManager, 
times(1)).onHeartbeatSuccess(captor.capture());
+        verify(membershipManager, times(1)).onHeartbeatSuccess(new 
ConsumerGroupHeartbeatResponse(captor.capture()));
         ConsumerGroupHeartbeatResponseData response = captor.getValue();
         assertEquals(Errors.NONE.code(), response.errorCode());
         assertEquals(TEST_MEMBER_ID, response.memberId());
diff --git 
a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala 
b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index b7efed1d495..a66752c30f3 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -74,6 +74,10 @@ abstract class IntegrationTestHarness extends 
KafkaServerTestHarness {
       
cfgs.foreach(_.setProperty(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG,
 "classic,consumer,share"))
       
cfgs.foreach(_.setProperty(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, 
"true"))
     }
+    if (isStreamsGroupTest()) {
+      
cfgs.foreach(_.setProperty(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG,
 "classic,consumer,streams"))
+      
cfgs.foreach(_.setProperty(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, 
"true"))
+    }
 
     if(isKRaftTest()) {
       cfgs.foreach(_.setProperty(KRaftConfigs.METADATA_LOG_DIR_CONFIG, 
TestUtils.tempDir().getAbsolutePath))
diff --git a/core/src/test/scala/kafka/utils/TestInfoUtils.scala 
b/core/src/test/scala/kafka/utils/TestInfoUtils.scala
index cd22727839e..c6b05d9254c 100644
--- a/core/src/test/scala/kafka/utils/TestInfoUtils.scala
+++ b/core/src/test/scala/kafka/utils/TestInfoUtils.scala
@@ -51,6 +51,10 @@ object TestInfoUtils {
     testInfo.getDisplayName.contains("kraft+kip932")
   }
 
+  def isStreamsGroupTest(testInfo: TestInfo): Boolean = {
+    testInfo.getDisplayName.contains("kraft+kip1071")
+  }
+
   def maybeGroupProtocolSpecified(testInfo: TestInfo): Option[GroupProtocol] = 
{
     if (testInfo.getDisplayName.contains("groupProtocol=classic"))
       Some(GroupProtocol.CLASSIC)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 1241287965f..8d62dadd642 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -11251,7 +11251,7 @@ class KafkaApisTest extends Logging {
       overrideProperties = 
Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> 
"classic,streams"),
       raftSupport = true
     )
-    kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching)
+    kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
 
     val streamsGroupInitializeResponse = new 
StreamsGroupInitializeResponseData()
 
@@ -11277,7 +11277,7 @@ class KafkaApisTest extends Logging {
       overrideProperties = 
Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> 
"classic,streams"),
       raftSupport = true
     )
-    kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching)
+    kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
 
     future.completeExceptionally(Errors.FENCED_MEMBER_EPOCH.exception)
     val response = 
verifyNoThrottling[StreamsGroupInitializeResponse](requestChannelRequest)
@@ -11300,7 +11300,7 @@ class KafkaApisTest extends Logging {
       overrideProperties = 
Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> 
"classic,streams"),
       raftSupport = true
     )
-    kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching)
+    kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
 
     val response = 
verifyNoThrottling[StreamsGroupInitializeResponse](requestChannelRequest)
     assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, 
response.data.errorCode)
@@ -11324,7 +11324,7 @@ class KafkaApisTest extends Logging {
       overrideProperties = 
Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> 
"classic,streams"),
       raftSupport = true
     )
-    kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching)
+    kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
 
     val streamsGroupHeartbeatResponse = new StreamsGroupHeartbeatResponseData()
       .setMemberId("member")
@@ -11351,7 +11351,7 @@ class KafkaApisTest extends Logging {
       overrideProperties = 
Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> 
"classic,streams"),
       raftSupport = true
     )
-    kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching)
+    kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
 
     future.completeExceptionally(Errors.FENCED_MEMBER_EPOCH.exception)
     val response = 
verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest)
@@ -11374,7 +11374,7 @@ class KafkaApisTest extends Logging {
       overrideProperties = 
Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> 
"classic,streams"),
       raftSupport = true
     )
-    kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching)
+    kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
 
     val response = 
verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest)
     assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, 
response.data.errorCode)
@@ -11521,7 +11521,7 @@ class KafkaApisTest extends Logging {
       overrideProperties = 
Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> 
"classic,streams"),
       raftSupport = true
     )
-    kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching)
+    kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
 
     future.complete(List(
       new 
StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(0)),
@@ -11564,7 +11564,7 @@ class KafkaApisTest extends Logging {
     val expectedResponse = new StreamsGroupDescribeResponseData()
     expectedResponse.groups.add(expectedDescribedGroup)
     kafkaApis = createKafkaApis()
-    kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching)
+    kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
     val response = 
verifyNoThrottling[StreamsGroupDescribeResponse](requestChannelRequest)
 
     assertEquals(expectedResponse, response.data)
@@ -11593,7 +11593,7 @@ class KafkaApisTest extends Logging {
       overrideProperties = 
Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> 
"classic,streams"),
       raftSupport = true
     )
-    kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching)
+    kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
 
     val response = 
verifyNoThrottling[StreamsGroupDescribeResponse](requestChannelRequest)
     assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, 
response.data.groups.get(0).errorCode)
@@ -11616,7 +11616,7 @@ class KafkaApisTest extends Logging {
       overrideProperties = 
Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> 
"classic,streams"),
       raftSupport = true
     )
-    kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching)
+    kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
 
     future.completeExceptionally(Errors.FENCED_MEMBER_EPOCH.exception)
     val response = 
verifyNoThrottling[StreamsGroupDescribeResponse](requestChannelRequest)
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index c6631945add..007e991ad16 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -375,7 +375,8 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
             exception,
             (error, message) -> new StreamsGroupInitializeResponseData()
                 .setErrorCode(error.code())
-                .setErrorMessage(message)
+                .setErrorMessage(message),
+            log
         ));
     }
 
@@ -404,7 +405,8 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
             exception,
             (error, message) -> new StreamsGroupHeartbeatResponseData()
                 .setErrorCode(error.code())
-                .setErrorMessage(message)
+                .setErrorMessage(message),
+            log
         ));
     }
 
@@ -767,7 +769,8 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
                     "streams-group-describe",
                     groupList,
                     exception,
-                    (error, __) -> 
StreamsGroupDescribeRequest.getErrorDescribedGroupList(groupList, error)
+                    (error, __) -> 
StreamsGroupDescribeRequest.getErrorDescribedGroupList(groupList, error),
+                    log
                 ));
 
             futures.add(future);
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetrics.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetrics.java
index bf595347273..1a543490b53 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetrics.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetrics.java
@@ -116,6 +116,7 @@ public class GroupCoordinatorMetrics extends 
CoordinatorMetrics implements AutoC
         this(KafkaYammerMetrics.defaultRegistry(), new Metrics());
     }
 
+    @SuppressWarnings("MethodLength")
     public GroupCoordinatorMetrics(MetricsRegistry registry, Metrics metrics) {
         this.registry = Objects.requireNonNull(registry);
         this.metrics = Objects.requireNonNull(metrics);
@@ -198,6 +199,7 @@ public class GroupCoordinatorMetrics extends 
CoordinatorMetrics implements AutoC
             "The number of share groups in dead state.",
             SHARE_GROUP_PROTOCOL_TAG, Group.GroupType.SHARE.toString(),
             SHARE_GROUP_COUNT_STATE_TAG, 
ShareGroup.ShareGroupState.DEAD.toString()
+        );
 
         streamsGroupCountMetricName = metrics.metricName(
             GROUP_COUNT_METRIC_NAME,
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpers.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpers.java
index 679e987f407..48a31fdd4c7 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpers.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpers.java
@@ -17,7 +17,7 @@
 package org.apache.kafka.coordinator.group.streams;
 
 import org.apache.kafka.common.message.StreamsGroupInitializeRequestData;
-import org.apache.kafka.coordinator.group.CoordinatorRecord;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
 import 
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentKey;
 import 
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue;
 import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKey;
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
index 11f0cc198fd..f9891e62953 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
@@ -25,7 +25,7 @@ import 
org.apache.kafka.common.errors.UnknownMemberIdException;
 import org.apache.kafka.common.message.ListGroupsResponseData;
 import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.coordinator.group.CoordinatorRecord;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
 import org.apache.kafka.coordinator.group.Group;
 import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
 import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java
index d4cb0b85275..fcb2e99ff55 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.coordinator.group.streams;
 
-import org.apache.kafka.coordinator.group.CoordinatorRecord;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
 import org.apache.kafka.coordinator.group.taskassignor.AssignmentMemberSpec;
 import org.apache.kafka.coordinator.group.taskassignor.GroupAssignment;
 import org.apache.kafka.coordinator.group.taskassignor.GroupSpecImpl;
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpersTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpersTest.java
index b9298dab3f3..0b291bd098e 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpersTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpersTest.java
@@ -17,7 +17,7 @@
 package org.apache.kafka.coordinator.group.streams;
 
 import org.apache.kafka.common.message.StreamsGroupInitializeRequestData;
-import org.apache.kafka.coordinator.group.CoordinatorRecord;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
 import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey;
 import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java
index 8e39ae949da..e05365dbac8 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java
@@ -20,7 +20,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import org.apache.kafka.coordinator.group.CoordinatorRecord;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
 
 public class StreamsGroupBuilder {
 
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
index dfc263c8710..1679336815f 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
@@ -39,7 +39,7 @@ import java.util.Set;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static 
org.apache.kafka.coordinator.group.Assertions.assertUnorderedListEquals;
-import static 
org.apache.kafka.coordinator.group.CoordinatorRecordHelpersTest.mkMapOfPartitionRacks;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpersTest.mkMapOfPartitionRacks;
 import static 
org.apache.kafka.coordinator.group.streams.CoordinatorStreamsRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord;
 import static 
org.apache.kafka.coordinator.group.streams.CoordinatorStreamsRecordHelpers.newStreamsGroupTargetAssignmentRecord;
 import static 
org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.createAssignmentMemberSpec;
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
index 7de099c4ede..78c23184f4f 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
@@ -20,8 +20,8 @@ import java.util.Locale;
 
 import kafka.api.IntegrationTestHarness;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.GroupProtocol;
 import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.streams.GroupProtocol;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsConfig.InternalConfig;
 import org.apache.kafka.streams.tests.SmokeTestClient;
@@ -86,7 +86,7 @@ public class SmokeTestDriverIntegrationTest extends 
IntegrationTestHarness {
     }
 
     @Override
-    public boolean isNewGroupCoordinatorEnabled() {
+    public boolean isStreamsGroupTest() {
         return true;
     }
 
@@ -110,7 +110,7 @@ public class SmokeTestDriverIntegrationTest extends 
IntegrationTestHarness {
     public void shouldWorkWithRebalance(
         final boolean stateUpdaterEnabled,
         final boolean processingThreadsEnabled,
-        final boolean consumerProtocolEnabled
+        final boolean streamsProtocolEnabled
     ) throws InterruptedException {
         Exit.setExitProcedure((statusCode, message) -> {
             throw new AssertionError("Test called exit(). code:" + statusCode 
+ " message:" + message);
@@ -166,8 +166,8 @@ public class SmokeTestDriverIntegrationTest extends 
IntegrationTestHarness {
         props.put(InternalConfig.PROCESSING_THREADS_ENABLED, 
processingThreadsEnabled);
         // decrease the session timeout so that we can trigger the rebalance 
soon after old client left closed
         props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
-        if (consumerProtocolEnabled) {
-            
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.GROUP_PROTOCOL_CONFIG), 
GroupProtocol.CONSUMER.name().toLowerCase(Locale.getDefault()));
+        if (streamsProtocolEnabled) {
+            props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
         }
 
         // cycle out Streams instances as long as the test is running.
diff --git a/streams/src/main/java/org/apache/kafka/streams/GroupProtocol.java 
b/streams/src/main/java/org/apache/kafka/streams/GroupProtocol.java
new file mode 100644
index 00000000000..146a5e6e9de
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/GroupProtocol.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import java.util.Locale;
+
/**
 * The group protocol a Kafka Streams client uses to coordinate with the group
 * coordinator, selected via {@code StreamsConfig.GROUP_PROTOCOL_CONFIG}.
 */
public enum GroupProtocol {
    /** Classic group protocol, i.e. the consumer-group based rebalance protocol. */
    CLASSIC,

    /** Streams group protocol. */
    STREAMS;

    /**
     * String representation of the group protocol.
     * <p>
     * Mirrors {@link Enum#name()} as a field, consistent with the consumer
     * client's {@code GroupProtocol}.
     */
    public final String name;

    GroupProtocol() {
        // name() is already assigned when the constructor body runs, so deriving
        // the field from it removes the redundant string argument per constant
        // (which could silently drift from the constant's actual name).
        this.name = name();
    }

    /**
     * Case-insensitive group protocol lookup by string name.
     *
     * @param name the protocol name, e.g. {@code "classic"} or {@code "streams"}
     * @return the matching {@code GroupProtocol}
     * @throws IllegalArgumentException if {@code name} matches no protocol
     * @throws NullPointerException if {@code name} is {@code null}
     */
    public static GroupProtocol of(final String name) {
        return GroupProtocol.valueOf(name.toUpperCase(Locale.ROOT));
    }
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 36acc2b2049..6e171e022b5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams;
 
+import java.util.Locale;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.MetadataRecoveryStrategy;
 import org.apache.kafka.clients.admin.Admin;
@@ -588,6 +589,16 @@ public class StreamsConfig extends AbstractConfig {
     public static final String ENABLE_METRICS_PUSH_DOC = "Whether to enable 
pushing of internal client metrics for (main, restore, and global) consumers, 
producers, and admin clients." +
         " The cluster must have a client metrics subscription which 
corresponds to a client.";
 
+    /**
+     * <code>group.protocol</code>
+     */
+    public static final String GROUP_PROTOCOL_CONFIG = "group.protocol";
+    public static final String DEFAULT_GROUP_PROTOCOL = 
GroupProtocol.CLASSIC.name().toLowerCase(
+        Locale.ROOT);
+    public static final String GROUP_PROTOCOL_DOC = "The group protocol 
streams should use. We currently " +
+        "support \"classic\" or \"streams\". If \"streams\" is specified, then 
the streams rebalance protocol will be " +
+        "used. Otherwise, the classic group protocol will be used.";
+
     /** {@code log.summary.interval.ms} */
     public static final String LOG_SUMMARY_INTERVAL_MS_CONFIG = 
"log.summary.interval.ms";
     private static final String LOG_SUMMARY_INTERVAL_MS_DOC = "The output 
interval in milliseconds for logging summary information.\n" +
@@ -1026,6 +1037,12 @@ public class StreamsConfig extends AbstractConfig {
                         TOPOLOGY_OPTIMIZATION_CONFIGS::toString),
                     Importance.MEDIUM,
                     TOPOLOGY_OPTIMIZATION_DOC)
+            .define(GROUP_PROTOCOL_CONFIG,
+                    Type.STRING,
+                    DEFAULT_GROUP_PROTOCOL,
+                    
ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(GroupProtocol.class)),
+                    Importance.MEDIUM,
+                    GROUP_PROTOCOL_DOC)
 
             // LOW
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index b87ba61b51d..f26f9d55089 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -22,7 +22,6 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.GroupProtocol;
 import org.apache.kafka.clients.consumer.InvalidOffsetException;
 import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
@@ -45,6 +44,7 @@ import 
org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.GroupProtocol;
 import org.apache.kafka.streams.KafkaClientSupplier;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsConfig.InternalConfig;
@@ -493,8 +493,7 @@ public class StreamThread extends Thread implements 
ProcessingThread {
 
         final Consumer<byte[], byte[]> mainConsumer;
         final StreamsAssignmentInterface streamsAssignmentInterface;
-        if (consumerConfigs.containsKey(ConsumerConfig.GROUP_PROTOCOL_CONFIG) 
&&
-            
consumerConfigs.get(ConsumerConfig.GROUP_PROTOCOL_CONFIG).toString().equalsIgnoreCase(GroupProtocol.CONSUMER.name))
 {
+        if 
(config.getString(StreamsConfig.GROUP_PROTOCOL_CONFIG).equalsIgnoreCase(GroupProtocol.STREAMS.name))
 {
             if (topologyMetadata.hasNamedTopologies()) {
                throw new IllegalStateException("Named topologies and the 
STREAMS protocol cannot be used at the same time.");
             }


Reply via email to