Brachi Packter created BEAM-7357:
------------------------------------

             Summary: Kinesis IO.write throws LimitExceededException
                 Key: BEAM-7357
                 URL: https://issues.apache.org/jira/browse/BEAM-7357
             Project: Beam
          Issue Type: Bug
          Components: io-java-kinesis
    Affects Versions: 2.11.0
            Reporter: Brachi Packter


I used Kinesis IO to write to kinesis. I get very quickly many exceptions like:

[shard_map.cc:150] Shard map update for stream "***" failed. Code: 
LimitExceededException Message: Rate exceeded for stream *** under account ***; 
retrying in ..

Also, I see many exceptions like:

Caused by: java.lang.IllegalArgumentException: Stream ** does not exist at 
org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument(Preconditions.java:191)
 at 
org.apache.beam.sdk.io.kinesis.KinesisIO$Write$KinesisWriterFn.setup(KinesisIO.java:515)

I'm sure this stream exists because I can see some data from my pipeline that 
was successfully ingested to it.

 

Here is my code:

 

 
{code:java}
.apply(KinesisIO.write()
       .withStreamName("**")
       .withPartitioner(new KinesisPartitioner() {
                       @Override
                        public String getPartitionKey(byte[] value) {
                                        return UUID.randomUUID().toString()
                         }

                        @Override
                        public String getExplicitHashKey(byte[] value) {
                                        return null;
                        }
       })
       .withAWSClientsProvider("**","***",Regions.US_EAST_1));{code}
 

I tried to not use the Kinesis IO. and everything works well, I can't figure 
out what went wrong.
I tried using the same API as the library did.

 
{code:java}
.apply(
 ParDo.of(new DoFn<byte[], Void>() {
 private transient IKinesisProducer inlineProducer;

 @Setup
 public void setup(){

 KinesisProducerConfiguration config =   
KinesisProducerConfiguration.fromProperties(new Properties());
 config.setRegion(Regions.US_EAST_1.getName());
 config.setCredentialsProvider(new AWSStaticCredentialsProvider(new 
BasicAWSCredentials("***", "***")));
 inlineProducer = new KinesisProducer(config);
 }

 @ProcessElement
 public void processElement(ProcessContext c) throws Exception {
    ByteBuffer data = ByteBuffer.wrap(c.element());
    String partitionKey =UUID.randomUUID().toString();
    ListenableFuture<UserRecordResult> f =
    getProducer().addUserRecord("***", partitionKey, data);
   Futures.addCallback(f, new UserRecordResultFutureCallback());
}

 class UserRecordResultFutureCallback implements 
FutureCallback<UserRecordResult> {

 @Override
 public void onFailure(Throwable cause) {
   throw new RuntimeException("failed produce:"+cause);
 }

 @Override
 public void onSuccess(UserRecordResult result) {

 }
 }

 })
 );
 
{code}
 

Any idea what I did wrong? or what the error in the KinesisIO?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to