Hi there,

I'm looking to add some functionality to a custom Format
<https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/table/sourcessinks/#encoding--decoding-formats>
that reads the `topic` metadata column in the context of serialization
using the Flink SQL Kafka Connector.

I'm having difficulty understanding how to pass and process metadata
columns in an EncodingFormat. How can I make a given metadata column
(in this case `topic`) available in a format used by KafkaDynamicSink
<https://github.com/apache/flink-connector-kafka/blob/86f796a01cba0d7b3adeb95f413e412c30e466f1/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java#L68>?

I noticed that DynamicKafkaRecordSerializationSchema currently strips
all but the requested physical columns
<https://github.com/apache/flink-connector-kafka/blob/86f796a01cba0d7b3adeb95f413e412c30e466f1/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java#L100-L117>
and only passes projected rows to the serializers. Is there additional work
required to support passing metadata from DynamicTableSink to the
EncodingFormat? If not, how should the EncodingFormat specify what metadata
columns to consume, and make them available in the row passed to
serialize()?
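
To make the observation about projection concrete, here is a simplified,
Flink-free model of the behaviour as I understand it. It is only a sketch:
the class name `ProjectionSketch`, the method `project`, the index array, and
the sample values are all hypothetical and are not taken from the connector.
It just shows why a format that only receives the projected row never sees
the `topic` value.

```java
import java.util.Arrays;

// Hypothetical, simplified model of the projection step; not connector code.
class ProjectionSketch {

    // Copies only the fields at the given positions into a new row,
    // so any field whose index is not listed is dropped.
    static Object[] project(Object[] consumedRow, int[] valueFieldIndices) {
        Object[] projected = new Object[valueFieldIndices.length];
        for (int i = 0; i < valueFieldIndices.length; i++) {
            projected[i] = consumedRow[valueFieldIndices[i]];
        }
        return projected;
    }

    public static void main(String[] args) {
        // Assumed layout: two physical columns followed by the `topic` metadata.
        Object[] consumedRow = {42L, "alice", "orders-topic"};
        // Only the physical columns are requested for the value format.
        int[] valueFieldIndices = {0, 1};

        Object[] valueRow = project(consumedRow, valueFieldIndices);
        // The row handed to the serializer no longer contains the topic.
        System.out.println(Arrays.toString(valueRow));
    }
}
```

In this model the format would only be able to see `topic` if its index were
included in the projection, which is essentially what I am asking how to
express.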

Thanks in advance for your help!
