This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7cd99ea66da KAFKA-19373 Fix protocol name comparison (#19903)
7cd99ea66da is described below

commit 7cd99ea66dadfaab12814053d6acfc0d9bde1f5c
Author: Lianet Magrans <[email protected]>
AuthorDate: Wed Jun 4 23:48:26 2025 -0400

    KAFKA-19373 Fix protocol name comparison (#19903)
    
    Ensure that protocol name comparisons in the integration tests ignore case
    (the group protocol passed as a test parameter is lower case, whereas the
    enum name is upper case).
    
    The tests were not failing, but the custom configs/expectations were not
    being applied depending on the protocol (the tests' checks for
    "groupProtocol.equals(CLASSIC)" would never be true).
    
    Found all comparisons that use equals against the constant name and fixed
    them (not too many, luckily).
    
    I did consider changing the protocol param that is passed to every test
    (which is currently lowercase), but it still seems more robust to have the
    tests ignore case.
    
    Reviewers: Gaurav Narula <[email protected]>, Ken Huang
     <[email protected]>, Chia-Ping Tsai <[email protected]>, TengYao Chi
     <[email protected]>
---
 core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala  | 2 +-
 .../src/test/scala/integration/kafka/api/ConsumerBounceTest.scala | 8 ++++----
 .../scala/integration/kafka/api/EndToEndAuthorizationTest.scala   | 2 +-
 .../integration/kafka/api/PlaintextConsumerSubscriptionTest.scala | 2 +-
 .../test/scala/integration/kafka/api/PlaintextConsumerTest.scala  | 2 +-
 5 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala 
b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index 191802438e1..2d5e1a2c631 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -83,7 +83,7 @@ abstract class BaseConsumerTest extends AbstractConsumerTest {
   @MethodSource(Array("getTestGroupProtocolParametersAll"))
   def testCoordinatorFailover(groupProtocol: String): Unit = {
     val listener = new TestConsumerReassignmentListener()
-    if (groupProtocol.equals(GroupProtocol.CLASSIC.name)) {
+    if (groupProtocol.equalsIgnoreCase(GroupProtocol.CLASSIC.name)) {
       
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 
"5001")
       
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 
"1000")
     }
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala 
b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 52cae6c0cbd..cb2287b3e89 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -307,7 +307,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with 
Logging {
     val consumer1 = createConsumerAndReceive(group1, manualAssign = false, 
numRecords)
 
     val requestTimeout = 6000
-    if (groupProtocol.equals(GroupProtocol.CLASSIC.name)) {
+    if (groupProtocol.equalsIgnoreCase(GroupProtocol.CLASSIC.name)) {
       
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 
"5000")
       
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 
"1000")
     }
@@ -338,7 +338,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with 
Logging {
     val partitionCount = consumerCount * 2
 
     
this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 
"60000")
-    if (groupProtocol.equals(GroupProtocol.CLASSIC.name)) {
+    if (groupProtocol.equalsIgnoreCase(GroupProtocol.CLASSIC.name)) {
       
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 
"1000")
     }
     this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"false")
@@ -377,7 +377,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with 
Logging {
     val group = "fatal-exception-test"
     val topic = "fatal-exception-test"
     
this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 
"60000")
-    if (groupProtocol.equals(GroupProtocol.CLASSIC.name)) {
+    if (groupProtocol.equalsIgnoreCase(GroupProtocol.CLASSIC.name)) {
       
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 
"1000")
     }
     this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"false")
@@ -418,7 +418,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with 
Logging {
     val topic = "closetest"
     createTopic(topic, 10, brokerCount)
     
this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 
"60000")
-    if (groupProtocol.equals(GroupProtocol.CLASSIC.name)) {
+    if (groupProtocol.equalsIgnoreCase(GroupProtocol.CLASSIC.name)) {
       
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 
"1000")
     }
     this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"false")
diff --git 
a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala 
b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index 98ce6920f00..40bb4f649cb 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -431,7 +431,7 @@ abstract class EndToEndAuthorizationTest extends 
IntegrationTestHarness with Sas
 
     // Verify that records are consumed if all topics are authorized
     consumer.subscribe(java.util.List.of(topic))
-    if (groupProtocol.equals(GroupProtocol.CLASSIC)) {
+    if (groupProtocol.equalsIgnoreCase(GroupProtocol.CLASSIC.name)) {
       consumeRecordsIgnoreOneAuthorizationException(consumer)
     } else {
       TestUtils.waitUntilTrue(() => {
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala
 
b/core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala
index ff8b32b742e..776c1aef961 100644
--- 
a/core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala
@@ -360,7 +360,7 @@ class PlaintextConsumerSubscriptionTest extends 
AbstractConsumerTest {
   @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
   @MethodSource(Array("getTestGroupProtocolParametersAll"))
   def testUnsubscribeTopic(groupProtocol: String): Unit = {
-    if (groupProtocol.equals(GroupProtocol.CLASSIC.name)) {
+    if (groupProtocol.equalsIgnoreCase(GroupProtocol.CLASSIC.name)) {
       
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 
"100") // timeout quickly to avoid slow test
       
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 
"30")
     }
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index e67f27f9634..c9d33354c5d 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -386,7 +386,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
   @MethodSource(Array("getTestGroupProtocolParametersAll"))
   def testPauseStateNotPreservedByRebalance(groupProtocol: String): Unit = {
-    if (groupProtocol.equals(GroupProtocol.CLASSIC.name)) {
+    if (groupProtocol.equalsIgnoreCase(GroupProtocol.CLASSIC.name)) {
       
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 
"100") // timeout quickly to avoid slow test
       
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 
"30")
     }

Reply via email to