artemlivshits commented on code in PR #12365:
URL: https://github.com/apache/kafka/pull/12365#discussion_r923725627

##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1491,20 +1495,25 @@ public void setPartition(int partition) {
             if (log.isTraceEnabled()) {
                 // Log the message here, because we don't know the partition before that.
-                log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, record.topic(), partition);
+                log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, topic, partition);
             }
+
+            // Reset record to null here so that it doesn't have to be alive as long as the batch is.
+            record = null;
         }
         public int getPartition() {
             return partition;
         }
         public TopicPartition topicPartition() {
-            if (record == null)
-                return null;
-            return partition == RecordMetadata.UNKNOWN_PARTITION
-                    ? ProducerInterceptors.extractTopicPartition(record)
-                    : new TopicPartition(record.topic(), partition);
+            if (partition != RecordMetadata.UNKNOWN_PARTITION)
+                return new TopicPartition(topic, partition);

Review Comment:
   Ok.

##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -297,7 +297,12 @@ public RecordAppendResult append(String topic,
                 byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
                 int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
                 log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, topic, partition, maxTimeToBlock);
+                // This call may block if we exhausted buffer space.
                 buffer = free.allocate(size, maxTimeToBlock);
+                // Update the current time in case the buffer allocation blocked above.
+                // NOTE: getting time may be expensive, so calling it under a lock
+                // should be avoided.
+                nowMs = time.milliseconds();

Review Comment:
   Filed KAFKA-14083.

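For readers following the RecordAccumulator hunk above, here is a minimal sketch of why the timestamp is re-read after a call that may block. It is not Kafka code: `Clock`, `ManualClock`, `allocate`, and `timestamps` are hypothetical names invented for this illustration, and the "blocking" is simulated by advancing a manual clock.

```java
import java.util.concurrent.atomic.AtomicLong;

public class TimeRefreshSketch {
    /** Hypothetical clock abstraction (an assumption, not Kafka's Time interface). */
    public interface Clock {
        long milliseconds();
    }

    /** Hypothetical manual clock so the example is deterministic. */
    public static final class ManualClock implements Clock {
        private final AtomicLong now = new AtomicLong();

        @Override
        public long milliseconds() {
            return now.get();
        }

        void advance(long ms) {
            now.addAndGet(ms);
        }
    }

    /** Simulates an allocation that blocked for blockedMs before returning. */
    static byte[] allocate(ManualClock clock, int size, long blockedMs) {
        clock.advance(blockedMs);
        return new byte[size];
    }

    /**
     * Returns {timestamp read before the allocation, timestamp re-read after it}.
     * The second value is the one that should be used once the call has returned.
     */
    public static long[] timestamps(ManualClock clock, int size, long blockedMs) {
        long staleMs = clock.milliseconds();              // read before the blocking call
        byte[] buffer = allocate(clock, size, blockedMs); // may block
        long freshMs = clock.milliseconds();              // re-read, outside any lock
        return new long[] {staleMs, freshMs, buffer.length};
    }
}
```

If the allocation blocked for 250 ms, the value read before the call is 250 ms stale, which is the situation the added `nowMs = time.milliseconds()` line addresses.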
##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1491,20 +1495,25 @@ public void setPartition(int partition) {
             if (log.isTraceEnabled()) {
                 // Log the message here, because we don't know the partition before that.
-                log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, record.topic(), partition);
+                log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, topic, partition);
             }
+
+            // Reset record to null here so that it doesn't have to be alive as long as the batch is.
+            record = null;

Review Comment:
   I think it would be non-intuitive to control the record lifetime from RecordAccumulator.append (which calls the callback) -- here we know that we don't need the record once the partition is set, but RecordAccumulator.append doesn't know that (in fact, it doesn't even know that we have the record). But I can change it if you think this would make it easier to understand.

##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1465,13 +1465,17 @@ public boolean isDone() {
     private class AppendCallbacks<K, V> implements RecordAccumulator.AppendCallbacks {
         private final Callback userCallback;
         private final ProducerInterceptors<K, V> interceptors;
-        private final ProducerRecord<K, V> record;
+        private ProducerRecord<K, V> record;
+        private final String topic;
         protected int partition = RecordMetadata.UNKNOWN_PARTITION;
         private AppendCallbacks(Callback userCallback, ProducerInterceptors<K, V> interceptors, ProducerRecord<K, V> record) {
             this.userCallback = userCallback;
             this.interceptors = interceptors;
             this.record = record;
+            // Note a record would be null only if the client application has a bug, but we don't want to
+            // have NPE here, because the interceptors would not be notified (see .doSend).
+            topic = record != null ? record.topic() : null;

Review Comment:
   Would passing a null record not be a bug? I've changed the comment to not mention that it would be a bug.

##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1491,20 +1495,25 @@ public void setPartition(int partition) {
             if (log.isTraceEnabled()) {

Review Comment:
   For years I used asserts extensively in C/C++; they provide both validation and contract documentation. They add redundant validation in builds that are used in system tests, without adding perf cost in prod. I can remove it if it's not compatible with the style here, though I don't think this is just style -- using asserts makes a material difference in early bug detection and in code comprehension.
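
To make the record-lifetime point from the setPartition comment above concrete, here is a minimal sketch of the idea: cache the topic up front and drop the record reference once the partition is known, so the record need not stay alive as long as the batch. This is a simplified illustration, not the PR's code: `Record`, `Callbacks`, and `holdsRecord` are hypothetical names, and returning null for an unknown partition is an assumption of the sketch.

```java
public class RecordLifetimeSketch {
    static final int UNKNOWN_PARTITION = -1;

    /** Hypothetical, simplified stand-in for a producer record. */
    public static final class Record {
        final String topic;
        final byte[] payload;

        public Record(String topic, byte[] payload) {
            this.topic = topic;
            this.payload = payload;
        }
    }

    /** Simplified callback holder: caches the topic, releases the record early. */
    public static final class Callbacks {
        private Record record;       // not final: cleared once the partition is set
        private final String topic;  // cached so the record is not needed later
        private int partition = UNKNOWN_PARTITION;

        public Callbacks(Record record) {
            this.record = record;
            // Tolerate a null record here instead of throwing an NPE.
            this.topic = record != null ? record.topic : null;
        }

        public void setPartition(int partition) {
            this.partition = partition;
            // The record is no longer needed, so let it become garbage-collectable
            // even if this callback object outlives it.
            record = null;
        }

        public boolean holdsRecord() {
            return record != null;
        }

        /** Returns "topic-partition" once the partition is known (sketch-only format). */
        public String topicPartition() {
            return partition != UNKNOWN_PARTITION ? topic + "-" + partition : null;
        }
    }
}
```

The caller that invokes `setPartition` never needs to know a record exists, which is the reason given above for keeping the lifetime control inside the callback object.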