This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch 3.3 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 112294334f4a2e539c37f8ea2a6064af7443351f Author: Colin Patrick McCabe <[email protected]> AuthorDate: Wed Jul 27 10:38:31 2022 -0700 MINOR: Convert some junit tests to kraft (#12443) Convert ProducerCompressionTest, MirrorMakerIntegrationTest, EdgeCaseRequestTest to kraft. Make it explicit that ServerShutdownTest#testControllerShutdownDuringSend is ZK-only. Reviewers: David Arthur <[email protected]> --- .../kafka/api/ProducerCompressionTest.scala | 49 +++++++++++----------- .../kafka/tools/MirrorMakerIntegrationTest.scala | 24 ++++++----- .../unit/kafka/server/EdgeCaseRequestTest.scala | 32 ++++++++------ .../unit/kafka/server/ServerShutdownTest.scala | 10 +++-- 4 files changed, 64 insertions(+), 51 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala index ccdfe7d3d36..07d9ccb024f 100755 --- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala @@ -17,19 +17,19 @@ package kafka.api.test -import kafka.server.{KafkaConfig, KafkaServer} +import kafka.server.{KafkaBroker, KafkaConfig, QuorumTestHarness} import kafka.utils.TestUtils -import kafka.server.QuorumTestHarness import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.network.ListenerName +import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.serialization.ByteArraySerializer import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo} import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.{Arguments, MethodSource} +import org.junit.jupiter.params.provider.CsvSource import java.util.{Collections, Properties} -import scala.jdk.CollectionConverters._ class ProducerCompressionTest extends QuorumTestHarness { @@ -37,18 +37,18 @@ class ProducerCompressionTest extends QuorumTestHarness { private val topic = "topic" private val numRecords = 2000 - private var server: KafkaServer = null + private var broker: KafkaBroker = null @BeforeEach override def setUp(testInfo: TestInfo): Unit = { super.setUp(testInfo) - val props = TestUtils.createBrokerConfig(brokerId, zkConnect) - server = TestUtils.createServer(KafkaConfig.fromProps(props)) + val props = TestUtils.createBrokerConfig(brokerId, zkConnectOrNull) + broker = createBroker(new KafkaConfig(props)) } @AfterEach override def tearDown(): Unit = { - TestUtils.shutdownServers(Seq(server)) + TestUtils.shutdownServers(Seq(broker)) super.tearDown() } @@ -58,11 +58,18 @@ class ProducerCompressionTest extends QuorumTestHarness { * Compressed messages should be able to sent and consumed correctly */ @ParameterizedTest - @MethodSource(Array("parameters")) - def testCompression(compression: String): Unit = { + @CsvSource(value = Array( + "kraft,none", + "kraft,gzip", + "kraft,snappy", + "kraft,lz4", + "kraft,zstd", + "zk,gzip" + )) + def testCompression(quorum: String, compression: String): Unit = { val producerProps = new Properties() - val bootstrapServers = TestUtils.plaintextBootstrapServers(Seq(server)) + val bootstrapServers = TestUtils.plaintextBootstrapServers(Seq(broker)) producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers) producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compression) producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, "66000") @@ -72,7 +79,13 @@ class ProducerCompressionTest extends QuorumTestHarness { try { // create topic - TestUtils.createTopic(zkClient, topic, 1, 1, List(server)) + val admin = TestUtils.createAdminClient(Seq(broker), + ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)) + try { + TestUtils.createTopicWithAdmin(admin, topic, Seq(broker)) + } finally { + admin.close() + } val partition = 0 // prepare the messages @@ -103,15 +116,3 @@ class ProducerCompressionTest extends QuorumTestHarness { } } } - -object ProducerCompressionTest { - def parameters: java.util.stream.Stream[Arguments] = { - Seq( - Arguments.of("none"), - Arguments.of("gzip"), - Arguments.of("snappy"), - Arguments.of("lz4"), - Arguments.of("zstd") - ).asJava.stream() - } -} diff --git a/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala b/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala index 4f673cdd60a..c64d25fe4e6 100644 --- a/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala @@ -18,26 +18,27 @@ package kafka.tools import java.util.Properties import java.util.concurrent.atomic.AtomicBoolean - import scala.collection.Seq import kafka.integration.KafkaServerTestHarness import kafka.server.KafkaConfig import kafka.tools.MirrorMaker.{ConsumerWrapper, MirrorMakerProducer, NoRecordsException} -import kafka.utils.TestUtils +import kafka.utils.{TestInfoUtils, TestUtils} import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer} import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.TimeoutException import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer} import org.apache.kafka.common.utils.Exit -import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo} +import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo} import org.junit.jupiter.api.Assertions._ +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource @deprecated(message = "Use the Connect-based MirrorMaker instead (aka MM2).", since = "3.0") class MirrorMakerIntegrationTest extends KafkaServerTestHarness { override def generateConfigs: Seq[KafkaConfig] = - TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps(_, new Properties())) + TestUtils.createBrokerConfigs(1, zkConnectOrNull).map(KafkaConfig.fromProps(_, new Properties())) val exited = new AtomicBoolean(false) @@ -57,8 +58,9 @@ class MirrorMakerIntegrationTest extends KafkaServerTestHarness { } } - @Test - def testCommitOffsetsThrowTimeoutException(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testCommitOffsetsThrowTimeoutException(quorum: String): Unit = { val consumerProps = new Properties consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group") consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") @@ -70,8 +72,9 @@ class MirrorMakerIntegrationTest extends KafkaServerTestHarness { assertThrows(classOf[TimeoutException], () => mirrorMakerConsumer.commit()) } - @Test - def testCommitOffsetsRemoveNonExistentTopics(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testCommitOffsetsRemoveNonExistentTopics(quorum: String): Unit = { val consumerProps = new Properties consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group") consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") @@ -85,8 +88,9 @@ class MirrorMakerIntegrationTest extends KafkaServerTestHarness { assertTrue(mirrorMakerConsumer.offsets.isEmpty, "Offsets for non-existent topics should be removed") } - @Test - def testCommaSeparatedRegex(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testCommaSeparatedRegex(quorum: String): Unit = { val topic = "new-topic" val msg = "a test message" diff --git a/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala index 1a383a8fbcd..1bbde3ffb6b 100755 --- a/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala @@ -35,19 +35,20 @@ import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.utils.ByteUtils import org.apache.kafka.common.{TopicPartition, requests} import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.Test +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource import scala.jdk.CollectionConverters._ class EdgeCaseRequestTest extends KafkaServerTestHarness { def generateConfigs = { - val props = TestUtils.createBrokerConfig(1, zkConnect) + val props = TestUtils.createBrokerConfig(1, zkConnectOrNull) props.setProperty(KafkaConfig.AutoCreateTopicsEnableProp, "false") List(KafkaConfig.fromProps(props)) } - private def socketServer = servers.head.socketServer + private def socketServer = brokers.head.socketServer private def connect(s: SocketServer = socketServer, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): Socket = { new Socket("localhost", s.boundPort(ListenerName.forSecurityProtocol(protocol))) @@ -116,8 +117,9 @@ class EdgeCaseRequestTest extends KafkaServerTestHarness { } } - @Test - def testProduceRequestWithNullClientId(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testProduceRequestWithNullClientId(quorum: String): Unit = { val topic = "topic" val topicPartition = new TopicPartition(topic, 0) val correlationId = -1 @@ -161,23 +163,27 @@ class EdgeCaseRequestTest extends KafkaServerTestHarness { assertEquals(Errors.NONE, Errors.forCode(partitionProduceResponse.errorCode), "There should be no error") } - @Test - def testHeaderOnlyRequest(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testHeaderOnlyRequest(quorum: String): Unit = { verifyDisconnect(requestHeaderBytes(ApiKeys.PRODUCE.id, 1)) } - @Test - def testInvalidApiKeyRequest(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testInvalidApiKeyRequest(quorum: String): Unit = { verifyDisconnect(requestHeaderBytes(-1, 0)) } - @Test - def testInvalidApiVersionRequest(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testInvalidApiVersionRequest(quorum: String): Unit = { verifyDisconnect(requestHeaderBytes(ApiKeys.PRODUCE.id, -1)) } - @Test - def testMalformedHeaderRequest(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testMalformedHeaderRequest(quorum: String): Unit = { val serializedBytes = { // Only send apiKey and apiVersion val buffer = ByteBuffer.allocate( diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala index 96aeac5fa61..70554d9427c 100644 --- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala @@ -38,7 +38,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.serialization.{IntegerDeserializer, IntegerSerializer, StringDeserializer, StringSerializer} import org.apache.kafka.common.utils.Time import org.apache.kafka.metadata.BrokerState -import org.junit.jupiter.api.{BeforeEach, Disabled, Test, TestInfo, Timeout} +import org.junit.jupiter.api.{BeforeEach, Disabled, TestInfo, Timeout} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.function.Executable import org.junit.jupiter.params.ParameterizedTest @@ -251,9 +251,11 @@ class ServerShutdownTest extends KafkaServerTestHarness { } // Verify that if controller is in the midst of processing a request, shutdown completes - // without waiting for request timeout. - @Test - def testControllerShutdownDuringSend(): Unit = { + // without waiting for request timeout. Since this involves LeaderAndIsr request, it is + // ZK-only for now. + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk")) + def testControllerShutdownDuringSend(quorum: String): Unit = { val securityProtocol = SecurityProtocol.PLAINTEXT val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
