weiqingy commented on code in PR #28498: URL: https://github.com/apache/flink/pull/28498#discussion_r3448964529
########## flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroDecodingFormat.java: ########## @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro.registry.confluent.debezium; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.formats.avro.registry.confluent.debezium.DebeziumAvroDeserializationSchema.MetadataConverter; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.Projection; +import org.apache.flink.table.connector.format.DecodingFormat; +import org.apache.flink.table.connector.format.ProjectableDecodingFormat; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.data.GenericMapData; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.utils.DataTypeUtils; 
+import org.apache.flink.types.RowKind; + +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +/** {@link DecodingFormat} for Debezium using Avro encoding. */ +public class DebeziumAvroDecodingFormat + implements ProjectableDecodingFormat<DeserializationSchema<RowData>> { + + // ---------------------------------------------------------------------------------------- + // Mutable attributes + // ---------------------------------------------------------------------------------------- + + private List<String> metadataKeys; + + // ---------------------------------------------------------------------------------------- + // Debezium-specific attributes + // ---------------------------------------------------------------------------------------- + + private final String schemaRegistryURL; + private final String schema; + private final Map<String, ?> optionalPropertiesMap; + + public DebeziumAvroDecodingFormat( + String schemaRegistryURL, String schema, Map<String, ?> optionalPropertiesMap) { + this.schemaRegistryURL = schemaRegistryURL; + this.schema = schema; + this.optionalPropertiesMap = optionalPropertiesMap; + this.metadataKeys = Collections.emptyList(); + } + + @Override + public DeserializationSchema<RowData> createRuntimeDecoder( + DynamicTableSource.Context context, DataType physicalDataType, int[][] projections) { + physicalDataType = Projection.of(projections).project(physicalDataType); + + final List<ReadableMetadata> readableMetadata = + metadataKeys.stream() + .map( + k -> + Stream.of(ReadableMetadata.values()) + .filter(rm -> rm.key.equals(k)) + .findFirst() + .orElseThrow(IllegalStateException::new)) + .collect(Collectors.toList()); + final List<DataTypes.Field> metadataFields = + readableMetadata.stream() + .map(m -> DataTypes.FIELD(m.key, m.dataType)) + 
.collect(Collectors.toList()); + + final DataType producedDataType = + DataTypeUtils.appendRowFields(physicalDataType, metadataFields); + final TypeInformation<RowData> producedTypeInfo = + context.createTypeInformation(producedDataType); + + return new DebeziumAvroDeserializationSchema( + physicalDataType, + readableMetadata, + producedTypeInfo, + schemaRegistryURL, + schema, + optionalPropertiesMap); + } + + @Override + public Map<String, DataType> listReadableMetadata() { + final Map<String, DataType> metadataMap = new LinkedHashMap<>(); + Stream.of(ReadableMetadata.values()) + .forEachOrdered(m -> metadataMap.put(m.key, m.dataType)); + return metadataMap; + } + + @Override + public void applyReadableMetadata(List<String> metadataKeys) { + this.metadataKeys = metadataKeys; + } + + @Override + public ChangelogMode getChangelogMode() { + return ChangelogMode.newBuilder() + .addContainedKind(RowKind.INSERT) + .addContainedKind(RowKind.UPDATE_BEFORE) + .addContainedKind(RowKind.UPDATE_AFTER) + .addContainedKind(RowKind.DELETE) + .build(); + } + + // ---------------------------------------------------------------------------------------- + // Metadata handling + // ---------------------------------------------------------------------------------------- + + /** List of metadata that can be read with this format. 
*/ + enum ReadableMetadata { + INGESTION_TIMESTAMP( + "ingestion-timestamp", + DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(), + DataTypes.FIELD("ts_ms", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)), + new MetadataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(GenericRowData row, int unused) { + return row; + } + }), + + SOURCE_TIMESTAMP( + "source.timestamp", + DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(), + SOURCE_FIELD, + new MetadataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(GenericRowData row, int unused) { + int pos = SOURCE_PROPERTY_POSITION.get("ts_ms"); + return row.getField(pos); + } + }), + + SOURCE_DATABASE( + "source.database", + DataTypes.STRING().nullable(), + SOURCE_FIELD, + new MetadataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(GenericRowData row, int unused) { + int pos = SOURCE_PROPERTY_POSITION.get("db"); + return row.getField(pos); + } + }), + + SOURCE_SCHEMA( + "source.schema", + DataTypes.STRING().nullable(), + SOURCE_FIELD, + new MetadataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(GenericRowData row, int unused) { + int pos = SOURCE_PROPERTY_POSITION.get("schema"); + return row.getField(pos); + } + }), + + SOURCE_TABLE( + "source.table", + DataTypes.STRING().nullable(), + SOURCE_FIELD, + new MetadataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(GenericRowData row, int unused) { + int pos = SOURCE_PROPERTY_POSITION.get("table"); + return row.getField(pos); + } + }), + + SOURCE_PROPERTIES( + "source.properties", + // key and value of the map are nullable to make handling easier in queries + DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.STRING().nullable()) + .nullable(), + SOURCE_FIELD, + new MetadataConverter() { + private static 
final long serialVersionUID = 1L; + + @Override + public Object convert(GenericRowData row, int unused) { + Map<StringData, StringData> result = new HashMap<>(); + for (int i = 0; i < SOURCE_PROPERTY_FIELDS.length; i++) { + Object value = row.getField(i); + result.put( + StringData.fromString(SOURCE_PROPERTY_FIELDS[i].getName()), + value == null ? null : StringData.fromString(value.toString())); + } + return new GenericMapData(result); + } + }); + + final String key; + final DataType dataType; + final DataTypes.Field requiredAvroField; + final MetadataConverter converter; + + ReadableMetadata( + String key, + DataType dataType, + DataTypes.Field requiredAvroField, + MetadataConverter converter) { + this.key = key; + this.dataType = dataType; + this.requiredAvroField = requiredAvroField; + this.converter = converter; + } + } + + private static final DataTypes.Field[] SOURCE_PROPERTY_FIELDS = { Review Comment: Thanks for digging into this — the record-vs-map constraint is real and correctly diagnosed. The converter switch only wires `record → ROW` (`AvroToRowDataConverters.java:148-149`) and `map → MAP` (`:150-152`), so declaring `source` as `MAP` would have `AvroSchemaConverter` build a valid Avro *map* schema that then can't resolve against the registry's *record* — per Avro's resolution rules a writer record and a reader map aren't compatible, so it fails at decode. Good catch. To close the open question from my first comment (mismatched `source` → null or reject): it resolves by name — `RegistryAvroDeserializationSchema` sets both writer and reader schema on the datum reader (`RegistryAvroDeserializationSchema.java:102-103`), so the record comes back in reader order and the positional reads stay aligned. 
Resolution then splits by nullability: the connector-specific fields are nullable → `withDefault(null)` (`AvroSchemaConverter.java:554-557`), so `scn`/`commit_scn`/`lcr_position` come back null when a connector omits them; but `version`/`connector`/`name`/`db`/`table` are non-nullable → `noDefault()`, and a reader field with no default that the writer lacks makes Avro throw and fail the whole message. So the fixed ROW has two limitations: it can't surface fields it doesn't list (MySQL `gtid`, Postgres `lsn`), and, latently, the non-nullable `version`/`connector`/`name`/`db`/`table` would hard-fail any connector whose `source` doesn't carry those exact names. A connector-agnostic map sidesteps both. I'd lean toward option (2). Both options stringify the source fields into a `MAP<STRING,STRING>` — that lossiness is inherent to record→map either way — so the deciding factor is blast radius: (1) puts an opinionated record→map coercion into the shared `AvroToRowDataConverters` to serve a Debezium-specific need, so any Avro user declaring a `MAP` target over a record field would hit it implicitly; (2) keeps it local and explicit. And the registered `source` schema isn't available at planning time anyway (the `schema` option is the optional user override, normally null; the real one is resolved from the registry by ID at runtime), so a runtime handler in `DebeziumAvroDeserializationSchema` is its natural home. One note if you take (2): to actually be connector-agnostic, it should build the map from the connector's *writer* `source` record — before it's projected onto the 13-field reader ROW — otherwise the connector-only fields are already dropped and it's just the same fixed list reshaped as a map. The typed accessors (`source.database`, …) could then key into that map by name, and the positional `SOURCE_PROPERTY_POSITION` assumption goes away.
On scope, my read is that the current typed set is enough to land this PR for initial parity — assuming the supported connectors are documented — with the connector-agnostic map as a follow-up; that keeps the change focused and unblocks the relational connectors now. The final call on scope and merging is, of course, the committers' to make. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
