Hello, I recently submitted a pull request to support the Collector API for the Kinesis Streams Connector.
The ability to use this API would save a great deal of shuttling bytes around in multiple Flink programs I've worked on. This is because to construct a stream of the desired type without Collector support, the Kinesis source must emit a List[Type], and this must be flattened to a Type stream. Because of the way Kinesis pricing works, it rarely makes sense to send one value per Kinesis record. In provisioned mode, Kinesis PUTs are priced to the nearest 25KB (https://aws.amazon.com/kinesis/data-streams/pricing/), so records are more sensibly packed with multiple values unless these values are quite large. Therefore, I suspect the need to handle multiple values per Kinesis record is quite common. The PR is located at https://github.com/apache/flink/pull/19417, and I'd love to get some feedback on Github or here. Thanks!