This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new 52280cdb55d KAFKA-18619: New consumer topic metadata events should set requireMetadata flag (#18668)
52280cdb55d is described below

commit 52280cdb55da9070be9c50374764bb9b678b2d9b
Author: TengYao Chi <[email protected]>
AuthorDate: Wed Jan 29 21:36:05 2025 +0800

    KAFKA-18619: New consumer topic metadata events should set requireMetadata flag (#18668)
    
    Reviewers: Lianet Magrans <[email protected]>
---
 .../events/AbstractTopicMetadataEvent.java         |  5 ++++
 .../SaslClientsWithInvalidCredentialsTest.scala    | 31 +++++++++++-----------
 .../SaslClientsWithInvalidCredentialsTest.java     | 22 +++++++--------
 3 files changed, 31 insertions(+), 27 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AbstractTopicMetadataEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AbstractTopicMetadataEvent.java
index 9621e34ef5b..cb23e6aaf28 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AbstractTopicMetadataEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AbstractTopicMetadataEvent.java
@@ -26,4 +26,9 @@ public abstract class AbstractTopicMetadataEvent extends 
CompletableApplicationE
     protected AbstractTopicMetadataEvent(final Type type, final long 
deadlineMs) {
         super(type, deadlineMs);
     }
+
+    @Override
+    public boolean requireSubscriptionMetadata() {
+        return true;
+    }
 }
diff --git 
a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
 
b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
index 0735829a0b1..03a987c54b4 100644
--- 
a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
@@ -108,12 +108,12 @@ class SaslClientsWithInvalidCredentialsTest extends 
AbstractSaslTest {
       createProducer(configOverrides = prop)
     else
       producer
-    verifyWithRetry(sendOneRecord(producer2))
+    verifyWithRetry(sendOneRecord(producer2))()
   }
 
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
-  def testTransactionalProducerWithAuthenticationFailure(quorum: String, 
groupProtocol: String): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("kraft"))
+  def testTransactionalProducerWithAuthenticationFailure(quorum: String): Unit 
= {
     val txProducer = createTransactionalProducer()
     verifyAuthenticationException(txProducer.initTransactions())
 
@@ -122,7 +122,7 @@ class SaslClientsWithInvalidCredentialsTest extends 
AbstractSaslTest {
   }
 
   @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
   def testConsumerWithAuthenticationFailure(quorum: String, groupProtocol: 
String): Unit = {
     val consumer = createConsumer()
     consumer.subscribe(List(topic).asJava)
@@ -130,7 +130,7 @@ class SaslClientsWithInvalidCredentialsTest extends 
AbstractSaslTest {
   }
 
   @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
   def testManualAssignmentConsumerWithAuthenticationFailure(quorum: String, 
groupProtocol: String): Unit = {
     val consumer = createConsumer()
     consumer.assign(List(tp).asJava)
@@ -138,7 +138,7 @@ class SaslClientsWithInvalidCredentialsTest extends 
AbstractSaslTest {
   }
 
   @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
   def 
testManualAssignmentConsumerWithAutoCommitDisabledWithAuthenticationFailure(quorum:
 String, groupProtocol: String): Unit = {
     this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
false.toString)
     val consumer = createConsumer()
@@ -153,13 +153,13 @@ class SaslClientsWithInvalidCredentialsTest extends 
AbstractSaslTest {
 
     createClientCredential()
     val producer = createProducer()
-    verifyWithRetry(sendOneRecord(producer))
-    verifyWithRetry(assertEquals(1, 
consumer.poll(Duration.ofMillis(1000)).count))
+    verifyWithRetry(sendOneRecord(producer))()
+    verifyWithRetry(consumer.poll(Duration.ofMillis(1000)))(_.count == 1)
   }
 
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
-  def testKafkaAdminClientWithAuthenticationFailure(quorum: String, 
groupProtocol: String): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("kraft"))
+  def testKafkaAdminClientWithAuthenticationFailure(quorum: String): Unit = {
     val props = JaasTestUtils.adminClientSecurityConfigs(securityProtocol, 
OptionConverters.toJava(trustStoreFile), 
OptionConverters.toJava(clientSaslProperties))
     props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
     val adminClient = Admin.create(props)
@@ -180,7 +180,7 @@ class SaslClientsWithInvalidCredentialsTest extends 
AbstractSaslTest {
       verifyAuthenticationException(describeTopic())
 
       createClientCredential()
-      verifyWithRetry(describeTopic())
+      verifyWithRetry(describeTopic())()
     } finally {
       adminClient.close()
     }
@@ -209,13 +209,12 @@ class SaslClientsWithInvalidCredentialsTest extends 
AbstractSaslTest {
     assertTrue(elapsedMs <= 5000, s"Poll took too long, elapsed=$elapsedMs")
   }
 
-  private def verifyWithRetry(action: => Unit): Unit = {
+  private def verifyWithRetry[T](operation: => T)(predicate: T => Boolean = 
(_: T) => true): Unit = {
     var attempts = 0
     TestUtils.waitUntilTrue(() => {
       try {
         attempts += 1
-        action
-        true
+        predicate(operation)
       } catch {
         case _: SaslAuthenticationException => false
       }
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/SaslClientsWithInvalidCredentialsTest.java
 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/SaslClientsWithInvalidCredentialsTest.java
index cd4198c7c79..4aad4af5751 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/SaslClientsWithInvalidCredentialsTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/SaslClientsWithInvalidCredentialsTest.java
@@ -136,24 +136,26 @@ public class SaslClientsWithInvalidCredentialsTest 
extends AbstractSaslTest {
 
     // NOTE: Not able to refer TestInfoUtils#TestWithParameterizedQuorumName() 
in the ParameterizedTest name.
     @ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}")
-    
@MethodSource("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly")
+    @MethodSource("getTestQuorumAndGroupProtocolParametersAll")
     public void testConsumerGroupServiceWithAuthenticationFailure(String 
quorum, String groupProtocol) throws Exception {
-        ConsumerGroupCommand.ConsumerGroupService consumerGroupService = 
prepareConsumerGroupService();
-        try (Consumer<byte[], byte[]> consumer = createConsumer()) {
+        try (
+            ConsumerGroupCommand.ConsumerGroupService consumerGroupService = 
prepareConsumerGroupService();
+            Consumer<byte[], byte[]> consumer = createConsumer()
+        ) {
             consumer.subscribe(Collections.singletonList(TOPIC));
-
             verifyAuthenticationException(consumerGroupService::listGroups);
-        } finally {
-            consumerGroupService.close();
         }
     }
 
+    // NOTE: Not able to refer TestInfoUtils#TestWithParameterizedQuorumName() 
in the ParameterizedTest name.
     @ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}")
-    
@MethodSource("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly")
+    @MethodSource("getTestQuorumAndGroupProtocolParametersAll")
     public void testConsumerGroupServiceWithAuthenticationSuccess(String 
quorum, String groupProtocol) throws Exception {
         
createScramCredentialsViaPrivilegedAdminClient(JaasTestUtils.KAFKA_SCRAM_USER_2,
 JaasTestUtils.KAFKA_SCRAM_PASSWORD_2);
-        ConsumerGroupCommand.ConsumerGroupService consumerGroupService = 
prepareConsumerGroupService();
-        try (Consumer<byte[], byte[]> consumer = createConsumer()) {
+        try (
+            ConsumerGroupCommand.ConsumerGroupService consumerGroupService = 
prepareConsumerGroupService();
+            Consumer<byte[], byte[]> consumer = createConsumer()
+        ) {
             consumer.subscribe(Collections.singletonList(TOPIC));
 
             TestUtils.waitForCondition(() -> {
@@ -165,8 +167,6 @@ public class SaslClientsWithInvalidCredentialsTest extends 
AbstractSaslTest {
                 }
             }, "failed to poll data with authentication");
             assertEquals(1, consumerGroupService.listConsumerGroups().size());
-        } finally {
-            consumerGroupService.close();
         }
     }
 

Reply via email to