[
https://issues.apache.org/jira/browse/BEAM-14111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Dmitry Orlovsky updated BEAM-14111:
-----------------------------------
Description:
Beam currently has two KafkaIO source implementations:
* a modern one implemented as a Splittable DoFn (SDF), and
* a (deprecated) legacy one implemented as an SDF wrapper over the
UnboundedSource and KafkaUnboundedReader classes.
We found that the legacy KafkaIO source cannot provide good throughput when
the latency of calls to Kafka
[Consumer.poll|https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L523]
becomes high. The degradation is very sharp: a pipeline that drops elements
immediately after reading them from the source was only able to read about
100-1000 qps per Kafka partition. The Kafka cluster was overprovisioned, but it
was in a remote network and had a poll latency of about 30ms.
The first problem that could be addressed in the scope of this bug is that
there is currently very little visibility into the Kafka source. We had to add
extra logging just to see the poll latency and understand the issue with the
pipeline above; a sketch of that kind of instrumentation follows.
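One low-cost way to get that visibility is to time each Consumer.poll call and log slow ones, which is roughly what our extra logging did. Below is a minimal sketch of the idea; the class, method, and threshold names are ours and hypothetical, not Beam's (only Consumer.poll and the SLF4J calls are real APIs):

{code:java}
import java.time.Duration;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical sketch of the extra logging we added, not actual Beam code:
// time each Consumer.poll() call so poll latency becomes visible in logs.
final class PollLatencyLogger {
  private static final Logger LOG = LoggerFactory.getLogger(PollLatencyLogger.class);
  private static final long SLOW_POLL_MS = 10; // arbitrary threshold for this sketch

  static ConsumerRecords<byte[], byte[]> timedPoll(Consumer<byte[], byte[]> consumer) {
    long startNanos = System.nanoTime();
    ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(1000));
    long elapsedMs = (System.nanoTime() - startNanos) / 1_000_000;
    if (elapsedMs >= SLOW_POLL_MS) {
      LOG.info("Kafka poll took {} ms and returned {} records", elapsedMs, records.count());
    }
    return records;
  }
}
{code}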
We believe that the cause of the throughput degradation is a poor choice of the
[RECORDS_DEQUEUE_POLL_TIMEOUT and
RECORDS_ENQUEUE_POLL_TIMEOUT|https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L334-L335]
values, especially the former, which is currently 10ms.
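For reference, the two constants at the linked lines look roughly like this (a paraphrased excerpt, not an exact quote; verify the current values against master):

{code:java}
// Paraphrased from KafkaUnboundedReader (org.joda.time.Duration); check the
// link above for the values in the current revision.
private static final Duration RECORDS_DEQUEUE_POLL_TIMEOUT = Duration.millis(10);
private static final Duration RECORDS_ENQUEUE_POLL_TIMEOUT = Duration.millis(100);
{code}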
These are the timeouts for popping and pushing elements from/to the
[availableRecordsQueue|https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L343].
This is a synchronous queue (i.e. blocking, without buffering) used to hand
records fetched from Kafka between two loops:
* The
[consumerPollLoop|https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L515],
which polls data via a Kafka Consumer when there is no pending data, and
otherwise offers the pending data to the availableRecordsQueue. It also does
offset checkpointing, but that is irrelevant to our case.
* The Beam UnboundedSourceAsSDFWrapperFn message processing loop. It's a bit
complicated, but the important part is that it calls the [nextBatch
function|https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L573]
repeatedly until an attempt to [fetch an element from the
availableRecordsQueue|https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L580]
times out. After the timeout, it returns control to the worker, and it may take
a relatively long time until the loop is scheduled again. (A minimal sketch of
this handoff pattern follows the list.)
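To make the interaction concrete, here is a minimal, self-contained sketch of the handoff described above: a SynchronousQueue with a producer standing in for consumerPollLoop and a consumer standing in for the wrapper's processing loop. It illustrates the pattern only; all names and the simplified logic are ours, not the actual Beam code:

{code:java}
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

// Simplified stand-ins for consumerPollLoop and the SDF wrapper's
// processing loop; not actual Beam code.
public class HandoffSketch {
  // SynchronousQueue has no buffer: offer() only succeeds while a consumer
  // is simultaneously waiting in poll()/take().
  static final SynchronousQueue<List<String>> availableRecordsQueue =
      new SynchronousQueue<>();

  // Stand-in for consumerPollLoop: fetch a batch from "Kafka", then hand it over.
  static void consumerPollLoop() throws InterruptedException {
    while (true) {
      List<String> batch = pollKafka(); // blocks for the Kafka poll latency
      // Analogue of RECORDS_ENQUEUE_POLL_TIMEOUT: retry the handoff until taken.
      while (!availableRecordsQueue.offer(batch, 100, TimeUnit.MILLISECONDS)) {
        // consumer side not ready yet; keep retrying
      }
    }
  }

  // Stand-in for the wrapper's processing loop: drain batches until a fetch
  // times out, then return control to the runner.
  static void processingLoop() throws InterruptedException {
    while (true) {
      // Analogue of RECORDS_DEQUEUE_POLL_TIMEOUT: only 10ms.
      List<String> batch = availableRecordsQueue.poll(10, TimeUnit.MILLISECONDS);
      if (batch == null) {
        return; // assumes no data is available; may not be rescheduled for a while
      }
      process(batch);
    }
  }

  static List<String> pollKafka() throws InterruptedException {
    Thread.sleep(30); // simulate a 30ms-latency Consumer.poll()
    return Arrays.asList("record");
  }

  static void process(List<String> batch) {}
}
{code}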
This is what we think is happening when the poll latency is high (a small
timing demonstration follows this list):
* consumerPollLoop fetches a data bundle from Kafka via poll() and offers it to
the availableRecordsQueue.
* The message processing loop takes the bundle from the availableRecordsQueue,
unblocking the consumerPollLoop's pending offer.
* consumerPollLoop calls poll() again.
* The message processing loop finishes processing the bundle BEFORE that second
poll() call completes, and tries to fetch the next bundle from the
availableRecordsQueue.
* The fetch from the availableRecordsQueue has a very short timeout (10ms); if
it expires before the pending poll() in the consumerPollLoop completes, the
message processing loop concludes there is no fresh data in Kafka and exits.
All the time until the message processing loop is rescheduled is wasted.
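The race is easy to reproduce in isolation. The self-contained demo below (again our own sketch, not Beam code) pits a simulated 30ms poll latency against the 10ms dequeue timeout; most fetch attempts time out while the producer is still inside its simulated Kafka poll:

{code:java}
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

// Simulates the race: the producer needs ~30ms per "Kafka poll", but the
// consumer waits only 10ms on the queue, so most handoff attempts time out.
public class TimeoutRaceDemo {
  public static void main(String[] args) throws InterruptedException {
    SynchronousQueue<String> queue = new SynchronousQueue<>();

    Thread producer = new Thread(() -> {
      try {
        while (true) {
          Thread.sleep(30);    // simulated 30ms Consumer.poll()
          queue.put("bundle"); // blocks until the consumer takes it
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    producer.setDaemon(true);
    producer.start();

    int bundles = 0;
    int timeouts = 0;
    for (int i = 0; i < 100; i++) {
      // 10ms dequeue timeout, as in RECORDS_DEQUEUE_POLL_TIMEOUT.
      String bundle = queue.poll(10, TimeUnit.MILLISECONDS);
      if (bundle == null) {
        timeouts++; // here the wrapper would give up and wait to be rescheduled
      } else {
        bundles++;
      }
    }
    System.out.printf("bundles=%d timeouts=%d%n", bundles, timeouts);
    // With 30ms poll latency vs. a 10ms timeout, roughly two of every three
    // attempts time out even in this best case with zero processing time.
  }
}
{code}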
> Legacy (SDF wrapper-based) KafkaIO reader degrades when Kafka poll latency is
> high
> ----------------------------------------------------------------------------------
>
> Key: BEAM-14111
> URL: https://issues.apache.org/jira/browse/BEAM-14111
> Project: Beam
> Issue Type: Improvement
> Components: io-java-kafka
> Reporter: Dmitry Orlovsky
> Priority: P2
>
--
This message was sent by Atlassian Jira
(v8.20.1#820001)