[
https://issues.apache.org/jira/browse/BEAM-14111?focusedWorklogId=754429&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-754429
]
ASF GitHub Bot logged work on BEAM-14111:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 08/Apr/22 05:58
Start Date: 08/Apr/22 05:58
Worklog Time Spent: 10m
Work Description: aaltay commented on PR #17096:
URL: https://github.com/apache/beam/pull/17096#issuecomment-1092468455
R: @johnjcasey
Issue Time Tracking
-------------------
Worklog Id: (was: 754429)
Time Spent: 20m (was: 10m)
> Legacy (SDF wrapper-based) KafkaIO reader degrades when Kafka poll latency is
> high
> ----------------------------------------------------------------------------------
>
> Key: BEAM-14111
> URL: https://issues.apache.org/jira/browse/BEAM-14111
> Project: Beam
> Issue Type: Improvement
> Components: io-java-kafka
> Reporter: Dmitry Orlovsky
> Assignee: Dmitry Orlovsky
> Priority: P2
> Time Spent: 20m
> Remaining Estimate: 0h
>
> Beam has two KafkaIO source implementations now:
> * a modern one implemented as a Splittable DoFn (SDF), and
> * a (deprecated) legacy one implemented as an SDF wrapper over the
> UnboundedSource and KafkaUnboundedReader classes.
> We found that the legacy KafkaIO source cannot provide good throughput when
> the latency of calls to Kafka
> [Consumer.poll|https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L523]
> becomes high. The degradation is very sharp: a pipeline that drops elements
> immediately after reading them from the source was only able to read about
> 100-1000 qps per Kafka partition. The Kafka cluster was overprovisioned but
> was in a remote network and had a poll latency of about 30ms.
> The first problem that could be addressed in the scope of this bug is that
> there is currently very little visibility into the Kafka source. We had to
> add extra logging just to understand the issue with the pipeline above, or
> even to see the poll latency.
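For illustration, poll latency can be surfaced with a simple timing wrapper around the poll call. This is a hypothetical sketch (timedPoll and the 30ms stand-in are not Beam or Kafka APIs), not the actual logging that was added:

```java
import java.time.Duration;
import java.util.function.Supplier;

public class PollLatencySketch {
    /** Runs a poll-like call, logs its latency, and returns its result. */
    static <T> T timedPoll(Supplier<T> poll) {
        long start = System.nanoTime();
        T records = poll.get();
        Duration latency = Duration.ofNanos(System.nanoTime() - start);
        System.out.println("poll latency: " + latency.toMillis() + " ms");
        return records;
    }

    public static void main(String[] args) {
        // Stand-in for consumer.poll(...): sleeps ~30 ms like the remote cluster.
        String batch = timedPoll(() -> {
            try {
                Thread.sleep(30);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "records";
        });
        System.out.println(batch);
    }
}
```

In a real fix this would feed a Beam metric rather than a log line, but even a log line would have made the 30ms latency visible immediately.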
> We believe that the cause of the throughput degradation is a poor choice of
> the [RECORDS_DEQUEUE_POLL_TIMEOUT and
> RECORDS_ENQUEUE_POLL_TIMEOUT|https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L334-L335]
> values, especially the former, which is currently 10ms.
> These are the timeouts for popping and pushing elements from/to the
> [availableRecordsQueue
> |https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L343].
> This is a synchronous queue (i.e. blocking, without buffering) used to hand
> off records fetched from Kafka between two loops:
> * The
> [consumerPollLoop|https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L515]
> that polls data via a Kafka Consumer when there is no pending data, and
> otherwise offers the pending data to the availableRecordsQueue. It also does
> offset checkpointing, but that is irrelevant to our case.
> * The Beam UnboundedSourceAsSDFWrapperFn message processing loop. It's a bit
> complicated, but the important part is that it calls the [nextBatch
> function|https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L573]
> repeatedly until an attempt to [fetch an element from the
> availableRecordsQueue|https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L580]
> times out. After the timeout, it returns control to the worker, and it may
> take a relatively long time until the loop is scheduled again.
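For reference, java.util.concurrent.SynchronousQueue gives exactly this rendezvous behaviour: a timed offer() only succeeds if a consumer is already waiting in poll() (or arrives before the timeout expires), and a timed poll() returns null if no producer shows up in time. A minimal sketch of the handoff between the two loops, with hypothetical batch values and generous 1-second timeouts so the rendezvous reliably succeeds:

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class HandoffSketch {
    public static void main(String[] args) throws InterruptedException {
        // No buffer: every element is handed directly from producer to consumer.
        SynchronousQueue<String> availableRecordsQueue = new SynchronousQueue<>();

        // The "consumerPollLoop" side: offer a fetched batch and wait
        // (up to the enqueue timeout) for the processing loop to take it.
        Thread pollLoop = new Thread(() -> {
            try {
                boolean handedOff = availableRecordsQueue.offer(
                        "batch-1", 1, TimeUnit.SECONDS);
                System.out.println("handed off: " + handedOff);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        pollLoop.start();

        // The processing-loop side: a timed poll either meets the producer
        // and receives the batch, or returns null when the timeout expires --
        // the signal the wrapper interprets as "no more data for now".
        String batch = availableRecordsQueue.poll(1, TimeUnit.SECONDS);
        System.out.println("got: " + batch);
        pollLoop.join();
    }
}
```

With 1-second timeouts on both sides the handoff succeeds; the problem described below arises when the consumer-side timeout is much shorter than the producer's poll latency.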
> This is what we think happens when the poll latency is high:
> * consumerPollLoop fetches a data bundle from Kafka via poll() and offers it
> to the availableRecordsQueue.
> * The message processing loop fetches the bundle from availableRecordsQueue
> and unblocks the consumerPollLoop.
> * consumerPollLoop calls poll() again.
> * The message processing loop finishes processing the bundle BEFORE the
> poll() call above completes, and tries to fetch the next bundle from
> availableRecordsQueue.
> * The fetch from availableRecordsQueue has a very short timeout (10ms); if it
> expires before the pending poll() in the consumerPollLoop completes, the
> message processing loop concludes there is no fresh data in Kafka and exits.
> All the time until the message processing loop is rescheduled is wasted.
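The race above can be reproduced in isolation. The sketch below simulates one round of the handoff under stated assumptions (tryFetch, the sleep standing in for Consumer.poll(), and the "records" value are all hypothetical): when the simulated poll latency (30ms) exceeds the dequeue timeout (10ms), the processing loop gets null and gives up, even though data is on the way.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class PollRaceSketch {
    /**
     * Simulates one round of the race: the "consumerPollLoop" thread takes
     * pollLatencyMs to produce the next batch, while the processing loop
     * waits only dequeueTimeoutMs on the synchronous queue.
     * Returns the batch, or null if the processing loop gave up first.
     */
    static String tryFetch(long pollLatencyMs, long dequeueTimeoutMs)
            throws InterruptedException {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        Thread pollLoop = new Thread(() -> {
            try {
                Thread.sleep(pollLatencyMs); // simulated Consumer.poll() latency
                queue.offer("records", 1, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        pollLoop.start();
        String batch = queue.poll(dequeueTimeoutMs, TimeUnit.MILLISECONDS);
        pollLoop.interrupt();
        pollLoop.join();
        return batch;
    }

    public static void main(String[] args) throws InterruptedException {
        // 30 ms poll latency vs the 10 ms RECORDS_DEQUEUE_POLL_TIMEOUT:
        // the processing loop times out and wrongly concludes Kafka is idle.
        System.out.println(tryFetch(30, 10));   // null
        // A timeout comfortably above the poll latency lets the handoff succeed.
        System.out.println(tryFetch(30, 200));  // records
    }
}
```

This suggests the fix direction: either raise RECORDS_DEQUEUE_POLL_TIMEOUT above realistic poll latencies or make the processing loop tolerate an in-flight poll(), rather than treating a single short timeout as "no data".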
--
This message was sent by Atlassian Jira
(v8.20.1#820001)