philipnee commented on code in PR #13920:
URL: https://github.com/apache/kafka/pull/13920#discussion_r1270851855


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java:
##########
@@ -724,6 +725,90 @@ public void testLargeAssignmentAndGroupWithNonEqualSubscription(boolean hasConsu
         assignor.assignPartitions(partitionsPerTopic, subscriptions);
     }
 
+    @Timeout(90)
+    @ParameterizedTest(name = TEST_NAME_WITH_CONSUMER_RACK)
+    @ValueSource(booleans = {false, true})
+    public void testAssignmentAndGroupWithNonEqualSubscriptionNotTimeout(boolean hasConsumerRack) {
+        initializeRacks(hasConsumerRack ? RackConfig.BROKER_AND_CONSUMER_RACK : RackConfig.NO_CONSUMER_RACK);
+        int topicCount = hasConsumerRack ? 50 : 500;
+        int partitionCount = 2_00;
+        int consumerCount = 2_0;
+
+        List<String> topics = new ArrayList<>();
+        Map<String, List<PartitionInfo>> partitionsPerTopic = new HashMap<>();
+        for (int i = 0; i < topicCount; i++) {
+            String topicName = getTopicName(i, topicCount);
+            topics.add(topicName);
+            partitionsPerTopic.put(topicName, partitionInfos(topicName, partitionCount));
+        }
+        for (int i = 0; i < consumerCount; i++) {
+            if (i % 4 == 0) {
+                subscriptions.put(getConsumerName(i, consumerCount),
+                        subscription(topics.subList(0, topicCount / 2), i));
+            } else {
+                subscriptions.put(getConsumerName(i, consumerCount),
+                        subscription(topics.subList(topicCount / 2, topicCount), i));
+            }
+        }
+
+        Map<String, List<TopicPartition>> assignment = assignor.assignPartitions(partitionsPerTopic, subscriptions);
+
+        for (int i = 1; i < consumerCount; i++) {
+            String consumer = getConsumerName(i, consumerCount);
+            if (i % 4 == 0) {
+                subscriptions.put(
+                        consumer,
+                        buildSubscriptionV2Above(topics.subList(0, topicCount / 2),
+                        assignment.get(consumer), generationId, i)
+                );
+            } else {
+                subscriptions.put(consumer,

Review Comment:
   nit: can the style be consistent with the `.put` above? i.e., either 
`subscriptions.put(consumer,` or `subscriptions.put(` followed by a line break.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java:
##########
@@ -724,6 +725,90 @@ public void testLargeAssignmentAndGroupWithNonEqualSubscription(boolean hasConsu
         assignor.assignPartitions(partitionsPerTopic, subscriptions);
     }
 
+    @Timeout(90)
+    @ParameterizedTest(name = TEST_NAME_WITH_CONSUMER_RACK)
+    @ValueSource(booleans = {false, true})
+    public void testAssignmentAndGroupWithNonEqualSubscriptionNotTimeout(boolean hasConsumerRack) {
+        initializeRacks(hasConsumerRack ? RackConfig.BROKER_AND_CONSUMER_RACK : RackConfig.NO_CONSUMER_RACK);
+        int topicCount = hasConsumerRack ? 50 : 500;
+        int partitionCount = 2_00;
+        int consumerCount = 2_0;
+
+        List<String> topics = new ArrayList<>();
+        Map<String, List<PartitionInfo>> partitionsPerTopic = new HashMap<>();
+        for (int i = 0; i < topicCount; i++) {
+            String topicName = getTopicName(i, topicCount);
+            topics.add(topicName);
+            partitionsPerTopic.put(topicName, partitionInfos(topicName, partitionCount));
+        }
+        for (int i = 0; i < consumerCount; i++) {
+            if (i % 4 == 0) {
+                subscriptions.put(getConsumerName(i, consumerCount),
+                        subscription(topics.subList(0, topicCount / 2), i));
+            } else {
+                subscriptions.put(getConsumerName(i, consumerCount),
+                        subscription(topics.subList(topicCount / 2, topicCount), i));
+            }
+        }
+
+        Map<String, List<TopicPartition>> assignment = assignor.assignPartitions(partitionsPerTopic, subscriptions);
+
+        for (int i = 1; i < consumerCount; i++) {
+            String consumer = getConsumerName(i, consumerCount);
+            if (i % 4 == 0) {
+                subscriptions.put(
+                        consumer,
+                        buildSubscriptionV2Above(topics.subList(0, topicCount / 2),
+                        assignment.get(consumer), generationId, i)
+                );
+            } else {
+                subscriptions.put(consumer,
+                        buildSubscriptionV2Above(topics.subList(topicCount / 2, topicCount),
+                        assignment.get(consumer), generationId, i)
+                );
+            }
+        }
+
+        assignor.assignPartitions(partitionsPerTopic, subscriptions);
+    }
+
+    @Test
+    public void testSubscriptionNotEqualAndAssignSamePartitionWith3Generation() {

Review Comment:
   Thanks for the test! I wrote a similar test, 
`testEnsurePartitionsAssignedToHighestGeneration`, but there the generation 
order was 0, 1, 2 instead of 0, 2, 1 as you suggest. Could we parametrize that 
test, or reuse its style with a different assignment order, to keep things 
consistent? Also, is the title accurate? This happens in 
`allSubscriptionsEqual`, so it shouldn't matter whether the subscriptions are 
equal or not, right? The goal is to ensure that the consumer with the highest 
generation gets the right assignment, regardless of the generation order.
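
   To illustrate the invariant a parametrized test could check, here is a 
minimal standalone sketch. It is not the assignor's code: `Claim`, `ownerOf`, 
and the consumer names are hypothetical stand-ins, and the loop over orderings 
plays the role that a `@ParameterizedTest` source would play in the real test.

```java
import java.util.Arrays;
import java.util.List;

public class HighestGenerationSketch {

    /** Hypothetical stand-in: one consumer's claim on a partition, tagged with its generation. */
    public static final class Claim {
        final String consumer;
        final int generation;

        public Claim(String consumer, int generation) {
            this.consumer = consumer;
            this.generation = generation;
        }
    }

    /** Returns the consumer whose claim has the highest generation, whatever the iteration order. */
    public static String ownerOf(List<Claim> claims) {
        Claim best = null;
        for (Claim claim : claims) {
            if (best == null || claim.generation > best.generation) {
                best = claim;
            }
        }
        return best == null ? null : best.consumer;
    }

    public static void main(String[] args) {
        // The two generation orders discussed in the comment: 0, 1, 2 and 0, 2, 1.
        List<List<Claim>> orderings = Arrays.asList(
                Arrays.asList(new Claim("c0", 0), new Claim("c1", 1), new Claim("c2", 2)),
                Arrays.asList(new Claim("c0", 0), new Claim("c2", 2), new Claim("c1", 1)));

        for (List<Claim> ordering : orderings) {
            // The expected owner is the same in every ordering.
            if (!"c2".equals(ownerOf(ordering))) {
                throw new AssertionError("highest generation did not win");
            }
        }
        System.out.println("c2 owns the partition in every ordering");
    }
}
```

   Keeping the expected owner fixed and varying only the order makes it 
explicit that the outcome should not depend on the sequence in which 
generations are encountered.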



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java:
##########
@@ -724,6 +725,90 @@ public void testLargeAssignmentAndGroupWithNonEqualSubscription(boolean hasConsu
         assignor.assignPartitions(partitionsPerTopic, subscriptions);
     }
 
+    @Timeout(90)

Review Comment:
   Just an opinion: other tests also use the timeout annotation, but it might 
be better to use `TestUtils.waitForCondition()`, as other tests do, because it 
directly tests the timeout of the `assignPartitions` invocation. To me, that 
is more precise about where the test fails. Also, is there a specific reason 
for 90? I assume the unit is seconds? I see the timeouts we use vary from 30 
to 90 😅.
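
   For reference, JUnit 5's `@Timeout(90)` defaults to seconds and bounds the 
whole test method, setup included. A minimal sketch of bounding only the call 
under test is shown below. It uses plain `java.util.concurrent` rather than 
`TestUtils`, and `completesWithin` plus the two stand-in tasks are 
hypothetical names; in the real test the task would wrap the 
`assignor.assignPartitions(partitionsPerTopic, subscriptions)` call.

```java
import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedCallSketch {

    /** Runs the task on a separate thread and reports whether it finished within the limit. */
    public static boolean completesWithin(Runnable task, Duration limit) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<?> future = executor.submit(task);
            try {
                future.get(limit.toMillis(), TimeUnit.MILLISECONDS);
                return true;
            } catch (TimeoutException e) {
                // The call is still running after the limit: interrupt it and report failure.
                future.cancel(true);
                return false;
            } catch (ExecutionException e) {
                throw new AssertionError("task failed", e.getCause());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // Stand-ins for the assignPartitions call: one returns at once, one sleeps too long.
        Runnable fastCall = () -> { };
        Runnable slowCall = () -> {
            try {
                Thread.sleep(5_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        System.out.println("fast call in time: " + completesWithin(fastCall, Duration.ofSeconds(5)));
        System.out.println("slow call in time: " + completesWithin(slowCall, Duration.ofMillis(100)));
    }
}
```

   With this shape, a failure points at the bounded call itself rather than 
at the test method as a whole, which is the precision argued for above.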



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java:
##########
@@ -724,6 +725,90 @@ public void testLargeAssignmentAndGroupWithNonEqualSubscription(boolean hasConsu
         assignor.assignPartitions(partitionsPerTopic, subscriptions);
     }
 
+    @Timeout(90)
+    @ParameterizedTest(name = TEST_NAME_WITH_CONSUMER_RACK)
+    @ValueSource(booleans = {false, true})
+    public void testAssignmentAndGroupWithNonEqualSubscriptionNotTimeout(boolean hasConsumerRack) {
+        initializeRacks(hasConsumerRack ? RackConfig.BROKER_AND_CONSUMER_RACK : RackConfig.NO_CONSUMER_RACK);
+        int topicCount = hasConsumerRack ? 50 : 500;
+        int partitionCount = 2_00;
+        int consumerCount = 2_0;
+
+        List<String> topics = new ArrayList<>();
+        Map<String, List<PartitionInfo>> partitionsPerTopic = new HashMap<>();
+        for (int i = 0; i < topicCount; i++) {
+            String topicName = getTopicName(i, topicCount);
+            topics.add(topicName);
+            partitionsPerTopic.put(topicName, partitionInfos(topicName, partitionCount));
+        }
+        for (int i = 0; i < consumerCount; i++) {
+            if (i % 4 == 0) {
+                subscriptions.put(getConsumerName(i, consumerCount),
+                        subscription(topics.subList(0, topicCount / 2), i));
+            } else {
+                subscriptions.put(getConsumerName(i, consumerCount),
+                        subscription(topics.subList(topicCount / 2, topicCount), i));
+            }
+        }
+
+        Map<String, List<TopicPartition>> assignment = assignor.assignPartitions(partitionsPerTopic, subscriptions);
+
+        for (int i = 1; i < consumerCount; i++) {
+            String consumer = getConsumerName(i, consumerCount);
+            if (i % 4 == 0) {
+                subscriptions.put(
+                        consumer,
+                        buildSubscriptionV2Above(topics.subList(0, topicCount / 2),
+                        assignment.get(consumer), generationId, i)
+                );
+            } else {
+                subscriptions.put(consumer,
+                        buildSubscriptionV2Above(topics.subList(topicCount / 2, topicCount),
+                        assignment.get(consumer), generationId, i)
+                );
+            }
+        }
+
+        assignor.assignPartitions(partitionsPerTopic, subscriptions);

Review Comment:
   Can we test `GeneralAssignmentBuilder.build()` directly? That seems to be 
where `isBalanced()` is triggered. We could test at the top level, but since 
the bug occurs in `isBalanced()`, which is only triggered in 
`GeneralAssignmentBuilder`, a direct test would make failures easier to 
debug.


