leonardBang commented on code in PR #23511: URL: https://github.com/apache/flink/pull/23511#discussion_r1460846335
########## flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java: ########## @@ -121,6 +128,12 @@ private static AvroToRowDataConverter createConverter(LogicalType type) { return AvroToRowDataConverters::convertToTime; case TIMESTAMP_WITHOUT_TIME_ZONE: return AvroToRowDataConverters::convertToTimestamp; Review Comment: IIUC, we should convert Flink SQL TIMESTAMP_NTZ type to AVRO Local Timestamp finally, this is the correct behavior ########## flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFormatOptions.java: ########## @@ -71,5 +70,12 @@ public InlineElement getDescription() { } } + @Deprecated + public static final ConfigOption<Boolean> AVRO_TIMESTAMP_LEGACY_MAPPING = Review Comment: Introducing a **new** config option with `Deprecated` is a little confused to users, I think we can mark it as `Deprecated` when we decide to to deprecate it. ########## flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataDeserializationSchema.java: ########## @@ -86,6 +86,27 @@ public AvroRowDataDeserializationSchema( typeInfo); } + /** + * Creates an Avro deserialization schema for the given logical type. + * + * @param rowType The logical type used to deserialize the data. + * @param typeInfo The TypeInformation to be used by {@link + * AvroRowDataDeserializationSchema#getProducedType()}. + * @param encoding The serialization approach used to deserialize the data. + * @param legacyMapping Whether to use legacy mapping. Review Comment: the scope meaning of `legacyMapping` is too large for our case, what we want to express is only timestamp types' legacy behavior, but this name looks like we have a legacy behavior for all types. How about `legacyTimestampMapping`? ########## flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java: ########## @@ -352,17 +457,55 @@ public static Schema convertToSchema(LogicalType logicalType, String rowName) { final TimestampType timestampType = (TimestampType) logicalType; precision = timestampType.getPrecision(); org.apache.avro.LogicalType avroLogicalType; - if (precision <= 3) { - avroLogicalType = LogicalTypes.timestampMillis(); + if (legacyMapping) { + if (precision <= 3) { + avroLogicalType = LogicalTypes.timestampMillis(); + } else if (precision <= 6) { + avroLogicalType = LogicalTypes.timestampMicros(); + } else { Review Comment: I'm happy that you can add precision support here, but I suggest we can open a new PR or commit to finish this work, it will be more clear. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org