philipnee commented on code in PR #13920:
URL: https://github.com/apache/kafka/pull/13920#discussion_r1270851855
########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java: ##########

```diff
@@ -724,6 +725,90 @@ public void testLargeAssignmentAndGroupWithNonEqualSubscription(boolean hasConsu
         assignor.assignPartitions(partitionsPerTopic, subscriptions);
     }
 
+    @Timeout(90)
+    @ParameterizedTest(name = TEST_NAME_WITH_CONSUMER_RACK)
+    @ValueSource(booleans = {false, true})
+    public void testAssignmentAndGroupWithNonEqualSubscriptionNotTimeout(boolean hasConsumerRack) {
+        initializeRacks(hasConsumerRack ? RackConfig.BROKER_AND_CONSUMER_RACK : RackConfig.NO_CONSUMER_RACK);
+        int topicCount = hasConsumerRack ? 50 : 500;
+        int partitionCount = 2_00;
+        int consumerCount = 2_0;
+
+        List<String> topics = new ArrayList<>();
+        Map<String, List<PartitionInfo>> partitionsPerTopic = new HashMap<>();
+        for (int i = 0; i < topicCount; i++) {
+            String topicName = getTopicName(i, topicCount);
+            topics.add(topicName);
+            partitionsPerTopic.put(topicName, partitionInfos(topicName, partitionCount));
+        }
+        for (int i = 0; i < consumerCount; i++) {
+            if (i % 4 == 0) {
+                subscriptions.put(getConsumerName(i, consumerCount),
+                    subscription(topics.subList(0, topicCount / 2), i));
+            } else {
+                subscriptions.put(getConsumerName(i, consumerCount),
+                    subscription(topics.subList(topicCount / 2, topicCount), i));
+            }
+        }
+
+        Map<String, List<TopicPartition>> assignment = assignor.assignPartitions(partitionsPerTopic, subscriptions);
+
+        for (int i = 1; i < consumerCount; i++) {
+            String consumer = getConsumerName(i, consumerCount);
+            if (i % 4 == 0) {
+                subscriptions.put(
+                    consumer,
+                    buildSubscriptionV2Above(topics.subList(0, topicCount / 2),
+                        assignment.get(consumer), generationId, i)
+                );
+            } else {
+                subscriptions.put(consumer,
+                    buildSubscriptionV2Above(topics.subList(topicCount / 2, topicCount),
+                        assignment.get(consumer), generationId, i)
+                );
+            }
+        }
+
+        assignor.assignPartitions(partitionsPerTopic, subscriptions);
+    }
+
+    @Test
+    public void testSubscriptionNotEqualAndAssignSamePartitionWith3Generation() {
```

Review Comment (on `subscriptions.put(consumer,` in the `else` branch of the second loop):
nit: can the style be consistent with the `.put` above? i.e., either `subscriptions.put(consumer,` in both branches, or `subscriptions.put(` followed by `consumer,` on its own line in both.

Review Comment (on `public void testSubscriptionNotEqualAndAssignSamePartitionWith3Generation() {`):
Thanks for the test! I wrote a similar test, `testEnsurePartitionsAssignedToHighestGeneration`; the only difference is that its generation order was 0, 1, 2 instead of 0, 2, 1 as you suggest. Could we parametrize that test, or reuse its style for consistency, and just vary the assignment order? Also, is the test name accurate? This happens in `allSubscriptionsEqual`, so it doesn't matter whether the subscriptions are equal or not, right? The goal is to ensure that the consumer with the highest generation gets the right assignment, regardless of the generation order.

Review Comment (on `@Timeout(90)`):
Just an opinion: other tests also use the `@Timeout` annotation, but it might be better to use `TestUtils.waitForCondition`, as other tests do, because it directly bounds the `assignPartitions` invocation. To me, that is more precise about where the test is failing. Also, is there a specific reason for 90? I assume the unit is seconds? I see the timeouts we use vary from 30 to 90 😅.

Review Comment (on the final `assignor.assignPartitions(partitionsPerTopic, subscriptions);`):
Can we test `GeneralAssignmentBuilder.build()` directly? It seems that's where `isBalanced()` is triggered. We could test at the top level, but since the bug is in `isBalanced()`, which is only called from `GeneralAssignmentBuilder`, testing the builder directly would make failures easier for people to debug.
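On the generation-order point: a minimal sketch, assuming the property under test is simply "the claim with the highest generation wins", of checking every ordering the way a parameterized test would. `Claim`, `ownerOf` and `permutations` are hypothetical names and do not reflect the actual `AbstractStickyAssignor` logic; generations are assumed to be distinct.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

final class GenerationOrderSketch {
    // Hypothetical: "this consumer reported owning the partition at this generation".
    static final class Claim {
        final String consumer;
        final int generation;

        Claim(String consumer, int generation) {
            this.consumer = consumer;
            this.generation = generation;
        }
    }

    // Picks the claim with the highest generation (generations assumed distinct),
    // so the result should not depend on the order in which claims arrive.
    static String ownerOf(List<Claim> claims) {
        Claim best = null;
        for (Claim claim : claims) {
            if (best == null || claim.generation > best.generation) {
                best = claim;
            }
        }
        if (best == null) {
            throw new IllegalArgumentException("no claims");
        }
        return best.consumer;
    }

    // Every ordering of the claims, e.g. generations 0, 1, 2 as well as 0, 2, 1.
    static List<List<Claim>> permutations(List<Claim> claims) {
        List<List<Claim>> result = new ArrayList<>();
        if (claims.isEmpty()) {
            result.add(new ArrayList<>());
            return result;
        }
        for (int i = 0; i < claims.size(); i++) {
            List<Claim> rest = new ArrayList<>(claims);
            Claim first = rest.remove(i);
            for (List<Claim> tail : permutations(rest)) {
                List<Claim> ordering = new ArrayList<>();
                ordering.add(first);
                ordering.addAll(tail);
                result.add(ordering);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Claim> claims = List.of(
                new Claim("consumer-0", 0),
                new Claim("consumer-1", 1),
                new Claim("consumer-2", 2));
        // Like one parameterized test case per ordering: the owner must never change.
        for (List<Claim> ordering : permutations(claims)) {
            String generations = ordering.stream()
                    .map(c -> String.valueOf(c.generation))
                    .collect(Collectors.joining(", "));
            System.out.println(generations + " -> " + ownerOf(ordering));
        }
    }
}
```

Running `main` prints one line per ordering (0, 1, 2 and 0, 2, 1 among them), and each line should end with `consumer-2`.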
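On the `@Timeout(90)` point: one way to bound only the `assignPartitions` invocation, rather than the whole test method, is sketched below with the JDK's `ExecutorService` and `Future.get(timeout, unit)`. This is a swapped-in technique, not Kafka's `TestUtils.waitForCondition`, and `slowAssign` and `callWithin` are hypothetical names; setup work stays outside the deadline. (For reference, JUnit 5's `@Timeout` value is interpreted in seconds unless `unit` is set.)

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BoundedCallSketch {
    // Hypothetical stand-in for assignor.assignPartitions(partitionsPerTopic, subscriptions).
    static int slowAssign(int millis) throws InterruptedException {
        Thread.sleep(millis);
        return millis;
    }

    // Applies a deadline to this one call only, so a timeout failure names the
    // slow invocation instead of the test method as a whole.
    static <T> T callWithin(Callable<T> call, long timeoutMs) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<T> future = executor.submit(call);
            try {
                return future.get(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                future.cancel(true); // interrupt the call that is still running
                throw new AssertionError("call did not finish within " + timeoutMs + " ms", e);
            }
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        // Setup (topics, subscriptions) would run here, outside the deadline.
        int result = callWithin(() -> slowAssign(10), 1_000);
        System.out.println("finished: " + result);

        try {
            callWithin(() -> slowAssign(2_000), 50);
            System.out.println("unexpected: no timeout");
        } catch (AssertionError expected) {
            System.out.println("timed out: " + expected.getMessage());
        }
    }
}
```

Inside a JUnit 5 test, `Assertions.assertTimeoutPreemptively(Duration, ThrowingSupplier)` gives a similar per-call bound without a hand-rolled executor.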