This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ce4eeaa3796 MINOR: restore `testGetAllTopicMetadataShouldNotCreateTopicOrReturnUnknownTopicPartition` (#18633)
ce4eeaa3796 is described below

commit ce4eeaa379678af21376b3efa4441f420fbf0f56
Author: TaiJuWu <[email protected]>
AuthorDate: Fri Jan 24 01:27:18 2025 +0800

    MINOR: restore `testGetAllTopicMetadataShouldNotCreateTopicOrReturnUnknownTopicPartition` (#18633)
    
    Reviewers: Ismael Juma <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 41 ++++++++++++++++++++++
 1 file changed, 41 insertions(+)

diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 5dc5ad06f29..af26996c56e 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -3562,6 +3562,47 @@ class KafkaApisTest extends Logging {
     assertEquals(Set(0), response.brokers.asScala.map(_.id).toSet)
   }
 
+
+  /**
+   * Metadata request to fetch all topics should not result in the followings:
+   * 1) Auto topic creation
+   * 2) UNKNOWN_TOPIC_OR_PARTITION
+   *
+   * This case is testing the case that a topic is being deleted from MetadataCache right after
+   * authorization but before checking in MetadataCache.
+   */
+  @Test
+  def 
testGetAllTopicMetadataShouldNotCreateTopicOrReturnUnknownTopicPartition(): 
Unit = {
+    // Setup: authorizer authorizes 2 topics, but one got deleted in metadata cache
+    metadataCache = mock(classOf[KRaftMetadataCache])
+    when(metadataCache.getAliveBrokerNodes(any())).thenReturn(List(new 
Node(brokerId,"localhost", 0)))
+    when(metadataCache.getRandomAliveBrokerId).thenReturn(None)
+
+    // 2 topics returned for authorization in during handle
+    val topicsReturnedFromMetadataCacheForAuthorization = 
Set("remaining-topic", "later-deleted-topic")
+    
when(metadataCache.getAllTopics()).thenReturn(topicsReturnedFromMetadataCacheForAuthorization)
+    // 1 topic is deleted from metadata right at the time between authorization and the next getTopicMetadata() call
+    when(metadataCache.getTopicMetadata(
+      ArgumentMatchers.eq(topicsReturnedFromMetadataCacheForAuthorization),
+      any[ListenerName],
+      anyBoolean,
+      anyBoolean
+    )).thenReturn(Seq(
+      new MetadataResponseTopic()
+        .setErrorCode(Errors.NONE.code)
+        .setName("remaining-topic")
+        .setIsInternal(false)
+    ))
+
+    val response = sendMetadataRequestWithInconsistentListeners(new 
ListenerName("PLAINTEXT"))
+    val responseTopics = response.topicMetadata().asScala.map { metadata => 
metadata.topic() }
+
+    // verify we don't create topic when getAllTopicMetadata
+    verify(autoTopicCreationManager, never).createTopics(any(), any(), any())
+    assertEquals(List("remaining-topic"), responseTopics)
+    
assertTrue(response.topicsByError(Errors.UNKNOWN_TOPIC_OR_PARTITION).isEmpty)
+  }
+
   @Test
   def testUnauthorizedTopicMetadataRequest(): Unit = {
     // 1. Set up broker information

Reply via email to