This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 46f1f0268be KAFKA-17137 Add integration tests for admin client
(Transaction and UserScramCredentials related) (#16652)
46f1f0268be is described below
commit 46f1f0268bec2eb8c5de112c3edbf3d758e681f4
Author: xijiu <[email protected]>
AuthorDate: Wed Aug 7 01:11:55 2024 +0800
KAFKA-17137 Add integration tests for admin client (Transaction and
UserScramCredentials related) (#16652)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../kafka/api/PlaintextAdminIntegrationTest.scala | 366 ++++++++++++++++++++-
1 file changed, 364 insertions(+), 2 deletions(-)
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index ce5606fbd9d..3c74c9df1ac 100644
---
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -39,6 +39,7 @@ import org.apache.kafka.clients.producer.{KafkaProducer,
ProducerConfig, Produce
import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding,
AclBindingFilter, AclOperation, AclPermissionType}
import org.apache.kafka.common.config.{ConfigResource, LogLevelConfig,
SslConfigs, TopicConfig}
import org.apache.kafka.common.errors._
+import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.requests.{DeleteRecordsRequest,
MetadataResponse}
import org.apache.kafka.common.resource.{PatternType, ResourcePattern,
ResourceType}
import org.apache.kafka.common.serialization.{ByteArrayDeserializer,
ByteArraySerializer}
@@ -52,7 +53,7 @@ import org.apache.kafka.server.config.{QuotaConfigs,
ServerConfigs, ServerLogCon
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig}
import org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, TestInfo}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, TestInfo,
Timeout}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.slf4j.LoggerFactory
@@ -94,6 +95,367 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
super.tearDown()
}
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeUserScramCredentials(quorum: String): Unit = {
+ client = createAdminClient
+
+ // add a new user
+ val targetUserName = "tom"
+ client.alterUserScramCredentials(Collections.singletonList(
+ new UserScramCredentialUpsertion(targetUserName, new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+ )).all.get
+ TestUtils.waitUntilTrue(() =>
client.describeUserScramCredentials().all().get().size() == 1,
+ "Add one user scram credential timeout")
+
+ val result = client.describeUserScramCredentials().all().get()
+ result.forEach((userName, scramDescription) => {
+ assertEquals(targetUserName, userName)
+ assertEquals(targetUserName, scramDescription.name())
+ val credentialInfos = scramDescription.credentialInfos()
+ assertEquals(1, credentialInfos.size())
+ assertEquals(ScramMechanism.SCRAM_SHA_256,
credentialInfos.get(0).mechanism())
+ assertEquals(4096, credentialInfos.get(0).iterations())
+ })
+
+ // add other users
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion("tom2", new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456"),
+ new UserScramCredentialUpsertion("tom3", new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+ )).all().get
+ TestUtils.waitUntilTrue(() =>
client.describeUserScramCredentials().all().get().size() == 3,
+ "Add user scram credential timeout")
+
+ // alter user info
+ client.alterUserScramCredentials(Collections.singletonList(
+ new UserScramCredentialUpsertion(targetUserName, new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_512, 8192), "123456")
+ )).all.get
+ TestUtils.waitUntilTrue(() => {
+
client.describeUserScramCredentials().all().get().get(targetUserName).credentialInfos().size()
== 2
+ }, "Alter user scram credential timeout")
+
+ val userTomResult = client.describeUserScramCredentials().all().get()
+ assertEquals(3, userTomResult.size())
+ val userScramCredential = userTomResult.get(targetUserName)
+ assertEquals(targetUserName, userScramCredential.name())
+ val credentialInfos = userScramCredential.credentialInfos()
+ assertEquals(2, credentialInfos.size())
+ val credentialList = credentialInfos.asScala.sortBy(s =>
s.mechanism().`type`())
+ assertEquals(ScramMechanism.SCRAM_SHA_256, credentialList.head.mechanism())
+ assertEquals(4096, credentialList.head.iterations())
+ assertEquals(ScramMechanism.SCRAM_SHA_512, credentialList(1).mechanism())
+ assertEquals(8192, credentialList(1).iterations())
+
+ // test describeUserScramCredentials(List<String> users)
+ val userAndScramMap =
client.describeUserScramCredentials(Collections.singletonList("tom2")).all().get()
+ assertEquals(1, userAndScramMap.size())
+ val scram = userAndScramMap.get("tom2")
+ assertNotNull(scram)
+ val credentialInfo = scram.credentialInfos().get(0)
+ assertEquals(ScramMechanism.SCRAM_SHA_256, credentialInfo.mechanism())
+ assertEquals(4096, credentialInfo.iterations())
+ }
+
+ private def createInvalidAdminClient(): Admin = {
+ val config = createConfig
+ config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
s"localhost:${TestUtils.IncorrectBrokerPort}")
+ Admin.create(config)
+ }
+
+ @ParameterizedTest
+ @Timeout(10)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeUserScramCredentialsTimeout(quorum: String): Unit = {
+ client = createInvalidAdminClient()
+ try {
+ // test describeUserScramCredentials(List<String> users,
DescribeUserScramCredentialsOptions options)
+ val exception = assertThrows(classOf[ExecutionException], () => {
+ client.describeUserScramCredentials(Collections.singletonList("tom4"),
+ new DescribeUserScramCredentialsOptions().timeoutMs(0)).all().get()
+ })
+ assertInstanceOf(classOf[TimeoutException], exception.getCause)
+ } finally client.close(time.Duration.ZERO)
+ }
+
+ private def consumeToExpectedNumber = (expectedNumber: Int) => {
+ val configs = new util.HashMap[String, Object]()
+ configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
plaintextBootstrapServers(brokers))
+ configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
IsolationLevel.READ_COMMITTED.toString)
+ val consumer = new KafkaConsumer(configs, new ByteArrayDeserializer, new
ByteArrayDeserializer)
+ try {
+ consumer.assign(Collections.singleton(topicPartition))
+ consumer.seekToBeginning(Collections.singleton(topicPartition))
+ var consumeNum = 0
+ TestUtils.waitUntilTrue(() => {
+ val records = consumer.poll(time.Duration.ofMillis(100))
+ consumeNum += records.count()
+ consumeNum >= expectedNumber
+ }, "consumeToExpectedNumber timeout")
+ } finally consumer.close()
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeProducers(quorum: String): Unit = {
+ client = createAdminClient
+ client.createTopics(Collections.singletonList(new NewTopic(topic, 1,
1.toShort))).all().get()
+
+ def appendCommonRecords = (records: Int) => {
+ val producer = new
KafkaProducer(Collections.singletonMap(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+ plaintextBootstrapServers(brokers).asInstanceOf[Object]), new
ByteArraySerializer, new ByteArraySerializer)
+ try {
+ (0 until records).foreach(i =>
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ topic, partition, i.toString.getBytes, i.toString.getBytes())))
+ } finally producer.close()
+ }
+
+ def appendTransactionRecords(transactionId: String, records: Int, commit:
Boolean): KafkaProducer[Array[Byte], Array[Byte]] = {
+ val producer = TestUtils.createTransactionalProducer(transactionId,
brokers)
+ producer.initTransactions()
+ producer.beginTransaction()
+ (0 until records).foreach(i =>
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ topic, partition, i.toString.getBytes, i.toString.getBytes())))
+ producer.flush()
+ if (commit) {
+ producer.commitTransaction()
+ producer.close()
+ }
+
+ producer
+ }
+
+ def queryProducerDetail() = client
+ .describeProducers(Collections.singletonList(topicPartition))
+ .partitionResult(topicPartition).get().activeProducers().asScala
+
+ // send common msg
+ appendCommonRecords(1)
+ val producerIterator = queryProducerDetail()
+ assertEquals(1, producerIterator.size)
+ val producerState = producerIterator.last
+ assertEquals(0, producerState.producerEpoch())
+ assertFalse(producerState.coordinatorEpoch().isPresent)
+ assertFalse(producerState.currentTransactionStartOffset().isPresent)
+
+
+ // send committed transaction msg
+ appendTransactionRecords("foo", 2, commit = true)
+ // consume 3 records to ensure transaction finished
+ consumeToExpectedNumber(3)
+ val transactionProducerIterator = queryProducerDetail()
+ assertEquals(2, transactionProducerIterator.size)
+ val containsCoordinatorEpochIterator = transactionProducerIterator
+ .filter(producer => producer.coordinatorEpoch().isPresent)
+ assertEquals(1, containsCoordinatorEpochIterator.size)
+ val transactionProducerState = containsCoordinatorEpochIterator.last
+
assertFalse(transactionProducerState.currentTransactionStartOffset().isPresent)
+
+
+ // send ongoing transaction msg
+ val ongoingProducer = appendTransactionRecords("foo3", 3, commit = false)
+ try {
+ val transactionNoneCommitProducerIterator = queryProducerDetail()
+ assertEquals(3, transactionNoneCommitProducerIterator.size)
+ val containsOngoingIterator = transactionNoneCommitProducerIterator
+ .filter(producer => producer.currentTransactionStartOffset().isPresent)
+ assertEquals(1, containsOngoingIterator.size)
+ val ongoingTransactionProducerState = containsOngoingIterator.last
+ // we send (1 common msg) + (2 transaction msg) + (1 transaction marker
msg), so transactionStartOffset is 4
+ assertEquals(4,
ongoingTransactionProducerState.currentTransactionStartOffset().getAsLong)
+ } finally ongoingProducer.close()
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeTransactions(quorum: String): Unit = {
+ client = createAdminClient
+ client.createTopics(Collections.singletonList(new NewTopic(topic, 1,
1.toShort))).all().get()
+
+ var transactionId = "foo"
+
+ def describeTransactions(): TransactionDescription = {
+
client.describeTransactions(Collections.singleton(transactionId)).description(transactionId).get()
+ }
+ def transactionState(): TransactionState = {
+ describeTransactions().state()
+ }
+
+ def findCoordinatorIdByTransactionId(transactionId: String): Int = {
+ // calculate the transaction partition id
+ val transactionPartitionId = Utils.abs(transactionId.hashCode) %
+
brokers.head.metadataCache.numPartitions(Topic.TRANSACTION_STATE_TOPIC_NAME).get
+ val transactionTopic =
client.describeTopics(Collections.singleton(Topic.TRANSACTION_STATE_TOPIC_NAME))
+ val partitionList =
transactionTopic.allTopicNames().get().get(Topic.TRANSACTION_STATE_TOPIC_NAME).partitions()
+ partitionList.asScala.filter(tp => tp.partition() ==
transactionPartitionId).head.leader().id()
+ }
+
+ // normal commit case
+ val producer = TestUtils.createTransactionalProducer(transactionId,
brokers)
+ try {
+ // init, the transaction is not begin, so
TransactionalIdNotFoundException is expected
+ val exception = assertThrows(classOf[ExecutionException], () =>
transactionState())
+ assertInstanceOf(classOf[TransactionalIdNotFoundException],
exception.getCause)
+
+ producer.initTransactions()
+ assertEquals(TransactionState.EMPTY, transactionState())
+ producer.beginTransaction()
+ assertEquals(TransactionState.EMPTY, transactionState())
+
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic,
partition, "k1".getBytes, "v1".getBytes()))
+ producer.flush()
+ assertEquals(TransactionState.ONGOING, transactionState())
+
+ TestUtils.waitUntilTrue(() =>
describeTransactions().topicPartitions().size() == 1, "Describe transactions
timeout")
+ val transactionResult = describeTransactions()
+ assertEquals(findCoordinatorIdByTransactionId(transactionId),
transactionResult.coordinatorId())
+ assertEquals(0, transactionResult.producerId())
+ assertEquals(0, transactionResult.producerEpoch())
+ assertEquals(Collections.singleton(topicPartition),
transactionResult.topicPartitions())
+
+ producer.commitTransaction()
+ val state = transactionState()
+ // Either PREPARE_COMMIT or COMPLETE_COMMIT is expected
+ assertTrue(state == TransactionState.PREPARE_COMMIT || state ==
TransactionState.COMPLETE_COMMIT)
+ // producer commit transaction, but maybe transaction coordinator has
not been submitted mark msg
+ // so we start up a consumer and consume the expected number of msg, to
ensure transaction committed
+ consumeToExpectedNumber(1)
+ assertEquals(TransactionState.COMPLETE_COMMIT, transactionState())
+ } finally producer.close()
+
+ // abort case
+ transactionId = "foo2"
+ val abortProducer = TestUtils.createTransactionalProducer(transactionId,
brokers)
+ try {
+ // init, the transaction is not begin, so
TransactionalIdNotFoundException is expected
+ val exception = assertThrows(classOf[ExecutionException], () =>
transactionState())
+
assertTrue(exception.getCause.isInstanceOf[TransactionalIdNotFoundException])
+
+ abortProducer.initTransactions()
+ assertEquals(TransactionState.EMPTY, transactionState())
+ abortProducer.beginTransaction()
+ assertEquals(TransactionState.EMPTY, transactionState())
+
+ val transactionResult = describeTransactions()
+ assertEquals(findCoordinatorIdByTransactionId(transactionId),
transactionResult.coordinatorId())
+ assertEquals(0, transactionResult.topicPartitions().size())
+
+ abortProducer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic,
partition, "k1".getBytes, "v1".getBytes()))
+ abortProducer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic,
partition, "k2".getBytes, "v2".getBytes()))
+ abortProducer.flush()
+
+ val transactionSendMsgResult = describeTransactions()
+ assertEquals(findCoordinatorIdByTransactionId(transactionId),
transactionSendMsgResult.coordinatorId())
+ assertEquals(Collections.singleton(topicPartition),
transactionSendMsgResult.topicPartitions())
+ assertEquals(topicPartition,
transactionSendMsgResult.topicPartitions().asScala.head)
+
+ assertEquals(TransactionState.ONGOING, transactionState())
+ abortProducer.abortTransaction()
+ val state = transactionState()
+ assertTrue(state == TransactionState.PREPARE_ABORT || state ==
TransactionState.COMPLETE_ABORT)
+ // producer commit transaction, but maybe transaction coordinator has
not been submitted mark msg
+ // so we start up a consumer and consume the expected number of msg, to
ensure transaction committed
+ consumeToExpectedNumber(1)
+ assertEquals(TransactionState.COMPLETE_ABORT, transactionState())
+ } finally abortProducer.close()
+ }
+
+ @ParameterizedTest
+ @Timeout(10)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeTransactionsTimeout(quorum: String): Unit = {
+ client = createInvalidAdminClient()
+ try {
+ val transactionId = "foo"
+ val exception = assertThrows(classOf[ExecutionException], () => {
+ client.describeTransactions(Collections.singleton(transactionId),
+ new
DescribeTransactionsOptions().timeoutMs(0)).description(transactionId).get()
+ })
+ assertInstanceOf(classOf[TimeoutException], exception.getCause)
+ } finally client.close(time.Duration.ZERO)
+ }
+
+ @ParameterizedTest
+ @Timeout(10)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testAbortTransactionTimeout(quorum: String): Unit = {
+ client = createInvalidAdminClient()
+ try {
+ val exception = assertThrows(classOf[ExecutionException], () => {
+ client.abortTransaction(
+ new AbortTransactionSpec(topicPartition, 1, 1, 1),
+ new AbortTransactionOptions().timeoutMs(0)).all().get()
+ })
+ assertInstanceOf(classOf[TimeoutException], exception.getCause)
+ } finally client.close(time.Duration.ZERO)
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testListTransactions(quorum: String): Unit = {
+ def createTransactionList(): Unit = {
+ client = createAdminClient
+ client.createTopics(Collections.singletonList(new NewTopic(topic, 1,
1.toShort))).all().get()
+
+ val producer = TestUtils.createTransactionalProducer("foo", brokers)
+ try {
+ producer.initTransactions()
+ producer.beginTransaction()
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic,
partition, "k1".getBytes, "v1".getBytes()))
+ producer.flush()
+ producer.commitTransaction()
+ } finally producer.close()
+
+ val producer2 = TestUtils.createTransactionalProducer("foo2", brokers)
+ try {
+ producer2.initTransactions()
+ producer2.beginTransaction()
+ producer2.send(new ProducerRecord[Array[Byte], Array[Byte]](topic,
partition, "k1".getBytes, "v1".getBytes()))
+ producer2.flush()
+ producer2.abortTransaction()
+ } finally producer2.close()
+
+ val producer3 = TestUtils.createTransactionalProducer("foo3", brokers)
+ try {
+ producer3.initTransactions()
+ producer3.beginTransaction()
+ producer3.send(new ProducerRecord[Array[Byte], Array[Byte]](topic,
partition, "k1".getBytes, "v1".getBytes()))
+ producer3.flush()
+ producer3.commitTransaction()
+ } finally producer3.close()
+
+ consumeToExpectedNumber(2)
+ }
+
+ createTransactionList()
+
+ assertEquals(3, client.listTransactions().all().get().size())
+ assertEquals(2, client.listTransactions(new ListTransactionsOptions()
+
.filterStates(Collections.singletonList(TransactionState.COMPLETE_COMMIT))).all().get().size())
+ assertEquals(1, client.listTransactions(new ListTransactionsOptions()
+
.filterStates(Collections.singletonList(TransactionState.COMPLETE_ABORT))).all().get().size())
+ assertEquals(1, client.listTransactions(new ListTransactionsOptions()
+ .filterProducerIds(Collections.singletonList(0L))).all().get().size())
+
+ // ensure all transaction's txnStartTimestamp >= 500
+ Thread.sleep(501)
+ assertEquals(3, client.listTransactions(new
ListTransactionsOptions().filterOnDuration(500)).all().get().size())
+
+ val producerNew = TestUtils.createTransactionalProducer("foo4", brokers)
+ try {
+ producerNew.initTransactions()
+ producerNew.beginTransaction()
+ producerNew.send(new ProducerRecord[Array[Byte], Array[Byte]](topic,
partition, "k1".getBytes, "v1".getBytes()))
+ producerNew.flush()
+ val transactionList = client.listTransactions(new
ListTransactionsOptions().filterOnDuration(500)).all().get()
+ // current transaction start time is now, so transactionList size is
still 3
+ assertEquals(3, transactionList.size())
+ // transactionList not contains 'foo4'
+ assertEquals(0, transactionList.asScala.count(t =>
t.transactionalId().equals("foo4")))
+ } finally producerNew.close()
+ }
+
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
def testAbortTransaction(quorum: String): Unit = {
@@ -599,7 +961,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
// try a newCount which would be a decrease
alterResult = client.createPartitions(Map(topic1 ->
NewPartitions.increaseTo(1)).asJava, option)
-
+
var e = assertThrows(classOf[ExecutionException], () =>
alterResult.values.get(topic1).get,
() => s"$desc: Expect InvalidPartitionsException when newCount is a
decrease")
assertTrue(e.getCause.isInstanceOf[InvalidPartitionsException], desc)