[ https://issues.apache.org/jira/browse/BEAM-7357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16864134#comment-16864134 ]

Alexey Romanenko commented on BEAM-7357:
----------------------------------------

[~brachi_packter] What kind of error do you get in this case? Could you post 
the full error stack trace? 
It would also help, if possible, to have more details about your environment 
and pipeline: the pipeline topology, which runner you use, the number of 
workers in your cluster, etc. 
So far I can't reproduce the issue on my side, so any additional information 
will be helpful.

> Kinesis IO.write throws LimitExceededException
> ----------------------------------------------
>
>                 Key: BEAM-7357
>                 URL: https://issues.apache.org/jira/browse/BEAM-7357
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-kinesis
>    Affects Versions: 2.11.0
>            Reporter: Brachi Packter
>            Assignee: Alexey Romanenko
>            Priority: Major
>             Fix For: 2.14.0
>
>          Time Spent: 50m
>  Remaining Estimate: 0h
>
> I used KinesisIO to write to Kinesis. Very quickly I start getting many 
> exceptions like:
> [shard_map.cc:150] Shard map update for stream "***" failed. Code: 
> LimitExceededException Message: Rate exceeded for stream *** under account 
> ***; retrying in ..
> Also, I see many exceptions like:
> Caused by: java.lang.IllegalArgumentException: Stream ** does not exist
>  at org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument(Preconditions.java:191)
>  at org.apache.beam.sdk.io.kinesis.KinesisIO$Write$KinesisWriterFn.setup(KinesisIO.java:515)
> I'm sure this stream exists because I can see some data from my pipeline that 
> was successfully ingested to it.
>  
> Here is my code:
>  
>  
> {code:java}
> .apply(KinesisIO.write()
>     .withStreamName("**")
>     .withPartitioner(new KinesisPartitioner() {
>       @Override
>       public String getPartitionKey(byte[] value) {
>         // Random key so that records spread across shards.
>         return UUID.randomUUID().toString();
>       }
>
>       @Override
>       public String getExplicitHashKey(byte[] value) {
>         return null;
>       }
>     })
>     .withAWSClientsProvider("**", "***", Regions.US_EAST_1));{code}
>  
> When I bypass KinesisIO and call the same producer API that the library 
> uses, everything works well, so I can't figure out what went wrong.
>  
> {code:java}
> .apply(
>     ParDo.of(new DoFn<byte[], Void>() {
>       private transient IKinesisProducer inlineProducer;
>
>       @Setup
>       public void setup() {
>         KinesisProducerConfiguration config =
>             KinesisProducerConfiguration.fromProperties(new Properties());
>         config.setRegion(Regions.US_EAST_1.getName());
>         config.setCredentialsProvider(
>             new AWSStaticCredentialsProvider(new BasicAWSCredentials("***", "***")));
>         inlineProducer = new KinesisProducer(config);
>       }
>
>       @ProcessElement
>       public void processElement(ProcessContext c) throws Exception {
>         ByteBuffer data = ByteBuffer.wrap(c.element());
>         String partitionKey = UUID.randomUUID().toString();
>         // Send the record through the producer created in setup().
>         ListenableFuture<UserRecordResult> f =
>             inlineProducer.addUserRecord("***", partitionKey, data);
>         Futures.addCallback(f, new UserRecordResultFutureCallback());
>       }
>
>       class UserRecordResultFutureCallback implements FutureCallback<UserRecordResult> {
>         @Override
>         public void onFailure(Throwable cause) {
>           throw new RuntimeException("failed produce:" + cause);
>         }
>
>         @Override
>         public void onSuccess(UserRecordResult result) {
>         }
>       }
>     })
> );
>  
> {code}
>  
> Any idea what I did wrong, or what the error in KinesisIO is?
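
For context on the partitioner in the quoted description: Kinesis maps each partition key to a 128-bit hash key with MD5 and routes the record to the shard whose hash key range contains it. The standalone sketch below illustrates why random UUID keys should spread records roughly evenly. It is not part of KinesisIO or the KPL. The class name `PartitionKeySketch`, the helper names, the shard count of 4, and the assumption that the shards split the hash space into equal-width ranges are all hypothetical and only for illustration.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.UUID;

// Hypothetical illustration class; not part of Beam, KinesisIO, or the KPL.
class PartitionKeySketch {
    // Kinesis hash keys are unsigned 128-bit integers: 0 .. 2^128 - 1.
    static final BigInteger HASH_SPACE = BigInteger.ONE.shiftLeft(128);

    /** MD5 of the UTF-8 encoded partition key, read as an unsigned 128-bit integer. */
    static BigInteger hashKey(String partitionKey) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            return new BigInteger(1, digest); // signum 1 => treat bytes as unsigned
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 digest not available", e);
        }
    }

    /**
     * Index of the shard that would own the key, ASSUMING the stream has
     * shardCount shards with equal-width, contiguous hash key ranges.
     * Real streams may have uneven ranges after splits or merges.
     */
    static int shardIndex(String partitionKey, int shardCount) {
        return hashKey(partitionKey)
            .multiply(BigInteger.valueOf(shardCount))
            .divide(HASH_SPACE)
            .intValueExact();
    }

    public static void main(String[] args) {
        int shardCount = 4; // hypothetical shard count
        int[] counts = new int[shardCount];
        for (int i = 0; i < 10_000; i++) {
            counts[shardIndex(UUID.randomUUID().toString(), shardCount)]++;
        }
        for (int s = 0; s < shardCount; s++) {
            System.out.println("shard " + s + ": " + counts[s] + " records");
        }
    }
}
```

Running `main` prints a per-shard record count. If the counts are close to each other, the random partition key is not concentrating writes on a single shard, which would suggest looking elsewhere for the cause of the rate limit error.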



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
