This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 56e50120be4 KAFKA-18621: Add StreamsCoordinatorRecordHelpers (#18669)
56e50120be4 is described below
commit 56e50120be40be40e126cd075caa87f54ca91546
Author: Lucas Brutschy <[email protected]>
AuthorDate: Thu Jan 30 09:28:45 2025 +0100
KAFKA-18621: Add StreamsCoordinatorRecordHelpers (#18669)
A class with helper methods to create records stored in the
__consumer_offsets topic.
Compared to the feature branch, I added unit tests (most functions were not
tested) and adopted the new interface for constructing coordinator records
introduced by David.
Reviewers: Bruno Cadonna <[email protected]>
---
.../streams/StreamsCoordinatorRecordHelpers.java | 473 +++++++++++
.../StreamsCoordinatorRecordHelpersTest.java | 882 +++++++++++++++++++++
2 files changed, 1355 insertions(+)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java
new file mode 100644
index 00000000000..c44e5d89713
--- /dev/null
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java
@@ -0,0 +1,473 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentKey;
+import
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue;
+import
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKey;
+import
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue;
+import
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey;
+import
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue;
+import
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue.PartitionMetadata;
+import
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey;
+import
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
+import
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey;
+import
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This class contains helper methods to create records stored in the
__consumer_offsets topic.
+ */
+public class StreamsCoordinatorRecordHelpers {
+
+ public static CoordinatorRecord newStreamsGroupMemberRecord(
+ String groupId,
+ StreamsGroupMember member
+ ) {
+ Objects.requireNonNull(groupId, "groupId should not be null here");
+ Objects.requireNonNull(member, "member should not be null here");
+
+ return CoordinatorRecord.record(
+ new StreamsGroupMemberMetadataKey()
+ .setGroupId(groupId)
+ .setMemberId(member.memberId()),
+ new ApiMessageAndVersion(
+ new StreamsGroupMemberMetadataValue()
+ .setRackId(member.rackId().orElse(null))
+ .setInstanceId(member.instanceId().orElse(null))
+ .setClientId(member.clientId())
+ .setClientHost(member.clientHost())
+ .setRebalanceTimeoutMs(member.rebalanceTimeoutMs())
+ .setTopologyEpoch(member.topologyEpoch())
+ .setProcessId(member.processId())
+ .setUserEndpoint(member.userEndpoint().orElse(null))
+
.setClientTags(member.clientTags().entrySet().stream().map(e ->
+ new StreamsGroupMemberMetadataValue.KeyValue()
+ .setKey(e.getKey())
+ .setValue(e.getValue())
+
).sorted(Comparator.comparing(StreamsGroupMemberMetadataValue.KeyValue::key)).collect(Collectors.toList())),
+ (short) 0
+ )
+ );
+ }
+
+ /**
+ * Creates a StreamsGroupMemberMetadata tombstone.
+ *
+ * @param groupId The streams group id.
+ * @param memberId The streams group member id.
+ * @return The record.
+ */
+ public static CoordinatorRecord newStreamsGroupMemberTombstoneRecord(
+ String groupId,
+ String memberId
+ ) {
+ Objects.requireNonNull(groupId, "groupId should not be null here");
+ Objects.requireNonNull(memberId, "memberId should not be null here");
+
+ return CoordinatorRecord.tombstone(
+ new StreamsGroupMemberMetadataKey()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ );
+ }
+
+ /**
+ * Creates a StreamsGroupPartitionMetadata record.
+ *
+ * @param groupId The streams group id.
+ * @param newPartitionMetadata The partition metadata.
+ * @return The record.
+ */
+ public static CoordinatorRecord newStreamsGroupPartitionMetadataRecord(
+ String groupId,
+ Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata>
newPartitionMetadata
+ ) {
+ Objects.requireNonNull(groupId, "groupId should not be null here");
+ Objects.requireNonNull(newPartitionMetadata, "newPartitionMetadata
should not be null here");
+
+ StreamsGroupPartitionMetadataValue value = new
StreamsGroupPartitionMetadataValue();
+ newPartitionMetadata.forEach((topicName, topicMetadata) -> {
+ List<StreamsGroupPartitionMetadataValue.PartitionMetadata>
partitionMetadata = new ArrayList<>();
+ if (!topicMetadata.partitionRacks().isEmpty()) {
+ topicMetadata.partitionRacks().forEach((partition, racks) ->
+ partitionMetadata.add(new
StreamsGroupPartitionMetadataValue.PartitionMetadata()
+ .setPartition(partition)
+ .setRacks(racks.stream().sorted().toList())
+ )
+ );
+ }
+
partitionMetadata.sort(Comparator.comparingInt(PartitionMetadata::partition));
+ value.topics().add(new
StreamsGroupPartitionMetadataValue.TopicMetadata()
+ .setTopicId(topicMetadata.id())
+ .setTopicName(topicMetadata.name())
+ .setNumPartitions(topicMetadata.numPartitions())
+ .setPartitionMetadata(partitionMetadata)
+ );
+ });
+
+
value.topics().sort(Comparator.comparing(StreamsGroupPartitionMetadataValue.TopicMetadata::topicName));
+
+ return CoordinatorRecord.record(
+ new StreamsGroupPartitionMetadataKey()
+ .setGroupId(groupId),
+ new ApiMessageAndVersion(
+ value,
+ (short) 0
+ )
+ );
+ }
+
+ /**
+ * Creates a StreamsGroupPartitionMetadata tombstone.
+ *
+ * @param groupId The streams group id.
+ * @return The record.
+ */
+ public static CoordinatorRecord
newStreamsGroupPartitionMetadataTombstoneRecord(
+ String groupId
+ ) {
+ Objects.requireNonNull(groupId, "groupId should not be null here");
+
+ return CoordinatorRecord.tombstone(
+ new StreamsGroupPartitionMetadataKey()
+ .setGroupId(groupId)
+ );
+ }
+
+ public static CoordinatorRecord newStreamsGroupEpochRecord(
+ String groupId,
+ int newGroupEpoch
+ ) {
+ Objects.requireNonNull(groupId, "groupId should not be null here");
+
+ return CoordinatorRecord.record(
+ new StreamsGroupMetadataKey()
+ .setGroupId(groupId),
+ new ApiMessageAndVersion(
+ new StreamsGroupMetadataValue()
+ .setEpoch(newGroupEpoch),
+ (short) 0
+ )
+ );
+ }
+
+ /**
+ * Creates a StreamsGroupMetadata tombstone.
+ *
+ * @param groupId The streams group id.
+ * @return The record.
+ */
+ public static CoordinatorRecord newStreamsGroupEpochTombstoneRecord(
+ String groupId
+ ) {
+ Objects.requireNonNull(groupId, "groupId should not be null here");
+
+ return CoordinatorRecord.tombstone(
+ new StreamsGroupMetadataKey()
+ .setGroupId(groupId)
+ );
+ }
+
+ public static CoordinatorRecord newStreamsGroupTargetAssignmentRecord(
+ String groupId,
+ String memberId,
+ TasksTuple assignment
+ ) {
+ Objects.requireNonNull(groupId, "groupId should not be null here");
+ Objects.requireNonNull(memberId, "memberId should not be null here");
+ Objects.requireNonNull(assignment, "assignment should not be null
here");
+
+ List<StreamsGroupTargetAssignmentMemberValue.TaskIds> activeTaskIds =
new ArrayList<>(assignment.activeTasks().size());
+ for (Map.Entry<String, Set<Integer>> entry :
assignment.activeTasks().entrySet()) {
+ activeTaskIds.add(
+ new StreamsGroupTargetAssignmentMemberValue.TaskIds()
+ .setSubtopologyId(entry.getKey())
+ .setPartitions(entry.getValue().stream().sorted().toList())
+ );
+ }
+
activeTaskIds.sort(Comparator.comparing(StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId));
+ List<StreamsGroupTargetAssignmentMemberValue.TaskIds> standbyTaskIds =
new ArrayList<>(assignment.standbyTasks().size());
+ for (Map.Entry<String, Set<Integer>> entry :
assignment.standbyTasks().entrySet()) {
+ standbyTaskIds.add(
+ new StreamsGroupTargetAssignmentMemberValue.TaskIds()
+ .setSubtopologyId(entry.getKey())
+ .setPartitions(entry.getValue().stream().sorted().toList())
+ );
+ }
+
standbyTaskIds.sort(Comparator.comparing(StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId));
+ List<StreamsGroupTargetAssignmentMemberValue.TaskIds> warmupTaskIds =
new ArrayList<>(assignment.warmupTasks().size());
+ for (Map.Entry<String, Set<Integer>> entry :
assignment.warmupTasks().entrySet()) {
+ warmupTaskIds.add(
+ new StreamsGroupTargetAssignmentMemberValue.TaskIds()
+ .setSubtopologyId(entry.getKey())
+ .setPartitions(entry.getValue().stream().sorted().toList())
+ );
+ }
+
warmupTaskIds.sort(Comparator.comparing(StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId));
+
+ return CoordinatorRecord.record(
+ new StreamsGroupTargetAssignmentMemberKey()
+ .setGroupId(groupId)
+ .setMemberId(memberId),
+ new ApiMessageAndVersion(
+ new StreamsGroupTargetAssignmentMemberValue()
+ .setActiveTasks(activeTaskIds)
+ .setStandbyTasks(standbyTaskIds)
+ .setWarmupTasks(warmupTaskIds),
+ (short) 0
+ )
+ );
+ }
+
+ /**
+ * Creates a StreamsGroupTargetAssignmentMember tombstone.
+ *
+ * @param groupId The streams group id.
+ * @param memberId The streams group member id.
+ * @return The record.
+ */
+ public static CoordinatorRecord
newStreamsGroupTargetAssignmentTombstoneRecord(
+ String groupId,
+ String memberId
+ ) {
+ Objects.requireNonNull(groupId, "groupId should not be null here");
+ Objects.requireNonNull(memberId, "memberId should not be null here");
+
+ return CoordinatorRecord.tombstone(
+ new StreamsGroupTargetAssignmentMemberKey()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ );
+ }
+
+
+ public static CoordinatorRecord newStreamsGroupTargetAssignmentEpochRecord(
+ String groupId,
+ int assignmentEpoch
+ ) {
+ Objects.requireNonNull(groupId, "groupId should not be null here");
+
+ return CoordinatorRecord.record(
+ new StreamsGroupTargetAssignmentMetadataKey()
+ .setGroupId(groupId),
+ new ApiMessageAndVersion(
+ new StreamsGroupTargetAssignmentMetadataValue()
+ .setAssignmentEpoch(assignmentEpoch),
+ (short) 0
+ )
+ );
+ }
+
+ /**
+ * Creates a StreamsGroupTargetAssignmentMetadata tombstone.
+ *
+ * @param groupId The streams group id.
+ * @return The record.
+ */
+ public static CoordinatorRecord
newStreamsGroupTargetAssignmentEpochTombstoneRecord(
+ String groupId
+ ) {
+ Objects.requireNonNull(groupId, "groupId should not be null here");
+
+ return CoordinatorRecord.tombstone(
+ new StreamsGroupTargetAssignmentMetadataKey()
+ .setGroupId(groupId)
+ );
+ }
+
+ public static CoordinatorRecord newStreamsGroupCurrentAssignmentRecord(
+ String groupId,
+ StreamsGroupMember member
+ ) {
+ Objects.requireNonNull(groupId, "groupId should not be null here");
+ Objects.requireNonNull(member, "member should not be null here");
+
+ return CoordinatorRecord.record(
+ new StreamsGroupCurrentMemberAssignmentKey()
+ .setGroupId(groupId)
+ .setMemberId(member.memberId()),
+ new ApiMessageAndVersion(
+ new StreamsGroupCurrentMemberAssignmentValue()
+ .setMemberEpoch(member.memberEpoch())
+ .setPreviousMemberEpoch(member.previousMemberEpoch())
+ .setState(member.state().value())
+
.setActiveTasks(toTaskIds(member.assignedTasks().activeTasks()))
+
.setStandbyTasks(toTaskIds(member.assignedTasks().standbyTasks()))
+
.setWarmupTasks(toTaskIds(member.assignedTasks().warmupTasks()))
+
.setActiveTasksPendingRevocation(toTaskIds(member.tasksPendingRevocation().activeTasks()))
+
.setStandbyTasksPendingRevocation(toTaskIds(member.tasksPendingRevocation().standbyTasks()))
+
.setWarmupTasksPendingRevocation(toTaskIds(member.tasksPendingRevocation().warmupTasks())),
+ (short) 0
+ )
+ );
+ }
+
+ /**
+ * Creates a StreamsGroupCurrentMemberAssignment tombstone.
+ *
+ * @param groupId The streams group id.
+ * @param memberId The streams group member id.
+ * @return The record.
+ */
+ public static CoordinatorRecord
newStreamsGroupCurrentAssignmentTombstoneRecord(
+ String groupId,
+ String memberId
+ ) {
+ Objects.requireNonNull(groupId, "groupId should not be null here");
+ Objects.requireNonNull(memberId, "memberId should not be null here");
+
+ return CoordinatorRecord.tombstone(
+ new StreamsGroupCurrentMemberAssignmentKey()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ );
+ }
+
+ private static List<StreamsGroupCurrentMemberAssignmentValue.TaskIds>
toTaskIds(
+ Map<String, Set<Integer>> tasks
+ ) {
+ List<StreamsGroupCurrentMemberAssignmentValue.TaskIds> taskIds = new
ArrayList<>(tasks.size());
+ tasks.forEach((subtopologyId, partitions) ->
+ taskIds.add(new StreamsGroupCurrentMemberAssignmentValue.TaskIds()
+ .setSubtopologyId(subtopologyId)
+ .setPartitions(partitions.stream().sorted().toList()))
+ );
+
taskIds.sort(Comparator.comparing(StreamsGroupCurrentMemberAssignmentValue.TaskIds::subtopologyId));
+ return taskIds;
+ }
+
+ /**
+ * Creates a StreamsTopology record.
+ *
+ * @param groupId The consumer group id.
+ * @param topology The new topology.
+ * @return The record.
+ */
+ public static CoordinatorRecord newStreamsGroupTopologyRecord(String
groupId,
+
StreamsGroupHeartbeatRequestData.Topology topology) {
+ Objects.requireNonNull(groupId, "groupId should not be null here");
+ Objects.requireNonNull(topology, "topology should not be null here");
+
+ return newStreamsGroupTopologyRecord(groupId,
convertToStreamsGroupTopologyRecord(topology));
+ }
+
+ /**
+ * Creates a StreamsTopology record.
+ *
+ * @param groupId The consumer group id.
+ * @param value The encoded topology record value.
+ * @return The record.
+ */
+ public static CoordinatorRecord newStreamsGroupTopologyRecord(String
groupId, StreamsGroupTopologyValue value) {
+ Objects.requireNonNull(groupId, "groupId should not be null here");
+ Objects.requireNonNull(value, "value should not be null here");
+
+ return CoordinatorRecord.record(
+ new StreamsGroupTopologyKey()
+ .setGroupId(groupId),
+ new ApiMessageAndVersion(value, (short) 0)
+ );
+ }
+
+ /**
+ * Encodes subtopologies from the Heartbeat RPC to a StreamsTopology
record value.
+ *
+ * @param topology The new topology
+ * @return The record value.
+ */
+ public static StreamsGroupTopologyValue
convertToStreamsGroupTopologyRecord(StreamsGroupHeartbeatRequestData.Topology
topology) {
+ Objects.requireNonNull(topology, "topology should not be null here");
+
+ StreamsGroupTopologyValue value = new StreamsGroupTopologyValue();
+ value.setEpoch(topology.epoch());
+ topology.subtopologies().forEach(subtopology -> {
+ List<StreamsGroupTopologyValue.TopicInfo> repartitionSourceTopics =
+ subtopology.repartitionSourceTopics().stream()
+ .map(StreamsCoordinatorRecordHelpers::convertToTopicInfo)
+ .collect(Collectors.toList());
+
+ List<StreamsGroupTopologyValue.TopicInfo> stateChangelogTopics =
+ subtopology.stateChangelogTopics().stream()
+ .map(StreamsCoordinatorRecordHelpers::convertToTopicInfo)
+ .collect(Collectors.toList());
+
+ List<StreamsGroupTopologyValue.CopartitionGroup> copartitionGroups
=
+ subtopology.copartitionGroups().stream()
+ .map(copartitionGroup -> new
StreamsGroupTopologyValue.CopartitionGroup()
+ .setSourceTopics(copartitionGroup.sourceTopics())
+
.setSourceTopicRegex(copartitionGroup.sourceTopicRegex())
+
.setRepartitionSourceTopics(copartitionGroup.repartitionSourceTopics())
+ )
+ .collect(Collectors.toList());
+
+ value.subtopologies().add(
+ new StreamsGroupTopologyValue.Subtopology()
+ .setSubtopologyId(subtopology.subtopologyId())
+ .setSourceTopics(subtopology.sourceTopics())
+ .setSourceTopicRegex(subtopology.sourceTopicRegex())
+
.setRepartitionSinkTopics(subtopology.repartitionSinkTopics())
+ .setRepartitionSourceTopics(repartitionSourceTopics)
+ .setStateChangelogTopics(stateChangelogTopics)
+ .setCopartitionGroups(copartitionGroups)
+ );
+ });
+ return value;
+ }
+
+ private static StreamsGroupTopologyValue.TopicInfo
convertToTopicInfo(StreamsGroupHeartbeatRequestData.TopicInfo topicInfo) {
+ List<StreamsGroupTopologyValue.TopicConfig> topicConfigs =
topicInfo.topicConfigs() != null ? topicInfo.topicConfigs().stream()
+ .map(config -> new
StreamsGroupTopologyValue.TopicConfig().setKey(config.key()).setValue(config.value()))
+ .collect(Collectors.toList()) : null;
+ return new StreamsGroupTopologyValue.TopicInfo()
+ .setName(topicInfo.name())
+ .setTopicConfigs(topicConfigs)
+ .setPartitions(topicInfo.partitions())
+ .setReplicationFactor(topicInfo.replicationFactor());
+ }
+
+ /**
+ * Creates a StreamsGroupTopology tombstone.
+ *
+ * @param groupId The streams group id.
+ * @return The record.
+ */
+ public static CoordinatorRecord newStreamsGroupTopologyRecordTombstone(
+ String groupId
+ ) {
+ Objects.requireNonNull(groupId, "groupId should not be null here");
+
+ return CoordinatorRecord.tombstone(
+ new StreamsGroupTopologyKey()
+ .setGroupId(groupId)
+ );
+ }
+}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java
new file mode 100644
index 00000000000..489d4fa0254
--- /dev/null
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java
@@ -0,0 +1,882 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentKey;
+import
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue;
+import
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKey;
+import
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
+import
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue.Endpoint;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue;
+import
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey;
+import
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue;
+import
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey;
+import
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
+import
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue.TaskIds;
+import
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey;
+import
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
+import
org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.TaskRole;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static
org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasks;
+import static
org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasksTuple;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+
+class StreamsCoordinatorRecordHelpersTest {
+
+ public static final String CLIENT_HOST = "client-host";
+ public static final String CLIENT_ID = "client-id";
+ public static final String CONFIG_NAME_1 = "config-name1";
+ public static final String CONFIG_NAME_2 = "config-name2";
+ public static final String CONFIG_VALUE_1 = "config-value1";
+ public static final String CONFIG_VALUE_2 = "config-value2";
+ public static final String GROUP_ID = "group-id";
+ public static final String INSTANCE_ID = "instance-id";
+ public static final String MEMBER_ID = "member-id";
+ public static final String PROCESS_ID = "process-id";
+ public static final String RACK_1 = "rack1";
+ public static final String RACK_2 = "rack2";
+ public static final String RACK_3 = "rack3";
+ public static final String SUBTOPOLOGY_1 = "subtopology1";
+ public static final String SUBTOPOLOGY_2 = "subtopology2";
+ public static final String SUBTOPOLOGY_3 = "subtopology3";
+ public static final String TAG_1 = "tag1";
+ public static final String TAG_2 = "tag2";
+ public static final String TOPIC_1 = "topic1";
+ public static final String TOPIC_2 = "topic2";
+ public static final String TOPIC_BAR = "bar";
+ public static final String TOPIC_CHANGELOG = "changelog";
+ public static final String TOPIC_FOO = "foo";
+ public static final String TOPIC_REGEX = "regex";
+ public static final String TOPIC_REPARTITION = "repartition";
+ public static final String USER_ENDPOINT = "user-endpoint";
+ public static final String VALUE_1 = "value1";
+ public static final String VALUE_2 = "value2";
+ public static final int REBALANCE_TIMEOUT_MS = 1000;
+ public static final int USER_ENDPOINT_PORT = 40;
+
+ @Test
+ public void testNewStreamsGroupMemberRecord() {
+ StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_ID)
+ .setRackId(RACK_1)
+ .setInstanceId(INSTANCE_ID)
+ .setClientId(CLIENT_ID)
+ .setClientHost(CLIENT_HOST)
+ .setRebalanceTimeoutMs(REBALANCE_TIMEOUT_MS)
+ .setTopologyEpoch(1)
+ .setProcessId(PROCESS_ID)
+ .setUserEndpoint(new
StreamsGroupMemberMetadataValue.Endpoint().setHost(USER_ENDPOINT).setPort(USER_ENDPOINT_PORT))
+ .setClientTags(Map.of(TAG_1, VALUE_1, TAG_2, VALUE_2))
+ .build();
+
+ CoordinatorRecord expectedRecord = CoordinatorRecord.record(
+ new StreamsGroupMemberMetadataKey()
+ .setGroupId(GROUP_ID)
+ .setMemberId(MEMBER_ID),
+ new ApiMessageAndVersion(
+ new StreamsGroupMemberMetadataValue()
+ .setRackId(RACK_1)
+ .setInstanceId(INSTANCE_ID)
+ .setClientId(CLIENT_ID)
+ .setClientHost(CLIENT_HOST)
+ .setRebalanceTimeoutMs(REBALANCE_TIMEOUT_MS)
+ .setTopologyEpoch(1)
+ .setProcessId(PROCESS_ID)
+ .setUserEndpoint(new
StreamsGroupMemberMetadataValue.Endpoint().setHost(USER_ENDPOINT).setPort(USER_ENDPOINT_PORT))
+ .setClientTags(List.of(
+ new
StreamsGroupMemberMetadataValue.KeyValue().setKey(TAG_1).setValue(VALUE_1),
+ new
StreamsGroupMemberMetadataValue.KeyValue().setKey(TAG_2).setValue(VALUE_2)
+ )),
+ (short) 0
+ )
+ );
+
+ assertEquals(expectedRecord,
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(GROUP_ID, member));
+ }
+
+ @Test
+ public void testNewStreamsGroupMemberRecordWithNullRackId() {
+ StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_ID)
+ .setRackId(null)
+ .setInstanceId(INSTANCE_ID)
+ .setClientId(CLIENT_ID)
+ .setClientHost(CLIENT_HOST)
+ .setRebalanceTimeoutMs(REBALANCE_TIMEOUT_MS)
+ .setTopologyEpoch(1)
+ .setProcessId(PROCESS_ID)
+ .setUserEndpoint(new
StreamsGroupMemberMetadataValue.Endpoint().setHost(USER_ENDPOINT).setPort(USER_ENDPOINT_PORT))
+ .setClientTags(Map.of(TAG_1, VALUE_1, TAG_2, VALUE_2))
+ .build();
+
+ CoordinatorRecord expectedRecord = CoordinatorRecord.record(
+ new StreamsGroupMemberMetadataKey()
+ .setGroupId(GROUP_ID)
+ .setMemberId(MEMBER_ID),
+ new ApiMessageAndVersion(
+ new StreamsGroupMemberMetadataValue()
+ .setRackId(null)
+ .setInstanceId(INSTANCE_ID)
+ .setClientId(CLIENT_ID)
+ .setClientHost(CLIENT_HOST)
+ .setRebalanceTimeoutMs(REBALANCE_TIMEOUT_MS)
+ .setTopologyEpoch(1)
+ .setProcessId(PROCESS_ID)
+ .setUserEndpoint(new
StreamsGroupMemberMetadataValue.Endpoint().setHost(USER_ENDPOINT).setPort(USER_ENDPOINT_PORT))
+ .setClientTags(List.of(
+ new
StreamsGroupMemberMetadataValue.KeyValue().setKey(TAG_1).setValue(VALUE_1),
+ new
StreamsGroupMemberMetadataValue.KeyValue().setKey(TAG_2).setValue(VALUE_2)
+ )),
+ (short) 0
+ )
+ );
+
+ assertEquals(expectedRecord,
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(GROUP_ID, member));
+ }
+
+ @Test
+ public void testNewStreamsGroupMemberRecordWithNullInstanceId() {
+ StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_ID)
+ .setRackId(RACK_1)
+ .setInstanceId(null)
+ .setClientId(CLIENT_ID)
+ .setClientHost(CLIENT_HOST)
+ .setRebalanceTimeoutMs(REBALANCE_TIMEOUT_MS)
+ .setTopologyEpoch(1)
+ .setProcessId(PROCESS_ID)
+ .setUserEndpoint(new
StreamsGroupMemberMetadataValue.Endpoint().setHost(USER_ENDPOINT).setPort(USER_ENDPOINT_PORT))
+ .setClientTags(Map.of(TAG_1, VALUE_1, TAG_2, VALUE_2))
+ .build();
+
+ CoordinatorRecord expectedRecord = CoordinatorRecord.record(
+ new StreamsGroupMemberMetadataKey()
+ .setGroupId(GROUP_ID)
+ .setMemberId(MEMBER_ID),
+ new ApiMessageAndVersion(
+ new StreamsGroupMemberMetadataValue()
+ .setRackId(RACK_1)
+ .setInstanceId(null)
+ .setClientId(CLIENT_ID)
+ .setClientHost(CLIENT_HOST)
+ .setRebalanceTimeoutMs(REBALANCE_TIMEOUT_MS)
+ .setTopologyEpoch(1)
+ .setProcessId(PROCESS_ID)
+ .setUserEndpoint(new
StreamsGroupMemberMetadataValue.Endpoint().setHost(USER_ENDPOINT).setPort(USER_ENDPOINT_PORT))
+ .setClientTags(List.of(
+ new
StreamsGroupMemberMetadataValue.KeyValue().setKey(TAG_1).setValue(VALUE_1),
+ new
StreamsGroupMemberMetadataValue.KeyValue().setKey(TAG_2).setValue(VALUE_2)
+ )),
+ (short) 0
+ )
+ );
+
+ assertEquals(expectedRecord,
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(GROUP_ID, member));
+ }
+
+ @Test
+ public void testNewStreamsGroupMemberRecordWithNullUserEndpoint() {
+ StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_ID)
+ .setRackId(RACK_1)
+ .setInstanceId(INSTANCE_ID)
+ .setClientId(CLIENT_ID)
+ .setClientHost(CLIENT_HOST)
+ .setRebalanceTimeoutMs(REBALANCE_TIMEOUT_MS)
+ .setTopologyEpoch(1)
+ .setProcessId(PROCESS_ID)
+ .setUserEndpoint(null)
+ .setClientTags(Map.of(TAG_1, VALUE_1, TAG_2, VALUE_2))
+ .build();
+
+ CoordinatorRecord expectedRecord = CoordinatorRecord.record(
+ new StreamsGroupMemberMetadataKey()
+ .setGroupId(GROUP_ID)
+ .setMemberId(MEMBER_ID),
+ new ApiMessageAndVersion(
+ new StreamsGroupMemberMetadataValue()
+ .setRackId(RACK_1)
+ .setInstanceId(INSTANCE_ID)
+ .setClientId(CLIENT_ID)
+ .setClientHost(CLIENT_HOST)
+ .setRebalanceTimeoutMs(REBALANCE_TIMEOUT_MS)
+ .setTopologyEpoch(1)
+ .setProcessId(PROCESS_ID)
+ .setUserEndpoint(null)
+ .setClientTags(List.of(
+ new
StreamsGroupMemberMetadataValue.KeyValue().setKey(TAG_1).setValue(VALUE_1),
+ new
StreamsGroupMemberMetadataValue.KeyValue().setKey(TAG_2).setValue(VALUE_2)
+ )),
+ (short) 0
+ )
+ );
+
+ assertEquals(expectedRecord,
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(GROUP_ID, member));
+ }
+
+ @Test
+ public void testNewStreamsGroupMemberTombstoneRecord() {
+ CoordinatorRecord expectedRecord = CoordinatorRecord.tombstone(
+ new StreamsGroupMemberMetadataKey()
+ .setGroupId(GROUP_ID)
+ .setMemberId(MEMBER_ID)
+ );
+
+ assertEquals(expectedRecord,
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(GROUP_ID,
MEMBER_ID));
+ }
+
+ @Test
+ public void testNewStreamsGroupPartitionMetadataRecord() {
+ Uuid uuid1 = Uuid.randomUuid();
+ Uuid uuid2 = Uuid.randomUuid();
+ Map<String, TopicMetadata> newPartitionMetadata = Map.of(
+ TOPIC_1, new TopicMetadata(uuid1, TOPIC_1, 1, Map.of(0,
Set.of(RACK_1, RACK_2))),
+ TOPIC_2, new TopicMetadata(uuid2, TOPIC_2, 2, Map.of(1,
Set.of(RACK_3)))
+ );
+
+ StreamsGroupPartitionMetadataValue value = new
StreamsGroupPartitionMetadataValue();
+ value.topics().add(new
StreamsGroupPartitionMetadataValue.TopicMetadata()
+ .setTopicId(uuid1)
+ .setTopicName(TOPIC_1)
+ .setNumPartitions(1)
+ .setPartitionMetadata(List.of(
+ new StreamsGroupPartitionMetadataValue.PartitionMetadata()
+ .setPartition(0)
+ .setRacks(List.of(RACK_1, RACK_2))
+ ))
+ );
+ value.topics().add(new
StreamsGroupPartitionMetadataValue.TopicMetadata()
+ .setTopicId(uuid2)
+ .setTopicName(TOPIC_2)
+ .setNumPartitions(2)
+ .setPartitionMetadata(List.of(
+ new StreamsGroupPartitionMetadataValue.PartitionMetadata()
+ .setPartition(1)
+ .setRacks(List.of(RACK_3))
+ ))
+ );
+
+ CoordinatorRecord expectedRecord = CoordinatorRecord.record(
+ new StreamsGroupPartitionMetadataKey()
+ .setGroupId(GROUP_ID),
+ new ApiMessageAndVersion(value, (short) 0)
+ );
+
+ assertEquals(expectedRecord,
+
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(GROUP_ID,
newPartitionMetadata));
+ }
+
+ @Test
+ public void testNewStreamsGroupPartitionMetadataTombstoneRecord() {
+ CoordinatorRecord expectedRecord = CoordinatorRecord.tombstone(
+ new StreamsGroupPartitionMetadataKey()
+ .setGroupId(GROUP_ID)
+ );
+
+ assertEquals(expectedRecord,
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataTombstoneRecord(GROUP_ID));
+ }
+
+ @Test
+ public void testNewStreamsGroupEpochRecord() {
+ CoordinatorRecord expectedRecord = CoordinatorRecord.record(
+ new StreamsGroupMetadataKey()
+ .setGroupId(GROUP_ID),
+ new ApiMessageAndVersion(
+ new StreamsGroupMetadataValue()
+ .setEpoch(42),
+ (short) 0
+ )
+ );
+
+ assertEquals(expectedRecord,
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(GROUP_ID, 42));
+ }
+
+ @Test
+ public void testNewStreamsGroupEpochTombstoneRecord() {
+ CoordinatorRecord expectedRecord = CoordinatorRecord.tombstone(
+ new StreamsGroupMetadataKey()
+ .setGroupId(GROUP_ID)
+ );
+
+ assertEquals(expectedRecord,
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochTombstoneRecord(GROUP_ID));
+ }
+
+ @Test
+ public void testNewStreamsGroupTargetAssignmentRecord() {
+ Map<String, Set<Integer>> activeTasks = Map.of(SUBTOPOLOGY_1,
Set.of(1, 2, 3));
+ Map<String, Set<Integer>> standbyTasks = Map.of(SUBTOPOLOGY_2,
Set.of(4, 5, 6));
+ Map<String, Set<Integer>> warmupTasks = Map.of(SUBTOPOLOGY_3,
Set.of(7, 8, 9));
+
+ CoordinatorRecord expectedRecord = CoordinatorRecord.record(
+ new StreamsGroupTargetAssignmentMemberKey()
+ .setGroupId(GROUP_ID)
+ .setMemberId(MEMBER_ID),
+ new ApiMessageAndVersion(
+ new StreamsGroupTargetAssignmentMemberValue()
+ .setActiveTasks(List.of(
+ new StreamsGroupTargetAssignmentMemberValue.TaskIds()
+ .setSubtopologyId(SUBTOPOLOGY_1)
+ .setPartitions(List.of(1, 2, 3))
+ ))
+ .setStandbyTasks(List.of(
+ new StreamsGroupTargetAssignmentMemberValue.TaskIds()
+ .setSubtopologyId(SUBTOPOLOGY_2)
+ .setPartitions(List.of(4, 5, 6))
+ ))
+ .setWarmupTasks(List.of(
+ new StreamsGroupTargetAssignmentMemberValue.TaskIds()
+ .setSubtopologyId(SUBTOPOLOGY_3)
+ .setPartitions(List.of(7, 8, 9))
+ )),
+ (short) 0
+ )
+ );
+
+ assertEquals(expectedRecord,
+
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(GROUP_ID,
MEMBER_ID,
+ new TasksTuple(activeTasks, standbyTasks, warmupTasks)));
+ }
+
+ @ParameterizedTest
+ @EnumSource(TaskRole.class)
+ public void
testNewStreamsGroupTargetAssignmentRecordWithEmptyTaskIds(TaskRole taskRole) {
+ final StreamsGroupTargetAssignmentMemberValue
targetAssignmentMemberValue = new StreamsGroupTargetAssignmentMemberValue();
+ final List<TaskIds> taskIds = List.of(new
TaskIds().setSubtopologyId(SUBTOPOLOGY_1).setPartitions(List.of(1, 2, 3)));
+
+ switch (taskRole) {
+ case ACTIVE:
+ targetAssignmentMemberValue.setActiveTasks(taskIds);
+ break;
+ case STANDBY:
+ targetAssignmentMemberValue.setStandbyTasks(taskIds);
+ break;
+ case WARMUP:
+ targetAssignmentMemberValue.setWarmupTasks(taskIds);
+ break;
+ }
+
+ CoordinatorRecord expectedRecord = CoordinatorRecord.record(
+ new StreamsGroupTargetAssignmentMemberKey()
+ .setGroupId(GROUP_ID)
+ .setMemberId(MEMBER_ID),
+ new ApiMessageAndVersion(
+ targetAssignmentMemberValue,
+ (short) 0
+ )
+ );
+
+ assertEquals(expectedRecord,
+
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(GROUP_ID,
MEMBER_ID,
+ mkTasksTuple(taskRole, mkTasks(SUBTOPOLOGY_1, 1, 2, 3))));
+ }
+
+ @Test
+ public void testNewStreamsGroupTargetAssignmentTombstoneRecord() {
+ CoordinatorRecord expectedRecord = CoordinatorRecord.tombstone(
+ new StreamsGroupTargetAssignmentMemberKey()
+ .setGroupId(GROUP_ID)
+ .setMemberId(MEMBER_ID)
+ );
+
+ assertEquals(expectedRecord,
+
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(GROUP_ID,
MEMBER_ID));
+ }
+
+ @Test
+ public void testNewStreamsGroupTargetAssignmentEpochRecord() {
+ CoordinatorRecord expectedRecord = CoordinatorRecord.record(
+ new StreamsGroupTargetAssignmentMetadataKey()
+ .setGroupId(GROUP_ID),
+ new ApiMessageAndVersion(
+ new StreamsGroupTargetAssignmentMetadataValue()
+ .setAssignmentEpoch(42),
+ (short) 0
+ )
+ );
+
+ assertEquals(expectedRecord,
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(GROUP_ID,
42));
+ }
+
+ @Test
+ public void testNewStreamsGroupTargetAssignmentEpochTombstoneRecord() {
+ CoordinatorRecord expectedRecord = CoordinatorRecord.tombstone(
+ new StreamsGroupTargetAssignmentMetadataKey()
+ .setGroupId(GROUP_ID)
+ );
+
+ assertEquals(expectedRecord,
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochTombstoneRecord(GROUP_ID));
+ }
+
+ @Test
+ public void testNewStreamsGroupCurrentAssignmentRecord() {
+ StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_ID)
+ .setRackId(RACK_1)
+ .setInstanceId(INSTANCE_ID)
+ .setClientId(CLIENT_ID)
+ .setClientHost(CLIENT_HOST)
+ .setRebalanceTimeoutMs(REBALANCE_TIMEOUT_MS)
+ .setMemberEpoch(1)
+ .setPreviousMemberEpoch(0)
+ .setState(MemberState.STABLE)
+ .setTopologyEpoch(1)
+ .setProcessId(PROCESS_ID)
+ .setUserEndpoint(new
Endpoint().setHost(USER_ENDPOINT).setPort(USER_ENDPOINT_PORT))
+ .setClientTags(Map.of(TAG_1, VALUE_1, TAG_2, VALUE_2))
+ .setAssignedTasks(new TasksTuple(
+ Map.of(
+ SUBTOPOLOGY_1, Set.of(1, 2, 3)
+ ),
+ Map.of(
+ SUBTOPOLOGY_2, Set.of(4, 5, 6)
+ ),
+ Map.of(
+ SUBTOPOLOGY_3, Set.of(7, 8, 9)
+ )
+ ))
+ .setTasksPendingRevocation(new TasksTuple(
+ Map.of(
+ SUBTOPOLOGY_1, Set.of(1, 2, 3)
+ ),
+ Map.of(
+ SUBTOPOLOGY_2, Set.of(4, 5, 6)
+ ),
+ Map.of(
+ SUBTOPOLOGY_3, Set.of(7, 8, 9)
+ )
+ ))
+ .build();
+
+ CoordinatorRecord expectedRecord = CoordinatorRecord.record(
+ new StreamsGroupCurrentMemberAssignmentKey()
+ .setGroupId(GROUP_ID)
+ .setMemberId(MEMBER_ID),
+ new ApiMessageAndVersion(
+ new StreamsGroupCurrentMemberAssignmentValue()
+ .setMemberEpoch(1)
+ .setPreviousMemberEpoch(0)
+ .setState(MemberState.STABLE.value())
+ .setActiveTasks(List.of(
+ new StreamsGroupCurrentMemberAssignmentValue.TaskIds()
+ .setSubtopologyId(SUBTOPOLOGY_1)
+ .setPartitions(List.of(1, 2, 3))
+ ))
+ .setStandbyTasks(List.of(
+ new StreamsGroupCurrentMemberAssignmentValue.TaskIds()
+ .setSubtopologyId(SUBTOPOLOGY_2)
+ .setPartitions(List.of(4, 5, 6))
+ ))
+ .setWarmupTasks(List.of(
+ new StreamsGroupCurrentMemberAssignmentValue.TaskIds()
+ .setSubtopologyId(SUBTOPOLOGY_3)
+ .setPartitions(List.of(7, 8, 9))
+ ))
+ .setActiveTasksPendingRevocation(List.of(
+ new StreamsGroupCurrentMemberAssignmentValue.TaskIds()
+ .setSubtopologyId(SUBTOPOLOGY_1)
+ .setPartitions(List.of(1, 2, 3))
+ ))
+ .setStandbyTasksPendingRevocation(List.of(
+ new StreamsGroupCurrentMemberAssignmentValue.TaskIds()
+ .setSubtopologyId(SUBTOPOLOGY_2)
+ .setPartitions(List.of(4, 5, 6))
+ ))
+ .setWarmupTasksPendingRevocation(List.of(
+ new StreamsGroupCurrentMemberAssignmentValue.TaskIds()
+ .setSubtopologyId(SUBTOPOLOGY_3)
+ .setPartitions(List.of(7, 8, 9))
+ )),
+ (short) 0
+ )
+ );
+
+ assertEquals(expectedRecord,
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(GROUP_ID,
member));
+ }
+
+ @Test
+ public void
testNewStreamsGroupCurrentAssignmentRecordWithEmptyAssignment() {
+ StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_ID)
+ .setRackId(RACK_1)
+ .setInstanceId(INSTANCE_ID)
+ .setClientId(CLIENT_ID)
+ .setClientHost(CLIENT_HOST)
+ .setRebalanceTimeoutMs(REBALANCE_TIMEOUT_MS)
+ .setMemberEpoch(1)
+ .setPreviousMemberEpoch(0)
+ .setState(MemberState.STABLE)
+ .setTopologyEpoch(1)
+ .setProcessId(PROCESS_ID)
+ .setUserEndpoint(new
Endpoint().setHost(USER_ENDPOINT).setPort(USER_ENDPOINT_PORT))
+ .setClientTags(Map.of(TAG_1, VALUE_1, TAG_2, VALUE_2))
+ .setAssignedTasks(new TasksTuple(Map.of(), Map.of(), Map.of()))
+ .setTasksPendingRevocation(new TasksTuple(Map.of(), Map.of(),
Map.of()))
+ .build();
+
+ CoordinatorRecord expectedRecord = CoordinatorRecord.record(
+ new StreamsGroupCurrentMemberAssignmentKey()
+ .setGroupId(GROUP_ID)
+ .setMemberId(MEMBER_ID),
+ new ApiMessageAndVersion(
+ new StreamsGroupCurrentMemberAssignmentValue()
+ .setMemberEpoch(1)
+ .setPreviousMemberEpoch(0)
+ .setState(MemberState.STABLE.value())
+ .setActiveTasks(List.of())
+ .setStandbyTasks(List.of())
+ .setWarmupTasks(List.of())
+ .setActiveTasksPendingRevocation(List.of())
+ .setStandbyTasksPendingRevocation(List.of())
+ .setWarmupTasksPendingRevocation(List.of()),
+ (short) 0
+ )
+ );
+
+ assertEquals(expectedRecord,
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(GROUP_ID,
member));
+ }
+
+ @Test
+ public void testNewStreamsGroupCurrentAssignmentTombstoneRecord() {
+ CoordinatorRecord expectedRecord = CoordinatorRecord.tombstone(
+ new StreamsGroupCurrentMemberAssignmentKey()
+ .setGroupId(GROUP_ID)
+ .setMemberId(MEMBER_ID)
+ );
+
+ assertEquals(expectedRecord,
+
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(GROUP_ID,
MEMBER_ID));
+ }
+
+ @Test
+ public void testNewStreamsGroupTopologyRecord() {
+ StreamsGroupHeartbeatRequestData.Topology topology =
+ new StreamsGroupHeartbeatRequestData.Topology()
+ .setEpoch(42)
+ .setSubtopologies(
+ List.of(new StreamsGroupHeartbeatRequestData.Subtopology()
+ .setSubtopologyId(SUBTOPOLOGY_1)
+ .setRepartitionSinkTopics(List.of(TOPIC_FOO))
+ .setSourceTopics(List.of(TOPIC_BAR))
+ .setSourceTopicRegex(List.of(TOPIC_REGEX))
+ .setRepartitionSourceTopics(
+ List.of(
+ new
StreamsGroupHeartbeatRequestData.TopicInfo()
+ .setName(TOPIC_REPARTITION)
+ .setPartitions(4)
+ .setReplicationFactor((short) 3)
+ .setTopicConfigs(List.of(
+ new
StreamsGroupHeartbeatRequestData.KeyValue()
+ .setKey(CONFIG_NAME_1)
+ .setValue(CONFIG_VALUE_1)
+ ))
+ )
+ )
+ .setStateChangelogTopics(
+ List.of(
+ new
StreamsGroupHeartbeatRequestData.TopicInfo()
+ .setName(TOPIC_CHANGELOG)
+ .setReplicationFactor((short) 2)
+ .setTopicConfigs(List.of(
+ new
StreamsGroupHeartbeatRequestData.KeyValue()
+ .setKey(CONFIG_NAME_2)
+ .setValue(CONFIG_VALUE_2)
+ ))
+ )
+ )
+ .setCopartitionGroups(List.of(
+ new
StreamsGroupHeartbeatRequestData.CopartitionGroup()
+ .setSourceTopics(List.of((short) 0))
+
.setRepartitionSourceTopics(List.of((short) 0)),
+ new
StreamsGroupHeartbeatRequestData.CopartitionGroup()
+ .setSourceTopicRegex(List.of((short) 0))
+ )),
+ new StreamsGroupHeartbeatRequestData.Subtopology()
+ .setSubtopologyId(SUBTOPOLOGY_1)
+ .setRepartitionSinkTopics(List.of())
+ .setSourceTopics(List.of(TOPIC_BAR))
+ .setSourceTopicRegex(List.of())
+ .setRepartitionSourceTopics(List.of())
+ .setStateChangelogTopics(List.of())
+ .setCopartitionGroups(List.of())
+ )
+ );
+
+ StreamsGroupTopologyValue expectedTopology =
+ new StreamsGroupTopologyValue()
+ .setEpoch(42)
+ .setSubtopologies(
+ List.of(new StreamsGroupTopologyValue.Subtopology()
+ .setSubtopologyId(SUBTOPOLOGY_1)
+ .setRepartitionSinkTopics(List.of(TOPIC_FOO))
+ .setSourceTopics(List.of(TOPIC_BAR))
+ .setSourceTopicRegex(List.of(TOPIC_REGEX))
+ .setRepartitionSourceTopics(
+ List.of(
+ new StreamsGroupTopologyValue.TopicInfo()
+ .setName(TOPIC_REPARTITION)
+ .setPartitions(4)
+ .setReplicationFactor((short) 3)
+ .setTopicConfigs(List.of(
+ new
StreamsGroupTopologyValue.TopicConfig()
+ .setKey(CONFIG_NAME_1)
+ .setValue(CONFIG_VALUE_1)
+ ))
+ )
+ )
+ .setStateChangelogTopics(
+ List.of(
+ new StreamsGroupTopologyValue.TopicInfo()
+ .setName(TOPIC_CHANGELOG)
+ .setReplicationFactor((short) 2)
+ .setTopicConfigs(List.of(
+ new
StreamsGroupTopologyValue.TopicConfig()
+ .setKey(CONFIG_NAME_2)
+ .setValue(CONFIG_VALUE_2)
+ ))
+ )
+ )
+ .setCopartitionGroups(List.of(
+ new
StreamsGroupTopologyValue.CopartitionGroup()
+ .setSourceTopics(List.of((short) 0))
+
.setRepartitionSourceTopics(List.of((short) 0)),
+ new
StreamsGroupTopologyValue.CopartitionGroup()
+ .setSourceTopicRegex(List.of((short) 0))
+ )),
+ new StreamsGroupTopologyValue.Subtopology()
+ .setSubtopologyId(SUBTOPOLOGY_1)
+ .setRepartitionSinkTopics(List.of())
+ .setSourceTopics(List.of(TOPIC_BAR))
+ .setSourceTopicRegex(List.of())
+ .setRepartitionSourceTopics(List.of())
+ .setStateChangelogTopics(List.of())
+ .setCopartitionGroups(List.of())
+ )
+ );
+
+ CoordinatorRecord expectedRecord = CoordinatorRecord.record(
+ new StreamsGroupTopologyKey()
+ .setGroupId(GROUP_ID),
+ new ApiMessageAndVersion(
+ expectedTopology,
+ (short) 0));
+
+ assertEquals(expectedRecord,
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(GROUP_ID,
topology));
+ }
+
+ @Test
+ public void testNewStreamsGroupTopologyRecordTombstone() {
+ CoordinatorRecord expectedRecord = CoordinatorRecord.tombstone(
+ new StreamsGroupTopologyKey()
+ .setGroupId(GROUP_ID)
+ );
+
+ assertEquals(expectedRecord,
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecordTombstone(GROUP_ID));
+ }
+
+ @Test
+ public void testNewStreamsGroupMemberRecordNullGroupId() {
+ NullPointerException exception =
assertThrows(NullPointerException.class, () ->
+ StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(null,
mock(StreamsGroupMember.class)));
+ assertEquals("groupId should not be null here",
exception.getMessage());
+ }
+
+ @Test
+ public void testNewStreamsGroupMemberRecordNullMember() {
+ NullPointerException exception =
assertThrows(NullPointerException.class, () ->
+
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord("groupId", null));
+ assertEquals("member should not be null here", exception.getMessage());
+ }
+
+ @Test
+ public void testNewStreamsGroupMemberTombstoneRecordNullGroupId() {
+ NullPointerException exception =
assertThrows(NullPointerException.class, () ->
+
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(null,
"memberId"));
+ assertEquals("groupId should not be null here",
exception.getMessage());
+ }
+
+ @Test
+ public void testNewStreamsGroupMemberTombstoneRecordNullMemberId() {
+ NullPointerException exception =
assertThrows(NullPointerException.class, () ->
+
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord("groupId",
null));
+ assertEquals("memberId should not be null here",
exception.getMessage());
+ }
+
+ @Test
+ public void testNewStreamsGroupPartitionMetadataRecordNullGroupId() {
+ NullPointerException exception =
assertThrows(NullPointerException.class, () ->
+
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(null,
Map.of()));
+ assertEquals("groupId should not be null here",
exception.getMessage());
+ }
+
+ @Test
+ public void
testNewStreamsGroupPartitionMetadataRecordNullNewPartitionMetadata() {
+ NullPointerException exception =
assertThrows(NullPointerException.class, () ->
+
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord("groupId",
null));
+ assertEquals("newPartitionMetadata should not be null here",
exception.getMessage());
+ }
+
+ @Test
+ public void
testNewStreamsGroupPartitionMetadataTombstoneRecordNullGroupId() {
+ NullPointerException exception =
assertThrows(NullPointerException.class, () ->
+
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataTombstoneRecord(null));
+ assertEquals("groupId should not be null here",
exception.getMessage());
+ }
+
+ @Test
+ public void testNewStreamsGroupEpochRecordNullGroupId() {
+ NullPointerException exception =
assertThrows(NullPointerException.class, () ->
+ StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(null,
1));
+ assertEquals("groupId should not be null here",
exception.getMessage());
+ }
+
+ @Test
+ public void testNewStreamsGroupEpochTombstoneRecordNullGroupId() {
+ NullPointerException exception =
assertThrows(NullPointerException.class, () ->
+
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochTombstoneRecord(null));
+ assertEquals("groupId should not be null here",
exception.getMessage());
+ }
+
+ @Test
+ public void testNewStreamsGroupTargetAssignmentRecordNullGroupId() {
+ NullPointerException exception =
assertThrows(NullPointerException.class, () ->
+
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(null,
"memberId", mock(TasksTuple.class)));
+ assertEquals("groupId should not be null here",
exception.getMessage());
+ }
+
+ @Test
+ public void testNewStreamsGroupTargetAssignmentRecordNullMemberId() {
+ NullPointerException exception =
assertThrows(NullPointerException.class, () ->
+
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord("groupId",
null, mock(TasksTuple.class)));
+ assertEquals("memberId should not be null here",
exception.getMessage());
+ }
+
+ @Test
+ public void testNewStreamsGroupTargetAssignmentRecordNullAssignment() {
+ NullPointerException exception =
assertThrows(NullPointerException.class, () ->
+
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord("groupId",
"memberId", null));
+ assertEquals("assignment should not be null here",
exception.getMessage());
+ }
+
+ @Test
+ public void
testNewStreamsGroupTargetAssignmentTombstoneRecordNullGroupId() {
+ NullPointerException exception =
assertThrows(NullPointerException.class, () ->
+
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(null,
"memberId"));
+ assertEquals("groupId should not be null here",
exception.getMessage());
+ }
+
+ @Test
+ public void
testNewStreamsGroupTargetAssignmentTombstoneRecordNullMemberId() {
+ NullPointerException exception =
assertThrows(NullPointerException.class, () ->
+
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord("groupId",
null));
+ assertEquals("memberId should not be null here",
exception.getMessage());
+ }
+
+ @Test
+ public void testNewStreamsGroupTargetAssignmentEpochRecordNullGroupId() {
+ NullPointerException exception =
assertThrows(NullPointerException.class, () ->
+
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(null,
1));
+ assertEquals("groupId should not be null here",
exception.getMessage());
+ }
+
+ @Test
+ public void
testNewStreamsGroupTargetAssignmentEpochTombstoneRecordNullGroupId() {
+ NullPointerException exception =
assertThrows(NullPointerException.class, () ->
+
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochTombstoneRecord(null));
+ assertEquals("groupId should not be null here",
exception.getMessage());
+ }
+
+ @Test
+ public void testNewStreamsGroupCurrentAssignmentRecordNullGroupId() {
+ NullPointerException exception =
assertThrows(NullPointerException.class, () ->
+
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(null,
mock(StreamsGroupMember.class)));
+ assertEquals("groupId should not be null here",
exception.getMessage());
+ }
+
+ @Test
+ public void testNewStreamsGroupCurrentAssignmentRecordNullMember() {
+ NullPointerException exception =
assertThrows(NullPointerException.class, () ->
+
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord("groupId",
null));
+ assertEquals("member should not be null here", exception.getMessage());
+ }
+
+ @Test
+ public void
testNewStreamsGroupCurrentAssignmentTombstoneRecordNullGroupId() {
+ NullPointerException exception =
assertThrows(NullPointerException.class, () ->
+
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(null,
"memberId"));
+ assertEquals("groupId should not be null here",
exception.getMessage());
+ }
+
+ @Test
+ public void
testNewStreamsGroupCurrentAssignmentTombstoneRecordNullMemberId() {
+ NullPointerException exception =
assertThrows(NullPointerException.class, () ->
+
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord("groupId",
null));
+ assertEquals("memberId should not be null here",
exception.getMessage());
+ }
+
+ @Test
+ public void testNewStreamsGroupTopologyRecordWithValueNullGroupId() {
+ NullPointerException exception =
assertThrows(NullPointerException.class, () ->
+
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(null,
mock(StreamsGroupTopologyValue.class)));
+ assertEquals("groupId should not be null here",
exception.getMessage());
+ }
+
+ @Test
+ public void testNewStreamsGroupTopologyRecordWithTopologyNullGroupId() {
+ NullPointerException exception =
assertThrows(NullPointerException.class, () ->
+
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(null,
mock(StreamsGroupHeartbeatRequestData.Topology.class)));
+ assertEquals("groupId should not be null here",
exception.getMessage());
+ }
+
+ @Test
+ public void testNewStreamsGroupTopologyRecordNullTopology() {
+ NullPointerException exception =
assertThrows(NullPointerException.class, () ->
+
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord("groupId",
(StreamsGroupHeartbeatRequestData.Topology) null));
+ assertEquals("topology should not be null here",
exception.getMessage());
+ }
+
+ @Test
+ public void testNewStreamsGroupTopologyRecordNullValue() {
+ NullPointerException exception =
assertThrows(NullPointerException.class, () ->
+
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord("groupId",
(StreamsGroupTopologyValue) null));
+ assertEquals("value should not be null here", exception.getMessage());
+ }
+
+ @Test
+ public void testNewStreamsGroupTopologyRecordTombstoneNullGroupId() {
+ NullPointerException exception =
assertThrows(NullPointerException.class, () ->
+
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecordTombstone(null));
+ assertEquals("groupId should not be null here",
exception.getMessage());
+ }
+
+ @Test
+ public void testConvertToStreamsGroupTopologyRecordNullTopology() {
+ NullPointerException exception =
assertThrows(NullPointerException.class, () ->
+
StreamsCoordinatorRecordHelpers.convertToStreamsGroupTopologyRecord(null));
+ assertEquals("topology should not be null here",
exception.getMessage());
+ }
+}
\ No newline at end of file