leonardBang commented on code in PR #23511:
URL: https://github.com/apache/flink/pull/23511#discussion_r1460846335


##########
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java:
##########
@@ -121,6 +128,12 @@ private static AvroToRowDataConverter createConverter(LogicalType type) {
                 return AvroToRowDataConverters::convertToTime;
             case TIMESTAMP_WITHOUT_TIME_ZONE:
                 return AvroToRowDataConverters::convertToTimestamp;

Review Comment:
   IIUC, we should ultimately convert the Flink SQL TIMESTAMP_NTZ type to the Avro local-timestamp logical type; that is the correct behavior.
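   
   As an illustrative sketch only (not the PR's actual code; the class and method names are placeholders): reading an Avro local-timestamp value, which is a long with no time-zone semantics, into Flink's `TimestampData` for a SQL TIMESTAMP_NTZ column could look roughly like this, assuming Avro >= 1.10 for the local-timestamp logical types:
   
```java
// Hypothetical helper, not from the PR: converts Avro local-timestamp longs
// into Flink's internal TimestampData (from flink-table-common).
import java.time.LocalDateTime;
import java.time.ZoneOffset;

import org.apache.flink.table.data.TimestampData;

public class LocalTimestampReadSketch {

    // local-timestamp-millis: milliseconds on the local wall clock since epoch.
    static TimestampData fromLocalTimestampMillis(long millis) {
        return TimestampData.fromEpochMillis(millis);
    }

    // local-timestamp-micros: convert microseconds to a LocalDateTime, then wrap it.
    static TimestampData fromLocalTimestampMicros(long micros) {
        long seconds = Math.floorDiv(micros, 1_000_000L);
        int nanos = (int) Math.floorMod(micros, 1_000_000L) * 1_000;
        return TimestampData.fromLocalDateTime(
                LocalDateTime.ofEpochSecond(seconds, nanos, ZoneOffset.UTC));
    }
}
```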



##########
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFormatOptions.java:
##########
@@ -71,5 +70,12 @@ public InlineElement getDescription() {
         }
     }
 
+    @Deprecated
+    public static final ConfigOption<Boolean> AVRO_TIMESTAMP_LEGACY_MAPPING =

Review Comment:
   Introducing a **new** config option that is already marked `Deprecated` is a little confusing to users. I think we should only mark it as `Deprecated` once we actually decide to deprecate it.
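   
   For reference, a minimal sketch of how the option could be declared without `@Deprecated`, using Flink's `ConfigOptions` builder; the key name, default value, and description below are placeholders rather than the PR's actual values:
   
```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class AvroTimestampOptionSketch {

    // Placeholder key/default/description; only the shape of the declaration matters here.
    public static final ConfigOption<Boolean> AVRO_TIMESTAMP_LEGACY_MAPPING =
            ConfigOptions.key("timestamp-mapping.legacy")
                    .booleanType()
                    .defaultValue(true)
                    .withDescription(
                            "Whether to use the legacy mapping between Flink SQL timestamp"
                                    + " types and Avro timestamp logical types.");
}
```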



##########
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataDeserializationSchema.java:
##########
@@ -86,6 +86,27 @@ public AvroRowDataDeserializationSchema(
                 typeInfo);
     }
 
+    /**
+     * Creates an Avro deserialization schema for the given logical type.
+     *
+     * @param rowType The logical type used to deserialize the data.
+     * @param typeInfo The TypeInformation to be used by {@link
+     *     AvroRowDataDeserializationSchema#getProducedType()}.
+     * @param encoding The serialization approach used to deserialize the data.
+     * @param legacyMapping Whether to use legacy mapping.

Review Comment:
    The scope implied by `legacyMapping` is too broad for our case: what we want to express is only the legacy behavior of the timestamp types, but this name suggests a legacy behavior for all types. How about `legacyTimestampMapping`?
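    
    A tiny sketch of the suggested narrower name, purely illustrative (the class and constructor below are placeholders, not the PR's actual signature):
    
```java
public class TimestampMappingNamingSketch {

    private final boolean legacyTimestampMapping;

    /**
     * @param legacyTimestampMapping Whether to use the legacy mapping between Flink SQL
     *     timestamp types and Avro timestamp logical types; other types are unaffected.
     */
    public TimestampMappingNamingSketch(boolean legacyTimestampMapping) {
        this.legacyTimestampMapping = legacyTimestampMapping;
    }
}
```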



##########
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java:
##########
@@ -352,17 +457,55 @@ public static Schema convertToSchema(LogicalType logicalType, String rowName) {
                 final TimestampType timestampType = (TimestampType) logicalType;
                 precision = timestampType.getPrecision();
                 org.apache.avro.LogicalType avroLogicalType;
-                if (precision <= 3) {
-                    avroLogicalType = LogicalTypes.timestampMillis();
+                if (legacyMapping) {
+                    if (precision <= 3) {
+                        avroLogicalType = LogicalTypes.timestampMillis();
+                    } else if (precision <= 6) {
+                        avroLogicalType = LogicalTypes.timestampMicros();
+                    } else {

Review Comment:
   I'm happy that you are adding precision support here, but I suggest opening a new PR or a separate commit to finish that work; it will be clearer.
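   
   For context, a hypothetical sketch of the branch being discussed (not the PR's actual code): the legacy path keeps the old timestamp-millis/micros mapping for TIMESTAMP_NTZ, while the non-legacy path would pick Avro's local-timestamp logical types; assumes Avro >= 1.10:
   
```java
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;

public class TimestampPrecisionMappingSketch {

    // Picks the Avro logical type for a Flink TIMESTAMP_NTZ column of the given precision.
    static LogicalType pickAvroLogicalType(int precision, boolean legacyMapping) {
        if (precision <= 3) {
            return legacyMapping
                    ? LogicalTypes.timestampMillis()
                    : LogicalTypes.localTimestampMillis();
        } else if (precision <= 6) {
            return legacyMapping
                    ? LogicalTypes.timestampMicros()
                    : LogicalTypes.localTimestampMicros();
        }
        throw new UnsupportedOperationException(
                "Avro has no timestamp logical type with precision greater than 6.");
    }
}
```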



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
