[
https://issues.apache.org/jira/browse/BEAM-7357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16851625#comment-16851625
]
Brachi Packter commented on BEAM-7357:
--------------------------------------
I found why it happens: the @Setup method calls DescribeStream.
That API is limited to 10 calls per second per account, per the docs:
[https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DescribeStream.html]
If I start many workers, and each one has many threads, this method can
easily exceed the limit.
Can you fix it? It seems like an easy fix.
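For illustration, a minimal sketch of what I mean (assuming the check keeps using the SDK's DescribeStream call; the streamExists helper and the retry budget are mine, not actual KinesisIO code): the setup-time existence check could back off on LimitExceededException instead of failing when many workers start at once.
{code:java}
// Minimal sketch only, assuming the AWS SDK v1 Kinesis client.
// streamExists and the retry budget are hypothetical, not KinesisIO code.
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.model.LimitExceededException;
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;

class StreamChecker {
  // Retry DescribeStream with exponential backoff so that many workers
  // starting at once do not fail hard on the 10-calls-per-second limit.
  static boolean streamExists(AmazonKinesis client, String streamName) {
    long backoffMillis = 100;
    for (int attempt = 0; attempt < 6; attempt++) {
      try {
        return client.describeStream(streamName).getStreamDescription() != null;
      } catch (LimitExceededException e) {
        try {
          Thread.sleep(backoffMillis);
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
          return false;
        }
        backoffMillis *= 2; // back off before the next DescribeStream call
      } catch (ResourceNotFoundException e) {
        return false; // the stream really does not exist
      }
    }
    return false; // could not confirm within the retry budget
  }
}
{code}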
> Kinesis IO.write throws LimitExceededException
> ----------------------------------------------
>
> Key: BEAM-7357
> URL: https://issues.apache.org/jira/browse/BEAM-7357
> Project: Beam
> Issue Type: Bug
> Components: io-java-kinesis
> Affects Versions: 2.11.0
> Reporter: Brachi Packter
> Priority: Major
>
> I used KinesisIO to write to Kinesis. Very quickly I get many exceptions
> like:
> [shard_map.cc:150] Shard map update for stream "***" failed. Code:
> LimitExceededException Message: Rate exceeded for stream *** under account
> ***; retrying in ..
> Also, I see many exceptions like:
> Caused by: java.lang.IllegalArgumentException: Stream ** does not exist at
> org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument(Preconditions.java:191)
> at
> org.apache.beam.sdk.io.kinesis.KinesisIO$Write$KinesisWriterFn.setup(KinesisIO.java:515)
> I'm sure this stream exists because I can see some data from my pipeline that
> was successfully ingested to it.
>
> Here is my code:
>
>
> {code:java}
> .apply(KinesisIO.write()
>     .withStreamName("**")
>     .withPartitioner(new KinesisPartitioner() {
>       @Override
>       public String getPartitionKey(byte[] value) {
>         return UUID.randomUUID().toString();
>       }
>
>       @Override
>       public String getExplicitHashKey(byte[] value) {
>         return null;
>       }
>     })
>     .withAWSClientsProvider("**", "***", Regions.US_EAST_1));
> {code}
>
> I tried not using KinesisIO and instead writing directly with the same
> producer API the library uses, and everything works well. I can't figure out
> what went wrong with KinesisIO.
>
> {code:java}
> .apply(
>     ParDo.of(new DoFn<byte[], Void>() {
>       private transient IKinesisProducer inlineProducer;
>
>       @Setup
>       public void setup() {
>         KinesisProducerConfiguration config =
>             KinesisProducerConfiguration.fromProperties(new Properties());
>         config.setRegion(Regions.US_EAST_1.getName());
>         config.setCredentialsProvider(new AWSStaticCredentialsProvider(
>             new BasicAWSCredentials("***", "***")));
>         inlineProducer = new KinesisProducer(config);
>       }
>
>       @ProcessElement
>       public void processElement(ProcessContext c) throws Exception {
>         ByteBuffer data = ByteBuffer.wrap(c.element());
>         String partitionKey = UUID.randomUUID().toString();
>         ListenableFuture<UserRecordResult> f =
>             inlineProducer.addUserRecord("***", partitionKey, data);
>         Futures.addCallback(f, new UserRecordResultFutureCallback());
>       }
>
>       class UserRecordResultFutureCallback
>           implements FutureCallback<UserRecordResult> {
>         @Override
>         public void onFailure(Throwable cause) {
>           throw new RuntimeException("failed produce: " + cause);
>         }
>
>         @Override
>         public void onSuccess(UserRecordResult result) {}
>       }
>     })
> );
> {code}
>
> Any idea what I did wrong? Or is there an error in KinesisIO?
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)