[ https://issues.apache.org/jira/browse/FLINK-38236?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18013796#comment-18013796 ]
Fabrizzio Chavez commented on FLINK-38236: ------------------------------------------ If you agree with the request I can take the PR implementation. Regards, Fabrizzio > MongoDB CDC - Include Full Document as Metadata Field > ----------------------------------------------------- > > Key: FLINK-38236 > URL: https://issues.apache.org/jira/browse/FLINK-38236 > Project: Flink > Issue Type: New Feature > Components: Flink CDC > Affects Versions: cdc-3.5.0 > Reporter: Fabrizzio Chavez > Priority: Major > > Hello, > It would be beneficial to introduce a new metadata key named 'full_document' > to retrieve the raw data, particularly in scenarios where the user prefers > not to map attributes to explicit columns. > h3. Expected results: > {code:sql} > CREATE TABLE mongo_source ( > eventTime TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, > operation STRING METADATA FROM 'row_kind' VIRTUAL, > rawData STRING METADATA FROM 'full_document' VIRTUAL, // get raw data > _id STRING, > PRIMARY KEY(_id) NOT ENFORCED > ) WITH ( > 'connector' = 'mongodb-cdc', > 'hosts' = 'localhost:27017', > 'username' = 'myuser', > 'password' = 'mypassword', > 'database' = 'cdc_test', > 'collection' = 'users' > ); > {code} > h3. What to change in the code: > - The implementation will affect MongoDBReadableMetadata enum, where the > full document can be included using this piece of code: > {code:java} > /** It indicates the full document as string raw data. */ > FULL_DOCUMENT( > "full_document", > DataTypes.STRING().nullable(), > new MetadataConverter() { > private static final long serialVersionUID = 1L; > @Override > public Object read(SourceRecord record) { > Struct value = (Struct) record.value(); > String fullDocString = > value.getString(MongoDBEnvelope.FULL_DOCUMENT_FIELD); > return fullDocString != null ? > StringData.fromString(fullDocString) : null; > } > }), > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)