[
https://issues.apache.org/jira/browse/FLINK-36549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Benchao Li reassigned FLINK-36549:
----------------------------------
Assignee: Yu Xiao
> Using the ignore-parse-errors parameter in Debezium/Canal/Maxwell/Ogg JSON
> results in unexpected data loss.
> -----------------------------------------------------------------------------------------------------------
>
> Key: FLINK-36549
> URL: https://issues.apache.org/jira/browse/FLINK-36549
> Project: Flink
> Issue Type: Bug
> Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
> Affects Versions: 1.17.2, 1.18.1, 1.19.1
> Reporter: jiangyu
> Assignee: Yu Xiao
> Priority: Critical
> Labels: pull-request-available
>
> In the Debezium/Canal/Maxwell/Ogg JSON formats, setting {{ignore-parse-errors}} can
> cause unexpected data loss when an operator chained after the format-related operator
> throws an exception. The reason is that the deserialization implementations of these
> formats wrap the entire {{deserialize}} method, including the calls that emit rows
> downstream, in a single try/catch, so enabling {{ignore-parse-errors}} silently
> swallows exceptions raised by chained operators, not just genuine parse errors. For
> example, in the Canal JSON code below, the catch-all block also skips exceptions
> thrown from {{emitRow}}:
> {code:java}
> @Override
> public void deserialize(@Nullable byte[] message, Collector<RowData> out)
>         throws IOException {
>     if (message == null || message.length == 0) {
>         return;
>     }
>     try {
>         final JsonNode root = jsonDeserializer.deserializeToJsonNode(message);
>         if (database != null) {
>             if (!databasePattern
>                     .matcher(root.get(ReadableMetadata.DATABASE.key).asText())
>                     .matches()) {
>                 return;
>             }
>         }
>         if (table != null) {
>             if (!tablePattern
>                     .matcher(root.get(ReadableMetadata.TABLE.key).asText())
>                     .matches()) {
>                 return;
>             }
>         }
>         final GenericRowData row = (GenericRowData) jsonDeserializer.convertToRowData(root);
>         String type = row.getString(2).toString(); // "type" field
>         if (OP_INSERT.equals(type)) {
>             // "data" field is an array of row, contains inserted rows
>             ArrayData data = row.getArray(0);
>             for (int i = 0; i < data.size(); i++) {
>                 GenericRowData insert = (GenericRowData) data.getRow(i, fieldCount);
>                 insert.setRowKind(RowKind.INSERT);
>                 emitRow(row, insert, out);
>             }
>         } else if (OP_UPDATE.equals(type)) {
>             // "data" field is an array of row, contains new rows
>             ArrayData data = row.getArray(0);
>             // "old" field is an array of row, contains old values
>             ArrayData old = row.getArray(1);
>             for (int i = 0; i < data.size(); i++) {
>                 // the underlying JSON deserialization schema always produce GenericRowData.
>                 GenericRowData after = (GenericRowData) data.getRow(i, fieldCount);
>                 GenericRowData before = (GenericRowData) old.getRow(i, fieldCount);
>                 final JsonNode oldField = root.get(FIELD_OLD);
>                 for (int f = 0; f < fieldCount; f++) {
>                     if (before.isNullAt(f) && oldField.findValue(fieldNames.get(f)) == null) {
>                         // fields in "old" (before) means the fields are changed
>                         // fields not in "old" (before) means the fields are not changed
>                         // so we just copy the not changed fields into before
>                         before.setField(f, after.getField(f));
>                     }
>                 }
>                 before.setRowKind(RowKind.UPDATE_BEFORE);
>                 after.setRowKind(RowKind.UPDATE_AFTER);
>                 emitRow(row, before, out);
>                 emitRow(row, after, out);
>             }
>         } else if (OP_DELETE.equals(type)) {
>             // "data" field is an array of row, contains deleted rows
>             ArrayData data = row.getArray(0);
>             for (int i = 0; i < data.size(); i++) {
>                 GenericRowData insert = (GenericRowData) data.getRow(i, fieldCount);
>                 insert.setRowKind(RowKind.DELETE);
>                 emitRow(row, insert, out);
>             }
>         } else if (OP_CREATE.equals(type)) {
>             // "data" field is null and "type" is "CREATE" which means
>             // this is a DDL change event, and we should skip it.
>             return;
>         } else {
>             if (!ignoreParseErrors) {
>                 throw new IOException(
>                         format(
>                                 "Unknown \"type\" value \"%s\". The Canal JSON message is '%s'",
>                                 type, new String(message)));
>             }
>         }
>     } catch (Throwable t) {
>         // a big try catch to protect the processing.
>         if (!ignoreParseErrors) {
>             throw new IOException(
>                     format("Corrupt Canal JSON message '%s'.", new String(message)), t);
>         }
>     }
> }
>
> private void emitRow(
>         GenericRowData rootRow, GenericRowData physicalRow, Collector<RowData> out) {
>     // shortcut in case no output projection is required
>     if (!hasMetadata) {
>         out.collect(physicalRow);
>         return;
>     }
>     final int physicalArity = physicalRow.getArity();
>     final int metadataArity = metadataConverters.length;
>     final GenericRowData producedRow =
>             new GenericRowData(physicalRow.getRowKind(), physicalArity + metadataArity);
>     for (int physicalPos = 0; physicalPos < physicalArity; physicalPos++) {
>         producedRow.setField(physicalPos, physicalRow.getField(physicalPos));
>     }
>     for (int metadataPos = 0; metadataPos < metadataArity; metadataPos++) {
>         producedRow.setField(
>                 physicalArity + metadataPos,
>                 metadataConverters[metadataPos].convert(rootRow));
>     }
>     out.collect(producedRow);
> }
> {code}
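> To make the failure mode concrete: {{emitRow}} calls {{out.collect}}, and with
> operator chaining the downstream operators run synchronously inside the same
> try/catch. A minimal sketch of the data loss (the throwing collector, {{schema}},
> and {{validCanalJsonBytes}} are illustrative, not from the Flink code base):
> {code:java}
> // Hypothetical downstream collector that fails, e.g. a chained operator
> // hitting a transient error while processing the emitted row.
> Collector<RowData> failingDownstream =
>         new Collector<RowData>() {
>             @Override
>             public void collect(RowData record) {
>                 throw new RuntimeException("downstream failure");
>             }
>
>             @Override
>             public void close() {}
>         };
>
> // With ignore-parse-errors = true: deserialize() parses the valid message,
> // reaches emitRow(), out.collect() throws, the catch (Throwable) swallows
> // the exception, and the record is lost without any error surfacing.
> schema.deserialize(validCanalJsonBytes, failingDownstream);
> {code}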
> The solution is to reimplement the {{deserialize(byte[] message, Collector<T> out)}}
> method of the corresponding {{DeserializationSchema}} in Debezium/Canal/Maxwell/Ogg
> JSON so that only parsing and conversion run inside the {{ignore-parse-errors}}
> try/catch, and the resulting rows are collected afterwards, by referring to the
> default implementation of {{deserialize(byte[] message, Collector<T> out)}} in the
> {{DeserializationSchema}} interface.
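> A minimal sketch of that approach for the Canal format, assuming the existing fields
> of {{CanalJsonDeserializationSchema}}; {{deserializeInternal}} is a hypothetical
> helper holding the current parsing and row-building logic (today's {{deserialize}}
> body), and {{ListCollector}} is Flink's
> {{org.apache.flink.api.common.functions.util.ListCollector}}:
> {code:java}
> @Override
> public void deserialize(@Nullable byte[] message, Collector<RowData> out)
>         throws IOException {
>     if (message == null || message.length == 0) {
>         return;
>     }
>     // Buffer converted rows locally so that only parsing/conversion failures
>     // fall under the ignore-parse-errors try/catch.
>     List<RowData> buffer = new ArrayList<>();
>     try {
>         // hypothetical helper: same logic as before, but emitRow() collects
>         // into the local buffer instead of the real output collector
>         deserializeInternal(message, new ListCollector<>(buffer));
>     } catch (Throwable t) {
>         if (!ignoreParseErrors) {
>             throw new IOException(
>                     format("Corrupt Canal JSON message '%s'.", new String(message)), t);
>         }
>         return;
>     }
>     // Emit outside the catch: exceptions thrown by chained downstream
>     // operators now propagate and fail the job instead of being swallowed.
>     for (RowData row : buffer) {
>         out.collect(row);
>     }
> }
> {code}
> This mirrors the default {{DeserializationSchema#deserialize(byte[], Collector<T>)}},
> which first deserializes the record and only then calls {{out.collect}}, so the
> error-tolerance scope is limited to deserialization itself.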
--
This message was sent by Atlassian Jira
(v8.20.10#820010)