[ 
https://issues.apache.org/jira/browse/FLINK-38236?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18013796#comment-18013796
 ] 

Fabrizzio Chavez commented on FLINK-38236:
------------------------------------------

If you agree with the request, I can take on the PR implementation.

Regards,

Fabrizzio

> MongoDB CDC - Include Full Document as Metadata Field
> -----------------------------------------------------
>
>                 Key: FLINK-38236
>                 URL: https://issues.apache.org/jira/browse/FLINK-38236
>             Project: Flink
>          Issue Type: New Feature
>          Components: Flink CDC
>    Affects Versions: cdc-3.5.0
>            Reporter: Fabrizzio Chavez
>            Priority: Major
>
> Hello,
> It would be beneficial to introduce a new metadata key named 'full_document' 
> to retrieve the raw data, particularly in scenarios where the user prefers 
> not to map attributes to explicit columns.
> h3. Expected results:
> {code:sql}
> CREATE TABLE mongo_source (
>     eventTime TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
>     operation STRING METADATA FROM 'row_kind' VIRTUAL,
>     rawData STRING METADATA FROM 'full_document' VIRTUAL, -- get raw data
>     _id STRING,
>     PRIMARY KEY(_id) NOT ENFORCED
> ) WITH (
>     'connector' = 'mongodb-cdc',
>     'hosts' = 'localhost:27017',
>     'username' = 'myuser',
>     'password' = 'mypassword',
>     'database' = 'cdc_test',
>     'collection' = 'users'
> );
> {code}
> h3. What to change in the code:
>  - The implementation will affect the MongoDBReadableMetadata enum, where the 
> full document can be exposed with the following code:
> {code:java}
>     /** It indicates the full document as string raw data. */
>     FULL_DOCUMENT(
>             "full_document",
>             DataTypes.STRING().nullable(),
>             new MetadataConverter() {
>                 private static final long serialVersionUID = 1L;
>
>                 @Override
>                 public Object read(SourceRecord record) {
>                     Struct value = (Struct) record.value();
>                     String fullDocString =
>                             value.getString(MongoDBEnvelope.FULL_DOCUMENT_FIELD);
>                     return fullDocString != null
>                             ? StringData.fromString(fullDocString)
>                             : null;
>                 }
>             }),
> {code}
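
For illustration only, the null handling in the proposed converter can be sketched with plain JDK types. This is a minimal stand-in, not connector code: the class FullDocumentSketch, the method readFullDocument, and the use of a Map in place of the Kafka Connect Struct are all hypothetical, and the field name "fullDocument" is assumed to match MongoDBEnvelope.FULL_DOCUMENT_FIELD. It shows the intended behavior: return the raw JSON string when the change event carries a full document, and null otherwise (for example on delete events, which carry no full document in a MongoDB change stream).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the proposed FULL_DOCUMENT metadata converter.
// A Map replaces the Kafka Connect Struct so the sketch runs on the JDK alone.
class FullDocumentSketch {

    // Assumed to mirror MongoDBEnvelope.FULL_DOCUMENT_FIELD.
    static final String FULL_DOCUMENT_FIELD = "fullDocument";

    // Returns the raw document string, or null when the event has none.
    static String readFullDocument(Map<String, Object> value) {
        Object doc = value.get(FULL_DOCUMENT_FIELD);
        return doc != null ? doc.toString() : null;
    }

    public static void main(String[] args) {
        // Insert/update-style event: the full document is present.
        Map<String, Object> insertEvent = new HashMap<>();
        insertEvent.put(FULL_DOCUMENT_FIELD, "{\"_id\": \"1\", \"name\": \"Ada\"}");
        System.out.println(readFullDocument(insertEvent));

        // Delete-style event: no full document, so the metadata column is NULL.
        Map<String, Object> deleteEvent = new HashMap<>();
        System.out.println(readFullDocument(deleteEvent));
    }
}
```

Because the value can be absent, the metadata column has to stay nullable, which matches the DataTypes.STRING().nullable() declaration in the proposal.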



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
