[
https://issues.apache.org/jira/browse/BEAM-7357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16885929#comment-16885929
]
Brachi Packter commented on BEAM-7357:
--------------------------------------
[~aromanenko] Still happens, can't run Data Flow with the latest snapshot
getting abstract method error: (Can it be related to this PR
[https://github.com/apache/beam/pull/8311]?)
{code:java}
java.lang.AbstractMethodError:
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.schemaElement(I)Ljava/lang/Object;
org.apache.beam.sdk.schemas.transforms.Convert$ConvertTransform$1$DoFnInvoker.invokeProcessElement(Unknown
Source)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:213)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:178)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:330)
org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:276){code}
> Kinesis IO.write throws LimitExceededException
> ----------------------------------------------
>
> Key: BEAM-7357
> URL: https://issues.apache.org/jira/browse/BEAM-7357
> Project: Beam
> Issue Type: Bug
> Components: io-java-kinesis
> Affects Versions: 2.11.0
> Reporter: Brachi Packter
> Assignee: Alexey Romanenko
> Priority: Major
> Fix For: 2.14.0
>
> Time Spent: 50m
> Remaining Estimate: 0h
>
> I used Kinesis IO to write to kinesis. I get very quickly many exceptions
> like:
> [shard_map.cc:150] Shard map update for stream "***" failed. Code:
> LimitExceededException Message: Rate exceeded for stream *** under account
> ***; retrying in ..
> Also, I see many exceptions like:
> Caused by: java.lang.IllegalArgumentException: Stream ** does not exist at
> org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument(Preconditions.java:191)
> at
> org.apache.beam.sdk.io.kinesis.KinesisIO$Write$KinesisWriterFn.setup(KinesisIO.java:515)
> I'm sure this stream exists because I can see some data from my pipeline that
> was successfully ingested to it.
>
> Here is my code:
>
>
> {code:java}
> .apply(KinesisIO.write()
> .withStreamName("**")
> .withPartitioner(new KinesisPartitioner() {
> @Override
> public String getPartitionKey(byte[] value) {
> return UUID.randomUUID().toString()
> }
> @Override
> public String getExplicitHashKey(byte[] value) {
> return null;
> }
> })
> .withAWSClientsProvider("**","***",Regions.US_EAST_1));{code}
>
> I tried to not use the Kinesis IO. and everything works well, I can't figure
> out what went wrong.
> I tried using the same API as the library did.
>
> {code:java}
> .apply(
> ParDo.of(new DoFn<byte[], Void>() {
> private transient IKinesisProducer inlineProducer;
> @Setup
> public void setup(){
> KinesisProducerConfiguration config =
> KinesisProducerConfiguration.fromProperties(new Properties());
> config.setRegion(Regions.US_EAST_1.getName());
> config.setCredentialsProvider(new AWSStaticCredentialsProvider(new
> BasicAWSCredentials("***", "***")));
> inlineProducer = new KinesisProducer(config);
> }
> @ProcessElement
> public void processElement(ProcessContext c) throws Exception {
> ByteBuffer data = ByteBuffer.wrap(c.element());
> String partitionKey =UUID.randomUUID().toString();
> ListenableFuture<UserRecordResult> f =
> getProducer().addUserRecord("***", partitionKey, data);
> Futures.addCallback(f, new UserRecordResultFutureCallback());
> }
> class UserRecordResultFutureCallback implements
> FutureCallback<UserRecordResult> {
> @Override
> public void onFailure(Throwable cause) {
> throw new RuntimeException("failed produce:"+cause);
> }
> @Override
> public void onSuccess(UserRecordResult result) {
> }
> }
> })
> );
>
> {code}
>
> Any idea what I did wrong? or what the error in the KinesisIO?
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)