ijuma commented on code in PR #12365: URL: https://github.com/apache/kafka/pull/12365#discussion_r919747090
##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1491,20 +1495,25 @@ public void setPartition(int partition) {
             if (log.isTraceEnabled()) {
                 // Log the message here, because we don't know the partition before that.
-                log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, record.topic(), partition);
+                log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, topic, partition);
             }
+
+            // Reset record to null here so that it doesn't have to be alive as long as the batch is.
+            record = null;
         }
         public int getPartition() {
             return partition;
         }
         public TopicPartition topicPartition() {
-            if (record == null)
-                return null;
-            return partition == RecordMetadata.UNKNOWN_PARTITION
-                    ? ProducerInterceptors.extractTopicPartition(record)
-                    : new TopicPartition(record.topic(), partition);
+            if (partition != RecordMetadata.UNKNOWN_PARTITION)
+                return new TopicPartition(topic, partition);

Review Comment:
It's a bit surprising to allocate every time a method like this is called. Can we not allocate the topic partition once and reuse it?

##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1491,20 +1495,25 @@ public void setPartition(int partition) {
             if (log.isTraceEnabled()) {
                 // Log the message here, because we don't know the partition before that.
-                log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, record.topic(), partition);
+                log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, topic, partition);
             }
+
+            // Reset record to null here so that it doesn't have to be alive as long as the batch is.
+            record = null;

Review Comment:
It's a bit surprising that a method called setPartition resets the record. Maybe we can make the method name clearer.
It would also be useful for the comment to state why we no longer need the record after this.

##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1491,20 +1495,25 @@ public void setPartition(int partition) {
             if (log.isTraceEnabled()) {

Review Comment:
A couple of lines above we use a language-level assert. In Kafka, we typically use assert-like methods such as those in the Objects class, since language-level asserts are disabled by default.

##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -297,7 +297,12 @@ public RecordAppendResult append(String topic,
                 byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
                 int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
                 log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, topic, partition, maxTimeToBlock);
+                // This call may block if we exhausted buffer space.
                 buffer = free.allocate(size, maxTimeToBlock);
+                // Update the current time in case the buffer allocation blocked above.
+                // NOTE: getting time may be expensive, so calling it under a lock
+                // should be avoided.
+                nowMs = time.milliseconds();

Review Comment:
Did we reach a conclusion regarding this?
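
On the language-level assert point raised above, here is a minimal sketch of the difference. It is not taken from the PR: PartitionChecks, its method names, and the -1 sentinel are hypothetical stand-ins for illustration, and the sentinel is only assumed to mirror RecordMetadata.UNKNOWN_PARTITION.

```java
import java.util.Objects;

// Hypothetical stand-in for illustration; not the actual KafkaProducer code.
class PartitionChecks {
    // Assumed sentinel standing in for RecordMetadata.UNKNOWN_PARTITION.
    static final int UNKNOWN_PARTITION = -1;

    // Language-level assert: only evaluated when the JVM is started with -ea,
    // so in a default production JVM this check does nothing.
    static int withAssert(int partition) {
        assert partition != UNKNOWN_PARTITION : "partition must be assigned";
        return partition;
    }

    // Explicit check: enforced regardless of JVM flags.
    static int requireKnownPartition(int partition) {
        if (partition == UNKNOWN_PARTITION)
            throw new IllegalStateException("partition must be assigned");
        return partition;
    }

    // Objects.requireNonNull is the usual assert-like helper for references.
    static String requireTopic(String topic) {
        return Objects.requireNonNull(topic, "topic must not be null");
    }
}
```

With assertions disabled (the JVM default), withAssert(PartitionChecks.UNKNOWN_PARTITION) returns the sentinel unchanged, while requireKnownPartition throws in every configuration.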
##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1465,13 +1465,17 @@ public boolean isDone() {
     private class AppendCallbacks<K, V> implements RecordAccumulator.AppendCallbacks {
         private final Callback userCallback;
         private final ProducerInterceptors<K, V> interceptors;
-        private final ProducerRecord<K, V> record;
+        private ProducerRecord<K, V> record;
+        private final String topic;
         protected int partition = RecordMetadata.UNKNOWN_PARTITION;
         private AppendCallbacks(Callback userCallback, ProducerInterceptors<K, V> interceptors, ProducerRecord<K, V> record) {
             this.userCallback = userCallback;
             this.interceptors = interceptors;
             this.record = record;
+            // Note a record would be null only if the client application has a bug, but we don't want to
+            // have NPE here, because the interceptors would not be notified (see .doSend).
+            topic = record != null ? record.topic() : null;

Review Comment:
Can you elaborate on this? What kind of application bug would surface itself in a silent way like this?