[ 
https://issues.apache.org/jira/browse/BEAM-8382?focusedWorklogId=331108&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-331108
 ]

ASF GitHub Bot logged work on BEAM-8382:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 20/Oct/19 20:30
            Start Date: 20/Oct/19 20:30
    Worklog Time Spent: 10m 
      Work Description: jfarr commented on issue #9765: [BEAM-8382] Add polling 
interval to KinesisIO.Read
URL: https://github.com/apache/beam/pull/9765#issuecomment-544289609
 
 
   @aromanenko-dev thanks for the link, that's really helpful. For our use case 
we'll probably poll once per second. That's what the SDK docs recommend: Kinesis 
Firehose polls on a fixed 1-second interval, Lambda triggers poll on a fixed 
1-second interval, the KCL's polling interval is configurable but defaults to 
1 second, and so on, so that seems pretty standard across the board. We currently 
have 2 Firehoses on this stream polling once per second, so with the limit of 5 
read transactions per second per shard that leaves us about 3 reads per second, 
and in theory we could go as low as 350ms or so. But as you say, with a static 
configuration we would have to stop the running pipeline if we needed to change 
it later. This would allow us to add one, maybe two, consumers before we needed 
to deal with that. I know that my colleague @cmachgodaddy and possibly others are 
working on a v2 KinesisIO that uses subscribeToShard instead of polling 
getRecords, so hopefully we can switch to using that instead before then.
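   
   To make that concrete, this is roughly the read configuration I have in mind 
once the interval is exposed. Treat `withPollingInterval` as a placeholder for 
whatever the setter in this PR ends up being called, and the stream name and 
credentials as made up; the rest is the existing KinesisIO.Read surface:

```java
import org.apache.beam.sdk.io.kinesis.KinesisIO;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.kinesis.model.InitialPositionInStream;
import org.joda.time.Duration;

class KinesisReadConfig {
  // Sketch only: withPollingInterval is a stand-in for the setter proposed in
  // this PR; the stream name and credentials are placeholders.
  static KinesisIO.Read oncePerSecondRead() {
    return KinesisIO.read()
        .withStreamName("my-stream")
        .withAWSClientsProvider("accessKey", "secretKey", Regions.US_EAST_1)
        .withInitialPositionInStream(InitialPositionInStream.LATEST)
        .withPollingInterval(Duration.standardSeconds(1)); // hypothetical setter
  }
}
```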
   
   To give you some more context, we are trying to read records from Kinesis, 
extract a few fields, then send each record to SNS with those fields as message 
attributes. So far we can't even get `read from Kinesis -> write to SNS` to work 
at our scale, which isn't particularly large, only thousands of records per 
second. We are seeing latencies grow to 15-20 minutes or more under peak load. To 
be honest, at this point I can't say for sure whether it's KinesisIO or SnsIO or 
some combination of the two, but I can say that adding the polling interval solved 
our ReadProvisionedThroughputExceeded problem without completely solving the 
latency problem. What I would like to do next week is build a test pipeline that 
just reads from Kinesis, calculates the latency at the moment we receive each 
record, and pushes that metric to CloudWatch. Then I'd like to test the current 
implementation vs. adding FluentBackoff vs. adding a polling interval (my PR) vs. 
adding both. That will let us isolate KinesisIO and get some hard data to compare. 
I'll let you know what I find ASAP. How does that sound?
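   
   Here's the rough shape of the test pipeline I have in mind. I'm using a Beam 
Metrics distribution as a stand-in for the CloudWatch push just to keep the 
sketch short, and the stream name and credentials are placeholders:

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kinesis.KinesisIO;
import org.apache.beam.sdk.io.kinesis.KinesisRecord;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.kinesis.model.InitialPositionInStream;
import org.joda.time.Instant;

public class KinesisLatencyTest {

  /** Records read latency = now minus the record's approximate arrival time. */
  static class MeasureLatencyFn extends DoFn<KinesisRecord, KinesisRecord> {
    private final Distribution latencyMs =
        Metrics.distribution(MeasureLatencyFn.class, "kinesis-read-latency-ms");

    @ProcessElement
    public void processElement(ProcessContext c) {
      KinesisRecord record = c.element();
      long latency =
          Instant.now().getMillis() - record.getApproximateArrivalTimestamp().getMillis();
      latencyMs.update(latency);
      c.output(record);
    }
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    p.apply("ReadKinesis",
            KinesisIO.read()
                .withStreamName("my-stream") // placeholder
                .withAWSClientsProvider("accessKey", "secretKey", Regions.US_EAST_1)
                .withInitialPositionInStream(InitialPositionInStream.LATEST))
     .apply("MeasureLatency", ParDo.of(new MeasureLatencyFn()));
    p.run().waitUntilFinish();
  }
}
```

   The four variants (current implementation, FluentBackoff, polling interval, 
both) would just swap the read configuration while the measurement stays the same.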
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 331108)
    Time Spent: 2h 20m  (was: 2h 10m)

> Add polling interval to KinesisIO.Read
> --------------------------------------
>
>                 Key: BEAM-8382
>                 URL: https://issues.apache.org/jira/browse/BEAM-8382
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-kinesis
>    Affects Versions: 2.13.0, 2.14.0, 2.15.0
>            Reporter: Jonothan Farr
>            Assignee: Jonothan Farr
>            Priority: Major
>          Time Spent: 2h 20m
>  Remaining Estimate: 0h
>
> With the current implementation we are observing Kinesis throttling due to 
> ReadProvisionedThroughputExceeded on the order of hundreds of times per 
> second, regardless of the actual Kinesis throughput. This is because the 
> ShardReadersPool readLoop() method is polling getRecords() as fast as 
> possible.
> From the KDS documentation:
> {quote}Each shard can support up to five read transactions per second.
> {quote}
> and
> {quote}For best results, sleep for at least 1 second (1,000 milliseconds) 
> between calls to getRecords to avoid exceeding the limit on getRecords 
> frequency.
> {quote}
> [https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html]
> [https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-sdk.html]
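
For illustration only (this is not the actual ShardReadersPool code), the fix 
amounts to enforcing a minimum delay between successive getRecords calls on each 
shard, along these lines with the plain AWS SDK:

{code:java}
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.model.GetRecordsRequest;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.Record;

public class ThrottledShardReader {
  // At least 1 second between reads, per the KDS guidance quoted above.
  private static final long POLL_INTERVAL_MS = 1000L;

  public static void readShard(AmazonKinesis kinesis, String shardIterator)
      throws InterruptedException {
    while (shardIterator != null) {
      long start = System.currentTimeMillis();
      GetRecordsResult result =
          kinesis.getRecords(new GetRecordsRequest().withShardIterator(shardIterator));
      for (Record record : result.getRecords()) {
        // hand each record off to downstream processing here
      }
      shardIterator = result.getNextShardIterator();
      // Sleep out the remainder of the interval instead of calling getRecords
      // again immediately, which is what exceeds the 5 reads/sec/shard limit
      // and produces ReadProvisionedThroughputExceeded.
      long elapsed = System.currentTimeMillis() - start;
      if (elapsed < POLL_INTERVAL_MS) {
        Thread.sleep(POLL_INTERVAL_MS - elapsed);
      }
    }
  }
}
{code}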



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
