KinesisIO works fine for me with aggegated records. Here's where deaggregate() is called in v1:
https://github.com/apache/beam/blob/9f4d54a2e60ec45150437c0b050f4c73cca91f36/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java#L170 And v2: https://github.com/apache/beam/blob/9f4d54a2e60ec45150437c0b050f4c73cca91f36/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/SimplifiedKinesisClient.java#L181 Were you having problems getting it to work? On Wed, Sep 2, 2020 at 8:22 AM Alexey Romanenko <[email protected]> wrote: > Yes, it’s not supported for now, but, at the first sight, it seems that we > just need to call UserRecord.deaggregate() in GetKinesisRecordsResult in > case if record is aggregated. > > > On 2 Sep 2020, at 14:30, Sunny, Mani Kolbe <[email protected]> wrote: > > Hi Alexey, > > I am looking for reading Kinesis stream with that has aggregated record. > From your reply, I take that it is currently not supported? Could that be > solved by adding an uncompression function on the pipeline ? > > Regards, > Mani > > *From:* Alexey Romanenko <[email protected]> > *Sent:* Tuesday, September 1, 2020 6:04 PM > *To:* [email protected] > *Subject:* Re: KinesisIO - aggregation > > *CAUTION:* This email originated from outside of D&B. Please do not click > links or open attachments unless you recognize the sender and know the > content is safe. > > Hello Mani, > > For Write part it should be already supported since KinesisIO uses KPL to > write records under the hood. So, it’s just a question of proper > configuration [1][2][3] > For Read part, since it’s based on AWS API, it’s more complicated and we > need to add a support for this explicitly. > > [1] > https://github.com/apache/beam/blob/afa59c463ca9c5d42a9ab13402a72c8b95240fa5/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java#L657 > <https://nam03.safelinks.protection.outlook.com/?url=https%3A%2F%2Fgithub.com%2Fapache%2Fbeam%2Fblob%2Fafa59c463ca9c5d42a9ab13402a72c8b95240fa5%2Fsdks%2Fjava%2Fio%2Fkinesis%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fbeam%2Fsdk%2Fio%2Fkinesis%2FKinesisIO.java%23L657&data=02%7C01%7CSunnyM%40dnb.com%7C0feee4d06b4f42c4431b08d84e98fa31%7C19e2b708bf12437597198dec42771b3e%7C0%7C0%7C637345766274765630&sdata=D0GHczRjCipS1I7QvHPN2QpBHpGFbSSnAwJjMNlIvBA%3D&reserved=0> > [2] https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-config.html > <https://nam03.safelinks.protection.outlook.com/?url=https%3A%2F%2Fdocs.aws.amazon.com%2Fstreams%2Flatest%2Fdev%2Fkinesis-kpl-config.html&data=02%7C01%7CSunnyM%40dnb.com%7C0feee4d06b4f42c4431b08d84e98fa31%7C19e2b708bf12437597198dec42771b3e%7C0%7C0%7C637345766274765630&sdata=klWQU%2BKvxdIUqYuDSIsXrU69dmLwMyEyElSTOcvGWls%3D&reserved=0> > [3] > https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties > <https://nam03.safelinks.protection.outlook.com/?url=https%3A%2F%2Fgithub.com%2Fawslabs%2Famazon-kinesis-producer%2Fblob%2Fmaster%2Fjava%2Famazon-kinesis-producer-sample%2Fdefault_config.properties&data=02%7C01%7CSunnyM%40dnb.com%7C0feee4d06b4f42c4431b08d84e98fa31%7C19e2b708bf12437597198dec42771b3e%7C0%7C0%7C637345766274775617&sdata=fvImCi9ToVRX3hmm18d7yNGTpZEHuVKW%2B5ey3pfez2Q%3D&reserved=0> > > > On 31 Aug 2020, at 16:12, Sunny, Mani Kolbe <[email protected]> wrote: > > Hello, > > Does Beam have plans to support Kinesis records with aggregation[1]? I see > some code [2] in the repo related to that. Is it planned for any near > future releases? > > Regards, > Mani > > > [1] > https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation > <https://nam03.safelinks.protection.outlook.com/?url=https%3A%2F%2Fdocs.aws.amazon.com%2Fstreams%2Flatest%2Fdev%2Fkinesis-kpl-concepts.html%23kinesis-kpl-concepts-aggretation&data=02%7C01%7CSunnyM%40dnb.com%7C0feee4d06b4f42c4431b08d84e98fa31%7C19e2b708bf12437597198dec42771b3e%7C0%7C0%7C637345766274775617&sdata=6Mgj77Fm2rRNHPna8x1P4q6uDPNejua2himihZk2yCQ%3D&reserved=0> > [2] > https://github.com/apache/beam/blob/22822b8c611f4d8888d3b2039e4f7e7d98fcab39/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/SimplifiedKinesisClient.java#L181 > <https://nam03.safelinks.protection.outlook.com/?url=https%3A%2F%2Fgithub.com%2Fapache%2Fbeam%2Fblob%2F22822b8c611f4d8888d3b2039e4f7e7d98fcab39%2Fsdks%2Fjava%2Fio%2Famazon-web-services2%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fbeam%2Fsdk%2Fio%2Faws2%2Fkinesis%2FSimplifiedKinesisClient.java%23L181&data=02%7C01%7CSunnyM%40dnb.com%7C0feee4d06b4f42c4431b08d84e98fa31%7C19e2b708bf12437597198dec42771b3e%7C0%7C0%7C637345766274775617&sdata=AeDe7mB4TdPSSH%2Bdl6mq7PytrhKaxxsWDVQJsWjc9gg%3D&reserved=0> > > >
