[
https://issues.apache.org/jira/browse/FLINK-35715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ruan Hang updated FLINK-35715:
------------------------------
Priority: Blocker (was: Major)
> MySQL source should support a schema cache to deserialize records
> -----------------------------------------------------------------
>
> Key: FLINK-35715
> URL: https://issues.apache.org/jira/browse/FLINK-35715
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Affects Versions: cdc-3.1.1
> Reporter: Hongshun Wang
> Priority: Blocker
> Fix For: cdc-3.2.0
>
>
>
> Currently, DebeziumEventDeserializationSchema deserializes each record with a
> schema inferred from the record itself.
>
> {code:java}
> private RecordData extractDataRecord(Struct value, Schema valueSchema) throws Exception {
>     DataType dataType = schemaDataTypeInference.infer(value, valueSchema);
>     return (RecordData) getOrCreateConverter(dataType).convert(value, valueSchema);
> }
> {code}
> There are some issues:
> # Inferring the schema and creating a converter for every incoming record incurs extra cost.
> # A schema inferred from a single record may not reflect the real table schema. For instance, a MySQL timestamp column with precision 6 can hold a value whose fractional-second part is zero; inferred from that value alone, the type appears to have precision 0, as the inference code below shows.
> {code:java}
> protected DataType inferString(Object value, Schema schema) {
>     if (ZonedTimestamp.SCHEMA_NAME.equals(schema.name())) {
>         int nano =
>                 Optional.ofNullable((String) value)
>                         .map(s -> ZonedTimestamp.FORMATTER.parse(s, Instant::from))
>                         .map(Instant::getNano)
>                         .orElse(0);
>         int precision;
>         if (nano == 0) {
>             precision = 0;
>         } else if (nano % 1000 > 0) {
>             precision = 9;
>         } else if (nano % 1000_000 > 0) {
>             precision = 6;
>         } else if (nano % 1000_000_000 > 0) {
>             precision = 3;
>         } else {
>             precision = 0;
>         }
>         return DataTypes.TIMESTAMP_LTZ(precision);
>     }
>     return DataTypes.STRING();
> }
> {code}
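>
> To see the pitfall in isolation, here is a minimal, self-contained sketch (not CDC code; OffsetDateTime.parse stands in for ZonedTimestamp.FORMATTER, and the mapping mirrors the chain above) applied to two values read from the same TIMESTAMP(6) column:
> {code:java}
> import java.time.OffsetDateTime;
>
> public class PrecisionInferenceDemo {
>     // Same nano-to-precision mapping as inferString above.
>     static int inferPrecision(String zonedTimestamp) {
>         int nano = OffsetDateTime.parse(zonedTimestamp).toInstant().getNano();
>         if (nano == 0) {
>             return 0;
>         } else if (nano % 1000 > 0) {
>             return 9;
>         } else if (nano % 1000_000 > 0) {
>             return 6;
>         } else {
>             return 3; // a whole number of milliseconds
>         }
>     }
>
>     public static void main(String[] args) {
>         // Both values come from the same MySQL TIMESTAMP(6) column.
>         System.out.println(inferPrecision("2024-07-01T12:00:00.123456+00:00")); // 6
>         System.out.println(inferPrecision("2024-07-01T12:00:00+00:00"));        // 0
>     }
> }
> {code}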
> However, timestamps with different precisions use different binary layouts in
> BinaryRecordData. Writing a value as a precision-0 timestamp and then reading
> it back with the column's real precision of 6 causes an exception to be
> thrown.
>
> {code:java}
> // org.apache.flink.cdc.common.data.binary.BinaryRecordData#getTimestamp
> @Override
> public TimestampData getTimestamp(int pos, int precision) {
>     assertIndexIsValid(pos);
>     if (TimestampData.isCompact(precision)) {
>         return TimestampData.fromMillis(segments[0].getLong(getFieldOffset(pos)));
>     }
>     int fieldOffset = getFieldOffset(pos);
>     final long offsetAndNanoOfMilli = segments[0].getLong(fieldOffset);
>     return BinarySegmentUtils.readTimestampData(segments, offset, offsetAndNanoOfMilli);
> }
> {code}
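>
> A toy illustration of why the mismatch blows up (a simplified stand-in, not the real Flink CDC classes): the compact layout (precision <= 3) stores epoch millis directly in the 8-byte fixed slot, while the non-compact read path splits that same slot into an offset into the variable-length region plus a nano-of-millisecond value:
> {code:java}
> public class TimestampLayoutDemo {
>     public static void main(String[] args) {
>         // Written with inferred precision 0: the fixed 8-byte slot holds epoch
>         // millis directly (the compact layout).
>         long slot = 1_719_800_000_000L;
>
>         // Read back with the column's real precision 6: the non-compact path
>         // treats the high 32 bits as an offset and the low 32 bits as
>         // nano-of-millisecond (cf. BinarySegmentUtils.readTimestampData).
>         int misreadOffset = (int) (slot >>> 32); // 400: points past the record's data
>         int misreadNanos = (int) slot;           // 1813081600: not a valid nano-of-milli
>         System.out.printf("offset=%d, nanoOfMilli=%d%n", misreadOffset, misreadNanos);
>         // Dereferencing such an offset is what throws at runtime.
>     }
> }
> {code}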
> Thus, I think we should cache the table schema in the source and update it only
> on SchemaChangeRecord. That way, the schema used by
> SourceRecordEventDeserializer always matches the database schema.
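>
> A minimal sketch of that direction (names and types here are hypothetical stand-ins, not a final API): keep a per-table schema cache, let only schema-change events mutate it, and convert every data record with the cached schema instead of per-record inference.
> {code:java}
> import java.util.HashMap;
> import java.util.Map;
>
> public class SchemaCachingDeserializer {
>     // Toy stand-ins for the real org.apache.flink.cdc.* types.
>     record TableId(String db, String table) {}
>     record DataType(String description) {}
>
>     private final Map<TableId, DataType> schemaCache = new HashMap<>();
>
>     /** Called for schema-change records (CREATE TABLE / ALTER TABLE). */
>     public void onSchemaChange(TableId table, DataType newType) {
>         schemaCache.put(table, newType); // cached converters would be rebuilt here
>     }
>
>     /** Called for data-change records: no per-record inference. */
>     public DataType schemaFor(TableId table) {
>         DataType cached = schemaCache.get(table);
>         if (cached == null) {
>             throw new IllegalStateException(
>                     "No schema cached for " + table + "; a schema event must arrive first");
>         }
>         return cached;
>     }
>
>     public static void main(String[] args) {
>         SchemaCachingDeserializer d = new SchemaCachingDeserializer();
>         TableId orders = new TableId("app", "orders");
>         d.onSchemaChange(orders, new DataType("ROW<ts TIMESTAMP_LTZ(6)>"));
>         // Every record of app.orders now converts with TIMESTAMP_LTZ(6),
>         // even when an individual value carries zero nanoseconds.
>         System.out.println(d.schemaFor(orders));
>     }
> }
> {code}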
--
This message was sent by Atlassian Jira
(v8.20.10#820010)