artemlivshits commented on code in PR #12570:
URL: https://github.com/apache/kafka/pull/12570#discussion_r969101389
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java:
##########
@@ -170,13 +170,49 @@ boolean isPartitionChanged(StickyPartitionInfo
partitionInfo) {
* @param cluster The cluster information
*/
void updatePartitionInfo(StickyPartitionInfo partitionInfo, int
appendedBytes, Cluster cluster) {
+ updatePartitionInfo(partitionInfo, appendedBytes, cluster, true);
+ }
+
+ /**
+ * Update partition info with the number of bytes appended and maybe
switch partition.
+ * NOTE this function needs to be called under the partition's batch queue
lock.
+ *
+ * @param partitionInfo The sticky partition info object returned by
peekCurrentPartitionInfo
+ * @param appendedBytes The number of bytes appended to this partition
+ * @param cluster The cluster information
+ * @param enableSwitch If true, switch partition once produced enough bytes
+ */
+ void updatePartitionInfo(StickyPartitionInfo partitionInfo, int
appendedBytes, Cluster cluster, boolean enableSwitch) {
// partitionInfo may be null if the caller didn't use built-in
partitioner.
if (partitionInfo == null)
return;
assert partitionInfo == stickyPartitionInfo.get();
int producedBytes =
partitionInfo.producedBytes.addAndGet(appendedBytes);
- if (producedBytes >= stickyBatchSize) {
+
+ // We're trying to switch partition once we produce stickyBatchSize
bytes to a partition
+ // but doing so may hinder batching because partition switch may
happen while batch isn't
+ // ready to send. This situation is especially likely with high
linger.ms setting.
+ // Consider the following example:
+ // linger.ms=500, producer produces 12KB in 500ms, batch.size=16KB
+ // - first batch collects 12KB in 500ms, gets sent
+ // - second batch collects 4KB, then we switch partition, so 4KB
gets eventually sent
+ // - ... and so on - we'd get 12KB and 4KB batches
+ // To get more optimal batching and avoid 4KB fractional batches, the
caller may disallow
+ // partition switch if batch is not ready to send, so with the example
above we'd avoid
+ // fractional 4KB batches: in that case the scenario would look like
this:
+ // - first batch collects 12KB in 500ms, gets sent
+ // - second batch collects 4KB, but partition switch doesn't
happen because batch is not ready
+ // - second batch collects 12KB in 500ms, gets sent and now we
switch partition.
+ // - ... and so on - we'd just send 12KB batches
+ // We cap the produced bytes to not exceed 2x of the batch size to
avoid pathological cases
+ // (e.g. if we have a mix of keyed and unkeyed messages, key messages
may create an
+ // unready batch after the batch that disabled partition switch
becomes ready).
+ // As a result, with high linger.ms setting we end up switching
partitions after producing
+ // between stickyBatchSize and stickyBatchSize * 2 bytes, to better
align with batch boundary.
+ if (producedBytes >= stickyBatchSize * 2)
+ log.trace("Exceeded {} bytes, produced {} bytes, enable is {}",
stickyBatchSize * 2, producedBytes, enableSwitch);
Review Comment:
ok
##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -1137,23 +1137,26 @@ public void onCompletion(RecordMetadata metadata,
Exception exception) {
assertEquals(partition1, partition.get());
assertEquals(2, mockRandom.get());
- // Produce large record, we should switch to next partition.
+ // Produce large record, we switched to next partition by previous
produce, but
+ // for this produce the switch would be disabled because of
incomplete batch.
accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null,
largeValue, Record.EMPTY_HEADERS,
callbacks, maxBlockTimeMs, false, time.milliseconds(),
cluster);
assertEquals(partition2, partition.get());
- assertEquals(3, mockRandom.get());
+ assertEquals(2, mockRandom.get());
Review Comment:
What happens here is the following:
1. First record (small) -- gets a new partition (because there was none)
2. Second record (large) doesn't fit, so the first record forms a batch (but
not enough to switch).
3. Second record (large) creates a new batch, but it's not marked as full
(disabling the switch).
4. Third record arrives and doesn't fit into the batch, so the batch is marked as full
(completing the switch that was disabled in step 3).
So effectively, in step 4 the switch happens before the record is added,
rather than after.
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java:
##########
@@ -170,13 +170,49 @@ boolean isPartitionChanged(StickyPartitionInfo
partitionInfo) {
* @param cluster The cluster information
*/
void updatePartitionInfo(StickyPartitionInfo partitionInfo, int
appendedBytes, Cluster cluster) {
+ updatePartitionInfo(partitionInfo, appendedBytes, cluster, true);
+ }
+
+ /**
+ * Update partition info with the number of bytes appended and maybe
switch partition.
+ * NOTE this function needs to be called under the partition's batch queue
lock.
+ *
+ * @param partitionInfo The sticky partition info object returned by
peekCurrentPartitionInfo
+ * @param appendedBytes The number of bytes appended to this partition
+ * @param cluster The cluster information
+ * @param enableSwitch If true, switch partition once produced enough bytes
+ */
+ void updatePartitionInfo(StickyPartitionInfo partitionInfo, int
appendedBytes, Cluster cluster, boolean enableSwitch) {
// partitionInfo may be null if the caller didn't use built-in
partitioner.
if (partitionInfo == null)
return;
assert partitionInfo == stickyPartitionInfo.get();
int producedBytes =
partitionInfo.producedBytes.addAndGet(appendedBytes);
- if (producedBytes >= stickyBatchSize) {
+
+ // We're trying to switch partition once we produce stickyBatchSize
bytes to a partition
+ // but doing so may hinder batching because partition switch may
happen while batch isn't
+ // ready to send. This situation is especially likely with high
linger.ms setting.
+ // Consider the following example:
+ // linger.ms=500, producer produces 12KB in 500ms, batch.size=16KB
+ // - first batch collects 12KB in 500ms, gets sent
+ // - second batch collects 4KB, then we switch partition, so 4KB
gets eventually sent
+ // - ... and so on - we'd get 12KB and 4KB batches
+ // To get more optimal batching and avoid 4KB fractional batches, the
caller may disallow
+ // partition switch if batch is not ready to send, so with the example
above we'd avoid
+ // fractional 4KB batches: in that case the scenario would look like
this:
+ // - first batch collects 12KB in 500ms, gets sent
+ // - second batch collects 4KB, but partition switch doesn't
happen because batch is not ready
+ // - second batch collects 12KB in 500ms, gets sent and now we
switch partition.
+ // - ... and so on - we'd just send 12KB batches
+ // We cap the produced bytes to not exceed 2x of the batch size to
avoid pathological cases
+ // (e.g. if we have a mix of keyed and unkeyed messages, key messages
may create an
+ // unready batch after the batch that disabled partition switch
becomes ready).
+ // As a result, with high linger.ms setting we end up switching
partitions after producing
+ // between stickyBatchSize and stickyBatchSize * 2 bytes, to better
align with batch boundary.
+ if (producedBytes >= stickyBatchSize * 2)
Review Comment:
This could potentially happen if we have a mix of keyed and unkeyed messages.
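A minimal sketch of the rule the code comment describes, assuming the switch condition is "at least stickyBatchSize bytes and switching enabled, or at least 2x stickyBatchSize regardless". The names `StickySwitchSketch`, `shouldSwitch` and `bytesUntilForcedSwitch` are hypothetical and not part of the PR. The loop models the keyed/unkeyed case as an assumption: the caller keeps reporting an unready batch, so only the 2x cap can trigger the switch.

```java
class StickySwitchSketch {
    // Hypothetical helper, not Kafka's API: decides whether to switch partition.
    // Assumption: the 2x cap forces a switch even when enableSwitch is false.
    static boolean shouldSwitch(int producedBytes, int stickyBatchSize, boolean enableSwitch) {
        if (producedBytes >= stickyBatchSize * 2)
            return true; // cap reached, switch regardless of batch readiness
        return enableSwitch && producedBytes >= stickyBatchSize;
    }

    // Models a caller that always sees an unready batch (enableSwitch == false),
    // e.g. because keyed records keep creating fresh batches (an assumption).
    // Returns the bytes produced to the partition when the cap forces the switch.
    static int bytesUntilForcedSwitch(int recordBytes, int stickyBatchSize) {
        if (recordBytes <= 0)
            throw new IllegalArgumentException("recordBytes must be positive");
        int producedBytes = 0;
        do {
            producedBytes += recordBytes;
        } while (!shouldSwitch(producedBytes, stickyBatchSize, false));
        return producedBytes;
    }

    public static void main(String[] args) {
        int stickyBatchSize = 16 * 1024;
        // 12KB produced: below the threshold, no switch even if enabled.
        System.out.println(shouldSwitch(12 * 1024, stickyBatchSize, true));
        // 24KB produced with an unready batch: switch is still held back.
        System.out.println(shouldSwitch(24 * 1024, stickyBatchSize, false));
        // With 4KB records and switching never enabled, the cap ends the run.
        System.out.println(bytesUntilForcedSwitch(4 * 1024, stickyBatchSize));
    }
}
```

Without the cap, the last case would never switch, which is the pathological situation the comment is guarding against.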
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -378,6 +415,15 @@ private MemoryRecordsBuilder recordsBuilder(ByteBuffer
buffer, byte maxUsableMag
}
/**
+ * Check if all batches in the queue are full.
+ */
+ private boolean allBatchesFull(Deque<ProducerBatch> deque) {
+ // Only the last batch may be incomplete, so we just check that.
+ ProducerBatch last = deque.peekLast();
+ return last == null || last.isFull();
Review Comment:
I ran a few experiments and we discussed the results offline; it looks like the
condition as it is would be more backward compatible.
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -378,6 +415,15 @@ private MemoryRecordsBuilder recordsBuilder(ByteBuffer
buffer, byte maxUsableMag
}
/**
+ * Check if all batches in the queue are full.
+ */
+ private boolean allBatchesFull(Deque<ProducerBatch> deque) {
Review Comment:
I think lastBatchFull doesn't include the case of last == null.
Alternatively, we could invert the function and name it hasIncompleteBatches.
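To make the two naming options concrete, here is a rough sketch of the current check next to the inverted one. `Batch` is a hypothetical stand-in for `ProducerBatch` (only `isFull()` is modeled), and `BatchQueueChecks` is an illustrative name, not code from the PR.

```java
import java.util.Deque;

class BatchQueueChecks {
    /** Hypothetical stand-in for ProducerBatch; only isFull() matters here. */
    interface Batch {
        boolean isFull();
    }

    // Mirrors the check in the diff: an empty queue counts as "all full".
    static boolean allBatchesFull(Deque<? extends Batch> deque) {
        // Only the last batch may be incomplete, so we just check that.
        Batch last = deque.peekLast();
        return last == null || last.isFull();
    }

    // Inverted variant: an empty queue has no incomplete batches.
    static boolean hasIncompleteBatches(Deque<? extends Batch> deque) {
        Batch last = deque.peekLast();
        return last != null && !last.isFull();
    }
}
```

Either way the empty-queue case is explicit, which a name like lastBatchFull would not convey.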