No worries Alexey.. It is hard to keep track of everything as more and more IO connectors are added to Beam. Time to have subject matter experts for each IO 😊
From: Alexey Romanenko <[email protected]> Sent: Thursday, September 3, 2020 12:46 PM To: [email protected] Subject: Re: KinesisIO - aggregation CAUTION: This email originated from outside of D&B. Please do not click links or open attachments unless you recognize the sender and know the content is safe. Oops, my bad, I missed that it’s already supported. Thanks for clarification! On 3 Sep 2020, at 02:02, Jonothan Farr <[email protected]<mailto:[email protected]>> wrote: KinesisIO works fine for me with aggegated records. Here's where deaggregate() is called in v1: https://github.com/apache/beam/blob/9f4d54a2e60ec45150437c0b050f4c73cca91f36/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java#L170<https://nam03.safelinks.protection.outlook.com/?url=https%3A%2F%2Fgithub.com%2Fapache%2Fbeam%2Fblob%2F9f4d54a2e60ec45150437c0b050f4c73cca91f36%2Fsdks%2Fjava%2Fio%2Fkinesis%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fbeam%2Fsdk%2Fio%2Fkinesis%2FSimplifiedKinesisClient.java%23L170&data=02%7C01%7CSunnyM%40dnb.com%7C0525200f0af34b810d8a08d84fff0599%7C19e2b708bf12437597198dec42771b3e%7C0%7C0%7C637347304016844584&sdata=852gs45OjNykmtexHctdfPqC55zrLCfLvn%2FjHsy0w4I%3D&reserved=0> And v2: https://github.com/apache/beam/blob/9f4d54a2e60ec45150437c0b050f4c73cca91f36/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/SimplifiedKinesisClient.java#L181<https://nam03.safelinks.protection.outlook.com/?url=https%3A%2F%2Fgithub.com%2Fapache%2Fbeam%2Fblob%2F9f4d54a2e60ec45150437c0b050f4c73cca91f36%2Fsdks%2Fjava%2Fio%2Famazon-web-services2%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fbeam%2Fsdk%2Fio%2Faws2%2Fkinesis%2FSimplifiedKinesisClient.java%23L181&data=02%7C01%7CSunnyM%40dnb.com%7C0525200f0af34b810d8a08d84fff0599%7C19e2b708bf12437597198dec42771b3e%7C0%7C0%7C637347304016854579&sdata=n3oFvRpOmh8g3b8XPpPeDieirLky7qiYzJPzTK7TWWw%3D&reserved=0> Were you having problems getting it to work? On Wed, Sep 2, 2020 at 8:22 AM Alexey Romanenko <[email protected]<mailto:[email protected]>> wrote: Yes, it’s not supported for now, but, at the first sight, it seems that we just need to call UserRecord.deaggregate() in GetKinesisRecordsResult in case if record is aggregated. On 2 Sep 2020, at 14:30, Sunny, Mani Kolbe <[email protected]<mailto:[email protected]>> wrote: Hi Alexey, I am looking for reading Kinesis stream with that has aggregated record. From your reply, I take that it is currently not supported? Could that be solved by adding an uncompression function on the pipeline ? Regards, Mani From: Alexey Romanenko <[email protected]<mailto:[email protected]>> Sent: Tuesday, September 1, 2020 6:04 PM To: [email protected]<mailto:[email protected]> Subject: Re: KinesisIO - aggregation CAUTION: This email originated from outside of D&B. Please do not click links or open attachments unless you recognize the sender and know the content is safe. Hello Mani, For Write part it should be already supported since KinesisIO uses KPL to write records under the hood. So, it’s just a question of proper configuration [1][2][3] For Read part, since it’s based on AWS API, it’s more complicated and we need to add a support for this explicitly. [1] https://github.com/apache/beam/blob/afa59c463ca9c5d42a9ab13402a72c8b95240fa5/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java#L657<https://nam03.safelinks.protection.outlook.com/?url=https%3A%2F%2Fgithub.com%2Fapache%2Fbeam%2Fblob%2Fafa59c463ca9c5d42a9ab13402a72c8b95240fa5%2Fsdks%2Fjava%2Fio%2Fkinesis%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fbeam%2Fsdk%2Fio%2Fkinesis%2FKinesisIO.java%23L657&data=02%7C01%7CSunnyM%40dnb.com%7C0525200f0af34b810d8a08d84fff0599%7C19e2b708bf12437597198dec42771b3e%7C0%7C0%7C637347304016854579&sdata=9kda%2BGBNKQ2VPRbc%2FLH4ziY4x5jEyadHlGs%2Fsj5giGA%3D&reserved=0> [2] https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-config.html<https://nam03.safelinks.protection.outlook.com/?url=https%3A%2F%2Fdocs.aws.amazon.com%2Fstreams%2Flatest%2Fdev%2Fkinesis-kpl-config.html&data=02%7C01%7CSunnyM%40dnb.com%7C0525200f0af34b810d8a08d84fff0599%7C19e2b708bf12437597198dec42771b3e%7C0%7C0%7C637347304016864572&sdata=knaAIh60DFXQYqW7MtxypIjZlHYtjZkaAAkzcOOxcMc%3D&reserved=0> [3] https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties<https://nam03.safelinks.protection.outlook.com/?url=https%3A%2F%2Fgithub.com%2Fawslabs%2Famazon-kinesis-producer%2Fblob%2Fmaster%2Fjava%2Famazon-kinesis-producer-sample%2Fdefault_config.properties&data=02%7C01%7CSunnyM%40dnb.com%7C0525200f0af34b810d8a08d84fff0599%7C19e2b708bf12437597198dec42771b3e%7C0%7C0%7C637347304016864572&sdata=W0zbTFtrVGj4RE3hk%2Bs8SAKh0LG2OVeSpmwg5vbanrM%3D&reserved=0> On 31 Aug 2020, at 16:12, Sunny, Mani Kolbe <[email protected]<mailto:[email protected]>> wrote: Hello, Does Beam have plans to support Kinesis records with aggregation[1]? I see some code [2] in the repo related to that. Is it planned for any near future releases? Regards, Mani [1] https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation<https://nam03.safelinks.protection.outlook.com/?url=https%3A%2F%2Fdocs.aws.amazon.com%2Fstreams%2Flatest%2Fdev%2Fkinesis-kpl-concepts.html%23kinesis-kpl-concepts-aggretation&data=02%7C01%7CSunnyM%40dnb.com%7C0525200f0af34b810d8a08d84fff0599%7C19e2b708bf12437597198dec42771b3e%7C0%7C0%7C637347304016874565&sdata=lxCpxPaV9oZ8NPjyRGVevVrUeVSxA5UB%2B6v5u7a9SXU%3D&reserved=0> [2] https://github.com/apache/beam/blob/22822b8c611f4d8888d3b2039e4f7e7d98fcab39/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/SimplifiedKinesisClient.java#L181<https://nam03.safelinks.protection.outlook.com/?url=https%3A%2F%2Fgithub.com%2Fapache%2Fbeam%2Fblob%2F22822b8c611f4d8888d3b2039e4f7e7d98fcab39%2Fsdks%2Fjava%2Fio%2Famazon-web-services2%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fbeam%2Fsdk%2Fio%2Faws2%2Fkinesis%2FSimplifiedKinesisClient.java%23L181&data=02%7C01%7CSunnyM%40dnb.com%7C0525200f0af34b810d8a08d84fff0599%7C19e2b708bf12437597198dec42771b3e%7C0%7C0%7C637347304016874565&sdata=%2FnivLOrh6AGUqpiX%2B6r2c1w6fS0%2FCOH22uiQYEBsUjk%3D&reserved=0>
