Brachi Packter created BEAM-7357:
------------------------------------
Summary: Kinesis IO.write throws LimitExceededException
Key: BEAM-7357
URL: https://issues.apache.org/jira/browse/BEAM-7357
Project: Beam
Issue Type: Bug
Components: io-java-kinesis
Affects Versions: 2.11.0
Reporter: Brachi Packter
I used KinesisIO to write to Kinesis. Very quickly, I get many exceptions like:
[shard_map.cc:150] Shard map update for stream "***" failed. Code:
LimitExceededException Message: Rate exceeded for stream *** under account ***;
retrying in ..
Also, I see many exceptions like:
Caused by: java.lang.IllegalArgumentException: Stream ** does not exist at
org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument(Preconditions.java:191)
at
org.apache.beam.sdk.io.kinesis.KinesisIO$Write$KinesisWriterFn.setup(KinesisIO.java:515)
I'm sure this stream exists because I can see some data from my pipeline that
was successfully ingested to it.
Here is my code:
{code:java}
.apply(KinesisIO.write()
    .withStreamName("**")
    .withPartitioner(new KinesisPartitioner() {
      @Override
      public String getPartitionKey(byte[] value) {
        // Random key per record.
        return UUID.randomUUID().toString();
      }

      @Override
      public String getExplicitHashKey(byte[] value) {
        return null;
      }
    })
    .withAWSClientsProvider("**", "***", Regions.US_EAST_1));
{code}
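For context on the random UUID in the partitioner above: Kinesis hashes the partition key with MD5 into a 128-bit integer and routes the record to the shard whose hash key range contains that value, so random keys should spread records roughly evenly. Below is a minimal sketch of that mapping, assuming a hypothetical stream with 4 shards of equal-sized ranges and UTF-8 encoding of the key. Real ranges come from the stream description, and `PartitionKeySketch`, `hashKey`, and `shardIndex` are illustrative names, not part of Beam or the AWS SDK.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.UUID;

public class PartitionKeySketch {
    // Hypothetical shard count, for illustration only.
    static final int SHARD_COUNT = 4;
    // Size of the 128-bit hash key space: 2^128.
    static final BigInteger KEY_SPACE = BigInteger.ONE.shiftLeft(128);

    /** MD5 of the partition key, read as an unsigned 128-bit integer. */
    static BigInteger hashKey(String partitionKey) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            return new BigInteger(1, digest);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    /** Index of the shard owning the key, assuming equal-sized contiguous ranges. */
    static int shardIndex(String partitionKey, int shardCount) {
        BigInteger rangeSize = KEY_SPACE.divide(BigInteger.valueOf(shardCount));
        int index = hashKey(partitionKey).divide(rangeSize).intValue();
        // The last range absorbs the remainder when 2^128 is not divisible by shardCount.
        return Math.min(index, shardCount - 1);
    }

    public static void main(String[] args) {
        int[] counts = new int[SHARD_COUNT];
        for (int i = 0; i < 10_000; i++) {
            counts[shardIndex(UUID.randomUUID().toString(), SHARD_COUNT)]++;
        }
        // Records per hypothetical shard; the counts should be close to each other.
        System.out.println(Arrays.toString(counts));
    }
}
```

If the counts come out roughly equal, a single hot shard is an unlikely explanation for the rate errors, although this sketch says nothing about the actual shard layout of the stream.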
When I bypass KinesisIO and write with the Kinesis Producer Library directly,
everything works well, so I can't figure out what went wrong.
I used the same API that the library uses:
{code:java}
.apply(
    ParDo.of(new DoFn<byte[], Void>() {
      // One producer per DoFn instance, created in setup().
      private transient IKinesisProducer inlineProducer;

      @Setup
      public void setup() {
        KinesisProducerConfiguration config =
            KinesisProducerConfiguration.fromProperties(new Properties());
        config.setRegion(Regions.US_EAST_1.getName());
        config.setCredentialsProvider(
            new AWSStaticCredentialsProvider(new BasicAWSCredentials("***", "***")));
        inlineProducer = new KinesisProducer(config);
      }

      @ProcessElement
      public void processElement(ProcessContext c) throws Exception {
        ByteBuffer data = ByteBuffer.wrap(c.element());
        String partitionKey = UUID.randomUUID().toString();
        ListenableFuture<UserRecordResult> f =
            inlineProducer.addUserRecord("***", partitionKey, data);
        Futures.addCallback(f, new UserRecordResultFutureCallback());
      }

      class UserRecordResultFutureCallback implements FutureCallback<UserRecordResult> {
        @Override
        public void onFailure(Throwable cause) {
          throw new RuntimeException("failed produce:" + cause);
        }

        @Override
        public void onSuccess(UserRecordResult result) {
          // Nothing to do on success.
        }
      }
    })
);
{code}
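One caveat about the callback above, as far as I can tell: it runs on whichever thread completes the future, so an exception thrown from onFailure is unlikely to reach processElement and fail the bundle. That means "works well" may hide failed writes. Below is a minimal sketch of one way to surface such failures. It uses the JDK's CompletableFuture as a stand-in for the KPL's ListenableFuture, and `AsyncFailureSketch`, `track`, and `checkForFailures` are hypothetical names, not KinesisIO or KPL API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

public class AsyncFailureSketch {
    // Failures recorded by completion callbacks, which may run on other threads.
    private final ConcurrentLinkedQueue<Throwable> failures = new ConcurrentLinkedQueue<>();

    /** Registers a callback that records the error instead of throwing it. */
    void track(CompletableFuture<?> future) {
        future.whenComplete((result, error) -> {
            if (error != null) {
                failures.add(error);
            }
        });
    }

    /** Rethrows the first recorded failure on the caller's thread, if any. */
    void checkForFailures() {
        Throwable first = failures.poll();
        if (first != null) {
            throw new RuntimeException("failed produce", first);
        }
    }

    public static void main(String[] args) {
        AsyncFailureSketch sketch = new AsyncFailureSketch();
        CompletableFuture<String> write = new CompletableFuture<>();
        sketch.track(write);
        // Simulate a write that fails asynchronously.
        write.completeExceptionally(new IllegalStateException("Rate exceeded"));
        try {
            sketch.checkForFailures();
        } catch (RuntimeException e) {
            System.out.println("surfaced: " + e.getCause().getMessage());
        }
    }
}
```

In a DoFn, the equivalent of `checkForFailures()` would presumably be called at the start of processElement and again when the bundle finishes, so that a failed write fails the bundle rather than only being logged.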
Any idea what I did wrong, or what the error in KinesisIO is?
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)