Artem Livshits created KAFKA-14087:
--------------------------------------
Summary: Add jmh benchmark for producer with MockClient
Key: KAFKA-14087
URL: https://issues.apache.org/jira/browse/KAFKA-14087
Project: Kafka
Issue Type: Improvement
Components: producer
Reporter: Artem Livshits
Something like this:
{code:java}
// Benchmark sketch: three concurrent tasks each drive their own KafkaProducer against a
// MockClient that acknowledges every produce request after a fixed delay.
// Relies on helpers that are not part of this snippet (newMetadata, kafkaProducer).
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
configs.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");

Time time = Time.SYSTEM;
// Shared across all mock clients; supplies the base offset reported in each produce response.
AtomicInteger offset = new AtomicInteger(0);
MetadataResponse initialUpdateResponse =
    RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 2));
ProducerMetadata metadata = newMetadata(0, Long.MAX_VALUE);

// Record value: "foo" followed by 1000 'x' characters.
StringBuilder value = new StringBuilder("foo");
for (int i = 0; i < 1000; i++) {
    value.append("x");
}
// Materialize the value once so the send loop does not copy it on every iteration.
final String payload = value.toString();

AtomicInteger totalRecords = new AtomicInteger(0);
long start = time.milliseconds();

CompletableFuture<?>[] futures = new CompletableFuture<?>[3];
for (int i = 0; i < futures.length; i++) {
    futures[i] = CompletableFuture.runAsync(() -> {
        // Delivers the delayed mock responses for this task's client.
        ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
        try {
            MockClient client = new MockClient(time, metadata) {
                @Override
                public void send(ClientRequest request, long now) {
                    super.send(request, now);
                    if (request.apiKey() == ApiKeys.PRODUCE) {
                        // Prepare response data from request.
                        ProduceResponseData produceResponseData = new ProduceResponseData();
                        ProduceRequest produceRequest =
                            (ProduceRequest) request.requestBuilder().build();
                        produceRequest.data().topicData().forEach(topicData ->
                            topicData.partitionData().forEach(partitionData -> {
                                String topic = topicData.name();
                                ProduceResponseData.TopicProduceResponse tpr =
                                    produceResponseData.responses().find(topic);
                                if (tpr == null) {
                                    tpr = new ProduceResponseData.TopicProduceResponse()
                                        .setName(topic);
                                    produceResponseData.responses().add(tpr);
                                }
                                tpr.partitionResponses().add(
                                    new ProduceResponseData.PartitionProduceResponse()
                                        .setIndex(partitionData.index())
                                        .setRecordErrors(Collections.emptyList())
                                        .setBaseOffset(offset.incrementAndGet())
                                        .setLogAppendTimeMs(time.milliseconds())
                                        .setLogStartOffset(0)
                                        .setErrorMessage("")
                                        .setErrorCode(Errors.NONE.code()));
                            }));

                        // Schedule a reply to come after some time to mock broker latency.
                        executorService.schedule(
                            () -> respond(new ProduceResponse(produceResponseData)),
                            20, TimeUnit.MILLISECONDS);
                    }
                }
            };
            client.updateMetadata(initialUpdateResponse);

            // Queue the reply for the producer's InitProducerId request.
            InitProducerIdResponseData initProducerIdData = new InitProducerIdResponseData()
                .setErrorCode(Errors.NONE.code())
                .setProducerEpoch((short) 0)
                .setProducerId(42)
                .setThrottleTimeMs(0);
            client.prepareResponse(
                body -> body instanceof InitProducerIdRequest,
                new InitProducerIdResponse(initProducerIdData),
                false);

            try (KafkaProducer<String, String> producer = kafkaProducer(
                    configs,
                    new StringSerializer(),
                    new StringSerializer(),
                    metadata,
                    client,
                    null,
                    time)) {
                final int records = 20_000_000;
                for (int k = 0; k < records; k++) {
                    producer.send(new ProducerRecord<>("topic", null, start, "key-" + k, payload));
                }
                totalRecords.addAndGet(records);
            }
        } finally {
            // Release the scheduler thread once the producer has been closed; without this
            // each task leaves a non-daemon thread behind.
            executorService.shutdown();
        }
    });
}

// Wait for every producer task to finish.
for (CompletableFuture<?> future : futures) {
    future.get();
}
{code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)