This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.1 by this push:
     new 37c066a3664 KAFKA-19012 Fix rare producer message corruption, don't reuse buffers… (#21286)
37c066a3664 is described below

commit 37c066a3664663437d1efdbdc4e7bac5b70efaf9
Author: Chia-Ping Tsai <[email protected]>
AuthorDate: Mon Jan 12 11:53:01 2026 +0800

    KAFKA-19012 Fix rare producer message corruption, don't reuse buffers… (#21286)
    
    … on the client in certain error cases (#21065)
    
    Client versions 2.8.0 and later are affected by a
    [change](https://github.com/apache/kafka/commit/30bc21ca35b165f04c472b4ce794893843809ccc)
    that exposes a latent bug in how BufferPool is used. BufferPool is a
    class used on the client side to allocate memory in ByteBuffers; for
    performance it reuses them, with callers doing manual memory management
    by calling free when they are done with the memory.
    
    The bug is that a pooled ByteBuffer can be freed while it is still in
    use by the network sending thread. This early freeing can happen when
    batches expire or brokers disconnect from clients. The bug has existed
    for more than a decade (since Kafka 0.x, it seems) but never manifested,
    because prior to 2.8.0 the pooled ByteBuffer (which contained the record
    data, i.e. your publishes) was copied into a freshly allocated ByteBuffer
    before any potential reuse, and that fresh ByteBuffer was what got
    written over the network to the broker. With a change included in 2.8.0,
    the pooled ByteBuffer remains as-is inside a MemoryRecords instance, and
    this pooled ByteBuffer (which in some cases can be reused and
    overwritten with other data) is written over the network.
    
    There are two contributing factors. First, the checksum for Kafka
    records covers only the key/value/headers and so on, not the topic, so
    there is no protection there. Second, also newly in the commit that
    exposed the bug, the produce request header (which includes the topic
    and partition of a group of message batches) is serialized in a buffer
    separate from the messages themselves, and it is the messages that live
    in the pooled ByteBuffer. As a result, messages can be misrouted to a
    random recently used topic rather than simply duplicated on their
    intended topic.
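
    The hazard can be illustrated with a minimal, self-contained Java sketch
    (a toy recycling pool and hypothetical class name, not Kafka's actual
    BufferPool): once a buffer is returned to the pool while a send path
    still holds a reference to it, the next allocation can overwrite the
    bytes that the in-flight reference later writes to the network.

        import java.nio.ByteBuffer;
        import java.util.ArrayDeque;

        // Toy recycling pool, illustrative only -- not Kafka's BufferPool.
        public class PooledBufferHazard {
            static final ArrayDeque<ByteBuffer> pool = new ArrayDeque<>();

            static ByteBuffer allocate(int size) {
                ByteBuffer b = pool.poll();
                return b != null ? b : ByteBuffer.allocate(size);
            }

            static void deallocate(ByteBuffer b) {
                b.clear();
                pool.add(b); // eligible for reuse immediately
            }

            public static void main(String[] args) {
                ByteBuffer batchA = allocate(16);
                batchA.put("topic-A-payload!".getBytes());
                batchA.flip();

                ByteBuffer inFlight = batchA; // the network thread still holds this reference

                deallocate(batchA);           // early free on the expiry / disconnect path

                ByteBuffer batchB = allocate(16); // recycled: same backing memory
                batchB.put("topic-B-payload!".getBytes());
                batchB.flip();

                // The bytes the network thread writes for "topic A" are now topic B's data.
                System.out.println(new String(inFlight.array(), 0, inFlight.limit()));
            }
        }

    Before 2.8.0 the copy into a fresh ByteBuffer broke this aliasing; after
    the change the pooled buffer itself is written, so the overwrite becomes
    visible on the wire.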
    
    The key change is in Sender.sendProducerData: we cannot allow the pooled
    ByteBuffer of an expired in-flight batch to be reused until the request
    completes. For these batches we avoid deallocating the buffer in the
    normal failBatch call, deferring it until completeBatch (or a different
    path of failBatch) is called.
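
    In sketch form (simplified, hypothetical types rather than the actual
    ProducerBatch/RecordAccumulator classes), the deferral works like this:
    an expired batch whose bytes were already handed to the network layer is
    only removed from bookkeeping, and its buffer goes back to the pool only
    once the request completes.

        import java.nio.ByteBuffer;
        import java.util.ArrayDeque;
        import java.util.Deque;

        // Deferred-free sketch: buffers of in-flight batches are not recycled on expiry.
        public class DeferredFreeSketch {
            static class Batch {
                final ByteBuffer buffer;
                boolean inflight;     // set when the batch is handed to the network client
                boolean deallocated;  // guards against double free
                Batch(ByteBuffer buffer) { this.buffer = buffer; }
            }

            static final Deque<ByteBuffer> pool = new ArrayDeque<>();

            // Expiry/abort path: skip the free while the network layer may still read the buffer.
            static void failBatch(Batch batch) {
                if (!batch.inflight) {
                    deallocate(batch);
                }
            }

            // Response path: the buffer can now be recycled safely.
            static void onRequestComplete(Batch batch) {
                batch.inflight = false;
                deallocate(batch);
            }

            static void deallocate(Batch batch) {
                if (!batch.deallocated) {
                    batch.deallocated = true;
                    batch.buffer.clear();
                    pool.add(batch.buffer);
                }
            }

            public static void main(String[] args) {
                Batch batch = new Batch(ByteBuffer.allocate(64));
                batch.inflight = true;     // bytes handed to the network layer
                failBatch(batch);          // expiry: buffer is NOT pooled yet
                System.out.println("pooled after expiry: " + pool.size());   // 0
                onRequestComplete(batch);  // response arrives later
                System.out.println("pooled after response: " + pool.size()); // 1
            }
        }

    The actual change in the diff below has the same shape: failBatch gains a
    deallocateBatch flag, RecordAccumulator.completeBatch removes the batch
    from the incomplete set without freeing its buffer, and
    completeAndDeallocateBatch does both once a response has arrived.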
    
    Automated tests cover this change, and manual testing was done to
    reproduce the issue from KAFKA-19012 and verify that the fix is
    sufficient to stop it.
    
    Reviewers: Justine Olshan <[email protected]>, Jun Rao <[email protected]>, Chia-Ping Tsai <[email protected]>
    
    Co-authored-by: Donny Nadolny <[email protected]>
---
 .../clients/producer/internals/ProducerBatch.java  | 19 +++++
 .../producer/internals/RecordAccumulator.java      | 44 ++++++++++--
 .../kafka/clients/producer/internals/Sender.java   | 80 +++++++++++++++-------
 .../producer/internals/RecordAccumulatorTest.java  |  8 +--
 .../clients/producer/internals/SenderTest.java     | 51 +++++++++++++-
 5 files changed, 165 insertions(+), 37 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
index c4f9c0f7f08..274c5184894 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
@@ -72,6 +72,9 @@ public final class ProducerBatch {
     private final AtomicInteger attempts = new AtomicInteger(0);
     private final boolean isSplitBatch;
     private final AtomicReference<FinalState> finalState = new AtomicReference<>(null);
+    private boolean bufferDeallocated = false;
+    // Tracks if the batch has been sent to the NetworkClient
+    private boolean inflight = false;
 
     int recordCount;
     int maxRecordSize;
@@ -581,6 +584,22 @@ public final class ProducerBatch {
         return reopened;
     }
 
+    public boolean isBufferDeallocated() {
+        return bufferDeallocated;
+    }
+
+    public void markBufferDeallocated() {
+        bufferDeallocated = true;
+    }
+
+    public boolean isInflight() {
+        return inflight;
+    }
+
+    public void setInflight(boolean inflight) {
+        this.inflight = inflight;
+    }
+
     // VisibleForTesting
     OptionalInt currentLeaderEpoch() {
         return currentLeaderEpoch;
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 7a5c6fd8f12..adb31a6c005 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -1022,14 +1022,39 @@ public class RecordAccumulator {
     }
 
     /**
-     * Deallocate the record batch
+     * Complete and deallocate the record batch
+     */
+    public void completeAndDeallocateBatch(ProducerBatch batch) {
+        completeBatch(batch);
+        deallocate(batch);
+    }
+
+    /**
+     * Only perform deallocation (and not removal from the incomplete set)
      */
     public void deallocate(ProducerBatch batch) {
-        incomplete.remove(batch);
         // Only deallocate the batch if it is not a split batch because split batch are allocated outside the
         // buffer pool.
-        if (!batch.isSplitBatch())
-            free.deallocate(batch.buffer(), batch.initialCapacity());
+        if (!batch.isSplitBatch()) {
+            if (batch.isBufferDeallocated()) {
+                log.warn("Skipping deallocating a batch that has already been deallocated. Batch is {}, created time is {}", batch, batch.createdMs);
+            } else {
+                batch.markBufferDeallocated();
+                if (batch.isInflight()) {
+                    // Create a fresh ByteBuffer to give to BufferPool to reuse since we can't safely call deallocate with the ProduceBatch's buffer
+                    free.deallocate(ByteBuffer.allocate(batch.initialCapacity()));
+                    throw new IllegalStateException("Attempting to deallocate a batch that is inflight. Batch is " + batch);
+                }
+                free.deallocate(batch.buffer(), batch.initialCapacity());
+            }
+        }
+    }
+
+    /**
+     * Remove from the incomplete list but do not free memory yet
+     */
+    public void completeBatch(ProducerBatch batch) {
+        incomplete.remove(batch);
     }
 
     /**
@@ -1127,7 +1152,14 @@ public class RecordAccumulator {
                 dq.remove(batch);
             }
             batch.abort(reason);
-            deallocate(batch);
+            if (batch.isInflight()) {
+                // KAFKA-19012: if the batch has been sent it might still be in use by the network client so we cannot allow it to be reused yet.
+                // We skip deallocating it now. When the request in network client completes with a response, either Sender.completeBatch() or
+                // Sender.failBatch() will be called with deallocateBatch=true. The buffer associated with the batch will be deallocated then.
+                completeBatch(batch);
+            } else {
+                completeAndDeallocateBatch(batch);
+            }
         }
     }
 
@@ -1147,7 +1179,7 @@ public class RecordAccumulator {
             }
             if (aborted) {
                 batch.abort(reason);
-                deallocate(batch);
+                completeAndDeallocateBatch(batch);
             }
         }
     }
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 64e8646d6f1..50cdaf1d78c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -171,7 +171,12 @@ public class Sender implements Runnable {
 
     private void maybeRemoveAndDeallocateBatch(ProducerBatch batch) {
         maybeRemoveFromInflightBatches(batch);
-        this.accumulator.deallocate(batch);
+        this.accumulator.completeAndDeallocateBatch(batch);
+    }
+
+    private void maybeRemoveAndDeallocateBatchLater(ProducerBatch batch) {
+        maybeRemoveFromInflightBatches(batch);
+        this.accumulator.completeBatch(batch);
     }
 
     /**
@@ -354,6 +359,23 @@ public class Sender implements Runnable {
         return false;
     }
 
+    private void failExpiredBatches(List<ProducerBatch> expiredBatches, long now, boolean deallocateBuffer) {
+        // Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
+        // for expired batches. see the documentation of @TransactionState.resetIdempotentProducerId to understand why
+        // we need to reset the producer id here.
+        if (!expiredBatches.isEmpty())
+            log.trace("Expired {} batches in accumulator", expiredBatches.size());
+        for (ProducerBatch expiredBatch : expiredBatches) {
+            String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
+                    + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
+            failBatch(expiredBatch, new TimeoutException(errorMessage), false, deallocateBuffer);
+            if (transactionManager != null && expiredBatch.inRetry()) {
+                // This ensures that no new batches are drained until the current in flight batches are fully resolved.
+                transactionManager.markSequenceUnresolved(expiredBatch);
+            }
+        }
+    }
+
     private long sendProducerData(long now) {
         MetadataSnapshot metadataSnapshot = metadata.fetchMetadataSnapshot();
         // get the list of partitions with data ready to send
@@ -405,22 +427,10 @@ public class Sender implements Runnable {
         accumulator.resetNextBatchExpiryTime();
         List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
         List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
-        expiredBatches.addAll(expiredInflightBatches);
 
-        // Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
-        // for expired batches. see the documentation of @TransactionState.resetIdempotentProducerId to understand why
-        // we need to reset the producer id here.
-        if (!expiredBatches.isEmpty())
-            log.trace("Expired {} batches in accumulator", expiredBatches.size());
-        for (ProducerBatch expiredBatch : expiredBatches) {
-            String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
-                + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
-            failBatch(expiredBatch, new TimeoutException(errorMessage), false);
-            if (transactionManager != null && expiredBatch.inRetry()) {
-                // This ensures that no new batches are drained until the current in flight batches are fully resolved.
-                transactionManager.markSequenceUnresolved(expiredBatch);
-            }
-        }
+        failExpiredBatches(expiredBatches, now, true);
+        failExpiredBatches(expiredInflightBatches, now, false);
+
         sensors.updateProduceRequestMetrics(batches);
 
         // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
@@ -523,6 +533,7 @@ public class Sender implements Runnable {
         if (accumulator.hasIncomplete()) {
             log.error("Aborting producer batches due to fatal error", exception);
             accumulator.abortBatches(exception);
+            inFlightBatches.clear();
         }
     }
 
@@ -658,6 +669,7 @@ public class Sender implements Runnable {
      */
     private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, long correlationId,
                                long now, Map<TopicPartition, Metadata.LeaderIdAndEpoch> partitionsWithUpdatedLeaderInfo) {
+        batch.setInflight(false);
         Errors error = response.error;
 
         if (error == Errors.MESSAGE_TOO_LARGE && batch.recordCount > 1 && !batch.isDone() &&
@@ -695,7 +707,7 @@ public class Sender implements Runnable {
                 // tell the user the result of their request. We only adjust sequence numbers if the batch didn't exhaust
                 // its retries -- if it did, we don't know whether the sequence number was accepted or not, and
                 // thus it is not safe to reassign the sequence.
-                failBatch(batch, response, batch.attempts() < this.retries);
+                failBatch(batch, response, batch.attempts() < this.retries, true);
             }
             if (error.exception() instanceof InvalidMetadataException) {
                 if (error.exception() instanceof UnknownTopicOrPartitionException) {
@@ -748,12 +760,16 @@ public class Sender implements Runnable {
 
         if (batch.complete(response.baseOffset, response.logAppendTime)) {
             maybeRemoveAndDeallocateBatch(batch);
+        } else {
+            // Always safe to call deallocate because the batch keeps track of whether or not it was deallocated yet
+            this.accumulator.deallocate(batch);
         }
     }
 
     private void failBatch(ProducerBatch batch,
                            ProduceResponse.PartitionResponse response,
-                           boolean adjustSequenceNumbers) {
+                           boolean adjustSequenceNumbers,
+                           boolean deallocateBatch) {
         final RuntimeException topLevelException;
         if (response.error == Errors.TOPIC_AUTHORIZATION_FAILED)
             topLevelException = new TopicAuthorizationException(Collections.singleton(batch.topicPartition.topic()));
@@ -763,7 +779,7 @@ public class Sender implements Runnable {
             topLevelException = response.error.exception(response.errorMessage);
 
         if (response.recordErrors == null || response.recordErrors.isEmpty()) {
-            failBatch(batch, topLevelException, adjustSequenceNumbers);
+            failBatch(batch, topLevelException, adjustSequenceNumbers, deallocateBatch);
         } else {
             Map<Integer, RuntimeException> recordErrorMap = new HashMap<>(response.recordErrors.size());
             for (ProduceResponse.RecordError recordError : response.recordErrors) {
@@ -802,23 +818,25 @@ public class Sender implements Runnable {
                 }
             };
 
-            failBatch(batch, topLevelException, recordExceptions, adjustSequenceNumbers);
+            failBatch(batch, topLevelException, recordExceptions, adjustSequenceNumbers, deallocateBatch);
         }
     }
 
     private void failBatch(
         ProducerBatch batch,
         RuntimeException topLevelException,
-        boolean adjustSequenceNumbers
+        boolean adjustSequenceNumbers,
+        boolean deallocateBatch
     ) {
-        failBatch(batch, topLevelException, batchIndex -> topLevelException, adjustSequenceNumbers);
+        failBatch(batch, topLevelException, batchIndex -> topLevelException, adjustSequenceNumbers, deallocateBatch);
     }
 
     private void failBatch(
         ProducerBatch batch,
         RuntimeException topLevelException,
         Function<Integer, RuntimeException> recordExceptions,
-        boolean adjustSequenceNumbers
+        boolean adjustSequenceNumbers,
+        boolean deallocateBatch
     ) {
         this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);
 
@@ -832,7 +850,20 @@ public class Sender implements Runnable {
                     log.debug("Encountered error when transaction manager was handling a failed batch", e);
                 }
             }
-            maybeRemoveAndDeallocateBatch(batch);
+            if (deallocateBatch) {
+                maybeRemoveAndDeallocateBatch(batch);
+            } else {
+                // Fix for KAFKA-19012
+                // The pooled ByteBuffer associated with this batch might still be in use by the network client so we
+                // cannot allow it to be reused yet. We skip deallocating it now. When the request in the network client
+                // completes with a response, either completeBatch() or failBatch() will be called with deallocateBatch=true.
+                // The buffer associated with the batch will be deallocated then.
+                maybeRemoveAndDeallocateBatchLater(batch);
+            }
+        } else {
+            if (deallocateBatch) {
+                this.accumulator.deallocate(batch);
+            }
         }
     }
 
@@ -885,6 +916,7 @@ public class Sender implements Runnable {
                     .setIndex(tp.partition())
                     .setRecords(records));
             recordsByPartition.put(tp, batch);
+            batch.setInflight(true);
         }
 
         String transactionalId = null;
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index 00e693519e8..1420ec943e4 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -403,7 +403,7 @@ public class RecordAccumulatorTest {
                 for (ProducerBatch batch : batches) {
                     for (@SuppressWarnings("UnusedLocalVariable") Record ignored : batch.records().records())
                         read++;
-                    accum.deallocate(batch);
+                    accum.completeAndDeallocateBatch(batch);
                 }
             }
         }
@@ -667,7 +667,7 @@ public class RecordAccumulatorTest {
 
         for (List<ProducerBatch> batches: results.values())
             for (ProducerBatch batch: batches)
-                accum.deallocate(batch);
+                accum.completeAndDeallocateBatch(batch);
 
         // should be complete with no unsent records.
         accum.awaitFlushCompletion();
@@ -1573,7 +1573,7 @@ public class RecordAccumulatorTest {
         assertEquals(1, batches.values().iterator().next().size());
         ProducerBatch batch = batches.values().iterator().next().get(0);
         int numSplitBatches = accum.splitAndReenqueue(batch);
-        accum.deallocate(batch);
+        accum.completeAndDeallocateBatch(batch);
 
         return numSplitBatches;
     }
@@ -1597,7 +1597,7 @@ public class RecordAccumulatorTest {
                     } else {
                         batch.complete(0L, 0L);
                     }
-                    accum.deallocate(batch);
+                    accum.completeAndDeallocateBatch(batch);
                 }
             }
         } while (batchDrained);
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index f9f94af1806..62ec91fbfe9 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -126,6 +126,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -2181,7 +2182,10 @@ public class SenderTest {
     public void testCancelInFlightRequestAfterFatalError() throws Exception {
         final long producerId = 343434L;
         TransactionManager transactionManager = createTransactionManager();
-        setupWithTransactionState(transactionManager);
+        long totalSize = 1024 * 1024;
+        String metricGrpName = "producer-custom-metrics";
+        MatchingBufferPool pool = new MatchingBufferPool(totalSize, batchSize, metrics, time, metricGrpName);
+        setupWithTransactionState(transactionManager, false, pool);
 
         prepareAndReceiveInitProducerId(producerId, Errors.NONE);
         assertTrue(transactionManager.hasProducerId());
@@ -2193,6 +2197,8 @@ public class SenderTest {
         Future<RecordMetadata> future2 = appendToAccumulator(tp1);
         sender.runOnce();
 
+        assertFalse(pool.allMatch());
+
         client.respond(
             body -> body instanceof ProduceRequest && RequestTestUtils.hasIdempotentRecords((ProduceRequest) body),
             produceResponse(tp0, -1, Errors.CLUSTER_AUTHORIZATION_FAILED, 0));
@@ -2203,12 +2209,14 @@ public class SenderTest {
 
         sender.runOnce();
         assertFutureFailure(future2, ClusterAuthorizationException.class);
+        assertFalse(pool.allMatch(), "Batch should not be deallocated before the response is received");
 
         // Should be fine if the second response eventually returns
         client.respond(
             body -> body instanceof ProduceRequest && RequestTestUtils.hasIdempotentRecords((ProduceRequest) body),
             produceResponse(tp1, 0, Errors.NONE, 0));
         sender.runOnce();
+        assertTrue(pool.allMatch(), "The batch should have been de-allocated");
     }
 
     @Test
@@ -2434,12 +2442,15 @@ public class SenderTest {
             assertEquals(ApiKeys.PRODUCE, client.requests().peek().requestBuilder().apiKey());
             Node node = new Node(Integer.parseInt(id), "localhost", 0);
             assertEquals(1, client.inFlightRequestCount());
+            ProducerBatch inflightBatch = sender.inFlightBatches(tpId.topicPartition()).get(0);
+            assertTrue(inflightBatch.isInflight(), "Batch should be marked inflight after being sent");
             assertTrue(client.isReady(node, time.milliseconds()), "Client ready status should be true");
 
             Map<TopicIdPartition, ProduceResponse.PartitionResponse> responseMap = new HashMap<>();
             responseMap.put(tpId, new ProduceResponse.PartitionResponse(Errors.MESSAGE_TOO_LARGE));
             client.respond(new ProduceResponse(responseMap));
             sender.runOnce(); // split and reenqueue
+            assertFalse(inflightBatch.isInflight(), "Batch should be marked as not inflight after being split and re-enqueued");
             assertEquals(2, txnManager.sequenceNumber(tpId.topicPartition()), "The next sequence should be 2");
             // The compression ratio should have been improved once.
             assertEquals(CompressionType.GZIP.rate - CompressionRatioEstimator.COMPRESSION_RATIO_IMPROVING_STEP,
@@ -2497,14 +2508,16 @@ public class SenderTest {
         sender.runOnce();  // send request
         assertEquals(1, client.inFlightRequestCount());
         assertEquals(1, sender.inFlightBatches(tp0).size());
+        assertFalse(sender.inFlightBatches(tp0).get(0).isBufferDeallocated(), "Buffer not deallocated yet");
+        ProducerBatch inflightBatch = sender.inFlightBatches(tp0).get(0);
 
         time.sleep(REQUEST_TIMEOUT);
         assertFalse(pool.allMatch());
 
-        sender.runOnce();  // expire the batch
+        sender.runOnce();  // times out the request
         assertTrue(request1.isDone());
+        assertTrue(inflightBatch.isBufferDeallocated(), "Buffer should be deallocated after request timeout");
         assertTrue(pool.allMatch(), "The batch should have been de-allocated");
-        assertTrue(pool.allMatch());
 
         sender.runOnce();
         assertTrue(pool.allMatch(), "The batch should have been de-allocated");
@@ -3588,6 +3601,38 @@ public class SenderTest {
         }
     }
 
+    @Test
+    public void testNoBufferReuseWhenBatchExpires() throws Exception {
+        long totalSize = 1024 * 1024;
+        try (Metrics m = new Metrics()) {
+            BufferPool pool = new BufferPool(totalSize, batchSize, m, time, "producer-internal-metrics");
+
+            // Allocate and store a poolable buffer, then return it to the pool so the Sender can pick it up
+            ByteBuffer buffer = pool.allocate(batchSize, 0);
+            pool.deallocate(buffer);
+
+            setupWithTransactionState(null, false, pool);
+            appendToAccumulator(tp0, 0L, "key", "value");
+            sender.runOnce();  // connect
+            sender.runOnce();  // send produce request
+
+            assertEquals(1, client.inFlightRequestCount());
+            assertEquals(1, sender.inFlightBatches(tp0).size());
+
+            ProducerBatch batch = sender.inFlightBatches(tp0).get(0);
+            // Validate the backing array of the buffer is the same as the pooled one from the start
+            assertSame(buffer.array(), batch.records().buffer().array(), "Sender should have allocated the same buffer we created");
+
+            time.sleep(DELIVERY_TIMEOUT_MS + 100);
+            sender.runOnce();
+
+            ByteBuffer newBuffer = pool.allocate(batchSize, 0);
+
+            // TEST buffer should not be reused
+            assertNotSame(buffer.array(), newBuffer.array(), "Buffer should not be reused");
+        }
+    }
+
 
     private void verifyErrorMessage(ProduceResponse response, String expectedMessage) throws Exception {
         Future<RecordMetadata> future = appendToAccumulator(tp0, 0L, "key", "value");
