This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new badfbacdd0 KAFKA-14020: Performance regression in Producer (#12365)
badfbacdd0 is described below

commit badfbacdd09a9ee8821847f4b28d98625f354ed7
Author: Artem Livshits <[email protected]>
AuthorDate: Wed Jul 20 08:19:31 2022 -0700

    KAFKA-14020: Performance regression in Producer (#12365)
    
    As part of the KAFKA-10888 work, a couple of regressions were introduced:
    
    A call to time.milliseconds() got moved under the queue lock. The call may 
be expensive and cause lock contention. Now the call is moved back outside of 
the lock.
    
    The reference to ProducerRecord was held in the batch completion callback, 
so it was kept alive as long as the batch was alive, which may increase the 
amount of memory in certain scenarios and cause excessive GC work. Now the 
callback no longer keeps a reference to the record; only the needed fields 
(topic, partition and, when trace logging is enabled, the record's string form) 
are extracted up front, so the ProducerRecord lifetime isn't bound to the batch 
lifetime.
    
    Tested via a manually crafted benchmark: the lock profile shows ~15% lock 
contention on the ArrayDeque lock without the fix and ~5% lock contention with 
the fix (which is also consistent with the pre-KAFKA-10888 profile).
    
    The allocation profile shows ~10% spent in 
ProducerBatch.completeFutureAndFireCallbacks without the fix vs. ~0.25% with 
the fix (which is also consistent with the pre-KAFKA-10888 profile).
    
    Reviewers: Ismael Juma <[email protected]>, Jun Rao <[email protected]>
---
 .../kafka/clients/producer/KafkaProducer.java      | 30 +++++++++++++++-------
 .../producer/internals/RecordAccumulator.java      | 13 +++++++---
 2 files changed, 30 insertions(+), 13 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 74d408d9a5..2d5c8994b4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -1465,13 +1465,21 @@ public class KafkaProducer<K, V> implements Producer<K, 
V> {
     private class AppendCallbacks<K, V> implements 
RecordAccumulator.AppendCallbacks {
         private final Callback userCallback;
         private final ProducerInterceptors<K, V> interceptors;
-        private final ProducerRecord<K, V> record;
-        protected int partition = RecordMetadata.UNKNOWN_PARTITION;
+        private final String topic;
+        private final Integer recordPartition;
+        private final String recordLogString;
+        private volatile int partition = RecordMetadata.UNKNOWN_PARTITION;
+        private volatile TopicPartition topicPartition;
 
         private AppendCallbacks(Callback userCallback, ProducerInterceptors<K, 
V> interceptors, ProducerRecord<K, V> record) {
             this.userCallback = userCallback;
             this.interceptors = interceptors;
-            this.record = record;
+            // Extract record info as we don't want to keep a reference to the 
record during
+            // whole lifetime of the batch.
+            // We don't want to have an NPE here, because the interceptors 
would not be notified (see .doSend).
+            topic = record != null ? record.topic() : null;
+            recordPartition = record != null ? record.partition() : null;
+            recordLogString = log.isTraceEnabled() && record != null ? 
record.toString() : "";
         }
 
         @Override
@@ -1491,7 +1499,7 @@ public class KafkaProducer<K, V> implements Producer<K, 
V> {
 
             if (log.isTraceEnabled()) {
                 // Log the message here, because we don't know the partition 
before that.
-                log.trace("Attempting to append record {} with callback {} to 
topic {} partition {}", record, userCallback, record.topic(), partition);
+                log.trace("Attempting to append record {} with callback {} to 
topic {} partition {}", recordLogString, userCallback, topic, partition);
             }
         }
 
@@ -1500,11 +1508,15 @@ public class KafkaProducer<K, V> implements Producer<K, 
V> {
         }
 
         public TopicPartition topicPartition() {
-            if (record == null)
-                return null;
-            return partition == RecordMetadata.UNKNOWN_PARTITION
-                    ? ProducerInterceptors.extractTopicPartition(record)
-                    : new TopicPartition(record.topic(), partition);
+            if (topicPartition == null && topic != null) {
+                if (partition != RecordMetadata.UNKNOWN_PARTITION)
+                    topicPartition = new TopicPartition(topic, partition);
+                else if (recordPartition != null)
+                    topicPartition = new TopicPartition(topic, 
recordPartition);
+                else
+                    topicPartition = new TopicPartition(topic, 
RecordMetadata.UNKNOWN_PARTITION);
+            }
+            return topicPartition;
         }
     }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 4168ea68aa..a1f684ac95 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -297,7 +297,12 @@ public class RecordAccumulator {
                     byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
                     int size = Math.max(this.batchSize, 
AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, 
value, headers));
                     log.trace("Allocating a new {} byte message buffer for 
topic {} partition {} with remaining timeout {}ms", size, topic, partition, 
maxTimeToBlock);
+                    // This call may block if we exhausted buffer space.
                     buffer = free.allocate(size, maxTimeToBlock);
+                    // Update the current time in case the buffer allocation 
blocked above.
+                    // NOTE: getting time may be expensive, so calling it 
under a lock
+                    // should be avoided.
+                    nowMs = time.milliseconds();
                 }
 
                 synchronized (dq) {
@@ -307,7 +312,7 @@ public class RecordAccumulator {
                                 partitionInfo.partition(), topic);
                         continue;
                     }
-                    RecordAppendResult appendResult = appendNewBatch(topic, 
effectivePartition, dq, timestamp, key, value, headers, callbacks, buffer);
+                    RecordAppendResult appendResult = appendNewBatch(topic, 
effectivePartition, dq, timestamp, key, value, headers, callbacks, buffer, 
nowMs);
                     // Set buffer to null, so that deallocate doesn't return 
it back to free pool, since it's used in the batch.
                     if (appendResult.newBatchCreated)
                         buffer = null;
@@ -333,6 +338,7 @@ public class RecordAccumulator {
      * @param headers the Headers for the record
      * @param callbacks The callbacks to execute
      * @param buffer The buffer for the new batch
+     * @param nowMs The current time, in milliseconds
      */
     private RecordAppendResult appendNewBatch(String topic,
                                               int partition,
@@ -342,11 +348,10 @@ public class RecordAccumulator {
                                               byte[] value,
                                               Header[] headers,
                                               AppendCallbacks callbacks,
-                                              ByteBuffer buffer) {
+                                              ByteBuffer buffer,
+                                              long nowMs) {
         assert partition != RecordMetadata.UNKNOWN_PARTITION;
 
-        // Update the current time in case the buffer allocation blocked above.
-        long nowMs = time.milliseconds();
         RecordAppendResult appendResult = tryAppend(timestamp, key, value, 
headers, callbacks, dq, nowMs);
         if (appendResult != null) {
             // Somebody else found us a batch, return the one we waited for! 
Hopefully this doesn't happen often...

Reply via email to