This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.8 by this push:
     new 3cbd063  KAFKA-12870; Flush in progress not cleared after transaction completion (#10880)
3cbd063 is described below

commit 3cbd063ec2be17cfcf3b4e4a1d203d4da5ab17b3
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Fri Jun 18 15:50:49 2021 -0700

    KAFKA-12870; Flush in progress not cleared after transaction completion (#10880)

    We had been using `RecordAccumulator.beginFlush` in order to force the
    `RecordAccumulator` to flush pending batches when a transaction was being
    completed. Internally, `RecordAccumulator` has a simple counter for the
    number of flushes in progress. The count gets incremented in `beginFlush`
    and it is expected to be decremented by `awaitFlushCompletion`. The second
    call to decrement the counter never happened in the transactional path, so
    the counter could get stuck at a positive value, [...]

    This patch fixes the problem by removing the use of `beginFlush` in
    `Sender`. Instead, we now add an additional condition in
    `RecordAccumulator` to explicitly check when a transaction is being
    completed.

    Reviewers: Guozhang Wang <wangg...@gmail.com>
---
 .../producer/internals/RecordAccumulator.java      |   8 +-
 .../kafka/clients/producer/internals/Sender.java   |   8 --
 .../producer/internals/ProducerTestUtils.java      |  45 +++++++
 .../producer/internals/RecordAccumulatorTest.java  |  68 +++++++++-
 .../clients/producer/internals/SenderTest.java     | 142 ++++++++++++++++++++-
 .../producer/internals/TransactionManagerTest.java |  20 +--
 6 files changed, 258 insertions(+), 33 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index efccf30..b974f7a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -465,7 +465,13 @@ public final class RecordAccumulator {
                     long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                     boolean full = deque.size() > 1 || batch.isFull();
                     boolean expired = waitedTimeMs >= timeToWaitMs;
-                    boolean sendable = full || expired || exhausted || closed || flushInProgress();
+                    boolean transactionCompleting = transactionManager != null && transactionManager.isCompleting();
+                    boolean sendable = full
+                        || expired
+                        || exhausted
+                        || closed
+                        || flushInProgress()
+                        || transactionCompleting;
                     if (sendable && !backingOff) {
                         readyNodes.add(leader);
                     } else {
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 8f25493..b88c988 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -431,14 +431,6 @@ public class Sender implements Runnable {
             }
         }

-        if (transactionManager.isCompleting() && !accumulator.flushInProgress()) {
-            // There may still be requests left which are being retried. Since we do not know whether they had
-            // been successfully appended to the broker log, we must resend them until their final status is clear.
-            // If they had been appended and we did not receive the error, then our sequence number would no longer
-            // be correct which would lead to an OutOfSequenceException.
-            accumulator.beginFlush();
-        }
-
         TransactionManager.TxnRequestHandler nextRequestHandler = transactionManager.nextRequest(accumulator.hasIncomplete());
         if (nextRequestHandler == null)
             return false;
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerTestUtils.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerTestUtils.java
new file mode 100644
index 0000000..a841033
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerTestUtils.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import java.util.function.Supplier;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ProducerTestUtils {
+    private static final int MAX_TRIES = 10;
+
+    static void runUntil(
+        Sender sender,
+        Supplier<Boolean> condition
+    ) {
+        runUntil(sender, condition, MAX_TRIES);
+    }
+
+    static void runUntil(
+        Sender sender,
+        Supplier<Boolean> condition,
+        int maxTries
+    ) {
+        int tries = 0;
+        while (!condition.get() && tries < maxTries) {
+            tries++;
+            sender.runOnce();
+        }
+        assertTrue(condition.get(), "Condition not satisfied after " + maxTries + " tries");
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index 3608aab..90dcba5 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -40,10 +40,12 @@ import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.ProducerIdAndEpoch;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;

 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -695,7 +697,7 @@ public class RecordAccumulatorTest {
     }

     @Test
-    public void testIdempotenceWithOldMagic() throws InterruptedException {
+    public void testIdempotenceWithOldMagic() {
         // Simulate talking to an older broker, ie. one which supports a lower magic.
         ApiVersions apiVersions = new ApiVersions();
         int batchSize = 1025;
@@ -706,7 +708,7 @@ public class RecordAccumulatorTest {
         String metricGrpName = "producer-metrics";

         apiVersions.update("foobar", NodeApiVersions.create(ApiKeys.PRODUCE.id, (short) 0, (short) 2));
-        TransactionManager transactionManager = new TransactionManager(new LogContext(), null, 0, 100L, new ApiVersions(), false);
+        TransactionManager transactionManager = new TransactionManager(new LogContext(), null, 0, 100L, apiVersions, false);
         RecordAccumulator accum = new RecordAccumulator(logContext, batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD,
             CompressionType.NONE, lingerMs, retryBackoffMs, deliveryTimeoutMs, metrics, metricGrpName, time, apiVersions, transactionManager,
             new BufferPool(totalSize, batchSize, metrics, time, metricGrpName));
@@ -715,6 +717,52 @@ public class RecordAccumulatorTest {
     }

     @Test
+    public void testRecordsDrainedWhenTransactionCompleting() throws Exception {
+        int batchSize = 1025;
+        int deliveryTimeoutMs = 3200;
+        int lingerMs = 10;
+        long totalSize = 10 * batchSize;
+
+        TransactionManager transactionManager = Mockito.mock(TransactionManager.class);
+        RecordAccumulator accumulator = createTestRecordAccumulator(transactionManager, deliveryTimeoutMs,
+            batchSize, totalSize, CompressionType.NONE, lingerMs);
+
+        ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(12345L, (short) 5);
+        Mockito.when(transactionManager.producerIdAndEpoch()).thenReturn(producerIdAndEpoch);
+        Mockito.when(transactionManager.isSendToPartitionAllowed(tp1)).thenReturn(true);
+        Mockito.when(transactionManager.isPartitionAdded(tp1)).thenReturn(true);
+        Mockito.when(transactionManager.firstInFlightSequence(tp1)).thenReturn(0);
+
+        // Initially, the transaction is still in progress, so we should respect the linger.
+        Mockito.when(transactionManager.isCompleting()).thenReturn(false);
+
+        accumulator.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs,
+            false, time.milliseconds());
+        accumulator.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs,
+            false, time.milliseconds());
+        assertTrue(accumulator.hasUndrained());
+
+        RecordAccumulator.ReadyCheckResult firstResult = accumulator.ready(cluster, time.milliseconds());
+        assertEquals(0, firstResult.readyNodes.size());
+        Map<Integer, List<ProducerBatch>> firstDrained = accumulator.drain(cluster, firstResult.readyNodes,
+            Integer.MAX_VALUE, time.milliseconds());
+        assertEquals(0, firstDrained.size());
+
+        // Once the transaction begins completion, then the batch should be drained immediately.
+        Mockito.when(transactionManager.isCompleting()).thenReturn(true);
+
+        RecordAccumulator.ReadyCheckResult secondResult = accumulator.ready(cluster, time.milliseconds());
+        assertEquals(1, secondResult.readyNodes.size());
+        Node readyNode = secondResult.readyNodes.iterator().next();
+
+        Map<Integer, List<ProducerBatch>> secondDrained = accumulator.drain(cluster, secondResult.readyNodes,
+            Integer.MAX_VALUE, time.milliseconds());
+        assertEquals(Collections.singleton(readyNode.id()), secondDrained.keySet());
+        List<ProducerBatch> batches = secondDrained.get(readyNode.id());
+        assertEquals(1, batches.size());
+    }
+
+    @Test
     public void testSplitAndReenqueue() throws ExecutionException, InterruptedException {
         long now = time.milliseconds();
         RecordAccumulator accum = createTestRecordAccumulator(1024, 10 * 1024, CompressionType.GZIP, 10);
@@ -1080,16 +1128,26 @@ public class RecordAccumulatorTest {
         }
     }

-
     private RecordAccumulator createTestRecordAccumulator(int batchSize, long totalSize, CompressionType type, int lingerMs) {
         int deliveryTimeoutMs = 3200;
         return createTestRecordAccumulator(deliveryTimeoutMs, batchSize, totalSize, type, lingerMs);
     }

+    private RecordAccumulator createTestRecordAccumulator(int deliveryTimeoutMs, int batchSize, long totalSize, CompressionType type, int lingerMs) {
+        return createTestRecordAccumulator(null, deliveryTimeoutMs, batchSize, totalSize, type, lingerMs);
+    }
+
     /**
      * Return a test RecordAccumulator instance
      */
-    private RecordAccumulator createTestRecordAccumulator(int deliveryTimeoutMs, int batchSize, long totalSize, CompressionType type, int lingerMs) {
+    private RecordAccumulator createTestRecordAccumulator(
+        TransactionManager txnManager,
+        int deliveryTimeoutMs,
+        int batchSize,
+        long totalSize,
+        CompressionType type,
+        int lingerMs
+    ) {
         long retryBackoffMs = 100L;
         String metricGrpName = "producer-metrics";

@@ -1104,7 +1162,7 @@ public class RecordAccumulatorTest {
             metricGrpName,
             time,
             new ApiVersions(),
-            null,
+            txnManager,
             new BufferPool(totalSize, batchSize, metrics, time, metricGrpName));
     }
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index e118c11..c20eaf6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -105,6 +105,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;

+import static org.apache.kafka.clients.producer.internals.ProducerTestUtils.runUntil;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -1030,7 +1031,7 @@ public class SenderTest {
         TransactionManager transactionManager = createTransactionManager();
         // Retries once
-        setupWithTransactionState(transactionManager, false, null, true, 1);
+        setupWithTransactionState(transactionManager, false, null, true, 1, 0);

         // Init producer id/epoch
         prepareAndReceiveInitProducerId(producerId, Errors.NONE);
@@ -2627,6 +2628,130 @@ public class SenderTest {
     }

     @Test
+    public void testRecordsFlushedImmediatelyOnTransactionCompletion() throws Exception {
+        try (Metrics m = new Metrics()) {
+            int lingerMs = 50;
+            SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
+
+            TransactionManager txnManager = new TransactionManager(logContext, "txnId", 6000, 100, apiVersions, false);
+            setupWithTransactionState(txnManager, lingerMs);
+
+            Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
+                1, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, txnManager, apiVersions);
+
+            // Begin a transaction and successfully add one partition to it.
+            ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
+            doInitTransactions(txnManager, producerIdAndEpoch);
+            txnManager.beginTransaction();
+            addPartitionToTxn(sender, txnManager, tp0);
+
+            // Send a couple records and assert that they are not sent immediately (due to linger).
+            appendToAccumulator(tp0);
+            appendToAccumulator(tp0);
+            sender.runOnce();
+            assertFalse(client.hasInFlightRequests());
+
+            // Now begin the commit and assert that the Produce request is sent immediately
+            // without waiting for the linger.
+            txnManager.beginCommit();
+            runUntil(sender, client::hasInFlightRequests);
+
+            // Respond to the produce request and wait for the EndTxn request to be sent.
+            respondToProduce(tp0, Errors.NONE, 1L);
+            runUntil(sender, txnManager::hasInFlightRequest);
+
+            // Respond to the expected EndTxn request.
+            respondToEndTxn(Errors.NONE);
+            runUntil(sender, txnManager::isReady);
+
+            // Finally, we want to assert that the linger time is still effective
+            // when the new transaction begins.
+            txnManager.beginTransaction();
+            addPartitionToTxn(sender, txnManager, tp0);
+
+            appendToAccumulator(tp0);
+            appendToAccumulator(tp0);
+            time.sleep(lingerMs - 1);
+            sender.runOnce();
+            assertFalse(client.hasInFlightRequests());
+            assertTrue(accumulator.hasUndrained());
+
+            time.sleep(1);
+            runUntil(sender, client::hasInFlightRequests);
+            assertFalse(accumulator.hasUndrained());
+        }
+    }
+
+    @Test
+    public void testAwaitPendingRecordsBeforeCommittingTransaction() throws Exception {
+        try (Metrics m = new Metrics()) {
+            SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
+
+            TransactionManager txnManager = new TransactionManager(logContext, "txnId", 6000, 100, apiVersions, false);
+            setupWithTransactionState(txnManager);
+
+            Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
+                1, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, txnManager, apiVersions);
+
+            // Begin a transaction and successfully add one partition to it.
+            ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
+            doInitTransactions(txnManager, producerIdAndEpoch);
+            txnManager.beginTransaction();
+            addPartitionToTxn(sender, txnManager, tp0);
+
+            // Send one Produce request.
+            appendToAccumulator(tp0);
+            runUntil(sender, () -> client.requests().size() == 1);
+            assertFalse(accumulator.hasUndrained());
+            assertTrue(client.hasInFlightRequests());
+            assertTrue(txnManager.hasInflightBatches(tp0));
+
+            // Enqueue another record and then commit the transaction. We expect the unsent record to
+            // get sent before the transaction can be completed.
+            appendToAccumulator(tp0);
+            txnManager.beginCommit();
+            runUntil(sender, () -> client.requests().size() == 2);
+
+            assertTrue(txnManager.isCompleting());
+            assertFalse(txnManager.hasInFlightRequest());
+            assertTrue(txnManager.hasInflightBatches(tp0));
+
+            // Now respond to the pending Produce requests.
+            respondToProduce(tp0, Errors.NONE, 0L);
+            respondToProduce(tp0, Errors.NONE, 1L);
+            runUntil(sender, txnManager::hasInFlightRequest);
+
+            // Finally, respond to the expected EndTxn request.
+            respondToEndTxn(Errors.NONE);
+            runUntil(sender, txnManager::isReady);
+        }
+    }
+
+    private void addPartitionToTxn(Sender sender, TransactionManager txnManager, TopicPartition tp) {
+        txnManager.maybeAddPartitionToTransaction(tp);
+        client.prepareResponse(new AddPartitionsToTxnResponse(0, Collections.singletonMap(tp, Errors.NONE)));
+        runUntil(sender, () -> txnManager.isPartitionAdded(tp));
+        assertFalse(txnManager.hasInFlightRequest());
+    }
+
+    private void respondToProduce(TopicPartition tp, Errors error, long offset) {
+        client.respond(
+            request -> request instanceof ProduceRequest,
+            produceResponse(tp, offset, error, 0)
+        );
+
+    }
+
+    private void respondToEndTxn(Errors error) {
+        client.respond(
+            request -> request instanceof EndTxnRequest,
+            new EndTxnResponse(new EndTxnResponseData()
+                .setErrorCode(error.code())
+                .setThrottleTimeMs(0))
+        );
+    }
+
+    @Test
     public void testIncompleteTransactionAbortOnShutdown() {
         // create a sender with retries = 1
         int maxRetries = 1;
@@ -2915,11 +3040,15 @@ public class SenderTest {
     }

     private void setupWithTransactionState(TransactionManager transactionManager) {
-        setupWithTransactionState(transactionManager, false, null, true, Integer.MAX_VALUE);
+        setupWithTransactionState(transactionManager, false, null, true, Integer.MAX_VALUE, 0);
+    }
+
+    private void setupWithTransactionState(TransactionManager transactionManager, int lingerMs) {
+        setupWithTransactionState(transactionManager, false, null, true, Integer.MAX_VALUE, lingerMs);
     }

     private void setupWithTransactionState(TransactionManager transactionManager, boolean guaranteeOrder, BufferPool customPool) {
-        setupWithTransactionState(transactionManager, guaranteeOrder, customPool, true, Integer.MAX_VALUE);
+        setupWithTransactionState(transactionManager, guaranteeOrder, customPool, true, Integer.MAX_VALUE, 0);
     }

     private void setupWithTransactionState(
@@ -2928,7 +3057,7 @@ public class SenderTest {
         BufferPool customPool,
         boolean updateMetadata
     ) {
-        setupWithTransactionState(transactionManager, guaranteeOrder, customPool, updateMetadata, Integer.MAX_VALUE);
+        setupWithTransactionState(transactionManager, guaranteeOrder, customPool, updateMetadata, Integer.MAX_VALUE, 0);
     }

     private void setupWithTransactionState(
@@ -2936,7 +3065,8 @@ public class SenderTest {
         boolean guaranteeOrder,
         BufferPool customPool,
         boolean updateMetadata,
-        int retries
+        int retries,
+        int lingerMs
     ) {
         long totalSize = 1024 * 1024;
         String metricGrpName = "producer-metrics";
@@ -2944,7 +3074,7 @@ public class SenderTest {
         this.metrics = new Metrics(metricConfig, time);
         BufferPool pool = (customPool == null) ?
            new BufferPool(totalSize, batchSize, metrics, time, metricGrpName) : customPool;
-        this.accumulator = new RecordAccumulator(logContext, batchSize, CompressionType.NONE, 0, 0L,
+        this.accumulator = new RecordAccumulator(logContext, batchSize, CompressionType.NONE, lingerMs, 0L,
             DELIVERY_TIMEOUT_MS, metrics, metricGrpName, time, apiVersions, transactionManager, pool);
         this.senderMetricsRegistry = new SenderMetricsRegistry(this.metrics);
         this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, guaranteeOrder, MAX_REQUEST_SIZE, ACKS_ALL,
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 9f097fc..3e24bcf 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -18,21 +18,17 @@ package org.apache.kafka.clients.producer.internals;

 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.clients.consumer.CommitFailedException;
 import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
-import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.common.errors.FencedInstanceIdException;
-import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
-import org.apache.kafka.common.requests.JoinGroupRequest;
-import org.apache.kafka.common.requests.RequestTestUtils;
-import org.apache.kafka.common.utils.ProducerIdAndEpoch;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.FencedInstanceIdException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.OutOfOrderSequenceException;
 import org.apache.kafka.common.errors.ProducerFencedException;
@@ -44,6 +40,7 @@ import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.message.AddOffsetsToTxnResponseData;
+import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
 import org.apache.kafka.common.message.EndTxnResponseData;
 import org.apache.kafka.common.message.InitProducerIdResponseData;
 import org.apache.kafka.common.metrics.Metrics;
@@ -67,13 +64,16 @@ import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
 import org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.requests.InitProducerIdRequest;
 import org.apache.kafka.common.requests.InitProducerIdResponse;
+import org.apache.kafka.common.requests.JoinGroupRequest;
 import org.apache.kafka.common.requests.ProduceRequest;
 import org.apache.kafka.common.requests.ProduceResponse;
+import org.apache.kafka.common.requests.RequestTestUtils;
 import org.apache.kafka.common.requests.TransactionResult;
 import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
 import org.apache.kafka.common.requests.TxnOffsetCommitResponse;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.ProducerIdAndEpoch;
 import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -3596,13 +3596,7 @@ public class TransactionManagerTest {
     }

     private void runUntil(Supplier<Boolean> condition) {
-        for (int i = 0; i < 5; i++) {
-            if (condition.get())
-                break;
-            sender.runOnce();
-        }
-        if (!condition.get())
-            throw new AssertionError("Condition was not satisfied after multiple runs");
+        ProducerTestUtils.runUntil(sender, condition);
     }
 }
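
For readers tracing the bug itself, the flush bookkeeping at the heart of this fix
can be distilled to a few lines. The sketch below is a hypothetical stand-in for
`RecordAccumulator`'s internal flush counter (the class name, the `main` driver,
and the simplified method bodies are illustrative only, not the actual Kafka
code); it shows how one `beginFlush` with no matching `awaitFlushCompletion`
leaves `flushInProgress()` permanently true:

    import java.util.concurrent.atomic.AtomicInteger;

    // Hypothetical model of RecordAccumulator's flush bookkeeping.
    public class FlushCounterSketch {
        private final AtomicInteger flushesInProgress = new AtomicInteger(0);

        // Analogous to RecordAccumulator.beginFlush(): marks a flush as started.
        void beginFlush() {
            flushesInProgress.incrementAndGet();
        }

        // Analogous to awaitFlushCompletion(): the matching decrement.
        void awaitFlushCompletion() {
            flushesInProgress.decrementAndGet();
        }

        boolean flushInProgress() {
            return flushesInProgress.get() > 0;
        }

        public static void main(String[] args) {
            FlushCounterSketch accumulator = new FlushCounterSketch();

            // KafkaProducer.flush() path: increment and decrement are paired.
            accumulator.beginFlush();
            accumulator.awaitFlushCompletion();
            System.out.println(accumulator.flushInProgress()); // false

            // Old transactional path in Sender: beginFlush() had no matching
            // awaitFlushCompletion(), so the counter never returned to zero.
            accumulator.beginFlush();
            System.out.println(accumulator.flushInProgress()); // true, stuck
        }
    }

Because `ready()` treats `flushInProgress()` as a send trigger, a leaked increment
made every subsequent batch appear immediately sendable, effectively disabling the
configured linger for the remainder of the producer's lifetime. That is why the
patch drops `beginFlush` from the transactional path and instead has `ready()`
consult `transactionManager.isCompleting()` directly, and why the new
`testRecordsFlushedImmediatelyOnTransactionCompletion` test asserts that linger
behaves normally again once the next transaction begins.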