[
https://issues.apache.org/jira/browse/BEAM-7357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16861009#comment-16861009
]
Brachi Packter commented on BEAM-7357:
--------------------------------------
I still get this error. any chance you still call to describe stream from
another place? can you verify?
Mention again, when not using KinesisIo, and directly calling to Kinesis
producer, this works. see code above in the ticket description.
> Kinesis IO.write throws LimitExceededException
> ----------------------------------------------
>
> Key: BEAM-7357
> URL: https://issues.apache.org/jira/browse/BEAM-7357
> Project: Beam
> Issue Type: Bug
> Components: io-java-kinesis
> Affects Versions: 2.11.0
> Reporter: Brachi Packter
> Assignee: Alexey Romanenko
> Priority: Major
> Fix For: 2.14.0
>
> Time Spent: 50m
> Remaining Estimate: 0h
>
> I used Kinesis IO to write to kinesis. I get very quickly many exceptions
> like:
> [shard_map.cc:150] Shard map update for stream "***" failed. Code:
> LimitExceededException Message: Rate exceeded for stream *** under account
> ***; retrying in ..
> Also, I see many exceptions like:
> Caused by: java.lang.IllegalArgumentException: Stream ** does not exist at
> org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument(Preconditions.java:191)
> at
> org.apache.beam.sdk.io.kinesis.KinesisIO$Write$KinesisWriterFn.setup(KinesisIO.java:515)
> I'm sure this stream exists because I can see some data from my pipeline that
> was successfully ingested to it.
>
> Here is my code:
>
>
> {code:java}
> .apply(KinesisIO.write()
> .withStreamName("**")
> .withPartitioner(new KinesisPartitioner() {
> @Override
> public String getPartitionKey(byte[] value) {
> return UUID.randomUUID().toString()
> }
> @Override
> public String getExplicitHashKey(byte[] value) {
> return null;
> }
> })
> .withAWSClientsProvider("**","***",Regions.US_EAST_1));{code}
>
> I tried to not use the Kinesis IO. and everything works well, I can't figure
> out what went wrong.
> I tried using the same API as the library did.
>
> {code:java}
> .apply(
> ParDo.of(new DoFn<byte[], Void>() {
> private transient IKinesisProducer inlineProducer;
> @Setup
> public void setup(){
> KinesisProducerConfiguration config =
> KinesisProducerConfiguration.fromProperties(new Properties());
> config.setRegion(Regions.US_EAST_1.getName());
> config.setCredentialsProvider(new AWSStaticCredentialsProvider(new
> BasicAWSCredentials("***", "***")));
> inlineProducer = new KinesisProducer(config);
> }
> @ProcessElement
> public void processElement(ProcessContext c) throws Exception {
> ByteBuffer data = ByteBuffer.wrap(c.element());
> String partitionKey =UUID.randomUUID().toString();
> ListenableFuture<UserRecordResult> f =
> getProducer().addUserRecord("***", partitionKey, data);
> Futures.addCallback(f, new UserRecordResultFutureCallback());
> }
> class UserRecordResultFutureCallback implements
> FutureCallback<UserRecordResult> {
> @Override
> public void onFailure(Throwable cause) {
> throw new RuntimeException("failed produce:"+cause);
> }
> @Override
> public void onSuccess(UserRecordResult result) {
> }
> }
> })
> );
>
> {code}
>
> Any idea what I did wrong? or what the error in the KinesisIO?
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)