This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit a479aee9131bebc67c1eb512542eb7c8f1e54233
Author: Jark Wu <j...@apache.org>
AuthorDate: Thu Jun 4 20:02:13 2020 +0800

    [hotfix][avro] Fix AvroRowSerializationSchema doesn't support TIMESTAMP type
---
 .../apache/flink/formats/avro/AvroRowSerializationSchema.java | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java
index d4c73197..30f9754 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java
@@ -48,6 +48,9 @@ import java.nio.ByteBuffer;
 import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -242,14 +245,20 @@ public class AvroRowSerializationSchema implements SerializationSchema<Row> {
                 // check for logical types
                 if (object instanceof Date) {
                     return convertFromDate(schema, (Date) object);
+                } else if (object instanceof LocalDate) {
+                    return convertFromDate(schema, Date.valueOf((LocalDate) object));
                 } else if (object instanceof Time) {
                     return convertFromTime(schema, (Time) object);
+                } else if (object instanceof LocalTime) {
+                    return convertFromTime(schema, Time.valueOf((LocalTime) object));
                 }
                 return object;
             case LONG:
                 // check for logical type
                 if (object instanceof Timestamp) {
                     return convertFromTimestamp(schema, (Timestamp) object);
+                } else if (object instanceof LocalDateTime) {
+                    return convertFromTimestamp(schema, Timestamp.valueOf((LocalDateTime) object));
                 }
                 return object;
             case FLOAT: