artemlivshits commented on code in PR #12365:
URL: https://github.com/apache/kafka/pull/12365#discussion_r923725627


##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1491,20 +1495,25 @@ public void setPartition(int partition) {
 
             if (log.isTraceEnabled()) {
                 // Log the message here, because we don't know the partition before that.
-                log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, record.topic(), partition);
+                log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, topic, partition);
             }
+
+            // Reset record to null here so that it doesn't have to be alive as long as the batch is.
+            record = null;
         }
 
         public int getPartition() {
             return partition;
         }
 
         public TopicPartition topicPartition() {
-            if (record == null)
-                return null;
-            return partition == RecordMetadata.UNKNOWN_PARTITION
-                    ? ProducerInterceptors.extractTopicPartition(record)
-                    : new TopicPartition(record.topic(), partition);
+            if (partition != RecordMetadata.UNKNOWN_PARTITION)
+                return new TopicPartition(topic, partition);

Review Comment:
   Ok.



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -297,7 +297,12 @@ public RecordAppendResult append(String topic,
                     byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
                     int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
                     log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, topic, partition, maxTimeToBlock);
+                    // This call may block if we exhausted buffer space.
                     buffer = free.allocate(size, maxTimeToBlock);
+                    // Update the current time in case the buffer allocation blocked above.
+                    // NOTE: getting time may be expensive, so calling it under a lock
+                    // should be avoided.
+                    nowMs = time.milliseconds();

Review Comment:
   Filed KAFKA-14083.
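   The refreshed-timestamp pattern in the hunk above can be sketched in isolation. A minimal, hypothetical example (none of these names are actual Kafka internals) showing why a cached "now" must be re-read after any call that may block, so later deadline math isn't computed against a stale time:

```java
import java.util.concurrent.TimeUnit;

public class TimedAllocator {
    // Simulates an allocation that may block (e.g. waiting for buffer space),
    // then returns a refreshed timestamp for the caller to use afterwards.
    public static long allocateAndRefresh(long cachedNowMs, long blockMs) throws InterruptedException {
        // Simulate the blocking allocation.
        TimeUnit.MILLISECONDS.sleep(blockMs);
        // Refresh the cached time: the wait above may have taken a while,
        // so cachedNowMs is no longer a safe basis for timeout arithmetic.
        return System.currentTimeMillis();
    }
}
```

   If the caller kept using `cachedNowMs` after the blocking call, a remaining-timeout computation would overstate the time left by however long the allocation blocked.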



##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1491,20 +1495,25 @@ public void setPartition(int partition) {
 
             if (log.isTraceEnabled()) {
                 // Log the message here, because we don't know the partition before that.
-                log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, record.topic(), partition);
+                log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, topic, partition);
             }
+
+            // Reset record to null here so that it doesn't have to be alive as long as the batch is.
+            record = null;

Review Comment:
   I think it would be non-intuitive to control record lifetime from RecordAccumulator.append (which calls the callback) -- here we know that we don't need the record once the partition is set, but RecordAccumulator.append doesn't know that (in fact, it doesn't even know that we have the record).  But I can change it if you think that would make it easier to understand.
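   The lifetime argument can be shown with a standalone sketch (class and field names here are illustrative stand-ins, not the actual KafkaProducer internals): derive what outlives the record eagerly, then drop the record reference at the earliest point the holder itself can know it is no longer needed.

```java
// Hypothetical sketch: a callback holder that releases its record reference
// once the partition is assigned, so a potentially large key/value payload
// is not retained for as long as the batch (and this holder) stay alive.
public class RecordHolder {
    private Object record;        // stand-in for ProducerRecord<K, V>
    private final String topic;   // derived eagerly; survives the record
    private int partition = -1;   // stand-in for RecordMetadata.UNKNOWN_PARTITION

    public RecordHolder(Object record, String topic) {
        this.record = record;
        this.topic = topic;
    }

    public void setPartition(int partition) {
        this.partition = partition;
        // The record is only needed until the partition is known; releasing
        // it here lets the GC reclaim it even while the holder lives on.
        record = null;
    }

    public String topic() { return topic; }
    public int partition() { return partition; }
    public boolean recordReleased() { return record == null; }
}
```

   The point of the comment above is that only this holder knows `setPartition` is the last use of the record; the code that invokes the callback has no such knowledge, so nulling the field here keeps the lifetime decision next to the reason for it.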



##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1465,13 +1465,17 @@ public boolean isDone() {
     private class AppendCallbacks<K, V> implements RecordAccumulator.AppendCallbacks {
         private final Callback userCallback;
         private final ProducerInterceptors<K, V> interceptors;
-        private final ProducerRecord<K, V> record;
+        private ProducerRecord<K, V> record;
+        private final String topic;
         protected int partition = RecordMetadata.UNKNOWN_PARTITION;
 
         private AppendCallbacks(Callback userCallback, ProducerInterceptors<K, V> interceptors, ProducerRecord<K, V> record) {
             this.userCallback = userCallback;
             this.interceptors = interceptors;
             this.record = record;
+            // Note a record would be null only if the client application has a bug, but we don't want to
+            // have NPE here, because the interceptors would not be notified (see .doSend).
+            topic = record != null ? record.topic() : null;

Review Comment:
   Would passing a null record not be a bug?  I've changed the comment so it no longer says that it would be a bug.



##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1491,20 +1495,25 @@ public void setPartition(int partition) {
 
             if (log.isTraceEnabled()) {

Review Comment:
   Throughout my career I have used asserts extensively in C/C++; they provide both validation and contract documentation.  They perform redundant validation in the builds used for system tests without adding perf cost in prod.  I can remove it if it's not compatible with the style, though I don't think this is purely a style question -- using asserts makes a material difference in early bug detection and in code comprehension.
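   For reference, Java asserts have the same property described above: they are disabled by default and only run when the JVM is started with `-ea` (enable assertions), so test builds get the validation while production pays nothing. A small illustrative sketch (the class and method are hypothetical, not from the PR):

```java
// Hypothetical example of an assert documenting and checking a contract.
public class PartitionValidator {
    static final int UNKNOWN_PARTITION = -1;

    // Contract: callers must have resolved the partition before calling this.
    static int requireKnown(int partition) {
        // With -ea this throws AssertionError on violation; without it,
        // the check compiles to nothing observable at runtime.
        assert partition != UNKNOWN_PARTITION : "partition must be set before use";
        return partition;
    }
}
```

   Run with `java -ea ...` in test environments to get the validation; omit `-ea` in production for zero cost.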



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
