[ https://issues.apache.org/jira/browse/KAFKA-6772?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16442175#comment-16442175 ]
ASF GitHub Bot commented on KAFKA-6772: --------------------------------------- rajinisivaram closed pull request #4867: KAFKA-6772: Load credentials from ZK before accepting connections URL: https://github.com/apache/kafka/pull/4867 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index e4fdb089cd1..c0bc5939c08 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -76,12 +76,24 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time private var stoppedProcessingRequests = false /** - * Start the socket server + * Start the socket server. Acceptors for all the listeners are started. Processors + * are started if `startupProcessors` is true. If not, processors are only started when + * [[kafka.network.SocketServer#startProcessors()]] is invoked. Delayed starting of processors + * is used to delay processing client connections until server is fully initialized, e.g. + * to ensure that all credentials have been loaded before authentications are performed. + * Acceptors are always started during `startup` so that the bound port is known when this + * method completes even when ephemeral ports are used. Incoming connections on this server + * are processed when processors start up and invoke [[org.apache.kafka.common.network.Selector#poll]]. + * + * @param startupProcessors Flag indicating whether `Processor`s must be started. 
*/ - def startup() { + def startup(startupProcessors: Boolean = true) { this.synchronized { connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides) createAcceptorAndProcessors(config.numNetworkThreads, config.listeners) + if (startupProcessors) { + startProcessors() + } } newGauge("NetworkProcessorAvgIdlePercent", @@ -110,6 +122,16 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time info("Started " + acceptors.size + " acceptor threads") } + /** + * Starts processors of all the acceptors of this server if they have not already been started. + * This method is used for delayed starting of processors if [[kafka.network.SocketServer#startup]] + * was invoked with `startupProcessors=false`. + */ + def startProcessors(): Unit = synchronized { + acceptors.values.asScala.foreach { _.startProcessors() } + info(s"Started processors for ${acceptors.size} acceptors") + } + private def endpoints = config.listeners.map(l => l.listenerName -> l).toMap private def createAcceptorAndProcessors(processorsPerListener: Int, @@ -196,6 +218,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time def addListeners(listenersAdded: Seq[EndPoint]): Unit = synchronized { info(s"Adding listeners for endpoints $listenersAdded") createAcceptorAndProcessors(config.numNetworkThreads, listenersAdded) + startProcessors() } def removeListeners(listenersRemoved: Seq[EndPoint]): Unit = synchronized { @@ -307,13 +330,25 @@ private[kafka] class Acceptor(val endPoint: EndPoint, private val nioSelector = NSelector.open() val serverChannel = openServerSocket(endPoint.host, endPoint.port) private val processors = new ArrayBuffer[Processor]() + private val processorsStarted = new AtomicBoolean private[network] def addProcessors(newProcessors: Buffer[Processor]): Unit = synchronized { - newProcessors.foreach { processor => + processors ++= newProcessors + if (processorsStarted.get) + startProcessors(newProcessors) 
+ } + + private[network] def startProcessors(): Unit = synchronized { + if (!processorsStarted.getAndSet(true)) { + startProcessors(processors) + } + } + + private def startProcessors(processors: Seq[Processor]): Unit = synchronized { + processors.foreach { processor => KafkaThread.nonDaemon(s"kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}", processor).start() } - processors ++= newProcessors } private[network] def removeProcessors(removeCount: Int, requestChannel: RequestChannel): Unit = synchronized { @@ -328,7 +363,9 @@ private[kafka] class Acceptor(val endPoint: EndPoint, override def shutdown(): Unit = { super.shutdown() - processors.foreach(_.shutdown()) + synchronized { + processors.foreach(_.shutdown()) + } } /** diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index a0d2c799e6a..c729c8c90f7 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -243,8 +243,11 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames) credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache) + // Create and start the socket server acceptor threads so that the bound port is known. + // Delay starting processors until the end of the initialization sequence to ensure + // that credentials have been loaded before processing authentications. 
socketServer = new SocketServer(config, metrics, time, credentialProvider) - socketServer.startup() + socketServer.startup(startupProcessors = false) /* start replica manager */ replicaManager = createReplicaManager(isShuttingDown) @@ -310,7 +313,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP dynamicConfigManager = new DynamicConfigManager(zkClient, dynamicConfigHandlers) dynamicConfigManager.startup() - + socketServer.startProcessors() brokerState.newState(RunningAsBroker) shutdownLatch = new CountDownLatch(1) startupComplete.set(true) diff --git a/core/src/test/scala/integration/kafka/server/ScramServerStartupTest.scala b/core/src/test/scala/integration/kafka/server/ScramServerStartupTest.scala new file mode 100644 index 00000000000..18b4f8e23b8 --- /dev/null +++ b/core/src/test/scala/integration/kafka/server/ScramServerStartupTest.scala @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.server +import java.util.Collections + +import kafka.api.{IntegrationTestHarness, KafkaSasl, SaslSetup} +import kafka.utils._ +import kafka.zk.ConfigEntityChangeNotificationZNode +import org.apache.kafka.common.security.auth.SecurityProtocol +import org.junit.Assert._ +import org.junit.Test + +import scala.collection.JavaConverters._ + +/** + * Tests that there are no failed authentications during broker startup. This is to verify + * that SCRAM credentials are loaded by brokers before client connections can be made. + * For simplicity of testing, this test verifies authentications of controller connections. + */ +class ScramServerStartupTest extends IntegrationTestHarness with SaslSetup { + + override val producerCount = 0 + override val consumerCount = 0 + override val serverCount = 1 + + private val kafkaClientSaslMechanism = "SCRAM-SHA-256" + private val kafkaServerSaslMechanisms = Collections.singletonList("SCRAM-SHA-256").asScala + + override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT + + override protected val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism)) + override protected val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism)) + + override def configureSecurityBeforeServersStart() { + super.configureSecurityBeforeServersStart() + zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path) + // Create credentials before starting brokers + createScramCredentials(zkConnect, JaasTestUtils.KafkaScramAdmin, JaasTestUtils.KafkaScramAdminPassword) + + startSasl(jaasSections(kafkaServerSaslMechanisms, Option(kafkaClientSaslMechanism), KafkaSasl)) + } + + @Test + def testAuthentications(): Unit = { + val successfulAuths = totalAuthentications("successful-authentication-total") + assertTrue("No successful authentications", successfulAuths > 0) + val failedAuths = 
totalAuthentications("failed-authentication-total") + assertEquals(0, failedAuths) + } + + private def totalAuthentications(metricName: String): Int = { + val allMetrics = servers.head.metrics.metrics + val totalAuthCount = allMetrics.values().asScala.filter(_.metricName().name() == metricName) + .foldLeft(0.0)((total, metric) => total + metric.metricValue.asInstanceOf[Double]) + totalAuthCount.toInt + } +}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Broker should load credentials from ZK before requests are allowed
> ------------------------------------------------------------------
>
> Key: KAFKA-6772
> URL: https://issues.apache.org/jira/browse/KAFKA-6772
> Project: Kafka
> Issue Type: Improvement
> Affects Versions: 1.0.0, 1.1.0, 1.0.1
> Reporter: Ismael Juma
> Assignee: Rajini Sivaram
> Priority: Major
> Fix For: 1.0.2, 1.2.0, 1.1.1
>
>
> It is currently possible for clients to get an AuthenticationException during
> start-up if the brokers have not yet loaded credentials from ZK. This
> definitely affects SCRAM, but it may also affect delegation tokens.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)