This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6acf69d7a25 MINOR: Remove the client side assignor from the 
ConsumerGroupHeartbeat API (#14469)
6acf69d7a25 is described below

commit 6acf69d7a257b83fbf3103772ca8d68093718274
Author: David Jacot <[email protected]>
AuthorDate: Mon Oct 2 04:59:20 2023 -0700

    MINOR: Remove the client side assignor from the ConsumerGroupHeartbeat API 
(#14469)
    
    As a first step, we plan to release a preview of the new consumer group 
rebalance protocol without the client side assignor. This patch removes all the 
related fields from the ConsumerGroupHeartbeat API for now. Removing fields is 
fine here because this API is not released yet and not exposed by default. We 
will add them back while bumping the version of the request when we release 
this part in the future.
    
    Reviewers: Justine Olshan <[email protected]>
---
 .../message/ConsumerGroupHeartbeatRequest.json     |  15 ---
 .../message/ConsumerGroupHeartbeatResponse.json    |  14 +--
 .../internals/MembershipManagerImplTest.java       |   2 +-
 .../kafka/common/protocol/ProtoUtilsTest.java      |   1 -
 .../kafka/common/requests/RequestResponseTest.java |   3 +-
 .../server/ConsumerGroupHeartbeatRequestTest.scala |   2 +-
 .../coordinator/group/GroupMetadataManager.java    |  11 +--
 .../group/GroupMetadataManagerTest.java            | 106 ++++++---------------
 8 files changed, 35 insertions(+), 119 deletions(-)

diff --git 
a/clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json 
b/clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json
index dbe69d2f822..a05db31618a 100644
--- 
a/clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json
+++ 
b/clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json
@@ -43,21 +43,6 @@
       "about": "null if it didn't change since the last heartbeat; the 
subscribed topic regex otherwise" },
     { "name": "ServerAssignor", "type": "string", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
       "about": "null if not used or if it didn't change since the last 
heartbeat; the server side assignor to use otherwise." },
-    { "name": "ClientAssignors", "type": "[]Assignor", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
-      "about": "null if not used or if it didn't change since the last 
heartbeat; the list of client-side assignors otherwise.", "fields": [
-      { "name": "Name", "type": "string", "versions": "0+",
-        "about": "The name of the assignor." },
-      { "name": "MinimumVersion", "type": "int16", "versions": "0+",
-        "about": "The minimum supported version for the metadata." },
-      { "name": "MaximumVersion", "type": "int16", "versions": "0+",
-        "about": "The maximum supported version for the metadata." },
-      { "name": "Reason", "type": "int8", "versions": "0+",
-        "about": "The reason of the metadata update." },
-      { "name": "MetadataVersion", "type": "int16", "versions": "0+",
-        "about": "The version of the metadata." },
-      { "name": "MetadataBytes", "type": "bytes", "versions": "0+",
-        "about": "The metadata." }
-    ]},
     { "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": 
"0+", "nullableVersions": "0+", "default": "null",
       "about": "null if it didn't change since the last heartbeat; the 
partitions owned by the member.", "fields": [
       { "name": "TopicId", "type": "uuid", "versions": "0+",
diff --git 
a/clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json 
b/clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json
index b2e8992d7da..fb55f80bd40 100644
--- 
a/clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json
+++ 
b/clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json
@@ -41,22 +41,12 @@
       "about": "The member id generated by the coordinator. Only provided when 
the member joins with MemberEpoch == 0." },
     { "name": "MemberEpoch", "type": "int32", "versions": "0+",
       "about": "The member epoch." },
-    { "name": "ShouldComputeAssignment", "type": "bool", "versions": "0+",
-      "about": "True if the member should compute the assignment for the 
group." },
     { "name": "HeartbeatIntervalMs", "type": "int32", "versions": "0+",
       "about": "The heartbeat interval in milliseconds." },
     { "name": "Assignment", "type": "Assignment", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
       "about": "null if not provided; the assignment otherwise.", "fields": [
-      { "name": "Error", "type": "int8", "versions": "0+",
-        "about": "The assigned error." },
-      { "name": "AssignedTopicPartitions", "type": "[]TopicPartitions", 
"versions": "0+",
-        "about": "The partitions assigned to the member that can be used 
immediately." },
-      { "name": "PendingTopicPartitions", "type": "[]TopicPartitions", 
"versions": "0+",
-        "about": "The partitions assigned to the member that cannot be used 
because they are not released by their former owners yet." },
-      { "name": "MetadataVersion", "type": "int16", "versions": "0+",
-        "about": "The version of the metadata." },
-      { "name": "MetadataBytes", "type": "bytes", "versions": "0+",
-        "about": "The assigned metadata." }
+      { "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": 
"0+",
+        "about": "The partitions assigned to the member that can be used 
immediately." }
     ]}
   ],
   "commonStructs": [
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
index 8c20b2837d5..09e47acb416 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
@@ -190,7 +190,7 @@ public class MembershipManagerImplTest {
 
     private ConsumerGroupHeartbeatResponseData.Assignment createAssignment() {
         return new ConsumerGroupHeartbeatResponseData.Assignment()
-                .setAssignedTopicPartitions(Arrays.asList(
+                .setTopicPartitions(Arrays.asList(
                         new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
                                 .setTopicId(Uuid.randomUuid())
                                 .setPartitions(Arrays.asList(0, 1, 2)),
diff --git 
a/clients/src/test/java/org/apache/kafka/common/protocol/ProtoUtilsTest.java 
b/clients/src/test/java/org/apache/kafka/common/protocol/ProtoUtilsTest.java
index 74413dc6549..712c61168f4 100644
--- a/clients/src/test/java/org/apache/kafka/common/protocol/ProtoUtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/protocol/ProtoUtilsTest.java
@@ -35,7 +35,6 @@ public class ProtoUtilsTest {
                 case RENEW_DELEGATION_TOKEN:
                 case ALTER_USER_SCRAM_CREDENTIALS:
                 case ENVELOPE:
-                case CONSUMER_GROUP_HEARTBEAT:
                     assertTrue(key.requiresDelayedAllocation, key + " should 
require delayed allocation");
                     break;
                 default:
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 36bf5c45d73..303c13abfad 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -1196,9 +1196,8 @@ public class RequestResponseTest {
             .setThrottleTimeMs(1000)
             .setMemberId("memberid")
             .setMemberEpoch(11)
-            .setShouldComputeAssignment(false)
             .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
-                .setAssignedTopicPartitions(Arrays.asList(
+                .setTopicPartitions(Arrays.asList(
                     new ConsumerGroupHeartbeatResponseData.TopicPartitions()
                         .setTopicId(Uuid.randomUuid())
                         .setPartitions(Arrays.asList(0, 1, 2)),
diff --git 
a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
index 89cddfe97e7..75bad3e62cd 100644
--- 
a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
@@ -120,7 +120,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
 
     // This is the expected assignment.
     val expectedAssignment = new 
ConsumerGroupHeartbeatResponseData.Assignment()
-      .setAssignedTopicPartitions(List(new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+      .setTopicPartitions(List(new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
         .setTopicId(topicId)
         .setPartitions(List[Integer](0, 1, 2).asJava)).asJava)
 
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 002b7956857..d85c3227ad6 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -579,7 +579,6 @@ public class GroupMetadataManager {
         throwIfEmptyString(request.instanceId(), "InstanceId can't be empty.");
         throwIfEmptyString(request.rackId(), "RackId can't be empty.");
         throwIfNotNull(request.subscribedTopicRegex(), "SubscribedTopicRegex 
is not supported yet.");
-        throwIfNotNull(request.clientAssignors(), "Client side assignors are 
not supported yet.");
 
         if (request.memberEpoch() > 0 || request.memberEpoch() == -1) {
             throwIfEmptyString(request.memberId(), "MemberId can't be empty.");
@@ -689,14 +688,8 @@ public class GroupMetadataManager {
     private ConsumerGroupHeartbeatResponseData.Assignment 
createResponseAssignment(
         ConsumerGroupMember member
     ) {
-        ConsumerGroupHeartbeatResponseData.Assignment assignment = new 
ConsumerGroupHeartbeatResponseData.Assignment()
-            
.setAssignedTopicPartitions(fromAssignmentMap(member.assignedPartitions()));
-
-        if (member.state() == ConsumerGroupMember.MemberState.ASSIGNING) {
-            
assignment.setPendingTopicPartitions(fromAssignmentMap(member.partitionsPendingAssignment()));
-        }
-
-        return assignment;
+        return new ConsumerGroupHeartbeatResponseData.Assignment()
+            
.setTopicPartitions(fromAssignmentMap(member.assignedPartitions()));
     }
 
     private List<ConsumerGroupHeartbeatResponseData.TopicPartitions> 
fromAssignmentMap(
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index a1b25575e8e..8159ffd3a94 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -1511,7 +1511,7 @@ public class GroupMetadataManagerTest {
                 .setMemberEpoch(1)
                 .setHeartbeatIntervalMs(5000)
                 .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
-                    .setAssignedTopicPartitions(Arrays.asList(
+                    .setTopicPartitions(Arrays.asList(
                         new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
                             .setTopicId(fooTopicId)
                             .setPartitions(Arrays.asList(0, 1, 2, 3, 4, 5)),
@@ -1610,7 +1610,7 @@ public class GroupMetadataManagerTest {
                 .setMemberEpoch(11)
                 .setHeartbeatIntervalMs(5000)
                 .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
-                    .setAssignedTopicPartitions(Arrays.asList(
+                    .setTopicPartitions(Arrays.asList(
                         new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
                             .setTopicId(fooTopicId)
                             .setPartitions(Arrays.asList(0, 1, 2, 3, 4, 5)),
@@ -1746,15 +1746,7 @@ public class GroupMetadataManagerTest {
                 .setMemberId(memberId3)
                 .setMemberEpoch(11)
                 .setHeartbeatIntervalMs(5000)
-                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
-                    .setPendingTopicPartitions(Arrays.asList(
-                        new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
-                            .setTopicId(fooTopicId)
-                            .setPartitions(Arrays.asList(4, 5)),
-                        new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
-                            .setTopicId(barTopicId)
-                            .setPartitions(Collections.singletonList(2))
-                    ))),
+                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()),
             result.response()
         );
 
@@ -1999,15 +1991,7 @@ public class GroupMetadataManagerTest {
                 .setMemberId(memberId3)
                 .setMemberEpoch(11)
                 .setHeartbeatIntervalMs(5000)
-                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
-                    .setPendingTopicPartitions(Arrays.asList(
-                        new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
-                            .setTopicId(fooTopicId)
-                            .setPartitions(Arrays.asList(4, 5)),
-                        new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
-                            .setTopicId(barTopicId)
-                            .setPartitions(Arrays.asList(1))
-                    ))),
+                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()),
             result.response()
         );
 
@@ -2042,7 +2026,7 @@ public class GroupMetadataManagerTest {
                 .setMemberEpoch(10)
                 .setHeartbeatIntervalMs(5000)
                 .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
-                    .setAssignedTopicPartitions(Arrays.asList(
+                    .setTopicPartitions(Arrays.asList(
                         new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
                             .setTopicId(fooTopicId)
                             .setPartitions(Arrays.asList(0, 1)),
@@ -2085,7 +2069,7 @@ public class GroupMetadataManagerTest {
                 .setMemberEpoch(10)
                 .setHeartbeatIntervalMs(5000)
                 .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
-                    .setAssignedTopicPartitions(Arrays.asList(
+                    .setTopicPartitions(Arrays.asList(
                         new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
                             .setTopicId(fooTopicId)
                             .setPartitions(Arrays.asList(3)),
@@ -2156,7 +2140,7 @@ public class GroupMetadataManagerTest {
                 .setMemberEpoch(11)
                 .setHeartbeatIntervalMs(5000)
                 .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
-                    .setAssignedTopicPartitions(Arrays.asList(
+                    .setTopicPartitions(Arrays.asList(
                         new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
                             .setTopicId(fooTopicId)
                             .setPartitions(Arrays.asList(0, 1)),
@@ -2213,14 +2197,10 @@ public class GroupMetadataManagerTest {
                 .setMemberEpoch(11)
                 .setHeartbeatIntervalMs(5000)
                 .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
-                    .setAssignedTopicPartitions(Arrays.asList(
+                    .setTopicPartitions(Arrays.asList(
                         new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
                             .setTopicId(barTopicId)
-                            .setPartitions(Arrays.asList(1))))
-                    .setPendingTopicPartitions(Arrays.asList(
-                        new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
-                            .setTopicId(fooTopicId)
-                            .setPartitions(Arrays.asList(4, 5))))),
+                            .setPartitions(Arrays.asList(1))))),
             result.response()
         );
 
@@ -2281,7 +2261,7 @@ public class GroupMetadataManagerTest {
                 .setMemberEpoch(11)
                 .setHeartbeatIntervalMs(5000)
                 .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
-                    .setAssignedTopicPartitions(Arrays.asList(
+                    .setTopicPartitions(Arrays.asList(
                         new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
                             .setTopicId(fooTopicId)
                             .setPartitions(Arrays.asList(2, 3)),
@@ -2319,7 +2299,7 @@ public class GroupMetadataManagerTest {
                 .setMemberEpoch(11)
                 .setHeartbeatIntervalMs(5000)
                 .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
-                    .setAssignedTopicPartitions(Arrays.asList(
+                    .setTopicPartitions(Arrays.asList(
                         new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
                             .setTopicId(fooTopicId)
                             .setPartitions(Arrays.asList(4, 5)),
@@ -2414,12 +2394,7 @@ public class GroupMetadataManagerTest {
                 .setMemberId(memberId2)
                 .setMemberEpoch(11)
                 .setHeartbeatIntervalMs(5000)
-                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
-                    .setPendingTopicPartitions(Arrays.asList(
-                        new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
-                            .setTopicId(fooTopicId)
-                            .setPartitions(Arrays.asList(2))
-                    ))),
+                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()),
             result.response()
         );
 
@@ -2448,7 +2423,7 @@ public class GroupMetadataManagerTest {
                 .setMemberEpoch(10)
                 .setHeartbeatIntervalMs(5000)
                 .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
-                    .setAssignedTopicPartitions(Arrays.asList(
+                    .setTopicPartitions(Arrays.asList(
                         new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
                             .setTopicId(fooTopicId)
                             .setPartitions(Arrays.asList(0, 1))))),
@@ -2503,12 +2478,7 @@ public class GroupMetadataManagerTest {
                 .setMemberId(memberId3)
                 .setMemberEpoch(12)
                 .setHeartbeatIntervalMs(5000)
-                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
-                    .setPendingTopicPartitions(Arrays.asList(
-                        new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
-                            .setTopicId(fooTopicId)
-                            .setPartitions(Arrays.asList(1))
-                    ))),
+                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()),
             result.response()
         );
 
@@ -2537,7 +2507,7 @@ public class GroupMetadataManagerTest {
                 .setMemberEpoch(10)
                 .setHeartbeatIntervalMs(5000)
                 .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
-                    .setAssignedTopicPartitions(Arrays.asList(
+                    .setTopicPartitions(Arrays.asList(
                         new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
                             .setTopicId(fooTopicId)
                             .setPartitions(Arrays.asList(0))))),
@@ -2570,11 +2540,7 @@ public class GroupMetadataManagerTest {
                 .setMemberId(memberId2)
                 .setMemberEpoch(12)
                 .setHeartbeatIntervalMs(5000)
-                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
-                    .setPendingTopicPartitions(Arrays.asList(
-                        new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
-                            .setTopicId(fooTopicId)
-                            .setPartitions(Arrays.asList(2))))),
+                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()),
             result.response()
         );
 
@@ -2817,7 +2783,7 @@ public class GroupMetadataManagerTest {
                 .setMemberEpoch(11)
                 .setHeartbeatIntervalMs(5000)
                 .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
-                    .setAssignedTopicPartitions(Arrays.asList(
+                    .setTopicPartitions(Arrays.asList(
                         new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
                             .setTopicId(fooTopicId)
                             .setPartitions(Arrays.asList(0, 1, 2, 3, 4, 5))
@@ -2946,7 +2912,7 @@ public class GroupMetadataManagerTest {
                 .setMemberEpoch(11)
                 .setHeartbeatIntervalMs(5000)
                 .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
-                    .setAssignedTopicPartitions(Arrays.asList(
+                    .setTopicPartitions(Arrays.asList(
                         new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
                             .setTopicId(fooTopicId)
                             .setPartitions(Arrays.asList(0, 1, 2, 3, 4, 5))
@@ -3378,7 +3344,7 @@ public class GroupMetadataManagerTest {
                 .setMemberEpoch(1)
                 .setHeartbeatIntervalMs(5000)
                 .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
-                    .setAssignedTopicPartitions(Arrays.asList(
+                    .setTopicPartitions(Arrays.asList(
                         new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
                             .setTopicId(fooTopicId)
                             .setPartitions(Arrays.asList(0, 1, 2))))),
@@ -3419,11 +3385,7 @@ public class GroupMetadataManagerTest {
                 .setMemberId(memberId2)
                 .setMemberEpoch(2)
                 .setHeartbeatIntervalMs(5000)
-                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
-                    .setPendingTopicPartitions(Arrays.asList(
-                        new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
-                            .setTopicId(fooTopicId)
-                            .setPartitions(Arrays.asList(2))))),
+                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()),
             result.response()
         );
 
@@ -3448,7 +3410,7 @@ public class GroupMetadataManagerTest {
                 .setMemberEpoch(1)
                 .setHeartbeatIntervalMs(5000)
                 .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
-                    .setAssignedTopicPartitions(Arrays.asList(
+                    .setTopicPartitions(Arrays.asList(
                         new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
                             .setTopicId(fooTopicId)
                             .setPartitions(Arrays.asList(0, 1))))),
@@ -3495,11 +3457,7 @@ public class GroupMetadataManagerTest {
                 .setMemberId(memberId3)
                 .setMemberEpoch(3)
                 .setHeartbeatIntervalMs(5000)
-                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
-                    .setPendingTopicPartitions(Arrays.asList(
-                        new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
-                            .setTopicId(fooTopicId)
-                            .setPartitions(Arrays.asList(1))))),
+                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()),
             result.response()
         );
 
@@ -3524,7 +3482,7 @@ public class GroupMetadataManagerTest {
                 .setMemberEpoch(1)
                 .setHeartbeatIntervalMs(5000)
                 .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
-                    .setAssignedTopicPartitions(Arrays.asList(
+                    .setTopicPartitions(Arrays.asList(
                         new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
                             .setTopicId(fooTopicId)
                             .setPartitions(Arrays.asList(0))))),
@@ -3557,7 +3515,7 @@ public class GroupMetadataManagerTest {
                 .setMemberEpoch(3)
                 .setHeartbeatIntervalMs(5000)
                 .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
-                    .setAssignedTopicPartitions(Arrays.asList(
+                    .setTopicPartitions(Arrays.asList(
                         new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
                             .setTopicId(fooTopicId)
                             .setPartitions(Arrays.asList(0))))),
@@ -3618,7 +3576,7 @@ public class GroupMetadataManagerTest {
                 .setMemberEpoch(1)
                 .setHeartbeatIntervalMs(5000)
                 .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
-                    .setAssignedTopicPartitions(Arrays.asList(
+                    .setTopicPartitions(Arrays.asList(
                         new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
                             .setTopicId(fooTopicId)
                             .setPartitions(Arrays.asList(0, 1, 2))))),
@@ -3659,11 +3617,7 @@ public class GroupMetadataManagerTest {
                 .setMemberId(memberId2)
                 .setMemberEpoch(2)
                 .setHeartbeatIntervalMs(5000)
-                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
-                    .setPendingTopicPartitions(Arrays.asList(
-                        new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
-                            .setTopicId(fooTopicId)
-                            .setPartitions(Arrays.asList(2))))),
+                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()),
             result.response()
         );
 
@@ -3686,7 +3640,7 @@ public class GroupMetadataManagerTest {
                 .setMemberEpoch(1)
                 .setHeartbeatIntervalMs(5000)
                 .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
-                    .setAssignedTopicPartitions(Arrays.asList(
+                    .setTopicPartitions(Arrays.asList(
                         new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
                             .setTopicId(fooTopicId)
                             .setPartitions(Arrays.asList(0, 1))))),
@@ -8714,7 +8668,6 @@ public class GroupMetadataManagerTest {
         if (!Objects.equals(expected.errorMessage(), actual.errorMessage())) 
return false;
         if (!Objects.equals(expected.memberId(), actual.memberId())) return 
false;
         if (expected.memberEpoch() != actual.memberEpoch()) return false;
-        if (expected.shouldComputeAssignment() != 
actual.shouldComputeAssignment()) return false;
         if (expected.heartbeatIntervalMs() != actual.heartbeatIntervalMs()) 
return false;
         // Unordered comparison of the assignments.
         return responseAssignmentEquals(expected.assignment(), 
actual.assignment());
@@ -8728,10 +8681,7 @@ public class GroupMetadataManagerTest {
         if (expected == null) return false;
         if (actual == null) return false;
 
-        if (!Objects.equals(fromAssignment(expected.pendingTopicPartitions()), 
fromAssignment(actual.pendingTopicPartitions())))
-            return false;
-
-        return 
Objects.equals(fromAssignment(expected.assignedTopicPartitions()), 
fromAssignment(actual.assignedTopicPartitions()));
+        return Objects.equals(fromAssignment(expected.topicPartitions()), 
fromAssignment(actual.topicPartitions()));
     }
 
     private static Map<Uuid, Set<Integer>> fromAssignment(

Reply via email to