jiangyu created FLINK-36549:
-------------------------------
Summary: Using the ignore-parse-errors parameter in
Debezium/Canal/Maxwell/Ogg JSON results in unexpected data loss.
Key: FLINK-36549
URL: https://issues.apache.org/jira/browse/FLINK-36549
Project: Flink
Issue Type: Bug
Reporter: jiangyu
In the Debezium/Canal/Maxwell/Ogg JSON formats, setting {{ignore-parse-errors}} can cause
unexpected data loss when an operator chained after the format operator throws an
exception. The reason is that the deserialization implementations of these formats wrap
the entire processing path, including the emission of already-deserialized rows, in a
single try-catch, so enabling {{ignore-parse-errors}} swallows not only genuine parse
errors but also exceptions raised downstream of the format. For example, in the Canal
JSON code below, the catch block also catches and skips exceptions thrown from
{{emitRow}} (and therefore from {{out.collect}}):
{code:java}
@Override
public void deserialize(@Nullable byte[] message, Collector<RowData> out) throws IOException {
    if (message == null || message.length == 0) {
        return;
    }
    try {
        final JsonNode root = jsonDeserializer.deserializeToJsonNode(message);
        if (database != null) {
            if (!databasePattern
                    .matcher(root.get(ReadableMetadata.DATABASE.key).asText())
                    .matches()) {
                return;
            }
        }
        if (table != null) {
            if (!tablePattern
                    .matcher(root.get(ReadableMetadata.TABLE.key).asText())
                    .matches()) {
                return;
            }
        }
        final GenericRowData row = (GenericRowData) jsonDeserializer.convertToRowData(root);
        String type = row.getString(2).toString(); // "type" field
        if (OP_INSERT.equals(type)) {
            // "data" field is an array of row, contains inserted rows
            ArrayData data = row.getArray(0);
            for (int i = 0; i < data.size(); i++) {
                GenericRowData insert = (GenericRowData) data.getRow(i, fieldCount);
                insert.setRowKind(RowKind.INSERT);
                emitRow(row, insert, out);
            }
        } else if (OP_UPDATE.equals(type)) {
            // "data" field is an array of row, contains new rows
            ArrayData data = row.getArray(0);
            // "old" field is an array of row, contains old values
            ArrayData old = row.getArray(1);
            for (int i = 0; i < data.size(); i++) {
                // the underlying JSON deserialization schema always produce GenericRowData.
                GenericRowData after = (GenericRowData) data.getRow(i, fieldCount);
                GenericRowData before = (GenericRowData) old.getRow(i, fieldCount);
                final JsonNode oldField = root.get(FIELD_OLD);
                for (int f = 0; f < fieldCount; f++) {
                    if (before.isNullAt(f) && oldField.findValue(fieldNames.get(f)) == null) {
                        // fields in "old" (before) means the fields are changed
                        // fields not in "old" (before) means the fields are not changed
                        // so we just copy the not changed fields into before
                        before.setField(f, after.getField(f));
                    }
                }
                before.setRowKind(RowKind.UPDATE_BEFORE);
                after.setRowKind(RowKind.UPDATE_AFTER);
                emitRow(row, before, out);
                emitRow(row, after, out);
            }
        } else if (OP_DELETE.equals(type)) {
            // "data" field is an array of row, contains deleted rows
            ArrayData data = row.getArray(0);
            for (int i = 0; i < data.size(); i++) {
                GenericRowData insert = (GenericRowData) data.getRow(i, fieldCount);
                insert.setRowKind(RowKind.DELETE);
                emitRow(row, insert, out);
            }
        } else if (OP_CREATE.equals(type)) {
            // "data" field is null and "type" is "CREATE" which means
            // this is a DDL change event, and we should skip it.
            return;
        } else {
            if (!ignoreParseErrors) {
                throw new IOException(
                        format(
                                "Unknown \"type\" value \"%s\". The Canal JSON message is '%s'",
                                type, new String(message)));
            }
        }
    } catch (Throwable t) {
        // a big try catch to protect the processing.
        if (!ignoreParseErrors) {
            throw new IOException(
                    format("Corrupt Canal JSON message '%s'.", new String(message)), t);
        }
    }
}

private void emitRow(
        GenericRowData rootRow, GenericRowData physicalRow, Collector<RowData> out) {
    // shortcut in case no output projection is required
    if (!hasMetadata) {
        out.collect(physicalRow);
        return;
    }
    final int physicalArity = physicalRow.getArity();
    final int metadataArity = metadataConverters.length;
    final GenericRowData producedRow =
            new GenericRowData(physicalRow.getRowKind(), physicalArity + metadataArity);
    for (int physicalPos = 0; physicalPos < physicalArity; physicalPos++) {
        producedRow.setField(physicalPos, physicalRow.getField(physicalPos));
    }
    for (int metadataPos = 0; metadataPos < metadataArity; metadataPos++) {
        producedRow.setField(
                physicalArity + metadataPos,
                metadataConverters[metadataPos].convert(rootRow));
    }
    out.collect(producedRow);
}
{code}
The solution is to reimplement the {{deserialize(byte[] message, Collector<T> out)}}
method of the corresponding DeserializationSchema in Debezium/Canal/Maxwell/Ogg JSON so
that only the parsing step is guarded by {{ignore-parse-errors}}, following the default
implementation of {{deserialize(byte[] message, Collector<T> out)}} in the
DeserializationSchema interface, which first deserializes the message and only then
collects the result.
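The shape of that fix can be sketched as follows. This is a minimal, self-contained
illustration with hypothetical names (it does not use the actual Flink classes): parsing
is buffered inside the try-catch that honors {{ignore-parse-errors}}, and the buffered
rows are emitted outside it, so an exception thrown by a chained downstream operator
propagates instead of being silently swallowed as a "parse error".

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of the proposed structure, not Flink's actual code.
public class SafeDeserializeSketch {

    // Stand-in for the format's JSON parsing; throws only on truly bad input.
    static List<String> parse(byte[] message) throws IOException {
        String s = new String(message);
        if (s.contains("corrupt")) {
            throw new IOException("unparsable message: " + s);
        }
        List<String> rows = new ArrayList<>();
        for (String part : s.split(",")) {
            rows.add(part);
        }
        return rows;
    }

    static void deserialize(byte[] message, Consumer<String> out, boolean ignoreParseErrors)
            throws IOException {
        List<String> buffered;
        try {
            // Guard ONLY the parsing step with ignore-parse-errors.
            buffered = parse(message);
        } catch (Throwable t) {
            if (!ignoreParseErrors) {
                throw new IOException("Corrupt message.", t);
            }
            return; // skip only genuinely unparsable input
        }
        // Emit outside the try-catch: a failure in a chained operator
        // (reached through out) is no longer mistaken for a parse error.
        for (String row : buffered) {
            out.accept(row);
        }
    }

    public static void main(String[] args) throws IOException {
        List<String> collected = new ArrayList<>();
        deserialize("a,b".getBytes(), collected::add, true);
        System.out.println(collected); // [a, b]

        deserialize("corrupt".getBytes(), collected::add, true); // skipped quietly
        System.out.println(collected.size()); // still 2

        boolean downstreamFailureSurfaced = false;
        try {
            deserialize("a".getBytes(),
                    r -> { throw new RuntimeException("sink failed"); }, true);
        } catch (RuntimeException e) {
            downstreamFailureSurfaced = true; // no longer swallowed
        }
        System.out.println(downstreamFailureSurfaced); // true
    }
}
```

With this structure, {{ignore-parse-errors}} keeps its documented meaning (skip
malformed records) without hiding failures that occur after deserialization succeeded.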
--
This message was sent by Atlassian Jira
(v8.20.10#820010)