This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 34a87d34770 KAFKA-19042 Move TransactionsWithMaxInFlightOneTest to client-integration-tests module (#19289)
34a87d34770 is described below

commit 34a87d34770d1a203df68e30b54fc93392233f8d
Author: PoAn Yang <[email protected]>
AuthorDate: Fri Apr 11 12:04:19 2025 +0800

    KAFKA-19042 Move TransactionsWithMaxInFlightOneTest to client-integration-tests module (#19289)
    
    Rewrite `TransactionsWithMaxInFlightOneTest` in Java using the new
    test infra and move it to the client-integration-tests module.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../TransactionsWithMaxInFlightOneTest.java        | 126 +++++++++++++++++++
 .../api/TransactionsWithMaxInFlightOneTest.scala   | 136 ---------------------
 2 files changed, 126 insertions(+), 136 deletions(-)
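
For readers new to the annotation-driven test infra that the rewritten test
relies on, here is a minimal sketch of the pattern, assuming only the
annotations and ClusterInstance methods that appear in the diff below; the
class name and topic are hypothetical placeholders, not part of this commit.

// Minimal sketch of the new test infra pattern (illustrative only).
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
import org.apache.kafka.common.test.api.Type;

import static org.junit.jupiter.api.Assertions.assertEquals;

@ClusterTestDefaults(types = {Type.CO_KRAFT})
public class ExampleClusterTest {

    @ClusterTest
    public void testClusterIsProvisioned(ClusterInstance cluster) throws InterruptedException {
        // The framework starts the cluster and injects it as a method parameter;
        // no harness subclass or manual setUp/tearDown is needed.
        cluster.createTopic("example-topic", 1, (short) 1);
        assertEquals(1, cluster.brokers().size());
    }
}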

diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/TransactionsWithMaxInFlightOneTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/TransactionsWithMaxInFlightOneTest.java
new file mode 100644
index 00000000000..fb27f9be4f0
--- /dev/null
+++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/TransactionsWithMaxInFlightOneTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.TestUtils;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
+import org.apache.kafka.coordinator.transaction.TransactionStateManagerConfig;
+import org.apache.kafka.server.config.ReplicationConfigs;
+import org.apache.kafka.server.config.ServerConfigs;
+import org.apache.kafka.server.config.ServerLogConfigs;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@ClusterTestDefaults(
+    types = {Type.CO_KRAFT},
+    serverProperties = {
+        @ClusterConfigProperty(key = ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, value = "false"),
+        @ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+        @ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
+        @ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+        @ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
+        @ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, value = "1"),
+        @ClusterConfigProperty(key = ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG, value = "true"),
+        @ClusterConfigProperty(key = "log.unclean.leader.election.enable", value = "false"),
+        @ClusterConfigProperty(key = ReplicationConfigs.AUTO_LEADER_REBALANCE_ENABLE_CONFIG, value = "false"),
+        @ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "0"),
+        @ClusterConfigProperty(key = TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG, value = "200")
+    }
+)
+public class TransactionsWithMaxInFlightOneTest {
+    private static final String TOPIC1 = "topic1";
+    private static final String TOPIC2 = "topic2";
+    private static final String HEADER_KEY = "transactionStatus";
+    private static final byte[] ABORTED_VALUE = "aborted".getBytes();
+    private static final byte[] COMMITTED_VALUE = "committed".getBytes();
+
+    @ClusterTest
+    public void testTransactionalProducerSingleBrokerMaxInFlightOne(ClusterInstance clusterInstance) throws InterruptedException {
+        // We want to test with one broker to verify multiple requests queued on a connection
+        assertEquals(1, clusterInstance.brokers().size());
+
+        clusterInstance.createTopic(TOPIC1, 4, (short) 1);
+        clusterInstance.createTopic(TOPIC2, 4, (short) 1);
+
+        try (Producer<byte[], byte[]> producer = clusterInstance.producer(Map.of(
+            ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactional-producer",
+            ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1
+        ))
+        ) {
+            producer.initTransactions();
+
+            producer.beginTransaction();
+            producer.send(new ProducerRecord<>(TOPIC2, null, "2".getBytes(), "2".getBytes(), Collections.singleton(new RecordHeader(HEADER_KEY, ABORTED_VALUE))));
+            producer.send(new ProducerRecord<>(TOPIC1, null, "4".getBytes(), "4".getBytes(), Collections.singleton(new RecordHeader(HEADER_KEY, ABORTED_VALUE))));
+            producer.flush();
+            producer.abortTransaction();
+
+            producer.beginTransaction();
+            producer.send(new ProducerRecord<>(TOPIC1, null, "1".getBytes(), "1".getBytes(), Collections.singleton(new RecordHeader(HEADER_KEY, COMMITTED_VALUE))));
+            producer.send(new ProducerRecord<>(TOPIC2, null, "3".getBytes(), "3".getBytes(), Collections.singleton(new RecordHeader(HEADER_KEY, COMMITTED_VALUE))));
+            producer.commitTransaction();
+
+            for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
+                ArrayList<ConsumerRecord<byte[], byte[]>> consumerRecords = new ArrayList<>();
+                try (Consumer<byte[], byte[]> consumer = clusterInstance.consumer(Map.of(
+                        ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name(),
+                        ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false",
+                        ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"
+                    )
+                )) {
+                    consumer.subscribe(List.of(TOPIC1, TOPIC2));
+                    TestUtils.waitForCondition(() -> {
+                        ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
+                        records.forEach(consumerRecords::add);
+                        return consumerRecords.size() == 2;
+                    }, 15_000, () -> "Consumer with protocol " + groupProtocol.name() + " should consume 2 records, but got " + consumerRecords.size());
+                }
+                consumerRecords.forEach(record -> {
+                    Iterator<Header> headers = record.headers().headers(HEADER_KEY).iterator();
+                    assertTrue(headers.hasNext());
+                    Header header = headers.next();
+                    assertArrayEquals(COMMITTED_VALUE, header.value(), "Record does not have the expected header value");
+                });
+            }
+        }
+    }
+}
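
Outside the test infra, the producer configuration this test exercises
corresponds roughly to the plain-clients setup below; a hedged sketch,
assuming a placeholder bootstrap address and topic, and not part of the
commit itself.

// Illustrative plain-client equivalent of the producer under test.
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class MaxInFlightOneProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder bootstrap address; in the test this comes from ClusterInstance.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactional-producer");
        // The condition under test: at most one in-flight request per connection,
        // so transactional RPCs queue one behind another on the single broker.
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);

        try (Producer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("topic1", "1".getBytes(), "1".getBytes()));
            producer.commitTransaction();
        }
    }
}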
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsWithMaxInFlightOneTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsWithMaxInFlightOneTest.scala
deleted file mode 100644
index 56a2a7181ea..00000000000
--- a/core/src/test/scala/integration/kafka/api/TransactionsWithMaxInFlightOneTest.scala
+++ /dev/null
@@ -1,136 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements.  See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License.  You may obtain a copy of the License at
-  *
-  *    http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package kafka.api
-
-import java.util.Properties
-import kafka.integration.KafkaServerTestHarness
-import kafka.server.KafkaConfig
-import kafka.utils.{TestInfoUtils, TestUtils}
-import kafka.utils.TestUtils.consumeRecords
-import org.apache.kafka.clients.consumer.Consumer
-import org.apache.kafka.clients.producer.KafkaProducer
-import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
-import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, TransactionStateManagerConfig}
-import org.apache.kafka.server.config.{ReplicationConfigs, ServerConfigs, ServerLogConfigs}
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.MethodSource
-
-import scala.collection.{Seq, mutable}
-import scala.jdk.CollectionConverters._
-
-/**
- * This is used to test transactions with one broker and `max.in.flight.requests.per.connection=1`.
- * A single broker is used to verify edge cases where different requests are queued on the same connection.
- */
-class TransactionsWithMaxInFlightOneTest extends KafkaServerTestHarness {
-  val numBrokers = 1
-
-  val topic1 = "topic1"
-  val topic2 = "topic2"
-  val numPartitions = 4
-
-  val transactionalProducers = mutable.Buffer[KafkaProducer[Array[Byte], Array[Byte]]]()
-  val transactionalConsumers = mutable.Buffer[Consumer[Array[Byte], Array[Byte]]]()
-
-  override def generateConfigs: Seq[KafkaConfig] = {
-    TestUtils.createBrokerConfigs(numBrokers).map(KafkaConfig.fromProps(_, serverProps()))
-  }
-
-  @BeforeEach
-  override def setUp(testInfo: TestInfo): Unit = {
-    super.setUp(testInfo)
-    val topicConfig = new Properties()
-    topicConfig.put(ServerLogConfigs.MIN_IN_SYNC_REPLICAS_CONFIG, 1.toString)
-    createTopic(topic1, numPartitions, numBrokers, topicConfig)
-    createTopic(topic2, numPartitions, numBrokers, topicConfig)
-
-    createTransactionalProducer("transactional-producer")
-    createReadCommittedConsumer("transactional-group")
-  }
-
-  @AfterEach
-  override def tearDown(): Unit = {
-    transactionalProducers.foreach(_.close())
-    transactionalConsumers.foreach(_.close())
-    super.tearDown()
-  }
-
-  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  @MethodSource(Array("getTestGroupProtocolParametersAll"))
-  def testTransactionalProducerSingleBrokerMaxInFlightOne(groupProtocol: String): Unit = {
-    // We want to test with one broker to verify multiple requests queued on a connection
-    assertEquals(1, brokers.size)
-
-    val producer = transactionalProducers.head
-    val consumer = transactionalConsumers.head
-
-    producer.initTransactions()
-
-    producer.beginTransaction()
-    producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, null, "2", "2", willBeCommitted = false))
-    producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, null, "4", "4", willBeCommitted = false))
-    producer.flush()
-    producer.abortTransaction()
-
-    producer.beginTransaction()
-    producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, null, "1", "1", willBeCommitted = true))
-    producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, null, "3", "3", willBeCommitted = true))
-    producer.commitTransaction()
-
-    consumer.subscribe(List(topic1, topic2).asJava)
-
-    val records = consumeRecords(consumer, 2)
-    records.foreach { record =>
-      TestUtils.assertCommittedAndGetValue(record)
-    }
-  }
-
-  private def serverProps() = {
-    val serverProps = new Properties()
-    serverProps.put(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, false.toString)
-    serverProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, 1.toString)
-    serverProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, 1.toString)
-    serverProps.put(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 1.toString)
-    serverProps.put(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, 1.toString)
-    serverProps.put(TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, 1.toString)
-    serverProps.put(ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG, true.toString)
-    serverProps.put(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, false.toString)
-    serverProps.put(ReplicationConfigs.AUTO_LEADER_REBALANCE_ENABLE_CONFIG, false.toString)
-    serverProps.put(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, "0")
-    serverProps.put(TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG, "200")
-    serverProps
-  }
-
-  private def createReadCommittedConsumer(group: String) = {
-    val consumer = TestUtils.createConsumer(bootstrapServers(),
-      groupProtocolFromTestParameters(),
-      groupId = group,
-      enableAutoCommit = false,
-      readCommitted = true)
-    transactionalConsumers += consumer
-    consumer
-  }
-
-  private def createTransactionalProducer(transactionalId: String): KafkaProducer[Array[Byte], Array[Byte]] = {
-    val producer = TestUtils.createTransactionalProducer(transactionalId, brokers, maxInFlight = 1)
-    transactionalProducers += producer
-    producer
-  }
-}
