This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new 52280cdb55d KAFKA-18619: New consumer topic metadata events should set requireMetadata flag (#18668)
52280cdb55d is described below
commit 52280cdb55da9070be9c50374764bb9b678b2d9b
Author: TengYao Chi <[email protected]>
AuthorDate: Wed Jan 29 21:36:05 2025 +0800
KAFKA-18619: New consumer topic metadata events should set requireMetadata flag (#18668)
Reviewers: Lianet Magrans <[email protected]>
---
.../events/AbstractTopicMetadataEvent.java | 5 ++++
.../SaslClientsWithInvalidCredentialsTest.scala | 31 +++++++++++-----------
.../SaslClientsWithInvalidCredentialsTest.java | 22 +++++++--------
3 files changed, 31 insertions(+), 27 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AbstractTopicMetadataEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AbstractTopicMetadataEvent.java
index 9621e34ef5b..cb23e6aaf28 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AbstractTopicMetadataEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AbstractTopicMetadataEvent.java
@@ -26,4 +26,9 @@ public abstract class AbstractTopicMetadataEvent extends
CompletableApplicationE
protected AbstractTopicMetadataEvent(final Type type, final long
deadlineMs) {
super(type, deadlineMs);
}
+
+ @Override
+ public boolean requireSubscriptionMetadata() {
+ return true;
+ }
}
diff --git
a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
index 0735829a0b1..03a987c54b4 100644
---
a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
+++
b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
@@ -108,12 +108,12 @@ class SaslClientsWithInvalidCredentialsTest extends
AbstractSaslTest {
createProducer(configOverrides = prop)
else
producer
- verifyWithRetry(sendOneRecord(producer2))
+ verifyWithRetry(sendOneRecord(producer2))()
}
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
- def testTransactionalProducerWithAuthenticationFailure(quorum: String,
groupProtocol: String): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("kraft"))
+ def testTransactionalProducerWithAuthenticationFailure(quorum: String): Unit
= {
val txProducer = createTransactionalProducer()
verifyAuthenticationException(txProducer.initTransactions())
@@ -122,7 +122,7 @@ class SaslClientsWithInvalidCredentialsTest extends
AbstractSaslTest {
}
@ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testConsumerWithAuthenticationFailure(quorum: String, groupProtocol:
String): Unit = {
val consumer = createConsumer()
consumer.subscribe(List(topic).asJava)
@@ -130,7 +130,7 @@ class SaslClientsWithInvalidCredentialsTest extends
AbstractSaslTest {
}
@ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testManualAssignmentConsumerWithAuthenticationFailure(quorum: String,
groupProtocol: String): Unit = {
val consumer = createConsumer()
consumer.assign(List(tp).asJava)
@@ -138,7 +138,7 @@ class SaslClientsWithInvalidCredentialsTest extends
AbstractSaslTest {
}
@ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def
testManualAssignmentConsumerWithAutoCommitDisabledWithAuthenticationFailure(quorum:
String, groupProtocol: String): Unit = {
this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
false.toString)
val consumer = createConsumer()
@@ -153,13 +153,13 @@ class SaslClientsWithInvalidCredentialsTest extends
AbstractSaslTest {
createClientCredential()
val producer = createProducer()
- verifyWithRetry(sendOneRecord(producer))
- verifyWithRetry(assertEquals(1,
consumer.poll(Duration.ofMillis(1000)).count))
+ verifyWithRetry(sendOneRecord(producer))()
+ verifyWithRetry(consumer.poll(Duration.ofMillis(1000)))(_.count == 1)
}
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
- def testKafkaAdminClientWithAuthenticationFailure(quorum: String,
groupProtocol: String): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("kraft"))
+ def testKafkaAdminClientWithAuthenticationFailure(quorum: String): Unit = {
val props = JaasTestUtils.adminClientSecurityConfigs(securityProtocol,
OptionConverters.toJava(trustStoreFile),
OptionConverters.toJava(clientSaslProperties))
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
val adminClient = Admin.create(props)
@@ -180,7 +180,7 @@ class SaslClientsWithInvalidCredentialsTest extends
AbstractSaslTest {
verifyAuthenticationException(describeTopic())
createClientCredential()
- verifyWithRetry(describeTopic())
+ verifyWithRetry(describeTopic())()
} finally {
adminClient.close()
}
@@ -209,13 +209,12 @@ class SaslClientsWithInvalidCredentialsTest extends
AbstractSaslTest {
assertTrue(elapsedMs <= 5000, s"Poll took too long, elapsed=$elapsedMs")
}
- private def verifyWithRetry(action: => Unit): Unit = {
+ private def verifyWithRetry[T](operation: => T)(predicate: T => Boolean =
(_: T) => true): Unit = {
var attempts = 0
TestUtils.waitUntilTrue(() => {
try {
attempts += 1
- action
- true
+ predicate(operation)
} catch {
case _: SaslAuthenticationException => false
}
diff --git
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/SaslClientsWithInvalidCredentialsTest.java
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/SaslClientsWithInvalidCredentialsTest.java
index cd4198c7c79..4aad4af5751 100644
---
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/SaslClientsWithInvalidCredentialsTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/SaslClientsWithInvalidCredentialsTest.java
@@ -136,24 +136,26 @@ public class SaslClientsWithInvalidCredentialsTest
extends AbstractSaslTest {
// NOTE: Not able to refer TestInfoUtils#TestWithParameterizedQuorumName()
in the ParameterizedTest name.
@ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}")
-
@MethodSource("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly")
+ @MethodSource("getTestQuorumAndGroupProtocolParametersAll")
public void testConsumerGroupServiceWithAuthenticationFailure(String
quorum, String groupProtocol) throws Exception {
- ConsumerGroupCommand.ConsumerGroupService consumerGroupService =
prepareConsumerGroupService();
- try (Consumer<byte[], byte[]> consumer = createConsumer()) {
+ try (
+ ConsumerGroupCommand.ConsumerGroupService consumerGroupService =
prepareConsumerGroupService();
+ Consumer<byte[], byte[]> consumer = createConsumer()
+ ) {
consumer.subscribe(Collections.singletonList(TOPIC));
-
verifyAuthenticationException(consumerGroupService::listGroups);
- } finally {
- consumerGroupService.close();
}
}
+ // NOTE: Not able to refer TestInfoUtils#TestWithParameterizedQuorumName()
in the ParameterizedTest name.
@ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}")
-
@MethodSource("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly")
+ @MethodSource("getTestQuorumAndGroupProtocolParametersAll")
public void testConsumerGroupServiceWithAuthenticationSuccess(String
quorum, String groupProtocol) throws Exception {
createScramCredentialsViaPrivilegedAdminClient(JaasTestUtils.KAFKA_SCRAM_USER_2,
JaasTestUtils.KAFKA_SCRAM_PASSWORD_2);
- ConsumerGroupCommand.ConsumerGroupService consumerGroupService =
prepareConsumerGroupService();
- try (Consumer<byte[], byte[]> consumer = createConsumer()) {
+ try (
+ ConsumerGroupCommand.ConsumerGroupService consumerGroupService =
prepareConsumerGroupService();
+ Consumer<byte[], byte[]> consumer = createConsumer()
+ ) {
consumer.subscribe(Collections.singletonList(TOPIC));
TestUtils.waitForCondition(() -> {
@@ -165,8 +167,6 @@ public class SaslClientsWithInvalidCredentialsTest extends
AbstractSaslTest {
}
}, "failed to poll data with authentication");
assertEquals(1, consumerGroupService.listConsumerGroups().size());
- } finally {
- consumerGroupService.close();
}
}