[
https://issues.apache.org/jira/browse/BEAM-8382?focusedWorklogId=331108&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-331108
]
ASF GitHub Bot logged work on BEAM-8382:
----------------------------------------
Author: ASF GitHub Bot
Created on: 20/Oct/19 20:30
Start Date: 20/Oct/19 20:30
Worklog Time Spent: 10m
Work Description: jfarr commented on issue #9765: [BEAM-8382] Add polling
interval to KinesisIO.Read
URL: https://github.com/apache/beam/pull/9765#issuecomment-544289609
@aromanenko-dev thanks for the link, that's really helpful. For our use case
we'll probably poll once per second. That's what the SDK docs recommend:
Kinesis Firehose polls on a fixed 1-second interval, Lambda triggers poll on a
fixed 1-second interval, and the KCL's polling interval is configurable but
defaults to 1 second, so that seems pretty standard across the board. We
currently have 2 Firehoses on this stream polling once per second, so in theory
we could go as low as 350ms or so, but as you say, with a static configuration
we would have to stop the running pipeline if we needed to change this later.
That would let us add one, maybe two, more consumers before we had to deal with
it. I know that my colleague @cmachgodaddy and possibly others are working on a
v2 KinesisIO that uses subscribeToShard instead of polling getRecords, so
hopefully we can switch to that before then.
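For reference, here's roughly what the consumer configuration could look like
once a polling interval is available. `withPollingInterval` is just the setter
this PR proposes (the name and signature aren't final); the other builder calls
are existing KinesisIO.Read options, and the stream name and credentials are
placeholders:
```java
import com.amazonaws.regions.Regions;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kinesis.KinesisIO;
import org.apache.beam.sdk.io.kinesis.KinesisRecord;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

public class KinesisPollingIntervalExample {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();

    PCollection<KinesisRecord> records =
        pipeline.apply(
            KinesisIO.read()
                .withStreamName("my-stream")
                .withInitialPositionInStream(InitialPositionInStream.LATEST)
                .withAWSClientsProvider("ACCESS_KEY", "SECRET_KEY", Regions.US_EAST_1)
                // Proposed (not yet merged): sleep ~1s between getRecords()
                // calls so each consumer stays within the 5 reads/sec/shard limit.
                .withPollingInterval(Duration.standardSeconds(1)));

    pipeline.run().waitUntilFinish();
  }
}
```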
To give you some more context, we are trying to read records from Kinesis,
extract a few fields, and then send each record to SNS with those fields as
message attributes. So far we can't even get `read from Kinesis -> write to
SNS` to work at our scale, which isn't particularly large: only thousands of
records per second. We are seeing latencies grow to 15-20 minutes or more under
peak load. To be honest, at this point I can't say for sure whether it's
KinesisIO, SnsIO, or some combination, but I can say that adding the polling
interval solved our ReadProvisionedThroughputExceeded problem without
completely solving the latency problem. What I would like to do next week is
build a test pipeline that just reads from Kinesis, calculates the latency at
the moment we receive each record, and pushes that metric to CloudWatch. Then
I'd like to compare the current implementation vs. adding FluentBackoff vs.
adding a polling interval (my PR) vs. adding both. That will let us isolate
KinesisIO and get some hard data to compare. I'll let you know what I find
ASAP. How does that sound?
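Here's a rough sketch of that latency probe. The stream name and credentials
are placeholders, and I'm using a Beam `Distribution` metric instead of the
CloudWatch push just to keep it short:
```java
import com.amazonaws.regions.Regions;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kinesis.KinesisIO;
import org.apache.beam.sdk.io.kinesis.KinesisRecord;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.joda.time.Instant;

public class KinesisLatencyProbe {

  /** Reports the delta between processing time and the record's approximate arrival time. */
  static class MeasureLatencyFn extends DoFn<KinesisRecord, KinesisRecord> {
    private final Distribution latencyMs =
        Metrics.distribution(MeasureLatencyFn.class, "kinesisReadLatencyMs");

    @ProcessElement
    public void processElement(ProcessContext c) {
      KinesisRecord record = c.element();
      long lagMs =
          Instant.now().getMillis() - record.getApproximateArrivalTimestamp().getMillis();
      latencyMs.update(lagMs);
      c.output(record);
    }
  }

  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();
    pipeline
        .apply(
            KinesisIO.read()
                .withStreamName("my-stream")
                .withInitialPositionInStream(InitialPositionInStream.LATEST)
                .withAWSClientsProvider("ACCESS_KEY", "SECRET_KEY", Regions.US_EAST_1))
        .apply("MeasureLatency", ParDo.of(new MeasureLatencyFn()));
    pipeline.run().waitUntilFinish();
  }
}
```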
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 331108)
Time Spent: 2h 20m (was: 2h 10m)
> Add polling interval to KinesisIO.Read
> --------------------------------------
>
> Key: BEAM-8382
> URL: https://issues.apache.org/jira/browse/BEAM-8382
> Project: Beam
> Issue Type: Improvement
> Components: io-java-kinesis
> Affects Versions: 2.13.0, 2.14.0, 2.15.0
> Reporter: Jonothan Farr
> Assignee: Jonothan Farr
> Priority: Major
> Time Spent: 2h 20m
> Remaining Estimate: 0h
>
> With the current implementation we are observing Kinesis throttling
> (ReadProvisionedThroughputExceeded errors) on the order of hundreds of times
> per second, regardless of the actual Kinesis throughput. This is because the
> ShardReadersPool readLoop() method polls getRecords() as fast as possible.
> From the KDS documentation:
> {quote}Each shard can support up to five read transactions per second.
> {quote}
> and
> {quote}For best results, sleep for at least 1 second (1,000 milliseconds)
> between calls to getRecords to avoid exceeding the limit on getRecords
> frequency.
> {quote}
> [https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html]
> [https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-sdk.html]
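> A simplified sketch (not the actual ShardReadersPool code) of what sleeping
> between getRecords() calls looks like with the plain AWS SDK; the polling
> interval value here is illustrative:
> {code:java}
> import com.amazonaws.services.kinesis.AmazonKinesis;
> import com.amazonaws.services.kinesis.model.GetRecordsRequest;
> import com.amazonaws.services.kinesis.model.GetRecordsResult;
> import com.amazonaws.services.kinesis.model.Record;
> import java.util.List;
>
> /** Simplified illustration of a throttled shard read loop. */
> class ThrottledShardReader {
>
>   private final AmazonKinesis kinesis;
>   private final long pollingIntervalMs; // e.g. 1000 ms, per the KDS guidance above
>
>   ThrottledShardReader(AmazonKinesis kinesis, long pollingIntervalMs) {
>     this.kinesis = kinesis;
>     this.pollingIntervalMs = pollingIntervalMs;
>   }
>
>   void readLoop(String initialShardIterator) throws InterruptedException {
>     String shardIterator = initialShardIterator;
>     while (shardIterator != null && !Thread.currentThread().isInterrupted()) {
>       GetRecordsResult result =
>           kinesis.getRecords(new GetRecordsRequest().withShardIterator(shardIterator));
>       List<Record> records = result.getRecords();
>       process(records);
>       shardIterator = result.getNextShardIterator();
>       // Sleeping between getRecords() calls keeps this reader within the
>       // limit of five read transactions per second per shard.
>       Thread.sleep(pollingIntervalMs);
>     }
>   }
>
>   private void process(List<Record> records) {
>     // Hand records off to downstream processing (omitted).
>   }
> }
> {code}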
--
This message was sent by Atlassian Jira
(v8.3.4#803005)