This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new fa59be4e770 KAFKA-13649: Implement early.start.listeners and fix
StandardAuthorizer loading (#11969)
fa59be4e770 is described below
commit fa59be4e770627cd34cef85986b58ad7f606928d
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Thu May 12 14:48:33 2022 -0700
KAFKA-13649: Implement early.start.listeners and fix StandardAuthorizer
loading (#11969)
Since the StandardAuthorizer relies on the metadata log to store its ACLs,
we need to be sure that
we have the latest metadata before allowing the authorizer to be used.
However, if the authorizer
is not usable for controllers in the cluster, the latest metadata cannot be
fetched, because
inter-node communication cannot occur. In the initial commit which
introduced StandardAuthorizer,
we punted on the loading issue by allowing the authorizer to be used
immediately. This commit fixes
that by implementing early.start.listeners as specified in KIP-801. This
will let superusers in
immediately, but throw the new AuthorizerNotReadyException if
non-superusers try to use the
authorizer before StandardAuthorizer#completeInitialLoad is called.
For the broker, we call StandardAuthorizer#completeInitialLoad immediately
after metadata catch-up
is complete, right before unfencing. For the controller, we call
StandardAuthorizer#completeInitialLoad when the node has caught up to the
high water mark of the
cluster metadata partition.
This PR refactors the SocketServer so that it creates the configured
acceptors and processors in
its constructor, rather than requiring a call to SocketServer#startup. A new
function,
SocketServer#enableRequestProcessing, then starts the threads and begins
listening on the
configured ports. enableRequestProcessing uses an async model: we will
start the acceptor and
processors associated with an endpoint as soon as that endpoint's
authorizer future is completed.
Also fix a bug where the controller and listener were sharing an Authorizer
when in co-located
mode, which was not intended.
Reviewers: Jason Gustafson <[email protected]>
---
.../common/errors/AuthorizerNotReadyException.java | 28 ++
.../server/authorizer/AuthorizerServerInfo.java | 5 +
core/src/main/scala/kafka/cluster/Broker.scala | 6 +-
core/src/main/scala/kafka/cluster/EndPoint.scala | 6 +
.../main/scala/kafka/network/SocketServer.scala | 533 +++++++++------------
.../kafka/server/BrokerLifecycleManager.scala | 10 +-
.../src/main/scala/kafka/server/BrokerServer.scala | 42 +-
.../main/scala/kafka/server/ControllerServer.scala | 23 +-
core/src/main/scala/kafka/server/KafkaConfig.scala | 29 +-
core/src/main/scala/kafka/server/KafkaServer.scala | 5 +-
.../main/scala/kafka/tools/TestRaftServer.scala | 4 +-
.../unit/kafka/network/SocketServerTest.scala | 63 +--
.../scala/unit/kafka/server/KafkaConfigTest.scala | 48 +-
.../apache/kafka/controller/QuorumController.java | 40 +-
.../authorizer/ClusterMetadataAuthorizer.java | 12 +
.../metadata/authorizer/StandardAuthorizer.java | 31 +-
.../authorizer/StandardAuthorizerData.java | 24 +
.../kafka/controller/AclControlManagerTest.java | 10 +
.../kafka/controller/QuorumControllerTest.java | 49 ++
.../authorizer/ClusterMetadataAuthorizerTest.java | 10 +
.../authorizer/StandardAuthorizerTest.java | 170 ++++++-
.../org/apache/kafka/metalog/LocalLogManager.java | 41 +-
.../kafka/metalog/LocalLogManagerTestEnv.java | 34 +-
.../java/org/apache/kafka/raft/RaftClient.java | 6 +
24 files changed, 837 insertions(+), 392 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/errors/AuthorizerNotReadyException.java
b/clients/src/main/java/org/apache/kafka/common/errors/AuthorizerNotReadyException.java
new file mode 100644
index 00000000000..1c110ef2143
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/common/errors/AuthorizerNotReadyException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * An exception that indicates that the authorizer is not ready to receive the
request yet.
+ */
+public class AuthorizerNotReadyException extends RetriableException {
+ private static final long serialVersionUID = 1L;
+
+ public AuthorizerNotReadyException() {
+ super();
+ }
+}
diff --git
a/clients/src/main/java/org/apache/kafka/server/authorizer/AuthorizerServerInfo.java
b/clients/src/main/java/org/apache/kafka/server/authorizer/AuthorizerServerInfo.java
index 51e23fba57f..eb03c117b6c 100644
---
a/clients/src/main/java/org/apache/kafka/server/authorizer/AuthorizerServerInfo.java
+++
b/clients/src/main/java/org/apache/kafka/server/authorizer/AuthorizerServerInfo.java
@@ -48,4 +48,9 @@ public interface AuthorizerServerInfo {
* Returns the inter-broker endpoint. This is one of the endpoints
returned by {@link #endpoints()}.
*/
Endpoint interBrokerEndpoint();
+
+ /**
+ * Returns the configured early start listeners.
+ */
+ Collection<String> earlyStartListeners();
}
diff --git a/core/src/main/scala/kafka/cluster/Broker.scala
b/core/src/main/scala/kafka/cluster/Broker.scala
index 657d89b8fe7..9b1d741835c 100755
--- a/core/src/main/scala/kafka/cluster/Broker.scala
+++ b/core/src/main/scala/kafka/cluster/Broker.scala
@@ -35,7 +35,8 @@ object Broker {
private[kafka] case class ServerInfo(clusterResource: ClusterResource,
brokerId: Int,
endpoints: util.List[Endpoint],
- interBrokerEndpoint: Endpoint)
extends AuthorizerServerInfo
+ interBrokerEndpoint: Endpoint,
+ earlyStartListeners:
util.Set[String]) extends AuthorizerServerInfo
def apply(id: Int, endPoints: Seq[EndPoint], rack: Option[String]): Broker =
{
new Broker(id, endPoints, rack, emptySupportedFeatures)
@@ -93,6 +94,7 @@ case class Broker(id: Int, endPoints: Seq[EndPoint], rack:
Option[String], featu
val clusterResource: ClusterResource = new ClusterResource(clusterId)
val interBrokerEndpoint: Endpoint =
endPoint(config.interBrokerListenerName).toJava
val brokerEndpoints: util.List[Endpoint] =
endPoints.toList.map(_.toJava).asJava
- Broker.ServerInfo(clusterResource, id, brokerEndpoints,
interBrokerEndpoint)
+ Broker.ServerInfo(clusterResource, id, brokerEndpoints,
interBrokerEndpoint,
+ config.earlyStartListeners.map(_.value()).asJava)
}
}
diff --git a/core/src/main/scala/kafka/cluster/EndPoint.scala
b/core/src/main/scala/kafka/cluster/EndPoint.scala
index 3e84f9ed834..89c9f5ec3d4 100644
--- a/core/src/main/scala/kafka/cluster/EndPoint.scala
+++ b/core/src/main/scala/kafka/cluster/EndPoint.scala
@@ -65,6 +65,12 @@ object EndPoint {
case _ => throw new KafkaException(s"Unable to parse a listener name
from $connectionString")
}
}
+
+ def fromJava(endpoint: JEndpoint): EndPoint =
+ new EndPoint(endpoint.host(),
+ endpoint.port(),
+ new ListenerName(endpoint.listenerName().get()),
+ endpoint.securityProtocol())
}
/**
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala
b/core/src/main/scala/kafka/network/SocketServer.scala
index 88dfa15b3f5..e91c240415c 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -22,7 +22,6 @@ import java.net._
import java.nio.ByteBuffer
import java.nio.channels.{Selector => NSelector, _}
import java.util
-import java.util.Optional
import java.util.concurrent._
import java.util.concurrent.atomic._
@@ -34,7 +33,7 @@ import kafka.network.RequestChannel.{CloseConnectionResponse,
EndThrottlingRespo
import kafka.network.SocketServer._
import kafka.security.CredentialProvider
import kafka.server.{ApiVersionManager, BrokerReconfigurable, KafkaConfig}
-import kafka.utils.Implicits._
+import org.apache.kafka.common.message.ApiMessageType.ListenerType
import kafka.utils._
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.config.internals.QuotaConfigs
@@ -104,51 +103,34 @@ class SocketServer(val config: KafkaConfig,
private[this] val nextProcessorId: AtomicInteger = new AtomicInteger(0)
val connectionQuotas = new ConnectionQuotas(config, time, metrics)
- private var startedProcessingRequests = false
- private var stoppedProcessingRequests = false
- // Processors are now created by each Acceptor. However to preserve
compatibility, we need to number the processors
- // globally, so we keep the nextProcessorId counter in SocketServer
- def nextProcessorId(): Int = {
- nextProcessorId.getAndIncrement()
- }
+ /**
+ * A future which is completed once all the authorizer futures are complete.
+ */
+ private val allAuthorizerFuturesComplete = new CompletableFuture[Void]
/**
- * Starts the socket server and creates all the Acceptors and the
Processors. The Acceptors
- * start listening at this stage so that the bound port is known when this
method completes
- * even when ephemeral ports are used. Acceptors and Processors are started
if `startProcessingRequests`
- * is true. If not, acceptors and processors are only started when
[[kafka.network.SocketServer#startProcessingRequests()]]
- * is invoked. Delayed starting of acceptors and processors is used to delay
processing client
- * connections until server is fully initialized, e.g. to ensure that all
credentials have been
- * loaded before authentications are performed. Incoming connections on this
server are processed
- * when processors start up and invoke
[[org.apache.kafka.common.network.Selector#poll]].
- *
- * @param startProcessingRequests Flag indicating whether `Processor`s must
be started.
- * @param controlPlaneListener The control plane listener, or None if
there is none.
- * @param dataPlaneListeners The data plane listeners.
+ * True if the SocketServer is stopped. Must be accessed under the
SocketServer lock.
*/
- def startup(startProcessingRequests: Boolean = true,
- controlPlaneListener: Option[EndPoint] =
config.controlPlaneListener,
- dataPlaneListeners: Seq[EndPoint] = config.dataPlaneListeners):
Unit = {
- this.synchronized {
- createControlPlaneAcceptorAndProcessor(controlPlaneListener)
- createDataPlaneAcceptorsAndProcessors(dataPlaneListeners)
- if (startProcessingRequests) {
- this.startProcessingRequests()
- }
- }
+ private var stopped = false
+ // Socket server metrics
+ newGauge(s"${DataPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent",
() => SocketServer.this.synchronized {
val dataPlaneProcessors = dataPlaneAcceptors.asScala.values.flatMap(a =>
a.processors)
- val controlPlaneProcessorOpt = controlPlaneAcceptorOpt.map(a =>
a.processors(0))
-
newGauge(s"${DataPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent", ()
=> SocketServer.this.synchronized {
- val ioWaitRatioMetricNames = dataPlaneProcessors.map { p =>
- metrics.metricName("io-wait-ratio", MetricsGroup, p.metricTags)
- }
+ val ioWaitRatioMetricNames = dataPlaneProcessors.map { p =>
+ metrics.metricName("io-wait-ratio", MetricsGroup, p.metricTags)
+ }
+ if (dataPlaneProcessors.isEmpty) {
+ 1.0
+ } else {
ioWaitRatioMetricNames.map { metricName =>
Option(metrics.metric(metricName)).fold(0.0)(m =>
Math.min(m.metricValue.asInstanceOf[Double], 1.0))
}.sum / dataPlaneProcessors.size
- })
+ }
+ })
+ if (config.requiresZookeeper) {
newGauge(s"${ControlPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent",
() => SocketServer.this.synchronized {
+ val controlPlaneProcessorOpt = controlPlaneAcceptorOpt.map(a =>
a.processors(0))
val ioWaitRatioMetricName = controlPlaneProcessorOpt.map { p =>
metrics.metricName("io-wait-ratio", MetricsGroup, p.metricTags)
}
@@ -156,17 +138,21 @@ class SocketServer(val config: KafkaConfig,
Option(metrics.metric(metricName)).fold(0.0)(m =>
Math.min(m.metricValue.asInstanceOf[Double], 1.0))
}.getOrElse(Double.NaN)
})
- newGauge("MemoryPoolAvailable", () => memoryPool.availableMemory)
- newGauge("MemoryPoolUsed", () => memoryPool.size() -
memoryPool.availableMemory)
-
newGauge(s"${DataPlaneAcceptor.MetricPrefix}ExpiredConnectionsKilledCount", ()
=> SocketServer.this.synchronized {
- val expiredConnectionsKilledCountMetricNames = dataPlaneProcessors.map {
p =>
- metrics.metricName("expired-connections-killed-count", MetricsGroup,
p.metricTags)
- }
- expiredConnectionsKilledCountMetricNames.map { metricName =>
- Option(metrics.metric(metricName)).fold(0.0)(m =>
m.metricValue.asInstanceOf[Double])
- }.sum
- })
+ }
+ newGauge("MemoryPoolAvailable", () => memoryPool.availableMemory)
+ newGauge("MemoryPoolUsed", () => memoryPool.size() -
memoryPool.availableMemory)
+ newGauge(s"${DataPlaneAcceptor.MetricPrefix}ExpiredConnectionsKilledCount",
() => SocketServer.this.synchronized {
+ val dataPlaneProcessors = dataPlaneAcceptors.asScala.values.flatMap(a =>
a.processors)
+ val expiredConnectionsKilledCountMetricNames = dataPlaneProcessors.map { p
=>
+ metrics.metricName("expired-connections-killed-count", MetricsGroup,
p.metricTags)
+ }
+ expiredConnectionsKilledCountMetricNames.map { metricName =>
+ Option(metrics.metric(metricName)).fold(0.0)(m =>
m.metricValue.asInstanceOf[Double])
+ }.sum
+ })
+ if (config.requiresZookeeper) {
newGauge(s"${ControlPlaneAcceptor.MetricPrefix}ExpiredConnectionsKilledCount",
() => SocketServer.this.synchronized {
+ val controlPlaneProcessorOpt = controlPlaneAcceptorOpt.map(a =>
a.processors(0))
val expiredConnectionsKilledCountMetricNames =
controlPlaneProcessorOpt.map { p =>
metrics.metricName("expired-connections-killed-count", MetricsGroup,
p.metricTags)
}
@@ -176,112 +162,86 @@ class SocketServer(val config: KafkaConfig,
})
}
- /**
- * Start processing requests and new connections. This method is used for
delayed starting of
- * all the acceptors and processors if
[[kafka.network.SocketServer#startup]] was invoked with
- * `startProcessingRequests=false`.
- *
- * Before starting processors for each endpoint, we ensure that authorizer
has all the metadata
- * to authorize requests on that endpoint by waiting on the provided future.
We start inter-broker
- * listener before other listeners. This allows authorization metadata for
other listeners to be
- * stored in Kafka topics in this cluster.
- *
- * @param authorizerFutures Future per [[EndPoint]] used to wait before
starting the processor
- * corresponding to the [[EndPoint]]
- */
- def startProcessingRequests(authorizerFutures: Map[Endpoint,
CompletableFuture[Void]] = Map.empty): Unit = {
- info("Starting socket server acceptors and processors")
- this.synchronized {
- if (!startedProcessingRequests) {
- startControlPlaneProcessorAndAcceptor(authorizerFutures)
- startDataPlaneProcessorsAndAcceptors(authorizerFutures)
- startedProcessingRequests = true
- } else {
- info("Socket server acceptors and processors already started")
- }
- }
- info("Started socket server acceptors and processors")
+ // Create acceptors and processors for the statically configured endpoints
when the
+ // SocketServer is constructed. Note that this just opens the ports and
creates the data
+ // structures. It does not start the acceptors and processors or their
associated JVM
+ // threads.
+ if (apiVersionManager.listenerType.equals(ListenerType.CONTROLLER)) {
+ config.controllerListeners.foreach(createDataPlaneAcceptorAndProcessors)
+ } else {
+ config.controlPlaneListener.foreach(createControlPlaneAcceptorAndProcessor)
+ config.dataPlaneListeners.foreach(createDataPlaneAcceptorAndProcessors)
}
- /**
- * Starts processors of the provided acceptor and the acceptor itself.
- *
- * Before starting them, we ensure that authorizer has all the metadata to
authorize
- * requests on that endpoint by waiting on the provided future.
- */
- private def startAcceptorAndProcessors(acceptor: Acceptor,
- authorizerFutures: Map[Endpoint,
CompletableFuture[Void]] = Map.empty): Unit = {
- val endpoint = acceptor.endPoint
- debug(s"Wait for authorizer to complete start up on listener
${endpoint.listenerName}")
- waitForAuthorizerFuture(acceptor, authorizerFutures)
- debug(s"Start processors on listener ${endpoint.listenerName}")
- acceptor.startProcessors()
- debug(s"Start acceptor thread on listener ${endpoint.listenerName}")
- if (!acceptor.isStarted()) {
- KafkaThread.nonDaemon(
-
s"${acceptor.threadPrefix()}-kafka-socket-acceptor-${endpoint.listenerName}-${endpoint.securityProtocol}-${endpoint.port}",
- acceptor
- ).start()
- acceptor.awaitStartup()
- }
- info(s"Started ${acceptor.threadPrefix()} acceptor and processor(s) for
endpoint : ${endpoint.listenerName}")
+ // Processors are now created by each Acceptor. However to preserve
compatibility, we need to number the processors
+ // globally, so we keep the nextProcessorId counter in SocketServer
+ def nextProcessorId(): Int = {
+ nextProcessorId.getAndIncrement()
}
/**
- * Starts processors of all the data-plane acceptors and all the acceptors
of this server.
+ * This method enables request processing for all endpoints managed by this
SocketServer. Each
+ * endpoint will be brought up asynchronously as soon as its associated
future is completed.
+ * Therefore, we do not know that any particular request processor will be
running by the end of
+ * this function -- just that it might be running.
*
- * We start inter-broker listener before other listeners. This allows
authorization metadata for
- * other listeners to be stored in Kafka topics in this cluster.
+ * @param authorizerFutures Future per [[EndPoint]] used to wait before
starting the
+ * processor corresponding to the [[EndPoint]].
Any endpoint
+ * that does not appear in this map will be
started once all
+ * authorizerFutures are complete.
*/
- private def startDataPlaneProcessorsAndAcceptors(authorizerFutures:
Map[Endpoint, CompletableFuture[Void]]): Unit = {
- val interBrokerListener = dataPlaneAcceptors.asScala.keySet
- .find(_.listenerName == config.interBrokerListenerName)
- val orderedAcceptors = interBrokerListener match {
- case Some(interBrokerListener) =>
List(dataPlaneAcceptors.get(interBrokerListener)) ++
- dataPlaneAcceptors.asScala.filter { case (k, _) => k !=
interBrokerListener }.values
- case None => dataPlaneAcceptors.asScala.values
- }
- orderedAcceptors.foreach { acceptor =>
- startAcceptorAndProcessors(acceptor, authorizerFutures)
+ def enableRequestProcessing(
+ authorizerFutures: Map[Endpoint, CompletableFuture[Void]]
+ ): Unit = this.synchronized {
+ if (stopped) {
+ throw new RuntimeException("Can't enable request processing:
SocketServer is stopped.")
+ }
+
+ def chainAcceptorFuture(acceptor: Acceptor): Unit = {
+ // Because of ephemeral ports, we need to match acceptors to futures by
looking at
+ // the listener name, rather than the endpoint object.
+ authorizerFutures.find {
+ case (endpoint, _) =>
acceptor.endPoint.listenerName.value().equals(endpoint.listenerName().get())
+ } match {
+ case None => chainFuture(allAuthorizerFuturesComplete,
acceptor.startFuture)
+ case Some((_, future)) => chainFuture(future, acceptor.startFuture)
+ }
}
- }
- /**
- * Start the processor of control-plane acceptor and the acceptor of this
server.
- */
- private def startControlPlaneProcessorAndAcceptor(authorizerFutures:
Map[Endpoint, CompletableFuture[Void]]): Unit = {
- controlPlaneAcceptorOpt.foreach { controlPlaneAcceptor =>
- startAcceptorAndProcessors(controlPlaneAcceptor, authorizerFutures)
- }
+ info("Enabling request processing.")
+ controlPlaneAcceptorOpt.foreach(chainAcceptorFuture)
+ dataPlaneAcceptors.values().forEach(chainAcceptorFuture)
+ chainFuture(CompletableFuture.allOf(authorizerFutures.values.toArray: _*),
+ allAuthorizerFuturesComplete)
}
- private def endpoints = config.listeners.map(l => l.listenerName -> l).toMap
-
- def createDataPlaneAcceptorsAndProcessors(endpoints: Seq[EndPoint]): Unit = {
- endpoints.foreach { endpoint =>
- val parsedConfigs =
config.valuesFromThisConfigWithPrefixOverride(endpoint.listenerName.configPrefix)
- connectionQuotas.addListener(config, endpoint.listenerName)
-
- val isPrivilegedListener = controlPlaneRequestChannelOpt.isEmpty &&
config.interBrokerListenerName == endpoint.listenerName
-
- val dataPlaneAcceptor = createDataPlaneAcceptor(endpoint,
isPrivilegedListener, dataPlaneRequestChannel)
- config.addReconfigurable(dataPlaneAcceptor)
- dataPlaneAcceptor.configure(parsedConfigs)
- dataPlaneAcceptors.put(endpoint, dataPlaneAcceptor)
- info(s"Created data-plane acceptor and processors for endpoint :
${endpoint.listenerName}")
+ def createDataPlaneAcceptorAndProcessors(endpoint: EndPoint): Unit =
synchronized {
+ if (stopped) {
+ throw new RuntimeException("Can't create new data plane acceptor and
processors: SocketServer is stopped.")
}
+ val parsedConfigs =
config.valuesFromThisConfigWithPrefixOverride(endpoint.listenerName.configPrefix)
+ connectionQuotas.addListener(config, endpoint.listenerName)
+ val isPrivilegedListener = controlPlaneRequestChannelOpt.isEmpty &&
+ config.interBrokerListenerName == endpoint.listenerName
+ val dataPlaneAcceptor = createDataPlaneAcceptor(endpoint,
isPrivilegedListener, dataPlaneRequestChannel)
+ config.addReconfigurable(dataPlaneAcceptor)
+ dataPlaneAcceptor.configure(parsedConfigs)
+ dataPlaneAcceptors.put(endpoint, dataPlaneAcceptor)
+ info(s"Created data-plane acceptor and processors for endpoint :
${endpoint.listenerName}")
}
- private def createControlPlaneAcceptorAndProcessor(endpointOpt:
Option[EndPoint]): Unit = {
- endpointOpt.foreach { endpoint =>
- connectionQuotas.addListener(config, endpoint.listenerName)
- val controlPlaneAcceptor = createControlPlaneAcceptor(endpoint,
controlPlaneRequestChannelOpt.get)
- controlPlaneAcceptor.addProcessors(1)
- controlPlaneAcceptorOpt = Some(controlPlaneAcceptor)
- info(s"Created control-plane acceptor and processor for endpoint :
${endpoint.listenerName}")
+ private def createControlPlaneAcceptorAndProcessor(endpoint: EndPoint): Unit
= synchronized {
+ if (stopped) {
+ throw new RuntimeException("Can't create new control plane acceptor and
processor: SocketServer is stopped.")
}
+ connectionQuotas.addListener(config, endpoint.listenerName)
+ val controlPlaneAcceptor = createControlPlaneAcceptor(endpoint,
controlPlaneRequestChannelOpt.get)
+ controlPlaneAcceptor.addProcessors(1)
+ controlPlaneAcceptorOpt = Some(controlPlaneAcceptor)
+ info(s"Created control-plane acceptor and processor for endpoint :
${endpoint.listenerName}")
}
+ private def endpoints = config.listeners.map(l => l.listenerName -> l).toMap
protected def createDataPlaneAcceptor(endPoint: EndPoint,
isPrivilegedListener: Boolean, requestChannel: RequestChannel):
DataPlaneAcceptor = {
new DataPlaneAcceptor(this, endPoint, config, nodeId, connectionQuotas,
time, isPrivilegedListener, requestChannel, metrics, credentialProvider,
logContext, memoryPool, apiVersionManager)
@@ -294,18 +254,18 @@ class SocketServer(val config: KafkaConfig,
/**
* Stop processing requests and new connections.
*/
- def stopProcessingRequests(): Unit = {
- info("Stopping socket server request processors")
- this.synchronized {
- dataPlaneAcceptors.asScala.values.foreach(_.initiateShutdown())
- dataPlaneAcceptors.asScala.values.foreach(_.awaitShutdown())
- controlPlaneAcceptorOpt.foreach(_.initiateShutdown())
- controlPlaneAcceptorOpt.foreach(_.awaitShutdown())
+ def stopProcessingRequests(): Unit = synchronized {
+ if (!stopped) {
+ stopped = true
+ info("Stopping socket server request processors")
+ dataPlaneAcceptors.asScala.values.foreach(_.beginShutdown())
+ controlPlaneAcceptorOpt.foreach(_.beginShutdown())
+ dataPlaneAcceptors.asScala.values.foreach(_.close())
+ controlPlaneAcceptorOpt.foreach(_.close())
dataPlaneRequestChannel.clear()
controlPlaneRequestChannelOpt.foreach(_.clear())
- stoppedProcessingRequests = true
+ info("Stopped socket server request processors")
}
- info("Stopped socket server request processors")
}
/**
@@ -314,9 +274,10 @@ class SocketServer(val config: KafkaConfig,
*/
def shutdown(): Unit = {
info("Shutting down socket server")
+ allAuthorizerFuturesComplete.completeExceptionally(new
TimeoutException("The socket " +
+ "server was shut down before the Authorizer could be completely
initialized."))
this.synchronized {
- if (!stoppedProcessingRequests)
- stopProcessingRequests()
+ stopProcessingRequests()
dataPlaneRequestChannel.shutdown()
controlPlaneRequestChannelOpt.foreach(_.shutdown())
connectionQuotas.close()
@@ -338,12 +299,20 @@ class SocketServer(val config: KafkaConfig,
}
}
+ /**
+ * This method is called to dynamically add listeners.
+ */
def addListeners(listenersAdded: Seq[EndPoint]): Unit = synchronized {
+ if (stopped) {
+ throw new RuntimeException("can't add new listeners: SocketServer is
stopped.")
+ }
info(s"Adding data-plane listeners for endpoints $listenersAdded")
- createDataPlaneAcceptorsAndProcessors(listenersAdded)
listenersAdded.foreach { endpoint =>
+ createDataPlaneAcceptorAndProcessors(endpoint)
val acceptor = dataPlaneAcceptors.get(endpoint)
- startAcceptorAndProcessors(acceptor)
+ // There is no authorizer future for this new listener endpoint. So
start the
+ // listener once all authorizer futures are complete.
+ chainFuture(allAuthorizerFuturesComplete, acceptor.startFuture)
}
}
@@ -352,8 +321,8 @@ class SocketServer(val config: KafkaConfig,
listenersRemoved.foreach { endpoint =>
connectionQuotas.removeListener(config, endpoint.listenerName)
dataPlaneAcceptors.asScala.remove(endpoint).foreach { acceptor =>
- acceptor.initiateShutdown()
- acceptor.awaitShutdown()
+ acceptor.beginShutdown()
+ acceptor.close()
}
}
}
@@ -387,15 +356,6 @@ class SocketServer(val config: KafkaConfig,
}
}
- private def waitForAuthorizerFuture(acceptor: Acceptor,
- authorizerFutures: Map[Endpoint,
CompletableFuture[Void]]): Unit = {
- //we can't rely on authorizerFutures.get() due to ephemeral ports. Get the
future using listener name
- authorizerFutures.forKeyValue { (endpoint, future) =>
- if (endpoint.listenerName ==
Optional.of(acceptor.endPoint.listenerName.value))
- future.join()
- }
- }
-
// For test usage
private[network] def connectionCount(address: InetAddress): Int =
Option(connectionQuotas).fold(0)(_.get(address))
@@ -420,80 +380,22 @@ object SocketServer {
KafkaConfig.MaxConnectionCreationRateProp)
val ListenerReconfigurableConfigs = Set(KafkaConfig.MaxConnectionsProp,
KafkaConfig.MaxConnectionCreationRateProp)
-}
-/**
- * A base class with some helper variables and methods
- */
-private[kafka] abstract class AbstractServerThread(connectionQuotas:
ConnectionQuotas) extends Runnable with Logging {
-
- private val startupLatch = new CountDownLatch(1)
-
- // `shutdown()` is invoked before `startupComplete` and `shutdownComplete`
if an exception is thrown in the constructor
- // (e.g. if the address is already in use). We want `shutdown` to proceed in
such cases, so we first assign an open
- // latch and then replace it in `startupComplete()`.
- @volatile private var shutdownLatch = new CountDownLatch(0)
-
- private val alive = new AtomicBoolean(true)
-
- def wakeup(): Unit
-
- /**
- * Initiates a graceful shutdown by signaling to stop
- */
- def initiateShutdown(): Unit = {
- if (alive.getAndSet(false))
- wakeup()
+ def closeSocket(
+ channel: SocketChannel,
+ logging: Logging
+ ): Unit = {
+ CoreUtils.swallow(channel.socket().close(), logging, Level.ERROR)
+ CoreUtils.swallow(channel.close(), logging, Level.ERROR)
}
- /**
- * Wait for the thread to completely shutdown
- */
- def awaitShutdown(): Unit = shutdownLatch.await
-
- /**
- * Returns true if the thread is completely started
- */
- def isStarted(): Boolean = startupLatch.getCount == 0
-
- /**
- * Wait for the thread to completely start up
- */
- def awaitStartup(): Unit = startupLatch.await
-
- /**
- * Record that the thread startup is complete
- */
- protected def startupComplete(): Unit = {
- // Replace the open latch with a closed one
- shutdownLatch = new CountDownLatch(1)
- startupLatch.countDown()
- }
-
- /**
- * Record that the thread shutdown is complete
- */
- protected def shutdownComplete(): Unit = shutdownLatch.countDown()
-
- /**
- * Is the server still running?
- */
- protected def isRunning: Boolean = alive.get
-
- /**
- * Close `channel` and decrement the connection count.
- */
- def close(listenerName: ListenerName, channel: SocketChannel): Unit = {
- if (channel != null) {
- debug(s"Closing connection from
${channel.socket.getRemoteSocketAddress}")
- connectionQuotas.dec(listenerName, channel.socket.getInetAddress)
- closeSocket(channel)
- }
- }
-
- protected def closeSocket(channel: SocketChannel): Unit = {
- CoreUtils.swallow(channel.socket().close(), this, Level.ERROR)
- CoreUtils.swallow(channel.close(), this, Level.ERROR)
+ def chainFuture(sourceFuture: CompletableFuture[Void],
+ destinationFuture: CompletableFuture[Void]): Unit = {
+ sourceFuture.whenComplete((_, t) => if (t != null) {
+ destinationFuture.completeExceptionally(t)
+ } else {
+ destinationFuture.complete(null)
+ })
}
}
@@ -650,7 +552,7 @@ private[kafka] abstract class Acceptor(val socketServer:
SocketServer,
val endPoint: EndPoint,
var config: KafkaConfig,
nodeId: Int,
- connectionQuotas: ConnectionQuotas,
+ val connectionQuotas: ConnectionQuotas,
time: Time,
isPrivilegedListener: Boolean,
requestChannel: RequestChannel,
@@ -659,7 +561,9 @@ private[kafka] abstract class Acceptor(val socketServer:
SocketServer,
logContext: LogContext,
memoryPool: MemoryPool,
apiVersionManager: ApiVersionManager)
- extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
+ extends Runnable with Logging with KafkaMetricsGroup {
+
+ val shouldRun = new AtomicBoolean(true)
def metricPrefix(): String
def threadPrefix(): String
@@ -671,7 +575,6 @@ private[kafka] abstract class Acceptor(val socketServer:
SocketServer,
private val nioSelector = NSelector.open()
private[network] val serverChannel = openServerSocket(endPoint.host,
endPoint.port, listenBacklogSize)
private[network] val processors = new ArrayBuffer[Processor]()
- private val processorsStarted = new AtomicBoolean
// Build the metric name explicitly in order to keep the existing name for
compatibility
private val blockedPercentMeterMetricName = explicitMetricName(
"kafka.network",
@@ -681,24 +584,27 @@ private[kafka] abstract class Acceptor(val socketServer:
SocketServer,
private val blockedPercentMeter =
newMeter(blockedPercentMeterMetricName,"blocked time", TimeUnit.NANOSECONDS)
private var currentProcessorIndex = 0
private[network] val throttledSockets = new
mutable.PriorityQueue[DelayedCloseSocket]()
+ private var started = false
+ private[network] val startFuture = new CompletableFuture[Void]()
- private[network] case class DelayedCloseSocket(socket: SocketChannel,
endThrottleTimeMs: Long) extends Ordered[DelayedCloseSocket] {
- override def compare(that: DelayedCloseSocket): Int = endThrottleTimeMs
compare that.endThrottleTimeMs
- }
+ val thread = KafkaThread.nonDaemon(
+
s"${threadPrefix()}-kafka-socket-acceptor-${endPoint.listenerName}-${endPoint.securityProtocol}-${endPoint.port}",
+ this)
- private[network] def startProcessors(): Unit = synchronized {
- if (!processorsStarted.getAndSet(true)) {
- startProcessors(processors)
+ startFuture.thenRun(() => synchronized {
+ if (!shouldRun.get()) {
+ debug(s"Ignoring start future for ${endPoint.listenerName} since the
acceptor has already been shut down.")
+ } else {
+ debug(s"Starting processors for listener ${endPoint.listenerName}")
+ started = true
+ processors.foreach(_.start())
+ debug(s"Starting acceptor thread for listener ${endPoint.listenerName}")
+ thread.start()
}
- }
+ })
- private def startProcessors(processors: Seq[Processor]): Unit = synchronized
{
- processors.foreach { processor =>
- KafkaThread.nonDaemon(
-
s"${threadPrefix()}-kafka-network-thread-$nodeId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",
- processor
- ).start()
- }
+ private[network] case class DelayedCloseSocket(socket: SocketChannel,
endThrottleTimeMs: Long) extends Ordered[DelayedCloseSocket] {
+ override def compare(that: DelayedCloseSocket): Int = endThrottleTimeMs
compare that.endThrottleTimeMs
}
private[network] def removeProcessors(removeCount: Int): Unit = synchronized
{
@@ -707,33 +613,34 @@ private[kafka] abstract class Acceptor(val socketServer:
SocketServer,
// The processors are then removed from `requestChannel` and any pending
responses to these processors are dropped.
val toRemove = processors.takeRight(removeCount)
processors.remove(processors.size - removeCount, removeCount)
- toRemove.foreach(_.initiateShutdown())
- toRemove.foreach(_.awaitShutdown())
+ toRemove.foreach(_.close())
toRemove.foreach(processor => requestChannel.removeProcessor(processor.id))
}
- override def initiateShutdown(): Unit = {
- super.initiateShutdown()
- synchronized {
- processors.foreach(_.initiateShutdown())
+ def beginShutdown(): Unit = {
+ if (shouldRun.getAndSet(false)) {
+ wakeup()
+ synchronized {
+ processors.foreach(_.beginShutdown())
+ }
}
}
- override def awaitShutdown(): Unit = {
- super.awaitShutdown()
+ def close(): Unit = {
+ beginShutdown()
+ thread.join()
synchronized {
- processors.foreach(_.awaitShutdown())
+ processors.foreach(_.close())
}
}
/**
* Accept loop that checks for new connection attempts
*/
- def run(): Unit = {
+ override def run(): Unit = {
serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
- startupComplete()
try {
- while (isRunning) {
+ while (shouldRun.get()) {
try {
acceptNewConnections()
closeThrottledConnections()
@@ -750,9 +657,8 @@ private[kafka] abstract class Acceptor(val socketServer:
SocketServer,
debug("Closing server socket, selector, and any throttled sockets.")
CoreUtils.swallow(serverChannel.close(), this, Level.ERROR)
CoreUtils.swallow(nioSelector.close(), this, Level.ERROR)
- throttledSockets.foreach(throttledSocket =>
closeSocket(throttledSocket.socket))
+ throttledSockets.foreach(throttledSocket =>
closeSocket(throttledSocket.socket, this))
throttledSockets.clear()
- shutdownComplete()
}
}
@@ -788,7 +694,7 @@ private[kafka] abstract class Acceptor(val socketServer:
SocketServer,
if (ready > 0) {
val keys = nioSelector.selectedKeys()
val iter = keys.iterator()
- while (iter.hasNext && isRunning) {
+ while (iter.hasNext && shouldRun.get()) {
try {
val key = iter.next
iter.remove()
@@ -833,7 +739,7 @@ private[kafka] abstract class Acceptor(val socketServer:
SocketServer,
} catch {
case e: TooManyConnectionsException =>
info(s"Rejected connection from ${e.ip}, address already has the
configured maximum of ${e.count} connections.")
- close(endPoint.listenerName, socketChannel)
+ connectionQuotas.closeChannel(this, endPoint.listenerName,
socketChannel)
None
case e: ConnectionThrottledException =>
val ip = socketChannel.socket.getInetAddress
@@ -843,7 +749,7 @@ private[kafka] abstract class Acceptor(val socketServer:
SocketServer,
None
case e: IOException =>
error(s"Encountered an error while configuring the connection, closing
it.", e)
- close(endPoint.listenerName, socketChannel)
+ connectionQuotas.closeChannel(this, endPoint.listenerName,
socketChannel)
None
}
}
@@ -864,7 +770,7 @@ private[kafka] abstract class Acceptor(val socketServer:
SocketServer,
while (throttledSockets.headOption.exists(_.endThrottleTimeMs < timeMs)) {
val closingSocket = throttledSockets.dequeue()
debug(s"Closing socket from ip ${closingSocket.socket.getRemoteAddress}")
- closeSocket(closingSocket.socket)
+ closeSocket(closingSocket.socket, this)
}
}
@@ -882,10 +788,9 @@ private[kafka] abstract class Acceptor(val socketServer:
SocketServer,
/**
* Wakeup the thread for selection.
*/
- @Override
def wakeup(): Unit = nioSelector.wakeup()
- def addProcessors(toCreate: Int): Unit = {
+ def addProcessors(toCreate: Int): Unit = synchronized {
val listenerName = endPoint.listenerName
val securityProtocol = endPoint.securityProtocol
val listenerProcessors = new ArrayBuffer[Processor]()
@@ -894,14 +799,16 @@ private[kafka] abstract class Acceptor(val socketServer:
SocketServer,
val processor = newProcessor(socketServer.nextProcessorId(),
listenerName, securityProtocol)
listenerProcessors += processor
requestChannel.addProcessor(processor)
- }
+ if (started) {
+ processor.start()
+ }
+ }
processors ++= listenerProcessors
- if (processorsStarted.get)
- startProcessors(listenerProcessors)
}
def newProcessor(id: Int, listenerName: ListenerName, securityProtocol:
SecurityProtocol): Processor = {
+ val name =
s"${threadPrefix()}-kafka-network-thread-$nodeId-${endPoint.listenerName}-${endPoint.securityProtocol}-${id}"
new Processor(id,
time,
config.socketRequestMaxBytes,
@@ -918,9 +825,9 @@ private[kafka] abstract class Acceptor(val socketServer:
SocketServer,
logContext,
Processor.ConnectionQueueSize,
isPrivilegedListener,
- apiVersionManager)
+ apiVersionManager,
+ name)
}
-
}
private[kafka] object Processor {
@@ -940,23 +847,29 @@ private[kafka] object Processor {
* forwarding requests; if the control plane is
not defined, the processor
* relying on the inter broker listener would be
acting as the privileged listener.
*/
-private[kafka] class Processor(val id: Int,
- time: Time,
- maxRequestSize: Int,
- requestChannel: RequestChannel,
- connectionQuotas: ConnectionQuotas,
- connectionsMaxIdleMs: Long,
- failedAuthenticationDelayMs: Int,
- listenerName: ListenerName,
- securityProtocol: SecurityProtocol,
- config: KafkaConfig,
- metrics: Metrics,
- credentialProvider: CredentialProvider,
- memoryPool: MemoryPool,
- logContext: LogContext,
- connectionQueueSize: Int,
- isPrivilegedListener: Boolean,
- apiVersionManager: ApiVersionManager) extends
AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
+private[kafka] class Processor(
+ val id: Int,
+ time: Time,
+ maxRequestSize: Int,
+ requestChannel: RequestChannel,
+ connectionQuotas: ConnectionQuotas,
+ connectionsMaxIdleMs: Long,
+ failedAuthenticationDelayMs: Int,
+ listenerName: ListenerName,
+ securityProtocol: SecurityProtocol,
+ config: KafkaConfig,
+ metrics: Metrics,
+ credentialProvider: CredentialProvider,
+ memoryPool: MemoryPool,
+ logContext: LogContext,
+ connectionQueueSize: Int,
+ isPrivilegedListener: Boolean,
+ apiVersionManager: ApiVersionManager,
+ threadName: String
+) extends Runnable with KafkaMetricsGroup {
+ val shouldRun = new AtomicBoolean(true)
+
+ val thread = KafkaThread.nonDaemon(threadName, this)
private object ConnectionId {
def fromString(s: String): Option[ConnectionId] = s.split("-") match {
@@ -1036,9 +949,8 @@ private[kafka] class Processor(val id: Int,
private var nextConnectionIndex = 0
override def run(): Unit = {
- startupComplete()
try {
- while (isRunning) {
+ while (shouldRun.get()) {
try {
// setup any new connections that have been queued up
configureNewConnections()
@@ -1062,7 +974,6 @@ private[kafka] class Processor(val id: Int,
} finally {
debug(s"Closing selector - processor $id")
CoreUtils.swallow(closeAll(), this, Level.ERROR)
- shutdownComplete()
}
}
@@ -1325,7 +1236,7 @@ private[kafka] class Processor(val id: Int,
case e: Throwable =>
val remoteAddress = channel.socket.getRemoteSocketAddress
// need to close the channel here to avoid a socket leak.
- close(listenerName, channel)
+ connectionQuotas.closeChannel(this, listenerName, channel)
processException(s"Processor $id closed connection from
$remoteAddress", e)
}
}
@@ -1392,15 +1303,27 @@ private[kafka] class Processor(val id: Int,
private[network] def channel(connectionId: String): Option[KafkaChannel] =
Option(selector.channel(connectionId))
+ def start(): Unit = thread.start()
+
/**
* Wakeup the thread for selection.
*/
- override def wakeup(): Unit = selector.wakeup()
+ def wakeup(): Unit = selector.wakeup()
- override def initiateShutdown(): Unit = {
- super.initiateShutdown()
- removeMetric("IdlePercent", Map("networkProcessor" -> id.toString))
- metrics.removeMetric(expiredConnectionsKilledCountMetricName)
+ def beginShutdown(): Unit = {
+ if (shouldRun.getAndSet(false)) {
+ wakeup()
+ }
+ }
+
+ def close(): Unit = {
+ try {
+ beginShutdown()
+ thread.join()
+ } finally {
+ removeMetric("IdlePercent", Map("networkProcessor" -> id.toString))
+ metrics.removeMetric(expiredConnectionsKilledCountMetricName)
+ }
}
}
@@ -1864,6 +1787,18 @@ class ConnectionQuotas(config: KafkaConfig, time: Time,
metrics: Metrics) extend
sensor
}
}
+
+ /**
+ * Close `channel` and decrement the connection count.
+ */
+ def closeChannel(log: Logging, listenerName: ListenerName, channel:
SocketChannel): Unit = {
+ if (channel != null) {
+ log.debug(s"Closing connection from
${channel.socket.getRemoteSocketAddress}")
+ dec(listenerName, channel.socket.getInetAddress)
+ closeSocket(channel, log)
+ }
+ }
+
}
class TooManyConnectionsException(val ip: InetAddress, val count: Int) extends
KafkaException(s"Too many connections from $ip (maximum = $count)")
diff --git a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
index 3ca2704eb3c..39dff71ad11 100644
--- a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
+++ b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
@@ -97,6 +97,11 @@ class BrokerLifecycleManager(val config: KafkaConfig,
*/
val initialCatchUpFuture = new CompletableFuture[Void]()
+ /**
+ * A future which is completed when the broker is unfenced for the first
time.
+ */
+ val initialUnfenceFuture = new CompletableFuture[Void]()
+
/**
* A future which is completed when controlled shutdown is done.
*/
@@ -189,8 +194,9 @@ class BrokerLifecycleManager(val config: KafkaConfig,
channelManager, clusterId, advertisedListeners, supportedFeatures))
}
- def setReadyToUnfence(): Unit = {
+ def setReadyToUnfence(): CompletableFuture[Void] = {
eventQueue.append(new SetReadyToUnfenceEvent())
+ initialUnfenceFuture
}
def brokerEpoch: Long = _brokerEpoch
@@ -384,6 +390,7 @@ class BrokerLifecycleManager(val config: KafkaConfig,
case BrokerState.RECOVERY =>
if (!message.data().isFenced) {
info(s"The broker has been unfenced. Transitioning from
RECOVERY to RUNNING.")
+ initialUnfenceFuture.complete(null)
_state = BrokerState.RUNNING
} else {
info(s"The broker is in RECOVERY.")
@@ -476,6 +483,7 @@ class BrokerLifecycleManager(val config: KafkaConfig,
_state = BrokerState.SHUTTING_DOWN
controlledShutdownFuture.complete(null)
initialCatchUpFuture.cancel(false)
+ initialUnfenceFuture.cancel(false)
if (_channelManager != null) {
_channelManager.shutdown()
_channelManager = null
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 5447e298636..514b94bab15 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -43,6 +43,7 @@ import
org.apache.kafka.common.security.scram.internals.ScramMechanism
import
org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time, Utils}
import org.apache.kafka.common.{ClusterResource, Endpoint}
+import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
import org.apache.kafka.metadata.{BrokerState, VersionRange}
import org.apache.kafka.raft.RaftConfig.AddressSpec
import org.apache.kafka.raft.{RaftClient, RaftConfig}
@@ -183,7 +184,9 @@ class BrokerServer(
config.dynamicConfig.initialize(zkClientOpt = None)
- lifecycleManager = new BrokerLifecycleManager(config, time,
threadNamePrefix)
+ lifecycleManager = new BrokerLifecycleManager(config,
+ time,
+ threadNamePrefix)
/* start scheduler */
kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
@@ -237,7 +240,6 @@ class BrokerServer(
// Delay starting processors until the end of the initialization
sequence to ensure
// that credentials have been loaded before processing authentications.
socketServer = new SocketServer(config, metrics, time,
credentialProvider, apiVersionManager)
- socketServer.startup(startProcessingRequests = false)
clientQuotaMetadataManager = new
ClientQuotaMetadataManager(quotaManagers, socketServer.connectionQuotas)
@@ -365,10 +367,13 @@ class BrokerServer(
endpoints.asScala.map(ep =>
ep.listenerName().orElse("(none)")).mkString(", "))
}
val authorizerInfo = ServerInfo(new ClusterResource(clusterId),
- config.nodeId, endpoints, interBrokerListener)
+ config.nodeId,
+ endpoints,
+ interBrokerListener,
+ config.earlyStartListeners.map(_.value()).asJava)
- /* Get the authorizer and initialize it if one is specified.*/
- authorizer = config.authorizer
+ // Create and initialize an authorizer if one is configured.
+ authorizer = config.createNewAuthorizer()
authorizer.foreach(_.configure(config.originals))
val authorizerFutures: Map[Endpoint, CompletableFuture[Void]] =
authorizer match {
case Some(authZ) =>
@@ -432,17 +437,36 @@ class BrokerServer(
// publish operation to complete. This first operation will initialize
logManager,
// replicaManager, groupCoordinator, and txnCoordinator. The log manager
may perform
// a potentially lengthy recovery-from-unclean-shutdown operation here,
if required.
- metadataListener.startPublishing(metadataPublisher).get()
+ try {
+ metadataListener.startPublishing(metadataPublisher).get()
+ } catch {
+ case t: Throwable => throw new RuntimeException("Received a fatal
error while " +
+ "waiting for the broker to catch up with the current cluster
metadata.", t)
+ }
// Log static broker configurations.
new KafkaConfig(config.originals(), true)
- // Enable inbound TCP connections.
- socketServer.startProcessingRequests(authorizerFutures)
+ // Enable inbound TCP connections. Each endpoint will be started only
once its matching
+ // authorizer future is completed.
+ socketServer.enableRequestProcessing(authorizerFutures)
+
+ // If we are using a ClusterMetadataAuthorizer which stores its ACLs in
the metadata log,
+ // notify it that the loading process is complete.
+ authorizer match {
+ case Some(clusterMetadataAuthorizer: ClusterMetadataAuthorizer) =>
+ clusterMetadataAuthorizer.completeInitialLoad()
+ case _ => // nothing to do
+ }
// We're now ready to unfence the broker. This also allows this broker
to transition
// from RECOVERY state to RUNNING state, once the controller unfences
the broker.
- lifecycleManager.setReadyToUnfence()
+ try {
+ lifecycleManager.setReadyToUnfence().get()
+ } catch {
+ case t: Throwable => throw new RuntimeException("Received a fatal
error while " +
+ "waiting for the broker to be unfenced.", t)
+ }
maybeChangeStatus(STARTING, STARTED)
} catch {
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala
b/core/src/main/scala/kafka/server/ControllerServer.scala
index ba38aa17853..e004996bf74 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -103,7 +103,6 @@ class ControllerServer(
info("Starting controller")
maybeChangeStatus(STARTING, STARTED)
- // TODO: initialize the log dir(s)
this.logIdent = new LogContext(s"[ControllerServer id=${config.nodeId}]
").logPrefix()
newGauge("ClusterId", () => clusterId)
@@ -116,7 +115,7 @@ class ControllerServer(
}
val javaListeners = config.controllerListeners.map(_.toJava).asJava
- authorizer = config.authorizer
+ authorizer = config.createNewAuthorizer()
authorizer.foreach(_.configure(config.originals))
val authorizerFutures: Map[Endpoint, CompletableFuture[Void]] =
authorizer match {
@@ -125,7 +124,11 @@ class ControllerServer(
// AuthorizerServerInfo, such as the assumption that there is an
inter-broker
// listener, or that ID is named brokerId.
val controllerAuthorizerInfo = ServerInfo(
- new ClusterResource(clusterId), config.nodeId, javaListeners,
javaListeners.get(0))
+ new ClusterResource(clusterId),
+ config.nodeId,
+ javaListeners,
+ javaListeners.get(0),
+ config.earlyStartListeners.map(_.value()).asJava)
authZ.start(controllerAuthorizerInfo).asScala.map { case (ep, cs) =>
ep -> cs.toCompletableFuture
}.toMap
@@ -144,7 +147,6 @@ class ControllerServer(
time,
credentialProvider,
apiVersionManager)
- socketServer.startup(startProcessingRequests = false,
controlPlaneListener = None, config.controllerListeners)
if (config.controllerListeners.nonEmpty) {
socketServerFirstBoundPortFuture.complete(socketServer.boundPort(
@@ -213,7 +215,17 @@ class ControllerServer(
config.numIoThreads,
s"${DataPlaneAcceptor.MetricPrefix}RequestHandlerAvgIdlePercent",
DataPlaneAcceptor.ThreadPrefix)
- socketServer.startProcessingRequests(authorizerFutures)
+
+ /**
+ * Enable the controller endpoint(s). If we are using an authorizer
which stores
+ * ACLs in the metadata log, such as StandardAuthorizer, we will be able
to start
+ * accepting requests from principals included in super.users right after
this point,
+ * but we will not be able to process requests from non-superusers until
the
+ * QuorumController declares that we have caught up to the high water
mark of the
+ * metadata log. See
{@link QuorumController#maybeCompleteAuthorizerInitialLoad}
+ * and KIP-801 for details.
+ */
+ socketServer.enableRequestProcessing(authorizerFutures)
} catch {
case e: Throwable =>
maybeChangeStatus(STARTING, STARTED)
@@ -241,6 +253,7 @@ class ControllerServer(
CoreUtils.swallow(quotaManagers.shutdown(), this)
if (controller != null)
controller.close()
+ CoreUtils.swallow(authorizer.foreach(_.close()), this)
createTopicPolicy.foreach(policy => CoreUtils.swallow(policy.close(),
this))
alterConfigPolicy.foreach(policy => CoreUtils.swallow(policy.close(),
this))
socketServerFirstBoundPortFuture.completeExceptionally(new
RuntimeException("shutting down"))
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 7d0d3a3d852..5942b164ea4 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -405,6 +405,8 @@ object KafkaConfig {
/************* Authorizer Configuration ***********/
val AuthorizerClassNameProp = "authorizer.class.name"
+ val EarlyStartListenersProp = "early.start.listeners"
+
/** ********* Socket Server Configuration ***********/
val ListenersProp = "listeners"
val AdvertisedListenersProp = "advertised.listeners"
@@ -725,7 +727,12 @@ object KafkaConfig {
/************* Authorizer Configuration ***********/
val AuthorizerClassNameDoc = s"The fully qualified name of a class that
implements <code>${classOf[Authorizer].getName}</code>" +
- " interface, which is used by the broker for authorization."
+ " interface, which is used by the broker for authorization."
+ val EarlyStartListenersDoc = "A comma-separated list of listener names which
may be started before the authorizer has finished " +
+ "initialization. This is useful when the authorizer is dependent on the
cluster itself for bootstrapping, as is the case for " +
+ "the StandardAuthorizer (which stores ACLs in the metadata log.) By
default, all listeners included in controller.listener.names " +
+ "will also be early start listeners. A listener should not appear in this
list if it accepts external traffic."
+
/** ********* Socket Server Configuration ***********/
val ListenersDoc = "Listener List - Comma-separated list of URIs we will
listen on and the listener names." +
s" If the listener name is not a security protocol,
<code>$ListenerSecurityProtocolMapProp</code> must also be set.\n" +
@@ -1142,6 +1149,7 @@ object KafkaConfig {
/************* Authorizer Configuration ***********/
.define(AuthorizerClassNameProp, STRING, Defaults.AuthorizerClassName,
new ConfigDef.NonNullValidator(), LOW, AuthorizerClassNameDoc)
+ .define(EarlyStartListenersProp, STRING, null, HIGH,
EarlyStartListenersDoc)
/** ********* Socket Server Configuration ***********/
.define(ListenersProp, STRING, Defaults.Listeners, HIGH, ListenersDoc)
@@ -1653,7 +1661,7 @@ class KafkaConfig private(doLog: Boolean, val props:
java.util.Map[_, _], dynami
val metadataSnapshotMaxNewRecordBytes =
getLong(KafkaConfig.MetadataSnapshotMaxNewRecordBytesProp)
/************* Authorizer Configuration ***********/
- val authorizer: Option[Authorizer] = {
+ def createNewAuthorizer(): Option[Authorizer] = {
val className = getString(KafkaConfig.AuthorizerClassNameProp)
if (className == null || className.isEmpty)
None
@@ -1662,6 +1670,23 @@ class KafkaConfig private(doLog: Boolean, val props:
java.util.Map[_, _], dynami
}
}
+ val earlyStartListeners: Set[ListenerName] = {
+ val listenersSet = listeners.map(_.listenerName).toSet
+ val controllerListenersSet = controllerListeners.map(_.listenerName).toSet
+ Option(getString(KafkaConfig.EarlyStartListenersProp)) match {
+ case None => controllerListenersSet
+ case Some(str) =>
+ str.split(",").map(_.trim()).filter(!_.isEmpty).map { str =>
+ val listenerName = new ListenerName(str)
+ if (!listenersSet.contains(listenerName) &&
!controllerListenersSet.contains(listenerName))
+ throw new ConfigException(s"${KafkaConfig.EarlyStartListenersProp}
contains " +
+ s"listener ${listenerName.value()}, but this is not contained in
" +
+ s"${KafkaConfig.ListenersProp} or
${KafkaConfig.ControllerListenerNamesProp}")
+ listenerName
+ }.toSet
+ }
+ }
+
/** ********* Socket Server Configuration ***********/
val socketSendBufferBytes = getInt(KafkaConfig.SocketSendBufferBytesProp)
val socketReceiveBufferBytes =
getInt(KafkaConfig.SocketReceiveBufferBytesProp)
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala
b/core/src/main/scala/kafka/server/KafkaServer.scala
index c69f43c9eff..b1273ed628f 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -307,7 +307,6 @@ class KafkaServer(
// Note that we allow the use of KRaft mode controller APIs when
forwarding is enabled
// so that the Envelope request is exposed. This is only used in
testing currently.
socketServer = new SocketServer(config, metrics, time,
credentialProvider, apiVersionManager)
- socketServer.startup(startProcessingRequests = false)
// Start alter partition manager based on the IBP version
alterIsrManager = if
(config.interBrokerProtocolVersion.isAlterPartitionSupported) {
@@ -381,7 +380,7 @@ class KafkaServer(
)
/* Get the authorizer and initialize it if one is specified.*/
- authorizer = config.authorizer
+ authorizer = config.createNewAuthorizer()
authorizer.foreach(_.configure(config.originals))
val authorizerFutures: Map[Endpoint, CompletableFuture[Void]] =
authorizer match {
case Some(authZ) =>
@@ -450,7 +449,7 @@ class KafkaServer(
dynamicConfigManager = new ZkConfigManager(zkClient,
dynamicConfigHandlers)
dynamicConfigManager.startup()
- socketServer.startProcessingRequests(authorizerFutures)
+ socketServer.enableRequestProcessing(authorizerFutures)
_brokerState = BrokerState.RUNNING
shutdownLatch = new CountDownLatch(1)
diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala
b/core/src/main/scala/kafka/tools/TestRaftServer.scala
index 0b27f7fcb52..56104df8212 100644
--- a/core/src/main/scala/kafka/tools/TestRaftServer.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala
@@ -19,6 +19,7 @@ package kafka.tools
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
import java.util.concurrent.{CompletableFuture, CountDownLatch,
LinkedBlockingDeque, TimeUnit}
+
import joptsimple.OptionException
import kafka.network.{DataPlaneAcceptor, SocketServer}
import kafka.raft.{KafkaRaftManager, RaftManager}
@@ -74,7 +75,6 @@ class TestRaftServer(
val apiVersionManager = new
SimpleApiVersionManager(ListenerType.CONTROLLER)
socketServer = new SocketServer(config, metrics, time, credentialProvider,
apiVersionManager)
- socketServer.startup(startProcessingRequests = false)
val metaProperties = MetaProperties(
clusterId = Uuid.ZERO_UUID.toString,
@@ -119,7 +119,7 @@ class TestRaftServer(
workloadGenerator.start()
raftManager.startup()
- socketServer.startProcessingRequests(Map.empty)
+ socketServer.enableRequestProcessing(Map.empty)
}
def shutdown(): Unit = {
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 9b4e81ab39e..98f92d61ff2 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -79,7 +79,7 @@ class SocketServerTest {
private val apiVersionManager = new
SimpleApiVersionManager(ListenerType.ZK_BROKER)
val server = new SocketServer(config, metrics, Time.SYSTEM,
credentialProvider, apiVersionManager)
- server.startup()
+ server.enableRequestProcessing(Map.empty)
val sockets = new ArrayBuffer[Socket]
private val kafkaLogger = org.apache.log4j.LogManager.getLogger("kafka")
@@ -296,20 +296,18 @@ class SocketServerTest {
shutdownServerAndMetrics(server)
val testProps = new Properties
testProps ++= props
- testProps.put("listeners",
"EXTERNAL://localhost:0,INTERNAL://localhost:0,CONTROLLER://localhost:0")
- testProps.put("listener.security.protocol.map",
"EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT")
- testProps.put("control.plane.listener.name", "CONTROLLER")
+ testProps.put("listeners",
"EXTERNAL://localhost:0,INTERNAL://localhost:0,CONTROL_PLANE://localhost:0")
+ testProps.put("listener.security.protocol.map",
"EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT,CONTROL_PLANE:PLAINTEXT")
+ testProps.put("control.plane.listener.name", "CONTROL_PLANE")
testProps.put("inter.broker.listener.name", "INTERNAL")
val config = KafkaConfig.fromProps(testProps)
val testableServer = new TestableSocketServer(config)
- testableServer.startup(startProcessingRequests = false)
val updatedEndPoints = config.effectiveAdvertisedListeners.map { endpoint
=>
endpoint.copy(port = testableServer.boundPort(endpoint.listenerName))
}.map(_.toJava)
val externalReadyFuture = new CompletableFuture[Void]()
- val executor = Executors.newSingleThreadExecutor()
def controlPlaneListenerStarted() = {
try {
@@ -334,18 +332,19 @@ class SocketServerTest {
try {
val externalListener = new ListenerName("EXTERNAL")
val externalEndpoint = updatedEndPoints.find(e => e.listenerName.get ==
externalListener.value).get
- val futures = Map(externalEndpoint -> externalReadyFuture)
- val startFuture = executor.submit((() =>
testableServer.startProcessingRequests(futures)): Runnable)
+ val controlPlaneListener = new ListenerName("CONTROL_PLANE")
+ val controlPlaneEndpoint = updatedEndPoints.find(e => e.listenerName.get
== controlPlaneListener.value).get
+ val futures = Map(
+ externalEndpoint -> externalReadyFuture,
+ controlPlaneEndpoint -> CompletableFuture.completedFuture[Void](null))
+ testableServer.enableRequestProcessing(futures)
TestUtils.waitUntilTrue(() => controlPlaneListenerStarted(), "Control
plane listener not started")
- TestUtils.waitUntilTrue(() =>
listenerStarted(config.interBrokerListenerName), "Inter-broker listener not
started")
- assertFalse(startFuture.isDone, "Socket server startup did not wait for
future to complete")
-
+ assertFalse(listenerStarted(config.interBrokerListenerName))
assertFalse(listenerStarted(externalListener))
-
externalReadyFuture.complete(null)
+ TestUtils.waitUntilTrue(() =>
listenerStarted(config.interBrokerListenerName), "Inter-broker listener not
started")
TestUtils.waitUntilTrue(() => listenerStarted(externalListener),
"External listener not started")
} finally {
- executor.shutdownNow()
shutdownServerAndMetrics(testableServer)
}
}
@@ -362,7 +361,6 @@ class SocketServerTest {
val config = KafkaConfig.fromProps(testProps)
val connectionQueueSize = 1
val testableServer = new TestableSocketServer(config, connectionQueueSize)
- testableServer.startup(startProcessingRequests = false)
val socket1 = connect(testableServer, new ListenerName("EXTERNAL"),
localAddr = InetAddress.getLocalHost)
sendRequest(socket1, producerRequestBytes())
@@ -468,7 +466,7 @@ class SocketServerTest {
time, credentialProvider, apiVersionManager)
try {
- overrideServer.startup()
+ overrideServer.enableRequestProcessing(Map.empty)
val serializedBytes = producerRequestBytes()
// Connection with no outstanding requests
@@ -536,7 +534,7 @@ class SocketServerTest {
}
try {
- overrideServer.startup()
+ overrideServer.enableRequestProcessing(Map.empty)
overrideServer.testableProcessor.setConnectionId(overrideConnectionId)
val socket1 = connectAndWaitForConnectionRegister()
TestUtils.waitUntilTrue(() => connectionCount == 1 &&
openChannel.isDefined, "Failed to create channel")
@@ -805,7 +803,7 @@ class SocketServerTest {
val server = new SocketServer(KafkaConfig.fromProps(newProps), new
Metrics(),
Time.SYSTEM, credentialProvider, apiVersionManager)
try {
- server.startup()
+ server.enableRequestProcessing(Map.empty)
// make the maximum allowable number of connections
val conns = (0 until 5).map(_ => connect(server))
// now try one more (should fail)
@@ -844,7 +842,7 @@ class SocketServerTest {
val overrideServer = new
SocketServer(KafkaConfig.fromProps(overrideProps), serverMetrics,
Time.SYSTEM, credentialProvider, apiVersionManager)
try {
- overrideServer.startup()
+ overrideServer.enableRequestProcessing(Map.empty)
// make the maximum allowable number of connections
val conns = (0 until overrideNum).map(_ => connect(overrideServer))
@@ -884,7 +882,7 @@ class SocketServerTest {
}
try {
- overrideServer.startup()
+ overrideServer.enableRequestProcessing(Map.empty)
val conn = connect(overrideServer)
conn.setSoTimeout(3000)
assertEquals(-1, conn.getInputStream.read())
@@ -907,7 +905,7 @@ class SocketServerTest {
// update the connection rate to 5
overrideServer.connectionQuotas.updateIpConnectionRateQuota(None,
Some(connectionRate))
try {
- overrideServer.startup()
+ overrideServer.enableRequestProcessing(Map.empty)
// make the (maximum allowable number + 1) of connections
(0 to connectionRate).map(_ => connect(overrideServer))
@@ -956,7 +954,7 @@ class SocketServerTest {
val overrideServer = new
SocketServer(KafkaConfig.fromProps(overrideProps), new Metrics(),
time, credentialProvider, apiVersionManager)
overrideServer.connectionQuotas.updateIpConnectionRateQuota(None,
Some(connectionRate))
- overrideServer.startup()
+ overrideServer.enableRequestProcessing(Map.empty)
// make the maximum allowable number of connections
(0 until connectionRate).map(_ => connect(overrideServer))
// now try one more (should get throttled)
@@ -979,7 +977,7 @@ class SocketServerTest {
val overrideServer = new
SocketServer(KafkaConfig.fromProps(sslServerProps), serverMetrics,
Time.SYSTEM, credentialProvider, apiVersionManager)
try {
- overrideServer.startup()
+ overrideServer.enableRequestProcessing(Map.empty)
val sslContext =
SSLContext.getInstance(TestSslUtils.DEFAULT_TLS_PROTOCOL_FOR_TESTS)
sslContext.init(null, Array(TestUtils.trustAllCerts), new
java.security.SecureRandom())
val socketFactory = sslContext.getSocketFactory
@@ -1038,7 +1036,7 @@ class SocketServerTest {
val time = new MockTime()
val overrideServer = new
TestableSocketServer(KafkaConfig.fromProps(overrideProps), time = time)
try {
- overrideServer.startup()
+ overrideServer.enableRequestProcessing(Map.empty)
val socket = connect(overrideServer,
ListenerName.forSecurityProtocol(SecurityProtocol.SASL_PLAINTEXT))
val correlationId = -1
@@ -1118,7 +1116,7 @@ class SocketServerTest {
val overrideServer = new TestableSocketServer(KafkaConfig.fromProps(props))
try {
- overrideServer.startup()
+ overrideServer.enableRequestProcessing(Map.empty)
val conn: Socket = connect(overrideServer)
overrideServer.testableProcessor.closeSocketOnSendResponse(conn)
val serializedBytes = producerRequestBytes()
@@ -1150,7 +1148,7 @@ class SocketServerTest {
val overrideServer = new TestableSocketServer(KafkaConfig.fromProps(props))
try {
- overrideServer.startup()
+ overrideServer.enableRequestProcessing(Map.empty)
val selector = overrideServer.testableSelector
// Create a channel, send some requests and close socket. Receive one pending request after socket was closed.
@@ -1178,7 +1176,7 @@ class SocketServerTest {
val overrideServer = new SocketServer(KafkaConfig.fromProps(props),
serverMetrics,
Time.SYSTEM, credentialProvider, apiVersionManager)
try {
- overrideServer.startup()
+ overrideServer.enableRequestProcessing(Map.empty)
conn = connect(overrideServer)
val serializedBytes = producerRequestBytes()
sendRequest(conn, serializedBytes)
@@ -1559,7 +1557,7 @@ class SocketServerTest {
props.put(KafkaConfig.ConnectionsMaxIdleMsProp, idleTimeMs.toString)
props ++= sslServerProps
val testableServer = new TestableSocketServer(time = time)
- testableServer.startup()
+ testableServer.enableRequestProcessing(Map.empty)
assertTrue(testableServer.controlPlaneRequestChannelOpt.isEmpty)
@@ -1595,7 +1593,7 @@ class SocketServerTest {
val time = new MockTime()
props ++= sslServerProps
val testableServer = new TestableSocketServer(time = time)
- testableServer.startup()
+ testableServer.enableRequestProcessing(Map.empty)
val proxyServer = new ProxyServer(testableServer)
try {
val testableSelector = testableServer.testableSelector
@@ -1741,7 +1739,7 @@ class SocketServerTest {
val numConnections = 5
props.put("max.connections.per.ip", numConnections.toString)
val testableServer = new
TestableSocketServer(KafkaConfig.fromProps(props), connectionQueueSize = 1)
- testableServer.startup()
+ testableServer.enableRequestProcessing(Map.empty)
val testableSelector = testableServer.testableSelector
val errors = new mutable.HashSet[String]
@@ -1893,7 +1891,9 @@ class SocketServerTest {
startProcessingRequests: Boolean = true):
Unit = {
shutdownServerAndMetrics(server)
val testableServer = new TestableSocketServer(config)
- testableServer.startup(startProcessingRequests = startProcessingRequests)
+ if (startProcessingRequests) {
+ testableServer.enableRequestProcessing(Map.empty)
+ }
try {
testWithServer(testableServer)
} finally {
@@ -1998,7 +1998,8 @@ class SocketServerTest {
new LogContext(),
connectionQueueSize,
isPrivilegedListener,
- apiVersionManager) {
+ apiVersionManager,
+ s"TestableProcessor${id}") {
private var connectionId: Option[String] = None
private var conn: Option[Socket] = None
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index e5ff0cd1080..e879a7f6ffc 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1486,13 +1486,19 @@ class KafkaConfigTest {
assertEquals("3", originals.get(KafkaConfig.NodeIdProp))
}
- @Test
- def testBrokerIdIsInferredByNodeIdWithKraft(): Unit = {
+ def kraftProps(): Properties = {
val props = new Properties()
props.setProperty(KafkaConfig.ProcessRolesProp, "broker")
- props.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
+ props.setProperty(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
props.setProperty(KafkaConfig.NodeIdProp, "3")
props.setProperty(KafkaConfig.QuorumVotersProp, "1@localhost:9093")
+ props
+ }
+
+ @Test
+ def testBrokerIdIsInferredByNodeIdWithKraft(): Unit = {
+ val props = new Properties(kraftProps())
+ props.putAll(kraftProps())
val config = KafkaConfig.fromProps(props)
assertEquals(3, config.brokerId)
assertEquals(3, config.nodeId)
@@ -1527,4 +1533,40 @@ class KafkaConfigTest {
assertTrue(ce.getMessage.contains(KafkaConfig.InterBrokerSecurityProtocolProp))
}
+ @Test
+ def testEarlyStartListenersDefault(): Unit = {
+ val props = new Properties()
+ props.setProperty(KafkaConfig.ProcessRolesProp, "controller")
+ props.setProperty(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
+ props.setProperty(KafkaConfig.ListenersProp, "CONTROLLER://:8092")
+ props.setProperty(KafkaConfig.NodeIdProp, "1")
+ props.setProperty(KafkaConfig.QuorumVotersProp, "1@localhost:9093")
+ val config = new KafkaConfig(props)
+ assertEquals(Set("CONTROLLER"), config.earlyStartListeners.map(_.value()))
+ }
+
+ @Test
+ def testEarlyStartListeners(): Unit = {
+ val props = new Properties()
+ props.putAll(kraftProps())
+ props.setProperty(KafkaConfig.EarlyStartListenersProp, "INTERNAL,INTERNAL2")
+ props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "INTERNAL")
+ props.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp,
+ "INTERNAL:PLAINTEXT,INTERNAL2:PLAINTEXT,CONTROLLER:PLAINTEXT")
+ props.setProperty(KafkaConfig.ListenersProp,
+ "INTERNAL://127.0.0.1:9092,INTERNAL2://127.0.0.1:9093")
+ val config = new KafkaConfig(props)
+ assertEquals(Set(new ListenerName("INTERNAL"), new ListenerName("INTERNAL2")),
+ config.earlyStartListeners)
+ }
+
+ @Test
+ def testEarlyStartListenersMustBeListeners(): Unit = {
+ val props = new Properties()
+ props.putAll(kraftProps())
+ props.setProperty(KafkaConfig.EarlyStartListenersProp, "INTERNAL")
+ assertEquals("early.start.listeners contains listener INTERNAL, but this is not " +
+ "contained in listeners or controller.listener.names",
+ assertThrows(classOf[ConfigException], () => new KafkaConfig(props)).getMessage)
+ }
}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 9ff6a599577..cea62c8f00b 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -167,6 +167,10 @@ public final class QuorumController implements Controller {
this.clusterId = clusterId;
}
+ public int nodeId() {
+ return nodeId;
+ }
+
public Builder setTime(Time time) {
this.time = time;
return this;
@@ -753,11 +757,11 @@ public final class QuorumController implements Controller {
}
class QuorumMetaLogListener implements
RaftClient.Listener<ApiMessageAndVersion> {
-
@Override
public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
appendRaftEvent("handleCommit[baseOffset=" + reader.baseOffset() +
"]", () -> {
try {
+ maybeCompleteAuthorizerInitialLoad();
boolean isActiveController = curClaimEpoch != -1;
long processedRecordsSize = 0;
while (reader.hasNext()) {
@@ -921,12 +925,35 @@ public final class QuorumController implements Controller
{
if (this != metaLogListener) {
log.debug("Ignoring {} raft event from an old registration", name);
} else {
- runnable.run();
+ try {
+ runnable.run();
+ } finally {
+ maybeCompleteAuthorizerInitialLoad();
+ }
}
});
}
}
+ private void maybeCompleteAuthorizerInitialLoad() {
+ if (!needToCompleteAuthorizerLoad) return;
+ OptionalLong highWatermark = raftClient.highWatermark();
+ if (highWatermark.isPresent()) {
+ if (lastCommittedOffset + 1 >= highWatermark.getAsLong()) {
+ log.info("maybeCompleteAuthorizerInitialLoad: completing authorizer " +
+ "initial load at last committed offset {}.", lastCommittedOffset);
+ authorizer.get().completeInitialLoad();
+ needToCompleteAuthorizerLoad = false;
+ } else {
+ log.trace("maybeCompleteAuthorizerInitialLoad: can't proceed because " +
+ "lastCommittedOffset = {}, but highWatermark = {}.",
+ lastCommittedOffset, highWatermark.getAsLong());
+ }
+ } else {
+ log.trace("maybeCompleteAuthorizerInitialLoad: highWatermark not set.");
+ }
+ }
+
private void renounce() {
curClaimEpoch = -1;
controllerMetrics.setActive(false);
@@ -1276,6 +1303,12 @@ public final class QuorumController implements
Controller {
*/
private long lastCommittedTimestamp = -1;
+ /**
+ * True if we need to complete the authorizer initial load.
+ * This must be accessed only by the event queue thread.
+ */
+ private boolean needToCompleteAuthorizerLoad;
+
/**
* If we have called scheduleWrite, this is the last offset we got back
from it.
*/
@@ -1384,9 +1417,12 @@ public final class QuorumController implements
Controller {
this.metaLogListener = new QuorumMetaLogListener();
this.curClaimEpoch = -1;
this.writeOffset = -1L;
+ this.needToCompleteAuthorizerLoad = authorizer.isPresent();
resetState();
+ log.info("Creating new QuorumController with clusterId {}, authorizer {}.", clusterId, authorizer);
+
this.raftClient.register(metaLogListener);
}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/ClusterMetadataAuthorizer.java b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/ClusterMetadataAuthorizer.java
index cae35283335..d7673022897 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/ClusterMetadataAuthorizer.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/ClusterMetadataAuthorizer.java
@@ -56,6 +56,18 @@ public interface ClusterMetadataAuthorizer extends
Authorizer {
*/
AclMutator aclMutatorOrException();
+ /**
+ * Complete the initial load of the cluster metadata authorizer, so that
all
+ * principals can use it.
+ */
+ void completeInitialLoad();
+
+ /**
+ * Complete the initial load of the cluster metadata authorizer with an
exception,
+ * indicating that the loading process has failed.
+ */
+ void completeInitialLoad(Exception e);
+
/**
* Load the ACLs in the given map. Anything not in the map will be removed.
* The authorizer will also wait for this initial snapshot load to
complete when
diff --git
a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java
b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java
index 510a6f87075..611625a4f3a 100644
---
a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java
+++
b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java
@@ -54,10 +54,9 @@ public class StandardAuthorizer implements
ClusterMetadataAuthorizer {
public final static String ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG =
"allow.everyone.if.no.acl.found";
/**
- * A future which is completed once we have loaded a snapshot.
- * TODO: KAFKA-13649: StandardAuthorizer should not finish loading until
it reads up to the high water mark.
+ * A future which is completed once we have loaded up to the initial high
water mark.
*/
- private final CompletableFuture<Void> initialLoadFuture =
CompletableFuture.completedFuture(null);
+ private final CompletableFuture<Void> initialLoadFuture = new
CompletableFuture<>();
/**
* The current data. Can be read without a lock. Must be written while
holding the object lock.
@@ -78,6 +77,24 @@ public class StandardAuthorizer implements
ClusterMetadataAuthorizer {
return aclMutator;
}
+ @Override
+ public synchronized void completeInitialLoad() {
+ data = data.copyWithNewLoadingComplete(true);
+ data.log.info("Completed initial ACL load process.");
+ initialLoadFuture.complete(null);
+ }
+
+ // Visible for testing
+ public CompletableFuture<Void> initialLoadFuture() {
+ return initialLoadFuture;
+ }
+
+ @Override
+ public void completeInitialLoad(Exception e) {
+ data.log.error("Failed to complete initial ACL load process.", e);
+ initialLoadFuture.completeExceptionally(e);
+ }
+
@Override
public void addAcl(Uuid id, StandardAcl acl) {
data.addAcl(id, acl);
@@ -98,7 +115,12 @@ public class StandardAuthorizer implements
ClusterMetadataAuthorizer {
AuthorizerServerInfo serverInfo) {
Map<Endpoint, CompletableFuture<Void>> result = new HashMap<>();
for (Endpoint endpoint : serverInfo.endpoints()) {
- result.put(endpoint, initialLoadFuture);
+ if (serverInfo.earlyStartListeners().contains(
+ endpoint.listenerName().orElseGet(() -> ""))) {
+ result.put(endpoint, CompletableFuture.completedFuture(null));
+ } else {
+ result.put(endpoint, initialLoadFuture);
+ }
}
return result;
}
@@ -131,7 +153,6 @@ public class StandardAuthorizer implements
ClusterMetadataAuthorizer {
// Complete the initialLoadFuture, if it hasn't been completed already.
initialLoadFuture.completeExceptionally(new TimeoutException("The
authorizer was " +
"closed before the initial load could complete."));
- // Nothing else to do here.
}
@Override
diff --git
a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
index 27cca4271b8..b46c839e4fa 100644
---
a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
+++
b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.errors.AuthorizerNotReadyException;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
@@ -92,6 +93,11 @@ public class StandardAuthorizerData {
*/
final AclMutator aclMutator;
+ /**
+ * True if the authorizer loading process is complete.
+ */
+ final boolean loadingComplete;
+
/**
* A statically configured set of users that are authorized to do anything.
*/
@@ -123,6 +129,7 @@ public class StandardAuthorizerData {
static StandardAuthorizerData createEmpty() {
return new StandardAuthorizerData(createLogger(-1),
null,
+ false,
Collections.emptySet(),
DENIED,
new ConcurrentSkipListSet<>(), new ConcurrentHashMap<>());
@@ -130,6 +137,7 @@ public class StandardAuthorizerData {
private StandardAuthorizerData(Logger log,
AclMutator aclMutator,
+ boolean loadingComplete,
Set<String> superUsers,
AuthorizationResult defaultResult,
ConcurrentSkipListSet<StandardAcl>
aclsByResource,
@@ -137,6 +145,7 @@ public class StandardAuthorizerData {
this.log = log;
this.auditLog = auditLogger();
this.aclMutator = aclMutator;
+ this.loadingComplete = loadingComplete;
this.superUsers = superUsers;
this.defaultRule = new DefaultRule(defaultResult);
this.aclsByResource = aclsByResource;
@@ -147,6 +156,17 @@ public class StandardAuthorizerData {
return new StandardAuthorizerData(
log,
newAclMutator,
+ loadingComplete,
+ superUsers,
+ defaultRule.result,
+ aclsByResource,
+ aclsById);
+ }
+
+ StandardAuthorizerData copyWithNewLoadingComplete(boolean
newLoadingComplete) {
+ return new StandardAuthorizerData(log,
+ aclMutator,
+ newLoadingComplete,
superUsers,
defaultRule.result,
aclsByResource,
@@ -159,6 +179,7 @@ public class StandardAuthorizerData {
return new StandardAuthorizerData(
createLogger(nodeId),
aclMutator,
+ loadingComplete,
newSuperUsers,
newDefaultResult,
aclsByResource,
@@ -169,6 +190,7 @@ public class StandardAuthorizerData {
StandardAuthorizerData newData = new StandardAuthorizerData(
log,
aclMutator,
+ loadingComplete,
superUsers,
defaultRule.result,
new ConcurrentSkipListSet<>(),
@@ -250,6 +272,8 @@ public class StandardAuthorizerData {
// Superusers are authorized to do anything.
if (superUsers.contains(principal.toString())) {
rule = SuperUserRule.INSTANCE;
+ } else if (!loadingComplete) {
+ throw new AuthorizerNotReadyException();
} else {
MatchingAclRule aclRule = findAclRule(
matchingPrincipals(requestContext),
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/AclControlManagerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/AclControlManagerTest.java
index 08b362a7276..fdc03276451 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/AclControlManagerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/AclControlManagerTest.java
@@ -148,6 +148,16 @@ public class AclControlManagerTest {
throw new NotControllerException("The current node is not the active controller.");
}
+ @Override
+ public void completeInitialLoad() {
+ // do nothing
+ }
+
+ @Override
+ public void completeInitialLoad(Exception e) {
+ // do nothing
+ }
+
@Override
public void loadSnapshot(Map<Uuid, StandardAcl> acls) {
this.acls = new HashMap<>(acls);
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 9c679ad7d1e..148c5720b6d 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.controller;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -81,6 +82,7 @@ import org.apache.kafka.metadata.BrokerRegistrationReply;
import org.apache.kafka.metadata.MetadataRecordSerde;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.metadata.RecordTestUtils;
+import org.apache.kafka.metadata.authorizer.StandardAuthorizer;
import org.apache.kafka.metalog.LocalLogManagerTestEnv;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.server.common.ApiMessageAndVersion;
@@ -91,6 +93,7 @@ import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import static java.util.function.Function.identity;
import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
import static org.apache.kafka.common.config.ConfigResource.Type.BROKER;
import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
@@ -1071,4 +1074,50 @@ public class QuorumControllerTest {
}
}
}
+
+ private static final Uuid FOO_ID =
Uuid.fromString("igRktLOnR8ektWHr79F8mw");
+
+ private static final Map<Integer, Long> ALL_ZERO_BROKER_EPOCHS =
+ IntStream.of(0, 1, 2, 3).boxed().collect(Collectors.toMap(identity(),
__ -> 0L));
+
+ @Test
+ public void testQuorumControllerCompletesAuthorizerInitialLoad() throws
Throwable {
+ final int numControllers = 3;
+ List<StandardAuthorizer> authorizers = new ArrayList<>(numControllers);
+ for (int i = 0; i < numControllers; i++) {
+ StandardAuthorizer authorizer = new StandardAuthorizer();
+ authorizer.configure(Collections.emptyMap());
+ authorizers.add(authorizer);
+ }
+ try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(
+ numControllers,
+ Optional.empty(),
+ shared -> {
+ shared.setInitialMaxReadOffset(2);
+ }
+ )) {
+ logEnv.appendInitialRecords(expectedSnapshotContent(FOO_ID,
ALL_ZERO_BROKER_EPOCHS));
+ logEnv.logManagers().forEach(m -> m.setMaxReadOffset(2));
+ try (QuorumControllerTestEnv controlEnv = new
QuorumControllerTestEnv(logEnv, b -> {
+ b.setAuthorizer(authorizers.get(b.nodeId()));
+ })) {
+ assertInitialLoadFuturesNotComplete(authorizers);
+ logEnv.logManagers().get(0).setMaxReadOffset(Long.MAX_VALUE);
+ QuorumController active = controlEnv.activeController();
+ active.unregisterBroker(ANONYMOUS_CONTEXT, 3).get();
+ assertInitialLoadFuturesNotComplete(authorizers.stream().skip(1).collect(Collectors.toList()));
+ logEnv.logManagers().forEach(m ->
m.setMaxReadOffset(Long.MAX_VALUE));
+ TestUtils.waitForCondition(() -> {
+ return authorizers.stream().allMatch(a ->
a.initialLoadFuture().isDone());
+ }, "Failed to complete initial authorizer load for all controllers.");
+ }
+ }
+ }
+
+ private static void
assertInitialLoadFuturesNotComplete(List<StandardAuthorizer> authorizers) {
+ for (int i = 0; i < authorizers.size(); i++) {
+ assertFalse(authorizers.get(i).initialLoadFuture().isDone(),
+ "authorizer " + i + " should not have completed loading.");
+ }
+ }
}
diff --git
a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/ClusterMetadataAuthorizerTest.java
b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/ClusterMetadataAuthorizerTest.java
index 19b07abed2c..3242170bcdf 100644
---
a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/ClusterMetadataAuthorizerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/ClusterMetadataAuthorizerTest.java
@@ -106,6 +106,16 @@ public class ClusterMetadataAuthorizerTest {
return aclMutator;
}
+ @Override
+ public void completeInitialLoad() {
+ // do nothing
+ }
+
+ @Override
+ public void completeInitialLoad(Exception e) {
+ // do nothing
+ }
+
@Override
public void loadSnapshot(Map<Uuid, StandardAcl> acls) {
// do nothing
diff --git
a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
index 7ed37785c42..987c00155c4 100644
---
a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
@@ -17,19 +17,25 @@
package org.apache.kafka.metadata.authorizer;
+import org.apache.kafka.common.ClusterResource;
+import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.acl.AccessControlEntryFilter;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.errors.AuthorizerNotReadyException;
+import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.server.authorizer.Action;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
+import org.apache.kafka.server.authorizer.AuthorizerServerInfo;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
@@ -40,12 +46,17 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetAddress;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.CompletionStage;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
@@ -82,6 +93,56 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(value = 40)
public class StandardAuthorizerTest {
+ public static final Endpoint PLAINTEXT = new Endpoint("PLAINTEXT",
+ SecurityProtocol.PLAINTEXT,
+ "127.0.0.1",
+ 9020);
+
+ public static final Endpoint CONTROLLER = new Endpoint("CONTROLLER",
+ SecurityProtocol.PLAINTEXT,
+ "127.0.0.1",
+ 9020);
+
+ static class AuthorizerTestServerInfo implements AuthorizerServerInfo {
+ private final Collection<Endpoint> endpoints;
+
+ AuthorizerTestServerInfo(Collection<Endpoint> endpoints) {
+ assertFalse(endpoints.isEmpty());
+ this.endpoints = endpoints;
+ }
+
+ @Override
+ public ClusterResource clusterResource() {
+ return new
ClusterResource(Uuid.fromString("r7mqHQrxTNmzbKvCvWZzLQ").toString());
+ }
+
+ @Override
+ public int brokerId() {
+ return 0;
+ }
+
+ @Override
+ public Collection<Endpoint> endpoints() {
+ return endpoints;
+ }
+
+ @Override
+ public Endpoint interBrokerEndpoint() {
+ return endpoints.iterator().next();
+ }
+
+ @Override
+ public Collection<String> earlyStartListeners() {
+ List<String> result = new ArrayList<>();
+ for (Endpoint endpoint : endpoints) {
+ if (endpoint.listenerName().get().equals("CONTROLLER")) {
+ result.add(endpoint.listenerName().get());
+ }
+ }
+ return result;
+ }
+ }
+
@Test
public void testGetConfiguredSuperUsers() {
assertEquals(Collections.emptySet(),
@@ -124,6 +185,16 @@ public class StandardAuthorizerTest {
new ResourcePattern(resourceType, resourceName, LITERAL), 1,
false, false);
}
+ static StandardAuthorizer createAndInitializeStandardAuthorizer() {
+ StandardAuthorizer authorizer = new StandardAuthorizer();
+ authorizer.configure(Collections.singletonMap(SUPER_USERS_CONFIG,
"User:superman"));
+ authorizer.start(new
AuthorizerTestServerInfo(Collections.singletonList(PLAINTEXT)));
+ authorizer.completeInitialLoad();
+ return authorizer;
+ }
+
+ private final static AtomicLong NEXT_ID = new AtomicLong(0);
+
static StandardAcl newFooAcl(AclOperation op, AclPermissionType
permission) {
return new StandardAcl(
TOPIC,
@@ -225,8 +296,7 @@ public class StandardAuthorizerTest {
@Test
public void testListAcls() throws Exception {
- StandardAuthorizer authorizer = new StandardAuthorizer();
- authorizer.configure(Collections.emptyMap());
+ StandardAuthorizer authorizer =
createAndInitializeStandardAuthorizer();
List<StandardAclWithId> fooAcls = asList(
withId(newFooAcl(READ, ALLOW)),
withId(newFooAcl(WRITE, ALLOW)));
@@ -247,8 +317,7 @@ public class StandardAuthorizerTest {
@Test
public void testSimpleAuthorizations() throws Exception {
- StandardAuthorizer authorizer = new StandardAuthorizer();
- authorizer.configure(Collections.emptyMap());
+ StandardAuthorizer authorizer =
createAndInitializeStandardAuthorizer();
List<StandardAclWithId> fooAcls = asList(
withId(newFooAcl(READ, ALLOW)),
withId(newFooAcl(WRITE, ALLOW)));
@@ -269,20 +338,17 @@ public class StandardAuthorizerTest {
@Test
public void testDenyPrecedenceWithOperationAll() throws Exception {
- StandardAuthorizer authorizer = new StandardAuthorizer();
- authorizer.configure(Collections.emptyMap());
+ StandardAuthorizer authorizer =
createAndInitializeStandardAuthorizer();
List<StandardAcl> acls = Arrays.asList(
new StandardAcl(TOPIC, "foo", LITERAL, "User:alice", "*", ALL,
DENY),
new StandardAcl(TOPIC, "foo", PREFIXED, "User:alice", "*", READ,
ALLOW),
new StandardAcl(TOPIC, "foo", LITERAL, "User:*", "*", ALL, DENY),
new StandardAcl(TOPIC, "foo", PREFIXED, "User:*", "*", DESCRIBE,
ALLOW)
);
-
acls.forEach(acl -> {
StandardAclWithId aclWithId = withId(acl);
authorizer.addAcl(aclWithId.id(), aclWithId.acl());
});
-
assertEquals(Arrays.asList(DENIED, DENIED, DENIED, ALLOWED),
authorizer.authorize(
newRequestContext("alice"),
Arrays.asList(
@@ -290,7 +356,6 @@ public class StandardAuthorizerTest {
newAction(READ, TOPIC, "foo"),
newAction(DESCRIBE, TOPIC, "foo"),
newAction(READ, TOPIC, "foobar"))));
-
assertEquals(Arrays.asList(DENIED, DENIED, DENIED, ALLOWED, DENIED),
authorizer.authorize(
newRequestContext("bob"),
Arrays.asList(
@@ -303,19 +368,16 @@ public class StandardAuthorizerTest {
@Test
public void testTopicAclWithOperationAll() throws Exception {
- StandardAuthorizer authorizer = new StandardAuthorizer();
- authorizer.configure(Collections.emptyMap());
+ StandardAuthorizer authorizer =
createAndInitializeStandardAuthorizer();
List<StandardAcl> acls = Arrays.asList(
new StandardAcl(TOPIC, "foo", LITERAL, "User:*", "*", ALL, ALLOW),
new StandardAcl(TOPIC, "bar", PREFIXED, "User:alice", "*", ALL,
ALLOW),
new StandardAcl(TOPIC, "baz", LITERAL, "User:bob", "*", ALL, ALLOW)
);
-
acls.forEach(acl -> {
StandardAclWithId aclWithId = withId(acl);
authorizer.addAcl(aclWithId.id(), aclWithId.acl());
});
-
assertEquals(Arrays.asList(ALLOWED, ALLOWED, DENIED),
authorizer.authorize(
newRequestContext("alice"),
Arrays.asList(
@@ -349,8 +411,7 @@ public class StandardAuthorizerTest {
InetAddress host1 = InetAddress.getByName("192.168.1.1");
InetAddress host2 = InetAddress.getByName("192.168.1.2");
- StandardAuthorizer authorizer = new StandardAuthorizer();
- authorizer.configure(Collections.emptyMap());
+ StandardAuthorizer authorizer =
createAndInitializeStandardAuthorizer();
List<StandardAcl> acls = Arrays.asList(
new StandardAcl(TOPIC, "foo", LITERAL, "User:alice",
host1.getHostAddress(), READ, DENY),
new StandardAcl(TOPIC, "foo", LITERAL, "User:alice", "*", READ,
ALLOW),
@@ -395,9 +456,7 @@ public class StandardAuthorizerTest {
.build();
}
- private static StandardAuthorizer createAuthorizerWithManyAcls() {
- StandardAuthorizer authorizer = new StandardAuthorizer();
- authorizer.configure(Collections.emptyMap());
+ private static void addManyAcls(StandardAuthorizer authorizer) {
List<StandardAcl> acls = Arrays.asList(
new StandardAcl(TOPIC, "green2", LITERAL, "User:*", "*", READ,
ALLOW),
new StandardAcl(TOPIC, "green", PREFIXED, "User:bob", "*", READ,
ALLOW),
@@ -413,12 +472,12 @@ public class StandardAuthorizerTest {
StandardAclWithId aclWithId = withId(acl);
authorizer.addAcl(aclWithId.id(), aclWithId.acl());
});
- return authorizer;
}
@Test
public void testAuthorizationWithManyAcls() throws Exception {
- StandardAuthorizer authorizer = createAuthorizerWithManyAcls();
+ StandardAuthorizer authorizer =
createAndInitializeStandardAuthorizer();
+ addManyAcls(authorizer);
assertEquals(Arrays.asList(ALLOWED, DENIED),
authorizer.authorize(new MockAuthorizableRequestContext.Builder().
setPrincipal(new KafkaPrincipal(USER_TYPE, "bob")).build(),
@@ -449,7 +508,8 @@ public class StandardAuthorizerTest {
Mockito.when(auditLog.isDebugEnabled()).thenReturn(true);
Mockito.when(auditLog.isTraceEnabled()).thenReturn(true);
- StandardAuthorizer authorizer = createAuthorizerWithManyAcls();
+ StandardAuthorizer authorizer =
createAndInitializeStandardAuthorizer();
+ addManyAcls(authorizer);
ResourcePattern topicResource = new ResourcePattern(TOPIC,
"alpha", LITERAL);
Action action = new Action(READ, topicResource, 1, false,
logIfDenied);
MockAuthorizableRequestContext requestContext = new
MockAuthorizableRequestContext.Builder()
@@ -490,7 +550,8 @@ public class StandardAuthorizerTest {
Mockito.when(auditLog.isDebugEnabled()).thenReturn(true);
Mockito.when(auditLog.isTraceEnabled()).thenReturn(true);
- StandardAuthorizer authorizer = createAuthorizerWithManyAcls();
+ StandardAuthorizer authorizer =
createAndInitializeStandardAuthorizer();
+ addManyAcls(authorizer);
ResourcePattern topicResource = new ResourcePattern(TOPIC,
"green1", LITERAL);
Action action = new Action(READ, topicResource, 1, logIfAllowed,
false);
MockAuthorizableRequestContext requestContext = new
MockAuthorizableRequestContext.Builder()
@@ -514,4 +575,69 @@ public class StandardAuthorizerTest {
}
}
+    /**
+     * Test that StandardAuthorizer#start returns a completed future for early start
+     * listeners. With PLAINTEXT and CONTROLLER both registered, the CONTROLLER future
+     * must already be done when start returns, while the PLAINTEXT future must still
+     * be pending.
+     */
+    @Test
+    public void testStartWithEarlyStartListeners() throws Exception {
+        StandardAuthorizer authorizer = new StandardAuthorizer();
+        authorizer.configure(Collections.singletonMap(SUPER_USERS_CONFIG, "User:superman"));
+        Map<Endpoint, ? extends CompletionStage<Void>> futures2 = authorizer.
+            start(new AuthorizerTestServerInfo(Arrays.asList(PLAINTEXT, CONTROLLER)));
+        // start() must hand back exactly one future per endpoint it was given.
+        assertEquals(new HashSet<>(Arrays.asList(PLAINTEXT, CONTROLLER)), futures2.keySet());
+        assertFalse(futures2.get(PLAINTEXT).toCompletableFuture().isDone());
+        assertTrue(futures2.get(CONTROLLER).toCompletableFuture().isDone());
+    }
+
+    /**
+     * Test attempts to authorize prior to completeInitialLoad. During this time, only
+     * superusers can be authorized. Other users will get an AuthorizerNotReadyException:
+     * not even an authorization result, just an exception thrown for the whole batch.
+     */
+    @Test
+    public void testAuthorizationPriorToCompleteInitialLoad() throws Exception {
+        StandardAuthorizer authorizer = new StandardAuthorizer();
+        authorizer.configure(Collections.singletonMap(SUPER_USERS_CONFIG, "User:superman"));
+        // "bob" is not listed in super.users, so the whole batch is rejected with an exception.
+        assertThrows(AuthorizerNotReadyException.class, () ->
+            authorizer.authorize(new MockAuthorizableRequestContext.Builder().
+                    setPrincipal(new KafkaPrincipal(USER_TYPE, "bob")).build(),
+                Arrays.asList(newAction(READ, TOPIC, "green1"),
+                    newAction(READ, TOPIC, "green2"))));
+        // "superman" is the configured superuser, so every action in the batch is allowed.
+        assertEquals(Arrays.asList(ALLOWED, ALLOWED),
+            authorizer.authorize(new MockAuthorizableRequestContext.Builder().
+                    setPrincipal(new KafkaPrincipal(USER_TYPE, "superman")).build(),
+                Arrays.asList(newAction(READ, TOPIC, "green1"),
+                    newAction(WRITE, GROUP, "wheel"))));
+    }
+
+    /**
+     * Test that StandardAuthorizer#completeInitialLoad() completes the pending
+     * PLAINTEXT listener future, and that it completes normally rather than
+     * exceptionally.
+     */
+    @Test
+    public void testCompleteInitialLoad() throws Exception {
+        StandardAuthorizer authorizer = new StandardAuthorizer();
+        authorizer.configure(Collections.singletonMap(SUPER_USERS_CONFIG, "User:superman"));
+        Map<Endpoint, ? extends CompletionStage<Void>> futures = authorizer.
+            start(new AuthorizerTestServerInfo(Collections.singleton(PLAINTEXT)));
+        assertEquals(Collections.singleton(PLAINTEXT), futures.keySet());
+        // The future stays pending until the initial load is reported as complete.
+        assertFalse(futures.get(PLAINTEXT).toCompletableFuture().isDone());
+        authorizer.completeInitialLoad();
+        assertTrue(futures.get(PLAINTEXT).toCompletableFuture().isDone());
+        assertFalse(futures.get(PLAINTEXT).toCompletableFuture().isCompletedExceptionally());
+    }
+
+    /**
+     * Test that StandardAuthorizer#completeInitialLoad(Exception) completes the
+     * pending PLAINTEXT listener future exceptionally, while the CONTROLLER future,
+     * which was already done when start returned, stays normally completed.
+     */
+    @Test
+    public void testCompleteInitialLoadWithException() throws Exception {
+        StandardAuthorizer authorizer = new StandardAuthorizer();
+        authorizer.configure(Collections.singletonMap(SUPER_USERS_CONFIG, "User:superman"));
+        Map<Endpoint, ? extends CompletionStage<Void>> futures = authorizer.
+            start(new AuthorizerTestServerInfo(Arrays.asList(PLAINTEXT, CONTROLLER)));
+        assertEquals(new HashSet<>(Arrays.asList(PLAINTEXT, CONTROLLER)), futures.keySet());
+        assertFalse(futures.get(PLAINTEXT).toCompletableFuture().isDone());
+        assertTrue(futures.get(CONTROLLER).toCompletableFuture().isDone());
+        // Report a failed initial load; only the still-pending future may observe the failure.
+        authorizer.completeInitialLoad(new TimeoutException("timed out"));
+        assertTrue(futures.get(PLAINTEXT).toCompletableFuture().isDone());
+        assertTrue(futures.get(PLAINTEXT).toCompletableFuture().isCompletedExceptionally());
+        assertTrue(futures.get(CONTROLLER).toCompletableFuture().isDone());
+        assertFalse(futures.get(CONTROLLER).toCompletableFuture().isCompletedExceptionally());
+    }
}
diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
index 855fd468cba..3c495adff92 100644
--- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
+++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
@@ -77,10 +77,10 @@ public final class LocalLogManager implements
RaftClient<ApiMessageAndVersion>,
int size();
}
- static class LeaderChangeBatch implements LocalBatch {
+ public static class LeaderChangeBatch implements LocalBatch {
private final LeaderAndEpoch newLeader;
- LeaderChangeBatch(LeaderAndEpoch newLeader) {
+ public LeaderChangeBatch(LeaderAndEpoch newLeader) {
this.newLeader = newLeader;
}
@@ -113,12 +113,12 @@ public final class LocalLogManager implements
RaftClient<ApiMessageAndVersion>,
}
}
- static class LocalRecordBatch implements LocalBatch {
+ public static class LocalRecordBatch implements LocalBatch {
private final int leaderEpoch;
private final long appendTimestamp;
private final List<ApiMessageAndVersion> records;
- LocalRecordBatch(int leaderEpoch, long appendTimestamp,
List<ApiMessageAndVersion> records) {
+ public LocalRecordBatch(int leaderEpoch, long appendTimestamp,
List<ApiMessageAndVersion> records) {
this.leaderEpoch = leaderEpoch;
this.appendTimestamp = appendTimestamp;
this.records = records;
@@ -184,6 +184,11 @@ public final class LocalLogManager implements
RaftClient<ApiMessageAndVersion>,
*/
private long prevOffset;
+ /**
+ * The initial max read offset which LocalLog instances will be configured with.
+ */
+ private long initialMaxReadOffset = Long.MAX_VALUE;
+
/**
* Maps committed offset to snapshot reader.
*/
@@ -237,7 +242,12 @@ public final class LocalLogManager implements
RaftClient<ApiMessageAndVersion>,
return offset;
}
- synchronized long append(LocalBatch batch) {
+ public synchronized long append(LocalBatch batch) {
+ try {
+ throw new RuntimeException("foo");
+ } catch (Exception e) {
+ log.info("WATERMELON: appending {}", batch, e);
+ }
prevOffset += batch.size();
log.debug("append(batch={}, prevOffset={})", batch, prevOffset);
batches.put(prevOffset, batch);
@@ -352,6 +362,15 @@ public final class LocalLogManager implements
RaftClient<ApiMessageAndVersion>,
})
.sum();
}
+
+        /**
+         * Set the initial max read offset which LocalLog instances will be configured with.
+         *
+         * @param initialMaxReadOffset  The new initial max read offset.
+         * @return                      This object, so that calls can be chained.
+         */
+        public SharedLogData setInitialMaxReadOffset(long initialMaxReadOffset) {
+            this.initialMaxReadOffset = initialMaxReadOffset;
+            return this;
+        }
+
+ /**
+ * Return the initial max read offset currently stored in this shared log data.
+ * The field defaults to Long.MAX_VALUE until setInitialMaxReadOffset changes it.
+ */
+ public long initialMaxReadOffset() {
+ return initialMaxReadOffset;
+ }
}
private static class MetaLogListenerData {
@@ -454,6 +473,7 @@ public final class LocalLogManager implements
RaftClient<ApiMessageAndVersion>,
this.log = logContext.logger(LocalLogManager.class);
this.nodeId = nodeId;
this.shared = shared;
+ this.maxReadOffset = shared.initialMaxReadOffset();
this.eventQueue = new KafkaEventQueue(Time.SYSTEM, logContext,
threadNamePrefix);
shared.registerLogManager(this);
}
@@ -502,6 +522,8 @@ public final class LocalLogManager implements
RaftClient<ApiMessageAndVersion>,
// Only notify the listener if it equals the shared leader state
LeaderAndEpoch sharedLeader =
shared.leaderAndEpoch();
if (batch.newLeader.equals(sharedLeader)) {
+ log.debug("Node {}: Executing
handleLeaderChange {}",
+ nodeId, sharedLeader);
listenerData.handleLeaderChange(entryOffset,
batch.newLeader);
if (batch.newLeader.epoch() > leader.epoch()) {
leader = batch.newLeader;
@@ -658,6 +680,15 @@ public final class LocalLogManager implements
RaftClient<ApiMessageAndVersion>,
});
}
+ /**
+ * Report the high watermark of this test log manager: the shared log's
+ * prevOffset when it is greater than zero, otherwise OptionalLong.empty().
+ *
+ * NOTE(review): this method locks the LocalLogManager instance, whereas
+ * prevOffset is advanced inside SharedLogData#append under the SharedLogData
+ * lock -- confirm that this cross-object read is intended.
+ */
+ @Override
+ public synchronized OptionalLong highWatermark() {
+ if (shared.prevOffset > 0) {
+ return OptionalLong.of(shared.prevOffset);
+ } else {
+ return OptionalLong.empty();
+ }
+ }
+
@Override
public long scheduleAppend(int epoch, List<ApiMessageAndVersion> batch) {
if (batch.isEmpty()) {
diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java
index ed18ab6053d..17c9c467124 100644
--- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java
+++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java
@@ -20,8 +20,11 @@ package org.apache.kafka.metalog;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metalog.LocalLogManager.LeaderChangeBatch;
+import org.apache.kafka.metalog.LocalLogManager.LocalRecordBatch;
import org.apache.kafka.metalog.LocalLogManager.SharedLogData;
import org.apache.kafka.raft.LeaderAndEpoch;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.snapshot.RawSnapshotReader;
import org.apache.kafka.test.TestUtils;
import org.slf4j.Logger;
@@ -32,7 +35,9 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
+import java.util.OptionalInt;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
public class LocalLogManagerTestEnv implements AutoCloseable {
private static final Logger log =
@@ -77,10 +82,15 @@ public class LocalLogManagerTestEnv implements
AutoCloseable {
return testEnv;
}
- public LocalLogManagerTestEnv(int numManagers, Optional<RawSnapshotReader>
snapshot) throws Exception {
+ public LocalLogManagerTestEnv(
+ int numManagers,
+ Optional<RawSnapshotReader> snapshot,
+ Consumer<SharedLogData> dataSetup
+ ) throws Exception {
clusterId = Uuid.randomUuid().toString();
dir = TestUtils.tempDirectory();
shared = new SharedLogData(snapshot);
+ dataSetup.accept(shared);
List<LocalLogManager> newLogManagers = new ArrayList<>(numManagers);
try {
for (int nodeId = 0; nodeId < numManagers; nodeId++) {
@@ -102,6 +112,28 @@ public class LocalLogManagerTestEnv implements
AutoCloseable {
this.logManagers = newLogManagers;
}
+ /**
+ * Create a test environment without any extra setup of the shared log data.
+ * Delegates to the three-argument constructor with a no-op dataSetup callback.
+ *
+ * @param numManagers The number of log managers to create.
+ * @param snapshot The optional snapshot passed on to the shared log data.
+ */
+ public LocalLogManagerTestEnv(
+ int numManagers,
+ Optional<RawSnapshotReader> snapshot
+ ) throws Exception {
+ this(numManagers, snapshot, __ -> { });
+ }
+
+    /**
+     * Append some records to the log. This method is meant to be called before the
+     * controllers are started, to simulate a pre-existing metadata log.
+     *
+     * Three batches are appended to the shared log, in order: a leader change, one
+     * record batch holding all of the given records, and a second leader change.
+     *
+     * @param records   The records to be appended. Will be added in a single batch.
+     */
+    public void appendInitialRecords(List<ApiMessageAndVersion> records) {
+        int initialLeaderEpoch = 1;
+        // Leader change with an empty OptionalInt -- presumably "no leader"; confirm
+        // against LeaderAndEpoch's constructor.
+        shared.append(new LeaderChangeBatch(
+            new LeaderAndEpoch(OptionalInt.empty(), initialLeaderEpoch + 1)));
+        // The records are written in that same epoch, with an append timestamp of 0.
+        shared.append(new LocalRecordBatch(initialLeaderEpoch + 1, 0, records));
+        // Final leader change carrying OptionalInt.of(0) in the following epoch.
+        shared.append(new LeaderChangeBatch(
+            new LeaderAndEpoch(OptionalInt.of(0), initialLeaderEpoch + 2)));
+    }
+
public String clusterId() {
return clusterId;
}
diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftClient.java b/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
index 8e4f50e7488..952cc60a387 100644
--- a/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
@@ -24,6 +24,7 @@ import org.apache.kafka.snapshot.SnapshotWriter;
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
+import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
public interface RaftClient<T> extends AutoCloseable {
@@ -113,6 +114,11 @@ public interface RaftClient<T> extends AutoCloseable {
*/
void unregister(Listener<T> listener);
+ /**
+ * Returns the current high water mark, or OptionalLong.empty if it is not known.
+ */
+ OptionalLong highWatermark();
+
/**
* Return the current {@link LeaderAndEpoch}.
*