gmdfalk opened a new pull request, #6353:
URL: https://github.com/apache/paimon/pull/6353

   ### Purpose
   
   Add `--metadata_column` support to Paimon Kafka CDC connector, similar to 
the already existing options added for MySQL and Postgres: 
https://github.com/apache/paimon/pull/2077
   
   Supported metadata columns are those on 
`org.apache.kafka.clients.consumer.ConsumerRecord` i.e.:
   * topic
   * partition
   * offset
   * timestamp
   * timestampType: This is the name of the enum i.e. `NoTimestampType`, 
`CreateTime` or `LogAppendTime`
   
   The feature is backwards compatible. It's only active when 
`--metadata_column` is supplied resp. 
`SynchronizationActionBase.withMetadataColumns` is used.
   
   For now, I've only implemented this for the 
`KafkaDebeziumAvroDeserializationSchema` and 
`KafkaDebeziumJsonDeserializationSchema`.
   
   ### Tests
   
[KafkaMetadataConverter.java](paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMetadataConverter.java)
   
   ### API and Format
   Changes are contained to the flink cdc package but I did have to update 
`CdcSourceRecord` since it previously didn't provide a way to surface arbitrary 
metadata for a record.
   
   ### Documentation
   
   Added the new `--metadata_column` parameter to Kafka CDC docs.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to