This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new b01b360  MINOR: Use self-managed mode instead of KIP-500 and nozk (#10362)
b01b360 is described below

commit b01b360d415337c423d75c481ab25ae8215fb75f
Author: Ismael Juma <[email protected]>
AuthorDate: Fri Mar 19 16:42:37 2021 -0700

    MINOR: Use self-managed mode instead of KIP-500 and nozk (#10362)
    
    KIP-500 is not particularly descriptive. I also tweaked the readme text a bit.
    
    Tested that the readme for self-managed still works after these changes.
    
    Reviewers: Colin P. McCabe <[email protected]>, Ron Dagostino <[email protected]>, Jason Gustafson <[email protected]>
---
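Reader's note (not part of the commit): the README text in this change ties the server mode to the `process.roles` key, and `StorageTool.configToSelfManagedMode` treats any non-empty role list as self-managed mode. The rule can be sketched in Python as below. `classify_process_roles` is a hypothetical helper for illustration only, not a Kafka API, and it assumes the value arrives as the raw comma-separated string from a properties file.

```python
from typing import Optional

VALID_ROLES = {"broker", "controller"}


def classify_process_roles(raw: Optional[str]) -> str:
    """Map a raw `process.roles` value to the mode the README describes.

    Hypothetical helper for illustration; Kafka's real check lives in
    KafkaConfig / StorageTool, not here.
    """
    # An unset or empty value means the ZooKeeper-based mode.
    roles = {r.strip() for r in (raw or "").split(",") if r.strip()}
    if not roles:
        return "zookeeper"
    unknown = roles - VALID_ROLES
    if unknown:
        raise ValueError(f"unknown process.roles entries: {sorted(unknown)}")
    if roles == VALID_ROLES:
        return "combined"  # broker and controller share one JVM
    return next(iter(roles))  # exactly one of "broker" or "controller"
```

For the three sample files renamed in this commit, this sketch would classify `broker.properties` as `broker`, `controller.properties` as `controller`, and `server.properties` as `combined`.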
 README.md                                          |  9 +++++
 build.gradle                                       |  2 +-
 KIP-500.md => config/self-managed/README.md        | 46 ++++++++++++----------
 .../broker.properties}                             |  6 +--
 .../controller.properties}                         |  6 +--
 .../server.properties}                             |  6 +--
 .../scala/kafka/server/ApiVersionManager.scala     |  2 +-
 .../src/main/scala/kafka/server/BrokerServer.scala |  2 +-
 .../main/scala/kafka/server/ControllerServer.scala |  2 +-
 core/src/main/scala/kafka/server/KafkaConfig.scala | 24 ++++++-----
 .../main/scala/kafka/server/KafkaRaftServer.scala  |  7 ++--
 core/src/main/scala/kafka/server/KafkaServer.scala |  2 +-
 .../main/scala/kafka/server/ReplicaManager.scala   |  2 +-
 core/src/main/scala/kafka/tools/StorageTool.scala  | 18 ++++-----
 .../test/scala/unit/kafka/api/ApiVersionTest.scala |  2 +-
 .../scala/unit/kafka/server/KafkaConfigTest.scala  |  2 +-
 .../scala/unit/kafka/tools/StorageToolTest.scala   | 14 +++----
 .../apache/kafka/controller/QuorumController.java  |  2 +-
 tests/kafkatest/services/kafka/quorum.py           |  4 +-
 .../client/client_compatibility_features_test.py   |  3 +-
 20 files changed, 86 insertions(+), 75 deletions(-)
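
Reader's note (not part of the commit): the README in this change gives the `controller.quorum.voters` format as `id1@host1:port1,id2@host2:port2` and states that 3 controllers tolerate 1 failure and 5 tolerate 2. A minimal Python sketch of both points follows. The function names are hypothetical, and the parser is an assumption about how one might read the string, not Kafka's own parsing code.

```python
from typing import Dict, Tuple


def parse_quorum_voters(value: str) -> Dict[int, Tuple[str, int]]:
    """Parse `id1@host1:port1,id2@host2:port2` into {id: (host, port)}."""
    voters: Dict[int, Tuple[str, int]] = {}
    for entry in value.split(","):
        # Split "id@host:port" at the first "@" and the last ":".
        node_id, _, endpoint = entry.strip().partition("@")
        host, _, port = endpoint.rpartition(":")
        if not node_id or not host or not port:
            raise ValueError(f"malformed voter entry: {entry!r}")
        voters[int(node_id)] = (host, int(port))
    return voters


def tolerated_failures(controller_count: int) -> int:
    """Number of controllers that can fail while a majority stays alive."""
    if controller_count < 1:
        raise ValueError("need at least one controller")
    return (controller_count - 1) // 2
```

Counting the parsed voters and passing that count to `tolerated_failures` gives the availability margin for a given voter string; the host names and ports used with it are whatever the operator configured.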

diff --git a/README.md b/README.md
index 3c3f013..7de41af 100644
--- a/README.md
+++ b/README.md
@@ -81,6 +81,15 @@ fail due to code changes. You can just run:
  
     ./gradlew processMessages processTestMessages
 
+### Running a Kafka broker with ZooKeeper
+
+    ./bin/zookeeper-server-start.sh config/zookeeper.properties
+    ./bin/kafka-server-start.sh config/server.properties
+
+### Running a Kafka broker in self-managed mode
+
+See [config/self-managed/README.md](https://github.com/apache/kafka/blob/trunk/config/self-managed/README.md).
+
 ### Cleaning the build ###
     ./gradlew clean
 
diff --git a/build.gradle b/build.gradle
index b7aff53..d62b56f 100644
--- a/build.gradle
+++ b/build.gradle
@@ -154,7 +154,7 @@ if (file('.git').exists()) {
         'gradlew',
         'gradlew.bat',
         'gradle/wrapper/gradle-wrapper.properties',
-        'KIP-500.md',
+        'config/self-managed/README.md',
         'TROGDOR.md',
         '**/README.md',
         '**/id_rsa',
diff --git a/KIP-500.md b/config/self-managed/README.md
similarity index 78%
rename from KIP-500.md
rename to config/self-managed/README.md
index bc2778a..9260e45 100644
--- a/KIP-500.md
+++ b/config/self-managed/README.md
@@ -1,5 +1,5 @@
-KIP-500 Early Access Release
-============================
+Self-managed mode (aka KIP-500 mode) Early Access Release
+=========================================================
 
 # Introduction
 It is now possible to run Apache Kafka without Apache ZooKeeper!  We call this mode [self-managed mode](https://cwiki.apache.org/confluence/display/KAFKA/KIP-500%3A+Replace+ZooKeeper+with+a+Self-Managed+Metadata+Quorum).  It is currently *EARLY ACCESS AND SHOULD NOT BE USED IN PRODUCTION*, but it is available for testing in the Kafka 2.8 release.
@@ -11,7 +11,7 @@ Self-managed mode has many benefits -- some obvious, and some not so obvious.  C
 # Quickstart
 
 ## Warning
-Self-managed mode in Kafka 2.8 is provided for testing only, *NOT* for production.  We do not yet support upgrading existing ZooKeeper-based Kafka clusters into this mode.  In fact, when Kafka 3.0 is released, it may not even be possible to upgrade your self-managed clusters from 2.8 to 3.0.  There may be bugs, including serious ones.  You should *assume that your data could be lost at any time* if you try the early access release of KIP-500.
+Self-managed mode in Kafka 2.8 is provided for testing only, *NOT* for production.  We do not yet support upgrading existing ZooKeeper-based Kafka clusters into this mode.  In fact, when Kafka 3.0 is released, it will not be possible to upgrade your self-managed clusters from 2.8 to 3.0.  There may be bugs, including serious ones.  You should *assume that your data could be lost at any time* if you try the early access release of self-managed mode.
 
 ## Generate a cluster ID
 The first step is to generate an ID for your new cluster, using the kafka-storage tool:
@@ -25,8 +25,8 @@ xtzWWN4bTjitpL3kfd9s5g
 The next step is to format your storage directories.  If you are running in single-node mode, you can do this with one command:
 
 ~~~~
-$ ./bin/kafka-storage.sh format -t xtzWWN4bTjitpL3kfd9s5g -c ./config/nozk-combined.properties
-Formatting /tmp/nozk-combined-logs
+$ ./bin/kafka-storage.sh format -t <uuid> -c ./config/self-managed/server.properties
+Formatting /tmp/self-managed-combined-logs
 ~~~~
 
 If you are using multiple nodes, then you should run the format command on each node.  Be sure to use the same cluster ID for each one.
@@ -35,10 +35,10 @@ If you are using multiple nodes, then you should run the format command on each
 Finally, you are ready to start the Kafka server on each node.
 
 ~~~~
-$ ./bin/kafka-server-start.sh ./config/nozk-combined.properties
+$ ./bin/kafka-server-start.sh ./config/self-managed/server.properties
 [2021-02-26 15:37:11,071] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
 [2021-02-26 15:37:11,294] INFO Setting -D jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS renegotiation (org.apache.zookeeper.common.X509Util)
-[2021-02-26 15:37:11,466] INFO [Log partition=@metadata-0, dir=/tmp/nozk-combined-logs] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
+[2021-02-26 15:37:11,466] INFO [Log partition=@metadata-0, dir=/tmp/self-managed-combined-logs] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
 [2021-02-26 15:37:11,509] INFO [raft-expiration-reaper]: Starting (kafka.raft.TimingWheelExpirationService$ExpiredOperationReaper)
 [2021-02-26 15:37:11,640] INFO [RaftManager nodeId=1] Completed transition to Unattached(epoch=0, voters=[1], electionTimeoutMs=9037) (org.apache.kafka.raft.QuorumState)
 ...
@@ -54,19 +54,24 @@ Created topic foo.
 # Deployment
 
 ## Controller Servers
-Unlike in ZooKeeper-based mode, where any server can become the controller, in self-managed mode, only a small group of specially selected servers can act as controllers.  The specially selected controller servers will participate in the metadata quorum.  Each KIP-500 controller server is either active, or a hot standby for the current active controller server.
+In self-managed mode, only a small group of specially selected servers can act as controllers (unlike the ZooKeeper-based mode, where any server can become the
+Controller).  The specially selected controller servers will participate in the metadata quorum.  Each self-managed controller server is either active, or a hot
+standby for the current active controller server.
 
-Typically you will select either 3 or 5 servers for this role, depending on the size of your cluster.  Just like with ZooKeeper, you must keep a majority of the controllers alive in order to maintain availability.  So if you have 3 controllers, you can tolerate 1 failure; with 5 controllers, you can tolerate 2 failures.
+You will typically select 3 or 5 servers for this role, depending on factors like cost and the number of concurrent failures your system should withstand
+without availability impact.  Just like with ZooKeeper, you must keep a majority of the controllers alive in order to maintain availability.  So if you have 3
+controllers, you can tolerate 1 failure; with 5 controllers, you can tolerate 2 failures.
 
 ## Process Roles
 Each Kafka server now has a new configuration key called `process.roles` which can have the following values:
 
 * If `process.roles` is set to `broker`, the server acts as a self-managed broker.
 * If `process.roles` is set to `controller`, the server acts as a self-managed controller.
-* If `process.roles` is set to `broker,controller`, the server acts as both a self-managed broker and a self-managd controller.
-* If `process.roles` is not set at all then we are assumed to be in ZooKeeper mode.  As mentioned earlier, you can't yet transition back and forth between ZK mode and self-managed mode without reformatting.
+* If `process.roles` is set to `broker,controller`, the server acts as both a self-managed broker and a self-managed controller.
+* If `process.roles` is not set at all then we are assumed to be in ZooKeeper mode.  As mentioned earlier, you can't currently transition back and forth between ZK mode and self-managed mode without reformatting.
 
-Nodes that act as both brokers and controllers are referred to as "combined" nodes.  The advantage of using combined nodes you will have uses fewer Java Virtual Machines (JVMs).  This will allow you to avoid some of the fixed memory overheads associated with JVMs.  The disdavantage is that the controller will be less isolated from the rest of the system.  For example, if activity on the broker causes an out of memory condition, the controller part of the server is not isolated from that  [...]
+Nodes that act as both brokers and controllers are referred to as "combined" nodes.  Combined nodes are simpler to operate for simple use cases and allow you to avoid
+some of the fixed memory overheads associated with JVMs.  The key disadvantage is that the controller will be less isolated from the rest of the system.  For example, if activity on the broker causes an out of memory condition, the controller part of the server is not isolated from that OOM condition.
 
 ## Quorum Voters
 All nodes in the system must set the `controller.quorum.voters` configuration.  This identifies the quorum controller servers that should be used.  All the controllers must be enumerated.  This is similar to how, when using ZooKeeper, the `zookeeper.connect` configuration must contain all the ZooKeeper servers.  Unlike with the ZK config, however, `controller.quorum.voters` also has IDs for each node.  The format is id1@host1:port1,id2@host2:port2, etc.
@@ -84,20 +89,20 @@ Each broker and each controller must set `controller.quorum.voters`.  Note that
 Note that clients never need to configure `controller.quorum.voters`; only servers do.
 
 ## Kafka Storage Tool
-As described above in the QuickStart section, you must use the kafka-storage.sh tool to generate a cluster ID for your new cluster, and then run the format command on each node before starting the node.
+As described above in the QuickStart section, you must use the `kafka-storage.sh` tool to generate a cluster ID for your new cluster, and then run the format command on each node before starting the node.
 
 This is different from how Kafka has operated in the past.  Previously, Kafka would format blank storage directories automatically, and also generate a new cluster UUID automatically.  One reason for the change is that auto-formatting can sometimes obscure an error condition.  For example, under UNIX, if a data directory can't be mounted, it may show up as blank.  In this case, auto-formatting would be the wrong thing to do.
 
 This is particularly important for the metadata log maintained by the controller servers.  If two controllers out of three controllers were able to start with blank logs, a leader might be able to be elected with nothing in the log, which would cause all metadata to be lost.
 
 # Missing Features
-We do not yet support generating or loading KIP-630 metadata snapshots.  This means that after a while, the time required to restart a broker will become very large.  This is a known issue and we are working on implementing snapshots for the next release.
+We do not yet support generating or loading KIP-630 metadata snapshots.  This means that after a while, the time required to restart a broker will become very large.  This is a known issue and we are working on completing snapshots for the next release.
 
-We also don't support any kind of upgrade right now, either to or from self-managed mode.  This is another big gap that we are working on.
+We also don't support any kind of upgrade right now, either to or from self-managed mode.  This is another important gap that we are working on.
 
 Finally, the following Kafka features have not yet been fully implemented:
 
-* Support for security (configuring an Authorizer, setting up SCRAM, delegation tokens, and so forth)
+* Support for certain security features: configuring an Authorizer, setting up SCRAM, delegation tokens, and so forth
 * Support for transactions and exactly-once semantics
 * Support for adding partitions to existing topics
 * Support for partition reassignment
@@ -105,7 +110,7 @@ Finally, the following Kafka features have not yet been fully implemented:
 * Support for KIP-112 "JBOD" modes
 * Support for KIP-631 controller metrics
 
-We've tried to make it clear when a feature is not supported in the early access release, but you may encounter some rough edges.
+We've tried to make it clear when a feature is not supported in the early access release, but you may encounter some rough edges. We will cover these feature gaps incrementally in the `trunk` branch.
 
 # Debugging
 If you encounter an issue, you might want to take a look at the metadata log.
@@ -114,8 +119,8 @@ If you encounter an issue, you might want to take a look at the metadata log.
 One way to view the metadata log is with kafka-dump-log.sh tool, like so:
 
 ~~~~
-[cmccabe@zeratul kafka3]$ ./bin/kafka-dump-log.sh  --cluster-metadata-decoder --skip-record-metadata --files /tmp/nozk-combined-logs/\@metadata-0/*.log
-Dumping /tmp/nozk-combined-logs/@metadata-0/00000000000000000000.log
+[cmccabe@zeratul kafka3]$ ./bin/kafka-dump-log.sh  --cluster-metadata-decoder --skip-record-metadata --files /tmp/self-managed-combined-logs/\@metadata-0/*.log
+Dumping /tmp/self-managed-combined-logs/@metadata-0/00000000000000000000.log
 Starting offset: 0
 baseOffset: 0 lastOffset: 0 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 1 isTransactional: false isControl: true position: 0 CreateTime: 1614382631640 size: 89 magic: 2 compresscodec: NONE crc: 1438115474 isvalid: true
 
@@ -140,7 +145,7 @@ baseOffset: 7 lastOffset: 8 count: 2 baseSequence: -1 lastSequence: -1 producerI
 Another tool for examining the metadata logs is the Kafka metadata shell.  Just like the ZooKeeper shell, this allows you to inspect the metadata of the cluster.
 
 ~~~~
-$ ./bin/kafka-metadata-shell.sh  --snapshot /tmp/nozk-combined-logs/\@metadata-0/00000000000000000000.log
+$ ./bin/kafka-metadata-shell.sh  --snapshot /tmp/self-managed-combined-logs/\@metadata-0/00000000000000000000.log
 >> ls /
 brokers  local  metadataQuorum  topicIds  topics
 >> ls /topics
@@ -157,4 +162,5 @@ foo
   "leaderEpoch" : 0,
   "partitionEpoch" : 0
 }
+>> exit
 ~~~~
diff --git a/config/nozk-broker.properties b/config/self-managed/broker.properties
similarity index 96%
rename from config/nozk-broker.properties
rename to config/self-managed/broker.properties
index a8762a7..ad9ec0a 100644
--- a/config/nozk-broker.properties
+++ b/config/self-managed/broker.properties
@@ -15,12 +15,12 @@
 
 #
 # This configuration file is intended for use in self-managed mode, where
-# Apache ZooKeeper is not present.  See KIP-500.md for details.
+# Apache ZooKeeper is not present.  See config/self-managed/README.md for details.
 #
 
 ############################# Server Basics #############################
 
-# The role of this server. Setting this puts us in kip-500 mode
+# The role of this server. Setting this puts us in self-managed mode
 process.roles=broker
 
 # The node id associated with this instance's roles
@@ -71,7 +71,7 @@ socket.request.max.bytes=104857600
 ############################# Log Basics #############################
 
 # A comma separated list of directories under which to store log files
-log.dirs=/tmp/nozk-broker-logs
+log.dirs=/tmp/self-managed-broker-logs
 
 # The default number of log partitions per topic. More partitions allow greater
 # parallelism for consumption, but this will also result in more files across
diff --git a/config/nozk-controller.properties b/config/self-managed/controller.properties
similarity index 96%
rename from config/nozk-controller.properties
rename to config/self-managed/controller.properties
index 292a26c..9dcddaf 100644
--- a/config/nozk-controller.properties
+++ b/config/self-managed/controller.properties
@@ -15,12 +15,12 @@
 
 #
 # This configuration file is intended for use in self-managed mode, where
-# Apache ZooKeeper is not present.  See KIP-500.md for details.
+# Apache ZooKeeper is not present.  See config/self-managed/README.md for details.
 #
 
 ############################# Server Basics #############################
 
-# The role of this server. Setting this puts us in kip-500 mode
+# The role of this server. Setting this puts us in self-managed mode
 process.roles=controller
 
 # The node id associated with this instance's roles
@@ -70,7 +70,7 @@ socket.request.max.bytes=104857600
 ############################# Log Basics #############################
 
 # A comma separated list of directories under which to store log files
-log.dirs=/tmp/nozk-controller-logs
+log.dirs=/tmp/self-managed-controller-logs
 
 # The default number of log partitions per topic. More partitions allow greater
 # parallelism for consumption, but this will also result in more files across
diff --git a/config/nozk-combined.properties b/config/self-managed/server.properties
similarity index 96%
rename from config/nozk-combined.properties
rename to config/self-managed/server.properties
index d680d39..acdcf4e 100644
--- a/config/nozk-combined.properties
+++ b/config/self-managed/server.properties
@@ -15,12 +15,12 @@
 
 #
 # This configuration file is intended for use in self-managed mode, where
-# Apache ZooKeeper is not present.  See KIP-500.md for details.
+# Apache ZooKeeper is not present.  See config/self-managed/README.md for details.
 #
 
 ############################# Server Basics #############################
 
-# The role of this server. Setting this puts us in kip-500 mode
+# The role of this server. Setting this puts us in self-managed mode
 process.roles=broker,controller
 
 # The node id associated with this instance's roles
@@ -71,7 +71,7 @@ socket.request.max.bytes=104857600
 ############################# Log Basics #############################
 
 # A comma separated list of directories under which to store log files
-log.dirs=/tmp/nozk-combined-logs
+log.dirs=/tmp/self-managed-combined-logs
 
 # The default number of log partitions per topic. More partitions allow greater
 # parallelism for consumption, but this will also result in more files across
diff --git a/core/src/main/scala/kafka/server/ApiVersionManager.scala b/core/src/main/scala/kafka/server/ApiVersionManager.scala
index ebf3e74..51212a7 100644
--- a/core/src/main/scala/kafka/server/ApiVersionManager.scala
+++ b/core/src/main/scala/kafka/server/ApiVersionManager.scala
@@ -99,7 +99,7 @@ class DefaultApiVersionManager(
     }
 
     // This is a temporary workaround in order to allow testing of forwarding
-    // in integration tests. We can remove this after the KIP-500 controller
+    // in integration tests. We can remove this after the self-managed controller
     // is available for integration testing.
     if (forwardingManager.isDefined) {
       response.data.apiKeys.add(
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 0ce1975..0581546 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -51,7 +51,7 @@ import scala.collection.{Map, Seq}
 import scala.jdk.CollectionConverters._
 
 /**
- * A KIP-500 Kafka broker.
+ * A self-managed Kafka broker.
  */
 class BrokerServer(
                     val config: KafkaConfig,
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index 18ea2c4..f99cd68 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -45,7 +45,7 @@ import org.apache.kafka.server.authorizer.Authorizer
 import scala.jdk.CollectionConverters._
 
 /**
- * A KIP-500 Kafka controller.
+ * A self-managed Kafka controller.
  */
 class ControllerServer(
                         val metaProperties: MetaProperties,
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 39a93aa..023c8ea 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -74,7 +74,7 @@ object Defaults {
   val BrokerHeartbeatIntervalMs = 2000
   val BrokerSessionTimeoutMs = 9000
 
-  /** KIP-500 Configuration */
+  /** Self-managed mode configs */
   val EmptyNodeId: Int = -1
 
   /************* Authorizer Configuration ***********/
@@ -370,7 +370,7 @@ object KafkaConfig {
   val ConnectionSetupTimeoutMsProp = CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG
   val ConnectionSetupTimeoutMaxMsProp = CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG
 
-  /** KIP-500 Configuration */
+  /** Self-managed mode configs */
   val ProcessRolesProp = "process.roles"
   val InitialBrokerRegistrationTimeoutMsProp = "initial.broker.registration.timeout.ms"
   val BrokerHeartbeatIntervalMsProp = "broker.heartbeat.interval.ms"
@@ -663,19 +663,18 @@ object KafkaConfig {
   val ConnectionSetupTimeoutMsDoc = CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC
   val ConnectionSetupTimeoutMaxMsDoc = CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DOC
 
-  /** KIP-500 Config Documentation */
+  /** Self-managed mode configs */
   val ProcessRolesDoc = "The roles that this process plays: 'broker', 'controller', or 'broker,controller' if it is both. " +
-    "This configuration is only for clusters upgraded for KIP-500, which replaces the dependence on Zookeeper with " +
-    "a self-managed Raft quorum. Leave this config undefined or empty for Zookeeper clusters."
+    "This configuration is only for clusters in self-managed mode, which rely on a Raft quorum instead of ZooKeeper. Leave this config undefined or empty for Zookeeper clusters."
   val InitialBrokerRegistrationTimeoutMsDoc = "When initially registering with the controller quorum, the number of milliseconds to wait before declaring failure and exiting the broker process."
-  val BrokerHeartbeatIntervalMsDoc = "The length of time in milliseconds between broker heartbeats. Used when running in KIP-500 mode."
-  val BrokerSessionTimeoutMsDoc = "The length of time in milliseconds that a broker lease lasts if no heartbeats are made. Used when running in KIP-500 mode."
+  val BrokerHeartbeatIntervalMsDoc = "The length of time in milliseconds between broker heartbeats. Used when running in self-managed mode."
+  val BrokerSessionTimeoutMsDoc = "The length of time in milliseconds that a broker lease lasts if no heartbeats are made. Used when running in self-managed mode."
   val NodeIdDoc = "The node ID associated with the roles this process is playing when `process.roles` is non-empty. " +
     "This is required configuration when the self-managed quorum is enabled."
-  val MetadataLogDirDoc = "This configuration determines where we put the metadata log for clusters upgraded to " +
-    "KIP-500. If it is not set, the metadata log is placed in the first log directory from log.dirs."
-  val ControllerListenerNamesDoc = "A comma-separated list of the names of the listeners used by the KIP-500 controller. This is required " +
-    "if this process is a KIP-500 controller. The ZK-based controller will not use this configuration."
+  val MetadataLogDirDoc = "This configuration determines where we put the metadata log for clusters in self-managed mode. " +
+    "If it is not set, the metadata log is placed in the first log directory from log.dirs."
+  val ControllerListenerNamesDoc = "A comma-separated list of the names of the listeners used by the self-managed controller. This is required " +
+    "if the process is part of a self-managed cluster. The ZK-based controller will not use this configuration."
   val SaslMechanismControllerProtocolDoc = "SASL mechanism used for communication with controllers. Default is GSSAPI."
 
   /************* Authorizer Configuration ***********/
@@ -1073,8 +1072,7 @@ object KafkaConfig {
       .define(ConnectionSetupTimeoutMaxMsProp, LONG, Defaults.ConnectionSetupTimeoutMaxMs, MEDIUM, ConnectionSetupTimeoutMaxMsDoc)
 
       /*
-       * KIP-500 Configuration. Note that these configs are defined as internal. We will make
-       * them public once we are ready to enable KIP-500 in a release.
+       * Self-managed mode configs. Note that these configs are defined as internal. We will make them public in the 3.0.0 release.
        */
       .defineInternal(ProcessRolesProp, LIST, Collections.emptyList(), ValidList.in("broker", "controller"), HIGH, ProcessRolesDoc)
       .defineInternal(NodeIdProp, INT, Defaults.EmptyNodeId, null, HIGH, NodeIdDoc)
diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
index 7ec4622..14e5d3a 100644
--- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
@@ -33,14 +33,13 @@ import org.apache.kafka.raft.metadata.{MetaLogRaftShim, MetadataRecordSerde}
 import scala.collection.Seq
 
 /**
- * This class implements the KIP-500 server which relies on a self-managed
+ * This class implements the self-managed mode server (aka KIP-500) which relies on a
  * Raft quorum for maintaining cluster metadata. It is responsible for
  * constructing the controller and/or broker based on the `process.roles`
  * configuration and for managing their basic lifecycle (startup and shutdown).
  *
- * Note that this server is a work in progress and relies on stubbed
- * implementations of the controller [[ControllerServer]] and broker
- * [[BrokerServer]].
+ * Note that this server is a work in progress and we are releasing it as
+ * early access in 2.8.0.
  */
 class KafkaRaftServer(
   config: KafkaConfig,
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 9951d58..d41adb2 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -279,7 +279,7 @@ class KafkaServer(
         // Delay starting processors until the end of the initialization sequence to ensure
         // that credentials have been loaded before processing authentications.
         //
-        // Note that we allow the use of KIP-500 controller APIs when forwarding is enabled
+        // Note that we allow the use of self-managed mode controller APIs when forwarding is enabled
         // so that the Envelope request is exposed. This is only used in testing currently.
         socketServer = new SocketServer(config, metrics, time, credentialProvider, apiVersionManager)
         socketServer.startup(startProcessingRequests = false)
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 820af7b..917cab8 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1861,7 +1861,7 @@ class ReplicaManager(val config: KafkaConfig,
 
     if (sendZkNotification)
       if (zkClient.isEmpty) {
-        warn("Unable to propagate log dir failure via Zookeeper in KIP-500 mode") // will be handled via KIP-589
+        warn("Unable to propagate log dir failure via Zookeeper in self-managed mode")
       } else {
         zkClient.get.propagateLogDirEvent(localBrokerId)
       }
diff --git a/core/src/main/scala/kafka/tools/StorageTool.scala b/core/src/main/scala/kafka/tools/StorageTool.scala
index 549019d..12a2f69 100644
--- a/core/src/main/scala/kafka/tools/StorageTool.scala
+++ b/core/src/main/scala/kafka/tools/StorageTool.scala
@@ -64,17 +64,17 @@ object StorageTool extends Logging {
       command match {
         case "info" =>
           val directories = configToLogDirectories(config.get)
-          val kip500Mode = configToKip500Mode(config.get)
-          Exit.exit(infoCommand(System.out, kip500Mode, directories))
+          val selfManagedMode = configToSelfManagedMode(config.get)
+          Exit.exit(infoCommand(System.out, selfManagedMode, directories))
 
         case "format" =>
           val directories = configToLogDirectories(config.get)
           val clusterId = namespace.getString("cluster_id")
           val metaProperties = buildMetadataProperties(clusterId, config.get)
           val ignoreFormatted = namespace.getBoolean("ignore_formatted")
-          if (!configToKip500Mode(config.get)) {
+          if (!configToSelfManagedMode(config.get)) {
             throw new TerseFailure("The kafka configuration file appears to be for " +
-              "a legacy cluster. Formatting is only supported for kip-500 clusters.")
+              "a legacy cluster. Formatting is only supported for clusters in self-managed mode.")
           }
           Exit.exit(formatCommand(System.out, directories, metaProperties, ignoreFormatted ))
 
@@ -99,9 +99,9 @@ object StorageTool extends Logging {
     directories.toSeq
   }
 
-  def configToKip500Mode(config: KafkaConfig): Boolean = config.processRoles.nonEmpty
+  def configToSelfManagedMode(config: KafkaConfig): Boolean = config.processRoles.nonEmpty
 
-  def infoCommand(stream: PrintStream, kip500Mode: Boolean, directories: Seq[String]): Int = {
+  def infoCommand(stream: PrintStream, selfManagedMode: Boolean, directories: Seq[String]): Int = {
     val problems = new mutable.ArrayBuffer[String]
     val foundDirectories = new mutable.ArrayBuffer[String]
     var prevMetadata: Option[RawMetaProperties] = None
@@ -142,14 +142,14 @@ object StorageTool extends Logging {
     })
 
     prevMetadata.foreach { prev =>
-      if (kip500Mode) {
+      if (selfManagedMode) {
         if (prev.version == 0) {
-          problems += "The kafka configuration file appears to be for a kip-500 cluster, but " +
+          problems += "The kafka configuration file appears to be for a cluster in self-managed mode, but " +
             "the directories are formatted for legacy mode."
         }
       } else if (prev.version == 1) {
         problems += "The kafka configuration file appears to be for a legacy cluster, but " +
-          "the directories are formatted for kip-500."
+          "the directories are formatted for a cluster in self-managed mode."
       }
     }
 
diff --git a/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala b/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala
index 6a3eb31..d54b0a0 100644
--- a/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala
+++ b/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala
@@ -253,7 +253,7 @@ class ApiVersionTest {
       ListenerType.ZK_BROKER
     )
 
-    // Ensure that APIs needed for the internal metadata quorum (KIP-500)
+    // Ensure that APIs needed for the self-managed mode (aka KIP-500)
     // are not exposed through ApiVersions until we are ready for them
     val exposedApis = apiKeysInResponse(response)
     assertFalse(exposedApis.contains(ApiKeys.ENVELOPE))
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 987a1fd..b9e331d 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -645,7 +645,7 @@ class KafkaConfigTest {
         case KafkaConfig.ConnectionSetupTimeoutMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
         case KafkaConfig.ConnectionSetupTimeoutMaxMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
 
-          // KIP-500 Configurations
+          // Self-managed mode configs
         case KafkaConfig.ProcessRolesProp => // ignore
         case KafkaConfig.InitialBrokerRegistrationTimeoutMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
         case KafkaConfig.BrokerHeartbeatIntervalMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
index 787c767..c28ceee 100644
--- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
@@ -32,7 +32,7 @@ import org.junit.jupiter.api.{Test, Timeout}
 
 @Timeout(value = 40)
 class StorageToolTest {
-  private def newKip500Properties() = {
+  private def newSelfManagedProperties() = {
     val properties = new Properties()
     properties.setProperty(KafkaConfig.LogDirsProp, "/tmp/foo,/tmp/bar")
     properties.setProperty(KafkaConfig.ProcessRolesProp, "controller")
@@ -42,13 +42,13 @@ class StorageToolTest {
 
   @Test
   def testConfigToLogDirectories(): Unit = {
-    val config = new KafkaConfig(newKip500Properties())
+    val config = new KafkaConfig(newSelfManagedProperties())
     assertEquals(Seq("/tmp/bar", "/tmp/foo"), StorageTool.configToLogDirectories(config))
   }
 
   @Test
   def testConfigToLogDirectoriesWithMetaLogDir(): Unit = {
-    val properties = newKip500Properties()
+    val properties = newSelfManagedProperties()
     properties.setProperty(KafkaConfig.MetadataLogDirProp, "/tmp/baz")
     val config = new KafkaConfig(properties)
     assertEquals(Seq("/tmp/bar", "/tmp/baz", "/tmp/foo"),
@@ -119,14 +119,14 @@ Found problem:
 Found metadata: {cluster.id=XcZZOzUqS4yHOjhMQB6JLQ, version=1}
 
 Found problem:
-  The kafka configuration file appears to be for a legacy cluster, but the directories are formatted for kip-500.
+  The kafka configuration file appears to be for a legacy cluster, but the directories are formatted for a cluster in self-managed mode.
 
 """, stream.toString())
     } finally Utils.delete(tempDir)
   }
 
   @Test
-  def testInfoWithMismatchedKip500KafkaConfig(): Unit = {
+  def testInfoWithMismatchedSelfManagedKafkaConfig(): Unit = {
     val stream = new ByteArrayOutputStream()
     val tempDir = TestUtils.tempDir()
     try {
@@ -144,7 +144,7 @@ Found problem:
 Found metadata: {broker.id=1, cluster.id=26c36907-4158-4a35-919d-6534229f5241, version=0}
 
 Found problem:
-  The kafka configuration file appears to be for a kip-500 cluster, but the directories are formatted for legacy mode.
+  The kafka configuration file appears to be for a cluster in self-managed mode, but the directories are formatted for legacy mode.
 
 """, stream.toString())
     } finally Utils.delete(tempDir)
@@ -177,7 +177,7 @@ Found problem:
 
   @Test
   def testFormatWithInvalidClusterId(): Unit = {
-    val config = new KafkaConfig(newKip500Properties())
+    val config = new KafkaConfig(newSelfManagedProperties())
     assertEquals("Cluster ID string invalid does not appear to be a valid UUID: " +
       "Input string `invalid` decoded as 5 bytes, which is not equal to the expected " +
         "16 bytes of a base64-encoded UUID", assertThrows(classOf[TerseFailure],
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index eb73eef..9c86f25 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -82,7 +82,7 @@ import static java.util.concurrent.TimeUnit.NANOSECONDS;
 
 
 /**
- * QuorumController implements the main logic of the KIP-500 controller.
+ * QuorumController implements the main logic of the self-managed controller (aka KIP-500).
  *
  * The node which is the leader of the metadata log becomes the active controller.  All
  * other nodes remain in standby mode.  Standby controllers cannot create new metadata log
diff --git a/tests/kafkatest/services/kafka/quorum.py b/tests/kafkatest/services/kafka/quorum.py
index 7153463..6d59176 100644
--- a/tests/kafkatest/services/kafka/quorum.py
+++ b/tests/kafkatest/services/kafka/quorum.py
@@ -15,8 +15,8 @@
 
 # the types of metadata quorums we support
 zk = 'ZK' # ZooKeeper, used before/during the KIP-500 bridge release(s)
-colocated_raft = 'COLOCATED_RAFT' # co-located KIP-500 Controllers, used during/after the KIP-500 bridge release(s)
-remote_raft = 'REMOTE_RAFT' # separate KIP-500 Controllers, used during/after the KIP-500 bridge release(s)
+colocated_raft = 'COLOCATED_RAFT' # co-located self-managed Controllers, used during/after the KIP-500 bridge release(s)
+remote_raft = 'REMOTE_RAFT' # separate self-managed Controllers, used during/after the KIP-500 bridge release(s)
 
 # How we will parameterize tests that exercise all quorum styles
 #   [“ZK”, “REMOTE_RAFT”, "COLOCATED_RAFT"] during the KIP-500 bridge release(s)
diff --git a/tests/kafkatest/tests/client/client_compatibility_features_test.py b/tests/kafkatest/tests/client/client_compatibility_features_test.py
index e04c65e..49e4aeb 100644
--- a/tests/kafkatest/tests/client/client_compatibility_features_test.py
+++ b/tests/kafkatest/tests/client/client_compatibility_features_test.py
@@ -134,7 +134,6 @@ class ClientCompatibilityFeaturesTest(Test):
         self.kafka.start()
         features = get_broker_features(broker_version)
         if not self.zk:
-            #  this check/disabling is only necessary due to the fact that we are in early access mode with
-            #  KIP-500 and we should remove the special casing when that his fully implemented
+            #  The self-managed mode doesn't support acls yet, we should remove this once it does
             features["describe-acls-supported"] = False
         self.invoke_compatibility_program(features)
