This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 46540eb KAFKA-9820: validateMessagesAndAssignOffsetsCompressed
allocates unused iterator (#8422)
46540eb is described below
commit 46540eb5e0a7abb32a159250564d42137bd8b99f
Author: Lucas Bradstreet <[email protected]>
AuthorDate: Sat Apr 4 10:05:51 2020 -0700
KAFKA-9820: validateMessagesAndAssignOffsetsCompressed allocates unused
iterator (#8422)
https://github.com/apache/kafka/commit/3e9d1c1411c5268de382f9dfcc95bdf66d0063a0
introduced skipKeyValueIterator(s), which were intended to be used; in this
case, however, they were created but never used in offset validation.
A subset of the benchmark results follows. It shows roughly a 20% improvement in
validation performance and a 40% reduction in garbage allocation for batch sizes
of 1 and 2.
**# Parameters: (bufferSupplierStr = NO_CACHING, bytes = RANDOM,
compressionType = LZ4, maxBatchSize = 1, messageSize = 1000, messageVersion =
2)**
Before:
Result
"org.apache.kafka.jmh.record.RecordBatchIterationBenchmark.measureValidation":
64851.837 ±(99.9%) 944.248 ops/s [Average]
(min, avg, max) = (64505.317, 64851.837, 65114.359), stdev = 245.218
CI (99.9%): [63907.589, 65796.084] (assumes normal distribution)
"org.apache.kafka.jmh.record.RecordBatchIterationBenchmark.measureValidation:·gc.alloc.rate.norm":
164088.003 ±(99.9%) 0.004 B/op [Average]
(min, avg, max) = (164088.001, 164088.003, 164088.004), stdev = 0.001
CI (99.9%): [164087.998, 164088.007] (assumes normal distribution)
After:
Result
"org.apache.kafka.jmh.record.RecordBatchIterationBenchmark.measureValidation":
78910.273 ±(99.9%) 707.024 ops/s [Average]
(min, avg, max) = (78785.486, 78910.273, 79234.007), stdev = 183.612
CI (99.9%): [78203.249, 79617.297] (assumes normal distribution)
"org.apache.kafka.jmh.record.RecordBatchIterationBenchmark.measureValidation:·gc.alloc.rate.norm":
96440.002 ±(99.9%) 0.001 B/op [Average]
(min, avg, max) = (96440.002, 96440.002, 96440.002), stdev = 0.001
CI (99.9%): [96440.002, 96440.003] (assumes normal distribution)
**# Parameters: (bufferSupplierStr = NO_CACHING, bytes = RANDOM,
compressionType = LZ4, maxBatchSize = 2, messageSize = 1000, messageVersion =
2)**
Before:
Result
"org.apache.kafka.jmh.record.RecordBatchIterationBenchmark.measureValidation":
64815.364 ±(99.9%) 639.309 ops/s [Average]
(min, avg, max) = (64594.545, 64815.364, 64983.305), stdev = 166.026
CI (99.9%): [64176.056, 65454.673] (assumes normal distribution)
"org.apache.kafka.jmh.record.RecordBatchIterationBenchmark.measureValidation:·gc.alloc.rate.norm":
163944.003 ±(99.9%) 0.001 B/op [Average]
(min, avg, max) = (163944.002, 163944.003, 163944.003), stdev = 0.001
CI (99.9%): [163944.002, 163944.004] (assumes normal distribution)
After:
Result
"org.apache.kafka.jmh.record.RecordBatchIterationBenchmark.measureValidation":
77075.096 ±(99.9%) 201.092 ops/s [Average]
(min, avg, max) = (77021.537, 77075.096, 77129.693), stdev = 52.223
CI (99.9%): [76874.003, 77276.188] (assumes normal distribution)
"org.apache.kafka.jmh.record.RecordBatchIterationBenchmark.measureValidation:·gc.alloc.rate.norm":
96504.002 ±(99.9%) 0.003 B/op [Average]
(min, avg, max) = (96504.001, 96504.002, 96504.003), stdev = 0.001
CI (99.9%): [96503.999, 96504.005] (assumes normal distribution)
Reviewers: Chia-Ping Tsai <[email protected]>, Ismael Juma
<[email protected]>
---
checkstyle/import-control-jmh-benchmarks.xml | 2 +
core/src/main/scala/kafka/log/LogValidator.scala | 4 +-
.../jmh/record/RecordBatchIterationBenchmark.java | 43 ++++++++++++++++++++--
3 files changed, 45 insertions(+), 4 deletions(-)
diff --git a/checkstyle/import-control-jmh-benchmarks.xml
b/checkstyle/import-control-jmh-benchmarks.xml
index 4b546cb..e5017e4 100644
--- a/checkstyle/import-control-jmh-benchmarks.xml
+++ b/checkstyle/import-control-jmh-benchmarks.xml
@@ -44,6 +44,8 @@
<allow class="kafka.utils.Pool"/>
<allow class="kafka.utils.KafkaScheduler"/>
<allow class="org.apache.kafka.clients.FetchSessionHandler"/>
+ <allow pkg="kafka.common"/>
+ <allow pkg="kafka.message"/>
<allow pkg="org.mockito"/>
<allow pkg="kafka.security.authorizer"/>
<allow pkg="org.apache.kafka.server"/>
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala
b/core/src/main/scala/kafka/log/LogValidator.scala
index 08c2270..4a30777 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -414,7 +414,8 @@ private[log] object LogValidator extends Logging {
try {
val recordErrors = new ArrayBuffer[ApiRecordError](0)
- for ((record, batchIndex) <- batch.asScala.view.zipWithIndex) {
+ var batchIndex = 0
+ for (record <- recordsIterator.asScala) {
val expectedOffset = expectedInnerOffset.getAndIncrement()
val recordError = validateRecordCompression(batchIndex,
record).orElse {
validateRecord(batch, topicPartition, record, batchIndex, now,
@@ -433,6 +434,7 @@ private[log] object LogValidator extends Logging {
uncompressedSizeInBytes += record.sizeInBytes()
validatedRecords += record
}
+ batchIndex += 1
}
processRecordErrors(recordErrors)
} finally {
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/RecordBatchIterationBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/RecordBatchIterationBenchmark.java
index 73552c2..da1b645 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/RecordBatchIterationBenchmark.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/RecordBatchIterationBenchmark.java
@@ -16,6 +16,13 @@
*/
package org.apache.kafka.jmh.record;
+import kafka.api.ApiVersion;
+import kafka.common.LongRef;
+import kafka.log.AppendOrigin;
+import kafka.log.LogValidator;
+import kafka.message.CompressionCodec;
+import kafka.server.BrokerTopicStats;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.BufferSupplier;
import org.apache.kafka.common.record.CompressionType;
@@ -26,6 +33,7 @@ import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.CloseableIterator;
+import org.apache.kafka.common.utils.Time;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
@@ -57,7 +65,7 @@ public class RecordBatchIterationBenchmark {
RANDOM, ONES
}
- @Param(value = {"10", "50", "200", "500"})
+ @Param(value = {"1", "2", "10", "50", "200", "500"})
private int maxBatchSize = 200;
@Param(value = {"LZ4", "SNAPPY", "GZIP", "ZSTD", "NONE"})
@@ -72,8 +80,11 @@ public class RecordBatchIterationBenchmark {
@Param(value = {"RANDOM", "ONES"})
private Bytes bytes = Bytes.RANDOM;
+ @Param(value = {"NO_CACHING", "CREATE"})
+ private String bufferSupplierStr;
+
// zero starting offset is much faster for v1 batches, but that will
almost never happen
- private final int startingOffset = 42;
+ private int startingOffset;
// Used by measureSingleMessage
private ByteBuffer singleBatchBuffer;
@@ -83,9 +94,22 @@ public class RecordBatchIterationBenchmark {
private int[] batchSizes;
private BufferSupplier bufferSupplier;
+
+
@Setup
public void init() {
- bufferSupplier = BufferSupplier.create();
+ // For v0 batches a zero starting offset is much faster but that will
almost never happen.
+ // For v2 batches we use starting offset = 0 as these batches are
relative to the base
+ // offset and measureValidation will mutate these batches between
iterations
+ startingOffset = messageVersion == 2 ? 0 : 42;
+
+ if (bufferSupplierStr.equals("NO_CACHING")) {
+ bufferSupplier = BufferSupplier.NO_CACHING;
+ } else if (bufferSupplierStr.equals("CREATE")) {
+ bufferSupplier = BufferSupplier.create();
+ } else {
+ throw new IllegalArgumentException("Unsupported buffer supplier "
+ bufferSupplierStr);
+ }
singleBatchBuffer = createBatch(1);
batchBuffers = new ByteBuffer[batchCount];
@@ -123,6 +147,19 @@ public class RecordBatchIterationBenchmark {
}
@Benchmark
+ public void measureValidation(Blackhole bh) throws IOException {
+ MemoryRecords records =
MemoryRecords.readableRecords(singleBatchBuffer.duplicate());
+ LogValidator.validateMessagesAndAssignOffsetsCompressed(records, new
TopicPartition("a", 0),
+ new LongRef(startingOffset), Time.SYSTEM,
System.currentTimeMillis(),
+ CompressionCodec.getCompressionCodec(compressionType.id),
+ CompressionCodec.getCompressionCodec(compressionType.id),
+ false, messageVersion, TimestampType.CREATE_TIME,
Long.MAX_VALUE, 0,
+ new AppendOrigin.Client$(),
+ ApiVersion.latestVersion(),
+ new BrokerTopicStats());
+ }
+
+ @Benchmark
public void measureIteratorForBatchWithSingleMessage(Blackhole bh) throws
IOException {
for (RecordBatch batch :
MemoryRecords.readableRecords(singleBatchBuffer.duplicate()).batches()) {
try (CloseableIterator<Record> iterator =
batch.streamingIterator(bufferSupplier)) {