[ 
https://issues.apache.org/jira/browse/BEAM-7357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16862850#comment-16862850
 ] 

Brachi Packter commented on BEAM-7357:
--------------------------------------

[~aromanenko] I think I have found what triggers the shard map updates.

You create a producer per bundle (in the setup method), and if I multiply that 
by the number of workers, this gives a huge number of producers. I believe this 
is what causes the "update shard map" calls.

If I copy your code and create *one* producer for every worker, then this 
error disappears.

 

Can you move the producer creation out of the setup method and into a static 
field of the class, so that it is created only once, when the class is 
initialized?

See the similar issue we had with JdbcIO: the connection pool was created in 
the setup method, and we moved it to a static member so that there is one pool 
per JVM. Ask [~iemejia] for more details.
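
A minimal sketch of that idea, not the actual KinesisIO code. The names 
SharedInstance and WriterFnSketch are hypothetical, and a plain Object stands 
in for the Kinesis producer so that the example has no AWS dependency. In the 
real DoFn, the supplier would build the producer (for example with 
new KinesisProducer(config), as in the code quoted below).

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical helper: lazily creates one instance and reuses it afterwards. */
final class SharedInstance<T> {
    private final Supplier<T> factory;
    private volatile T instance;

    SharedInstance(Supplier<T> factory) {
        this.factory = factory;
    }

    /** Returns the shared instance, creating it on first use (double-checked locking). */
    T get() {
        T local = instance;
        if (local == null) {
            synchronized (this) {
                local = instance;
                if (local == null) {
                    local = factory.get();
                    instance = local;
                }
            }
        }
        return local;
    }
}

/** Hypothetical stand-in for the writer DoFn; Object stands in for the producer. */
class WriterFnSketch {
    /** Counts how many producers were actually created in this JVM. */
    static final AtomicInteger CREATED = new AtomicInteger();

    /** Static field: one holder per class, hence one producer per JVM (worker). */
    private static final SharedInstance<Object> PRODUCER =
        new SharedInstance<>(() -> {
            CREATED.incrementAndGet();
            return new Object(); // assumption: the real code would build the producer here
        });

    private transient Object producer;

    /** Would carry @Setup in a real DoFn; only fetches the shared producer. */
    void setup() {
        producer = PRODUCER.get();
    }

    Object producer() {
        return producer;
    }
}
```

However many times setup is called within one JVM, the supplier runs once, so 
the number of producers scales with the number of workers only.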

> Kinesis IO.write throws LimitExceededException
> ----------------------------------------------
>
>                 Key: BEAM-7357
>                 URL: https://issues.apache.org/jira/browse/BEAM-7357
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-kinesis
>    Affects Versions: 2.11.0
>            Reporter: Brachi Packter
>            Assignee: Alexey Romanenko
>            Priority: Major
>             Fix For: 2.14.0
>
>          Time Spent: 50m
>  Remaining Estimate: 0h
>
> I used KinesisIO to write to Kinesis. Very quickly I get many exceptions 
> like:
> [shard_map.cc:150] Shard map update for stream "***" failed. Code: 
> LimitExceededException Message: Rate exceeded for stream *** under account 
> ***; retrying in ..
> Also, I see many exceptions like:
> Caused by: java.lang.IllegalArgumentException: Stream ** does not exist
>  at org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument(Preconditions.java:191)
>  at org.apache.beam.sdk.io.kinesis.KinesisIO$Write$KinesisWriterFn.setup(KinesisIO.java:515)
> I'm sure this stream exists because I can see some data from my pipeline that 
> was successfully ingested to it.
>  
> Here is my code:
>  
>  
> {code:java}
> .apply(KinesisIO.write()
>     .withStreamName("**")
>     .withPartitioner(new KinesisPartitioner() {
>       @Override
>       public String getPartitionKey(byte[] value) {
>         return UUID.randomUUID().toString();
>       }
>
>       @Override
>       public String getExplicitHashKey(byte[] value) {
>         return null;
>       }
>     })
>     .withAWSClientsProvider("**","***",Regions.US_EAST_1));{code}
>  
> I tried not using KinesisIO, and everything works well, so I can't figure 
> out what went wrong.
> I tried using the same API as the library does.
>  
> {code:java}
> .apply(
>     ParDo.of(new DoFn<byte[], Void>() {
>       private transient IKinesisProducer inlineProducer;
>
>       @Setup
>       public void setup() {
>         KinesisProducerConfiguration config =
>             KinesisProducerConfiguration.fromProperties(new Properties());
>         config.setRegion(Regions.US_EAST_1.getName());
>         config.setCredentialsProvider(
>             new AWSStaticCredentialsProvider(new BasicAWSCredentials("***", "***")));
>         inlineProducer = new KinesisProducer(config);
>       }
>
>       @ProcessElement
>       public void processElement(ProcessContext c) throws Exception {
>         ByteBuffer data = ByteBuffer.wrap(c.element());
>         String partitionKey = UUID.randomUUID().toString();
>         ListenableFuture<UserRecordResult> f =
>             inlineProducer.addUserRecord("***", partitionKey, data);
>         Futures.addCallback(f, new UserRecordResultFutureCallback());
>       }
>
>       class UserRecordResultFutureCallback implements FutureCallback<UserRecordResult> {
>         @Override
>         public void onFailure(Throwable cause) {
>           throw new RuntimeException("failed produce:" + cause);
>         }
>
>         @Override
>         public void onSuccess(UserRecordResult result) {
>         }
>       }
>     }));
>  
> {code}
>  
> Any idea what I did wrong, or what the error in KinesisIO is?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
