This is an automated email from the ASF dual-hosted git repository. cadonna pushed a commit to branch kip1071 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit d1794f477ba791275c1e1342c8d9ca5d6da92746 Author: Lucas Brutschy <[email protected]> AuthorDate: Wed Sep 25 16:02:39 2024 +0200 Resolve conflict from 11/25 trunk rebase - Rebase on AK trunk 2024-09-25 --- .../consumer/internals/RequestManagers.java | 2 +- .../internals/StreamsAssignmentInterface.java | 25 ++++++------- .../StreamsGroupHeartbeatRequestManager.java | 7 ++-- .../StreamsGroupInitializeRequestManager.java | 2 +- .../events/ApplicationEventProcessor.java | 6 +-- .../StreamsGroupHeartbeatRequestManagerTest.java | 3 +- .../kafka/api/IntegrationTestHarness.scala | 4 ++ .../src/test/scala/kafka/utils/TestInfoUtils.scala | 4 ++ .../scala/unit/kafka/server/KafkaApisTest.scala | 20 +++++----- .../coordinator/group/GroupCoordinatorService.java | 9 +++-- .../group/metrics/GroupCoordinatorMetrics.java | 2 + .../streams/CoordinatorStreamsRecordHelpers.java | 2 +- .../coordinator/group/streams/StreamsGroup.java | 2 +- .../group/streams/TargetAssignmentBuilder.java | 2 +- .../CoordinatorStreamsRecordHelpersTest.java | 2 +- .../group/streams/StreamsGroupBuilder.java | 2 +- .../group/streams/TargetAssignmentBuilderTest.java | 2 +- .../SmokeTestDriverIntegrationTest.java | 10 ++--- .../org/apache/kafka/streams/GroupProtocol.java | 43 ++++++++++++++++++++++ .../org/apache/kafka/streams/StreamsConfig.java | 17 +++++++++ .../streams/processor/internals/StreamThread.java | 5 +-- 21 files changed, 121 insertions(+), 50 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java index 4f3ec1b2398..9128ee7889b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java @@ -263,7 +263,7 @@ public class RequestManagers implements Closeable { metadata ); } else { - heartbeatRequestManager = new HeartbeatRequestManager( + heartbeatRequestManager = new ConsumerHeartbeatRequestManager( logContext, time, config, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java index 567fcb5f776..1e23233f4e6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java @@ -37,7 +37,7 @@ import java.util.concurrent.atomic.AtomicReference; */ public class StreamsAssignmentInterface { - private UUID processID; + private UUID processId; private Optional<HostInfo> endpoint; @@ -53,8 +53,13 @@ public class StreamsAssignmentInterface { private Map<String, String> clientTags; - public UUID processID() { - return processID; + public UUID processId() { + return processId; + } + + public String topologyId() { + // ToDo: As long as we do not compute the topology ID, let's use a constant one + return "topology-id"; } public Optional<HostInfo> endpoint() { @@ -73,19 +78,11 @@ public class StreamsAssignmentInterface { return assignmentConfiguration; } - // ToDo: As long as we do not compute the topology ID, let's use a constant one - public final String topologyId = "topology-id"; - // TODO: This needs to be used somewhere public Map<TaskId, Long> taskLags() { return taskLags; } - public byte[] computeTopologyHash() { - // TODO - return new byte[0]; - } - public Map<String, String> clientTags() { return clientTags; } @@ -266,14 +263,14 @@ public class StreamsAssignmentInterface { } } - public StreamsAssignmentInterface(UUID processID, + public StreamsAssignmentInterface(UUID processId, Optional<HostInfo> endpoint, String assignor, Map<String, Subtopology> subtopologyMap, Map<String, Object> assignmentConfiguration, Map<String, String> clientTags ) { - this.processID = processID; + this.processId = processId; this.endpoint = endpoint; this.assignor = assignor; this.subtopologyMap = subtopologyMap; @@ -286,7 +283,7 @@ public class StreamsAssignmentInterface { @Override public String toString() { return "StreamsAssignmentMetadata{" + - "processID=" + processID + + "processID=" + processId + ", endpoint='" + endpoint + '\'' + ", assignor='" + assignor + '\'' + ", subtopologyMap=" + subtopologyMap + diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java index e93b0e51c10..f41ed9d0998 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java @@ -34,6 +34,7 @@ import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData; import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData.Endpoint; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; import org.apache.kafka.common.requests.StreamsGroupHeartbeatRequest; import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse; import org.apache.kafka.common.utils.LogContext; @@ -264,7 +265,7 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager { } } - membershipManager.onHeartbeatSuccess(cgData); + membershipManager.onHeartbeatSuccess(new ConsumerGroupHeartbeatResponse(cgData)); } private void setTargetAssignmentForConsumerGroup(final StreamsGroupHeartbeatResponseData data, final ConsumerGroupHeartbeatResponseData cgData) { @@ -534,7 +535,7 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager { data.setGroupId(membershipManager.groupId()); // TopologyId - always sent - data.setTopologyId(streamsInterface.topologyId); + data.setTopologyId(streamsInterface.topologyId()); // MemberId - always sent, empty until it has been received from the coordinator data.setMemberId(membershipManager.memberId()); @@ -555,7 +556,7 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager { // Immutable -- only sent when joining if (joining) { - data.setProcessId(streamsInterface.processID().toString()); + data.setProcessId(streamsInterface.processId().toString()); data.setActiveTasks(Collections.emptyList()); data.setStandbyTasks(Collections.emptyList()); data.setWarmupTasks(Collections.emptyList()); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManager.java index 0a6a67594c1..64565f347b1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManager.java @@ -68,7 +68,7 @@ public class StreamsGroupInitializeRequestManager implements RequestManager { private NetworkClientDelegate.UnsentRequest makeRequest() { final StreamsGroupInitializeRequestData streamsGroupInitializeRequestData = new StreamsGroupInitializeRequestData(); streamsGroupInitializeRequestData.setGroupId(groupId); - streamsGroupInitializeRequestData.setTopologyId(streamsAssignmentInterface.topologyId); + streamsGroupInitializeRequestData.setTopologyId(streamsAssignmentInterface.topologyId()); final List<StreamsGroupInitializeRequestData.Subtopology> topology = getTopologyFromStreams(); streamsGroupInitializeRequestData.setTopology(topology); final StreamsGroupInitializeRequest.Builder streamsGroupInitializeRequestBuilder = new StreamsGroupInitializeRequest.Builder( diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java index 2a84c18e234..52aee8f20e0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java @@ -367,8 +367,8 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven * the group is sent out. */ private void process(final UnsubscribeEvent event) { - if (requestManagers.consumerHeartbeatRequestManager.isPresent()) { - CompletableFuture<Void> future = requestManagers.consumerHeartbeatRequestManager.get().membershipManager().leaveGroup(); + if (requestManagers.consumerMembershipManager.isPresent()) { + CompletableFuture<Void> future = requestManagers.consumerMembershipManager.get().leaveGroup(); future.whenComplete(complete(event.future())); } else { // If the consumer is not using the group management capabilities, we still need to clear all assignments it may have. @@ -417,7 +417,7 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven ); return; } - requestManagers.consumerHeartbeatRequestManager.get().membershipManager().consumerRebalanceListenerCallbackCompleted(event); + requestManagers.consumerMembershipManager.get().consumerRebalanceListenerCallbackCompleted(event); } private void process(@SuppressWarnings("unused") final CommitOnCloseEvent event) { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java index b9fdb4c7722..503576d8fd5 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java @@ -32,6 +32,7 @@ import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; import org.apache.kafka.common.requests.RequestHeader; import org.apache.kafka.common.requests.StreamsGroupHeartbeatRequest; import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse; @@ -290,7 +291,7 @@ class StreamsGroupHeartbeatRequestManagerTest { mockResponse(data); ArgumentCaptor<ConsumerGroupHeartbeatResponseData> captor = ArgumentCaptor.forClass(ConsumerGroupHeartbeatResponseData.class); - verify(membershipManager, times(1)).onHeartbeatSuccess(captor.capture()); + verify(membershipManager, times(1)).onHeartbeatSuccess(new ConsumerGroupHeartbeatResponse(captor.capture())); ConsumerGroupHeartbeatResponseData response = captor.getValue(); assertEquals(Errors.NONE.code(), response.errorCode()); assertEquals(TEST_MEMBER_ID, response.memberId()); diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala index b7efed1d495..a66752c30f3 100644 --- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala +++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala @@ -74,6 +74,10 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness { cfgs.foreach(_.setProperty(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,consumer,share")) cfgs.foreach(_.setProperty(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, "true")) } + if (isStreamsGroupTest()) { + cfgs.foreach(_.setProperty(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,consumer,streams")) + cfgs.foreach(_.setProperty(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, "true")) + } if(isKRaftTest()) { cfgs.foreach(_.setProperty(KRaftConfigs.METADATA_LOG_DIR_CONFIG, TestUtils.tempDir().getAbsolutePath)) diff --git a/core/src/test/scala/kafka/utils/TestInfoUtils.scala b/core/src/test/scala/kafka/utils/TestInfoUtils.scala index cd22727839e..c6b05d9254c 100644 --- a/core/src/test/scala/kafka/utils/TestInfoUtils.scala +++ b/core/src/test/scala/kafka/utils/TestInfoUtils.scala @@ -51,6 +51,10 @@ object TestInfoUtils { testInfo.getDisplayName.contains("kraft+kip932") } + def isStreamsGroupTest(testInfo: TestInfo): Boolean = { + testInfo.getDisplayName.contains("kraft+kip1071") + } + def maybeGroupProtocolSpecified(testInfo: TestInfo): Option[GroupProtocol] = { if (testInfo.getDisplayName.contains("groupProtocol=classic")) Some(GroupProtocol.CLASSIC) diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 1241287965f..8d62dadd642 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -11251,7 +11251,7 @@ class KafkaApisTest extends Logging { overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams"), raftSupport = true ) - kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching) + kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) val streamsGroupInitializeResponse = new StreamsGroupInitializeResponseData() @@ -11277,7 +11277,7 @@ class KafkaApisTest extends Logging { overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams"), raftSupport = true ) - kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching) + kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) future.completeExceptionally(Errors.FENCED_MEMBER_EPOCH.exception) val response = verifyNoThrottling[StreamsGroupInitializeResponse](requestChannelRequest) @@ -11300,7 +11300,7 @@ class KafkaApisTest extends Logging { overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams"), raftSupport = true ) - kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching) + kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) val response = verifyNoThrottling[StreamsGroupInitializeResponse](requestChannelRequest) assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, response.data.errorCode) @@ -11324,7 +11324,7 @@ class KafkaApisTest extends Logging { overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams"), raftSupport = true ) - kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching) + kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) val streamsGroupHeartbeatResponse = new StreamsGroupHeartbeatResponseData() .setMemberId("member") @@ -11351,7 +11351,7 @@ class KafkaApisTest extends Logging { overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams"), raftSupport = true ) - kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching) + kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) future.completeExceptionally(Errors.FENCED_MEMBER_EPOCH.exception) val response = verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest) @@ -11374,7 +11374,7 @@ class KafkaApisTest extends Logging { overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams"), raftSupport = true ) - kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching) + kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) val response = verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest) assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, response.data.errorCode) @@ -11521,7 +11521,7 @@ class KafkaApisTest extends Logging { overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams"), raftSupport = true ) - kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching) + kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) future.complete(List( new StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(0)), @@ -11564,7 +11564,7 @@ class KafkaApisTest extends Logging { val expectedResponse = new StreamsGroupDescribeResponseData() expectedResponse.groups.add(expectedDescribedGroup) kafkaApis = createKafkaApis() - kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching) + kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) val response = verifyNoThrottling[StreamsGroupDescribeResponse](requestChannelRequest) assertEquals(expectedResponse, response.data) @@ -11593,7 +11593,7 @@ class KafkaApisTest extends Logging { overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams"), raftSupport = true ) - kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching) + kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) val response = verifyNoThrottling[StreamsGroupDescribeResponse](requestChannelRequest) assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, response.data.groups.get(0).errorCode) @@ -11616,7 +11616,7 @@ class KafkaApisTest extends Logging { overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams"), raftSupport = true ) - kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching) + kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) future.completeExceptionally(Errors.FENCED_MEMBER_EPOCH.exception) val response = verifyNoThrottling[StreamsGroupDescribeResponse](requestChannelRequest) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java index c6631945add..007e991ad16 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java @@ -375,7 +375,8 @@ public class GroupCoordinatorService implements GroupCoordinator { exception, (error, message) -> new StreamsGroupInitializeResponseData() .setErrorCode(error.code()) - .setErrorMessage(message) + .setErrorMessage(message), + log )); } @@ -404,7 +405,8 @@ public class GroupCoordinatorService implements GroupCoordinator { exception, (error, message) -> new StreamsGroupHeartbeatResponseData() .setErrorCode(error.code()) - .setErrorMessage(message) + .setErrorMessage(message), + log )); } @@ -767,7 +769,8 @@ public class GroupCoordinatorService implements GroupCoordinator { "streams-group-describe", groupList, exception, - (error, __) -> StreamsGroupDescribeRequest.getErrorDescribedGroupList(groupList, error) + (error, __) -> StreamsGroupDescribeRequest.getErrorDescribedGroupList(groupList, error), + log )); futures.add(future); diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetrics.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetrics.java index bf595347273..1a543490b53 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetrics.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetrics.java @@ -116,6 +116,7 @@ public class GroupCoordinatorMetrics extends CoordinatorMetrics implements AutoC this(KafkaYammerMetrics.defaultRegistry(), new Metrics()); } + @SuppressWarnings("MethodLength") public GroupCoordinatorMetrics(MetricsRegistry registry, Metrics metrics) { this.registry = Objects.requireNonNull(registry); this.metrics = Objects.requireNonNull(metrics); @@ -198,6 +199,7 @@ public class GroupCoordinatorMetrics extends CoordinatorMetrics implements AutoC "The number of share groups in dead state.", SHARE_GROUP_PROTOCOL_TAG, Group.GroupType.SHARE.toString(), SHARE_GROUP_COUNT_STATE_TAG, ShareGroup.ShareGroupState.DEAD.toString() + ); streamsGroupCountMetricName = metrics.metricName( GROUP_COUNT_METRIC_NAME, diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpers.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpers.java index 679e987f407..48a31fdd4c7 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpers.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpers.java @@ -17,7 +17,7 @@ package org.apache.kafka.coordinator.group.streams; import org.apache.kafka.common.message.StreamsGroupInitializeRequestData; -import org.apache.kafka.coordinator.group.CoordinatorRecord; +import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentKey; import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue; import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKey; diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java index 11f0cc198fd..f9891e62953 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java @@ -25,7 +25,7 @@ import org.apache.kafka.common.errors.UnknownMemberIdException; import org.apache.kafka.common.message.ListGroupsResponseData; import org.apache.kafka.common.message.StreamsGroupDescribeResponseData; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.coordinator.group.CoordinatorRecord; +import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; import org.apache.kafka.coordinator.group.Group; import org.apache.kafka.coordinator.group.OffsetExpirationCondition; import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl; diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java index d4cb0b85275..fcb2e99ff55 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.coordinator.group.streams; -import org.apache.kafka.coordinator.group.CoordinatorRecord; +import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; import org.apache.kafka.coordinator.group.taskassignor.AssignmentMemberSpec; import org.apache.kafka.coordinator.group.taskassignor.GroupAssignment; import org.apache.kafka.coordinator.group.taskassignor.GroupSpecImpl; diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpersTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpersTest.java index b9298dab3f3..0b291bd098e 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpersTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpersTest.java @@ -17,7 +17,7 @@ package org.apache.kafka.coordinator.group.streams; import org.apache.kafka.common.message.StreamsGroupInitializeRequestData; -import org.apache.kafka.coordinator.group.CoordinatorRecord; +import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey; import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; import org.apache.kafka.server.common.ApiMessageAndVersion; diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java index 8e39ae949da..e05365dbac8 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java @@ -20,7 +20,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.kafka.coordinator.group.CoordinatorRecord; +import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; public class StreamsGroupBuilder { diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java index dfc263c8710..1679336815f 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java @@ -39,7 +39,7 @@ import java.util.Set; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.coordinator.group.Assertions.assertUnorderedListEquals; -import static org.apache.kafka.coordinator.group.CoordinatorRecordHelpersTest.mkMapOfPartitionRacks; +import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpersTest.mkMapOfPartitionRacks; import static org.apache.kafka.coordinator.group.streams.CoordinatorStreamsRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord; import static org.apache.kafka.coordinator.group.streams.CoordinatorStreamsRecordHelpers.newStreamsGroupTargetAssignmentRecord; import static org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.createAssignmentMemberSpec; diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java index 7de099c4ede..78c23184f4f 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java @@ -20,8 +20,8 @@ import java.util.Locale; import kafka.api.IntegrationTestHarness; import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.GroupProtocol; import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.streams.GroupProtocol; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsConfig.InternalConfig; import org.apache.kafka.streams.tests.SmokeTestClient; @@ -86,7 +86,7 @@ public class SmokeTestDriverIntegrationTest extends IntegrationTestHarness { } @Override - public boolean isNewGroupCoordinatorEnabled() { + public boolean isStreamsGroupTest() { return true; } @@ -110,7 +110,7 @@ public class SmokeTestDriverIntegrationTest extends IntegrationTestHarness { public void shouldWorkWithRebalance( final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled, - final boolean consumerProtocolEnabled + final boolean streamsProtocolEnabled ) throws InterruptedException { Exit.setExitProcedure((statusCode, message) -> { throw new AssertionError("Test called exit(). code:" + statusCode + " message:" + message); @@ -166,8 +166,8 @@ public class SmokeTestDriverIntegrationTest extends IntegrationTestHarness { props.put(InternalConfig.PROCESSING_THREADS_ENABLED, processingThreadsEnabled); // decrease the session timeout so that we can trigger the rebalance soon after old client left closed props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000); - if (consumerProtocolEnabled) { - props.put(StreamsConfig.consumerPrefix(ConsumerConfig.GROUP_PROTOCOL_CONFIG), GroupProtocol.CONSUMER.name().toLowerCase(Locale.getDefault())); + if (streamsProtocolEnabled) { + props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault())); } // cycle out Streams instances as long as the test is running. diff --git a/streams/src/main/java/org/apache/kafka/streams/GroupProtocol.java b/streams/src/main/java/org/apache/kafka/streams/GroupProtocol.java new file mode 100644 index 00000000000..146a5e6e9de --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/GroupProtocol.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams; + +import java.util.Locale; + +public enum GroupProtocol { + /** Classic group protocol. */ + CLASSIC("CLASSIC"), + + /** Streams group protocol */ + STREAMS("STREAMS"); + + /** + * String representation of the group protocol. + */ + public final String name; + + GroupProtocol(final String name) { + this.name = name; + } + + /** + * Case-insensitive group protocol lookup by string name. + */ + public static GroupProtocol of(final String name) { + return GroupProtocol.valueOf(name.toUpperCase(Locale.ROOT)); + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 36acc2b2049..6e171e022b5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams; +import java.util.Locale; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.MetadataRecoveryStrategy; import org.apache.kafka.clients.admin.Admin; @@ -588,6 +589,16 @@ public class StreamsConfig extends AbstractConfig { public static final String ENABLE_METRICS_PUSH_DOC = "Whether to enable pushing of internal client metrics for (main, restore, and global) consumers, producers, and admin clients." + " The cluster must have a client metrics subscription which corresponds to a client."; + /** + * <code>group.protocol</code> + */ + public static final String GROUP_PROTOCOL_CONFIG = "group.protocol"; + public static final String DEFAULT_GROUP_PROTOCOL = GroupProtocol.CLASSIC.name().toLowerCase( + Locale.ROOT); + public static final String GROUP_PROTOCOL_DOC = "The group protocol streams should use. We currently " + + "support \"classic\" or \"streams\". If \"streams\" is specified, then the streams rebalance protocol will be " + + "used. Otherwise, the classic group protocol will be used."; + /** {@code log.summary.interval.ms} */ public static final String LOG_SUMMARY_INTERVAL_MS_CONFIG = "log.summary.interval.ms"; private static final String LOG_SUMMARY_INTERVAL_MS_DOC = "The output interval in milliseconds for logging summary information.\n" + @@ -1026,6 +1037,12 @@ public class StreamsConfig extends AbstractConfig { TOPOLOGY_OPTIMIZATION_CONFIGS::toString), Importance.MEDIUM, TOPOLOGY_OPTIMIZATION_DOC) + .define(GROUP_PROTOCOL_CONFIG, + Type.STRING, + DEFAULT_GROUP_PROTOCOL, + ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(GroupProtocol.class)), + Importance.MEDIUM, + GROUP_PROTOCOL_DOC) // LOW diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index b87ba61b51d..f26f9d55089 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -22,7 +22,6 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.GroupProtocol; import org.apache.kafka.clients.consumer.InvalidOffsetException; import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy; import org.apache.kafka.clients.consumer.OffsetResetStrategy; @@ -45,6 +44,7 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.GroupProtocol; import org.apache.kafka.streams.KafkaClientSupplier; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsConfig.InternalConfig; @@ -493,8 +493,7 @@ public class StreamThread extends Thread implements ProcessingThread { final Consumer<byte[], byte[]> mainConsumer; final StreamsAssignmentInterface streamsAssignmentInterface; - if (consumerConfigs.containsKey(ConsumerConfig.GROUP_PROTOCOL_CONFIG) && - consumerConfigs.get(ConsumerConfig.GROUP_PROTOCOL_CONFIG).toString().equalsIgnoreCase(GroupProtocol.CONSUMER.name)) { + if (config.getString(StreamsConfig.GROUP_PROTOCOL_CONFIG).equalsIgnoreCase(GroupProtocol.STREAMS.name)) { if (topologyMetadata.hasNamedTopologies()) { throw new IllegalStateException("Named topologies and the CONSUMER protocol cannot be used at the same time."); }
