[ 
https://issues.apache.org/jira/browse/BEAM-7357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16852940#comment-16852940
 ] 

Alexey Romanenko commented on BEAM-7357:
----------------------------------------

Yes, I'll fix this. Thanks for the report!

> Kinesis IO.write throws LimitExceededException
> ----------------------------------------------
>
>                 Key: BEAM-7357
>                 URL: https://issues.apache.org/jira/browse/BEAM-7357
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-kinesis
>    Affects Versions: 2.11.0
>            Reporter: Brachi Packter
>            Priority: Major
>
> I used KinesisIO to write to Kinesis. Very quickly, I get many exceptions
> like:
> [shard_map.cc:150] Shard map update for stream "***" failed. Code: 
> LimitExceededException Message: Rate exceeded for stream *** under account 
> ***; retrying in ..
> Also, I see many exceptions like:
> Caused by: java.lang.IllegalArgumentException: Stream ** does not exist
>  at org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument(Preconditions.java:191)
>  at org.apache.beam.sdk.io.kinesis.KinesisIO$Write$KinesisWriterFn.setup(KinesisIO.java:515)
> I'm sure this stream exists because I can see some data from my pipeline that 
> was successfully ingested to it.
>  
> Here is my code:
>  
>  
> {code:java}
> .apply(KinesisIO.write()
>     .withStreamName("**")
>     .withPartitioner(new KinesisPartitioner() {
>         @Override
>         public String getPartitionKey(byte[] value) {
>             // A random partition key for every record.
>             return UUID.randomUUID().toString();
>         }
>         @Override
>         public String getExplicitHashKey(byte[] value) {
>             return null;
>         }
>     })
>     .withAWSClientsProvider("**", "***", Regions.US_EAST_1));{code}
>  
> When I bypassed KinesisIO and used the same producer API that the library
> uses, everything worked well, so I can't figure out what went wrong.
>  
> {code:java}
> .apply(
>     ParDo.of(new DoFn<byte[], Void>() {
>         private transient IKinesisProducer inlineProducer;
>
>         @Setup
>         public void setup() {
>             KinesisProducerConfiguration config =
>                 KinesisProducerConfiguration.fromProperties(new Properties());
>             config.setRegion(Regions.US_EAST_1.getName());
>             config.setCredentialsProvider(new AWSStaticCredentialsProvider(
>                 new BasicAWSCredentials("***", "***")));
>             inlineProducer = new KinesisProducer(config);
>         }
>
>         @ProcessElement
>         public void processElement(ProcessContext c) throws Exception {
>             ByteBuffer data = ByteBuffer.wrap(c.element());
>             String partitionKey = UUID.randomUUID().toString();
>             // Use the producer created in setup().
>             ListenableFuture<UserRecordResult> f =
>                 inlineProducer.addUserRecord("***", partitionKey, data);
>             Futures.addCallback(f, new UserRecordResultFutureCallback());
>         }
>
>         class UserRecordResultFutureCallback
>                 implements FutureCallback<UserRecordResult> {
>             @Override
>             public void onFailure(Throwable cause) {
>                 throw new RuntimeException("failed produce:" + cause);
>             }
>
>             @Override
>             public void onSuccess(UserRecordResult result) {
>             }
>         }
>     })
> );
>  
> {code}
>  
> Any idea what I did wrong, or what the error in KinesisIO is?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
