leonardBang commented on code in PR #23511:
URL: https://github.com/apache/flink/pull/23511#discussion_r1460846335
##########
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java:
##########
@@ -121,6 +128,12 @@ private static AvroToRowDataConverter
createConverter(LogicalType type) {
return AvroToRowDataConverters::convertToTime;
case TIMESTAMP_WITHOUT_TIME_ZONE:
return AvroToRowDataConverters::convertToTimestamp;
Review Comment:
IIUC, we should eventually convert the Flink SQL TIMESTAMP_NTZ type to the Avro
local timestamp type; that is the correct behavior.
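To illustrate the distinction, here is a minimal stdlib-only sketch, not code from this PR. It assumes the Avro semantics where `local-timestamp-millis` counts milliseconds from 1970-01-01T00:00:00.000 on a wall clock with no time zone, while `timestamp-millis` is an instant on the UTC timeline. The class and method names are hypothetical.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Hypothetical helper for illustration only; not part of Flink or Avro.
final class LocalTimestampSketch {

    // local-timestamp-millis: a wall-clock value with no zone attached.
    // ZoneOffset.UTC is used purely as the arithmetic origin, not as a zone.
    static LocalDateTime fromLocalTimestampMillis(long millis) {
        long seconds = Math.floorDiv(millis, 1000L);
        int nanos = ((int) Math.floorMod(millis, 1000L)) * 1_000_000;
        return LocalDateTime.ofEpochSecond(seconds, nanos, ZoneOffset.UTC);
    }

    // Inverse of the conversion above.
    static long toLocalTimestampMillis(LocalDateTime timestamp) {
        return timestamp.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    // timestamp-millis: a point on the global timeline.
    static Instant fromTimestampMillis(long millis) {
        return Instant.ofEpochMilli(millis);
    }
}
```

The point of the sketch is that a TIMESTAMP_NTZ value round-trips through the wall-clock representation without any session or JVM time zone being involved.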
##########
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFormatOptions.java:
##########
@@ -71,5 +70,12 @@ public InlineElement getDescription() {
}
}
+ @Deprecated
+ public static final ConfigOption<Boolean> AVRO_TIMESTAMP_LEGACY_MAPPING =
Review Comment:
Introducing a **new** config option that is already marked `Deprecated` is a
little confusing to users. I think we can mark it as `Deprecated` when we
actually decide to deprecate it.
##########
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataDeserializationSchema.java:
##########
@@ -86,6 +86,27 @@ public AvroRowDataDeserializationSchema(
typeInfo);
}
+ /**
+ * Creates an Avro deserialization schema for the given logical type.
+ *
+ * @param rowType The logical type used to deserialize the data.
+ * @param typeInfo The TypeInformation to be used by {@link
+ * AvroRowDataDeserializationSchema#getProducedType()}.
+ * @param encoding The serialization approach used to deserialize the data.
+ * @param legacyMapping Whether to use legacy mapping.
Review Comment:
The name `legacyMapping` is too broad for our case. What we want to express is
only the legacy behavior of timestamp types, but this name suggests that we
have a legacy behavior for all types. How about `legacyTimestampMapping`?
##########
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java:
##########
@@ -352,17 +457,55 @@ public static Schema convertToSchema(LogicalType
logicalType, String rowName) {
final TimestampType timestampType = (TimestampType)
logicalType;
precision = timestampType.getPrecision();
org.apache.avro.LogicalType avroLogicalType;
- if (precision <= 3) {
- avroLogicalType = LogicalTypes.timestampMillis();
+ if (legacyMapping) {
+ if (precision <= 3) {
+ avroLogicalType = LogicalTypes.timestampMillis();
+ } else if (precision <= 6) {
+ avroLogicalType = LogicalTypes.timestampMicros();
+ } else {
Review Comment:
I'm happy that you are adding precision support here, but I suggest doing this
in a separate PR or commit; that will be clearer.
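As a rough sketch of what the isolated precision logic could look like in a separate change, here is a stdlib-only version that returns Avro logical type names as strings. The class and method names are hypothetical. Two parts are assumptions rather than anything shown in this diff: the non-legacy branch mapping to the `local-timestamp-*` types, and rejecting precisions above 6.

```java
// Hypothetical helper for illustration only; not the PR's implementation.
final class TimestampPrecisionSketch {

    // Picks an Avro logical type name for a TIMESTAMP_WITHOUT_TIME_ZONE column.
    static String avroLogicalTypeName(int precision, boolean legacyTimestampMapping) {
        if (precision < 0) {
            throw new IllegalArgumentException("Negative precision: " + precision);
        }
        final String unit;
        if (precision <= 3) {
            unit = "millis";
        } else if (precision <= 6) {
            unit = "micros";
        } else {
            // Assumption: precisions above 6 are rejected.
            throw new IllegalArgumentException("Unsupported precision: " + precision);
        }
        // Assumption: the legacy mapping keeps the instant-based types, while
        // the new mapping uses the local (wall-clock) types.
        return (legacyTimestampMapping ? "timestamp-" : "local-timestamp-") + unit;
    }
}
```

Keeping the unit selection separate from the legacy switch makes each concern reviewable on its own, which is the motivation for splitting the work.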
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.