[ https://issues.apache.org/jira/browse/FLINK-36578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Lee SeungMin updated FLINK-36578:
---------------------------------
Affects Version/s: cdc-3.2.1
                   cdc-3.1.1
                   cdc-3.2.0
                   cdc-3.1.0
> Fixed bug when converting json to string
> ----------------------------------------
>
> Key: FLINK-36578
> URL: https://issues.apache.org/jira/browse/FLINK-36578
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Affects Versions: cdc-3.1.0, cdc-3.2.0, cdc-3.1.1, cdc-3.2.1
> Reporter: Lee SeungMin
> Priority: Major
> Labels: pull-request-available
> Fix For: cdc-3.3.0
>
>
> Different functions are used to convert JSON to a string in the snapshot step
> and in the binlog step.
> This causes the following difference in the output:
> - original : \{"key": "value", "key1": "value1"}
> - snapshot : \{"key": "value", "key1": "value1"}
> - binlog : \{"key":"value","key1":"value1"} // no whitespace after ':' or ','
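>
> The snapshot and binlog outputs differ only in whitespace outside string
> literals, which JSON treats as insignificant. They are therefore the same JSON
> document but not equal as strings. A minimal sketch that strips such whitespace
> makes this visible. The class {{JsonWhitespaceCheck}} is hypothetical (not part
> of flink-cdc or Debezium), and it assumes valid JSON input without validating it:

```java
public class JsonWhitespaceCheck {

    // Hypothetical helper: removes insignificant JSON whitespace (space, tab,
    // LF, CR) that appears outside string literals. Assumes valid JSON input.
    public static String compact(String json) {
        StringBuilder out = new StringBuilder(json.length());
        boolean inString = false;
        boolean escaped = false;
        for (int i = 0; i < json.length(); i++) {
            char c = json.charAt(i);
            if (inString) {
                // Copy string contents verbatim; track escapes so \" does not end the string.
                out.append(c);
                if (escaped) {
                    escaped = false;
                } else if (c == '\\') {
                    escaped = true;
                } else if (c == '"') {
                    inString = false;
                }
            } else if (c == '"') {
                inString = true;
                out.append(c);
            } else if (c != ' ' && c != '\t' && c != '\n' && c != '\r') {
                out.append(c);
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        String snapshot = "{\"key\": \"value\", \"key1\": \"value1\"}";
        String binlog = "{\"key\":\"value\",\"key1\":\"value1\"}";
        System.out.println(snapshot.equals(binlog));                   // false
        System.out.println(compact(snapshot).equals(compact(binlog))); // true
    }
}
```

> Comparing compacted strings only works around equality checks downstream; it
> does not make the snapshot and binlog outputs identical, which is what the
> change described below addresses.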
>
> This is a known issue, but I think JSON should be processed the same way in
> the snapshot step and the binlog step.
> So I modified and added {{JsonStringFormatter}} in flink-cdc so that binlog
> JSON is formatted the same way as the snapshot output: the modified code adds
> a space between each key and its value (after the colon) and after each comma,
> matching the snapshot step.
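>
> For illustration only, the same snapshot-style spacing can also be produced by
> post-processing the decoded binlog string as text. The sketch below is an
> alternative under stated assumptions, not the {{JsonStringFormatter}} change
> from the pull request: the hypothetical class {{SnapshotStyleJson}} puts
> exactly one space after every ':' and ',' outside string literals and drops
> all other insignificant whitespace. It assumes valid JSON input and only
> normalizes whitespace.

```java
public final class SnapshotStyleJson {

    private SnapshotStyleJson() {}

    // Hypothetical helper: outside string literals, follows every ':' and ','
    // with exactly one space and drops other insignificant whitespace.
    // Assumes valid JSON input; no validation is performed.
    public static String format(String json) {
        StringBuilder out = new StringBuilder(json.length() + 16);
        boolean inString = false;
        boolean escaped = false;
        for (int i = 0; i < json.length(); i++) {
            char c = json.charAt(i);
            if (inString) {
                // Copy string contents verbatim; track escapes so \" does not end the string.
                out.append(c);
                if (escaped) {
                    escaped = false;
                } else if (c == '\\') {
                    escaped = true;
                } else if (c == '"') {
                    inString = false;
                }
                continue;
            }
            switch (c) {
                case '"':
                    inString = true;
                    out.append(c);
                    break;
                case ':':
                case ',':
                    out.append(c).append(' ');
                    break;
                case ' ':
                case '\t':
                case '\n':
                case '\r':
                    // Drop existing whitespace so compact and already-spaced
                    // input produce the same result.
                    break;
                default:
                    out.append(c);
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        String binlog = "{\"key\":\"value\",\"key1\":\"value1\"}";
        System.out.println(format(binlog)); // {"key": "value", "key1": "value1"}
    }
}
```

> A text pass like this runs over every binlog JSON value after it has been
> decoded; changing the formatter used while decoding, as described above,
> avoids that second pass.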
>
> In Debezium's {{MySqlValueConverters.convertJson}}, the snapshot step and the
> binlog step convert JSON to a string with different logic:
> {code:java}
> protected Object convertJson(Column column, Field fieldDefn, Object data) {
>     return convertValue(column, fieldDefn, data, "{}", (r) -> {
>         if (data instanceof byte[]) {
>             // The BinlogReader sees these JSON values as binary encoded, so we use the binlog client library's utility
>             // to parse MySQL's internal binary representation into a JSON string, using the standard formatter.
>             // This binlog path yields compact output such as {"key":"value","key1":"value1"}.
>             if (((byte[]) data).length == 0) {
>                 r.deliver(column.isOptional() ? null : "{}");
>             }
>             else {
>                 try {
>                     r.deliver(JsonBinary.parseAsString((byte[]) data));
>                 }
>                 catch (IOException e) {
>                     parsingErrorHandler.error("Failed to parse and read a JSON value on '" + column + "' value "
>                             + Arrays.toString((byte[]) data), e);
>                     r.deliver(column.isOptional() ? null : "{}");
>                 }
>             }
>         }
>         else if (data instanceof String) {
>             // The SnapshotReader sees JSON values as UTF-8 encoded strings.
>             // The string is delivered unchanged, keeping spacing such as {"key": "value", "key1": "value1"}.
>             r.deliver(data);
>         }
>     });
> }
> {code}
> ([https://github.com/debezium/debezium/blob/0220cbe0aa120376f1dd0386dc4322b4bbf29981/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java#L357-L381])
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)