Hi Jonothan,

That is good news! I was under the impression that it was not supported, or 
that some flag had to be enabled. By the way, what is the difference between 
KinesisIO and aws V2? Is aws V2 a set of classes that supports AWS-related 
connectors, with KinesisIO referring back to them?

I mean, for reading from and writing to Kinesis, is KinesisIO still the way to go?

Regards,
Mani

From: Jonothan Farr <[email protected]>
Sent: Thursday, September 3, 2020 1:02 AM
To: [email protected]
Subject: Re: KinesisIO - aggregation

KinesisIO works fine for me with aggregated records. Here's where deaggregate() 
is called in v1:

https://github.com/apache/beam/blob/9f4d54a2e60ec45150437c0b050f4c73cca91f36/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java#L170

And v2:

https://github.com/apache/beam/blob/9f4d54a2e60ec45150437c0b050f4c73cca91f36/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/SimplifiedKinesisClient.java#L181

Were you having problems getting it to work?

On Wed, Sep 2, 2020 at 8:22 AM Alexey Romanenko <[email protected]> wrote:
Yes, it’s not supported for now, but at first sight it seems that we just 
need to call UserRecord.deaggregate() in GetKinesisRecordsResult if the 
record is aggregated.


On 2 Sep 2020, at 14:30, Sunny, Mani Kolbe <[email protected]> wrote:

Hi Alexey,

I am looking to read a Kinesis stream that has aggregated records. From 
your reply, I take it that this is currently not supported? Could that be 
solved by adding a decompression function to the pipeline?

Regards,
Mani

From: Alexey Romanenko <[email protected]>
Sent: Tuesday, September 1, 2020 6:04 PM
To: [email protected]
Subject: Re: KinesisIO - aggregation

Hello Mani,

For the write part, it should already be supported, since KinesisIO uses the 
KPL to write records under the hood. So it’s just a question of proper 
configuration [1][2][3].
For the read part, since it’s based on the AWS API, it’s more complicated and 
we need to add support for this explicitly.
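
As a rough illustration of the write-side configuration mentioned above, the 
sketch below builds KPL settings as a java.util.Properties object. The key 
names (AggregationEnabled, AggregationMaxCount, RecordMaxBufferedTime) are 
taken from the KPL default configuration file in [3]; the values are arbitrary 
examples, and the class and method names are hypothetical. Handing the result 
to KinesisIO.write() through withProducerProperties(...) is an assumption 
based on the code that [1] points to, and should be checked against the Beam 
version in use.

```java
import java.util.Properties;

public class KplAggregationConfig {

    // Builds example KPL producer settings with record aggregation enabled.
    // Key names follow the KPL default_config.properties file; the values
    // here are illustrative only, not recommendations.
    public static Properties aggregationProperties() {
        Properties props = new Properties();
        // Pack several user records into one Kinesis record.
        props.setProperty("AggregationEnabled", "true");
        // Upper bound on user records per aggregated record (example value).
        props.setProperty("AggregationMaxCount", "100");
        // Maximum buffering time in milliseconds before a flush (example value).
        props.setProperty("RecordMaxBufferedTime", "200");
        return props;
    }

    public static void main(String[] args) {
        Properties props = aggregationProperties();
        // Print the settings so they can be inspected before use.
        props.forEach((key, value) -> System.out.println(key + " = " + value));
        // Assumed usage (not executed here, requires the Beam Kinesis module):
        //   KinesisIO.write()
        //       .withStreamName(...)
        //       .withPartitionKey(...)
        //       .withProducerProperties(props);
    }
}
```

Keeping the settings in a plain Properties object means they can be inspected 
or loaded from a file without touching the pipeline code.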

[1] https://github.com/apache/beam/blob/afa59c463ca9c5d42a9ab13402a72c8b95240fa5/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java#L657
[2] https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-config.html
[3] https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties

On 31 Aug 2020, at 16:12, Sunny, Mani Kolbe <[email protected]> wrote:

Hello,

Does Beam have plans to support Kinesis records with aggregation [1]? I see 
some code [2] in the repo related to that. Is it planned for any near-future 
release?

Regards,
Mani


[1] https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation
[2] https://github.com/apache/beam/blob/22822b8c611f4d8888d3b2039e4f7e7d98fcab39/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/SimplifiedKinesisClient.java#L181
