[
https://issues.apache.org/jira/browse/FLINK-36578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Lee SeungMin updated FLINK-36578:
---------------------------------
Description:
The function to convert json to string is different in the snapshot step and
the binlog step.
This causes the following differences.
- original : \{"key": "value", "key1": "value1"}
- snapshot : \{"key": "value", "key1": "value1"}
- binlog : \{"key":"value","key1":"value1"} // no whitespace
code
{code:java}
protected Object convertJson(Column column, Field fieldDefn, Object data) {
return convertValue(column, fieldDefn, data, "{}", (r) -> {
if (data instanceof byte[]) {
// The BinlogReader sees these JSON values as binary encoded,
so we use the binlog client library's utility
// to parse MySQL's internal binary representation into a JSON
string, using the standard formatter.
if (((byte[]) data).length == 0) {
r.deliver(column.isOptional() ? null : "{}");
}
else {
try{
r.deliver(JsonBinary.parseAsString((byte[]) data)); }
catch (IOException e) {
parsingErrorHandler.error("Failed to parse and read a
JSON value on '" + column + "' value " + Arrays.toString((byte[]) data), e);
r.deliver(column.isOptional() ? null : "{}");
}
}
}
else if (data instanceof String){
// The SnapshotReader sees JSON values as UTF-8 encoded
strings.
r.deliver(data);
}
});
} {code}
([https://github.com/debezium/debezium/blob/0220cbe0aa120376f1dd0386dc4322b4bbf29981/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java#L357-L381])
So added modified JsonStringFormatter (added whitespace between key and value,
and after comma to make it work the same as the snapshot step).
was:
The function to convert json to string is different in the snapshot step and
the binlog step.
This causes the following differences.
- original : \{"key": "value", "key1": "value1"}
- snapshot : \{"key": "value", "key1": "value1"}
- binlog : \{"key":"value","key1":"value1"} // no whitespace
code
{code:java}
protected Object convertJson(Column column, Field fieldDefn, Object data) {
return convertValue(column, fieldDefn, data, "{}", (r) -> {
if (data instanceof byte[]) {
// The BinlogReader sees these JSON values as binary encoded,
so we use the binlog client library's utility
// to parse MySQL's internal binary representation into a JSON
string, using the standard formatter. if (((byte[]) data).length
== 0) {
r.deliver(column.isOptional() ? null : "{}");
}
else {
try{
r.deliver(JsonBinary.parseAsString((byte[]) data)); }
catch (IOException e) {
parsingErrorHandler.error("Failed to parse and read a
JSON value on '" + column + "' value " + Arrays.toString((byte[]) data), e);
r.deliver(column.isOptional() ? null : "{}");
}
}
}
else if (data instanceof String){ // The
SnapshotReader sees JSON values as UTF-8 encoded strings.
r.deliver(data); } });
} {code}
([https://github.com/debezium/debezium/blob/0220cbe0aa120376f1dd0386dc4322b4bbf29981/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java#L357-L381])
So added modified JsonStringFormatter (added whitespace between key and value,
and after comma to make it work the same as the snapshot step).
> Fixed bug when converting json to string
> ----------------------------------------
>
> Key: FLINK-36578
> URL: https://issues.apache.org/jira/browse/FLINK-36578
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Reporter: Lee SeungMin
> Priority: Major
> Labels: pull-request-available
>
> The function to convert json to string is different in the snapshot step and
> the binlog step.
> This causes the following differences.
> - original : \{"key": "value", "key1": "value1"}
> - snapshot : \{"key": "value", "key1": "value1"}
> - binlog : \{"key":"value","key1":"value1"} // no whitespace
>
> code
> {code:java}
> protected Object convertJson(Column column, Field fieldDefn, Object data)
> {
> return convertValue(column, fieldDefn, data, "{}", (r) -> {
> if (data instanceof byte[]) {
> // The BinlogReader sees these JSON values as binary encoded,
> so we use the binlog client library's utility
> // to parse MySQL's internal binary representation into a
> JSON string, using the standard formatter.
> if (((byte[]) data).length == 0) {
> r.deliver(column.isOptional() ? null : "{}");
> }
> else {
> try{
> r.deliver(JsonBinary.parseAsString((byte[]) data)); }
> catch (IOException e) {
> parsingErrorHandler.error("Failed to parse and read a
> JSON value on '" + column + "' value " + Arrays.toString((byte[]) data), e);
> r.deliver(column.isOptional() ? null : "{}");
> }
> }
> }
> else if (data instanceof String){
> // The SnapshotReader sees JSON values as UTF-8 encoded
> strings.
> r.deliver(data);
> }
> });
> } {code}
> ([https://github.com/debezium/debezium/blob/0220cbe0aa120376f1dd0386dc4322b4bbf29981/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java#L357-L381])
>
> So added modified JsonStringFormatter (added whitespace between key and
> value, and after comma to make it work the same as the snapshot step).
--
This message was sent by Atlassian Jira
(v8.20.10#820010)