This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.1 by this push:
new 37c066a3664 KAFKA-19012 Fix rare producer message corruption, don't reuse buffers… (#21286)
37c066a3664 is described below
commit 37c066a3664663437d1efdbdc4e7bac5b70efaf9
Author: Chia-Ping Tsai <[email protected]>
AuthorDate: Mon Jan 12 11:53:01 2026 +0800
KAFKA-19012 Fix rare producer message corruption, don't reuse buffers… (#21286)
… on the client in certain error cases (#21065)
Client versions 2.8.0 and later are affected by a
[change](https://github.com/apache/kafka/commit/30bc21ca35b165f04c472b4ce794893843809ccc)
that exposes a latent bug in how BufferPool is used. BufferPool is a
client-side class that allocates memory as ByteBuffers; for performance
it reuses them, with callers doing manual memory management by calling
free once they are done with a buffer.

The bug is that a pooled ByteBuffer can be freed while it is still in
use by the network sending thread. This early freeing can happen when
batches expire or brokers disconnect from clients. The bug has existed
for more than a decade (since Kafka 0.x, it seems) but never manifested
because, prior to 2.8.0, the pooled ByteBuffer (which contains the
record data, i.e. your publishes) was copied into a freshly allocated
ByteBuffer before any potential reuse, and that fresh ByteBuffer was
what got written over the network to the broker. With a change included
in 2.8.0, the pooled ByteBuffer remains as-is inside a MemoryRecords
instance, and this pooled ByteBuffer (which in some cases can be reused
and overwritten with other data) is written over the network.

Two contributing factors: the checksum for Kafka records only covers
the key/value/headers, not the topic, so there is no protection there;
and, also newly in the commit that exposed the bug, the produce request
header (which includes the topic and partition of a group of message
batches) is serialized in a buffer separate from the messages
themselves (the latter is what goes into the pooled ByteBuffer), which
allows messages to be misrouted to a random recently used topic rather
than simply duplicated on their intended topic.
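As a rough illustration of the hazard (a standalone sketch, not the
actual BufferPool or Sender code; the toy pool and payloads below are
made up for the example), freeing a buffer that another thread still
references lets the next allocation overwrite the bytes before they are
written to the socket:

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayDeque;

    public class PooledBufferHazard {
        // Toy pool: freed buffers are immediately eligible for reuse.
        private static final ArrayDeque<ByteBuffer> POOL = new ArrayDeque<>();

        static ByteBuffer allocate(int size) {
            ByteBuffer pooled = POOL.poll();
            return pooled != null ? pooled : ByteBuffer.allocate(size);
        }

        static void free(ByteBuffer buffer) {
            buffer.clear();
            POOL.push(buffer);
        }

        public static void main(String[] args) {
            ByteBuffer batchA = allocate(32);
            batchA.put("records for topic-A".getBytes(StandardCharsets.UTF_8)).flip();

            // Batch A expires locally and is freed, even though a network
            // thread still holds batchA for an in-flight produce request.
            free(batchA);

            // A new batch reuses the same backing buffer and overwrites it.
            ByteBuffer batchB = allocate(32);
            batchB.put("records for topic-B".getBytes(StandardCharsets.UTF_8)).flip();

            // When the in-flight request for batch A is finally written, it
            // carries topic-B's bytes under topic-A's produce request header.
            System.out.println(new String(batchA.array(), 0, batchA.limit(),
                StandardCharsets.UTF_8)); // prints "records for topic-B"
        }
    }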
The key change is in Sender.sendProducerData: we cannot allow the
pooled ByteBuffer of an expired in-flight batch to be reused until the
request completes. For these batches we avoid deallocating the buffer
in the normal failBatch call and defer it until completeBatch (or a
different path of failBatch) is called.
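A compressed sketch of that deferral rule (illustrative names and
helpers only, not the real Sender/RecordAccumulator signatures; the
actual logic is in the diff below):

    final class DeferredFreeSketch {
        // Minimal stand-in for the state ProducerBatch now tracks.
        static final class Batch {
            boolean inflight;          // set when handed to the network client
            boolean bufferDeallocated; // guards against freeing the buffer twice
        }

        void onBatchExpired(Batch batch) {
            if (batch.inflight) {
                // The network thread may still be writing this buffer:
                // do the bookkeeping now, but keep the buffer out of the pool.
                removeFromIncomplete(batch);
            } else {
                removeFromIncomplete(batch);
                returnBufferToPool(batch);
            }
        }

        void onRequestCompleted(Batch batch) {
            batch.inflight = false;
            if (!batch.bufferDeallocated) {
                batch.bufferDeallocated = true;
                returnBufferToPool(batch); // safe: no other references remain
            }
        }

        private void removeFromIncomplete(Batch batch) { /* incomplete-set bookkeeping */ }
        private void returnBufferToPool(Batch batch)   { /* hand buffer back to BufferPool */ }
    }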
Automated tests cover this change, and manual testing reproduced the
issue from KAFKA-19012 and verified that the fix is sufficient to stop
it.
Reviewers: Justine Olshan <[email protected]>, Jun Rao
<[email protected]>, Chia-Ping Tsai <[email protected]>
Co-authored-by: Donny Nadolny <[email protected]>
---
.../clients/producer/internals/ProducerBatch.java | 19 +++++
.../producer/internals/RecordAccumulator.java | 44 ++++++++++--
.../kafka/clients/producer/internals/Sender.java | 80 +++++++++++++++-------
.../producer/internals/RecordAccumulatorTest.java | 8 +--
.../clients/producer/internals/SenderTest.java | 51 +++++++++++++-
5 files changed, 165 insertions(+), 37 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
index c4f9c0f7f08..274c5184894 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
@@ -72,6 +72,9 @@ public final class ProducerBatch {
private final AtomicInteger attempts = new AtomicInteger(0);
private final boolean isSplitBatch;
private final AtomicReference<FinalState> finalState = new AtomicReference<>(null);
+ private boolean bufferDeallocated = false;
+ // Tracks if the batch has been sent to the NetworkClient
+ private boolean inflight = false;
int recordCount;
int maxRecordSize;
@@ -581,6 +584,22 @@ public final class ProducerBatch {
return reopened;
}
+ public boolean isBufferDeallocated() {
+ return bufferDeallocated;
+ }
+
+ public void markBufferDeallocated() {
+ bufferDeallocated = true;
+ }
+
+ public boolean isInflight() {
+ return inflight;
+ }
+
+ public void setInflight(boolean inflight) {
+ this.inflight = inflight;
+ }
+
// VisibleForTesting
OptionalInt currentLeaderEpoch() {
return currentLeaderEpoch;
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 7a5c6fd8f12..adb31a6c005 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -1022,14 +1022,39 @@ public class RecordAccumulator {
}
/**
- * Deallocate the record batch
+ * Complete and deallocate the record batch
+ */
+ public void completeAndDeallocateBatch(ProducerBatch batch) {
+ completeBatch(batch);
+ deallocate(batch);
+ }
+
+ /**
+ * Only perform deallocation (and not removal from the incomplete set)
*/
public void deallocate(ProducerBatch batch) {
- incomplete.remove(batch);
// Only deallocate the batch if it is not a split batch because split batch are allocated outside the
// buffer pool.
- if (!batch.isSplitBatch())
- free.deallocate(batch.buffer(), batch.initialCapacity());
+ if (!batch.isSplitBatch()) {
+ if (batch.isBufferDeallocated()) {
+ log.warn("Skipping deallocating a batch that has already been deallocated. Batch is {}, created time is {}", batch, batch.createdMs);
+ } else {
+ batch.markBufferDeallocated();
+ if (batch.isInflight()) {
+ // Create a fresh ByteBuffer to give to BufferPool to reuse since we can't safely call deallocate with the ProduceBatch's buffer
+ free.deallocate(ByteBuffer.allocate(batch.initialCapacity()));
+ throw new IllegalStateException("Attempting to deallocate a batch that is inflight. Batch is " + batch);
+ }
+ free.deallocate(batch.buffer(), batch.initialCapacity());
+ }
+ }
+ }
+
+ /**
+ * Remove from the incomplete list but do not free memory yet
+ */
+ public void completeBatch(ProducerBatch batch) {
+ incomplete.remove(batch);
}
/**
@@ -1127,7 +1152,14 @@ public class RecordAccumulator {
dq.remove(batch);
}
batch.abort(reason);
- deallocate(batch);
+ if (batch.isInflight()) {
+ // KAFKA-19012: if the batch has been sent it might still be in use by the network client so we cannot allow it to be reused yet.
+ // We skip deallocating it now. When the request in network client completes with a response, either Sender.completeBatch() or
+ // Sender.failBatch() will be called with deallocateBatch=true. The buffer associated with the batch will be deallocated then.
+ completeBatch(batch);
+ } else {
+ completeAndDeallocateBatch(batch);
+ }
}
}
@@ -1147,7 +1179,7 @@ public class RecordAccumulator {
}
if (aborted) {
batch.abort(reason);
- deallocate(batch);
+ completeAndDeallocateBatch(batch);
}
}
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 64e8646d6f1..50cdaf1d78c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -171,7 +171,12 @@ public class Sender implements Runnable {
private void maybeRemoveAndDeallocateBatch(ProducerBatch batch) {
maybeRemoveFromInflightBatches(batch);
- this.accumulator.deallocate(batch);
+ this.accumulator.completeAndDeallocateBatch(batch);
+ }
+
+ private void maybeRemoveAndDeallocateBatchLater(ProducerBatch batch) {
+ maybeRemoveFromInflightBatches(batch);
+ this.accumulator.completeBatch(batch);
}
/**
@@ -354,6 +359,23 @@ public class Sender implements Runnable {
return false;
}
+ private void failExpiredBatches(List<ProducerBatch> expiredBatches, long now, boolean deallocateBuffer) {
+ // Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
+ // for expired batches. see the documentation of @TransactionState.resetIdempotentProducerId to understand why
+ // we need to reset the producer id here.
+ if (!expiredBatches.isEmpty())
+ log.trace("Expired {} batches in accumulator", expiredBatches.size());
+ for (ProducerBatch expiredBatch : expiredBatches) {
+ String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
+ + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
+ failBatch(expiredBatch, new TimeoutException(errorMessage), false, deallocateBuffer);
+ if (transactionManager != null && expiredBatch.inRetry()) {
+ // This ensures that no new batches are drained until the current in flight batches are fully resolved.
+ transactionManager.markSequenceUnresolved(expiredBatch);
+ }
+ }
+ }
+
private long sendProducerData(long now) {
MetadataSnapshot metadataSnapshot = metadata.fetchMetadataSnapshot();
// get the list of partitions with data ready to send
@@ -405,22 +427,10 @@ public class Sender implements Runnable {
accumulator.resetNextBatchExpiryTime();
List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
- expiredBatches.addAll(expiredInflightBatches);
- // Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
- // for expired batches. see the documentation of @TransactionState.resetIdempotentProducerId to understand why
- // we need to reset the producer id here.
- if (!expiredBatches.isEmpty())
- log.trace("Expired {} batches in accumulator", expiredBatches.size());
- for (ProducerBatch expiredBatch : expiredBatches) {
- String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
- + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
- failBatch(expiredBatch, new TimeoutException(errorMessage), false);
- if (transactionManager != null && expiredBatch.inRetry()) {
- // This ensures that no new batches are drained until the current in flight batches are fully resolved.
- transactionManager.markSequenceUnresolved(expiredBatch);
- }
- }
+ failExpiredBatches(expiredBatches, now, true);
+ failExpiredBatches(expiredInflightBatches, now, false);
+
sensors.updateProduceRequestMetrics(batches);
// If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
@@ -523,6 +533,7 @@ public class Sender implements Runnable {
if (accumulator.hasIncomplete()) {
log.error("Aborting producer batches due to fatal error",
exception);
accumulator.abortBatches(exception);
+ inFlightBatches.clear();
}
}
@@ -658,6 +669,7 @@ public class Sender implements Runnable {
*/
private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, long correlationId,
    long now, Map<TopicPartition, Metadata.LeaderIdAndEpoch> partitionsWithUpdatedLeaderInfo) {
+ batch.setInflight(false);
Errors error = response.error;
if (error == Errors.MESSAGE_TOO_LARGE && batch.recordCount > 1 && !batch.isDone() &&
@@ -695,7 +707,7 @@ public class Sender implements Runnable {
// tell the user the result of their request. We only adjust sequence numbers if the batch didn't exhaust
// its retries -- if it did, we don't know whether the sequence number was accepted or not, and
// thus it is not safe to reassign the sequence.
- failBatch(batch, response, batch.attempts() < this.retries);
+ failBatch(batch, response, batch.attempts() < this.retries, true);
}
if (error.exception() instanceof InvalidMetadataException) {
if (error.exception() instanceof UnknownTopicOrPartitionException) {
@@ -748,12 +760,16 @@ public class Sender implements Runnable {
if (batch.complete(response.baseOffset, response.logAppendTime)) {
maybeRemoveAndDeallocateBatch(batch);
+ } else {
+ // Always safe to call deallocate because the batch keeps track of whether or not it was deallocated yet
+ this.accumulator.deallocate(batch);
}
}
private void failBatch(ProducerBatch batch,
ProduceResponse.PartitionResponse response,
- boolean adjustSequenceNumbers) {
+ boolean adjustSequenceNumbers,
+ boolean deallocateBatch) {
final RuntimeException topLevelException;
if (response.error == Errors.TOPIC_AUTHORIZATION_FAILED)
topLevelException = new TopicAuthorizationException(Collections.singleton(batch.topicPartition.topic()));
@@ -763,7 +779,7 @@ public class Sender implements Runnable {
topLevelException = response.error.exception(response.errorMessage);
if (response.recordErrors == null || response.recordErrors.isEmpty()) {
- failBatch(batch, topLevelException, adjustSequenceNumbers);
+ failBatch(batch, topLevelException, adjustSequenceNumbers, deallocateBatch);
} else {
Map<Integer, RuntimeException> recordErrorMap = new HashMap<>(response.recordErrors.size());
for (ProduceResponse.RecordError recordError : response.recordErrors) {
@@ -802,23 +818,25 @@ public class Sender implements Runnable {
}
};
- failBatch(batch, topLevelException, recordExceptions, adjustSequenceNumbers);
+ failBatch(batch, topLevelException, recordExceptions, adjustSequenceNumbers, deallocateBatch);
}
}
private void failBatch(
ProducerBatch batch,
RuntimeException topLevelException,
- boolean adjustSequenceNumbers
+ boolean adjustSequenceNumbers,
+ boolean deallocateBatch
) {
- failBatch(batch, topLevelException, batchIndex -> topLevelException, adjustSequenceNumbers);
+ failBatch(batch, topLevelException, batchIndex -> topLevelException, adjustSequenceNumbers, deallocateBatch);
}
private void failBatch(
ProducerBatch batch,
RuntimeException topLevelException,
Function<Integer, RuntimeException> recordExceptions,
- boolean adjustSequenceNumbers
+ boolean adjustSequenceNumbers,
+ boolean deallocateBatch
) {
this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);
@@ -832,7 +850,20 @@ public class Sender implements Runnable {
log.debug("Encountered error when transaction manager was
handling a failed batch", e);
}
}
- maybeRemoveAndDeallocateBatch(batch);
+ if (deallocateBatch) {
+ maybeRemoveAndDeallocateBatch(batch);
+ } else {
+ // Fix for KAFKA-19012
+ // The pooled ByteBuffer associated with this batch might still be in use by the network client so we
+ // cannot allow it to be reused yet. We skip deallocating it now. When the request in the network client
+ // completes with a response, either completeBatch() or failBatch() will be called with deallocateBatch=true.
+ // The buffer associated with the batch will be deallocated then.
+ maybeRemoveAndDeallocateBatchLater(batch);
+ }
+ } else {
+ if (deallocateBatch) {
+ this.accumulator.deallocate(batch);
+ }
}
}
@@ -885,6 +916,7 @@ public class Sender implements Runnable {
.setIndex(tp.partition())
.setRecords(records));
recordsByPartition.put(tp, batch);
+ batch.setInflight(true);
}
String transactionalId = null;
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index 00e693519e8..1420ec943e4 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -403,7 +403,7 @@ public class RecordAccumulatorTest {
for (ProducerBatch batch : batches) {
for (@SuppressWarnings("UnusedLocalVariable") Record ignored : batch.records().records())
read++;
- accum.deallocate(batch);
+ accum.completeAndDeallocateBatch(batch);
}
}
}
@@ -667,7 +667,7 @@ public class RecordAccumulatorTest {
for (List<ProducerBatch> batches: results.values())
for (ProducerBatch batch: batches)
- accum.deallocate(batch);
+ accum.completeAndDeallocateBatch(batch);
// should be complete with no unsent records.
accum.awaitFlushCompletion();
@@ -1573,7 +1573,7 @@ public class RecordAccumulatorTest {
assertEquals(1, batches.values().iterator().next().size());
ProducerBatch batch = batches.values().iterator().next().get(0);
int numSplitBatches = accum.splitAndReenqueue(batch);
- accum.deallocate(batch);
+ accum.completeAndDeallocateBatch(batch);
return numSplitBatches;
}
@@ -1597,7 +1597,7 @@ public class RecordAccumulatorTest {
} else {
batch.complete(0L, 0L);
}
- accum.deallocate(batch);
+ accum.completeAndDeallocateBatch(batch);
}
}
} while (batchDrained);
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index f9f94af1806..62ec91fbfe9 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -126,6 +126,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -2181,7 +2182,10 @@ public class SenderTest {
public void testCancelInFlightRequestAfterFatalError() throws Exception {
final long producerId = 343434L;
TransactionManager transactionManager = createTransactionManager();
- setupWithTransactionState(transactionManager);
+ long totalSize = 1024 * 1024;
+ String metricGrpName = "producer-custom-metrics";
MatchingBufferPool pool = new MatchingBufferPool(totalSize, batchSize, metrics, time, metricGrpName);
+ setupWithTransactionState(transactionManager, false, pool);
prepareAndReceiveInitProducerId(producerId, Errors.NONE);
assertTrue(transactionManager.hasProducerId());
@@ -2193,6 +2197,8 @@ public class SenderTest {
Future<RecordMetadata> future2 = appendToAccumulator(tp1);
sender.runOnce();
+ assertFalse(pool.allMatch());
+
client.respond(
body -> body instanceof ProduceRequest && RequestTestUtils.hasIdempotentRecords((ProduceRequest) body),
produceResponse(tp0, -1, Errors.CLUSTER_AUTHORIZATION_FAILED, 0));
@@ -2203,12 +2209,14 @@ public class SenderTest {
sender.runOnce();
assertFutureFailure(future2, ClusterAuthorizationException.class);
+ assertFalse(pool.allMatch(), "Batch should not be deallocated before the response is received");
// Should be fine if the second response eventually returns
client.respond(
body -> body instanceof ProduceRequest && RequestTestUtils.hasIdempotentRecords((ProduceRequest) body),
produceResponse(tp1, 0, Errors.NONE, 0));
sender.runOnce();
+ assertTrue(pool.allMatch(), "The batch should have been de-allocated");
}
@Test
@@ -2434,12 +2442,15 @@ public class SenderTest {
assertEquals(ApiKeys.PRODUCE, client.requests().peek().requestBuilder().apiKey());
Node node = new Node(Integer.parseInt(id), "localhost", 0);
assertEquals(1, client.inFlightRequestCount());
+ ProducerBatch inflightBatch = sender.inFlightBatches(tpId.topicPartition()).get(0);
+ assertTrue(inflightBatch.isInflight(), "Batch should be marked inflight after being sent");
assertTrue(client.isReady(node, time.milliseconds()), "Client ready status should be true");
Map<TopicIdPartition, ProduceResponse.PartitionResponse> responseMap = new HashMap<>();
responseMap.put(tpId, new ProduceResponse.PartitionResponse(Errors.MESSAGE_TOO_LARGE));
client.respond(new ProduceResponse(responseMap));
sender.runOnce(); // split and reenqueue
+ assertFalse(inflightBatch.isInflight(), "Batch should be marked as not inflight after being split and re-enqueued");
assertEquals(2, txnManager.sequenceNumber(tpId.topicPartition()), "The next sequence should be 2");
// The compression ratio should have been improved once.
assertEquals(CompressionType.GZIP.rate - CompressionRatioEstimator.COMPRESSION_RATIO_IMPROVING_STEP,
@@ -2497,14 +2508,16 @@ public class SenderTest {
sender.runOnce(); // send request
assertEquals(1, client.inFlightRequestCount());
assertEquals(1, sender.inFlightBatches(tp0).size());
+ assertFalse(sender.inFlightBatches(tp0).get(0).isBufferDeallocated(), "Buffer not deallocated yet");
+ ProducerBatch inflightBatch = sender.inFlightBatches(tp0).get(0);
time.sleep(REQUEST_TIMEOUT);
assertFalse(pool.allMatch());
- sender.runOnce(); // expire the batch
+ sender.runOnce(); // times out the request
assertTrue(request1.isDone());
+ assertTrue(inflightBatch.isBufferDeallocated(), "Buffer should be deallocated after request timeout");
assertTrue(pool.allMatch(), "The batch should have been de-allocated");
- assertTrue(pool.allMatch());
sender.runOnce();
assertTrue(pool.allMatch(), "The batch should have been de-allocated");
@@ -3588,6 +3601,38 @@ public class SenderTest {
}
}
+ @Test
+ public void testNoBufferReuseWhenBatchExpires() throws Exception {
+ long totalSize = 1024 * 1024;
+ try (Metrics m = new Metrics()) {
+ BufferPool pool = new BufferPool(totalSize, batchSize, m, time, "producer-internal-metrics");
+
+ // Allocate and store a poolable buffer, then return it to the pool so the Sender can pick it up
+ ByteBuffer buffer = pool.allocate(batchSize, 0);
+ pool.deallocate(buffer);
+
+ setupWithTransactionState(null, false, pool);
+ appendToAccumulator(tp0, 0L, "key", "value");
+ sender.runOnce(); // connect
+ sender.runOnce(); // send produce request
+
+ assertEquals(1, client.inFlightRequestCount());
+ assertEquals(1, sender.inFlightBatches(tp0).size());
+
+ ProducerBatch batch = sender.inFlightBatches(tp0).get(0);
+ // Validate the backing array of the buffer is the same as the pooled one from the start
+ assertSame(buffer.array(), batch.records().buffer().array(), "Sender should have allocated the same buffer we created");
+
+ time.sleep(DELIVERY_TIMEOUT_MS + 100);
+ sender.runOnce();
+
+ ByteBuffer newBuffer = pool.allocate(batchSize, 0);
+
+ // TEST buffer should not be reused
+ assertNotSame(buffer.array(), newBuffer.array(), "Buffer should not be reused");
+ }
+ }
+
private void verifyErrorMessage(ProduceResponse response, String expectedMessage) throws Exception {
Future<RecordMetadata> future = appendToAccumulator(tp0, 0L, "key", "value");