This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new 3cbd063  KAFKA-12870; Flush in progress not cleared after transaction 
completion (#10880)
3cbd063 is described below

commit 3cbd063ec2be17cfcf3b4e4a1d203d4da5ab17b3
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Fri Jun 18 15:50:49 2021 -0700

    KAFKA-12870; Flush in progress not cleared after transaction completion 
(#10880)
    
    We had been using `RecordAccumulator.beginFlush` in order to force the 
`RecordAccumulator` to flush pending batches when a transaction was being 
completed. Internally, `RecordAccumulator` has a simple counter for the number 
of flushes in progress. The count gets incremented in `beginFlush` and it is 
expected to be decremented by `awaitFlushCompletion`. The second call to 
decrement the counter never happened in the transactional path, so the counter 
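could get stuck at a positive value, which means that the linger time would 
effectively be ignored, since every batch is then treated as immediately sendable.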
    
    This patch fixes the problem by removing the use of `beginFlush` in 
`Sender`. Instead, we now add an additional condition in `RecordAccumulator` to 
explicitly check when a transaction is being completed.
    
    Reviewers: Guozhang Wang <wangg...@gmail.com>
---
 .../producer/internals/RecordAccumulator.java      |   8 +-
 .../kafka/clients/producer/internals/Sender.java   |   8 --
 .../producer/internals/ProducerTestUtils.java      |  45 +++++++
 .../producer/internals/RecordAccumulatorTest.java  |  68 +++++++++-
 .../clients/producer/internals/SenderTest.java     | 142 ++++++++++++++++++++-
 .../producer/internals/TransactionManagerTest.java |  20 +--
 6 files changed, 258 insertions(+), 33 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index efccf30..b974f7a 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -465,7 +465,13 @@ public final class RecordAccumulator {
                         long timeToWaitMs = backingOff ? retryBackoffMs : 
lingerMs;
                         boolean full = deque.size() > 1 || batch.isFull();
                         boolean expired = waitedTimeMs >= timeToWaitMs;
-                        boolean sendable = full || expired || exhausted || 
closed || flushInProgress();
+                        boolean transactionCompleting = transactionManager != 
null && transactionManager.isCompleting();
+                        boolean sendable = full
+                            || expired
+                            || exhausted
+                            || closed
+                            || flushInProgress()
+                            || transactionCompleting;
                         if (sendable && !backingOff) {
                             readyNodes.add(leader);
                         } else {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 8f25493..b88c988 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -431,14 +431,6 @@ public class Sender implements Runnable {
             }
         }
 
-        if (transactionManager.isCompleting() && 
!accumulator.flushInProgress()) {
-            // There may still be requests left which are being retried. Since 
we do not know whether they had
-            // been successfully appended to the broker log, we must resend 
them until their final status is clear.
-            // If they had been appended and we did not receive the error, 
then our sequence number would no longer
-            // be correct which would lead to an OutOfSequenceException.
-            accumulator.beginFlush();
-        }
-
         TransactionManager.TxnRequestHandler nextRequestHandler = 
transactionManager.nextRequest(accumulator.hasIncomplete());
         if (nextRequestHandler == null)
             return false;
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerTestUtils.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerTestUtils.java
new file mode 100644
index 0000000..a841033
--- /dev/null
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerTestUtils.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import java.util.function.Supplier;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ProducerTestUtils {
+    private static final int MAX_TRIES = 10;
+
+    static void runUntil(
+        Sender sender,
+        Supplier<Boolean> condition
+    ) {
+        runUntil(sender, condition, MAX_TRIES);
+    }
+
+    static void runUntil(
+        Sender sender,
+        Supplier<Boolean> condition,
+        int maxTries
+    ) {
+        int tries = 0;
+        while (!condition.get() && tries < maxTries) {
+            tries++;
+            sender.runOnce();
+        }
+        assertTrue(condition.get(), "Condition not satisfied after " + 
maxTries + " tries");
+    }
+}
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index 3608aab..90dcba5 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -40,10 +40,12 @@ import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.ProducerIdAndEpoch;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -695,7 +697,7 @@ public class RecordAccumulatorTest {
     }
 
     @Test
-    public void testIdempotenceWithOldMagic() throws InterruptedException {
+    public void testIdempotenceWithOldMagic() {
         // Simulate talking to an older broker, ie. one which supports a lower 
magic.
         ApiVersions apiVersions = new ApiVersions();
         int batchSize = 1025;
@@ -706,7 +708,7 @@ public class RecordAccumulatorTest {
         String metricGrpName = "producer-metrics";
 
         apiVersions.update("foobar", 
NodeApiVersions.create(ApiKeys.PRODUCE.id, (short) 0, (short) 2));
-        TransactionManager transactionManager = new TransactionManager(new 
LogContext(), null, 0, 100L, new ApiVersions(), false);
+        TransactionManager transactionManager = new TransactionManager(new 
LogContext(), null, 0, 100L, apiVersions, false);
         RecordAccumulator accum = new RecordAccumulator(logContext, batchSize 
+ DefaultRecordBatch.RECORD_BATCH_OVERHEAD,
             CompressionType.NONE, lingerMs, retryBackoffMs, deliveryTimeoutMs, 
metrics, metricGrpName, time, apiVersions, transactionManager,
             new BufferPool(totalSize, batchSize, metrics, time, 
metricGrpName));
@@ -715,6 +717,52 @@ public class RecordAccumulatorTest {
     }
 
     @Test
+    public void testRecordsDrainedWhenTransactionCompleting() throws Exception 
{
+        int batchSize = 1025;
+        int deliveryTimeoutMs = 3200;
+        int lingerMs = 10;
+        long totalSize = 10 * batchSize;
+
+        TransactionManager transactionManager = 
Mockito.mock(TransactionManager.class);
+        RecordAccumulator accumulator = 
createTestRecordAccumulator(transactionManager, deliveryTimeoutMs,
+            batchSize, totalSize, CompressionType.NONE, lingerMs);
+
+        ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(12345L, 
(short) 5);
+        
Mockito.when(transactionManager.producerIdAndEpoch()).thenReturn(producerIdAndEpoch);
+        
Mockito.when(transactionManager.isSendToPartitionAllowed(tp1)).thenReturn(true);
+        
Mockito.when(transactionManager.isPartitionAdded(tp1)).thenReturn(true);
+        
Mockito.when(transactionManager.firstInFlightSequence(tp1)).thenReturn(0);
+
+        // Initially, the transaction is still in progress, so we should 
respect the linger.
+        Mockito.when(transactionManager.isCompleting()).thenReturn(false);
+
+        accumulator.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 
maxBlockTimeMs,
+            false, time.milliseconds());
+        accumulator.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 
maxBlockTimeMs,
+            false, time.milliseconds());
+        assertTrue(accumulator.hasUndrained());
+
+        RecordAccumulator.ReadyCheckResult firstResult = 
accumulator.ready(cluster, time.milliseconds());
+        assertEquals(0, firstResult.readyNodes.size());
+        Map<Integer, List<ProducerBatch>> firstDrained = 
accumulator.drain(cluster, firstResult.readyNodes,
+            Integer.MAX_VALUE, time.milliseconds());
+        assertEquals(0, firstDrained.size());
+
+        // Once the transaction begins completion, then the batch should be 
drained immediately.
+        Mockito.when(transactionManager.isCompleting()).thenReturn(true);
+
+        RecordAccumulator.ReadyCheckResult secondResult = 
accumulator.ready(cluster, time.milliseconds());
+        assertEquals(1, secondResult.readyNodes.size());
+        Node readyNode = secondResult.readyNodes.iterator().next();
+
+        Map<Integer, List<ProducerBatch>> secondDrained = 
accumulator.drain(cluster, secondResult.readyNodes,
+            Integer.MAX_VALUE, time.milliseconds());
+        assertEquals(Collections.singleton(readyNode.id()), 
secondDrained.keySet());
+        List<ProducerBatch> batches = secondDrained.get(readyNode.id());
+        assertEquals(1, batches.size());
+    }
+
+    @Test
     public void testSplitAndReenqueue() throws ExecutionException, 
InterruptedException {
         long now = time.milliseconds();
         RecordAccumulator accum = createTestRecordAccumulator(1024, 10 * 1024, 
CompressionType.GZIP, 10);
@@ -1080,16 +1128,26 @@ public class RecordAccumulatorTest {
         }
     }
 
-
     private RecordAccumulator createTestRecordAccumulator(int batchSize, long 
totalSize, CompressionType type, int lingerMs) {
         int deliveryTimeoutMs = 3200;
         return createTestRecordAccumulator(deliveryTimeoutMs, batchSize, 
totalSize, type, lingerMs);
     }
 
+    private RecordAccumulator createTestRecordAccumulator(int 
deliveryTimeoutMs, int batchSize, long totalSize, CompressionType type, int 
lingerMs) {
+        return createTestRecordAccumulator(null, deliveryTimeoutMs, batchSize, 
totalSize, type, lingerMs);
+    }
+
     /**
      * Return a test RecordAccumulator instance
      */
-    private RecordAccumulator createTestRecordAccumulator(int 
deliveryTimeoutMs, int batchSize, long totalSize, CompressionType type, int 
lingerMs) {
+    private RecordAccumulator createTestRecordAccumulator(
+        TransactionManager txnManager,
+        int deliveryTimeoutMs,
+        int batchSize,
+        long totalSize,
+        CompressionType type,
+        int lingerMs
+    ) {
         long retryBackoffMs = 100L;
         String metricGrpName = "producer-metrics";
 
@@ -1104,7 +1162,7 @@ public class RecordAccumulatorTest {
             metricGrpName,
             time,
             new ApiVersions(),
-            null,
+            txnManager,
             new BufferPool(totalSize, batchSize, metrics, time, 
metricGrpName));
     }
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index e118c11..c20eaf6 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -105,6 +105,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static 
org.apache.kafka.clients.producer.internals.ProducerTestUtils.runUntil;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -1030,7 +1031,7 @@ public class SenderTest {
         TransactionManager transactionManager = createTransactionManager();
 
         // Retries once
-        setupWithTransactionState(transactionManager, false, null, true, 1);
+        setupWithTransactionState(transactionManager, false, null, true, 1, 0);
 
         // Init producer id/epoch
         prepareAndReceiveInitProducerId(producerId, Errors.NONE);
@@ -2627,6 +2628,130 @@ public class SenderTest {
     }
 
     @Test
+    public void testRecordsFlushedImmediatelyOnTransactionCompletion() throws 
Exception {
+        try (Metrics m = new Metrics()) {
+            int lingerMs = 50;
+            SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
+
+            TransactionManager txnManager = new TransactionManager(logContext, 
"txnId", 6000, 100, apiVersions, false);
+            setupWithTransactionState(txnManager, lingerMs);
+
+            Sender sender = new Sender(logContext, client, metadata, 
this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
+                1, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, 
txnManager, apiVersions);
+
+            // Begin a transaction and successfully add one partition to it.
+            ProducerIdAndEpoch producerIdAndEpoch = new 
ProducerIdAndEpoch(123456L, (short) 0);
+            doInitTransactions(txnManager, producerIdAndEpoch);
+            txnManager.beginTransaction();
+            addPartitionToTxn(sender, txnManager, tp0);
+
+            // Send a couple records and assert that they are not sent 
immediately (due to linger).
+            appendToAccumulator(tp0);
+            appendToAccumulator(tp0);
+            sender.runOnce();
+            assertFalse(client.hasInFlightRequests());
+
+            // Now begin the commit and assert that the Produce request is 
sent immediately
+            // without waiting for the linger.
+            txnManager.beginCommit();
+            runUntil(sender, client::hasInFlightRequests);
+
+            // Respond to the produce request and wait for the EndTxn request 
to be sent.
+            respondToProduce(tp0, Errors.NONE, 1L);
+            runUntil(sender, txnManager::hasInFlightRequest);
+
+            // Respond to the expected EndTxn request.
+            respondToEndTxn(Errors.NONE);
+            runUntil(sender, txnManager::isReady);
+
+            // Finally, we want to assert that the linger time is still 
effective
+            // when the new transaction begins.
+            txnManager.beginTransaction();
+            addPartitionToTxn(sender, txnManager, tp0);
+
+            appendToAccumulator(tp0);
+            appendToAccumulator(tp0);
+            time.sleep(lingerMs - 1);
+            sender.runOnce();
+            assertFalse(client.hasInFlightRequests());
+            assertTrue(accumulator.hasUndrained());
+
+            time.sleep(1);
+            runUntil(sender, client::hasInFlightRequests);
+            assertFalse(accumulator.hasUndrained());
+        }
+    }
+
+    @Test
+    public void testAwaitPendingRecordsBeforeCommittingTransaction() throws 
Exception {
+        try (Metrics m = new Metrics()) {
+            SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
+
+            TransactionManager txnManager = new TransactionManager(logContext, 
"txnId", 6000, 100, apiVersions, false);
+            setupWithTransactionState(txnManager);
+
+            Sender sender = new Sender(logContext, client, metadata, 
this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
+                1, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, 
txnManager, apiVersions);
+
+            // Begin a transaction and successfully add one partition to it.
+            ProducerIdAndEpoch producerIdAndEpoch = new 
ProducerIdAndEpoch(123456L, (short) 0);
+            doInitTransactions(txnManager, producerIdAndEpoch);
+            txnManager.beginTransaction();
+            addPartitionToTxn(sender, txnManager, tp0);
+
+            // Send one Produce request.
+            appendToAccumulator(tp0);
+            runUntil(sender, () -> client.requests().size() == 1);
+            assertFalse(accumulator.hasUndrained());
+            assertTrue(client.hasInFlightRequests());
+            assertTrue(txnManager.hasInflightBatches(tp0));
+
+            // Enqueue another record and then commit the transaction. We 
expect the unsent record to
+            // get sent before the transaction can be completed.
+            appendToAccumulator(tp0);
+            txnManager.beginCommit();
+            runUntil(sender, () -> client.requests().size() == 2);
+
+            assertTrue(txnManager.isCompleting());
+            assertFalse(txnManager.hasInFlightRequest());
+            assertTrue(txnManager.hasInflightBatches(tp0));
+
+            // Now respond to the pending Produce requests.
+            respondToProduce(tp0, Errors.NONE, 0L);
+            respondToProduce(tp0, Errors.NONE, 1L);
+            runUntil(sender, txnManager::hasInFlightRequest);
+
+            // Finally, respond to the expected EndTxn request.
+            respondToEndTxn(Errors.NONE);
+            runUntil(sender, txnManager::isReady);
+        }
+    }
+
+    private void addPartitionToTxn(Sender sender, TransactionManager 
txnManager, TopicPartition tp) {
+        txnManager.maybeAddPartitionToTransaction(tp);
+        client.prepareResponse(new AddPartitionsToTxnResponse(0, 
Collections.singletonMap(tp, Errors.NONE)));
+        runUntil(sender, () -> txnManager.isPartitionAdded(tp));
+        assertFalse(txnManager.hasInFlightRequest());
+    }
+
+    private void respondToProduce(TopicPartition tp, Errors error, long 
offset) {
+        client.respond(
+            request -> request instanceof ProduceRequest,
+            produceResponse(tp, offset, error, 0)
+        );
+
+    }
+
+    private void respondToEndTxn(Errors error) {
+        client.respond(
+            request -> request instanceof EndTxnRequest,
+            new EndTxnResponse(new EndTxnResponseData()
+                .setErrorCode(error.code())
+                .setThrottleTimeMs(0))
+        );
+    }
+
+    @Test
     public void testIncompleteTransactionAbortOnShutdown() {
         // create a sender with retries = 1
         int maxRetries = 1;
@@ -2915,11 +3040,15 @@ public class SenderTest {
     }
     
     private void setupWithTransactionState(TransactionManager 
transactionManager) {
-        setupWithTransactionState(transactionManager, false, null, true, 
Integer.MAX_VALUE);
+        setupWithTransactionState(transactionManager, false, null, true, 
Integer.MAX_VALUE, 0);
+    }
+
+    private void setupWithTransactionState(TransactionManager 
transactionManager, int lingerMs) {
+        setupWithTransactionState(transactionManager, false, null, true, 
Integer.MAX_VALUE, lingerMs);
     }
 
     private void setupWithTransactionState(TransactionManager 
transactionManager, boolean guaranteeOrder, BufferPool customPool) {
-        setupWithTransactionState(transactionManager, guaranteeOrder, 
customPool, true, Integer.MAX_VALUE);
+        setupWithTransactionState(transactionManager, guaranteeOrder, 
customPool, true, Integer.MAX_VALUE, 0);
     }
 
     private void setupWithTransactionState(
@@ -2928,7 +3057,7 @@ public class SenderTest {
         BufferPool customPool,
         boolean updateMetadata
     ) {
-        setupWithTransactionState(transactionManager, guaranteeOrder, 
customPool, updateMetadata, Integer.MAX_VALUE);
+        setupWithTransactionState(transactionManager, guaranteeOrder, 
customPool, updateMetadata, Integer.MAX_VALUE, 0);
     }
 
     private void setupWithTransactionState(
@@ -2936,7 +3065,8 @@ public class SenderTest {
         boolean guaranteeOrder,
         BufferPool customPool,
         boolean updateMetadata,
-        int retries
+        int retries,
+        int lingerMs
     ) {
         long totalSize = 1024 * 1024;
         String metricGrpName = "producer-metrics";
@@ -2944,7 +3074,7 @@ public class SenderTest {
         this.metrics = new Metrics(metricConfig, time);
         BufferPool pool = (customPool == null) ? new BufferPool(totalSize, 
batchSize, metrics, time, metricGrpName) : customPool;
 
-        this.accumulator = new RecordAccumulator(logContext, batchSize, 
CompressionType.NONE, 0, 0L,
+        this.accumulator = new RecordAccumulator(logContext, batchSize, 
CompressionType.NONE, lingerMs, 0L,
             DELIVERY_TIMEOUT_MS, metrics, metricGrpName, time, apiVersions, 
transactionManager, pool);
         this.senderMetricsRegistry = new SenderMetricsRegistry(this.metrics);
         this.sender = new Sender(logContext, this.client, this.metadata, 
this.accumulator, guaranteeOrder, MAX_REQUEST_SIZE, ACKS_ALL,
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 9f097fc..3e24bcf 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -18,21 +18,17 @@ package org.apache.kafka.clients.producer.internals;
 
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.clients.consumer.CommitFailedException;
 import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
-import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.common.errors.FencedInstanceIdException;
-import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
-import org.apache.kafka.common.requests.JoinGroupRequest;
-import org.apache.kafka.common.requests.RequestTestUtils;
-import org.apache.kafka.common.utils.ProducerIdAndEpoch;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.FencedInstanceIdException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.OutOfOrderSequenceException;
 import org.apache.kafka.common.errors.ProducerFencedException;
@@ -44,6 +40,7 @@ import 
org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.message.AddOffsetsToTxnResponseData;
+import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
 import org.apache.kafka.common.message.EndTxnResponseData;
 import org.apache.kafka.common.message.InitProducerIdResponseData;
 import org.apache.kafka.common.metrics.Metrics;
@@ -67,13 +64,16 @@ import 
org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
 import org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.requests.InitProducerIdRequest;
 import org.apache.kafka.common.requests.InitProducerIdResponse;
+import org.apache.kafka.common.requests.JoinGroupRequest;
 import org.apache.kafka.common.requests.ProduceRequest;
 import org.apache.kafka.common.requests.ProduceResponse;
+import org.apache.kafka.common.requests.RequestTestUtils;
 import org.apache.kafka.common.requests.TransactionResult;
 import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
 import org.apache.kafka.common.requests.TxnOffsetCommitResponse;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.ProducerIdAndEpoch;
 import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -3596,13 +3596,7 @@ public class TransactionManagerTest {
     }
 
     private void runUntil(Supplier<Boolean> condition) {
-        for (int i = 0; i < 5; i++) {
-            if (condition.get())
-                break;
-            sender.runOnce();
-        }
-        if (!condition.get())
-            throw new AssertionError("Condition was not satisfied after 
multiple runs");
+        ProducerTestUtils.runUntil(sender, condition);
     }
 
 }

Reply via email to