This is an automated email from the ASF dual-hosted git repository.
boyang pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.8 by this push:
new 77e3ca0 KAFKA-12381: remove live broker checks for forwarding topic
creation (#10240)
77e3ca0 is described below
commit 77e3ca019efe38195ec28ddab87aed52e1c403c8
Author: Boyang Chen <[email protected]>
AuthorDate: Fri Mar 5 15:55:14 2021 -0800
KAFKA-12381: remove live broker checks for forwarding topic creation
(#10240)
Removed broker number checks for invalid replication factor when doing the
forwarding, in order to reduce false alarms for clients.
Reviewers: Jason Gustafson <[email protected]>
---
.../kafka/server/AutoTopicCreationManager.scala | 33 +-----
.../src/main/scala/kafka/server/BrokerServer.scala | 2 +-
.../server/AutoTopicCreationManagerTest.scala | 118 ++++++++++++++++++---
tests/kafkatest/tests/core/security_test.py | 7 +-
4 files changed, 110 insertions(+), 50 deletions(-)
diff --git a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
index 01dabed..f8063a8 100644
--- a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
+++ b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
@@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicReference
import kafka.controller.KafkaController
import kafka.coordinator.group.GroupCoordinator
import kafka.coordinator.transaction.TransactionCoordinator
-import kafka.server.metadata.MetadataBroker
import kafka.utils.Logging
import org.apache.kafka.clients.ClientResponse
import org.apache.kafka.common.errors.InvalidTopicException
@@ -82,14 +81,13 @@ object AutoTopicCreationManager {
))
else
None
- new DefaultAutoTopicCreationManager(config, metadataCache, channelManager,
adminManager,
+ new DefaultAutoTopicCreationManager(config, channelManager, adminManager,
controller, groupCoordinator, txnCoordinator)
}
}
class DefaultAutoTopicCreationManager(
config: KafkaConfig,
- metadataCache: MetadataCache,
channelManager: Option[BrokerToControllerChannelManager],
adminManager: Option[ZkAdminManager],
controller: Option[KafkaController],
@@ -266,7 +264,6 @@ class DefaultAutoTopicCreationManager(
topics: Set[String]
): (Map[String, CreatableTopic], Seq[MetadataResponseTopic]) = {
- val aliveBrokers = metadataCache.getAliveBrokers
val creatableTopics = mutable.Map.empty[String, CreatableTopic]
val uncreatableTopics = mutable.Buffer.empty[MetadataResponseTopic]
@@ -274,8 +271,6 @@ class DefaultAutoTopicCreationManager(
// Attempt basic topic validation before sending any requests to the
controller.
val validationError: Option[Errors] = if (!isValidTopicName(topic)) {
Some(Errors.INVALID_TOPIC_EXCEPTION)
- } else if (!hasEnoughLiveBrokers(topic, aliveBrokers)) {
- Some(Errors.INVALID_REPLICATION_FACTOR)
} else if (!inflightTopics.add(topic)) {
Some(Errors.UNKNOWN_TOPIC_OR_PARTITION)
} else {
@@ -295,30 +290,4 @@ class DefaultAutoTopicCreationManager(
(creatableTopics, uncreatableTopics)
}
-
- private def hasEnoughLiveBrokers(
- topicName: String,
- aliveBrokers: Seq[MetadataBroker]
- ): Boolean = {
- val (replicationFactor, replicationFactorConfig) = topicName match {
- case GROUP_METADATA_TOPIC_NAME =>
- (config.offsetsTopicReplicationFactor.intValue,
KafkaConfig.OffsetsTopicReplicationFactorProp)
-
- case TRANSACTION_STATE_TOPIC_NAME =>
- (config.transactionTopicReplicationFactor.intValue,
KafkaConfig.TransactionsTopicReplicationFactorProp)
-
- case _ =>
- (config.defaultReplicationFactor,
KafkaConfig.DefaultReplicationFactorProp)
- }
-
- if (aliveBrokers.size < replicationFactor) {
- error(s"Number of alive brokers '${aliveBrokers.size}' does not meet the
required replication factor " +
- s"'$replicationFactor' for auto creation of topic '$topicName' which
is configured by $replicationFactorConfig. " +
- "This error can be ignored if the cluster is starting up and not all
brokers are up yet.")
- false
- } else {
- true
- }
- }
-
}
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 4c0aaab..e7f7a12 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -248,7 +248,7 @@ class BrokerServer(
val autoTopicCreationChannelManager =
BrokerToControllerChannelManager(controllerNodeProvider,
time, metrics, config, "autocreate", threadNamePrefix, 60000)
autoTopicCreationManager = new DefaultAutoTopicCreationManager(
- config, metadataCache, Some(autoTopicCreationChannelManager), None,
None,
+ config, Some(autoTopicCreationChannelManager), None, None,
groupCoordinator, transactionCoordinator)
autoTopicCreationManager.start()
diff --git
a/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
b/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
index 9f9749b..0bd97c3 100644
--- a/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
@@ -24,6 +24,7 @@ import kafka.coordinator.group.GroupCoordinator
import kafka.coordinator.transaction.TransactionCoordinator
import kafka.utils.TestUtils
import kafka.utils.TestUtils.createBroker
+import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME,
TRANSACTION_STATE_TOPIC_NAME}
import org.apache.kafka.common.message.CreateTopicsRequestData
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
@@ -33,6 +34,7 @@ import org.apache.kafka.common.requests._
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.{BeforeEach, Test}
import org.mockito.ArgumentMatchers.any
+import org.mockito.invocation.InvocationOnMock
import org.mockito.{ArgumentMatchers, Mockito}
import scala.collection.{Map, Seq}
@@ -94,7 +96,6 @@ class AutoTopicCreationManagerTest {
replicationFactor: Short = 1): Unit = {
autoTopicCreationManager = new DefaultAutoTopicCreationManager(
config,
- metadataCache,
Some(brokerToController),
Some(adminManager),
Some(controller),
@@ -123,7 +124,6 @@ class AutoTopicCreationManagerTest {
def testCreateTopicsWithForwardingDisabled(): Unit = {
autoTopicCreationManager = new DefaultAutoTopicCreationManager(
config,
- metadataCache,
None,
Some(adminManager),
Some(controller),
@@ -146,29 +146,117 @@ class AutoTopicCreationManagerTest {
}
@Test
- def testNotEnoughLiveBrokers(): Unit = {
- val props = TestUtils.createBrokerConfig(1, "localhost")
- props.setProperty(KafkaConfig.DefaultReplicationFactorProp, 3.toString)
- config = KafkaConfig.fromProps(props)
+ def testInvalidReplicationFactorForNonInternalTopics(): Unit = {
+ testErrorWithCreationInZk(Errors.INVALID_REPLICATION_FACTOR, "topic",
isInternal = false)
+ }
+
+ @Test
+ def testInvalidReplicationFactorForConsumerOffsetsTopic(): Unit = {
+ Mockito.when(groupCoordinator.offsetsTopicConfigs).thenReturn(new
Properties)
+ testErrorWithCreationInZk(Errors.INVALID_REPLICATION_FACTOR,
Topic.GROUP_METADATA_TOPIC_NAME, isInternal = true)
+ }
+
+ @Test
+ def testInvalidReplicationFactorForTxnOffsetTopic(): Unit = {
+
Mockito.when(transactionCoordinator.transactionTopicConfigs).thenReturn(new
Properties)
+ testErrorWithCreationInZk(Errors.INVALID_REPLICATION_FACTOR,
Topic.TRANSACTION_STATE_TOPIC_NAME, isInternal = true)
+ }
+
+ @Test
+ def testTopicExistsErrorSwapForNonInternalTopics(): Unit = {
+ testErrorWithCreationInZk(Errors.TOPIC_ALREADY_EXISTS, "topic", isInternal
= false,
+ expectedError = Some(Errors.LEADER_NOT_AVAILABLE))
+ }
+
+ @Test
+ def testTopicExistsErrorSwapForConsumerOffsetsTopic(): Unit = {
+ Mockito.when(groupCoordinator.offsetsTopicConfigs).thenReturn(new
Properties)
+ testErrorWithCreationInZk(Errors.TOPIC_ALREADY_EXISTS,
Topic.GROUP_METADATA_TOPIC_NAME, isInternal = true,
+ expectedError = Some(Errors.LEADER_NOT_AVAILABLE))
+ }
+
+ @Test
+ def testTopicExistsErrorSwapForTxnOffsetTopic(): Unit = {
+
Mockito.when(transactionCoordinator.transactionTopicConfigs).thenReturn(new
Properties)
+ testErrorWithCreationInZk(Errors.TOPIC_ALREADY_EXISTS,
Topic.TRANSACTION_STATE_TOPIC_NAME, isInternal = true,
+ expectedError = Some(Errors.LEADER_NOT_AVAILABLE))
+ }
+
+ @Test
+ def testRequestTimeoutErrorSwapForNonInternalTopics(): Unit = {
+ testErrorWithCreationInZk(Errors.REQUEST_TIMED_OUT, "topic", isInternal =
false,
+ expectedError = Some(Errors.LEADER_NOT_AVAILABLE))
+ }
+
+ @Test
+ def testRequestTimeoutErrorSwapForConsumerOffsetTopic(): Unit = {
+ Mockito.when(groupCoordinator.offsetsTopicConfigs).thenReturn(new
Properties)
+ testErrorWithCreationInZk(Errors.REQUEST_TIMED_OUT,
Topic.GROUP_METADATA_TOPIC_NAME, isInternal = true,
+ expectedError = Some(Errors.LEADER_NOT_AVAILABLE))
+ }
+
+ @Test
+ def testRequestTimeoutErrorSwapForTxnOffsetTopic(): Unit = {
+
Mockito.when(transactionCoordinator.transactionTopicConfigs).thenReturn(new
Properties)
+ testErrorWithCreationInZk(Errors.REQUEST_TIMED_OUT,
Topic.TRANSACTION_STATE_TOPIC_NAME, isInternal = true,
+ expectedError = Some(Errors.LEADER_NOT_AVAILABLE))
+ }
+
+ @Test
+ def testUnknownTopicPartitionForNonIntervalTopic(): Unit = {
+ testErrorWithCreationInZk(Errors.UNKNOWN_TOPIC_OR_PARTITION, "topic",
isInternal = false)
+ }
+ @Test
+ def testUnknownTopicPartitionForConsumerOffsetTopic(): Unit = {
+ Mockito.when(groupCoordinator.offsetsTopicConfigs).thenReturn(new
Properties)
+ testErrorWithCreationInZk(Errors.UNKNOWN_TOPIC_OR_PARTITION,
Topic.GROUP_METADATA_TOPIC_NAME, isInternal = true)
+ }
+
+ @Test
+ def testUnknownTopicPartitionForTxnOffsetTopic(): Unit = {
+
Mockito.when(transactionCoordinator.transactionTopicConfigs).thenReturn(new
Properties)
+ testErrorWithCreationInZk(Errors.UNKNOWN_TOPIC_OR_PARTITION,
Topic.TRANSACTION_STATE_TOPIC_NAME, isInternal = true)
+ }
+
+ private def testErrorWithCreationInZk(error: Errors,
+ topicName: String,
+ isInternal: Boolean,
+ expectedError: Option[Errors] = None):
Unit = {
autoTopicCreationManager = new DefaultAutoTopicCreationManager(
config,
- metadataCache,
- Some(brokerToController),
+ None,
Some(adminManager),
Some(controller),
groupCoordinator,
transactionCoordinator)
- val topicName = "topic"
-
Mockito.when(controller.isActive).thenReturn(false)
+ val newTopic = if (isInternal) {
+ topicName match {
+ case Topic.GROUP_METADATA_TOPIC_NAME => getNewTopic(topicName,
+ numPartitions = config.offsetsTopicPartitions, replicationFactor =
config.offsetsTopicReplicationFactor)
+ case Topic.TRANSACTION_STATE_TOPIC_NAME => getNewTopic(topicName,
+ numPartitions = config.transactionTopicPartitions, replicationFactor
= config.transactionTopicReplicationFactor)
+ }
+ } else {
+ getNewTopic(topicName)
+ }
+
+ val topicErrors = if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) null else
+ Map(topicName -> new ApiError(error))
+ Mockito.when(adminManager.createTopics(
+ ArgumentMatchers.eq(0),
+ ArgumentMatchers.eq(false),
+ ArgumentMatchers.eq(Map(topicName -> newTopic)),
+ ArgumentMatchers.eq(Map.empty),
+ any(classOf[ControllerMutationQuota]),
+ any(classOf[Map[String, ApiError] => Unit]))).thenAnswer((invocation:
InvocationOnMock) => {
+ invocation.getArgument(5).asInstanceOf[Map[String, ApiError] => Unit]
+ .apply(topicErrors)
+ })
- createTopicAndVerifyResult(Errors.INVALID_REPLICATION_FACTOR, topicName,
false)
-
- Mockito.verify(brokerToController, Mockito.never()).sendRequest(
- any(),
- any(classOf[ControllerRequestCompletionHandler]))
+ createTopicAndVerifyResult(expectedError.getOrElse(error), topicName,
isInternal = isInternal)
}
private def createTopicAndVerifyResult(error: Errors,
diff --git a/tests/kafkatest/tests/core/security_test.py
b/tests/kafkatest/tests/core/security_test.py
index 0ce12c9..8e2b695 100644
--- a/tests/kafkatest/tests/core/security_test.py
+++ b/tests/kafkatest/tests/core/security_test.py
@@ -65,7 +65,10 @@ class SecurityTest(EndToEndTest):
Test that invalid hostname in certificate results in connection
failures.
When security_protocol=SSL, client SSL handshakes are expected to fail
due to hostname verification failure.
When security_protocol=PLAINTEXT and
interbroker_security_protocol=SSL, controller connections fail
- with hostname verification failure. Hence clients are expected to fail
with LEADER_NOT_AVAILABLE.
+ with hostname verification failure. Since metadata cannot be
propagated in the cluster without a valid certificate,
+ the broker's metadata caches will be empty. Hence we expect Metadata
requests to fail with an INVALID_REPLICATION_FACTOR
+ error since the broker will attempt to create the topic automatically
as it does not exist in the metadata cache,
+ and there will be no online brokers.
"""
# Start Kafka with valid hostnames in the certs' SANs so that we can
create the test topic via the admin client
@@ -111,7 +114,7 @@ class SecurityTest(EndToEndTest):
# expected
pass
- error = 'SSLHandshakeException' if security_protocol == 'SSL' else
'LEADER_NOT_AVAILABLE'
+ error = 'SSLHandshakeException' if security_protocol == 'SSL' else
'INVALID_REPLICATION_FACTOR'
wait_until(lambda:
self.producer_consumer_have_expected_error(error), timeout_sec=30)
self.producer.stop()
self.consumer.stop()