[ 
https://issues.apache.org/jira/browse/BEAM-7357?focusedWorklogId=251844&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-251844
 ]

ASF GitHub Bot logged work on BEAM-7357:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 31/May/19 15:34
            Start Date: 31/May/19 15:34
    Worklog Time Spent: 10m 
      Work Description: aromanenko-dev commented on issue #8730: [BEAM-7357] 
KinesisIO: fix too many checks that writing stream exists.
URL: https://github.com/apache/beam/pull/8730#issuecomment-497754551
 
 
   Run Java PreCommit
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 251844)
    Time Spent: 20m  (was: 10m)

> Kinesis IO.write throws LimitExceededException
> ----------------------------------------------
>
>                 Key: BEAM-7357
>                 URL: https://issues.apache.org/jira/browse/BEAM-7357
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-kinesis
>    Affects Versions: 2.11.0
>            Reporter: Brachi Packter
>            Assignee: Alexey Romanenko
>            Priority: Major
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> I used Kinesis IO to write to kinesis. I get very quickly many exceptions 
> like:
> [shard_map.cc:150] Shard map update for stream "***" failed. Code: 
> LimitExceededException Message: Rate exceeded for stream *** under account 
> ***; retrying in ..
> Also, I see many exceptions like:
> Caused by: java.lang.IllegalArgumentException: Stream ** does not exist at 
> org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument(Preconditions.java:191)
>  at 
> org.apache.beam.sdk.io.kinesis.KinesisIO$Write$KinesisWriterFn.setup(KinesisIO.java:515)
> I'm sure this stream exists because I can see some data from my pipeline that 
> was successfully ingested to it.
>  
> Here is my code:
>  
>  
> {code:java}
> .apply(KinesisIO.write()
>        .withStreamName("**")
>        .withPartitioner(new KinesisPartitioner() {
>                        @Override
>                         public String getPartitionKey(byte[] value) {
>                                         return UUID.randomUUID().toString()
>                          }
>                         @Override
>                         public String getExplicitHashKey(byte[] value) {
>                                         return null;
>                         }
>        })
>        .withAWSClientsProvider("**","***",Regions.US_EAST_1));{code}
>  
> I tried to not use the Kinesis IO. and everything works well, I can't figure 
> out what went wrong.
> I tried using the same API as the library did.
>  
> {code:java}
> .apply(
>  ParDo.of(new DoFn<byte[], Void>() {
>  private transient IKinesisProducer inlineProducer;
>  @Setup
>  public void setup(){
>  KinesisProducerConfiguration config =   
> KinesisProducerConfiguration.fromProperties(new Properties());
>  config.setRegion(Regions.US_EAST_1.getName());
>  config.setCredentialsProvider(new AWSStaticCredentialsProvider(new 
> BasicAWSCredentials("***", "***")));
>  inlineProducer = new KinesisProducer(config);
>  }
>  @ProcessElement
>  public void processElement(ProcessContext c) throws Exception {
>     ByteBuffer data = ByteBuffer.wrap(c.element());
>     String partitionKey =UUID.randomUUID().toString();
>     ListenableFuture<UserRecordResult> f =
>     getProducer().addUserRecord("***", partitionKey, data);
>    Futures.addCallback(f, new UserRecordResultFutureCallback());
> }
>  class UserRecordResultFutureCallback implements 
> FutureCallback<UserRecordResult> {
>  @Override
>  public void onFailure(Throwable cause) {
>    throw new RuntimeException("failed produce:"+cause);
>  }
>  @Override
>  public void onSuccess(UserRecordResult result) {
>  }
>  }
>  })
>  );
>  
> {code}
>  
> Any idea what I did wrong? or what the error in the KinesisIO?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to