This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.3 by this push:
new 7a04de4b09 KAFKA-14020: Performance regression in Producer (#12365)
7a04de4b09 is described below
commit 7a04de4b09d2f7706309bbe5dccdccf97077e681
Author: Artem Livshits <[email protected]>
AuthorDate: Wed Jul 20 08:19:31 2022 -0700
KAFKA-14020: Performance regression in Producer (#12365)
As part of KAFKA-10888 work, there were a couple regressions introduced:
A call to time.milliseconds() got moved under the queue lock, moving it
back outside the lock. The call may be expensive and cause lock contention. Now
the call is moved back outside of the lock.
The reference to ProducerRecord was held in the batch completion callback,
so it was kept alive as long as the batch was alive, which may increase the
amount of memory in certain scenario and cause excessive GC work. Now the
reference is reset early, so the ProducerRecord lifetime isn't bound to the
batch lifetime.
Tested via manually crafted benchmark, lock profile shows ~15% lock
contention on the ArrayQueue lock without the fix and ~5% lock contention with
the fix (which is also consistent with pre-KAFKA-10888 profile).
Alloc profile shows ~10% spent in
ProducerBatch.completeFutureAndFireCallbacks without the fix vs. ~0.25% with
the fix (which is also consistent with pre-KAFKA-10888 profile).
Reviewers: Ismael Juma <[email protected]>, Jun Rao <[email protected]>
---
.../kafka/clients/producer/KafkaProducer.java | 30 +++++++++++++++-------
.../producer/internals/RecordAccumulator.java | 13 +++++++---
2 files changed, 30 insertions(+), 13 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 74d408d9a5..2d5c8994b4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -1465,13 +1465,21 @@ public class KafkaProducer<K, V> implements Producer<K,
V> {
private class AppendCallbacks<K, V> implements
RecordAccumulator.AppendCallbacks {
private final Callback userCallback;
private final ProducerInterceptors<K, V> interceptors;
- private final ProducerRecord<K, V> record;
- protected int partition = RecordMetadata.UNKNOWN_PARTITION;
+ private final String topic;
+ private final Integer recordPartition;
+ private final String recordLogString;
+ private volatile int partition = RecordMetadata.UNKNOWN_PARTITION;
+ private volatile TopicPartition topicPartition;
private AppendCallbacks(Callback userCallback, ProducerInterceptors<K,
V> interceptors, ProducerRecord<K, V> record) {
this.userCallback = userCallback;
this.interceptors = interceptors;
- this.record = record;
+ // Extract record info as we don't want to keep a reference to the
record during
+ // whole lifetime of the batch.
+ // We don't want to have an NPE here, because the interceptors
would not be notified (see .doSend).
+ topic = record != null ? record.topic() : null;
+ recordPartition = record != null ? record.partition() : null;
+ recordLogString = log.isTraceEnabled() && record != null ?
record.toString() : "";
}
@Override
@@ -1491,7 +1499,7 @@ public class KafkaProducer<K, V> implements Producer<K,
V> {
if (log.isTraceEnabled()) {
// Log the message here, because we don't know the partition
before that.
- log.trace("Attempting to append record {} with callback {} to
topic {} partition {}", record, userCallback, record.topic(), partition);
+ log.trace("Attempting to append record {} with callback {} to
topic {} partition {}", recordLogString, userCallback, topic, partition);
}
}
@@ -1500,11 +1508,15 @@ public class KafkaProducer<K, V> implements Producer<K,
V> {
}
public TopicPartition topicPartition() {
- if (record == null)
- return null;
- return partition == RecordMetadata.UNKNOWN_PARTITION
- ? ProducerInterceptors.extractTopicPartition(record)
- : new TopicPartition(record.topic(), partition);
+ if (topicPartition == null && topic != null) {
+ if (partition != RecordMetadata.UNKNOWN_PARTITION)
+ topicPartition = new TopicPartition(topic, partition);
+ else if (recordPartition != null)
+ topicPartition = new TopicPartition(topic,
recordPartition);
+ else
+ topicPartition = new TopicPartition(topic,
RecordMetadata.UNKNOWN_PARTITION);
+ }
+ return topicPartition;
}
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 4168ea68aa..a1f684ac95 100644
---
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -297,7 +297,12 @@ public class RecordAccumulator {
byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
int size = Math.max(this.batchSize,
AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key,
value, headers));
log.trace("Allocating a new {} byte message buffer for
topic {} partition {} with remaining timeout {}ms", size, topic, partition,
maxTimeToBlock);
+ // This call may block if we exhausted buffer space.
buffer = free.allocate(size, maxTimeToBlock);
+ // Update the current time in case the buffer allocation
blocked above.
+ // NOTE: getting time may be expensive, so calling it
under a lock
+ // should be avoided.
+ nowMs = time.milliseconds();
}
synchronized (dq) {
@@ -307,7 +312,7 @@ public class RecordAccumulator {
partitionInfo.partition(), topic);
continue;
}
- RecordAppendResult appendResult = appendNewBatch(topic,
effectivePartition, dq, timestamp, key, value, headers, callbacks, buffer);
+ RecordAppendResult appendResult = appendNewBatch(topic,
effectivePartition, dq, timestamp, key, value, headers, callbacks, buffer,
nowMs);
// Set buffer to null, so that deallocate doesn't return
it back to free pool, since it's used in the batch.
if (appendResult.newBatchCreated)
buffer = null;
@@ -333,6 +338,7 @@ public class RecordAccumulator {
* @param headers the Headers for the record
* @param callbacks The callbacks to execute
* @param buffer The buffer for the new batch
+ * @param nowMs The current time, in milliseconds
*/
private RecordAppendResult appendNewBatch(String topic,
int partition,
@@ -342,11 +348,10 @@ public class RecordAccumulator {
byte[] value,
Header[] headers,
AppendCallbacks callbacks,
- ByteBuffer buffer) {
+ ByteBuffer buffer,
+ long nowMs) {
assert partition != RecordMetadata.UNKNOWN_PARTITION;
- // Update the current time in case the buffer allocation blocked above.
- long nowMs = time.milliseconds();
RecordAppendResult appendResult = tryAppend(timestamp, key, value,
headers, callbacks, dq, nowMs);
if (appendResult != null) {
// Somebody else found us a batch, return the one we waited for!
Hopefully this doesn't happen often...