This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ce4eeaa3796 MINOR: restore
`testGetAllTopicMetadataShouldNotCreateTopicOrReturnUnknownTopicPartition`
(#18633)
ce4eeaa3796 is described below
commit ce4eeaa379678af21376b3efa4441f420fbf0f56
Author: TaiJuWu <[email protected]>
AuthorDate: Fri Jan 24 01:27:18 2025 +0800
MINOR: restore
`testGetAllTopicMetadataShouldNotCreateTopicOrReturnUnknownTopicPartition`
(#18633)
Reviewers: Ismael Juma <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../scala/unit/kafka/server/KafkaApisTest.scala | 41 ++++++++++++++++++++++
1 file changed, 41 insertions(+)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 5dc5ad06f29..af26996c56e 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -3562,6 +3562,47 @@ class KafkaApisTest extends Logging {
assertEquals(Set(0), response.brokers.asScala.map(_.id).toSet)
}
+
+ /**
+ * Metadata request to fetch all topics should not result in either of the following:
+ * 1) Auto topic creation
+ * 2) UNKNOWN_TOPIC_OR_PARTITION
+ *
+ * This test covers the case where a topic is deleted from the
MetadataCache right after
+ * authorization but before the MetadataCache lookup.
+ */
+ @Test
+ def
testGetAllTopicMetadataShouldNotCreateTopicOrReturnUnknownTopicPartition():
Unit = {
+ // Setup: authorizer authorizes 2 topics, but one got deleted in metadata
cache
+ metadataCache = mock(classOf[KRaftMetadataCache])
+ when(metadataCache.getAliveBrokerNodes(any())).thenReturn(List(new
Node(brokerId,"localhost", 0)))
+ when(metadataCache.getRandomAliveBrokerId).thenReturn(None)
+
+ // 2 topics are returned by getAllTopics() for authorization while the request is handled
+ val topicsReturnedFromMetadataCacheForAuthorization =
Set("remaining-topic", "later-deleted-topic")
+
when(metadataCache.getAllTopics()).thenReturn(topicsReturnedFromMetadataCacheForAuthorization)
+ // 1 topic is deleted from metadata right at the time between
authorization and the next getTopicMetadata() call
+ when(metadataCache.getTopicMetadata(
+ ArgumentMatchers.eq(topicsReturnedFromMetadataCacheForAuthorization),
+ any[ListenerName],
+ anyBoolean,
+ anyBoolean
+ )).thenReturn(Seq(
+ new MetadataResponseTopic()
+ .setErrorCode(Errors.NONE.code)
+ .setName("remaining-topic")
+ .setIsInternal(false)
+ ))
+
+ val response = sendMetadataRequestWithInconsistentListeners(new
ListenerName("PLAINTEXT"))
+ val responseTopics = response.topicMetadata().asScala.map { metadata =>
metadata.topic() }
+
+ // verify that fetching metadata for all topics never triggers topic creation
+ verify(autoTopicCreationManager, never).createTopics(any(), any(), any())
+ assertEquals(List("remaining-topic"), responseTopics)
+
assertTrue(response.topicsByError(Errors.UNKNOWN_TOPIC_OR_PARTITION).isEmpty)
+ }
+
@Test
def testUnauthorizedTopicMetadataRequest(): Unit = {
// 1. Set up broker information