This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 56e50120be4 KAFKA-18621: Add StreamsCoordinatorRecordHelpers (#18669)
56e50120be4 is described below

commit 56e50120be40be40e126cd075caa87f54ca91546
Author: Lucas Brutschy <[email protected]>
AuthorDate: Thu Jan 30 09:28:45 2025 +0100

    KAFKA-18621: Add StreamsCoordinatorRecordHelpers (#18669)
    
    A class with helper methods to create records stored in the 
__consumer_offsets topic.
    
    Compared to the feature branch, I added unit tests (most functions were not 
tested) and adopted the new interface for constructing coordinator records 
introduced by David.
    
    Reviewers: Bruno Cadonna <[email protected]>
---
 .../streams/StreamsCoordinatorRecordHelpers.java   | 473 +++++++++++
 .../StreamsCoordinatorRecordHelpersTest.java       | 882 +++++++++++++++++++++
 2 files changed, 1355 insertions(+)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java
new file mode 100644
index 00000000000..c44e5d89713
--- /dev/null
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java
@@ -0,0 +1,473 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue.PartitionMetadata;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This class contains helper methods to create records stored in the 
__consumer_offsets topic.
+ */
+public class StreamsCoordinatorRecordHelpers {
+
+    public static CoordinatorRecord newStreamsGroupMemberRecord(
+        String groupId,
+        StreamsGroupMember member
+    ) {
+        Objects.requireNonNull(groupId, "groupId should not be null here");
+        Objects.requireNonNull(member, "member should not be null here");
+
+        return CoordinatorRecord.record(
+            new StreamsGroupMemberMetadataKey()
+                .setGroupId(groupId)
+                .setMemberId(member.memberId()),
+            new ApiMessageAndVersion(
+                new StreamsGroupMemberMetadataValue()
+                    .setRackId(member.rackId().orElse(null))
+                    .setInstanceId(member.instanceId().orElse(null))
+                    .setClientId(member.clientId())
+                    .setClientHost(member.clientHost())
+                    .setRebalanceTimeoutMs(member.rebalanceTimeoutMs())
+                    .setTopologyEpoch(member.topologyEpoch())
+                    .setProcessId(member.processId())
+                    .setUserEndpoint(member.userEndpoint().orElse(null))
+                    
.setClientTags(member.clientTags().entrySet().stream().map(e ->
+                        new StreamsGroupMemberMetadataValue.KeyValue()
+                            .setKey(e.getKey())
+                            .setValue(e.getValue())
+                    
).sorted(Comparator.comparing(StreamsGroupMemberMetadataValue.KeyValue::key)).collect(Collectors.toList())),
+                (short) 0
+            )
+        );
+    }
+
+    /**
+     * Creates a StreamsGroupMemberMetadata tombstone.
+     *
+     * @param groupId  The streams group id.
+     * @param memberId The streams group member id.
+     * @return The record.
+     */
+    public static CoordinatorRecord newStreamsGroupMemberTombstoneRecord(
+        String groupId,
+        String memberId
+    ) {
+        Objects.requireNonNull(groupId, "groupId should not be null here");
+        Objects.requireNonNull(memberId, "memberId should not be null here");
+
+        return CoordinatorRecord.tombstone(
+            new StreamsGroupMemberMetadataKey()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+        );
+    }
+
+    /**
+     * Creates a StreamsGroupPartitionMetadata record.
+     *
+     * @param groupId              The streams group id.
+     * @param newPartitionMetadata The partition metadata.
+     * @return The record.
+     */
+    public static CoordinatorRecord newStreamsGroupPartitionMetadataRecord(
+        String groupId,
+        Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> 
newPartitionMetadata
+    ) {
+        Objects.requireNonNull(groupId, "groupId should not be null here");
+        Objects.requireNonNull(newPartitionMetadata, "newPartitionMetadata 
should not be null here");
+
+        StreamsGroupPartitionMetadataValue value = new 
StreamsGroupPartitionMetadataValue();
+        newPartitionMetadata.forEach((topicName, topicMetadata) -> {
+            List<StreamsGroupPartitionMetadataValue.PartitionMetadata> 
partitionMetadata = new ArrayList<>();
+            if (!topicMetadata.partitionRacks().isEmpty()) {
+                topicMetadata.partitionRacks().forEach((partition, racks) ->
+                    partitionMetadata.add(new 
StreamsGroupPartitionMetadataValue.PartitionMetadata()
+                        .setPartition(partition)
+                        .setRacks(racks.stream().sorted().toList())
+                    )
+                );
+            }
+            
partitionMetadata.sort(Comparator.comparingInt(PartitionMetadata::partition));
+            value.topics().add(new 
StreamsGroupPartitionMetadataValue.TopicMetadata()
+                .setTopicId(topicMetadata.id())
+                .setTopicName(topicMetadata.name())
+                .setNumPartitions(topicMetadata.numPartitions())
+                .setPartitionMetadata(partitionMetadata)
+            );
+        });
+
+        
value.topics().sort(Comparator.comparing(StreamsGroupPartitionMetadataValue.TopicMetadata::topicName));
+
+        return CoordinatorRecord.record(
+            new StreamsGroupPartitionMetadataKey()
+                .setGroupId(groupId),
+            new ApiMessageAndVersion(
+                value,
+                (short) 0
+            )
+        );
+    }
+
+    /**
+     * Creates a StreamsGroupPartitionMetadata tombstone.
+     *
+     * @param groupId The streams group id.
+     * @return The record.
+     */
+    public static CoordinatorRecord 
newStreamsGroupPartitionMetadataTombstoneRecord(
+        String groupId
+    ) {
+        Objects.requireNonNull(groupId, "groupId should not be null here");
+
+        return CoordinatorRecord.tombstone(
+            new StreamsGroupPartitionMetadataKey()
+                .setGroupId(groupId)
+        );
+    }
+
+    public static CoordinatorRecord newStreamsGroupEpochRecord(
+        String groupId,
+        int newGroupEpoch
+    ) {
+        Objects.requireNonNull(groupId, "groupId should not be null here");
+
+        return CoordinatorRecord.record(
+            new StreamsGroupMetadataKey()
+                .setGroupId(groupId),
+            new ApiMessageAndVersion(
+                new StreamsGroupMetadataValue()
+                    .setEpoch(newGroupEpoch),
+                (short) 0
+            )
+        );
+    }
+
+    /**
+     * Creates a StreamsGroupMetadata tombstone.
+     *
+     * @param groupId The streams group id.
+     * @return The record.
+     */
+    public static CoordinatorRecord newStreamsGroupEpochTombstoneRecord(
+        String groupId
+    ) {
+        Objects.requireNonNull(groupId, "groupId should not be null here");
+
+        return CoordinatorRecord.tombstone(
+            new StreamsGroupMetadataKey()
+                .setGroupId(groupId)
+        );
+    }
+
+    public static CoordinatorRecord newStreamsGroupTargetAssignmentRecord(
+        String groupId,
+        String memberId,
+        TasksTuple assignment
+    ) {
+        Objects.requireNonNull(groupId, "groupId should not be null here");
+        Objects.requireNonNull(memberId, "memberId should not be null here");
+        Objects.requireNonNull(assignment, "assignment should not be null 
here");
+
+        List<StreamsGroupTargetAssignmentMemberValue.TaskIds> activeTaskIds = 
new ArrayList<>(assignment.activeTasks().size());
+        for (Map.Entry<String, Set<Integer>> entry : 
assignment.activeTasks().entrySet()) {
+            activeTaskIds.add(
+                new StreamsGroupTargetAssignmentMemberValue.TaskIds()
+                    .setSubtopologyId(entry.getKey())
+                    .setPartitions(entry.getValue().stream().sorted().toList())
+            );
+        }
+        
activeTaskIds.sort(Comparator.comparing(StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId));
+        List<StreamsGroupTargetAssignmentMemberValue.TaskIds> standbyTaskIds = 
new ArrayList<>(assignment.standbyTasks().size());
+        for (Map.Entry<String, Set<Integer>> entry : 
assignment.standbyTasks().entrySet()) {
+            standbyTaskIds.add(
+                new StreamsGroupTargetAssignmentMemberValue.TaskIds()
+                    .setSubtopologyId(entry.getKey())
+                    .setPartitions(entry.getValue().stream().sorted().toList())
+            );
+        }
+        
standbyTaskIds.sort(Comparator.comparing(StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId));
+        List<StreamsGroupTargetAssignmentMemberValue.TaskIds> warmupTaskIds = 
new ArrayList<>(assignment.warmupTasks().size());
+        for (Map.Entry<String, Set<Integer>> entry : 
assignment.warmupTasks().entrySet()) {
+            warmupTaskIds.add(
+                new StreamsGroupTargetAssignmentMemberValue.TaskIds()
+                    .setSubtopologyId(entry.getKey())
+                    .setPartitions(entry.getValue().stream().sorted().toList())
+            );
+        }
+        
warmupTaskIds.sort(Comparator.comparing(StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId));
+
+        return CoordinatorRecord.record(
+            new StreamsGroupTargetAssignmentMemberKey()
+                .setGroupId(groupId)
+                .setMemberId(memberId),
+            new ApiMessageAndVersion(
+                new StreamsGroupTargetAssignmentMemberValue()
+                    .setActiveTasks(activeTaskIds)
+                    .setStandbyTasks(standbyTaskIds)
+                    .setWarmupTasks(warmupTaskIds),
+                (short) 0
+            )
+        );
+    }
+
+    /**
+     * Creates a StreamsGroupTargetAssignmentMember tombstone.
+     *
+     * @param groupId  The streams group id.
+     * @param memberId The streams group member id.
+     * @return The record.
+     */
+    public static CoordinatorRecord 
newStreamsGroupTargetAssignmentTombstoneRecord(
+        String groupId,
+        String memberId
+    ) {
+        Objects.requireNonNull(groupId, "groupId should not be null here");
+        Objects.requireNonNull(memberId, "memberId should not be null here");
+
+        return CoordinatorRecord.tombstone(
+            new StreamsGroupTargetAssignmentMemberKey()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+        );
+    }
+
+
+    public static CoordinatorRecord newStreamsGroupTargetAssignmentEpochRecord(
+        String groupId,
+        int assignmentEpoch
+    ) {
+        Objects.requireNonNull(groupId, "groupId should not be null here");
+
+        return CoordinatorRecord.record(
+            new StreamsGroupTargetAssignmentMetadataKey()
+                .setGroupId(groupId),
+            new ApiMessageAndVersion(
+                new StreamsGroupTargetAssignmentMetadataValue()
+                    .setAssignmentEpoch(assignmentEpoch),
+                (short) 0
+            )
+        );
+    }
+
+    /**
+     * Creates a StreamsGroupTargetAssignmentMetadata tombstone.
+     *
+     * @param groupId The streams group id.
+     * @return The record.
+     */
+    public static CoordinatorRecord 
newStreamsGroupTargetAssignmentEpochTombstoneRecord(
+        String groupId
+    ) {
+        Objects.requireNonNull(groupId, "groupId should not be null here");
+
+        return CoordinatorRecord.tombstone(
+            new StreamsGroupTargetAssignmentMetadataKey()
+                .setGroupId(groupId)
+        );
+    }
+
+    public static CoordinatorRecord newStreamsGroupCurrentAssignmentRecord(
+        String groupId,
+        StreamsGroupMember member
+    ) {
+        Objects.requireNonNull(groupId, "groupId should not be null here");
+        Objects.requireNonNull(member, "member should not be null here");
+
+        return CoordinatorRecord.record(
+            new StreamsGroupCurrentMemberAssignmentKey()
+                .setGroupId(groupId)
+                .setMemberId(member.memberId()),
+            new ApiMessageAndVersion(
+                new StreamsGroupCurrentMemberAssignmentValue()
+                    .setMemberEpoch(member.memberEpoch())
+                    .setPreviousMemberEpoch(member.previousMemberEpoch())
+                    .setState(member.state().value())
+                    
.setActiveTasks(toTaskIds(member.assignedTasks().activeTasks()))
+                    
.setStandbyTasks(toTaskIds(member.assignedTasks().standbyTasks()))
+                    
.setWarmupTasks(toTaskIds(member.assignedTasks().warmupTasks()))
+                    
.setActiveTasksPendingRevocation(toTaskIds(member.tasksPendingRevocation().activeTasks()))
+                    
.setStandbyTasksPendingRevocation(toTaskIds(member.tasksPendingRevocation().standbyTasks()))
+                    
.setWarmupTasksPendingRevocation(toTaskIds(member.tasksPendingRevocation().warmupTasks())),
+                (short) 0
+            )
+        );
+    }
+
+    /**
+     * Creates a StreamsGroupCurrentMemberAssignment tombstone.
+     *
+     * @param groupId  The streams group id.
+     * @param memberId The streams group member id.
+     * @return The record.
+     */
+    public static CoordinatorRecord 
newStreamsGroupCurrentAssignmentTombstoneRecord(
+        String groupId,
+        String memberId
+    ) {
+        Objects.requireNonNull(groupId, "groupId should not be null here");
+        Objects.requireNonNull(memberId, "memberId should not be null here");
+
+        return CoordinatorRecord.tombstone(
+            new StreamsGroupCurrentMemberAssignmentKey()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+        );
+    }
+
+    private static List<StreamsGroupCurrentMemberAssignmentValue.TaskIds> 
toTaskIds(
+        Map<String, Set<Integer>> tasks
+    ) {
+        List<StreamsGroupCurrentMemberAssignmentValue.TaskIds> taskIds = new 
ArrayList<>(tasks.size());
+        tasks.forEach((subtopologyId, partitions) ->
+            taskIds.add(new StreamsGroupCurrentMemberAssignmentValue.TaskIds()
+                .setSubtopologyId(subtopologyId)
+                .setPartitions(partitions.stream().sorted().toList()))
+        );
+        
taskIds.sort(Comparator.comparing(StreamsGroupCurrentMemberAssignmentValue.TaskIds::subtopologyId));
+        return taskIds;
+    }
+
+    /**
+     * Creates a StreamsTopology record.
+     *
+     * @param groupId  The consumer group id.
+     * @param topology The new topology.
+     * @return The record.
+     */
+    public static CoordinatorRecord newStreamsGroupTopologyRecord(String 
groupId,
+                                                                  
StreamsGroupHeartbeatRequestData.Topology topology) {
+        Objects.requireNonNull(groupId, "groupId should not be null here");
+        Objects.requireNonNull(topology, "topology should not be null here");
+
+        return newStreamsGroupTopologyRecord(groupId, 
convertToStreamsGroupTopologyRecord(topology));
+    }
+
+    /**
+     * Creates a StreamsTopology record.
+     *
+     * @param groupId The consumer group id.
+     * @param value   The encoded topology record value.
+     * @return The record.
+     */
+    public static CoordinatorRecord newStreamsGroupTopologyRecord(String 
groupId, StreamsGroupTopologyValue value) {
+        Objects.requireNonNull(groupId, "groupId should not be null here");
+        Objects.requireNonNull(value, "value should not be null here");
+
+        return CoordinatorRecord.record(
+            new StreamsGroupTopologyKey()
+                .setGroupId(groupId),
+            new ApiMessageAndVersion(value, (short) 0)
+        );
+    }
+
+    /**
+     * Encodes subtopologies from the Heartbeat RPC to a StreamsTopology 
record value.
+     *
+     * @param topology The new topology
+     * @return The record value.
+     */
+    public static StreamsGroupTopologyValue 
convertToStreamsGroupTopologyRecord(StreamsGroupHeartbeatRequestData.Topology 
topology) {
+        Objects.requireNonNull(topology, "topology should not be null here");
+
+        StreamsGroupTopologyValue value = new StreamsGroupTopologyValue();
+        value.setEpoch(topology.epoch());
+        topology.subtopologies().forEach(subtopology -> {
+            List<StreamsGroupTopologyValue.TopicInfo> repartitionSourceTopics =
+                subtopology.repartitionSourceTopics().stream()
+                    .map(StreamsCoordinatorRecordHelpers::convertToTopicInfo)
+                    .collect(Collectors.toList());
+
+            List<StreamsGroupTopologyValue.TopicInfo> stateChangelogTopics =
+                subtopology.stateChangelogTopics().stream()
+                    .map(StreamsCoordinatorRecordHelpers::convertToTopicInfo)
+                    .collect(Collectors.toList());
+
+            List<StreamsGroupTopologyValue.CopartitionGroup> copartitionGroups 
=
+                subtopology.copartitionGroups().stream()
+                    .map(copartitionGroup -> new 
StreamsGroupTopologyValue.CopartitionGroup()
+                        .setSourceTopics(copartitionGroup.sourceTopics())
+                        
.setSourceTopicRegex(copartitionGroup.sourceTopicRegex())
+                        
.setRepartitionSourceTopics(copartitionGroup.repartitionSourceTopics())
+                    )
+                    .collect(Collectors.toList());
+
+            value.subtopologies().add(
+                new StreamsGroupTopologyValue.Subtopology()
+                    .setSubtopologyId(subtopology.subtopologyId())
+                    .setSourceTopics(subtopology.sourceTopics())
+                    .setSourceTopicRegex(subtopology.sourceTopicRegex())
+                    
.setRepartitionSinkTopics(subtopology.repartitionSinkTopics())
+                    .setRepartitionSourceTopics(repartitionSourceTopics)
+                    .setStateChangelogTopics(stateChangelogTopics)
+                    .setCopartitionGroups(copartitionGroups)
+            );
+        });
+        return value;
+    }
+
+    private static StreamsGroupTopologyValue.TopicInfo 
convertToTopicInfo(StreamsGroupHeartbeatRequestData.TopicInfo topicInfo) {
+        List<StreamsGroupTopologyValue.TopicConfig> topicConfigs = 
topicInfo.topicConfigs() != null ? topicInfo.topicConfigs().stream()
+            .map(config -> new 
StreamsGroupTopologyValue.TopicConfig().setKey(config.key()).setValue(config.value()))
+            .collect(Collectors.toList()) : null;
+        return new StreamsGroupTopologyValue.TopicInfo()
+            .setName(topicInfo.name())
+            .setTopicConfigs(topicConfigs)
+            .setPartitions(topicInfo.partitions())
+            .setReplicationFactor(topicInfo.replicationFactor());
+    }
+
+    /**
+     * Creates a StreamsGroupTopology tombstone.
+     *
+     * @param groupId The streams group id.
+     * @return The record.
+     */
+    public static CoordinatorRecord newStreamsGroupTopologyRecordTombstone(
+        String groupId
+    ) {
+        Objects.requireNonNull(groupId, "groupId should not be null here");
+
+        return CoordinatorRecord.tombstone(
+            new StreamsGroupTopologyKey()
+                .setGroupId(groupId)
+        );
+    }
+}
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java
new file mode 100644
index 00000000000..489d4fa0254
--- /dev/null
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java
@@ -0,0 +1,882 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue.Endpoint;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue.TaskIds;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
+import 
org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.TaskRole;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasks;
+import static 
org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasksTuple;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+
+class StreamsCoordinatorRecordHelpersTest {
+
+    public static final String CLIENT_HOST = "client-host";
+    public static final String CLIENT_ID = "client-id";
+    public static final String CONFIG_NAME_1 = "config-name1";
+    public static final String CONFIG_NAME_2 = "config-name2";
+    public static final String CONFIG_VALUE_1 = "config-value1";
+    public static final String CONFIG_VALUE_2 = "config-value2";
+    public static final String GROUP_ID = "group-id";
+    public static final String INSTANCE_ID = "instance-id";
+    public static final String MEMBER_ID = "member-id";
+    public static final String PROCESS_ID = "process-id";
+    public static final String RACK_1 = "rack1";
+    public static final String RACK_2 = "rack2";
+    public static final String RACK_3 = "rack3";
+    public static final String SUBTOPOLOGY_1 = "subtopology1";
+    public static final String SUBTOPOLOGY_2 = "subtopology2";
+    public static final String SUBTOPOLOGY_3 = "subtopology3";
+    public static final String TAG_1 = "tag1";
+    public static final String TAG_2 = "tag2";
+    public static final String TOPIC_1 = "topic1";
+    public static final String TOPIC_2 = "topic2";
+    public static final String TOPIC_BAR = "bar";
+    public static final String TOPIC_CHANGELOG = "changelog";
+    public static final String TOPIC_FOO = "foo";
+    public static final String TOPIC_REGEX = "regex";
+    public static final String TOPIC_REPARTITION = "repartition";
+    public static final String USER_ENDPOINT = "user-endpoint";
+    public static final String VALUE_1 = "value1";
+    public static final String VALUE_2 = "value2";
+    public static final int REBALANCE_TIMEOUT_MS = 1000;
+    public static final int USER_ENDPOINT_PORT = 40;
+
+    @Test
+    public void testNewStreamsGroupMemberRecord() {
+        StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_ID)
+            .setRackId(RACK_1)
+            .setInstanceId(INSTANCE_ID)
+            .setClientId(CLIENT_ID)
+            .setClientHost(CLIENT_HOST)
+            .setRebalanceTimeoutMs(REBALANCE_TIMEOUT_MS)
+            .setTopologyEpoch(1)
+            .setProcessId(PROCESS_ID)
+            .setUserEndpoint(new 
StreamsGroupMemberMetadataValue.Endpoint().setHost(USER_ENDPOINT).setPort(USER_ENDPOINT_PORT))
+            .setClientTags(Map.of(TAG_1, VALUE_1, TAG_2, VALUE_2))
+            .build();
+
+        CoordinatorRecord expectedRecord = CoordinatorRecord.record(
+            new StreamsGroupMemberMetadataKey()
+                .setGroupId(GROUP_ID)
+                .setMemberId(MEMBER_ID),
+            new ApiMessageAndVersion(
+                new StreamsGroupMemberMetadataValue()
+                    .setRackId(RACK_1)
+                    .setInstanceId(INSTANCE_ID)
+                    .setClientId(CLIENT_ID)
+                    .setClientHost(CLIENT_HOST)
+                    .setRebalanceTimeoutMs(REBALANCE_TIMEOUT_MS)
+                    .setTopologyEpoch(1)
+                    .setProcessId(PROCESS_ID)
+                    .setUserEndpoint(new 
StreamsGroupMemberMetadataValue.Endpoint().setHost(USER_ENDPOINT).setPort(USER_ENDPOINT_PORT))
+                    .setClientTags(List.of(
+                        new 
StreamsGroupMemberMetadataValue.KeyValue().setKey(TAG_1).setValue(VALUE_1),
+                        new 
StreamsGroupMemberMetadataValue.KeyValue().setKey(TAG_2).setValue(VALUE_2)
+                    )),
+                (short) 0
+            )
+        );
+
+        assertEquals(expectedRecord, 
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(GROUP_ID, member));
+    }
+
+    @Test
+    public void testNewStreamsGroupMemberRecordWithNullRackId() {
+        StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_ID)
+            .setRackId(null)
+            .setInstanceId(INSTANCE_ID)
+            .setClientId(CLIENT_ID)
+            .setClientHost(CLIENT_HOST)
+            .setRebalanceTimeoutMs(REBALANCE_TIMEOUT_MS)
+            .setTopologyEpoch(1)
+            .setProcessId(PROCESS_ID)
+            .setUserEndpoint(new 
StreamsGroupMemberMetadataValue.Endpoint().setHost(USER_ENDPOINT).setPort(USER_ENDPOINT_PORT))
+            .setClientTags(Map.of(TAG_1, VALUE_1, TAG_2, VALUE_2))
+            .build();
+
+        CoordinatorRecord expectedRecord = CoordinatorRecord.record(
+            new StreamsGroupMemberMetadataKey()
+                .setGroupId(GROUP_ID)
+                .setMemberId(MEMBER_ID),
+            new ApiMessageAndVersion(
+                new StreamsGroupMemberMetadataValue()
+                    .setRackId(null)
+                    .setInstanceId(INSTANCE_ID)
+                    .setClientId(CLIENT_ID)
+                    .setClientHost(CLIENT_HOST)
+                    .setRebalanceTimeoutMs(REBALANCE_TIMEOUT_MS)
+                    .setTopologyEpoch(1)
+                    .setProcessId(PROCESS_ID)
+                    .setUserEndpoint(new 
StreamsGroupMemberMetadataValue.Endpoint().setHost(USER_ENDPOINT).setPort(USER_ENDPOINT_PORT))
+                    .setClientTags(List.of(
+                        new 
StreamsGroupMemberMetadataValue.KeyValue().setKey(TAG_1).setValue(VALUE_1),
+                        new 
StreamsGroupMemberMetadataValue.KeyValue().setKey(TAG_2).setValue(VALUE_2)
+                    )),
+                (short) 0
+            )
+        );
+
+        assertEquals(expectedRecord, 
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(GROUP_ID, member));
+    }
+
+    @Test
+    public void testNewStreamsGroupMemberRecordWithNullInstanceId() {
+        StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_ID)
+            .setRackId(RACK_1)
+            .setInstanceId(null)
+            .setClientId(CLIENT_ID)
+            .setClientHost(CLIENT_HOST)
+            .setRebalanceTimeoutMs(REBALANCE_TIMEOUT_MS)
+            .setTopologyEpoch(1)
+            .setProcessId(PROCESS_ID)
+            .setUserEndpoint(new 
StreamsGroupMemberMetadataValue.Endpoint().setHost(USER_ENDPOINT).setPort(USER_ENDPOINT_PORT))
+            .setClientTags(Map.of(TAG_1, VALUE_1, TAG_2, VALUE_2))
+            .build();
+
+        CoordinatorRecord expectedRecord = CoordinatorRecord.record(
+            new StreamsGroupMemberMetadataKey()
+                .setGroupId(GROUP_ID)
+                .setMemberId(MEMBER_ID),
+            new ApiMessageAndVersion(
+                new StreamsGroupMemberMetadataValue()
+                    .setRackId(RACK_1)
+                    .setInstanceId(null)
+                    .setClientId(CLIENT_ID)
+                    .setClientHost(CLIENT_HOST)
+                    .setRebalanceTimeoutMs(REBALANCE_TIMEOUT_MS)
+                    .setTopologyEpoch(1)
+                    .setProcessId(PROCESS_ID)
+                    .setUserEndpoint(new 
StreamsGroupMemberMetadataValue.Endpoint().setHost(USER_ENDPOINT).setPort(USER_ENDPOINT_PORT))
+                    .setClientTags(List.of(
+                        new 
StreamsGroupMemberMetadataValue.KeyValue().setKey(TAG_1).setValue(VALUE_1),
+                        new 
StreamsGroupMemberMetadataValue.KeyValue().setKey(TAG_2).setValue(VALUE_2)
+                    )),
+                (short) 0
+            )
+        );
+
+        assertEquals(expectedRecord, 
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(GROUP_ID, member));
+    }
+
+    @Test
+    public void testNewStreamsGroupMemberRecordWithNullUserEndpoint() {
+        StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_ID)
+            .setRackId(RACK_1)
+            .setInstanceId(INSTANCE_ID)
+            .setClientId(CLIENT_ID)
+            .setClientHost(CLIENT_HOST)
+            .setRebalanceTimeoutMs(REBALANCE_TIMEOUT_MS)
+            .setTopologyEpoch(1)
+            .setProcessId(PROCESS_ID)
+            .setUserEndpoint(null)
+            .setClientTags(Map.of(TAG_1, VALUE_1, TAG_2, VALUE_2))
+            .build();
+
+        CoordinatorRecord expectedRecord = CoordinatorRecord.record(
+            new StreamsGroupMemberMetadataKey()
+                .setGroupId(GROUP_ID)
+                .setMemberId(MEMBER_ID),
+            new ApiMessageAndVersion(
+                new StreamsGroupMemberMetadataValue()
+                    .setRackId(RACK_1)
+                    .setInstanceId(INSTANCE_ID)
+                    .setClientId(CLIENT_ID)
+                    .setClientHost(CLIENT_HOST)
+                    .setRebalanceTimeoutMs(REBALANCE_TIMEOUT_MS)
+                    .setTopologyEpoch(1)
+                    .setProcessId(PROCESS_ID)
+                    .setUserEndpoint(null)
+                    .setClientTags(List.of(
+                        new 
StreamsGroupMemberMetadataValue.KeyValue().setKey(TAG_1).setValue(VALUE_1),
+                        new 
StreamsGroupMemberMetadataValue.KeyValue().setKey(TAG_2).setValue(VALUE_2)
+                    )),
+                (short) 0
+            )
+        );
+
+        assertEquals(expectedRecord, 
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(GROUP_ID, member));
+    }
+
+    @Test
+    public void testNewStreamsGroupMemberTombstoneRecord() {
+        CoordinatorRecord expectedRecord = CoordinatorRecord.tombstone(
+            new StreamsGroupMemberMetadataKey()
+                .setGroupId(GROUP_ID)
+                .setMemberId(MEMBER_ID)
+        );
+
+        assertEquals(expectedRecord, 
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(GROUP_ID, 
MEMBER_ID));
+    }
+
+    @Test
+    public void testNewStreamsGroupPartitionMetadataRecord() {
+        Uuid uuid1 = Uuid.randomUuid();
+        Uuid uuid2 = Uuid.randomUuid();
+        Map<String, TopicMetadata> newPartitionMetadata = Map.of(
+            TOPIC_1, new TopicMetadata(uuid1, TOPIC_1, 1, Map.of(0, 
Set.of(RACK_1, RACK_2))),
+            TOPIC_2, new TopicMetadata(uuid2, TOPIC_2, 2, Map.of(1, 
Set.of(RACK_3)))
+        );
+
+        StreamsGroupPartitionMetadataValue value = new 
StreamsGroupPartitionMetadataValue();
+        value.topics().add(new 
StreamsGroupPartitionMetadataValue.TopicMetadata()
+            .setTopicId(uuid1)
+            .setTopicName(TOPIC_1)
+            .setNumPartitions(1)
+            .setPartitionMetadata(List.of(
+                new StreamsGroupPartitionMetadataValue.PartitionMetadata()
+                    .setPartition(0)
+                    .setRacks(List.of(RACK_1, RACK_2))
+            ))
+        );
+        value.topics().add(new 
StreamsGroupPartitionMetadataValue.TopicMetadata()
+            .setTopicId(uuid2)
+            .setTopicName(TOPIC_2)
+            .setNumPartitions(2)
+            .setPartitionMetadata(List.of(
+                new StreamsGroupPartitionMetadataValue.PartitionMetadata()
+                    .setPartition(1)
+                    .setRacks(List.of(RACK_3))
+            ))
+        );
+
+        CoordinatorRecord expectedRecord = CoordinatorRecord.record(
+            new StreamsGroupPartitionMetadataKey()
+                .setGroupId(GROUP_ID),
+            new ApiMessageAndVersion(value, (short) 0)
+        );
+
+        assertEquals(expectedRecord,
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(GROUP_ID,
 newPartitionMetadata));
+    }
+
+    @Test
+    public void testNewStreamsGroupPartitionMetadataTombstoneRecord() {
+        CoordinatorRecord expectedRecord = CoordinatorRecord.tombstone(
+            new StreamsGroupPartitionMetadataKey()
+                .setGroupId(GROUP_ID)
+        );
+
+        assertEquals(expectedRecord, 
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataTombstoneRecord(GROUP_ID));
+    }
+
+    @Test
+    public void testNewStreamsGroupEpochRecord() {
+        CoordinatorRecord expectedRecord = CoordinatorRecord.record(
+            new StreamsGroupMetadataKey()
+                .setGroupId(GROUP_ID),
+            new ApiMessageAndVersion(
+                new StreamsGroupMetadataValue()
+                    .setEpoch(42),
+                (short) 0
+            )
+        );
+
+        assertEquals(expectedRecord, 
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(GROUP_ID, 42));
+    }
+
+    @Test
+    public void testNewStreamsGroupEpochTombstoneRecord() {
+        CoordinatorRecord expectedRecord = CoordinatorRecord.tombstone(
+            new StreamsGroupMetadataKey()
+                .setGroupId(GROUP_ID)
+        );
+
+        assertEquals(expectedRecord, 
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochTombstoneRecord(GROUP_ID));
+    }
+
+    @Test
+    public void testNewStreamsGroupTargetAssignmentRecord() {
+        Map<String, Set<Integer>> activeTasks = Map.of(SUBTOPOLOGY_1, 
Set.of(1, 2, 3));
+        Map<String, Set<Integer>> standbyTasks = Map.of(SUBTOPOLOGY_2, 
Set.of(4, 5, 6));
+        Map<String, Set<Integer>> warmupTasks = Map.of(SUBTOPOLOGY_3, 
Set.of(7, 8, 9));
+
+        CoordinatorRecord expectedRecord = CoordinatorRecord.record(
+            new StreamsGroupTargetAssignmentMemberKey()
+                .setGroupId(GROUP_ID)
+                .setMemberId(MEMBER_ID),
+            new ApiMessageAndVersion(
+                new StreamsGroupTargetAssignmentMemberValue()
+                    .setActiveTasks(List.of(
+                        new StreamsGroupTargetAssignmentMemberValue.TaskIds()
+                            .setSubtopologyId(SUBTOPOLOGY_1)
+                            .setPartitions(List.of(1, 2, 3))
+                    ))
+                    .setStandbyTasks(List.of(
+                        new StreamsGroupTargetAssignmentMemberValue.TaskIds()
+                            .setSubtopologyId(SUBTOPOLOGY_2)
+                            .setPartitions(List.of(4, 5, 6))
+                    ))
+                    .setWarmupTasks(List.of(
+                        new StreamsGroupTargetAssignmentMemberValue.TaskIds()
+                            .setSubtopologyId(SUBTOPOLOGY_3)
+                            .setPartitions(List.of(7, 8, 9))
+                    )),
+                (short) 0
+            )
+        );
+
+        assertEquals(expectedRecord,
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(GROUP_ID, 
MEMBER_ID,
+                new TasksTuple(activeTasks, standbyTasks, warmupTasks)));
+    }
+
+    @ParameterizedTest
+    @EnumSource(TaskRole.class)
+    public void 
testNewStreamsGroupTargetAssignmentRecordWithEmptyTaskIds(TaskRole taskRole) {
+        final StreamsGroupTargetAssignmentMemberValue 
targetAssignmentMemberValue = new StreamsGroupTargetAssignmentMemberValue();
+        final List<TaskIds> taskIds = List.of(new 
TaskIds().setSubtopologyId(SUBTOPOLOGY_1).setPartitions(List.of(1, 2, 3)));
+
+        switch (taskRole) {
+            case ACTIVE:
+                targetAssignmentMemberValue.setActiveTasks(taskIds);
+                break;
+            case STANDBY:
+                targetAssignmentMemberValue.setStandbyTasks(taskIds);
+                break;
+            case WARMUP:
+                targetAssignmentMemberValue.setWarmupTasks(taskIds);
+                break;
+        }
+
+        CoordinatorRecord expectedRecord = CoordinatorRecord.record(
+            new StreamsGroupTargetAssignmentMemberKey()
+                .setGroupId(GROUP_ID)
+                .setMemberId(MEMBER_ID),
+            new ApiMessageAndVersion(
+                targetAssignmentMemberValue,
+                (short) 0
+            )
+        );
+
+        assertEquals(expectedRecord,
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(GROUP_ID, 
MEMBER_ID,
+                mkTasksTuple(taskRole, mkTasks(SUBTOPOLOGY_1, 1, 2, 3))));
+    }
+
+    @Test
+    public void testNewStreamsGroupTargetAssignmentTombstoneRecord() {
+        CoordinatorRecord expectedRecord = CoordinatorRecord.tombstone(
+            new StreamsGroupTargetAssignmentMemberKey()
+                .setGroupId(GROUP_ID)
+                .setMemberId(MEMBER_ID)
+        );
+
+        assertEquals(expectedRecord,
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(GROUP_ID,
 MEMBER_ID));
+    }
+
+    @Test
+    public void testNewStreamsGroupTargetAssignmentEpochRecord() {
+        CoordinatorRecord expectedRecord = CoordinatorRecord.record(
+            new StreamsGroupTargetAssignmentMetadataKey()
+                .setGroupId(GROUP_ID),
+            new ApiMessageAndVersion(
+                new StreamsGroupTargetAssignmentMetadataValue()
+                    .setAssignmentEpoch(42),
+                (short) 0
+            )
+        );
+
+        assertEquals(expectedRecord, 
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(GROUP_ID,
 42));
+    }
+
+    @Test
+    public void testNewStreamsGroupTargetAssignmentEpochTombstoneRecord() {
+        CoordinatorRecord expectedRecord = CoordinatorRecord.tombstone(
+            new StreamsGroupTargetAssignmentMetadataKey()
+                .setGroupId(GROUP_ID)
+        );
+
+        assertEquals(expectedRecord, 
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochTombstoneRecord(GROUP_ID));
+    }
+
+    @Test
+    public void testNewStreamsGroupCurrentAssignmentRecord() {
+        StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_ID)
+            .setRackId(RACK_1)
+            .setInstanceId(INSTANCE_ID)
+            .setClientId(CLIENT_ID)
+            .setClientHost(CLIENT_HOST)
+            .setRebalanceTimeoutMs(REBALANCE_TIMEOUT_MS)
+            .setMemberEpoch(1)
+            .setPreviousMemberEpoch(0)
+            .setState(MemberState.STABLE)
+            .setTopologyEpoch(1)
+            .setProcessId(PROCESS_ID)
+            .setUserEndpoint(new 
Endpoint().setHost(USER_ENDPOINT).setPort(USER_ENDPOINT_PORT))
+            .setClientTags(Map.of(TAG_1, VALUE_1, TAG_2, VALUE_2))
+            .setAssignedTasks(new TasksTuple(
+                Map.of(
+                    SUBTOPOLOGY_1, Set.of(1, 2, 3)
+                ),
+                Map.of(
+                    SUBTOPOLOGY_2, Set.of(4, 5, 6)
+                ),
+                Map.of(
+                    SUBTOPOLOGY_3, Set.of(7, 8, 9)
+                )
+            ))
+            .setTasksPendingRevocation(new TasksTuple(
+                Map.of(
+                    SUBTOPOLOGY_1, Set.of(1, 2, 3)
+                ),
+                Map.of(
+                    SUBTOPOLOGY_2, Set.of(4, 5, 6)
+                ),
+                Map.of(
+                    SUBTOPOLOGY_3, Set.of(7, 8, 9)
+                )
+            ))
+            .build();
+
+        CoordinatorRecord expectedRecord = CoordinatorRecord.record(
+            new StreamsGroupCurrentMemberAssignmentKey()
+                .setGroupId(GROUP_ID)
+                .setMemberId(MEMBER_ID),
+            new ApiMessageAndVersion(
+                new StreamsGroupCurrentMemberAssignmentValue()
+                    .setMemberEpoch(1)
+                    .setPreviousMemberEpoch(0)
+                    .setState(MemberState.STABLE.value())
+                    .setActiveTasks(List.of(
+                        new StreamsGroupCurrentMemberAssignmentValue.TaskIds()
+                            .setSubtopologyId(SUBTOPOLOGY_1)
+                            .setPartitions(List.of(1, 2, 3))
+                    ))
+                    .setStandbyTasks(List.of(
+                        new StreamsGroupCurrentMemberAssignmentValue.TaskIds()
+                            .setSubtopologyId(SUBTOPOLOGY_2)
+                            .setPartitions(List.of(4, 5, 6))
+                    ))
+                    .setWarmupTasks(List.of(
+                        new StreamsGroupCurrentMemberAssignmentValue.TaskIds()
+                            .setSubtopologyId(SUBTOPOLOGY_3)
+                            .setPartitions(List.of(7, 8, 9))
+                    ))
+                    .setActiveTasksPendingRevocation(List.of(
+                        new StreamsGroupCurrentMemberAssignmentValue.TaskIds()
+                            .setSubtopologyId(SUBTOPOLOGY_1)
+                            .setPartitions(List.of(1, 2, 3))
+                    ))
+                    .setStandbyTasksPendingRevocation(List.of(
+                        new StreamsGroupCurrentMemberAssignmentValue.TaskIds()
+                            .setSubtopologyId(SUBTOPOLOGY_2)
+                            .setPartitions(List.of(4, 5, 6))
+                    ))
+                    .setWarmupTasksPendingRevocation(List.of(
+                        new StreamsGroupCurrentMemberAssignmentValue.TaskIds()
+                            .setSubtopologyId(SUBTOPOLOGY_3)
+                            .setPartitions(List.of(7, 8, 9))
+                    )),
+                (short) 0
+            )
+        );
+
+        assertEquals(expectedRecord, 
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(GROUP_ID,
 member));
+    }
+
+    @Test
+    public void 
testNewStreamsGroupCurrentAssignmentRecordWithEmptyAssignment() {
+        StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_ID)
+            .setRackId(RACK_1)
+            .setInstanceId(INSTANCE_ID)
+            .setClientId(CLIENT_ID)
+            .setClientHost(CLIENT_HOST)
+            .setRebalanceTimeoutMs(REBALANCE_TIMEOUT_MS)
+            .setMemberEpoch(1)
+            .setPreviousMemberEpoch(0)
+            .setState(MemberState.STABLE)
+            .setTopologyEpoch(1)
+            .setProcessId(PROCESS_ID)
+            .setUserEndpoint(new 
Endpoint().setHost(USER_ENDPOINT).setPort(USER_ENDPOINT_PORT))
+            .setClientTags(Map.of(TAG_1, VALUE_1, TAG_2, VALUE_2))
+            .setAssignedTasks(new TasksTuple(Map.of(), Map.of(), Map.of()))
+            .setTasksPendingRevocation(new TasksTuple(Map.of(), Map.of(), 
Map.of()))
+            .build();
+
+        CoordinatorRecord expectedRecord = CoordinatorRecord.record(
+            new StreamsGroupCurrentMemberAssignmentKey()
+                .setGroupId(GROUP_ID)
+                .setMemberId(MEMBER_ID),
+            new ApiMessageAndVersion(
+                new StreamsGroupCurrentMemberAssignmentValue()
+                    .setMemberEpoch(1)
+                    .setPreviousMemberEpoch(0)
+                    .setState(MemberState.STABLE.value())
+                    .setActiveTasks(List.of())
+                    .setStandbyTasks(List.of())
+                    .setWarmupTasks(List.of())
+                    .setActiveTasksPendingRevocation(List.of())
+                    .setStandbyTasksPendingRevocation(List.of())
+                    .setWarmupTasksPendingRevocation(List.of()),
+                (short) 0
+            )
+        );
+
+        assertEquals(expectedRecord, 
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(GROUP_ID,
 member));
+    }
+
+    @Test
+    public void testNewStreamsGroupCurrentAssignmentTombstoneRecord() {
+        CoordinatorRecord expectedRecord = CoordinatorRecord.tombstone(
+            new StreamsGroupCurrentMemberAssignmentKey()
+                .setGroupId(GROUP_ID)
+                .setMemberId(MEMBER_ID)
+        );
+
+        assertEquals(expectedRecord,
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(GROUP_ID,
 MEMBER_ID));
+    }
+
+    @Test
+    public void testNewStreamsGroupTopologyRecord() {
+        StreamsGroupHeartbeatRequestData.Topology topology =
+            new StreamsGroupHeartbeatRequestData.Topology()
+                .setEpoch(42)
+                .setSubtopologies(
+                    List.of(new StreamsGroupHeartbeatRequestData.Subtopology()
+                            .setSubtopologyId(SUBTOPOLOGY_1)
+                            .setRepartitionSinkTopics(List.of(TOPIC_FOO))
+                            .setSourceTopics(List.of(TOPIC_BAR))
+                            .setSourceTopicRegex(List.of(TOPIC_REGEX))
+                            .setRepartitionSourceTopics(
+                                List.of(
+                                    new 
StreamsGroupHeartbeatRequestData.TopicInfo()
+                                        .setName(TOPIC_REPARTITION)
+                                        .setPartitions(4)
+                                        .setReplicationFactor((short) 3)
+                                        .setTopicConfigs(List.of(
+                                            new 
StreamsGroupHeartbeatRequestData.KeyValue()
+                                                .setKey(CONFIG_NAME_1)
+                                                .setValue(CONFIG_VALUE_1)
+                                        ))
+                                )
+                            )
+                            .setStateChangelogTopics(
+                                List.of(
+                                    new 
StreamsGroupHeartbeatRequestData.TopicInfo()
+                                        .setName(TOPIC_CHANGELOG)
+                                        .setReplicationFactor((short) 2)
+                                        .setTopicConfigs(List.of(
+                                            new 
StreamsGroupHeartbeatRequestData.KeyValue()
+                                                .setKey(CONFIG_NAME_2)
+                                                .setValue(CONFIG_VALUE_2)
+                                        ))
+                                )
+                            )
+                            .setCopartitionGroups(List.of(
+                                new 
StreamsGroupHeartbeatRequestData.CopartitionGroup()
+                                    .setSourceTopics(List.of((short) 0))
+                                    
.setRepartitionSourceTopics(List.of((short) 0)),
+                                new 
StreamsGroupHeartbeatRequestData.CopartitionGroup()
+                                    .setSourceTopicRegex(List.of((short) 0))
+                            )),
+                        new StreamsGroupHeartbeatRequestData.Subtopology()
+                            .setSubtopologyId(SUBTOPOLOGY_1)
+                            .setRepartitionSinkTopics(List.of())
+                            .setSourceTopics(List.of(TOPIC_BAR))
+                            .setSourceTopicRegex(List.of())
+                            .setRepartitionSourceTopics(List.of())
+                            .setStateChangelogTopics(List.of())
+                            .setCopartitionGroups(List.of())
+                    )
+                );
+
+        StreamsGroupTopologyValue expectedTopology =
+            new StreamsGroupTopologyValue()
+                .setEpoch(42)
+                .setSubtopologies(
+                    List.of(new StreamsGroupTopologyValue.Subtopology()
+                            .setSubtopologyId(SUBTOPOLOGY_1)
+                            .setRepartitionSinkTopics(List.of(TOPIC_FOO))
+                            .setSourceTopics(List.of(TOPIC_BAR))
+                            .setSourceTopicRegex(List.of(TOPIC_REGEX))
+                            .setRepartitionSourceTopics(
+                                List.of(
+                                    new StreamsGroupTopologyValue.TopicInfo()
+                                        .setName(TOPIC_REPARTITION)
+                                        .setPartitions(4)
+                                        .setReplicationFactor((short) 3)
+                                        .setTopicConfigs(List.of(
+                                            new 
StreamsGroupTopologyValue.TopicConfig()
+                                                .setKey(CONFIG_NAME_1)
+                                                .setValue(CONFIG_VALUE_1)
+                                        ))
+                                )
+                            )
+                            .setStateChangelogTopics(
+                                List.of(
+                                    new StreamsGroupTopologyValue.TopicInfo()
+                                        .setName(TOPIC_CHANGELOG)
+                                        .setReplicationFactor((short) 2)
+                                        .setTopicConfigs(List.of(
+                                            new 
StreamsGroupTopologyValue.TopicConfig()
+                                                .setKey(CONFIG_NAME_2)
+                                                .setValue(CONFIG_VALUE_2)
+                                        ))
+                                )
+                            )
+                            .setCopartitionGroups(List.of(
+                                new 
StreamsGroupTopologyValue.CopartitionGroup()
+                                    .setSourceTopics(List.of((short) 0))
+                                    
.setRepartitionSourceTopics(List.of((short) 0)),
+                                new 
StreamsGroupTopologyValue.CopartitionGroup()
+                                    .setSourceTopicRegex(List.of((short) 0))
+                            )),
+                        new StreamsGroupTopologyValue.Subtopology()
+                            .setSubtopologyId(SUBTOPOLOGY_1)
+                            .setRepartitionSinkTopics(List.of())
+                            .setSourceTopics(List.of(TOPIC_BAR))
+                            .setSourceTopicRegex(List.of())
+                            .setRepartitionSourceTopics(List.of())
+                            .setStateChangelogTopics(List.of())
+                            .setCopartitionGroups(List.of())
+                    )
+                );
+
+        CoordinatorRecord expectedRecord = CoordinatorRecord.record(
+            new StreamsGroupTopologyKey()
+                .setGroupId(GROUP_ID),
+            new ApiMessageAndVersion(
+                expectedTopology,
+                (short) 0));
+
+        assertEquals(expectedRecord, 
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(GROUP_ID, 
topology));
+    }
+
+    @Test
+    public void testNewStreamsGroupTopologyRecordTombstone() {
+        CoordinatorRecord expectedRecord = CoordinatorRecord.tombstone(
+            new StreamsGroupTopologyKey()
+                .setGroupId(GROUP_ID)
+        );
+
+        assertEquals(expectedRecord, 
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecordTombstone(GROUP_ID));
+    }
+
+    @Test
+    public void testNewStreamsGroupMemberRecordNullGroupId() {
+        NullPointerException exception = 
assertThrows(NullPointerException.class, () ->
+            StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(null, 
mock(StreamsGroupMember.class)));
+        assertEquals("groupId should not be null here", 
exception.getMessage());
+    }
+
+    @Test
+    public void testNewStreamsGroupMemberRecordNullMember() {
+        NullPointerException exception = 
assertThrows(NullPointerException.class, () ->
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord("groupId", null));
+        assertEquals("member should not be null here", exception.getMessage());
+    }
+
+    @Test
+    public void testNewStreamsGroupMemberTombstoneRecordNullGroupId() {
+        NullPointerException exception = 
assertThrows(NullPointerException.class, () ->
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(null, 
"memberId"));
+        assertEquals("groupId should not be null here", 
exception.getMessage());
+    }
+
+    @Test
+    public void testNewStreamsGroupMemberTombstoneRecordNullMemberId() {
+        NullPointerException exception = 
assertThrows(NullPointerException.class, () ->
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord("groupId", 
null));
+        assertEquals("memberId should not be null here", 
exception.getMessage());
+    }
+
+    @Test
+    public void testNewStreamsGroupPartitionMetadataRecordNullGroupId() {
+        NullPointerException exception = 
assertThrows(NullPointerException.class, () ->
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(null, 
Map.of()));
+        assertEquals("groupId should not be null here", 
exception.getMessage());
+    }
+
+    @Test
+    public void 
testNewStreamsGroupPartitionMetadataRecordNullNewPartitionMetadata() {
+        NullPointerException exception = 
assertThrows(NullPointerException.class, () ->
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord("groupId",
 null));
+        assertEquals("newPartitionMetadata should not be null here", 
exception.getMessage());
+    }
+
+    @Test
+    public void 
testNewStreamsGroupPartitionMetadataTombstoneRecordNullGroupId() {
+        NullPointerException exception = 
assertThrows(NullPointerException.class, () ->
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataTombstoneRecord(null));
+        assertEquals("groupId should not be null here", 
exception.getMessage());
+    }
+
+    @Test
+    public void testNewStreamsGroupEpochRecordNullGroupId() {
+        NullPointerException exception = 
assertThrows(NullPointerException.class, () ->
+            StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(null, 
1));
+        assertEquals("groupId should not be null here", 
exception.getMessage());
+    }
+
+    @Test
+    public void testNewStreamsGroupEpochTombstoneRecordNullGroupId() {
+        NullPointerException exception = 
assertThrows(NullPointerException.class, () ->
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochTombstoneRecord(null));
+        assertEquals("groupId should not be null here", 
exception.getMessage());
+    }
+
+    @Test
+    public void testNewStreamsGroupTargetAssignmentRecordNullGroupId() {
+        NullPointerException exception = 
assertThrows(NullPointerException.class, () ->
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(null, 
"memberId", mock(TasksTuple.class)));
+        assertEquals("groupId should not be null here", 
exception.getMessage());
+    }
+
+    @Test
+    public void testNewStreamsGroupTargetAssignmentRecordNullMemberId() {
+        NullPointerException exception = 
assertThrows(NullPointerException.class, () ->
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord("groupId",
 null, mock(TasksTuple.class)));
+        assertEquals("memberId should not be null here", 
exception.getMessage());
+    }
+
+    @Test
+    public void testNewStreamsGroupTargetAssignmentRecordNullAssignment() {
+        NullPointerException exception = 
assertThrows(NullPointerException.class, () ->
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord("groupId",
 "memberId", null));
+        assertEquals("assignment should not be null here", 
exception.getMessage());
+    }
+
+    @Test
+    public void 
testNewStreamsGroupTargetAssignmentTombstoneRecordNullGroupId() {
+        NullPointerException exception = 
assertThrows(NullPointerException.class, () ->
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(null,
 "memberId"));
+        assertEquals("groupId should not be null here", 
exception.getMessage());
+    }
+
+    @Test
+    public void 
testNewStreamsGroupTargetAssignmentTombstoneRecordNullMemberId() {
+        NullPointerException exception = 
assertThrows(NullPointerException.class, () ->
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord("groupId",
 null));
+        assertEquals("memberId should not be null here", 
exception.getMessage());
+    }
+
+    @Test
+    public void testNewStreamsGroupTargetAssignmentEpochRecordNullGroupId() {
+        NullPointerException exception = 
assertThrows(NullPointerException.class, () ->
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(null,
 1));
+        assertEquals("groupId should not be null here", 
exception.getMessage());
+    }
+
+    @Test
+    public void 
testNewStreamsGroupTargetAssignmentEpochTombstoneRecordNullGroupId() {
+        NullPointerException exception = 
assertThrows(NullPointerException.class, () ->
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochTombstoneRecord(null));
+        assertEquals("groupId should not be null here", 
exception.getMessage());
+    }
+
+    @Test
+    public void testNewStreamsGroupCurrentAssignmentRecordNullGroupId() {
+        NullPointerException exception = 
assertThrows(NullPointerException.class, () ->
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(null, 
mock(StreamsGroupMember.class)));
+        assertEquals("groupId should not be null here", 
exception.getMessage());
+    }
+
+    @Test
+    public void testNewStreamsGroupCurrentAssignmentRecordNullMember() {
+        NullPointerException exception = 
assertThrows(NullPointerException.class, () ->
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord("groupId",
 null));
+        assertEquals("member should not be null here", exception.getMessage());
+    }
+
+    @Test
+    public void 
testNewStreamsGroupCurrentAssignmentTombstoneRecordNullGroupId() {
+        NullPointerException exception = 
assertThrows(NullPointerException.class, () ->
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(null,
 "memberId"));
+        assertEquals("groupId should not be null here", 
exception.getMessage());
+    }
+
+    @Test
+    public void 
testNewStreamsGroupCurrentAssignmentTombstoneRecordNullMemberId() {
+        NullPointerException exception = 
assertThrows(NullPointerException.class, () ->
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord("groupId",
 null));
+        assertEquals("memberId should not be null here", 
exception.getMessage());
+    }
+
+    @Test
+    public void testNewStreamsGroupTopologyRecordWithValueNullGroupId() {
+        NullPointerException exception = 
assertThrows(NullPointerException.class, () ->
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(null, 
mock(StreamsGroupTopologyValue.class)));
+        assertEquals("groupId should not be null here", 
exception.getMessage());
+    }
+
+    @Test
+    public void testNewStreamsGroupTopologyRecordWithTopologyNullGroupId() {
+        NullPointerException exception = 
assertThrows(NullPointerException.class, () ->
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(null, 
mock(StreamsGroupHeartbeatRequestData.Topology.class)));
+        assertEquals("groupId should not be null here", 
exception.getMessage());
+    }
+
+    @Test
+    public void testNewStreamsGroupTopologyRecordNullTopology() {
+        NullPointerException exception = 
assertThrows(NullPointerException.class, () ->
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord("groupId", 
(StreamsGroupHeartbeatRequestData.Topology) null));
+        assertEquals("topology should not be null here", 
exception.getMessage());
+    }
+
+    @Test
+    public void testNewStreamsGroupTopologyRecordNullValue() {
+        NullPointerException exception = 
assertThrows(NullPointerException.class, () ->
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord("groupId", 
(StreamsGroupTopologyValue) null));
+        assertEquals("value should not be null here", exception.getMessage());
+    }
+
+    @Test
+    public void testNewStreamsGroupTopologyRecordTombstoneNullGroupId() {
+        NullPointerException exception = 
assertThrows(NullPointerException.class, () ->
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecordTombstone(null));
+        assertEquals("groupId should not be null here", 
exception.getMessage());
+    }
+
+    @Test
+    public void testConvertToStreamsGroupTopologyRecordNullTopology() {
+        NullPointerException exception = 
assertThrows(NullPointerException.class, () ->
+            
StreamsCoordinatorRecordHelpers.convertToStreamsGroupTopologyRecord(null));
+        assertEquals("topology should not be null here", 
exception.getMessage());
+    }
+}
\ No newline at end of file

Reply via email to