This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d00f0ecf1a1 KAFKA-18124 Remove zk migration from `RaftManagerTest`,
`BrokerLifecycleManagerTest`, `KafkaConfigTest`, and `ReplicaManagerTest`
(#17990)
d00f0ecf1a1 is described below
commit d00f0ecf1a1a082c97564f4b807e7a342472b57a
Author: Chia-Chuan Yu <[email protected]>
AuthorDate: Tue Dec 3 03:27:12 2024 +0800
KAFKA-18124 Remove zk migration from `RaftManagerTest`,
`BrokerLifecycleManagerTest`, `KafkaConfigTest`, and `ReplicaManagerTest`
(#17990)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../test/scala/unit/kafka/KafkaConfigTest.scala | 178 +---------------
.../scala/unit/kafka/raft/RaftManagerTest.scala | 80 +------
.../kafka/server/BrokerLifecycleManagerTest.scala | 54 +----
.../unit/kafka/server/ReplicaManagerTest.scala | 229 +++------------------
.../test/scala/unit/kafka/utils/TestUtils.scala | 4 +
5 files changed, 39 insertions(+), 506 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
index 07b52dd3c65..53789213ddd 100644
--- a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
@@ -29,13 +29,10 @@ import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.utils.Exit
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.QuorumConfig
-import org.apache.kafka.server.config.{KRaftConfigs, ZkConfigs}
-import org.apache.kafka.server.config.ReplicationConfigs
+import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs,
ZkConfigs}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.api.Assertions._
-import scala.jdk.CollectionConverters._
-
class KafkaConfigTest {
@BeforeEach
@@ -176,17 +173,13 @@ class KafkaConfigTest {
val controllerListener = "SASL_PLAINTEXT://localhost:9092"
val brokerListener = "PLAINTEXT://localhost:9093"
- if (hasBrokerRole || hasControllerRole) { // KRaft
- props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG,
"SASL_PLAINTEXT")
- if (hasBrokerRole && hasControllerRole) {
- props.setProperty(SocketServerConfigs.LISTENERS_CONFIG,
s"$brokerListener,$controllerListener")
- } else if (hasControllerRole) {
- props.setProperty(SocketServerConfigs.LISTENERS_CONFIG,
controllerListener)
- } else if (hasBrokerRole) {
- props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, brokerListener)
- }
- } else { // ZK-based
- props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, brokerListener)
+ props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG,
"SASL_PLAINTEXT")
+ if (hasBrokerRole && hasControllerRole) {
+ props.setProperty(SocketServerConfigs.LISTENERS_CONFIG,
s"$brokerListener,$controllerListener")
+ } else if (hasControllerRole) {
+ props.setProperty(SocketServerConfigs.LISTENERS_CONFIG,
controllerListener)
+ } else if (hasBrokerRole) {
+ props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, brokerListener)
}
if (!(hasControllerRole & !hasBrokerRole)) { // not controller-only
props.setProperty(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG,
"PLAINTEXT")
@@ -235,118 +228,6 @@ class KafkaConfigTest {
assertEquals(password,
config.getPassword(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG).value)
}
- private val booleanPropValueToSet = true
- private val stringPropValueToSet = "foo"
- private val passwordPropValueToSet = "ThePa$$word!"
- private val listPropValueToSet = List("A", "B")
-
- @Test
- def testZkSslClientEnable(): Unit = {
- testZkConfig(ZkConfigs.ZK_SSL_CLIENT_ENABLE_CONFIG,
"zookeeper.ssl.client.enable",
- "zookeeper.client.secure", booleanPropValueToSet, config =>
Some(config.zkSslClientEnable), booleanPropValueToSet, Some(false))
- }
-
- @Test
- def testZkSslKeyStoreLocation(): Unit = {
- testZkConfig(ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_CONFIG,
"zookeeper.ssl.keystore.location",
- "zookeeper.ssl.keyStore.location", stringPropValueToSet, config =>
config.zkSslKeyStoreLocation, stringPropValueToSet)
- }
-
- @Test
- def testZkSslTrustStoreLocation(): Unit = {
- testZkConfig(ZkConfigs.ZK_SSL_TRUST_STORE_LOCATION_CONFIG,
"zookeeper.ssl.truststore.location",
- "zookeeper.ssl.trustStore.location", stringPropValueToSet, config =>
config.zkSslTrustStoreLocation, stringPropValueToSet)
- }
-
- @Test
- def testZookeeperKeyStorePassword(): Unit = {
- testZkConfig(ZkConfigs.ZK_SSL_KEY_STORE_PASSWORD_CONFIG,
"zookeeper.ssl.keystore.password",
- "zookeeper.ssl.keyStore.password", passwordPropValueToSet, config =>
config.zkSslKeyStorePassword, new Password(passwordPropValueToSet))
- }
-
- @Test
- def testZookeeperTrustStorePassword(): Unit = {
- testZkConfig(ZkConfigs.ZK_SSL_TRUST_STORE_PASSWORD_CONFIG,
"zookeeper.ssl.truststore.password",
- "zookeeper.ssl.trustStore.password", passwordPropValueToSet, config =>
config.zkSslTrustStorePassword, new Password(passwordPropValueToSet))
- }
-
- @Test
- def testZkSslKeyStoreType(): Unit = {
- testZkConfig(ZkConfigs.ZK_SSL_KEY_STORE_TYPE_CONFIG,
"zookeeper.ssl.keystore.type",
- "zookeeper.ssl.keyStore.type", stringPropValueToSet, config =>
config.zkSslKeyStoreType, stringPropValueToSet)
- }
-
- @Test
- def testZkSslTrustStoreType(): Unit = {
- testZkConfig(ZkConfigs.ZK_SSL_TRUST_STORE_TYPE_CONFIG,
"zookeeper.ssl.truststore.type",
- "zookeeper.ssl.trustStore.type", stringPropValueToSet, config =>
config.zkSslTrustStoreType, stringPropValueToSet)
- }
-
- @Test
- def testZkSslProtocol(): Unit = {
- testZkConfig(ZkConfigs.ZK_SSL_PROTOCOL_CONFIG, "zookeeper.ssl.protocol",
- "zookeeper.ssl.protocol", stringPropValueToSet, config =>
Some(config.ZkSslProtocol), stringPropValueToSet, Some("TLSv1.2"))
- }
-
- @Test
- def testZkSslEnabledProtocols(): Unit = {
- testZkConfig(ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_CONFIG,
"zookeeper.ssl.enabled.protocols",
- "zookeeper.ssl.enabledProtocols", listPropValueToSet.mkString(","),
config => config.ZkSslEnabledProtocols, listPropValueToSet.asJava)
- }
-
- @Test
- def testZkSslCipherSuites(): Unit = {
- testZkConfig(ZkConfigs.ZK_SSL_CIPHER_SUITES_CONFIG,
"zookeeper.ssl.cipher.suites",
- "zookeeper.ssl.ciphersuites", listPropValueToSet.mkString(","), config
=> config.ZkSslCipherSuites, listPropValueToSet.asJava)
- }
-
- @Test
- def testZkSslEndpointIdentificationAlgorithm(): Unit = {
- // this property is different than the others
- // because the system property values and the Kafka property values don't
match
- val kafkaPropName =
ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG
- assertEquals("zookeeper.ssl.endpoint.identification.algorithm",
kafkaPropName)
- val sysProp = "zookeeper.ssl.hostnameVerification"
- val expectedDefaultValue = "HTTPS"
- val propertiesFile = prepareDefaultConfig()
- // first make sure there is the correct default value
- val emptyConfig =
KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile)))
- assertNull(emptyConfig.originals.get(kafkaPropName)) // doesn't appear in
the originals
- assertEquals(expectedDefaultValue, emptyConfig.values.get(kafkaPropName))
// but default value appears in the values
- assertEquals(expectedDefaultValue,
emptyConfig.ZkSslEndpointIdentificationAlgorithm) // and has the correct
default value
- // next set system property alone
- Map("true" -> "HTTPS", "false" -> "").foreach { case (sysPropValue,
expected) =>
- try {
- System.setProperty(sysProp, sysPropValue)
- val config =
KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile)))
- assertNull(config.originals.get(kafkaPropName)) // doesn't appear in
the originals
- assertEquals(expectedDefaultValue, config.values.get(kafkaPropName))
// default value appears in the values
- assertEquals(expected, config.ZkSslEndpointIdentificationAlgorithm) //
system property impacts the ultimate value of the property
- } finally {
- System.clearProperty(sysProp)
- }
- }
- // finally set Kafka config alone
- List("https", "").foreach(expected => {
- val config =
KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile,
"--override", s"$kafkaPropName=$expected")))
- assertEquals(expected, config.originals.get(kafkaPropName)) // appears
in the originals
- assertEquals(expected, config.values.get(kafkaPropName)) // appears in
the values
- assertEquals(expected, config.ZkSslEndpointIdentificationAlgorithm) //
is the ultimate value
- })
- }
-
- @Test
- def testZkSslCrlEnable(): Unit = {
- testZkConfig(ZkConfigs.ZK_SSL_CRL_ENABLE_CONFIG,
"zookeeper.ssl.crl.enable",
- "zookeeper.ssl.crl", booleanPropValueToSet, config =>
Some(config.ZkSslCrlEnable), booleanPropValueToSet, Some(false))
- }
-
- @Test
- def testZkSslOcspEnable(): Unit = {
- testZkConfig(ZkConfigs.ZK_SSL_OCSP_ENABLE_CONFIG,
"zookeeper.ssl.ocsp.enable",
- "zookeeper.ssl.ocsp", booleanPropValueToSet, config =>
Some(config.ZkSslOcspEnable), booleanPropValueToSet, Some(false))
- }
-
@Test
def testConnectionsMaxReauthMsDefault(): Unit = {
val propertiesFile = prepareDefaultConfig()
@@ -362,49 +243,6 @@ class KafkaConfigTest {
assertEquals(expected,
config.valuesWithPrefixOverride("sasl_ssl.oauthbearer.").get(BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS_CONFIG).asInstanceOf[Long])
}
- private def testZkConfig[T, U](kafkaPropName: String,
- expectedKafkaPropName: String,
- sysPropName: String,
- propValueToSet: T,
- getPropValueFrom: KafkaConfig => Option[T],
- expectedPropertyValue: U,
- expectedDefaultValue: Option[T] = None): Unit
= {
- assertEquals(expectedKafkaPropName, kafkaPropName)
- val propertiesFile = prepareDefaultConfig()
- // first make sure there is the correct default value (if any)
- val emptyConfig =
KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile)))
- assertNull(emptyConfig.originals.get(kafkaPropName)) // doesn't appear in
the originals
- if (expectedDefaultValue.isDefined) {
- // confirm default value behavior
- assertEquals(expectedDefaultValue.get,
emptyConfig.values.get(kafkaPropName)) // default value appears in the values
- assertEquals(expectedDefaultValue.get,
getPropValueFrom(emptyConfig).get) // default value appears in the property
- } else {
- // confirm no default value behavior
- assertNull(emptyConfig.values.get(kafkaPropName)) // doesn't appear in
the values
- assertEquals(None, getPropValueFrom(emptyConfig)) // has no default value
- }
- // next set system property alone
- try {
- System.setProperty(sysPropName, s"$propValueToSet")
- // need to create a new Kafka config for the system property to be
recognized
- val config =
KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile)))
- assertNull(config.originals.get(kafkaPropName)) // doesn't appear in the
originals
- // confirm default value (if any) overridden by system property
- if (expectedDefaultValue.isDefined)
- assertEquals(expectedDefaultValue.get,
config.values.get(kafkaPropName)) // default value (different from system
property) appears in the values
- else
- assertNull(config.values.get(kafkaPropName)) // doesn't appear in the
values
- // confirm system property appears in the property
- assertEquals(Some(expectedPropertyValue), getPropValueFrom(config))
- } finally {
- System.clearProperty(sysPropName)
- }
- // finally set Kafka config alone
- val config =
KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile,
"--override", s"$kafkaPropName=$propValueToSet")))
- assertEquals(expectedPropertyValue, config.values.get(kafkaPropName)) //
appears in the values
- assertEquals(Some(expectedPropertyValue), getPropValueFrom(config)) //
appears in the property
- }
-
def prepareDefaultConfig(): String = {
prepareConfig(Array("broker.id=1", "zookeeper.connect=somewhere"))
}
diff --git a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
index 09b35318818..6391bb45c35 100644
--- a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
@@ -34,7 +34,7 @@ import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.Endpoints
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.server.ProcessRole
-import org.apache.kafka.server.config.{KRaftConfigs, ServerConfigs,
ReplicationConfigs, ServerLogConfigs, ZkConfigs}
+import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs,
ServerLogConfigs}
import org.apache.kafka.server.fault.FaultHandler
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
@@ -46,29 +46,6 @@ import scala.util.Using
import scala.jdk.CollectionConverters._
class RaftManagerTest {
- private def createZkBrokerConfig(
- migrationEnabled: Boolean,
- nodeId: Int,
- logDir: Seq[Path],
- metadataDir: Option[Path]
- ): KafkaConfig = {
- val props = new Properties
- logDir.foreach { value =>
- props.setProperty(ServerLogConfigs.LOG_DIR_CONFIG, value.toString)
- }
- if (migrationEnabled) {
- metadataDir.foreach { value =>
- props.setProperty(KRaftConfigs.METADATA_LOG_DIR_CONFIG, value.toString)
- }
- props.setProperty(KRaftConfigs.MIGRATION_ENABLED_CONFIG, "true")
- props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG,
s"$nodeId@localhost:9093")
- props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "SSL")
- }
-
- props.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181")
- props.setProperty(ServerConfigs.BROKER_ID_CONFIG, nodeId.toString)
- new KafkaConfig(props)
- }
private def createConfig(
processRoles: Set[ProcessRole],
@@ -245,61 +222,6 @@ class RaftManagerTest {
}
}
- @Test
- def testMigratingZkBrokerDeletesMetadataLog(): Unit = {
- val logDirs = Seq(TestUtils.tempDir().toPath)
- val metadataLogDir = Some(TestUtils.tempDir().toPath)
- val nodeId = 1
- val config = createZkBrokerConfig(migrationEnabled = true, nodeId,
logDirs, metadataLogDir)
- createMetadataLog(config)
-
- KafkaRaftManager.maybeDeleteMetadataLogDir(config)
- assertLogDirsExist(logDirs, metadataLogDir, expectMetadataLog = false)
- }
-
- @Test
- def testNonMigratingZkBrokerDoesNotDeleteMetadataLog(): Unit = {
- val logDirs = Seq(TestUtils.tempDir().toPath)
- val metadataLogDir = Some(TestUtils.tempDir().toPath)
- val nodeId = 1
-
- val config = createZkBrokerConfig(migrationEnabled = false, nodeId,
logDirs, metadataLogDir)
-
- // Create the metadata log dir directly as if the broker was previously in
migration mode.
- // This simulates a misconfiguration after downgrade
- Files.createDirectory(metadataLogDir.get.resolve("__cluster_metadata-0"))
-
- val err = assertThrows(classOf[RuntimeException], () =>
KafkaRaftManager.maybeDeleteMetadataLogDir(config),
- "Should have not deleted the metadata log")
- assertEquals("Not deleting metadata log dir since migrations are not
enabled.", err.getMessage)
-
- assertLogDirsExist(logDirs, metadataLogDir, expectMetadataLog = true)
- }
-
- @Test
- def testZkBrokerDoesNotDeleteSeparateLogDirs(): Unit = {
- val logDirs = Seq(TestUtils.tempDir().toPath, TestUtils.tempDir().toPath)
- val metadataLogDir = Some(TestUtils.tempDir().toPath)
- val nodeId = 1
- val config = createZkBrokerConfig(migrationEnabled = true, nodeId,
logDirs, metadataLogDir)
- createMetadataLog(config)
-
- KafkaRaftManager.maybeDeleteMetadataLogDir(config)
- assertLogDirsExist(logDirs, metadataLogDir, expectMetadataLog = false)
- }
-
- @Test
- def testZkBrokerDoesNotDeleteSameLogDir(): Unit = {
- val logDirs = Seq(TestUtils.tempDir().toPath, TestUtils.tempDir().toPath)
- val metadataLogDir = logDirs.headOption
- val nodeId = 1
- val config = createZkBrokerConfig(migrationEnabled = true, nodeId,
logDirs, metadataLogDir)
- createMetadataLog(config)
-
- KafkaRaftManager.maybeDeleteMetadataLogDir(config)
- assertLogDirsExist(logDirs, metadataLogDir, expectMetadataLog = false)
- }
-
@Test
def testKRaftBrokerDoesNotDeleteMetadataLog(): Unit = {
val logDirs = Seq(TestUtils.tempDir().toPath)
diff --git a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
index 0f5d3f027e5..71bfbefa307 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
@@ -24,16 +24,11 @@ import org.apache.kafka.common.Uuid
import org.apache.kafka.common.message.{BrokerHeartbeatResponseData,
BrokerRegistrationResponseData}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse,
BrokerHeartbeatRequest, BrokerHeartbeatResponse, BrokerRegistrationRequest,
BrokerRegistrationResponse}
-import org.apache.kafka.metadata.{BrokerState, VersionRange}
+import org.apache.kafka.metadata.BrokerState
import org.apache.kafka.raft.QuorumConfig
-import org.apache.kafka.server.BrokerFeatures
-import org.apache.kafka.server.common.{Features, KRaftVersion, MetadataVersion}
-import org.apache.kafka.server.common.MetadataVersion.{IBP_3_8_IV0,
IBP_3_9_IV0}
-import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs,
ServerLogConfigs, ZkConfigs}
+import org.apache.kafka.server.config.{KRaftConfigs, ServerLogConfigs}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test, Timeout}
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ValueSource
import java.util.concurrent.{CompletableFuture, Future}
import scala.jdk.CollectionConverters._
@@ -61,15 +56,6 @@ class BrokerLifecycleManagerTest {
properties
}
- def migrationConfigProperties(ibp: MetadataVersion) = {
- val migrationConfigProperties = configProperties
-
migrationConfigProperties.setProperty(KRaftConfigs.MIGRATION_ENABLED_CONFIG,
"true")
- migrationConfigProperties.setProperty(ZkConfigs.ZK_CONNECT_CONFIG,
"localhost:2181")
- migrationConfigProperties.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG,
"")
-
migrationConfigProperties.setProperty(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG,
ibp.toString)
- migrationConfigProperties
- }
-
@Test
def testCreateAndClose(): Unit = {
val context = new RegistrationTestContext(configProperties)
@@ -113,42 +99,6 @@ class BrokerLifecycleManagerTest {
}
}
- @ParameterizedTest
- @ValueSource(booleans = Array(true, false))
- def testSuccessfulRegistrationDuringMigration(nonInitialKraftVersion:
Boolean): Unit = {
- val ibp = if (nonInitialKraftVersion) IBP_3_9_IV0 else IBP_3_8_IV0
- val context = new RegistrationTestContext(migrationConfigProperties(ibp))
- manager = new BrokerLifecycleManager(context.config, context.time,
"successful-registration-", isZkBroker = false,
Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
- val controllerNode = new Node(3000, "localhost", 8021)
- context.controllerNodeProvider.node.set(controllerNode)
- val features =
BrokerFeatures.createDefaultFeatureMap(BrokerFeatures.createDefault(true)).asScala
-
- // Even though ZK brokers don't use "metadata.version" feature, we need to
overwrite it with our IBP as part of registration
- // so the KRaft controller can verify that all brokers are on the same IBP
before starting the migration.
- val featuresRemapped = features + (MetadataVersion.FEATURE_NAME ->
VersionRange.of(ibp.featureLevel(), ibp.featureLevel()))
-
- manager.start(() => context.highestMetadataOffset.get(),
- context.mockChannelManager, context.clusterId,
context.advertisedListeners,
- featuresRemapped.asJava, OptionalLong.of(10L))
- TestUtils.retry(60000) {
- assertEquals(1, context.mockChannelManager.unsentQueue.size)
- val sentBrokerRegistrationData =
context.mockChannelManager.unsentQueue.getFirst.request.build().asInstanceOf[BrokerRegistrationRequest].data()
- assertEquals(10L, sentBrokerRegistrationData.previousBrokerEpoch())
- assertEquals(ibp.featureLevel(),
sentBrokerRegistrationData.features().find(MetadataVersion.FEATURE_NAME).maxSupportedVersion())
- if (nonInitialKraftVersion) {
- val sentKraftVersion =
sentBrokerRegistrationData.features().find(KRaftVersion.FEATURE_NAME)
- assertEquals(Features.KRAFT_VERSION.minimumProduction(),
sentKraftVersion.minSupportedVersion())
- assertEquals(Features.KRAFT_VERSION.latestTesting(),
sentKraftVersion.maxSupportedVersion())
- }
- }
- context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
- new BrokerRegistrationResponseData().setBrokerEpoch(1000)),
controllerNode)
- TestUtils.retry(10000) {
- context.poll()
- assertEquals(1000L, manager.brokerEpoch)
- }
- }
-
@Test
def testRegistrationTimeout(): Unit = {
val context = new RegistrationTestContext(configProperties)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 1ce3c7de525..42f7c6dd1b4 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -29,7 +29,6 @@ import kafka.server.epoch.util.MockBlockingSender
import kafka.server.share.DelayedShareFetch
import kafka.utils.TestUtils.waitUntilTrue
import kafka.utils.{Pool, TestUtils}
-import kafka.zk.KafkaZkClient
import org.apache.kafka.clients.FetchSessionHandler
import org.apache.kafka.common.{DirectoryId, IsolationLevel, Node,
TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.common.compress.Compression
@@ -51,7 +50,7 @@ import
org.apache.kafka.common.requests.FetchRequest.PartitionData
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.apache.kafka.common.utils.{Exit, LogContext, Time, Utils}
+import org.apache.kafka.common.utils.{LogContext, Time, Utils}
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.image._
import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
@@ -61,7 +60,7 @@ import org.apache.kafka.metadata.properties.{MetaProperties,
MetaPropertiesEnsem
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
-import org.apache.kafka.server.common.{DirectoryEventHandler, MetadataVersion,
OffsetAndEpoch, RequestLocal, StopPartition}
+import org.apache.kafka.server.common.{DirectoryEventHandler, KRaftVersion,
MetadataVersion, OffsetAndEpoch, RequestLocal, StopPartition}
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs,
ServerLogConfigs}
import org.apache.kafka.server.log.remote.storage._
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
@@ -136,7 +135,7 @@ class ReplicaManagerTest {
@BeforeEach
def setUp(): Unit = {
- val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
+ val props = TestUtils.createBrokerConfig(1, TestUtils.MockKraftConnect)
config = KafkaConfig.fromProps(props)
alterPartitionManager = mock(classOf[AlterPartitionManager])
quotaManager = QuotaFactory.instantiate(config, metrics, time, "")
@@ -177,7 +176,7 @@ class ReplicaManagerTest {
scheduler = new MockScheduler(time),
logManager = mockLogMgr,
quotaManagers = quotaManager,
- metadataCache = MetadataCache.zkMetadataCache(config.brokerId,
config.interBrokerProtocolVersion),
+ metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () =>
KRaftVersion.KRAFT_VERSION_0),
logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
alterPartitionManager = alterPartitionManager)
try {
@@ -195,7 +194,7 @@ class ReplicaManagerTest {
@Test
def testHighwaterMarkRelativeDirectoryMapping(): Unit = {
- val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
+ val props = TestUtils.createBrokerConfig(1, TestUtils.MockKraftConnect)
props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
val config = KafkaConfig.fromProps(props)
val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new
File(_)))
@@ -206,7 +205,7 @@ class ReplicaManagerTest {
scheduler = new MockScheduler(time),
logManager = mockLogMgr,
quotaManagers = quotaManager,
- metadataCache = MetadataCache.zkMetadataCache(config.brokerId,
config.interBrokerProtocolVersion),
+ metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () =>
KRaftVersion.KRAFT_VERSION_0),
logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
alterPartitionManager = alterPartitionManager)
try {
@@ -232,7 +231,7 @@ class ReplicaManagerTest {
scheduler = new MockScheduler(time),
logManager = mockLogMgr,
quotaManagers = quotaManager,
- metadataCache = MetadataCache.zkMetadataCache(config.brokerId,
config.interBrokerProtocolVersion),
+ metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () =>
KRaftVersion.KRAFT_VERSION_0),
logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
alterPartitionManager = alterPartitionManager,
threadNamePrefix = Option(this.getClass.getName))
@@ -272,7 +271,7 @@ class ReplicaManagerTest {
def testMaybeAddLogDirFetchersWithoutEpochCache(): Unit = {
val dir1 = TestUtils.tempDir()
val dir2 = TestUtils.tempDir()
- val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
+ val props = TestUtils.createBrokerConfig(0, TestUtils.MockKraftConnect)
props.put("log.dirs", dir1.getAbsolutePath + "," + dir2.getAbsolutePath)
val config = KafkaConfig.fromProps(props)
val logManager = TestUtils.createLogManager(config.logDirs.map(new
File(_)), new LogConfig(new Properties()))
@@ -335,7 +334,7 @@ class ReplicaManagerTest {
def testMaybeAddLogDirFetchersPausingCleaning(futureLogCreated: Boolean):
Unit = {
val dir1 = TestUtils.tempDir()
val dir2 = TestUtils.tempDir()
- val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
+ val props = TestUtils.createBrokerConfig(0, TestUtils.MockKraftConnect)
props.put("log.dirs", dir1.getAbsolutePath + "," + dir2.getAbsolutePath)
val config = KafkaConfig.fromProps(props)
val logManager = TestUtils.createLogManager(config.logDirs.map(new
File(_)), new LogConfig(new Properties()))
@@ -407,7 +406,7 @@ class ReplicaManagerTest {
@Test
def testClearPurgatoryOnBecomingFollower(): Unit = {
- val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
+ val props = TestUtils.createBrokerConfig(0, TestUtils.MockKraftConnect)
props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
val config = KafkaConfig.fromProps(props)
val logProps = new Properties()
@@ -493,7 +492,7 @@ class ReplicaManagerTest {
scheduler = new MockScheduler(time),
logManager = mockLogMgr,
quotaManagers = quotaManager,
- metadataCache = MetadataCache.zkMetadataCache(config.brokerId,
config.interBrokerProtocolVersion),
+ metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, ()
=> KRaftVersion.KRAFT_VERSION_0),
logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
alterPartitionManager = alterPartitionManager,
threadNamePrefix = Option(this.getClass.getName))
@@ -2522,7 +2521,7 @@ class ReplicaManagerTest {
@Test
def testDisabledTransactionVerification(): Unit = {
- val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
+ val props = TestUtils.createBrokerConfig(0, TestUtils.MockKraftConnect)
props.put("transaction.partition.verification.enable", "false")
val config = KafkaConfig.fromProps(props)
@@ -2701,18 +2700,8 @@ class ReplicaManagerTest {
}
}
- @ParameterizedTest
- @ValueSource(booleans = Array(true, false))
- def testFullLeaderAndIsrStrayPartitions(zkMigrationEnabled: Boolean): Unit =
{
- val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
- if (zkMigrationEnabled) {
- props.put(KRaftConfigs.MIGRATION_ENABLED_CONFIG, "" + zkMigrationEnabled)
- props.put(QuorumConfig.QUORUM_VOTERS_CONFIG, "3000@localhost:9071")
- props.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
- props.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
"CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
- config = KafkaConfig.fromProps(props)
- }
-
+ @Test
+ def testFullLeaderAndIsrStrayPartitions(): Unit = {
val logManager = TestUtils.createLogManager(config.logDirs.map(new
File(_)), defaultConfig = new LogConfig(new Properties()), time = time)
val quotaManager = QuotaFactory.instantiate(config, metrics, time, "")
val replicaManager = new ReplicaManager(
@@ -2722,7 +2711,7 @@ class ReplicaManagerTest {
scheduler = time.scheduler,
logManager = logManager,
quotaManagers = quotaManager,
- metadataCache = MetadataCache.zkMetadataCache(config.brokerId,
config.interBrokerProtocolVersion),
+ metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () =>
KRaftVersion.KRAFT_VERSION_0),
logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
alterPartitionManager = alterPartitionManager,
threadNamePrefix = Option(this.getClass.getName))
@@ -2772,11 +2761,7 @@ class ReplicaManagerTest {
val stray0 = replicaManager.getPartition(new
TopicPartition("hosted-stray", 0))
- if (zkMigrationEnabled) {
- assertEquals(HostedPartition.None, stray0)
- } else {
- assertTrue(stray0.isInstanceOf[HostedPartition.Online])
- }
+ assertTrue(stray0.isInstanceOf[HostedPartition.Online])
} finally {
Utils.tryAll(util.Arrays.asList[Callable[Void]] (
() => {
@@ -2806,7 +2791,7 @@ class ReplicaManagerTest {
scheduler = time.scheduler,
logManager = logManager,
quotaManagers = quotaManager,
- metadataCache = MetadataCache.zkMetadataCache(config.brokerId,
config.interBrokerProtocolVersion),
+ metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () =>
KRaftVersion.KRAFT_VERSION_0),
logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
alterPartitionManager = alterPartitionManager,
threadNamePrefix = Option(this.getClass.getName))
@@ -2912,7 +2897,7 @@ class ReplicaManagerTest {
leaderEpochFromLeader: Int =
3,
extraProps: Properties = new
Properties(),
topicId: Option[Uuid] =
None): (ReplicaManager, LogManager) = {
- val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
+ val props = TestUtils.createBrokerConfig(0, TestUtils.MockKraftConnect)
props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
props.asScala ++= extraProps.asScala
val config = KafkaConfig.fromProps(props)
@@ -3383,7 +3368,7 @@ class ReplicaManagerTest {
buildRemoteLogAuxState: Boolean = false,
remoteFetchQuotaExceeded: Option[Boolean] = None
): ReplicaManager = {
- val props = TestUtils.createBrokerConfig(brokerId, TestUtils.MockZkConnect)
+ val props = TestUtils.createBrokerConfig(brokerId,
TestUtils.MockKraftConnect)
val path1 = TestUtils.tempRelativeDir("data").getAbsolutePath
val path2 = TestUtils.tempRelativeDir("data2").getAbsolutePath
props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP,
enableRemoteStorage.toString)
@@ -3744,8 +3729,8 @@ class ReplicaManagerTest {
private def prepareDifferentReplicaManagers(brokerTopicStats1:
BrokerTopicStats,
brokerTopicStats2:
BrokerTopicStats): (ReplicaManager, ReplicaManager) = {
- val props0 = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
- val props1 = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
+ val props0 = TestUtils.createBrokerConfig(0, TestUtils.MockKraftConnect)
+ val props1 = TestUtils.createBrokerConfig(1, TestUtils.MockKraftConnect)
props0.put("log0.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
props1.put("log1.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
@@ -4632,7 +4617,7 @@ class ReplicaManagerTest {
def testReplicaNotAvailable(): Unit = {
def createReplicaManager(): ReplicaManager = {
- val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
+ val props = TestUtils.createBrokerConfig(1, TestUtils.MockKraftConnect)
val config = KafkaConfig.fromProps(props)
val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new
File(_)))
new ReplicaManager(
@@ -4642,7 +4627,7 @@ class ReplicaManagerTest {
scheduler = new MockScheduler(time),
logManager = mockLogMgr,
quotaManagers = quotaManager,
- metadataCache = MetadataCache.zkMetadataCache(config.brokerId, config.interBrokerProtocolVersion),
+ metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () => KRaftVersion.KRAFT_VERSION_0),
logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
alterPartitionManager = alterPartitionManager) {
override def getPartitionOrException(topicPartition: TopicPartition):
Partition = {
@@ -5855,115 +5840,6 @@ class ReplicaManagerTest {
}
}
- @Test
- def testFetcherAreNotRestartedIfLeaderEpochIsNotBumpedWithZkPath(): Unit = {
- val localId = 0
- val topicPartition = new TopicPartition("foo", 0)
-
- val mockReplicaFetcherManager = mock(classOf[ReplicaFetcherManager])
- val replicaManager = setupReplicaManagerWithMockedPurgatories(
- timer = new MockTimer(time),
- brokerId = localId,
- aliveBrokerIds = Seq(localId, localId + 1, localId + 2),
- mockReplicaFetcherManager = Some(mockReplicaFetcherManager)
- )
-
- try {
- when(mockReplicaFetcherManager.removeFetcherForPartitions(
- Set(topicPartition))
- ).thenReturn(Map.empty[TopicPartition, PartitionFetchState])
-
- // Make the local replica the follower.
- var request = makeLeaderAndIsrRequest(
- topicId = FOO_UUID,
- topicPartition = topicPartition,
- replicas = Seq(localId, localId + 1),
- leaderAndIsr = new LeaderAndIsr(
- localId + 1,
- 0,
- List(localId, localId + 1).map(Int.box).asJava,
- LeaderRecoveryState.RECOVERED,
- 0
- )
- )
-
- replicaManager.becomeLeaderOrFollower(0, request, (_, _) => ())
-
- // Check the state of that partition.
- val HostedPartition.Online(followerPartition) =
replicaManager.getPartition(topicPartition)
- assertFalse(followerPartition.isLeader)
- assertEquals(0, followerPartition.getLeaderEpoch)
- assertEquals(0, followerPartition.getPartitionEpoch)
-
- // Verify that the partition was removed and added back.
-
verify(mockReplicaFetcherManager).removeFetcherForPartitions(Set(topicPartition))
-
verify(mockReplicaFetcherManager).addFetcherForPartitions(Map(topicPartition ->
InitialFetchState(
- topicId = Some(FOO_UUID),
- leader = new BrokerEndPoint(localId + 1, s"host${localId + 1}",
localId + 1),
- currentLeaderEpoch = 0,
- initOffset = 0
- )))
-
- reset(mockReplicaFetcherManager)
-
- // Apply changes that bumps the partition epoch.
- request = makeLeaderAndIsrRequest(
- topicId = FOO_UUID,
- topicPartition = topicPartition,
- replicas = Seq(localId, localId + 1, localId + 2),
- leaderAndIsr = new LeaderAndIsr(
- localId + 1,
- 0,
- List(localId, localId + 1).map(Int.box).asJava,
- LeaderRecoveryState.RECOVERED,
- 1
- )
- )
-
- replicaManager.becomeLeaderOrFollower(0, request, (_, _) => ())
-
- assertFalse(followerPartition.isLeader)
- assertEquals(0, followerPartition.getLeaderEpoch)
- // Partition updates is fenced based on the leader epoch on the ZK path.
- assertEquals(0, followerPartition.getPartitionEpoch)
-
- // As the update is fenced based on the leader epoch,
removeFetcherForPartitions and
- // addFetcherForPartitions are not called at all.
- reset(mockReplicaFetcherManager)
-
- // Apply changes that bumps the leader epoch.
- request = makeLeaderAndIsrRequest(
- topicId = FOO_UUID,
- topicPartition = topicPartition,
- replicas = Seq(localId, localId + 1, localId + 2),
- leaderAndIsr = new LeaderAndIsr(
- localId + 2,
- 1,
- List(localId, localId + 1, localId + 2).map(Int.box).asJava,
- LeaderRecoveryState.RECOVERED,
- 2
- )
- )
-
- replicaManager.becomeLeaderOrFollower(0, request, (_, _) => ())
-
- assertFalse(followerPartition.isLeader)
- assertEquals(1, followerPartition.getLeaderEpoch)
- assertEquals(2, followerPartition.getPartitionEpoch)
-
- // Verify that the partition was removed and added back.
-
verify(mockReplicaFetcherManager).removeFetcherForPartitions(Set(topicPartition))
-
verify(mockReplicaFetcherManager).addFetcherForPartitions(Map(topicPartition ->
InitialFetchState(
- topicId = Some(FOO_UUID),
- leader = new BrokerEndPoint(localId + 2, s"host${localId + 2}",
localId + 2),
- currentLeaderEpoch = 1,
- initOffset = 0
- )))
- } finally {
- replicaManager.shutdown(checkpointHW = false)
- }
- }
-
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testFetcherAreNotRestartedIfLeaderEpochIsNotBumpedWithKRaftPath(enableRemoteStorage: Boolean): Unit = {
@@ -6549,7 +6425,7 @@ class ReplicaManagerTest {
scheduler = new MockScheduler(time),
logManager = mockLogMgr,
quotaManagers = quotaManager,
- metadataCache = MetadataCache.zkMetadataCache(config.brokerId, config.interBrokerProtocolVersion),
+ metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () => KRaftVersion.KRAFT_VERSION_0),
logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
alterPartitionManager = alterPartitionManager))
@@ -6693,63 +6569,6 @@ class ReplicaManagerTest {
}
}
- @Test
- def testMetadataLogDirFailureInZkShouldNotHaltBroker(): Unit = {
- // Given
- val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect,
logDirCount = 2)
- val config = KafkaConfig.fromProps(props)
- val logDirFiles = config.logDirs.map(new File(_))
- val logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
- val logManager = TestUtils.createLogManager(logDirFiles, defaultConfig =
new LogConfig(new Properties()), time = time)
- val mockZkClient = mock(classOf[KafkaZkClient])
- val replicaManager = new ReplicaManager(
- metrics = metrics,
- config = config,
- time = time,
- scheduler = time.scheduler,
- logManager = logManager,
- quotaManagers = quotaManager,
- metadataCache = MetadataCache.zkMetadataCache(config.brokerId,
config.interBrokerProtocolVersion),
- logDirFailureChannel = logDirFailureChannel,
- alterPartitionManager = alterPartitionManager,
- threadNamePrefix = Option(this.getClass.getName),
- zkClient = Option(mockZkClient),
- )
- try {
- logManager.startup(Set.empty[String])
- replicaManager.startup()
-
- Exit.setHaltProcedure((_, _) => fail("Test failure, broker should not
have halted"))
-
- // When
-
logDirFailureChannel.maybeAddOfflineLogDir(logDirFiles.head.getAbsolutePath,
"test failure", null)
-
- // Then
- TestUtils.retry(60000) {
- verify(mockZkClient).propagateLogDirEvent(config.brokerId)
- }
- } finally {
- Utils.tryAll(util.Arrays.asList[Callable[Void]](
- () => {
- replicaManager.shutdown(checkpointHW = false)
- null
- },
- () => {
- try {
- logManager.shutdown()
- } catch {
- case _: Exception =>
- }
- null
- },
- () => {
- quotaManager.shutdown()
- null
- }
- ))
- }
- }
-
@Test
def testRemoteReadQuotaExceeded(): Unit = {
when(mockRemoteLogManager.getFetchThrottleTimeMs).thenReturn(quotaExceededThrottleTime)
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 3bd95454c0d..24740207a6f 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -106,6 +106,10 @@ object TestUtils extends Logging {
val MockZkPort = 1
/** ZooKeeper connection string to use for unit tests that mock/don't require a real ZK server. */
val MockZkConnect = "127.0.0.1:" + MockZkPort
+
+ val MockKraftPort = 1
+
+ val MockKraftConnect = "127.0.0.1:" + MockKraftPort
// CN in SSL certificates - this is used for endpoint validation when enabled
val SslCertificateCn = "localhost"