This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c85fd07  KAFKA-9703; Free up compression buffer after splitting a 
large batch
c85fd07 is described below

commit c85fd07bd161c9a4fd531fcb633e62c25db1f660
Author: Jiamei Xie <[email protected]>
AuthorDate: Thu Apr 16 08:01:10 2020 +0800

    KAFKA-9703; Free up compression buffer after splitting a large batch
    
    Method split takes up too many resources and might
    cause outOfMemory error when the bigBatch is huge.
    Call closeForRecordAppends() to free up resources
    like compression buffers.
    
    Change-Id: Iac6519fcc2e432330b8af2d9f68a8d4d4a07646b
    Signed-off-by: Jiamei Xie <jiamei.xiearm.com>
    
    *More detailed description of your change,
    if necessary. The PR title and PR message become
    the squashed commit message, so use a separate
    comment to ping reviewers.*
    
    *Summary of testing strategy (including rationale)
    for the feature or bug fix. Unit and/or integration
    tests are expected for any behaviour change and
    system tests should be considered for larger changes.*
    
    Author: Jiamei Xie <[email protected]>
    
    Reviewers: Chia-Ping Tsai <[email protected]>, Jiangjie (Becket) Qin 
<[email protected]>
    
    Closes #8286 from jiameixie/outOfMemory
---
 .../org/apache/kafka/clients/producer/internals/ProducerBatch.java   | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
index 9323a61..cfd3a67 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
@@ -268,14 +268,17 @@ public final class ProducerBatch {
             // A newly created batch can always host the first message.
             if (!batch.tryAppendForSplit(record.timestamp(), record.key(), 
record.value(), record.headers(), thunk)) {
                 batches.add(batch);
+                batch.closeForRecordAppends();
                 batch = createBatchOffAccumulatorForRecord(record, 
splitBatchSize);
                 batch.tryAppendForSplit(record.timestamp(), record.key(), 
record.value(), record.headers(), thunk);
             }
         }
 
         // Close the last batch and add it to the batch list after split.
-        if (batch != null)
+        if (batch != null) {
             batches.add(batch);
+            batch.closeForRecordAppends();
+        }
 
         produceFuture.set(ProduceResponse.INVALID_OFFSET, NO_TIMESTAMP, new 
RecordBatchTooLargeException());
         produceFuture.done();

Reply via email to