[
https://issues.apache.org/jira/browse/BEAM-12225?focusedWorklogId=607016&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-607016
]
ASF GitHub Bot logged work on BEAM-12225:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 04/Jun/21 11:27
Start Date: 04/Jun/21 11:27
Worklog Time Spent: 10m
Work Description: aromanenko-dev commented on a change in pull request
#14743:
URL: https://github.com/apache/beam/pull/14743#discussion_r645497538
##########
File path:
sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java
##########
@@ -61,18 +69,33 @@
private static final int PERIOD_GRANULARITY_IN_SECONDS = 60;
private static final String SUM_STATISTIC = "Sum";
private static final String STREAM_NAME_DIMENSION = "StreamName";
- private static final int LIST_SHARDS_DESCRIBE_STREAM_MAX_ATTEMPTS = 10;
- private static final Duration LIST_SHARDS_DESCRIBE_STREAM_INITIAL_BACKOFF =
+ private static final int LIST_SHARDS_MAX_RESULTS = 1_000;
+ private static final Duration
+ SPACING_FOR_TIMESTAMP_LIST_SHARDS_REQUEST_TO_NOT_EXCEED_TRIM_HORIZON =
+ Duration.standardMinutes(5);
+ private static final int DESCRIBE_STREAM_SUMMARY_MAX_ATTEMPTS = 10;
+ private static final Duration DESCRIBE_STREAM_SUMMARY_INITIAL_BACKOFF =
Duration.standardSeconds(1);
+
private final AmazonKinesis kinesis;
private final AmazonCloudWatch cloudWatch;
private final Integer limit;
+ private final Supplier<Instant> currentInstantSupplier;
public SimplifiedKinesisClient(
AmazonKinesis kinesis, AmazonCloudWatch cloudWatch, Integer limit) {
+ this(kinesis, cloudWatch, limit, Instant::now);
Review comment:
You're right, I was mistaken.
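
For readers following the hunk above, here is a minimal sketch of the constructor-delegation pattern it introduces: the public constructor supplies `Instant::now`, while a second constructor accepts any `Supplier<Instant>` so tests can pin the clock. The class name `ClockedClientSketch` and the method `isOlderThan` are hypothetical and only illustrate the idea, and `java.time` is used here instead of the Joda-Time types in the Beam code so the example compiles without dependencies.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.Supplier;

/** Hypothetical class; it only mirrors the shape of the constructors in the diff. */
final class ClockedClientSketch {
  private final Supplier<Instant> currentInstantSupplier;

  /** Production-style constructor: delegates with the real clock. */
  ClockedClientSketch() {
    this(Instant::now);
  }

  /** Test-friendly constructor: the caller decides what "now" is. */
  ClockedClientSketch(Supplier<Instant> currentInstantSupplier) {
    this.currentInstantSupplier = currentInstantSupplier;
  }

  /** Hypothetical helper: true when timestamp lies more than age before the injected "now". */
  boolean isOlderThan(Instant timestamp, Duration age) {
    return timestamp.isBefore(currentInstantSupplier.get().minus(age));
  }
}
```

With a fixed supplier, time-dependent branches can be exercised deterministically instead of depending on the wall clock.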
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 607016)
Time Spent: 2h 10m (was: 2h)
> Replace AWS API used to list shards from DescribeStream to ListShards
> ---------------------------------------------------------------------
>
> Key: BEAM-12225
> URL: https://issues.apache.org/jira/browse/BEAM-12225
> Project: Beam
> Issue Type: Improvement
> Components: io-java-kinesis
> Affects Versions: 2.27.0
> Reporter: Rafał Ochyra
> Assignee: Rafał Ochyra
> Priority: P2
> Time Spent: 2h 10m
> Remaining Estimate: 0h
>
> We use Google Dataflow with Apache Beam to read data from AWS Kinesis
> streams. We started experiencing problems with the [GetShardIterator
> API|https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html]
> that is used to establish the initial set of active shards to read from a
> given timestamp.
> Requests for iterators on some shards unexpectedly failed with
> AmazonKinesisException (error code “InternalFailure”), which caused our
> Dataflow jobs to fail and prompted us to look for a solution to this
> problem.
> As far as we know, AWS recently changed the behavior of the GetShardIterator
> API. Instead of throwing an exception, it now returns an iterator even for
> closed shards. This behavior is described in [the
> documentation|https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html]
> as follows:
> _“If the shard is closed, GetShardIterator returns a valid iterator for the
> last sequence number of the shard. A shard can be closed as a result of using
> SplitShard or MergeShards.”_
> Because of this change, Beam tries to read a closed shard, finds no records,
> and then tries to read its children. If the children were created long ago,
> reading from the Kinesis stream may begin significantly earlier than
> requested, so Beam does not start processing data from the expected starting
> point. This causes unnecessary delay and additional costs for processing the
> data.
> In the current implementation, validation of shards using GetShardIterator
> is required, because the [DescribeStream
> API|https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DescribeStream.html],
> used for listing them, returns all shards: not only open ones, but closed
> ones as well. AWS recommends using the [ListShards
> API|https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ListShards.html]
> for such use cases. It not only eliminates the problem of thrown exceptions
> but also reduces the time required to list valid shards. An additional issue
> with DescribeStream is its low transaction limit: only 10 transactions per
> second per account, whereas the ListShards limit is 100 transactions per
> second per data stream.
> Using the ListShards API instead of the DescribeStream API:
> * reduces the time required to start reading data (it should address the
> problem of long-running pipeline creation: BEAM-9759),
> * allows for more transactions per second,
> * introduces the API that AWS recommends for listing shards,
> * eliminates the problem of data being read from unexpectedly old child
> shards.
> Potential issues with ListShards API:
> * according to the
> [documentation|https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ListShards.html],
> using the ListShards API with a fine-grained IAM policy might require an
> update to that policy
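
To make the open/closed distinction in the description concrete, here is a minimal sketch of the client-side filtering that a listing containing closed shards forces on the caller. `ShardInfo` and `ShardListingSketch` are hypothetical stand-ins, not AWS SDK or Beam classes. The sketch relies only on the documented fact that Kinesis reports a null ending sequence number for open shards, and it does not model the timestamp-based starting point discussed above.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a shard description; this is not the AWS SDK Shard class. */
final class ShardInfo {
  final String shardId;
  final String parentShardId; // null for a shard without a parent
  final String endingSequenceNumber; // null while the shard is still open

  ShardInfo(String shardId, String parentShardId, String endingSequenceNumber) {
    this.shardId = shardId;
    this.parentShardId = parentShardId;
    this.endingSequenceNumber = endingSequenceNumber;
  }

  /** Kinesis reports a null ending sequence number for shards in the OPEN state. */
  boolean isOpen() {
    return endingSequenceNumber == null;
  }
}

final class ShardListingSketch {
  /** Returns the IDs of open shards, preserving the order of the input listing. */
  static List<String> openShardIds(List<ShardInfo> shards) {
    List<String> open = new ArrayList<>();
    for (ShardInfo shard : shards) {
      if (shard.isOpen()) {
        open.add(shard.shardId);
      }
    }
    return open;
  }
}
```

For a stream in which one shard was split into two, a full listing contains the closed parent plus both children, and only the children pass this filter.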
--
This message was sent by Atlassian Jira
(v8.3.4#803005)