Artem Livshits created KAFKA-14087:
--------------------------------------

             Summary: Add jmh benchmark for producer with MockClient
                 Key: KAFKA-14087
                 URL: https://issues.apache.org/jira/browse/KAFKA-14087
             Project: Kafka
          Issue Type: Improvement
          Components: producer 
            Reporter: Artem Livshits


Something like this
{code:java}
        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
        configs.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");        
Time time = Time.SYSTEM;
        AtomicInteger offset = new AtomicInteger(0);        MetadataResponse 
initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, 
singletonMap("topic", 2));
        ProducerMetadata metadata = newMetadata(0, Long.MAX_VALUE);        
StringBuilder value = new StringBuilder("foo");
        for (int i = 0; i < 1000; i++)
            value.append("x");        AtomicInteger totalRecords = new 
AtomicInteger(0);
        long start = time.milliseconds();        CompletableFuture[] futures = 
new CompletableFuture[3];
        for (int i = 0; i < futures.length; i++) {
            futures[i] = CompletableFuture.runAsync(() -> {
                ScheduledExecutorService executorService = 
Executors.newSingleThreadScheduledExecutor();
                MockClient client = new MockClient(time, metadata) {
                    @Override
                    public void send(ClientRequest request, long now) {
                        super.send(request, now);
                        if (request.apiKey() == ApiKeys.PRODUCE) {
                            // Prepare response data from request.
                            ProduceResponseData responseData = new 
ProduceResponseData();                            ProduceRequest produceRequest 
= (ProduceRequest) request.requestBuilder().build();
                            produceRequest.data().topicData().forEach(topicData 
->
                                    
topicData.partitionData().forEach(partitionData -> {
                                        String topic = topicData.name();
                                        
ProduceResponseData.TopicProduceResponse tpr = 
responseData.responses().find(topic);
                                        if (tpr == null) {
                                            tpr = new 
ProduceResponseData.TopicProduceResponse().setName(topic);
                                            responseData.responses().add(tpr);
                                        }
                                        tpr.partitionResponses().add(new 
ProduceResponseData.PartitionProduceResponse()
                                                .setIndex(partitionData.index())
                                                
.setRecordErrors(Collections.emptyList())
                                                
.setBaseOffset(offset.addAndGet(1))
                                                
.setLogAppendTimeMs(time.milliseconds())
                                                .setLogStartOffset(0)
                                                .setErrorMessage("")
                                                
.setErrorCode(Errors.NONE.code()));
                                    }));                            // Schedule 
a reply to come after some time to mock broker latency.
                            executorService.schedule(() -> respond(new 
ProduceResponse(responseData)), 20, TimeUnit.MILLISECONDS);
                        }
                    }
                };                client.updateMetadata(initialUpdateResponse); 
               InitProducerIdResponseData responseData = new 
InitProducerIdResponseData()
                        .setErrorCode(Errors.NONE.code())
                        .setProducerEpoch((short) 0)
                        .setProducerId(42)
                        .setThrottleTimeMs(0);
                client.prepareResponse(body -> body instanceof 
InitProducerIdRequest,
                        new InitProducerIdResponse(responseData), false);       
         try (KafkaProducer<String, String> producer = kafkaProducer(
                        configs,
                        new StringSerializer(),
                        new StringSerializer(),
                        metadata,
                        client,
                        null,
                        time
                )) {
                    final int records = 20_000_000;                    for (int 
k = 0; k < records; k++) {
                        producer.send(new ProducerRecord<>("topic", null, 
start, "key-" + k, value.toString()));
                    }                    totalRecords.addAndGet(records);
                }
            });
        }        for (CompletableFuture future : futures) {
            future.get();
        } {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to