Hi there, I'm looking to add some functionality to a custom Format <https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/table/sourcessinks/#encoding--decoding-formats> that reads the `topic` metadata column in the context of serialization using the Flink SQL Kafka Connector.
I'm having difficulty understanding how to pass and process metadata columns in an EncodingFormat. How can I make a given metadata column (in this case `topic`) available in a format used by KafkaDynamicSink <https://github.com/apache/flink-connector-kafka/blob/86f796a01cba0d7b3adeb95f413e412c30e466f1/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java#L68>?

I noticed that DynamicKafkaRecordSerializationSchema currently strips all but the requested physical columns <https://github.com/apache/flink-connector-kafka/blob/86f796a01cba0d7b3adeb95f413e412c30e466f1/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java#L100-L117> and passes only the projected rows to the serializers. Is additional work required to support passing metadata from the DynamicTableSink to the EncodingFormat? If not, how should the EncodingFormat specify which metadata columns to consume, and how are they made available in the row passed to serialize()?

Thanks in advance for your help!
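
P.S. To make the question concrete, here is a minimal, self-contained sketch of the projection behavior I mean. It is plain Java and not the connector's actual code: ProjectionSketch, projectRow, the sample values, and the row layout (physical columns first, metadata last) are hypothetical and only for illustration.

```java
import java.util.Arrays;

public class ProjectionSketch {

    // Hypothetical stand-in for the row projection: copy only the fields at
    // the given positions of the consumed row into a new, smaller row.
    public static Object[] projectRow(Object[] consumedRow, int[] positions) {
        Object[] projected = new Object[positions.length];
        for (int i = 0; i < positions.length; i++) {
            projected[i] = consumedRow[positions[i]];
        }
        return projected;
    }

    public static void main(String[] args) {
        // Assumed layout: two physical columns followed by the `topic` metadata column.
        Object[] consumedRow = {42L, "alice", "my-topic"};

        // The value projection references only the physical columns.
        int[] valueProjection = {0, 1};

        // The row handed to the value serializer no longer contains `topic`.
        Object[] valueRow = projectRow(consumedRow, valueProjection);
        System.out.println(Arrays.toString(valueRow));
    }
}
```

In this sketch the format only ever sees valueRow, so it has no way to read `topic` unless the projection is widened or the metadata is passed some other way, which is the part I am unsure how to do.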
