[
https://issues.apache.org/jira/browse/KAFKA-13559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Rajini Sivaram resolved KAFKA-13559.
------------------------------------
Fix Version/s: 3.4.0
Reviewer: Rajini Sivaram
Resolution: Fixed
> The broker's ProduceResponse may be delayed for 300ms
> ------------------------------------------------------
>
> Key: KAFKA-13559
> URL: https://issues.apache.org/jira/browse/KAFKA-13559
> Project: Kafka
> Issue Type: Task
> Components: core
> Affects Versions: 2.7.0
> Reporter: frankshi
> Assignee: Badai Aqrandista
> Priority: Major
> Fix For: 3.4.0
>
> Attachments: image-1.png, image-2.png,
> image-2021-12-21-14-44-56-689.png, image-2021-12-21-14-45-22-716.png,
> image-3.png, image-5.png, image-6.png, image-7.png, image.png
>
>
> Hi team:
> We have found that the poll timeout value in the source code
> [here|https://github.com/apache/kafka/blob/2.7/core/src/main/scala/kafka/network/SocketServer.scala#L922]
> may delay the broker's ProduceResponse by up to 300ms.
> * Server version: 2.13-2.7.0.
> * Client version: confluent-kafka-python-1.5.0.
> We have set the client's configuration as follows:
> {code:java}
> linger.ms = 0
> acks = 1
> delivery.timeout.ms = 100
> request.timeout.ms = 80
> sasl.mechanism = "PLAIN"
> security.protocol = "SASL_SSL"
> ......
> {code}
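> For reference, the librdkafka settings above map onto the Apache Kafka Java
> producer client as follows (a sketch using the standard Java client config
> names; the class and its name are ours, not from this report):
> {code:java}
> import java.util.Properties;
>
> public class ProducerConfigSketch {
>     // Java-client equivalent of the librdkafka settings above.
>     static Properties producerProps() {
>         Properties props = new Properties();
>         props.put("linger.ms", "0");              // send immediately, no batching delay
>         props.put("acks", "1");                   // leader ack only, no wait for other replicas
>         props.put("delivery.timeout.ms", "100");  // total per-record time budget
>         props.put("request.timeout.ms", "80");    // per-request timeout
>         props.put("sasl.mechanism", "PLAIN");
>         props.put("security.protocol", "SASL_SSL");
>         return props;
>     }
>
>     public static void main(String[] args) {
>         producerProps().forEach((k, v) -> System.out.println(k + " = " + v));
>     }
> }{code}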
> Because we set acks=1, the client sends ProduceRequests and receives
> ProduceResponses from the brokers. The leader broker does not need to wait
> for the other ISR replicas to acknowledge the write; it can reply to the
> client with a ProduceResponse as soon as the data is written locally. In our
> setup, the ping between the client and the Kafka brokers is about 10ms, and
> most of the time responses arrive about 10ms after the ProduceRequests are
> sent. But sometimes a response arrives about 300ms later.
> The following client log shows this behavior:
> {code:java}
> 2021-11-26 02:31:30,567 Sent partial ProduceRequest (v7, 0+16527/37366
> bytes, CorrId 2753)
> 2021-11-26 02:31:30,568 Sent partial ProduceRequest (v7, 16527+16384/37366
> bytes, CorrId 2753)
> 2021-11-26 02:31:30,568 Sent ProduceRequest (v7, 37366 bytes @ 32911, CorrId
> 2753)
> 2021-11-26 02:31:30,570 Sent ProduceRequest (v7, 4714 bytes @ 0, CorrId 2754)
> 2021-11-26 02:31:30,571 Sent ProduceRequest (v7, 1161 bytes @ 0, CorrId 2755)
> 2021-11-26 02:31:30,572 Sent ProduceRequest (v7, 1240 bytes @ 0, CorrId 2756)
> 2021-11-26 02:31:30,572 Received ProduceResponse (v7, 69 bytes, CorrId 2751,
> rtt 9.79ms)
> 2021-11-26 02:31:30,572 Received ProduceResponse (v7, 69 bytes, CorrId 2752,
> rtt 10.34ms)
> 2021-11-26 02:31:30,573 Received ProduceResponse (v7, 69 bytes, CorrId 2753,
> rtt 10.11ms)
> 2021-11-26 02:31:30,872 Received ProduceResponse (v7, 69 bytes, CorrId 2754,
> rtt 309.69ms)
> 2021-11-26 02:31:30,883 Sent ProduceRequest (v7, 1818 bytes @ 0, CorrId 2757)
> 2021-11-26 02:31:30,887 Sent ProduceRequest (v7, 1655 bytes @ 0, CorrId 2758)
> 2021-11-26 02:31:30,888 Received ProduceResponse (v7, 69 bytes, CorrId 2755,
> rtt 318.85ms)
> 2021-11-26 02:31:30,893 Sent partial ProduceRequest (v7, 0+16527/37562
> bytes, CorrId 2759)
> 2021-11-26 02:31:30,894 Sent partial ProduceRequest (v7, 16527+16384/37562
> bytes, CorrId 2759)
> 2021-11-26 02:31:30,895 Sent ProduceRequest (v7, 37562 bytes @ 32911, CorrId
> 2759)
> 2021-11-26 02:31:30,896 Sent ProduceRequest (v7, 4700 bytes @ 0, CorrId 2760)
> 2021-11-26 02:31:30,897 Received ProduceResponse (v7, 69 bytes, CorrId 2756,
> rtt 317.74ms)
> 2021-11-26 02:31:30,897 Received ProduceResponse (v7, 69 bytes, CorrId 2757,
> rtt 4.22ms)
> 2021-11-26 02:31:30,899 Received ProduceResponse (v7, 69 bytes, CorrId 2758,
> rtt 2.61ms){code}
>
> The requests with CorrId 2753 and 2754 are sent almost at the same time, but
> the response for 2754 is delayed by ~300ms.
> We checked the logs on the broker.
>
> {code:java}
> [2021-11-26 02:31:30,873] DEBUG Completed
> request:RequestHeader(apiKey=PRODUCE, apiVersion=7, clientId=rdkafka,
> correlationId=2754) – {acks=1,timeout=80,numPartitions=1},response:
> {responses=[\{topic=***,partition_responses=[{partition=32,error_code=0,base_offset=58625,log_append_time=-1,log_start_offset=49773}]}
> ],throttle_time_ms=0} from connection
> 10.10.44.59:9093-10.10.0.68:31183-66;totalTime:0.852,requestQueueTime:0.128,localTime:0.427,remoteTime:0.09,throttleTime:0,responseQueueTime:0.073,sendTime:0.131,securityProtocol:SASL_SSL,principal:User:***,listener:SASL_SSL,clientInformation:ClientInformation(softwareName=confluent-kafka-python,
> softwareVersion=1.5.0-rdkafka-1.5.2) (kafka.request.logger)
> {code}
>
>
> It seems that the time spent on the server side is very small. What is the
> reason for the latency spikes?
> We also captured tcpdump on the server side and confirmed that the delay
> comes from the server.
> The CorrId=2754 request was received at 10:31:30.566172 and the CorrId=2754
> response was sent at 10:31:30.873518, so the server's processing time for
> this request is about {*}873-566=307ms{*}.
> Wireshark shows the CorrId=2754 ProduceRequest's timestamp and request info:
> !image-2021-12-21-14-45-22-716.png!
> Wireshark shows the CorrId=2754 ProduceResponse's timestamp and response info:
> !image-2021-12-21-14-44-56-689.png!
>
> We checked the source code and found the problem. The broker Processor's
> run loop is as follows:
> !image-5.png|width=1001,height=449!
> Looking at the poll function, you can see that the {*}poll timeout value is 300ms{*}.
> {code:java}
> private def poll(): Unit = {
>   val pollTimeout = if (newConnections.isEmpty) 300 else 0
>   try selector.poll(pollTimeout)
>   catch {
>     case e @ (_: IllegalStateException | _: IOException) =>
>       // The exception is not re-thrown and any completed
>       // sends/receives/connections/disconnections from this poll will be processed.
>       error(s"Processor $id poll failed", e)
>   }
> }{code}
>
> The following is the selector.poll function:
> !image-6.png!
> So, we may encounter the following situation:
> * The first run of the loop:
> ** poll -> request received -> processCompletedReceives -> request placed on the request queue.
> * The second run:
> ** processNewResponses -> the response queue is empty (the I/O thread is still
> processing the request) -> poll() -> select(timeout=0) ->
> madeReadProgressLastCall = false.
> * The third run:
> ** processNewResponses -> the response queue is *NOT* empty -> poll() ->
> select(timeout=300) returns immediately, because the response data is already
> available and the fd has been added to the write fd set.
> * The fourth run:
> ** the response queue is empty -> poll() -> select(timeout=300) waits for
> 300ms or until new data arrives.
> The server may receive several produce requests at once but handles only one
> request per connection at a time; only after the previous response has been
> sent can it handle the next request. While the previous request is being
> handled, the other requests are kept in the cache. So, if the first response
> has been sent and no new data arrives at that moment, the cached request may
> be delayed by up to 300ms before it is processed.
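> The 300ms stall in the fourth run can be reproduced in isolation with a plain
> Java NIO selector: when no channel can become ready, select(300) blocks for
> the full timeout (a standalone sketch, not Kafka code):
> {code:java}
> import java.nio.channels.Selector;
>
> public class SelectTimeoutDemo {
>     // Measures how long select(timeoutMs) blocks when no channel is
>     // registered, i.e. when nothing can become ready.
>     static long idleSelectMillis(long timeoutMs) throws Exception {
>         try (Selector selector = Selector.open()) {
>             long start = System.nanoTime();
>             selector.select(timeoutMs);
>             return (System.nanoTime() - start) / 1_000_000;
>         }
>     }
>
>     public static void main(String[] args) throws Exception {
>         System.out.println("select(300) blocked ~" + idleSelectMillis(300) + " ms");
>         System.out.println("select(10)  blocked ~" + idleSelectMillis(10) + " ms");
>     }
> }{code}
> With timeout=10 the same call returns after roughly 10ms, which is why
> lowering the poll timeout shrinks the tail latency.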
> {color:#ff0000}*We suggest changing the poll timeout value from 300 to
> 10.*{color}
> *The following two figures compare the request-response RTT values.*
> *!image-3.png!*
> RTT values when poll timeout value = 300
>
> !image-2.png!
> RTT values when poll timeout value = 10
>
> Another question: why does the server's log show a very small total time?
> Because the start time is set in the processCompletedReceives function shown
> below; while the request is still sitting in the cache, the timer has not
> started, so totalTime does not include the time spent in the cache.
> !image-7.png!
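> The effect on the reported totalTime can be illustrated with a toy
> measurement (our own sketch; the sleeps stand in for the cached wait and the
> actual handling):
> {code:java}
> public class TotalTimeSketch {
>     // Returns {reportedMs, actualMs}: "reported" starts when the request
>     // object is created (as in processCompletedReceives), "actual" starts
>     // when the bytes first arrive at the socket.
>     static long[] measure(long cachedMs, long processingMs) throws InterruptedException {
>         long arrived = System.nanoTime();
>         Thread.sleep(cachedMs);                   // request waits in the selector/cache
>         long requestCreated = System.nanoTime();  // broker's timer starts here
>         Thread.sleep(processingMs);               // actual request handling
>         long done = System.nanoTime();
>         return new long[]{(done - requestCreated) / 1_000_000,
>                           (done - arrived) / 1_000_000};
>     }
>
>     public static void main(String[] args) throws InterruptedException {
>         long[] t = measure(300, 1);
>         System.out.println("reported totalTime ~" + t[0] + " ms, actual latency ~" + t[1] + " ms");
>     }
> }{code}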
--
This message was sent by Atlassian Jira
(v8.20.10#820010)