artemlivshits commented on code in PR #12049:
URL: https://github.com/apache/kafka/pull/12049#discussion_r910500293
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -160,91 +220,162 @@ public double measure(MetricConfig config, long now) {
metrics.addMetric(metricName, availableBytes);
}
+ private void setPartition(AppendCallbacks callbacks, int partition) {
+ if (callbacks != null)
+ callbacks.setPartition(partition);
+ }
+
/**
* Add a record to the accumulator, return the append result
* <p>
* The append result will contain the future metadata, and flag for
whether the appended batch is full or a new batch is created
* <p>
*
- * @param tp The topic/partition to which this record is being sent
+ * @param topic The topic to which this record is being sent
+ * @param partition The partition to which this record is being sent or
RecordMetadata.UNKNOWN_PARTITION
+ * if any partition could be used
* @param timestamp The timestamp of the record
* @param key The key for the record
* @param value The value for the record
* @param headers the Headers for the record
- * @param callback The user-supplied callback to execute when the request
is complete
+ * @param callbacks The callbacks to execute
* @param maxTimeToBlock The maximum time in milliseconds to block for
buffer memory to be available
* @param abortOnNewBatch A boolean that indicates returning before a new
batch is created and
* running the partitioner's onNewBatch method
before trying to append again
* @param nowMs The current time, in milliseconds
+ * @param cluster The cluster metadata
*/
- public RecordAppendResult append(TopicPartition tp,
+ public RecordAppendResult append(String topic,
+ int partition,
long timestamp,
byte[] key,
byte[] value,
Header[] headers,
- Callback callback,
+ AppendCallbacks callbacks,
long maxTimeToBlock,
boolean abortOnNewBatch,
- long nowMs) throws InterruptedException {
+ long nowMs,
+ Cluster cluster) throws
InterruptedException {
+ TopicInfo topicInfo = topicInfoMap.computeIfAbsent(topic, k -> new
TopicInfo(logContext, k, batchSize));
+
// We keep track of the number of appending thread to make sure we do
not miss batches in
// abortIncompleteBatches().
appendsInProgress.incrementAndGet();
ByteBuffer buffer = null;
if (headers == null) headers = Record.EMPTY_HEADERS;
try {
- // check if we have an in-progress batch
- Deque<ProducerBatch> dq = getOrCreateDeque(tp);
- synchronized (dq) {
- if (closed)
- throw new KafkaException("Producer closed while send in
progress");
- RecordAppendResult appendResult = tryAppend(timestamp, key,
value, headers, callback, dq, nowMs);
- if (appendResult != null)
- return appendResult;
- }
+ // Loop to retry in case we encounter partitioner's race
conditions.
+ while (true) {
+ // If the message doesn't have any partition affinity, so we
pick a partition based on the broker
+ // availability and performance. Note, that here we peek
current partition before we hold the
+ // deque lock, so we'll need to make sure that it's not
changed while we were waiting for the
+ // deque lock.
+ final BuiltInPartitioner.StickyPartitionInfo partitionInfo;
+ final int effectivePartition;
+ if (partition == RecordMetadata.UNKNOWN_PARTITION) {
+ partitionInfo =
topicInfo.builtInPartitioner.peekCurrentPartitionInfo(cluster);
+ effectivePartition = partitionInfo.partition();
+ } else {
+ partitionInfo = null;
+ effectivePartition = partition;
+ }
- // we don't have an in-progress record batch try to allocate a new
batch
- if (abortOnNewBatch) {
- // Return a result that will cause another call to append.
- return new RecordAppendResult(null, false, false, true);
- }
+ // Now that we know the effective partition, let the caller
know.
+ setPartition(callbacks, effectivePartition);
+
+ // check if we have an in-progress batch
+ Deque<ProducerBatch> dq =
topicInfo.batches.computeIfAbsent(effectivePartition, k -> new ArrayDeque<>());
+ synchronized (dq) {
+ // After taking the lock, validate that the partition
hasn't changed and retry.
+ if
(topicInfo.builtInPartitioner.isPartitionChanged(partitionInfo)) {
+ log.trace("Partition {} for topic {} switched by a
concurrent append, retrying",
+ partitionInfo.partition(), topic);
+ continue;
+ }
+ RecordAppendResult appendResult = tryAppend(timestamp,
key, value, headers, callbacks, dq, nowMs);
+ if (appendResult != null) {
+
topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo,
appendResult.appendedBytes, cluster);
+ return appendResult;
+ }
+ }
- byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
- int size = Math.max(this.batchSize,
AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key,
value, headers));
- log.trace("Allocating a new {} byte message buffer for topic {}
partition {} with remaining timeout {}ms", size, tp.topic(), tp.partition(),
maxTimeToBlock);
- buffer = free.allocate(size, maxTimeToBlock);
+ // we don't have an in-progress record batch try to allocate a
new batch
+ if (abortOnNewBatch) {
+ // Return a result that will cause another call to append.
+ return new RecordAppendResult(null, false, false, true, 0);
+ }
- // Update the current time in case the buffer allocation blocked
above.
- nowMs = time.milliseconds();
- synchronized (dq) {
- // Need to check if producer is closed again after grabbing
the dequeue lock.
- if (closed)
- throw new KafkaException("Producer closed while send in
progress");
+ if (buffer == null) {
+ byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
+ int size = Math.max(this.batchSize,
AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key,
value, headers));
+ log.trace("Allocating a new {} byte message buffer for
topic {} partition {} with remaining timeout {}ms", size, topic, partition,
maxTimeToBlock);
+ buffer = free.allocate(size, maxTimeToBlock);
+ }
- RecordAppendResult appendResult = tryAppend(timestamp, key,
value, headers, callback, dq, nowMs);
- if (appendResult != null) {
- // Somebody else found us a batch, return the one we
waited for! Hopefully this doesn't happen often...
+ synchronized (dq) {
+ // After taking the lock, validate that the partition
hasn't changed and retry.
+ if
(topicInfo.builtInPartitioner.isPartitionChanged(partitionInfo)) {
+ log.trace("Partition {} for topic {} switched by a
concurrent append, retrying",
+ partitionInfo.partition(), topic);
+ continue;
+ }
+ RecordAppendResult appendResult = appendNewBatch(topic,
effectivePartition, dq, timestamp, key, value, headers, callbacks, buffer);
+ // Set buffer to null, so that deallocate doesn't return
it back to free pool, since it's used in the batch.
+ if (appendResult.newBatchCreated)
+ buffer = null;
+
topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo,
appendResult.appendedBytes, cluster);
return appendResult;
}
-
- MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer,
maxUsableMagic);
- ProducerBatch batch = new ProducerBatch(tp, recordsBuilder,
nowMs);
- FutureRecordMetadata future =
Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
- callback, nowMs));
-
- dq.addLast(batch);
- incomplete.add(batch);
-
- // Don't deallocate this buffer in the finally block as it's
being used in the record batch
- buffer = null;
- return new RecordAppendResult(future, dq.size() > 1 ||
batch.isFull(), true, false);
}
} finally {
- if (buffer != null)
- free.deallocate(buffer);
+ free.deallocate(buffer);
appendsInProgress.decrementAndGet();
}
}
+ /**
+ * Append a new batch to the queue
+ *
+ * @param topic The topic
+ * @param partition The partition (cannot be
RecordMetadata.UNKNOWN_PARTITION)
+ * @param dq The queue
+ * @param timestamp The timestamp of the record
+ * @param key The key for the record
+ * @param value The value for the record
+ * @param headers the Headers for the record
+ * @param callbacks The callbacks to execute
+ * @param buffer The buffer for the new batch
+ */
+ private RecordAppendResult appendNewBatch(String topic,
+ int partition,
+ Deque<ProducerBatch> dq,
+ long timestamp,
+ byte[] key,
+ byte[] value,
+ Header[] headers,
+ AppendCallbacks callbacks,
+ ByteBuffer buffer) {
+ assert partition != RecordMetadata.UNKNOWN_PARTITION;
+
+ // Update the current time in case the buffer allocation blocked above.
+ long nowMs = time.milliseconds();
Review Comment:
Yep, the previous code updated `nowMs` just after the allocate, and while moving
code around I accidentally moved that update under the lock. Fixed in
https://github.com/apache/kafka/pull/12365.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]