This is an automated email from the ASF dual-hosted git repository.

boyang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 17851da  KAFKA-12381: remove live broker checks for forwarding topic 
creation (#10240)
17851da is described below

commit 17851da66791361e9fb0250325da236988149100
Author: Boyang Chen <[email protected]>
AuthorDate: Fri Mar 5 15:55:14 2021 -0800

    KAFKA-12381: remove live broker checks for forwarding topic creation 
(#10240)
    
    Removed broker number checks for invalid replication factor when doing the 
forwarding, in order to reduce false alarms for clients.
    
    Reviewers: Jason Gustafson <[email protected]>
---
 .../kafka/server/AutoTopicCreationManager.scala    |  33 +-----
 .../src/main/scala/kafka/server/BrokerServer.scala |   2 +-
 .../server/AutoTopicCreationManagerTest.scala      | 118 ++++++++++++++++++---
 tests/kafkatest/tests/core/security_test.py        |   7 +-
 4 files changed, 110 insertions(+), 50 deletions(-)

diff --git a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala 
b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
index 01dabed..f8063a8 100644
--- a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
+++ b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
@@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicReference
 import kafka.controller.KafkaController
 import kafka.coordinator.group.GroupCoordinator
 import kafka.coordinator.transaction.TransactionCoordinator
-import kafka.server.metadata.MetadataBroker
 import kafka.utils.Logging
 import org.apache.kafka.clients.ClientResponse
 import org.apache.kafka.common.errors.InvalidTopicException
@@ -82,14 +81,13 @@ object AutoTopicCreationManager {
         ))
       else
         None
-    new DefaultAutoTopicCreationManager(config, metadataCache, channelManager, 
adminManager,
+    new DefaultAutoTopicCreationManager(config, channelManager, adminManager,
       controller, groupCoordinator, txnCoordinator)
   }
 }
 
 class DefaultAutoTopicCreationManager(
   config: KafkaConfig,
-  metadataCache: MetadataCache,
   channelManager: Option[BrokerToControllerChannelManager],
   adminManager: Option[ZkAdminManager],
   controller: Option[KafkaController],
@@ -266,7 +264,6 @@ class DefaultAutoTopicCreationManager(
     topics: Set[String]
   ): (Map[String, CreatableTopic], Seq[MetadataResponseTopic]) = {
 
-    val aliveBrokers = metadataCache.getAliveBrokers
     val creatableTopics = mutable.Map.empty[String, CreatableTopic]
     val uncreatableTopics = mutable.Buffer.empty[MetadataResponseTopic]
 
@@ -274,8 +271,6 @@ class DefaultAutoTopicCreationManager(
       // Attempt basic topic validation before sending any requests to the 
controller.
       val validationError: Option[Errors] = if (!isValidTopicName(topic)) {
         Some(Errors.INVALID_TOPIC_EXCEPTION)
-      } else if (!hasEnoughLiveBrokers(topic, aliveBrokers)) {
-        Some(Errors.INVALID_REPLICATION_FACTOR)
       } else if (!inflightTopics.add(topic)) {
         Some(Errors.UNKNOWN_TOPIC_OR_PARTITION)
       } else {
@@ -295,30 +290,4 @@ class DefaultAutoTopicCreationManager(
 
     (creatableTopics, uncreatableTopics)
   }
-
-  private def hasEnoughLiveBrokers(
-    topicName: String,
-    aliveBrokers: Seq[MetadataBroker]
-  ): Boolean = {
-    val (replicationFactor, replicationFactorConfig) = topicName match {
-      case GROUP_METADATA_TOPIC_NAME =>
-        (config.offsetsTopicReplicationFactor.intValue, 
KafkaConfig.OffsetsTopicReplicationFactorProp)
-
-      case TRANSACTION_STATE_TOPIC_NAME =>
-        (config.transactionTopicReplicationFactor.intValue, 
KafkaConfig.TransactionsTopicReplicationFactorProp)
-
-      case _ =>
-        (config.defaultReplicationFactor, 
KafkaConfig.DefaultReplicationFactorProp)
-    }
-
-    if (aliveBrokers.size < replicationFactor) {
-      error(s"Number of alive brokers '${aliveBrokers.size}' does not meet the 
required replication factor " +
-        s"'$replicationFactor' for auto creation of topic '$topicName' which 
is configured by $replicationFactorConfig. " +
-        "This error can be ignored if the cluster is starting up and not all 
brokers are up yet.")
-      false
-    } else {
-      true
-    }
-  }
-
 }
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 4c0aaab..e7f7a12 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -248,7 +248,7 @@ class BrokerServer(
       val autoTopicCreationChannelManager = 
BrokerToControllerChannelManager(controllerNodeProvider,
         time, metrics, config, "autocreate", threadNamePrefix, 60000)
       autoTopicCreationManager = new DefaultAutoTopicCreationManager(
-        config, metadataCache, Some(autoTopicCreationChannelManager), None, 
None,
+        config, Some(autoTopicCreationChannelManager), None, None,
         groupCoordinator, transactionCoordinator)
       autoTopicCreationManager.start()
 
diff --git 
a/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
index 9f9749b..0bd97c3 100644
--- a/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
@@ -24,6 +24,7 @@ import kafka.coordinator.group.GroupCoordinator
 import kafka.coordinator.transaction.TransactionCoordinator
 import kafka.utils.TestUtils
 import kafka.utils.TestUtils.createBroker
+import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, 
TRANSACTION_STATE_TOPIC_NAME}
 import org.apache.kafka.common.message.CreateTopicsRequestData
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
@@ -33,6 +34,7 @@ import org.apache.kafka.common.requests._
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.{BeforeEach, Test}
 import org.mockito.ArgumentMatchers.any
+import org.mockito.invocation.InvocationOnMock
 import org.mockito.{ArgumentMatchers, Mockito}
 
 import scala.collection.{Map, Seq}
@@ -94,7 +96,6 @@ class AutoTopicCreationManagerTest {
                               replicationFactor: Short = 1): Unit = {
     autoTopicCreationManager = new DefaultAutoTopicCreationManager(
       config,
-      metadataCache,
       Some(brokerToController),
       Some(adminManager),
       Some(controller),
@@ -123,7 +124,6 @@ class AutoTopicCreationManagerTest {
   def testCreateTopicsWithForwardingDisabled(): Unit = {
     autoTopicCreationManager = new DefaultAutoTopicCreationManager(
       config,
-      metadataCache,
       None,
       Some(adminManager),
       Some(controller),
@@ -146,29 +146,117 @@ class AutoTopicCreationManagerTest {
   }
 
   @Test
-  def testNotEnoughLiveBrokers(): Unit = {
-    val props = TestUtils.createBrokerConfig(1, "localhost")
-    props.setProperty(KafkaConfig.DefaultReplicationFactorProp, 3.toString)
-    config = KafkaConfig.fromProps(props)
+  def testInvalidReplicationFactorForNonInternalTopics(): Unit = {
+    testErrorWithCreationInZk(Errors.INVALID_REPLICATION_FACTOR, "topic", 
isInternal = false)
+  }
+
+  @Test
+  def testInvalidReplicationFactorForConsumerOffsetsTopic(): Unit = {
+    Mockito.when(groupCoordinator.offsetsTopicConfigs).thenReturn(new 
Properties)
+    testErrorWithCreationInZk(Errors.INVALID_REPLICATION_FACTOR, 
Topic.GROUP_METADATA_TOPIC_NAME, isInternal = true)
+  }
+
+  @Test
+  def testInvalidReplicationFactorForTxnOffsetTopic(): Unit = {
+    
Mockito.when(transactionCoordinator.transactionTopicConfigs).thenReturn(new 
Properties)
+    testErrorWithCreationInZk(Errors.INVALID_REPLICATION_FACTOR, 
Topic.TRANSACTION_STATE_TOPIC_NAME, isInternal = true)
+  }
+
+  @Test
+  def testTopicExistsErrorSwapForNonInternalTopics(): Unit = {
+    testErrorWithCreationInZk(Errors.TOPIC_ALREADY_EXISTS, "topic", isInternal 
= false,
+      expectedError = Some(Errors.LEADER_NOT_AVAILABLE))
+  }
+
+  @Test
+  def testTopicExistsErrorSwapForConsumerOffsetsTopic(): Unit = {
+    Mockito.when(groupCoordinator.offsetsTopicConfigs).thenReturn(new 
Properties)
+    testErrorWithCreationInZk(Errors.TOPIC_ALREADY_EXISTS, 
Topic.GROUP_METADATA_TOPIC_NAME, isInternal = true,
+      expectedError = Some(Errors.LEADER_NOT_AVAILABLE))
+  }
+
+  @Test
+  def testTopicExistsErrorSwapForTxnOffsetTopic(): Unit = {
+    
Mockito.when(transactionCoordinator.transactionTopicConfigs).thenReturn(new 
Properties)
+    testErrorWithCreationInZk(Errors.TOPIC_ALREADY_EXISTS, 
Topic.TRANSACTION_STATE_TOPIC_NAME, isInternal = true,
+      expectedError = Some(Errors.LEADER_NOT_AVAILABLE))
+  }
+
+  @Test
+  def testRequestTimeoutErrorSwapForNonInternalTopics(): Unit = {
+    testErrorWithCreationInZk(Errors.REQUEST_TIMED_OUT, "topic", isInternal = 
false,
+      expectedError = Some(Errors.LEADER_NOT_AVAILABLE))
+  }
+
+  @Test
+  def testRequestTimeoutErrorSwapForConsumerOffsetTopic(): Unit = {
+    Mockito.when(groupCoordinator.offsetsTopicConfigs).thenReturn(new 
Properties)
+    testErrorWithCreationInZk(Errors.REQUEST_TIMED_OUT, 
Topic.GROUP_METADATA_TOPIC_NAME, isInternal = true,
+      expectedError = Some(Errors.LEADER_NOT_AVAILABLE))
+  }
+
+  @Test
+  def testRequestTimeoutErrorSwapForTxnOffsetTopic(): Unit = {
+    
Mockito.when(transactionCoordinator.transactionTopicConfigs).thenReturn(new 
Properties)
+    testErrorWithCreationInZk(Errors.REQUEST_TIMED_OUT, 
Topic.TRANSACTION_STATE_TOPIC_NAME, isInternal = true,
+      expectedError = Some(Errors.LEADER_NOT_AVAILABLE))
+  }
+
+  @Test
+  def testUnknownTopicPartitionForNonInternalTopic(): Unit = {
+    testErrorWithCreationInZk(Errors.UNKNOWN_TOPIC_OR_PARTITION, "topic", 
isInternal = false)
+  }
 
+  @Test
+  def testUnknownTopicPartitionForConsumerOffsetTopic(): Unit = {
+    Mockito.when(groupCoordinator.offsetsTopicConfigs).thenReturn(new 
Properties)
+    testErrorWithCreationInZk(Errors.UNKNOWN_TOPIC_OR_PARTITION, 
Topic.GROUP_METADATA_TOPIC_NAME, isInternal = true)
+  }
+
+  @Test
+  def testUnknownTopicPartitionForTxnOffsetTopic(): Unit = {
+    
Mockito.when(transactionCoordinator.transactionTopicConfigs).thenReturn(new 
Properties)
+    testErrorWithCreationInZk(Errors.UNKNOWN_TOPIC_OR_PARTITION, 
Topic.TRANSACTION_STATE_TOPIC_NAME, isInternal = true)
+  }
+
+  private def testErrorWithCreationInZk(error: Errors,
+                                        topicName: String,
+                                        isInternal: Boolean,
+                                        expectedError: Option[Errors] = None): 
Unit = {
     autoTopicCreationManager = new DefaultAutoTopicCreationManager(
       config,
-      metadataCache,
-      Some(brokerToController),
+      None,
       Some(adminManager),
       Some(controller),
       groupCoordinator,
       transactionCoordinator)
 
-    val topicName = "topic"
-
     Mockito.when(controller.isActive).thenReturn(false)
+    val newTopic = if (isInternal) {
+      topicName match {
+        case Topic.GROUP_METADATA_TOPIC_NAME => getNewTopic(topicName,
+          numPartitions = config.offsetsTopicPartitions, replicationFactor = 
config.offsetsTopicReplicationFactor)
+        case Topic.TRANSACTION_STATE_TOPIC_NAME => getNewTopic(topicName,
+          numPartitions = config.transactionTopicPartitions, replicationFactor 
= config.transactionTopicReplicationFactor)
+      }
+    } else {
+      getNewTopic(topicName)
+    }
+
+    val topicErrors = if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) null else
+      Map(topicName -> new ApiError(error))
+    Mockito.when(adminManager.createTopics(
+      ArgumentMatchers.eq(0),
+      ArgumentMatchers.eq(false),
+      ArgumentMatchers.eq(Map(topicName -> newTopic)),
+      ArgumentMatchers.eq(Map.empty),
+      any(classOf[ControllerMutationQuota]),
+      any(classOf[Map[String, ApiError] => Unit]))).thenAnswer((invocation: 
InvocationOnMock) => {
+      invocation.getArgument(5).asInstanceOf[Map[String, ApiError] => Unit]
+        .apply(topicErrors)
+    })
 
-    createTopicAndVerifyResult(Errors.INVALID_REPLICATION_FACTOR, topicName, 
false)
-
-    Mockito.verify(brokerToController, Mockito.never()).sendRequest(
-      any(),
-      any(classOf[ControllerRequestCompletionHandler]))
+    createTopicAndVerifyResult(expectedError.getOrElse(error), topicName, 
isInternal = isInternal)
   }
 
   private def createTopicAndVerifyResult(error: Errors,
diff --git a/tests/kafkatest/tests/core/security_test.py 
b/tests/kafkatest/tests/core/security_test.py
index 0ce12c9..8e2b695 100644
--- a/tests/kafkatest/tests/core/security_test.py
+++ b/tests/kafkatest/tests/core/security_test.py
@@ -65,7 +65,10 @@ class SecurityTest(EndToEndTest):
         Test that invalid hostname in certificate results in connection 
failures.
         When security_protocol=SSL, client SSL handshakes are expected to fail 
due to hostname verification failure.
         When security_protocol=PLAINTEXT and 
interbroker_security_protocol=SSL, controller connections fail
-        with hostname verification failure. Hence clients are expected to fail 
with LEADER_NOT_AVAILABLE.
+        with hostname verification failure. Since metadata cannot be 
propagated in the cluster without a valid certificate,
+        the broker's metadata caches will be empty. Hence we expect Metadata 
requests to fail with an INVALID_REPLICATION_FACTOR
+        error since the broker will attempt to create the topic automatically 
as it does not exist in the metadata cache,
+        and there will be no online brokers.
         """
 
         # Start Kafka with valid hostnames in the certs' SANs so that we can 
create the test topic via the admin client
@@ -111,7 +114,7 @@ class SecurityTest(EndToEndTest):
                 # expected
                 pass
 
-            error = 'SSLHandshakeException' if security_protocol == 'SSL' else 
'LEADER_NOT_AVAILABLE'
+            error = 'SSLHandshakeException' if security_protocol == 'SSL' else 
'INVALID_REPLICATION_FACTOR'
             wait_until(lambda: 
self.producer_consumer_have_expected_error(error), timeout_sec=30)
             self.producer.stop()
             self.consumer.stop()

Reply via email to