This is an automated email from the ASF dual-hosted git repository.
jqin pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c85fd07 KAFKA-9703; Free up compression buffer after splitting a large batch
c85fd07 is described below
commit c85fd07bd161c9a4fd531fcb633e62c25db1f660
Author: Jiamei Xie <[email protected]>
AuthorDate: Thu Apr 16 08:01:10 2020 +0800
KAFKA-9703; Free up compression buffer after splitting a large batch
The split method holds on to too many resources and can
cause an OutOfMemoryError when the big batch is huge.
Call closeForRecordAppends() on each batch produced by the
split to free resources such as compression buffers.
Change-Id: Iac6519fcc2e432330b8af2d9f68a8d4d4a07646b
Signed-off-by: Jiamei Xie <jiamei.xiearm.com>
Author: Jiamei Xie <[email protected]>
Reviewers: Chia-Ping Tsai <[email protected]>, Jiangjie (Becket) Qin
<[email protected]>
Closes #8286 from jiameixie/outOfMemory
---
.../org/apache/kafka/clients/producer/internals/ProducerBatch.java | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
index 9323a61..cfd3a67 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
@@ -268,14 +268,17 @@ public final class ProducerBatch {
             // A newly created batch can always host the first message.
             if (!batch.tryAppendForSplit(record.timestamp(), record.key(), record.value(), record.headers(), thunk)) {
                 batches.add(batch);
+                batch.closeForRecordAppends();
                 batch = createBatchOffAccumulatorForRecord(record, splitBatchSize);
                 batch.tryAppendForSplit(record.timestamp(), record.key(), record.value(), record.headers(), thunk);
             }
         }

         // Close the last batch and add it to the batch list after split.
-        if (batch != null)
+        if (batch != null) {
             batches.add(batch);
+            batch.closeForRecordAppends();
+        }

         produceFuture.set(ProduceResponse.INVALID_OFFSET, NO_TIMESTAMP, new RecordBatchTooLargeException());
         produceFuture.done();
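
The pattern in the patch above can be illustrated outside Kafka with a
minimal sketch. Everything in it is an assumption made for illustration:
SplitBufferSketch, SketchBatch, and their methods are hypothetical stand-ins,
not Kafka's ProducerBatch API, and java.util.zip.GZIPOutputStream stands in
for whatever compressor the producer is configured with. The point is only
that each batch is closed for appends as soon as it is full, so at most one
compressor's working buffers are live while a large input is being split.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPOutputStream;

final class SplitBufferSketch {

    /** Hypothetical stand-in for a producer batch; not Kafka's ProducerBatch. */
    static final class SketchBatch {
        private final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        private final int maxUncompressedBytes;
        private int uncompressedBytes = 0;
        // The compressor keeps its own working buffers until it is closed.
        private GZIPOutputStream appendStream;

        SketchBatch(int maxUncompressedBytes) {
            this.maxUncompressedBytes = maxUncompressedBytes;
            try {
                this.appendStream = new GZIPOutputStream(bytes);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }

        /** Returns false if the batch is closed or the record does not fit. */
        boolean tryAppend(byte[] record) {
            if (appendStream == null)
                return false;
            // A newly created batch always accepts its first record.
            if (uncompressedBytes > 0 && uncompressedBytes + record.length > maxUncompressedBytes)
                return false;
            try {
                appendStream.write(record);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            uncompressedBytes += record.length;
            return true;
        }

        /** Finishes compression and drops the compressor so its buffers can be reclaimed. */
        void closeForRecordAppends() {
            if (appendStream == null)
                return;
            try {
                appendStream.close();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            appendStream = null;
        }

        boolean isClosedForAppends() {
            return appendStream == null;
        }
    }

    /** Splits records into batches, closing each batch as soon as it is full. */
    static List<SketchBatch> split(List<byte[]> records, int splitSize) {
        List<SketchBatch> batches = new ArrayList<>();
        SketchBatch batch = null;
        for (byte[] record : records) {
            if (batch == null)
                batch = new SketchBatch(splitSize);
            if (!batch.tryAppend(record)) {
                batches.add(batch);
                batch.closeForRecordAppends(); // release buffers before allocating the next batch
                batch = new SketchBatch(splitSize);
                batch.tryAppend(record);
            }
        }
        if (batch != null) {
            batches.add(batch);
            batch.closeForRecordAppends();
        }
        return batches;
    }
}
```

Without the two closeForRecordAppends() calls in split(), every batch in the
returned list would still hold an open compressor, which is the kind of
accumulation the commit message describes.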