This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 34a87d34770 KAFKA-19042 Move TransactionsWithMaxInFlightOneTest to
client-integration-tests module (#19289)
34a87d34770 is described below
commit 34a87d34770d1a203df68e30b54fc93392233f8d
Author: PoAn Yang <[email protected]>
AuthorDate: Fri Apr 11 12:04:19 2025 +0800
KAFKA-19042 Move TransactionsWithMaxInFlightOneTest to
client-integration-tests module (#19289)
Use Java to rewrite `TransactionsWithMaxInFlightOneTest` by new test
infra and move it to client-integration-tests module.
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../TransactionsWithMaxInFlightOneTest.java | 126 +++++++++++++++++++
.../api/TransactionsWithMaxInFlightOneTest.scala | 136 ---------------------
2 files changed, 126 insertions(+), 136 deletions(-)
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/TransactionsWithMaxInFlightOneTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/TransactionsWithMaxInFlightOneTest.java
new file mode 100644
index 00000000000..fb27f9be4f0
--- /dev/null
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/TransactionsWithMaxInFlightOneTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.TestUtils;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
+import org.apache.kafka.coordinator.transaction.TransactionStateManagerConfig;
+import org.apache.kafka.server.config.ReplicationConfigs;
+import org.apache.kafka.server.config.ServerConfigs;
+import org.apache.kafka.server.config.ServerLogConfigs;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@ClusterTestDefaults(
+ types = {Type.CO_KRAFT},
+ serverProperties = {
+ @ClusterConfigProperty(key =
ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, value = "false"),
+ @ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+ @ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
+ @ClusterConfigProperty(key =
TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+ @ClusterConfigProperty(key =
TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
+ @ClusterConfigProperty(key =
TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, value = "1"),
+ @ClusterConfigProperty(key =
ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG, value = "true"),
+ @ClusterConfigProperty(key = "log.unclean.leader.election.enable",
value = "false"),
+ @ClusterConfigProperty(key =
ReplicationConfigs.AUTO_LEADER_REBALANCE_ENABLE_CONFIG, value = "false"),
+ @ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "0"),
+ @ClusterConfigProperty(key =
TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG,
value = "200")
+ }
+)
+public class TransactionsWithMaxInFlightOneTest {
+ private static final String TOPIC1 = "topic1";
+ private static final String TOPIC2 = "topic2";
+ private static final String HEADER_KEY = "transactionStatus";
+ private static final byte[] ABORTED_VALUE = "aborted".getBytes();
+ private static final byte[] COMMITTED_VALUE = "committed".getBytes();
+
+ @ClusterTest
+ public void
testTransactionalProducerSingleBrokerMaxInFlightOne(ClusterInstance
clusterInstance) throws InterruptedException {
+ // We want to test with one broker to verify multiple requests queued
on a connection
+ assertEquals(1, clusterInstance.brokers().size());
+
+ clusterInstance.createTopic(TOPIC1, 4, (short) 1);
+ clusterInstance.createTopic(TOPIC2, 4, (short) 1);
+
+ try (Producer<byte[], byte[]> producer =
clusterInstance.producer(Map.of(
+ ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactional-producer",
+ ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1
+ ))
+ ) {
+ producer.initTransactions();
+
+ producer.beginTransaction();
+ producer.send(new ProducerRecord<>(TOPIC2, null, "2".getBytes(),
"2".getBytes(), Collections.singleton(new RecordHeader(HEADER_KEY,
ABORTED_VALUE))));
+ producer.send(new ProducerRecord<>(TOPIC1, null, "4".getBytes(),
"4".getBytes(), Collections.singleton(new RecordHeader(HEADER_KEY,
ABORTED_VALUE))));
+ producer.flush();
+ producer.abortTransaction();
+
+ producer.beginTransaction();
+ producer.send(new ProducerRecord<>(TOPIC1, null, "1".getBytes(),
"1".getBytes(), Collections.singleton(new RecordHeader(HEADER_KEY,
COMMITTED_VALUE))));
+ producer.send(new ProducerRecord<>(TOPIC2, null, "3".getBytes(),
"3".getBytes(), Collections.singleton(new RecordHeader(HEADER_KEY,
COMMITTED_VALUE))));
+ producer.commitTransaction();
+
+ for (GroupProtocol groupProtocol :
clusterInstance.supportedGroupProtocols()) {
+ ArrayList<ConsumerRecord<byte[], byte[]>> consumerRecords =
new ArrayList<>();
+ try (Consumer<byte[], byte[]> consumer =
clusterInstance.consumer(Map.of(
+ ConsumerConfig.GROUP_PROTOCOL_CONFIG,
groupProtocol.name(),
+ ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false",
+ ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"
+ )
+ )) {
+ consumer.subscribe(List.of(TOPIC1, TOPIC2));
+ TestUtils.waitForCondition(() -> {
+ ConsumerRecords<byte[], byte[]> records =
consumer.poll(Duration.ofMillis(100));
+ records.forEach(consumerRecords::add);
+ return consumerRecords.size() == 2;
+ }, 15_000, () -> "Consumer with protocol " +
groupProtocol.name + " should consume 2 records, but get " +
consumerRecords.size());
+ }
+ consumerRecords.forEach(record -> {
+ Iterator<Header> headers =
record.headers().headers(HEADER_KEY).iterator();
+ assertTrue(headers.hasNext());
+ Header header = headers.next();
+ assertArrayEquals(COMMITTED_VALUE, header.value(), "Record
does not have the expected header value");
+ });
+ }
+ }
+ }
+}
diff --git
a/core/src/test/scala/integration/kafka/api/TransactionsWithMaxInFlightOneTest.scala
b/core/src/test/scala/integration/kafka/api/TransactionsWithMaxInFlightOneTest.scala
deleted file mode 100644
index 56a2a7181ea..00000000000
---
a/core/src/test/scala/integration/kafka/api/TransactionsWithMaxInFlightOneTest.scala
+++ /dev/null
@@ -1,136 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.api
-
-import java.util.Properties
-import kafka.integration.KafkaServerTestHarness
-import kafka.server.KafkaConfig
-import kafka.utils.{TestInfoUtils, TestUtils}
-import kafka.utils.TestUtils.consumeRecords
-import org.apache.kafka.clients.consumer.Consumer
-import org.apache.kafka.clients.producer.KafkaProducer
-import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
-import org.apache.kafka.coordinator.transaction.{TransactionLogConfig,
TransactionStateManagerConfig}
-import org.apache.kafka.server.config.{ReplicationConfigs, ServerConfigs,
ServerLogConfigs}
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.MethodSource
-
-import scala.collection.{Seq, mutable}
-import scala.jdk.CollectionConverters._
-
-/**
- * This is used to test transactions with one broker and
`max.in.flight.requests.per.connection=1`.
- * A single broker is used to verify edge cases where different requests are
queued on the same connection.
- */
-class TransactionsWithMaxInFlightOneTest extends KafkaServerTestHarness {
- val numBrokers = 1
-
- val topic1 = "topic1"
- val topic2 = "topic2"
- val numPartitions = 4
-
- val transactionalProducers = mutable.Buffer[KafkaProducer[Array[Byte],
Array[Byte]]]()
- val transactionalConsumers = mutable.Buffer[Consumer[Array[Byte],
Array[Byte]]]()
-
- override def generateConfigs: Seq[KafkaConfig] = {
- TestUtils.createBrokerConfigs(numBrokers).map(KafkaConfig.fromProps(_,
serverProps()))
- }
-
- @BeforeEach
- override def setUp(testInfo: TestInfo): Unit = {
- super.setUp(testInfo)
- val topicConfig = new Properties()
- topicConfig.put(ServerLogConfigs.MIN_IN_SYNC_REPLICAS_CONFIG, 1.toString)
- createTopic(topic1, numPartitions, numBrokers, topicConfig)
- createTopic(topic2, numPartitions, numBrokers, topicConfig)
-
- createTransactionalProducer("transactional-producer")
- createReadCommittedConsumer("transactional-group")
- }
-
- @AfterEach
- override def tearDown(): Unit = {
- transactionalProducers.foreach(_.close())
- transactionalConsumers.foreach(_.close())
- super.tearDown()
- }
-
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
- @MethodSource(Array("getTestGroupProtocolParametersAll"))
- def testTransactionalProducerSingleBrokerMaxInFlightOne(groupProtocol:
String): Unit = {
- // We want to test with one broker to verify multiple requests queued on a
connection
- assertEquals(1, brokers.size)
-
- val producer = transactionalProducers.head
- val consumer = transactionalConsumers.head
-
- producer.initTransactions()
-
- producer.beginTransaction()
-
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2,
null, "2", "2", willBeCommitted = false))
-
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1,
null, "4", "4", willBeCommitted = false))
- producer.flush()
- producer.abortTransaction()
-
- producer.beginTransaction()
-
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1,
null, "1", "1", willBeCommitted = true))
-
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2,
null, "3", "3", willBeCommitted = true))
- producer.commitTransaction()
-
- consumer.subscribe(List(topic1, topic2).asJava)
-
- val records = consumeRecords(consumer, 2)
- records.foreach { record =>
- TestUtils.assertCommittedAndGetValue(record)
- }
- }
-
- private def serverProps() = {
- val serverProps = new Properties()
- serverProps.put(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG,
false.toString)
- serverProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG,
1.toString)
-
serverProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG,
1.toString)
- serverProps.put(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG,
1.toString)
-
serverProps.put(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
1.toString)
- serverProps.put(TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG,
1.toString)
- serverProps.put(ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG,
true.toString)
- serverProps.put(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG,
false.toString)
- serverProps.put(ReplicationConfigs.AUTO_LEADER_REBALANCE_ENABLE_CONFIG,
false.toString)
-
serverProps.put(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG,
"0")
-
serverProps.put(TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG,
"200")
- serverProps
- }
-
- private def createReadCommittedConsumer(group: String) = {
- val consumer = TestUtils.createConsumer(bootstrapServers(),
- groupProtocolFromTestParameters(),
- groupId = group,
- enableAutoCommit = false,
- readCommitted = true)
- transactionalConsumers += consumer
- consumer
- }
-
- private def createTransactionalProducer(transactionalId: String):
KafkaProducer[Array[Byte], Array[Byte]] = {
- val producer = TestUtils.createTransactionalProducer(transactionalId,
brokers, maxInFlight = 1)
- transactionalProducers += producer
- producer
- }
-}