Hello Blake,

Sorry for the delay. I have posted a few comments on your PR.
Thanks for the contribution!

Danny

On Fri, Apr 15, 2022 at 6:03 PM Blake Wilson <bl...@yellowpapersun.com> wrote:

> Great to know! Thanks for the reference.
>
> On Fri, Apr 15, 2022 at 6:15 AM Danny Cranmer <dannycran...@apache.org> wrote:
>
> > Yes, the Flink Kinesis Consumer detects aggregated records and
> > seamlessly de-aggregates them for you [1].
> >
> > Thanks,
> >
> > [1] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/RecordBatch.java#L93
> >
> > On Thu, 14 Apr 2022, 23:56 Blake Wilson, <bl...@yellowpapersun.com> wrote:
> >
> > > Thanks for offering to review, Danny.
> > >
> > > Thanks also for pointing out that KCL can de-aggregate records
> > > aggregated by KPL. Several applications I've worked on batch
> > > multiple records without using the KPL unfortunately.
> > >
> > > Is de-aggregation supported by the Kinesis Connector Source? I
> > > found mention of aggregation only in the FlinkKinesisProducer when
> > > searching online for this feature.
> > >
> > > On Thu, Apr 14, 2022 at 12:51 AM Danny Cranmer <dannycran...@apache.org> wrote:
> > >
> > > > Just to clarify, the native KCL/KPL aggregation [1] handles the
> > > > partition key rebalancing for you out of the box.
> > > >
> > > > [1] https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation
> > > >
> > > > On Thu, Apr 14, 2022 at 8:48 AM Danny Cranmer <dannycran...@apache.org> wrote:
> > > >
> > > > > Hey Blake,
> > > > >
> > > > > I am happy to take a look, but I will not have capacity until
> > > > > next week.
> > > > >
> > > > > The current way to achieve multiple records per PUT is to use
> > > > > the native KCL/KPL aggregation [1], which is supported by the
> > > > > Flink connector. A downside of aggregation is that the sender
> > > > > has to manage the partitioning strategy. For example, each
> > > > > record in your list will be sent to the same shard. If the
> > > > > sender implements grouping of records by partition key, then
> > > > > care needs to be taken during shard scaling.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > [1] https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation
> > > > >
> > > > > On Tue, Apr 12, 2022 at 3:52 AM Blake Wilson <bl...@yellowpapersun.com> wrote:
> > > > >
> > > > >> Hello, I recently submitted a pull request to support the
> > > > >> Collector API for the Kinesis Streams Connector.
> > > > >>
> > > > >> The ability to use this API would save a great deal of
> > > > >> shuttling bytes around in multiple Flink programs I've worked
> > > > >> on. This is because to construct a stream of the desired type
> > > > >> without Collector support, the Kinesis source must emit a
> > > > >> List[Type], and this must be flattened to a Type stream.
> > > > >>
> > > > >> Because of the way Kinesis pricing works, it rarely makes
> > > > >> sense to send one value per Kinesis record. In provisioned
> > > > >> mode, Kinesis PUTs are priced to the nearest 25KB
> > > > >> (https://aws.amazon.com/kinesis/data-streams/pricing/), so
> > > > >> records are more sensibly packed with multiple values unless
> > > > >> these values are quite large. Therefore, I suspect the need to
> > > > >> handle multiple values per Kinesis record is quite common.
> > > > >>
> > > > >> The PR is located at https://github.com/apache/flink/pull/19417,
> > > > >> and I'd love to get some feedback on Github or here.
> > > > >>
> > > > >> Thanks!
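
As a rough illustration of the pricing argument in Blake's original message (PUTs billed in 25KB steps, so packing several small values into one record is cheaper), here is a minimal Java sketch. The class and method names are hypothetical and not part of Flink or the AWS SDK. It assumes that a "25KB" step means 25 * 1024 bytes and that each record is rounded up to a whole step; neither detail is stated in the thread, so check the pricing page before relying on the numbers. It also ignores serialization overhead and the maximum record size.

```java
// Sketch only: estimates billed PUT steps for packed vs. unpacked records.
// CHUNK_BYTES and the round-up rule are assumptions, not taken from the thread.
final class PutPayloadUnits {
    // Assumed size of one billing step ("25KB" read as 25 * 1024 bytes).
    static final long CHUNK_BYTES = 25L * 1024L;

    // Billed steps for a single record, rounding up to a whole step.
    static long unitsFor(long recordBytes) {
        if (recordBytes <= 0) {
            throw new IllegalArgumentException("recordBytes must be positive");
        }
        return (recordBytes + CHUNK_BYTES - 1) / CHUNK_BYTES;
    }

    // Each value is sent as its own Kinesis record.
    static long unitsOnePerRecord(int valueCount, long valueBytes) {
        return valueCount * unitsFor(valueBytes);
    }

    // All values are packed into one Kinesis record.
    static long unitsPacked(int valueCount, long valueBytes) {
        return unitsFor(valueCount * valueBytes);
    }

    public static void main(String[] args) {
        int values = 100;      // hypothetical batch of 100 values
        long valueBytes = 200; // hypothetical size of each value
        System.out.println("one value per record: "
                + unitsOnePerRecord(values, valueBytes));
        System.out.println("packed into one record: "
                + unitsPacked(values, valueBytes));
    }
}
```

Under these assumptions, 100 values of 200 bytes each cost 100 steps when sent one per record and a single step when packed together, which is the saving that motivates handling multiple values per record on the consumer side.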