Varsha Thanooj created BEAM-5060:
------------------------------------
Summary: Issues with aws KPL while writing to kinesis using beam
Key: BEAM-5060
URL: https://issues.apache.org/jira/browse/BEAM-5060
Project: Beam
Issue Type: Bug
Components: io-java-aws
Affects Versions: 2.5.0
Reporter: Varsha Thanooj
Assignee: Ismaël Mejía
I am trying to write data to Kinesis using Apache Beam's KinesisIO, but the
write fails with a NoSuchMethodError.
PS: I am using AWS STS (temporary session credentials).
The console output shows:
{code:java}
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.NoSuchMethodError: com.amazonaws.services.kinesis.producer.IKinesisProducer.addUserRecord(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/nio/ByteBuffer;)Lorg/apache/beam/repackaged/beam_sdks_java_io_kinesis/com/google/common/util/concurrent/ListenableFuture;
	at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:349)
	at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:319)
	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:210)
	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:66)
	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
	at com.nestaway.beam_demo.KinesisSql.main(KinesisSql.java:153)
Caused by: java.lang.NoSuchMethodError: com.amazonaws.services.kinesis.producer.IKinesisProducer.addUserRecord(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/nio/ByteBuffer;)Lorg/apache/beam/repackaged/beam_sdks_java_io_kinesis/com/google/common/util/concurrent/ListenableFuture;
	at org.apache.beam.sdk.io.kinesis.KinesisIO$Write$KinesisWriterFn.processElement(KinesisIO.java:568)
{code}
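To make the error easier to read, here is a minimal sketch that decodes the JVM method descriptor from the NoSuchMethodError into ordinary Java type names. DescriptorDecoder and its methods are hypothetical helper names used only for illustration; the descriptor string is copied from the trace above. It uses only the JDK, so it runs without Beam or the KPL on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

public class DescriptorDecoder {

    /** Decodes one JVM type starting at pos[0] and advances pos[0] past it. */
    static String decodeType(String d, int[] pos) {
        char c = d.charAt(pos[0]++);
        switch (c) {
            case 'V': return "void";
            case 'Z': return "boolean";
            case 'B': return "byte";
            case 'C': return "char";
            case 'S': return "short";
            case 'I': return "int";
            case 'J': return "long";
            case 'F': return "float";
            case 'D': return "double";
            case '[': return decodeType(d, pos) + "[]";
            case 'L': {
                // Object type: L<internal/class/Name>;
                int end = d.indexOf(';', pos[0]);
                String name = d.substring(pos[0], end).replace('/', '.');
                pos[0] = end + 1;
                return name;
            }
            default:
                throw new IllegalArgumentException("Bad descriptor char: " + c);
        }
    }

    /** Returns the parameter types listed between '(' and ')'. */
    static List<String> parameterTypes(String descriptor) {
        List<String> params = new ArrayList<>();
        int[] pos = {1}; // skip the opening '('
        while (descriptor.charAt(pos[0]) != ')') {
            params.add(decodeType(descriptor, pos));
        }
        return params;
    }

    /** Returns the type that follows the closing ')'. */
    static String returnType(String descriptor) {
        int[] pos = {descriptor.indexOf(')') + 1};
        return decodeType(descriptor, pos);
    }

    public static void main(String[] args) {
        // Descriptor copied from the NoSuchMethodError in the console output.
        String descriptor =
            "(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/nio/ByteBuffer;)"
                + "Lorg/apache/beam/repackaged/beam_sdks_java_io_kinesis"
                + "/com/google/common/util/concurrent/ListenableFuture;";
        System.out.println("params:  " + parameterTypes(descriptor));
        System.out.println("returns: " + returnType(descriptor));
    }
}
```

Decoded this way, the method the JVM looked for takes (String, String, String, ByteBuffer) and returns org.apache.beam.repackaged.beam_sdks_java_io_kinesis.com.google.common.util.concurrent.ListenableFuture, that is, a relocated copy of Guava's ListenableFuture. As far as I know, the KPL's own addUserRecord returns the unrelocated com.google.common.util.concurrent.ListenableFuture, so one possible reading (an assumption, not something confirmed in this report) is that the Guava relocation in the Beam Kinesis module changed the signature Beam expects and it no longer matches the KPL class loaded at runtime.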
Code (data is a PCollection<byte[]>):
{code:java}
data.apply(KinesisIO.write()
    .withStreamName("stagWatchBallEventStream")
    .withPartitionKey("a")
    .withAWSClientsProvider(new CustomKinesisClientProvider()));
{code}
Custom Kinesis Client :
{code:java}
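import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicSessionCredentials;
import com.amazonaws.services.kinesis.producer.IKinesisProducer;
import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
import org.apache.beam.sdk.io.kinesis.AWSClientsProvider;

public class CustomKinesisClientProvider implements AWSClientsProvider {
  private static final long serialVersionUID = 1L;

  private static String ID = "XXXXX";
  private static String SECRET = "XXXXX";
  private static String TOKEN = "XXXXX";

  // Temporary STS session credentials.
  private static BasicSessionCredentials sessionCredentials =
      new BasicSessionCredentials(ID, SECRET, TOKEN);

  // Note: this field is shadowed by the method parameter below, so
  // createKinesisProducer() does not use it.
  private static KinesisProducerConfiguration config =
      new KinesisProducerConfiguration()
          .setRecordMaxBufferedTime(3000)
          .setMaxConnections(1)
          .setRequestTimeout(60000)
          .setRegion("us-west-2")
          .setCredentialsProvider(new AWSStaticCredentialsProvider(sessionCredentials));

  @Override
  public IKinesisProducer createKinesisProducer(KinesisProducerConfiguration config) {
    return new KinesisProducer(config);
  }
}
{code}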