This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 533ead6ae94 [FLINK-33198][format/avro] Properly map Avro timestamp types in Flink Avro format 533ead6ae94 is described below commit 533ead6ae946cbc77525d276b6dea965d390181a Author: Peter Huang <huangzhenqiu0...@gmail.com> AuthorDate: Tue Jan 23 18:43:17 2024 -0800 [FLINK-33198][format/avro] Properly map Avro timestamp types in Flink Avro format This closes #23511. --- .../flink/formats/avro/AvroFileFormatFactory.java | 12 +- .../flink/formats/avro/AvroFormatFactory.java | 10 +- .../flink/formats/avro/AvroFormatOptions.java | 14 +- .../avro/AvroRowDataDeserializationSchema.java | 28 +- .../avro/AvroRowDataSerializationSchema.java | 17 ++ .../formats/avro/AvroRowDeserializationSchema.java | 86 ++++-- .../formats/avro/AvroRowSerializationSchema.java | 53 +++- .../formats/avro/AvroToRowDataConverters.java | 40 ++- .../formats/avro/RowDataToAvroConverters.java | 75 +++-- .../avro/typeutils/AvroSchemaConverter.java | 227 ++++++++++++--- .../flink/formats/avro/AvroFormatFactoryTest.java | 94 +++++- .../avro/AvroRowDataDeSerializationSchemaTest.java | 95 +++++- .../avro/AvroRowDeSerializationSchemaTest.java | 317 +++++++++++++++------ .../avro/typeutils/AvroSchemaConverterTest.java | 115 ++++++++ .../avro/typeutils/AvroTypeExtractionTest.java | 15 +- .../flink/formats/avro/utils/AvroTestUtils.java | 50 ++++ .../flink-avro/src/test/resources/avro/user.avsc | 13 +- 17 files changed, 1042 insertions(+), 219 deletions(-) diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFileFormatFactory.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFileFormatFactory.java index 5e128ec8ea3..2bc0fd1c036 100644 --- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFileFormatFactory.java +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFileFormatFactory.java @@ -57,6 +57,7 @@ import java.util.Set; import java.util.function.Function; import static org.apache.flink.formats.avro.AvroFormatOptions.AVRO_OUTPUT_CODEC; +import static org.apache.flink.formats.avro.AvroFormatOptions.AVRO_TIMESTAMP_LEGACY_MAPPING; /** Avro format factory for file system. 
*/ @Internal @@ -85,7 +86,8 @@ public class AvroFileFormatFactory implements BulkReaderFormatFactory, BulkWrite DynamicTableSink.Context context, DataType consumedDataType) { return new RowDataAvroWriterFactory( (RowType) consumedDataType.getLogicalType(), - formatOptions.get(AVRO_OUTPUT_CODEC)); + formatOptions.get(AVRO_OUTPUT_CODEC), + formatOptions.get(AVRO_TIMESTAMP_LEGACY_MAPPING)); } }; } @@ -104,6 +106,7 @@ public class AvroFileFormatFactory implements BulkReaderFormatFactory, BulkWrite public Set<ConfigOption<?>> optionalOptions() { Set<ConfigOption<?>> options = new HashSet<>(); options.add(AVRO_OUTPUT_CODEC); + options.add(AVRO_TIMESTAMP_LEGACY_MAPPING); return options; } @@ -182,7 +185,8 @@ public class AvroFileFormatFactory implements BulkReaderFormatFactory, BulkWrite private final AvroWriterFactory<GenericRecord> factory; private final RowType rowType; - private RowDataAvroWriterFactory(RowType rowType, String codec) { + private RowDataAvroWriterFactory( + RowType rowType, String codec, boolean legacyTimestampMapping) { this.rowType = rowType; this.factory = new AvroWriterFactory<>( @@ -190,7 +194,9 @@ public class AvroFileFormatFactory implements BulkReaderFormatFactory, BulkWrite @Override public DataFileWriter<GenericRecord> createWriter(OutputStream out) throws IOException { - Schema schema = AvroSchemaConverter.convertToSchema(rowType); + Schema schema = + AvroSchemaConverter.convertToSchema( + rowType, legacyTimestampMapping); DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema); DataFileWriter<GenericRecord> dataFileWriter = diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFormatFactory.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFormatFactory.java index 65b3c91badc..eda1cc1a898 100644 --- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFormatFactory.java +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFormatFactory.java @@ -45,6 +45,7 @@ import java.util.HashSet; import java.util.Set; import static org.apache.flink.formats.avro.AvroFormatOptions.AVRO_ENCODING; +import static org.apache.flink.formats.avro.AvroFormatOptions.AVRO_TIMESTAMP_LEGACY_MAPPING; /** * Table format factory for providing configured instances of Avro to RowData {@link @@ -61,6 +62,7 @@ public class AvroFormatFactory implements DeserializationFormatFactory, Serializ FactoryUtil.validateFactoryOptions(this, formatOptions); AvroEncoding encoding = formatOptions.get(AVRO_ENCODING); + boolean legacyTimestampMapping = formatOptions.get(AVRO_TIMESTAMP_LEGACY_MAPPING); return new ProjectableDecodingFormat<DeserializationSchema<RowData>>() { @Override @@ -73,7 +75,8 @@ public class AvroFormatFactory implements DeserializationFormatFactory, Serializ final RowType rowType = (RowType) producedDataType.getLogicalType(); final TypeInformation<RowData> rowDataTypeInfo = context.createTypeInformation(producedDataType); - return new AvroRowDataDeserializationSchema(rowType, rowDataTypeInfo, encoding); + return new AvroRowDataDeserializationSchema( + rowType, rowDataTypeInfo, encoding, legacyTimestampMapping); } @Override @@ -89,13 +92,15 @@ public class AvroFormatFactory implements DeserializationFormatFactory, Serializ FactoryUtil.validateFactoryOptions(this, formatOptions); AvroEncoding encoding = formatOptions.get(AVRO_ENCODING); + boolean legacyTimestampMapping = formatOptions.get(AVRO_TIMESTAMP_LEGACY_MAPPING); return new 
EncodingFormat<SerializationSchema<RowData>>() { @Override public SerializationSchema<RowData> createRuntimeEncoder( DynamicTableSink.Context context, DataType consumedDataType) { final RowType rowType = (RowType) consumedDataType.getLogicalType(); - return new AvroRowDataSerializationSchema(rowType, encoding); + return new AvroRowDataSerializationSchema( + rowType, encoding, legacyTimestampMapping); } @Override @@ -119,6 +124,7 @@ public class AvroFormatFactory implements DeserializationFormatFactory, Serializ public Set<ConfigOption<?>> optionalOptions() { Set<ConfigOption<?>> options = new HashSet<>(); options.add(AVRO_ENCODING); + options.add(AVRO_TIMESTAMP_LEGACY_MAPPING); return options; } } diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFormatOptions.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFormatOptions.java index a9aa8c3a7a7..6a9d81ea195 100644 --- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFormatOptions.java +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFormatOptions.java @@ -36,7 +36,6 @@ public class AvroFormatOptions { .stringType() .defaultValue(SNAPPY_CODEC) .withDescription("The compression codec for avro"); - public static final ConfigOption<AvroEncoding> AVRO_ENCODING = ConfigOptions.key("encoding") .enumType(AvroEncoding.class) @@ -71,5 +70,18 @@ public class AvroFormatOptions { } } + public static final ConfigOption<Boolean> AVRO_TIMESTAMP_LEGACY_MAPPING = + ConfigOptions.key("timestamp_mapping.legacy") + .booleanType() + .defaultValue(true) + .withDescription( + "Use the legacy mapping of timestamps in Avro. " + + "Before 1.19, Flink wrongly mapped both the SQL TIMESTAMP and " + + "TIMESTAMP_LTZ types to the Avro TIMESTAMP type. " + + "The correct mapping is Flink SQL TIMESTAMP to Avro LOCAL TIMESTAMP " + + "and Flink SQL TIMESTAMP_LTZ to Avro TIMESTAMP; disable this " + + "legacy mapping to obtain the correct behavior." + + " The legacy behavior remains the default for compatibility."); + private AvroFormatOptions() {} } diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataDeserializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataDeserializationSchema.java index b692a2f1fb3..799eb90b98f 100644 --- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataDeserializationSchema.java +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataDeserializationSchema.java @@ -59,18 +59,18 @@ public class AvroRowDataDeserializationSchema implements DeserializationSchema<R private final AvroToRowDataConverters.AvroToRowDataConverter runtimeConverter; /** - * Creates a Avro deserialization schema for the given logical type. + * Creates an Avro deserialization schema for the given logical type. * * @param rowType The logical type used to deserialize the data. * @param typeInfo The TypeInformation to be used by {@link * AvroRowDataDeserializationSchema#getProducedType()}. */ public AvroRowDataDeserializationSchema(RowType rowType, TypeInformation<RowData> typeInfo) { - this(rowType, typeInfo, AvroEncoding.BINARY); + this(rowType, typeInfo, AvroEncoding.BINARY, true); } /** - * Creates a Avro deserialization schema for the given logical type. + * Creates an Avro deserialization schema for the given logical type. 
* * @param rowType The logical type used to deserialize the data. * @param typeInfo The TypeInformation to be used by {@link * AvroRowDataDeserializationSchema#getProducedType()}. @@ -86,6 +86,28 @@ public class AvroRowDataDeserializationSchema implements DeserializationSchema<R typeInfo); } + /** + * Creates an Avro deserialization schema for the given logical type. + * + * @param rowType The logical type used to deserialize the data. + * @param typeInfo The TypeInformation to be used by {@link + * AvroRowDataDeserializationSchema#getProducedType()}. + * @param encoding The serialization approach used to deserialize the data. + * @param legacyTimestampMapping Whether to use legacy timestamp mapping. + */ + public AvroRowDataDeserializationSchema( + RowType rowType, + TypeInformation<RowData> typeInfo, + AvroEncoding encoding, + boolean legacyTimestampMapping) { + this( + AvroDeserializationSchema.forGeneric( + AvroSchemaConverter.convertToSchema(rowType, legacyTimestampMapping), + encoding), + AvroToRowDataConverters.createRowConverter(rowType, legacyTimestampMapping), + typeInfo); + } + /** * Creates a Avro deserialization schema for the given logical type. * diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataSerializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataSerializationSchema.java index 76c9389998e..b88979cfc88 100644 --- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataSerializationSchema.java +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataSerializationSchema.java @@ -72,6 +72,24 @@ public class AvroRowDataSerializationSchema implements SerializationSchema<RowDa RowDataToAvroConverters.createConverter(rowType)); } + /** + * Creates an Avro serialization schema with the given record row type and legacy timestamp + * mapping flag. + * + * @param rowType The row type of the data to serialize. + * @param encoding The serialization approach used to serialize the data. + * @param legacyTimestampMapping Whether to use the legacy timestamp mapping. + */ + public AvroRowDataSerializationSchema( + RowType rowType, AvroEncoding encoding, boolean legacyTimestampMapping) { + this( + rowType, + AvroSerializationSchema.forGeneric( + AvroSchemaConverter.convertToSchema(rowType, legacyTimestampMapping), + encoding), + RowDataToAvroConverters.createConverter(rowType, legacyTimestampMapping)); + } + /** * Creates an Avro serialization schema with the given record row type, nested schema and * runtime converters. 
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java index 8f72d02499d..74429b102f2 100644 --- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java @@ -18,6 +18,7 @@ package org.apache.flink.formats.avro; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.serialization.AbstractDeserializationSchema; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; @@ -58,7 +59,9 @@ import java.sql.Time; import java.sql.Timestamp; import java.time.Instant; import java.time.LocalDate; +import java.time.LocalDateTime; import java.time.LocalTime; +import java.time.ZoneOffset; import java.time.temporal.ChronoField; import java.util.HashMap; import java.util.List; @@ -159,12 +162,17 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema< @Override public Row deserialize(byte[] message) throws IOException { + return deserialize(message, true); + } + + @VisibleForTesting + Row deserialize(byte[] message, boolean legacyTimestampMapping) throws IOException { try { inputStream.setBuffer(message); record = datumReader.read(record, decoder); - return convertAvroRecordToRow(schema, typeInfo, record); + return convertAvroRecordToRow(schema, typeInfo, record, legacyTimestampMapping); } catch (Exception e) { throw new IOException("Failed to deserialize Avro record.", e); } } @@ -193,19 +201,27 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema< // -------------------------------------------------------------------------------------------- - private Row convertAvroRecordToRow(Schema schema, RowTypeInfo typeInfo, IndexedRecord record) { + private Row convertAvroRecordToRow( + Schema schema, + RowTypeInfo typeInfo, + IndexedRecord record, + boolean legacyTimestampMapping) { final List<Schema.Field> fields = schema.getFields(); final TypeInformation<?>[] fieldInfo = typeInfo.getFieldTypes(); final int length = fields.size(); final Row row = new Row(length); for (int i = 0; i < length; i++) { final Schema.Field field = fields.get(i); - row.setField(i, convertAvroType(field.schema(), fieldInfo[i], record.get(i))); + row.setField( + i, + convertAvroType( + field.schema(), fieldInfo[i], record.get(i), legacyTimestampMapping)); } return row; } - private Object convertAvroType(Schema schema, TypeInformation<?> info, Object object) { + private Object convertAvroType( + Schema schema, TypeInformation<?> info, Object object, boolean legacyTimestampMapping) { // we perform the conversion based on schema information but enriched with pre-computed // type information where useful (i.e., for arrays) @@ -216,7 +232,10 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema< case RECORD: if (object instanceof IndexedRecord) { return convertAvroRecordToRow( - schema, (RowTypeInfo) info, (IndexedRecord) object); + schema, + (RowTypeInfo) info, + (IndexedRecord) object, + legacyTimestampMapping); } throw new IllegalStateException( "IndexedRecord expected but was: " + object.getClass()); @@ 
-227,11 +246,13 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema< if (info instanceof BasicArrayTypeInfo) { final TypeInformation<?> elementInfo = ((BasicArrayTypeInfo<?, ?>) info).getComponentInfo(); - return convertToObjectArray(schema.getElementType(), elementInfo, object); + return convertToObjectArray( + schema.getElementType(), elementInfo, object, legacyTimestampMapping); } else { final TypeInformation<?> elementInfo = ((ObjectArrayTypeInfo<?, ?>) info).getComponentInfo(); - return convertToObjectArray(schema.getElementType(), elementInfo, object); + return convertToObjectArray( + schema.getElementType(), elementInfo, object, legacyTimestampMapping); } case MAP: final MapTypeInfo<?, ?> mapTypeInfo = (MapTypeInfo<?, ?>) info; @@ -243,7 +264,8 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema< convertAvroType( schema.getValueType(), mapTypeInfo.getValueTypeInfo(), - entry.getValue())); + entry.getValue(), + legacyTimestampMapping)); } return convertedMap; case UNION: @@ -260,7 +282,7 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema< // generic type return object; } - return convertAvroType(actualSchema, info, object); + return convertAvroType(actualSchema, info, object, legacyTimestampMapping); case FIXED: final byte[] fixedBytes = ((GenericFixed) object).bytes(); if (info == Types.BIG_DEC) { @@ -285,7 +307,11 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema< case LONG: if (info == Types.SQL_TIMESTAMP) { return convertToTimestamp( - object, schema.getLogicalType() == LogicalTypes.timestampMicros()); + object, + schema.getLogicalType() == LogicalTypes.timestampMicros() + || schema.getLogicalType() + == LogicalTypes.localTimestampMicros(), + legacyTimestampMapping); } else if (info == Types.SQL_TIME) { return convertToTime(object); } @@ -339,7 +365,8 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema< return new Time(millis - LOCAL_TZ.getOffset(millis)); } - private Timestamp convertToTimestamp(Object object, boolean isMicros) { + private Timestamp convertToTimestamp( + Object object, boolean isMicros, boolean legacyTimestampMapping) { final long millis; if (object instanceof Long) { if (isMicros) { @@ -362,13 +389,15 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema< } } else if (object instanceof Instant) { Instant instant = (Instant) object; - int offsetMillis = LOCAL_TZ.getOffset(instant.toEpochMilli()); - - long seconds = instant.getEpochSecond() - offsetMillis / 1000; - int nanos = instant.getNano() - offsetMillis % 1000 * 1000; - Timestamp timestamp = new Timestamp(seconds * 1000L); - timestamp.setNanos(nanos); - return timestamp; + return convertToTimestamp(instant); + } else if (object instanceof LocalDateTime) { + if (legacyTimestampMapping) { + throw new IllegalArgumentException( + "Unexpected object type for TIMESTAMP logical type. 
Received: " + object); + } else { + Instant instant = ((LocalDateTime) object).toInstant(ZoneOffset.UTC); + return convertToTimestamp(instant); + } } else if (jodaConverter != null) { millis = jodaConverter.convertTimestamp(object); } else { @@ -378,13 +407,28 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema< return new Timestamp(millis - LOCAL_TZ.getOffset(millis)); } + private Timestamp convertToTimestamp(Instant instant) { + int offsetMillis = LOCAL_TZ.getOffset(instant.toEpochMilli()); + + long seconds = instant.getEpochSecond() - offsetMillis / 1000; + int nanos = instant.getNano() - offsetMillis % 1000 * 1000; + Timestamp timestamp = new Timestamp(seconds * 1000L); + timestamp.setNanos(nanos); + return timestamp; + } + private Object[] convertToObjectArray( - Schema elementSchema, TypeInformation<?> elementInfo, Object object) { + Schema elementSchema, + TypeInformation<?> elementInfo, + Object object, + boolean legacyTimestampMapping) { final List<?> list = (List<?>) object; final Object[] convertedArray = (Object[]) Array.newInstance(elementInfo.getTypeClass(), list.size()); for (int i = 0; i < list.size(); i++) { - convertedArray[i] = convertAvroType(elementSchema, elementInfo, list.get(i)); + convertedArray[i] = + convertAvroType( + elementSchema, elementInfo, list.get(i), legacyTimestampMapping); } return convertedArray; } diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java index 46f2f950958..1e80306538c 100644 --- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java @@ -18,6 +18,7 @@ package org.apache.flink.formats.avro; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter; import org.apache.flink.types.Row; @@ -60,7 +61,7 @@ import java.util.TimeZone; /** * Serialization schema that serializes {@link Row} into Avro bytes. * - * <p>Serializes objects that are represented in (nested) Flink rows. It support types that are + * <p>Serializes objects that are represented in (nested) Flink rows. It supports types that are * compatible with Flink's Table & SQL API. 
* * <p>Note: Changes in this class need to be kept in sync with the corresponding runtime class @@ -130,9 +131,15 @@ public class AvroRowSerializationSchema implements SerializationSchema<Row> { @Override public byte[] serialize(Row row) { + return serialize(row, true); + } + + @VisibleForTesting + byte[] serialize(Row row, boolean legacyTimestampMapping) { try { // convert to record - final GenericRecord record = convertRowToAvroRecord(schema, row); + final GenericRecord record = + convertRowToAvroRecord(schema, row, legacyTimestampMapping); arrayOutputStream.reset(); datumWriter.write(record, encoder); encoder.flush(); @@ -162,25 +169,27 @@ public class AvroRowSerializationSchema implements SerializationSchema<Row> { // -------------------------------------------------------------------------------------------- - private GenericRecord convertRowToAvroRecord(Schema schema, Row row) { + private GenericRecord convertRowToAvroRecord( + Schema schema, Row row, boolean legacyTimestampMapping) { final List<Schema.Field> fields = schema.getFields(); final int length = fields.size(); final GenericRecord record = new GenericData.Record(schema); for (int i = 0; i < length; i++) { final Schema.Field field = fields.get(i); - record.put(i, convertFlinkType(field.schema(), row.getField(i))); + record.put( + i, convertFlinkType(field.schema(), row.getField(i), legacyTimestampMapping)); } return record; } - private Object convertFlinkType(Schema schema, Object object) { + private Object convertFlinkType(Schema schema, Object object, boolean legacyTimestampMapping) { if (object == null) { return null; } switch (schema.getType()) { case RECORD: if (object instanceof Row) { - return convertRowToAvroRecord(schema, (Row) object); + return convertRowToAvroRecord(schema, (Row) object, legacyTimestampMapping); } throw new IllegalStateException("Row expected but was: " + object.getClass()); case ENUM: @@ -191,7 +200,8 @@ public class AvroRowSerializationSchema implements SerializationSchema<Row> { final GenericData.Array<Object> convertedArray = new GenericData.Array<>(array.length, schema); for (Object element : array) { - convertedArray.add(convertFlinkType(elementSchema, element)); + convertedArray.add( + convertFlinkType(elementSchema, element, legacyTimestampMapping)); } return convertedArray; case MAP: @@ -200,7 +210,10 @@ public class AvroRowSerializationSchema implements SerializationSchema<Row> { for (Map.Entry<?, ?> entry : map.entrySet()) { convertedMap.put( new Utf8(entry.getKey().toString()), - convertFlinkType(schema.getValueType(), entry.getValue())); + convertFlinkType( + schema.getValueType(), + entry.getValue(), + legacyTimestampMapping)); } return convertedMap; case UNION: @@ -217,7 +230,7 @@ public class AvroRowSerializationSchema implements SerializationSchema<Row> { // generic type return object; } - return convertFlinkType(actualSchema, object); + return convertFlinkType(actualSchema, object, legacyTimestampMapping); case FIXED: // check for logical type if (object instanceof BigDecimal) { @@ -248,9 +261,12 @@ public class AvroRowSerializationSchema implements SerializationSchema<Row> { case LONG: // check for logical type if (object instanceof Timestamp) { - return convertFromTimestamp(schema, (Timestamp) object); + return convertFromTimestamp(schema, (Timestamp) object, legacyTimestampMapping); } else if (object instanceof LocalDateTime) { - return convertFromTimestamp(schema, Timestamp.valueOf((LocalDateTime) object)); + return convertFromTimestamp( + schema, + 
Timestamp.valueOf((LocalDateTime) object), + legacyTimestampMapping); } else if (object instanceof Time) { return convertFromTimeMicros(schema, (Time) object); } @@ -311,13 +327,22 @@ public class AvroRowSerializationSchema implements SerializationSchema<Row> { } } - private long convertFromTimestamp(Schema schema, Timestamp date) { + private long convertFromTimestamp( + Schema schema, Timestamp date, boolean legacyTimestampMapping) { final LogicalType logicalType = schema.getLogicalType(); - if (logicalType == LogicalTypes.timestampMillis()) { + if (legacyTimestampMapping + && (logicalType == LogicalTypes.localTimestampMillis() + || logicalType == LogicalTypes.localTimestampMicros())) { + throw new RuntimeException("Unsupported local timestamp type."); + } + + if (logicalType == LogicalTypes.timestampMillis() + || logicalType == LogicalTypes.localTimestampMillis()) { // adopted from Apache Calcite final long time = date.getTime(); return time + (long) LOCAL_TZ.getOffset(time); - } else if (logicalType == LogicalTypes.timestampMicros()) { + } else if (logicalType == LogicalTypes.timestampMicros() + || logicalType == LogicalTypes.localTimestampMicros()) { long millis = date.getTime(); long micros = millis * 1000 + (date.getNanos() % 1_000_000 / 1000); long offset = LOCAL_TZ.getOffset(millis) * 1000L; diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java index 7f2ca46f1e5..9c63c56c4dc 100644 --- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java @@ -42,6 +42,7 @@ import java.lang.reflect.Array; import java.nio.ByteBuffer; import java.time.Instant; import java.time.LocalDate; +import java.time.LocalDateTime; import java.time.LocalTime; import java.time.temporal.ChronoField; import java.util.HashMap; @@ -68,10 +69,15 @@ public class AvroToRowDataConverters { // ------------------------------------------------------------------------------------- public static AvroToRowDataConverter createRowConverter(RowType rowType) { + return createRowConverter(rowType, true); + } + + public static AvroToRowDataConverter createRowConverter( + RowType rowType, boolean legacyTimestampMapping) { final AvroToRowDataConverter[] fieldConverters = rowType.getFields().stream() .map(RowType.RowField::getType) - .map(AvroToRowDataConverters::createNullableConverter) + .map(type -> createNullableConverter(type, legacyTimestampMapping)) .toArray(AvroToRowDataConverter[]::new); final int arity = rowType.getFieldCount(); @@ -88,8 +94,9 @@ public class AvroToRowDataConverters { } /** Creates a runtime converter which is null safe. */ - private static AvroToRowDataConverter createNullableConverter(LogicalType type) { - final AvroToRowDataConverter converter = createConverter(type); + private static AvroToRowDataConverter createNullableConverter( + LogicalType type, boolean legacyTimestampMapping) { + final AvroToRowDataConverter converter = createConverter(type, legacyTimestampMapping); return avroObject -> { if (avroObject == null) { return null; @@ -99,7 +106,8 @@ public class AvroToRowDataConverters { } /** Creates a runtime converter which assuming input object is not null. 
*/ - private static AvroToRowDataConverter createConverter(LogicalType type) { + private static AvroToRowDataConverter createConverter( + LogicalType type, boolean legacyTimestampMapping) { switch (type.getTypeRoot()) { case NULL: return avroObject -> null; @@ -121,6 +129,12 @@ public class AvroToRowDataConverters { return AvroToRowDataConverters::convertToTime; case TIMESTAMP_WITHOUT_TIME_ZONE: return AvroToRowDataConverters::convertToTimestamp; + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + if (legacyTimestampMapping) { + throw new UnsupportedOperationException("Unsupported type: " + type); + } else { + return AvroToRowDataConverters::convertToTimestamp; + } case CHAR: case VARCHAR: return avroObject -> StringData.fromString(avroObject.toString()); @@ -130,12 +144,12 @@ public class AvroToRowDataConverters { case DECIMAL: return createDecimalConverter((DecimalType) type); case ARRAY: - return createArrayConverter((ArrayType) type); + return createArrayConverter((ArrayType) type, legacyTimestampMapping); case ROW: - return createRowConverter((RowType) type); + return createRowConverter((RowType) type, legacyTimestampMapping); case MAP: case MULTISET: - return createMapConverter(type); + return createMapConverter(type, legacyTimestampMapping); case RAW: default: throw new UnsupportedOperationException("Unsupported type: " + type); @@ -160,9 +174,10 @@ public class AvroToRowDataConverters { }; } - private static AvroToRowDataConverter createArrayConverter(ArrayType arrayType) { + private static AvroToRowDataConverter createArrayConverter( + ArrayType arrayType, boolean legacyTimestampMapping) { final AvroToRowDataConverter elementConverter = - createNullableConverter(arrayType.getElementType()); + createNullableConverter(arrayType.getElementType(), legacyTimestampMapping); final Class<?> elementClass = LogicalTypeUtils.toInternalConversionClass(arrayType.getElementType()); @@ -177,11 +192,12 @@ public class AvroToRowDataConverters { }; } - private static AvroToRowDataConverter createMapConverter(LogicalType type) { + private static AvroToRowDataConverter createMapConverter( + LogicalType type, boolean legacyTimestampMapping) { final AvroToRowDataConverter keyConverter = - createConverter(DataTypes.STRING().getLogicalType()); + createConverter(DataTypes.STRING().getLogicalType(), legacyTimestampMapping); final AvroToRowDataConverter valueConverter = - createNullableConverter(extractValueTypeToAvroMap(type)); + createNullableConverter(extractValueTypeToAvroMap(type), legacyTimestampMapping); return avroObject -> { final Map<?, ?> map = (Map<?, ?>) avroObject; @@ -201,6 +217,8 @@ public class AvroToRowDataConverters { millis = (Long) object; } else if (object instanceof Instant) { millis = ((Instant) object).toEpochMilli(); + } else if (object instanceof LocalDateTime) { + return TimestampData.fromLocalDateTime((LocalDateTime) object); } else { JodaConverter jodaConverter = JodaConverter.getConverter(); if (jodaConverter != null) { diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RowDataToAvroConverters.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RowDataToAvroConverters.java index 9d8d9425aff..af7a936b270 100644 --- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RowDataToAvroConverters.java +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RowDataToAvroConverters.java @@ -36,6 +36,7 @@ import org.apache.avro.util.Utf8; import java.io.Serializable; import java.nio.ByteBuffer; +import java.time.ZoneOffset; import java.util.ArrayList; import java.util.List; 
import java.util.Map; @@ -71,6 +72,11 @@ public class RowDataToAvroConverters { * Flink Table & SQL internal data structures to corresponding Avro data structures. */ public static RowDataToAvroConverter createConverter(LogicalType type) { + return createConverter(type, true); + } + + public static RowDataToAvroConverter createConverter( + LogicalType type, boolean legacyTimestampMapping) { final RowDataToAvroConverter converter; switch (type.getTypeRoot()) { case NULL: @@ -150,15 +156,45 @@ public class RowDataToAvroConverters { }; break; case TIMESTAMP_WITHOUT_TIME_ZONE: - converter = - new RowDataToAvroConverter() { - private static final long serialVersionUID = 1L; + if (legacyTimestampMapping) { + converter = + new RowDataToAvroConverter() { + private static final long serialVersionUID = 1L; - @Override - public Object convert(Schema schema, Object object) { - return ((TimestampData) object).toInstant().toEpochMilli(); - } - }; + @Override + public Object convert(Schema schema, Object object) { + return ((TimestampData) object).toInstant().toEpochMilli(); + } + }; + } else { + converter = + new RowDataToAvroConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Schema schema, Object object) { + return ((TimestampData) object) + .toLocalDateTime() + .toInstant(ZoneOffset.UTC) + .toEpochMilli(); + } + }; + } + break; + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + if (legacyTimestampMapping) { + throw new UnsupportedOperationException("Unsupported type: " + type); + } else { + converter = + new RowDataToAvroConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Schema schema, Object object) { + return ((TimestampData) object).toInstant().toEpochMilli(); + } + }; + } break; case DECIMAL: converter = @@ -172,14 +208,14 @@ public class RowDataToAvroConverters { }; break; case ARRAY: - converter = createArrayConverter((ArrayType) type); + converter = createArrayConverter((ArrayType) type, legacyTimestampMapping); break; case ROW: - converter = createRowConverter((RowType) type); + converter = createRowConverter((RowType) type, legacyTimestampMapping); break; case MAP: case MULTISET: - converter = createMapConverter(type); + converter = createMapConverter(type, legacyTimestampMapping); break; case RAW: default: @@ -217,10 +253,11 @@ public class RowDataToAvroConverters { }; } - private static RowDataToAvroConverter createRowConverter(RowType rowType) { + private static RowDataToAvroConverter createRowConverter( + RowType rowType, boolean legacyTimestampMapping) { final RowDataToAvroConverter[] fieldConverters = rowType.getChildren().stream() - .map(RowDataToAvroConverters::createConverter) + .map(legacyType -> createConverter(legacyType, legacyTimestampMapping)) .toArray(RowDataToAvroConverter[]::new); final LogicalType[] fieldTypes = rowType.getFields().stream() @@ -259,10 +296,12 @@ public class RowDataToAvroConverters { }; } - private static RowDataToAvroConverter createArrayConverter(ArrayType arrayType) { + private static RowDataToAvroConverter createArrayConverter( + ArrayType arrayType, boolean legacyTimestampMapping) { LogicalType elementType = arrayType.getElementType(); final ArrayData.ElementGetter elementGetter = ArrayData.createElementGetter(elementType); - final RowDataToAvroConverter elementConverter = createConverter(arrayType.getElementType()); + final RowDataToAvroConverter elementConverter = + createConverter(arrayType.getElementType(), legacyTimestampMapping); return new 
RowDataToAvroConverter() { private static final long serialVersionUID = 1L; @@ -282,10 +321,12 @@ public class RowDataToAvroConverters { }; } - private static RowDataToAvroConverter createMapConverter(LogicalType type) { + private static RowDataToAvroConverter createMapConverter( + LogicalType type, boolean legacyTimestampMapping) { LogicalType valueType = extractValueTypeToAvroMap(type); final ArrayData.ElementGetter valueGetter = ArrayData.createElementGetter(valueType); - final RowDataToAvroConverter valueConverter = createConverter(valueType); + final RowDataToAvroConverter valueConverter = + createConverter(valueType, legacyTimestampMapping); return new RowDataToAvroConverter() { private static final long serialVersionUID = 1L; diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java index 042a343af12..178056e145a 100644 --- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java @@ -29,6 +29,7 @@ import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.ArrayType; import org.apache.flink.table.types.logical.DecimalType; import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.LogicalTypeFamily; import org.apache.flink.table.types.logical.MapType; @@ -73,10 +74,24 @@ public class AvroSchemaConverter { @SuppressWarnings("unchecked") public static <T extends SpecificRecord> TypeInformation<Row> convertToTypeInfo( Class<T> avroClass) { + return convertToTypeInfo(avroClass, true); + } + + /** + * Converts an Avro class into a nested row structure with deterministic field order and data + * types that are compatible with Flink's Table & SQL API. + * + * @param avroClass Avro specific record that contains schema information + * @param legacyTimestampMapping legacy mapping of timestamp types + * @return type information matching the schema + */ + @SuppressWarnings("unchecked") + public static <T extends SpecificRecord> TypeInformation<Row> convertToTypeInfo( + Class<T> avroClass, boolean legacyTimestampMapping) { Preconditions.checkNotNull(avroClass, "Avro specific record class must not be null."); // determine schema to retrieve deterministic field order final Schema schema = SpecificData.get().getSchema(avroClass); - return (TypeInformation<Row>) convertToTypeInfo(schema); + return (TypeInformation<Row>) convertToTypeInfo(schema, legacyTimestampMapping); } /** @@ -88,6 +103,20 @@ public class AvroSchemaConverter { */ @SuppressWarnings("unchecked") public static <T> TypeInformation<T> convertToTypeInfo(String avroSchemaString) { + return convertToTypeInfo(avroSchemaString, true); + } + + /** + * Converts an Avro schema string into a nested row structure with deterministic field order and + * data types that are compatible with Flink's Table & SQL API. 
+ * + * @param avroSchemaString Avro schema definition string + * @param legacyTimestampMapping legacy mapping of timestamp types + * @return type information matching the schema + */ + @SuppressWarnings("unchecked") + public static <T> TypeInformation<T> convertToTypeInfo( + String avroSchemaString, boolean legacyTimestampMapping) { Preconditions.checkNotNull(avroSchemaString, "Avro schema must not be null."); final Schema schema; try { @@ -95,10 +124,11 @@ public class AvroSchemaConverter { } catch (SchemaParseException e) { throw new IllegalArgumentException("Could not parse Avro schema string.", e); } - return (TypeInformation<T>) convertToTypeInfo(schema); + return (TypeInformation<T>) convertToTypeInfo(schema, legacyTimestampMapping); } - private static TypeInformation<?> convertToTypeInfo(Schema schema) { + private static TypeInformation<?> convertToTypeInfo( + Schema schema, boolean legacyTimestampMapping) { switch (schema.getType()) { case RECORD: final List<Schema.Field> fields = schema.getFields(); @@ -107,7 +137,7 @@ public class AvroSchemaConverter { final String[] names = new String[fields.size()]; for (int i = 0; i < fields.size(); i++) { final Schema.Field field = fields.get(i); - types[i] = convertToTypeInfo(field.schema()); + types[i] = convertToTypeInfo(field.schema(), legacyTimestampMapping); names[i] = field.name(); } return Types.ROW_NAMED(names, types); @@ -115,9 +145,12 @@ public class AvroSchemaConverter { return Types.STRING; case ARRAY: // result type might either be ObjectArrayTypeInfo or BasicArrayTypeInfo for Strings - return Types.OBJECT_ARRAY(convertToTypeInfo(schema.getElementType())); + return Types.OBJECT_ARRAY( + convertToTypeInfo(schema.getElementType(), legacyTimestampMapping)); case MAP: - return Types.MAP(Types.STRING, convertToTypeInfo(schema.getValueType())); + return Types.MAP( + Types.STRING, + convertToTypeInfo(schema.getValueType(), legacyTimestampMapping)); case UNION: final Schema actualSchema; if (schema.getTypes().size() == 2 @@ -132,7 +165,7 @@ public class AvroSchemaConverter { // use Kryo for serialization return Types.GENERIC(Object.class); } - return convertToTypeInfo(actualSchema); + return convertToTypeInfo(actualSchema, legacyTimestampMapping); case FIXED: // logical decimal type if (schema.getLogicalType() instanceof LogicalTypes.Decimal) { @@ -159,12 +192,26 @@ public class AvroSchemaConverter { } return Types.INT; case LONG: - // logical timestamp type - if (schema.getLogicalType() == LogicalTypes.timestampMillis() - || schema.getLogicalType() == LogicalTypes.timestampMicros()) { - return Types.SQL_TIMESTAMP; - } else if (schema.getLogicalType() == LogicalTypes.timeMicros()) { - return Types.SQL_TIME; + if (legacyTimestampMapping) { + if (schema.getLogicalType() == LogicalTypes.timestampMillis() + || schema.getLogicalType() == LogicalTypes.timestampMicros()) { + return Types.SQL_TIMESTAMP; + } else if (schema.getLogicalType() == LogicalTypes.timeMicros() + || schema.getLogicalType() == LogicalTypes.timeMillis()) { + return Types.SQL_TIME; + } + } else { + // Avro logical timestamp types to Flink DataStream timestamp types + if (schema.getLogicalType() == LogicalTypes.timestampMillis() + || schema.getLogicalType() == LogicalTypes.timestampMicros()) { + return Types.INSTANT; + } else if (schema.getLogicalType() == LogicalTypes.localTimestampMillis() + || schema.getLogicalType() == LogicalTypes.localTimestampMicros()) { + return Types.LOCAL_DATE_TIME; + } else if (schema.getLogicalType() == LogicalTypes.timeMicros() + || 
schema.getLogicalType() == LogicalTypes.timeMillis()) { + return Types.SQL_TIME; + } } return Types.LONG; case FLOAT: @@ -187,6 +234,19 @@ public class AvroSchemaConverter { * @return data type matching the schema */ public static DataType convertToDataType(String avroSchemaString) { + return convertToDataType(avroSchemaString, true); + } + + /** + * Converts an Avro schema string into a nested row structure with deterministic field order and + * data types that are compatible with Flink's Table & SQL API. + * + * @param avroSchemaString Avro schema definition string + * @param legacyTimestampMapping legacy mapping of local timestamps + * @return data type matching the schema + */ + public static DataType convertToDataType( + String avroSchemaString, boolean legacyTimestampMapping) { Preconditions.checkNotNull(avroSchemaString, "Avro schema must not be null."); final Schema schema; try { @@ -194,10 +254,10 @@ public class AvroSchemaConverter { } catch (SchemaParseException e) { throw new IllegalArgumentException("Could not parse Avro schema string.", e); } - return convertToDataType(schema); + return convertToDataType(schema, legacyTimestampMapping); } - private static DataType convertToDataType(Schema schema) { + private static DataType convertToDataType(Schema schema, boolean legacyMapping) { switch (schema.getType()) { case RECORD: final List<Schema.Field> schemaFields = schema.getFields(); @@ -205,17 +265,20 @@ public class AvroSchemaConverter { final DataTypes.Field[] fields = new DataTypes.Field[schemaFields.size()]; for (int i = 0; i < schemaFields.size(); i++) { final Schema.Field field = schemaFields.get(i); - fields[i] = DataTypes.FIELD(field.name(), convertToDataType(field.schema())); + fields[i] = + DataTypes.FIELD( + field.name(), convertToDataType(field.schema(), legacyMapping)); } return DataTypes.ROW(fields).notNull(); case ENUM: return DataTypes.STRING().notNull(); case ARRAY: - return DataTypes.ARRAY(convertToDataType(schema.getElementType())).notNull(); + return DataTypes.ARRAY(convertToDataType(schema.getElementType(), legacyMapping)) + .notNull(); case MAP: return DataTypes.MAP( DataTypes.STRING().notNull(), - convertToDataType(schema.getValueType())) + convertToDataType(schema.getValueType(), legacyMapping)) .notNull(); case UNION: final Schema actualSchema; @@ -236,7 +299,7 @@ public class AvroSchemaConverter { return new AtomicDataType( new TypeInformationRawType<>(false, Types.GENERIC(Object.class))); } - DataType converted = convertToDataType(actualSchema); + DataType converted = convertToDataType(actualSchema, legacyMapping); return nullable ? 
converted.nullable() : converted; case FIXED: // logical decimal type @@ -270,16 +333,34 @@ public class AvroSchemaConverter { } return DataTypes.INT().notNull(); case LONG: - // logical timestamp type - if (schema.getLogicalType() == LogicalTypes.timestampMillis()) { - return DataTypes.TIMESTAMP(3).notNull(); - } else if (schema.getLogicalType() == LogicalTypes.timestampMicros()) { - return DataTypes.TIMESTAMP(6).notNull(); - } else if (schema.getLogicalType() == LogicalTypes.timeMillis()) { - return DataTypes.TIME(3).notNull(); - } else if (schema.getLogicalType() == LogicalTypes.timeMicros()) { - return DataTypes.TIME(6).notNull(); + if (legacyMapping) { + // Avro logical timestamp types to Flink SQL timestamp types + if (schema.getLogicalType() == LogicalTypes.timestampMillis()) { + return DataTypes.TIMESTAMP(3).notNull(); + } else if (schema.getLogicalType() == LogicalTypes.timestampMicros()) { + return DataTypes.TIMESTAMP(6).notNull(); + } else if (schema.getLogicalType() == LogicalTypes.timeMillis()) { + return DataTypes.TIME(3).notNull(); + } else if (schema.getLogicalType() == LogicalTypes.timeMicros()) { + return DataTypes.TIME(6).notNull(); + } + } else { + // Avro logical timestamp types to Flink SQL timestamp types + if (schema.getLogicalType() == LogicalTypes.timestampMillis()) { + return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull(); + } else if (schema.getLogicalType() == LogicalTypes.timestampMicros()) { + return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6).notNull(); + } else if (schema.getLogicalType() == LogicalTypes.timeMillis()) { + return DataTypes.TIME(3).notNull(); + } else if (schema.getLogicalType() == LogicalTypes.timeMicros()) { + return DataTypes.TIME(6).notNull(); + } else if (schema.getLogicalType() == LogicalTypes.localTimestampMillis()) { + return DataTypes.TIMESTAMP(3).notNull(); + } else if (schema.getLogicalType() == LogicalTypes.localTimestampMicros()) { + return DataTypes.TIMESTAMP(6).notNull(); + } } + return DataTypes.BIGINT().notNull(); case FLOAT: return DataTypes.FLOAT().notNull(); @@ -303,7 +384,22 @@ public class AvroSchemaConverter { * @return Avro's {@link Schema} matching this logical type. */ public static Schema convertToSchema(LogicalType schema) { - return convertToSchema(schema, "org.apache.flink.avro.generated.record"); + return convertToSchema(schema, true); + } + + /** + * Converts Flink SQL {@link LogicalType} (can be nested) into an Avro schema. + * + * <p>Use "org.apache.flink.avro.generated.record" as the type name. + * + * @param schema the schema type, usually it should be the top level record type, e.g. not a + * nested type + * @param legacyTimestampMapping whether to use the legacy timestamp mapping + * @return Avro's {@link Schema} matching this logical type. + */ + public static Schema convertToSchema(LogicalType schema, boolean legacyTimestampMapping) { + return convertToSchema( + schema, "org.apache.flink.avro.generated.record", legacyTimestampMapping); } /** @@ -317,6 +413,22 @@ public class AvroSchemaConverter { * @return Avro's {@link Schema} matching this logical type. */ public static Schema convertToSchema(LogicalType logicalType, String rowName) { + return convertToSchema(logicalType, rowName, true); + } + + /** + * Converts Flink SQL {@link LogicalType} (can be nested) into an Avro schema. + * + * <p>The "{rowName}_" is used as the nested row type name prefix in order to generate the right + * schema. Nested record type that only differs with type name is still compatible. 
+ * + * @param logicalType logical type + * @param rowName the record name + * @param legacyTimestampMapping whether to use the legacy timestamp mapping + * @return Avro's {@link Schema} matching this logical type. + */ + public static Schema convertToSchema( + LogicalType logicalType, String rowName, boolean legacyTimestampMapping) { int precision; boolean nullable = logicalType.isNullable(); switch (logicalType.getTypeRoot()) { @@ -352,17 +464,53 @@ public class AvroSchemaConverter { final TimestampType timestampType = (TimestampType) logicalType; precision = timestampType.getPrecision(); org.apache.avro.LogicalType avroLogicalType; - if (precision <= 3) { - avroLogicalType = LogicalTypes.timestampMillis(); + if (legacyTimestampMapping) { + if (precision <= 3) { + avroLogicalType = LogicalTypes.timestampMillis(); + } else { + throw new IllegalArgumentException( + "Avro does not support TIMESTAMP type " + + "with precision: " + + precision + + ", it only supports precision less than 3."); + } } else { - throw new IllegalArgumentException( - "Avro does not support TIMESTAMP type " - + "with precision: " - + precision - + ", it only supports precision less than 3."); + if (precision <= 3) { + avroLogicalType = LogicalTypes.localTimestampMillis(); + } else if (precision <= 6) { + avroLogicalType = LogicalTypes.localTimestampMicros(); + } else { + throw new IllegalArgumentException( + "Avro does not support LOCAL TIMESTAMP type " + + "with precision: " + + precision + + ", it only supports precision less than 6."); + } } Schema timestamp = avroLogicalType.addToSchema(SchemaBuilder.builder().longType()); return nullable ? nullableSchema(timestamp) : timestamp; + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + if (legacyTimestampMapping) { + throw new UnsupportedOperationException( + "Unsupported to derive Schema for type: " + logicalType); + } else { + final LocalZonedTimestampType localZonedTimestampType = + (LocalZonedTimestampType) logicalType; + precision = localZonedTimestampType.getPrecision(); + if (precision <= 3) { + avroLogicalType = LogicalTypes.timestampMillis(); + } else if (precision <= 6) { + avroLogicalType = LogicalTypes.timestampMicros(); + } else { + throw new IllegalArgumentException( + "Avro does not support TIMESTAMP type " + + "with precision: " + + precision + + ", it only supports precision less than 6."); + } + timestamp = avroLogicalType.addToSchema(SchemaBuilder.builder().longType()); + return nullable ? nullableSchema(timestamp) : timestamp; + } case DATE: // use int to represents Date Schema date = LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType()); @@ -397,7 +545,11 @@ public class AvroSchemaConverter { LogicalType fieldType = rowType.getTypeAt(i); SchemaBuilder.GenericDefault<Schema> fieldBuilder = builder.name(fieldName) - .type( - convertToSchema(fieldType, rowName + "_" + fieldName)); + .type( + convertToSchema( + fieldType, + rowName + "_" + fieldName, + legacyTimestampMapping)); if (fieldType.isNullable()) { builder = fieldBuilder.withDefault(null); @@ -424,7 +576,6 @@ public class AvroSchemaConverter { - .items(convertToSchema(arrayType.getElementType(), rowName)); + .items(convertToSchema(arrayType.getElementType(), rowName, legacyTimestampMapping)); return nullable ? 
nullableSchema(array) : array; case RAW: - case TIMESTAMP_WITH_LOCAL_TIME_ZONE: default: throw new UnsupportedOperationException( "Unsupported to derive Schema for type: " + logicalType); diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroFormatFactoryTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroFormatFactoryTest.java index fef28062e63..0b9d572a13b 100644 --- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroFormatFactoryTest.java +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroFormatFactoryTest.java @@ -33,13 +33,16 @@ import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContex import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import org.apache.flink.table.types.logical.RowType; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; +import org.junit.jupiter.params.provider.ValueSource; import java.util.HashMap; import java.util.Map; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for the {@link AvroFormatFactory}. */ class AvroFormatFactoryTest { @@ -48,11 +51,25 @@ class AvroFormatFactoryTest { ResolvedSchema.of( Column.physical("a", DataTypes.STRING()), Column.physical("b", DataTypes.INT()), - Column.physical("c", DataTypes.BOOLEAN())); + Column.physical("c", DataTypes.BOOLEAN()), + Column.physical("d", DataTypes.TIMESTAMP(3))); + + private static final ResolvedSchema NEW_SCHEMA = + ResolvedSchema.of( + Column.physical("a", DataTypes.STRING()), + Column.physical("b", DataTypes.INT()), + Column.physical("c", DataTypes.BOOLEAN()), + Column.physical("d", DataTypes.TIMESTAMP(3)), + Column.physical("e", DataTypes.TIMESTAMP(6)), + Column.physical("f", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)), + Column.physical("g", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6))); private static final RowType ROW_TYPE = (RowType) SCHEMA.toPhysicalRowDataType().getLogicalType(); + private static final RowType NEW_ROW_TYPE = + (RowType) NEW_SCHEMA.toPhysicalRowDataType().getLogicalType(); + @ParameterizedTest @EnumSource(AvroEncoding.class) void testSeDeSchema(AvroEncoding encoding) { @@ -60,7 +77,7 @@ class AvroFormatFactoryTest { new AvroRowDataDeserializationSchema( ROW_TYPE, InternalTypeInfo.of(ROW_TYPE), encoding); - final Map<String, String> options = getAllOptions(); + final Map<String, String> options = getAllOptions(true); final DynamicTableSource actualSource = FactoryMocks.createTableSource(SCHEMA, options); assertThat(actualSource).isInstanceOf(TestDynamicTableFactory.DynamicTableSourceMock.class); @@ -87,16 +104,87 @@ class AvroFormatFactoryTest { assertThat(actualSer).isEqualTo(expectedSer); } + @Test + void testOldSeDeNewSchema() { + assertThatThrownBy( + () -> { + new AvroRowDataDeserializationSchema( + NEW_ROW_TYPE, InternalTypeInfo.of(NEW_ROW_TYPE)); + }) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "Avro does not support TIMESTAMP type with precision: 6, it only supports precision less than 3."); + + assertThatThrownBy( + () -> { + new AvroRowDataSerializationSchema(NEW_ROW_TYPE); + }) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "Avro does not support TIMESTAMP type with precision: 6, it only supports precision less than 3."); + } + + @Test + void testNewSeDeNewSchema() { + testSeDeSchema(NEW_ROW_TYPE, NEW_SCHEMA, 
false); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testSeDeSchema(boolean legacyTimestampMapping) { + testSeDeSchema(ROW_TYPE, SCHEMA, legacyTimestampMapping); + } + + void testSeDeSchema(RowType rowType, ResolvedSchema schema, boolean legacyTimestampMapping) { + final AvroRowDataDeserializationSchema expectedDeser = + new AvroRowDataDeserializationSchema( + rowType, + InternalTypeInfo.of(rowType), + AvroEncoding.BINARY, + legacyTimestampMapping); + + final Map<String, String> options = getAllOptions(legacyTimestampMapping); + + final DynamicTableSource actualSource = FactoryMocks.createTableSource(schema, options); + assertThat(actualSource).isInstanceOf(TestDynamicTableFactory.DynamicTableSourceMock.class); + TestDynamicTableFactory.DynamicTableSourceMock scanSourceMock = + (TestDynamicTableFactory.DynamicTableSourceMock) actualSource; + + DeserializationSchema<RowData> actualDeser = + scanSourceMock.valueFormat.createRuntimeDecoder( + ScanRuntimeProviderContext.INSTANCE, schema.toPhysicalRowDataType()); + + assertThat(actualDeser).isEqualTo(expectedDeser); + + final AvroRowDataSerializationSchema expectedSer = + new AvroRowDataSerializationSchema( + rowType, AvroEncoding.BINARY, legacyTimestampMapping); + + final DynamicTableSink actualSink = FactoryMocks.createTableSink(schema, options); + assertThat(actualSink).isInstanceOf(TestDynamicTableFactory.DynamicTableSinkMock.class); + TestDynamicTableFactory.DynamicTableSinkMock sinkMock = + (TestDynamicTableFactory.DynamicTableSinkMock) actualSink; + + SerializationSchema<RowData> actualSer = + sinkMock.valueFormat.createRuntimeEncoder(null, schema.toPhysicalRowDataType()); + + assertThat(actualSer).isEqualTo(expectedSer); + } + // ------------------------------------------------------------------------ // Utilities // ------------------------------------------------------------------------ - private Map<String, String> getAllOptions() { + private Map<String, String> getAllOptions(boolean legacyTimestampMapping) { final Map<String, String> options = new HashMap<>(); options.put("connector", TestDynamicTableFactory.IDENTIFIER); options.put("target", "MyTarget"); options.put("buffer-size", "1000"); + if (!legacyTimestampMapping) { + options.put("avro.timestamp_mapping.legacy", "false"); + } + options.put("format", AvroFormatFactory.IDENTIFIER); return options; } diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java index 561526112a2..84f9994f666 100644 --- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java @@ -19,16 +19,21 @@ package org.apache.flink.formats.avro; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.formats.avro.AvroFormatOptions.AvroEncoding; import org.apache.flink.formats.avro.generated.LogicalTimeRecord; +import org.apache.flink.formats.avro.generated.Timestamps; import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter; import org.apache.flink.formats.avro.utils.AvroTestUtils; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.util.DataFormatConverters; import 
org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.AtomicDataType; import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.BigIntType; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.types.Row; import org.apache.flink.util.InstantiationUtil; import org.apache.avro.Schema; @@ -39,7 +44,10 @@ import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.avro.io.Encoder; import org.apache.avro.io.EncoderFactory; +import org.apache.avro.specific.SpecificData; import org.apache.avro.specific.SpecificDatumWriter; +import org.apache.avro.specific.SpecificRecord; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; @@ -84,7 +92,7 @@ class AvroRowDataDeSerializationSchemaTest { void testDeserializeNullRow(AvroEncoding encoding) throws Exception { final DataType dataType = ROW(FIELD("bool", BOOLEAN())).nullable(); AvroRowDataDeserializationSchema deserializationSchema = - createDeserializationSchema(dataType, encoding); + createDeserializationSchema(dataType, encoding, true); assertThat(deserializationSchema.deserialize(null)).isNull(); } @@ -166,9 +174,9 @@ class AvroRowDataDeSerializationSchemaTest { record.put(18, map2); AvroRowDataSerializationSchema serializationSchema = - createSerializationSchema(dataType, encoding); + createSerializationSchema(dataType, encoding, true); AvroRowDataDeserializationSchema deserializationSchema = - createDeserializationSchema(dataType, encoding); + createDeserializationSchema(dataType, encoding, true); ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); GenericDatumWriter<IndexedRecord> datumWriter = new GenericDatumWriter<>(schema); @@ -292,9 +300,9 @@ class AvroRowDataDeSerializationSchemaTest { FIELD("type_time_millis", TIME(3).notNull())) .notNull(); AvroRowDataSerializationSchema serializationSchema = - createSerializationSchema(dataType, encoding); + createSerializationSchema(dataType, encoding, true); AvroRowDataDeserializationSchema deserializationSchema = - createDeserializationSchema(dataType, encoding); + createDeserializationSchema(dataType, encoding, true); RowData rowData = deserializationSchema.deserialize(input); byte[] output = serializationSchema.serialize(rowData); @@ -319,7 +327,7 @@ class AvroRowDataDeSerializationSchemaTest { void testSerializationWithTypesMismatch(AvroEncoding encoding) throws Exception { AvroRowDataSerializationSchema serializationSchema = createSerializationSchema( - ROW(FIELD("f0", INT()), FIELD("f1", STRING())).notNull(), encoding); + ROW(FIELD("f0", INT()), FIELD("f1", STRING())).notNull(), encoding, true); GenericRowData rowData = new GenericRowData(2); rowData.setField(0, 1); rowData.setField(1, 2); // This should be a STRING @@ -330,23 +338,90 @@ class AvroRowDataDeSerializationSchemaTest { .hasStackTraceContaining("Fail to serialize at field: f1"); } + @Test + void testTimestampTypeLegacyMapping() throws Exception { + final Tuple4<Class<? 
extends SpecificRecord>, SpecificRecord, GenericRecord, Row> testData = + AvroTestUtils.getTimestampTestData(); + + SpecificDatumWriter<Timestamps> datumWriter = new SpecificDatumWriter<>(Timestamps.class); + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + Encoder encoder = EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null); + datumWriter.write((Timestamps) testData.f1, encoder); + encoder.flush(); + + DataType dataType = + AvroSchemaConverter.convertToDataType( + SpecificData.get().getSchema(Timestamps.class).toString()); + + // Under the legacy mapping, Avro local timestamps (local-timestamp-millis/micros) are converted to BigIntType + assertThat(dataType.getChildren().get(2)) + .isEqualTo(new AtomicDataType(new BigIntType(false))); + assertThat(dataType.getChildren().get(3)) + .isEqualTo(new AtomicDataType(new BigIntType(false))); + + assertThatThrownBy(() -> createSerializationSchema(dataType, AvroEncoding.BINARY, true)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "Avro does not support TIMESTAMP type with precision: 6, it only supports precision less than 3."); + + assertThatThrownBy(() -> createDeserializationSchema(dataType, AvroEncoding.BINARY, true)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "Avro does not support TIMESTAMP type with precision: 6, it only supports precision less than 3."); + } + + @Test + void testTimestampTypeNewMapping() throws Exception { + final Tuple4<Class<? extends SpecificRecord>, SpecificRecord, GenericRecord, Row> testData = + AvroTestUtils.getTimestampTestData(); + + SpecificDatumWriter<Timestamps> datumWriter = new SpecificDatumWriter<>(Timestamps.class); + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + Encoder encoder = EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null); + datumWriter.write((Timestamps) testData.f1, encoder); + encoder.flush(); + byte[] input = byteArrayOutputStream.toByteArray(); + + DataType dataType = + AvroSchemaConverter.convertToDataType( + SpecificData.get().getSchema(Timestamps.class).toString(), false); + + AvroRowDataSerializationSchema serializationSchema = + createSerializationSchema(dataType, AvroEncoding.BINARY, false); + AvroRowDataDeserializationSchema deserializationSchema = + createDeserializationSchema(dataType, AvroEncoding.BINARY, false); + + RowData rowData = deserializationSchema.deserialize(input); + byte[] output = serializationSchema.serialize(rowData); + RowData rowData2 = deserializationSchema.deserialize(output); + assertThat(rowData2).isEqualTo(rowData); + + assertThat(rowData.getTimestamp(2, 3).toLocalDateTime().toString()) + .isEqualTo("2014-03-01T12:12:12.321"); + assertThat(rowData.getTimestamp(3, 6).toLocalDateTime().toString()) + .isEqualTo("1970-01-01T00:02:03.456"); + } + private AvroRowDataSerializationSchema createSerializationSchema( - DataType dataType, AvroEncoding encoding) throws Exception { + DataType dataType, AvroEncoding encoding, boolean legacyTimestampMapping) + throws Exception { final RowType rowType = (RowType) dataType.getLogicalType(); AvroRowDataSerializationSchema serializationSchema = - new AvroRowDataSerializationSchema(rowType, encoding); + new AvroRowDataSerializationSchema(rowType, encoding, legacyTimestampMapping); serializationSchema.open(null); return serializationSchema; } private AvroRowDataDeserializationSchema createDeserializationSchema( - DataType dataType, AvroEncoding encoding) throws Exception { + DataType dataType, AvroEncoding encoding, boolean legacyTimestampMapping) + throws
Exception { final RowType rowType = (RowType) dataType.getLogicalType(); final TypeInformation<RowData> typeInfo = InternalTypeInfo.of(rowType); AvroRowDataDeserializationSchema deserializationSchema = - new AvroRowDataDeserializationSchema(rowType, typeInfo, encoding); + new AvroRowDataDeserializationSchema( + rowType, typeInfo, encoding, legacyTimestampMapping); deserializationSchema.open(null); return deserializationSchema; } } diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDeSerializationSchemaTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDeSerializationSchemaTest.java index c7bb6c52b40..1349ca2e03e 100644 --- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDeSerializationSchemaTest.java +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDeSerializationSchemaTest.java @@ -19,6 +19,7 @@ package org.apache.flink.formats.avro; import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.formats.avro.utils.AvroTestUtils; import org.apache.flink.types.Row; import org.apache.flink.util.InstantiationUtil; @@ -26,33 +27,22 @@ import org.apache.flink.util.InstantiationUtil; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.specific.SpecificRecord; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.io.IOException; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Test for the Avro serialization and deserialization schema. */ class AvroRowDeSerializationSchemaTest { - @Test - void testSpecificSerializeDeserializeFromClass() throws IOException { - final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = - AvroTestUtils.getSpecificTestData(); - - final AvroRowSerializationSchema serializationSchema = - new AvroRowSerializationSchema(testData.f0); - final AvroRowDeserializationSchema deserializationSchema = - new AvroRowDeserializationSchema(testData.f0); - - final byte[] bytes = serializationSchema.serialize(testData.f2); - final Row actual = deserializationSchema.deserialize(bytes); - - assertThat(actual).isEqualTo(testData.f2); - } - - @Test - void testSpecificSerializeDeserializeFromSchema() throws IOException { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testSpecificSerializeDeserializeFromSchema(boolean legacyTimestampMapping) + throws IOException { final Tuple3<Class<?
extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getSpecificTestData(); final String schemaString = testData.f1.getSchema().toString(); @@ -62,14 +52,20 @@ class AvroRowDeSerializationSchemaTest { final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(schemaString); - final byte[] bytes = serializationSchema.serialize(testData.f2); - final Row actual = deserializationSchema.deserialize(bytes); - - assertThat(actual).isEqualTo(testData.f2); + if (legacyTimestampMapping) { + final byte[] bytes = serializationSchema.serialize(testData.f2); + final Row actual = deserializationSchema.deserialize(bytes); + assertThat(actual).isEqualTo(testData.f2); + } else { + final byte[] bytes = serializationSchema.serialize(testData.f2, false); + final Row actual = deserializationSchema.deserialize(bytes, false); + assertThat(actual).isEqualTo(testData.f2); + } } - @Test - void testGenericSerializeDeserialize() throws IOException { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testGenericSerializeDeserialize(boolean legacyTimestampMapping) throws IOException { final Tuple3<GenericRecord, Row, Schema> testData = AvroTestUtils.getGenericTestData(); final AvroRowSerializationSchema serializationSchema = @@ -77,14 +73,21 @@ class AvroRowDeSerializationSchemaTest { final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f2.toString()); - final byte[] bytes = serializationSchema.serialize(testData.f1); - final Row actual = deserializationSchema.deserialize(bytes); - - assertThat(actual).isEqualTo(testData.f1); + if (legacyTimestampMapping) { + final byte[] bytes = serializationSchema.serialize(testData.f1); + final Row actual = deserializationSchema.deserialize(bytes); + assertThat(actual).isEqualTo(testData.f1); + } else { + final byte[] bytes = serializationSchema.serialize(testData.f1, false); + final Row actual = deserializationSchema.deserialize(bytes, false); + assertThat(actual).isEqualTo(testData.f1); + } } - @Test - void testSpecificSerializeFromClassSeveralTimes() throws IOException { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testSpecificSerializeFromClassSeveralTimes(boolean legacyTimestampMapping) + throws IOException { final Tuple3<Class<? 
extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getSpecificTestData(); @@ -93,16 +96,25 @@ class AvroRowDeSerializationSchemaTest { final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0); - serializationSchema.serialize(testData.f2); - serializationSchema.serialize(testData.f2); - final byte[] bytes = serializationSchema.serialize(testData.f2); - final Row actual = deserializationSchema.deserialize(bytes); - - assertThat(actual).isEqualTo(testData.f2); + if (legacyTimestampMapping) { + serializationSchema.serialize(testData.f2); + serializationSchema.serialize(testData.f2); + final byte[] bytes = serializationSchema.serialize(testData.f2); + final Row actual = deserializationSchema.deserialize(bytes); + assertThat(actual).isEqualTo(testData.f2); + } else { + serializationSchema.serialize(testData.f2, false); + serializationSchema.serialize(testData.f2, false); + final byte[] bytes = serializationSchema.serialize(testData.f2, false); + final Row actual = deserializationSchema.deserialize(bytes, false); + assertThat(actual).isEqualTo(testData.f2); + } } - @Test - void testSpecificSerializeFromSchemaSeveralTimes() throws IOException { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testSpecificSerializeFromSchemaSeveralTimes(boolean legacyTimestampMapping) + throws IOException { final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getSpecificTestData(); final String schemaString = testData.f1.getSchema().toString(); @@ -112,16 +124,24 @@ class AvroRowDeSerializationSchemaTest { final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(schemaString); - serializationSchema.serialize(testData.f2); - serializationSchema.serialize(testData.f2); - final byte[] bytes = serializationSchema.serialize(testData.f2); - final Row actual = deserializationSchema.deserialize(bytes); - - assertThat(actual).isEqualTo(testData.f2); + if (legacyTimestampMapping) { + serializationSchema.serialize(testData.f2); + serializationSchema.serialize(testData.f2); + final byte[] bytes = serializationSchema.serialize(testData.f2); + final Row actual = deserializationSchema.deserialize(bytes); + assertThat(actual).isEqualTo(testData.f2); + } else { + serializationSchema.serialize(testData.f2, false); + serializationSchema.serialize(testData.f2, false); + final byte[] bytes = serializationSchema.serialize(testData.f2, false); + final Row actual = deserializationSchema.deserialize(bytes, false); + assertThat(actual).isEqualTo(testData.f2); + } } - @Test - void testGenericSerializeSeveralTimes() throws IOException { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testGenericSerializeSeveralTimes(boolean legacyTimestampMapping) throws IOException { final Tuple3<GenericRecord, Row, Schema> testData = AvroTestUtils.getGenericTestData(); final AvroRowSerializationSchema serializationSchema = @@ -129,16 +149,25 @@ class AvroRowDeSerializationSchemaTest { final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f2.toString()); - serializationSchema.serialize(testData.f1); - serializationSchema.serialize(testData.f1); - final byte[] bytes = serializationSchema.serialize(testData.f1); - final Row actual = deserializationSchema.deserialize(bytes); - - assertThat(actual).isEqualTo(testData.f1); + if (legacyTimestampMapping) { + serializationSchema.serialize(testData.f1); + 
serializationSchema.serialize(testData.f1); + final byte[] bytes = serializationSchema.serialize(testData.f1); + final Row actual = deserializationSchema.deserialize(bytes); + assertThat(actual).isEqualTo(testData.f1); + } else { + serializationSchema.serialize(testData.f1, false); + serializationSchema.serialize(testData.f1, false); + final byte[] bytes = serializationSchema.serialize(testData.f1, false); + final Row actual = deserializationSchema.deserialize(bytes, false); + assertThat(actual).isEqualTo(testData.f1); + } } - @Test - void testSpecificDeserializeFromClassSeveralTimes() throws IOException { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testSpecificDeserializeFromClassSeveralTimes(boolean legacyTimestampMapping) + throws IOException { final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getSpecificTestData(); @@ -147,16 +176,25 @@ class AvroRowDeSerializationSchemaTest { final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0); - final byte[] bytes = serializationSchema.serialize(testData.f2); - deserializationSchema.deserialize(bytes); - deserializationSchema.deserialize(bytes); - final Row actual = deserializationSchema.deserialize(bytes); - - assertThat(actual).isEqualTo(testData.f2); + if (legacyTimestampMapping) { + final byte[] bytes = serializationSchema.serialize(testData.f2); + deserializationSchema.deserialize(bytes); + deserializationSchema.deserialize(bytes); + final Row actual = deserializationSchema.deserialize(bytes); + assertThat(actual).isEqualTo(testData.f2); + } else { + final byte[] bytes = serializationSchema.serialize(testData.f2, false); + deserializationSchema.deserialize(bytes, false); + deserializationSchema.deserialize(bytes, false); + final Row actual = deserializationSchema.deserialize(bytes, false); + assertThat(actual).isEqualTo(testData.f2); + } } - @Test - void testSpecificDeserializeFromSchemaSeveralTimes() throws IOException { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testSpecificDeserializeFromSchemaSeveralTimes(boolean legacyTimestampMapping) + throws IOException { final Tuple3<Class<? 
extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getSpecificTestData(); final String schemaString = testData.f1.getSchema().toString(); @@ -166,16 +204,24 @@ class AvroRowDeSerializationSchemaTest { final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(schemaString); - final byte[] bytes = serializationSchema.serialize(testData.f2); - deserializationSchema.deserialize(bytes); - deserializationSchema.deserialize(bytes); - final Row actual = deserializationSchema.deserialize(bytes); - - assertThat(actual).isEqualTo(testData.f2); + if (legacyTimestampMapping) { + final byte[] bytes = serializationSchema.serialize(testData.f2); + deserializationSchema.deserialize(bytes); + deserializationSchema.deserialize(bytes); + final Row actual = deserializationSchema.deserialize(bytes); + assertThat(actual).isEqualTo(testData.f2); + } else { + final byte[] bytes = serializationSchema.serialize(testData.f2, false); + deserializationSchema.deserialize(bytes, false); + deserializationSchema.deserialize(bytes, false); + final Row actual = deserializationSchema.deserialize(bytes, false); + assertThat(actual).isEqualTo(testData.f2); + } } - @Test - void testGenericDeserializeSeveralTimes() throws IOException { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testGenericDeserializeSeveralTimes(boolean legacyTimestampMapping) throws IOException { final Tuple3<GenericRecord, Row, Schema> testData = AvroTestUtils.getGenericTestData(); final AvroRowSerializationSchema serializationSchema = @@ -183,16 +229,24 @@ class AvroRowDeSerializationSchemaTest { final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f2.toString()); - final byte[] bytes = serializationSchema.serialize(testData.f1); - deserializationSchema.deserialize(bytes); - deserializationSchema.deserialize(bytes); - final Row actual = deserializationSchema.deserialize(bytes); - - assertThat(actual).isEqualTo(testData.f1); + if (legacyTimestampMapping) { + final byte[] bytes = serializationSchema.serialize(testData.f1); + deserializationSchema.deserialize(bytes); + deserializationSchema.deserialize(bytes); + final Row actual = deserializationSchema.deserialize(bytes); + assertThat(actual).isEqualTo(testData.f1); + } else { + final byte[] bytes = serializationSchema.serialize(testData.f1, false); + deserializationSchema.deserialize(bytes, false); + deserializationSchema.deserialize(bytes, false); + final Row actual = deserializationSchema.deserialize(bytes, false); + assertThat(actual).isEqualTo(testData.f1); + } } - @Test - void testSerializability() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testSerializability(boolean legacyTimestampMapping) throws Exception { final Tuple3<Class<? 
extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getSpecificTestData(); final String schemaString = testData.f1.getSchema().toString(); @@ -201,17 +255,97 @@ class AvroRowDeSerializationSchemaTest { final AvroRowSerializationSchema classSer = new AvroRowSerializationSchema(testData.f0); final AvroRowDeserializationSchema classDeser = new AvroRowDeserializationSchema(testData.f0); - testSerializability(classSer, classDeser, testData.f2); + testSerializability(classSer, classDeser, testData.f2, legacyTimestampMapping); // from schema string final AvroRowSerializationSchema schemaSer = new AvroRowSerializationSchema(schemaString); final AvroRowDeserializationSchema schemaDeser = new AvroRowDeserializationSchema(schemaString); - testSerializability(schemaSer, schemaDeser, testData.f2); + testSerializability(schemaSer, schemaDeser, testData.f2, legacyTimestampMapping); + } + + @Test + void testTimestampSerializeDeserializeLegacyMapping() throws Exception { + final Tuple4<Class<? extends SpecificRecord>, SpecificRecord, GenericRecord, Row> testData = + AvroTestUtils.getTimestampTestData(); + + final String schemaString = testData.f1.getSchema().toString(); + + final AvroRowSerializationSchema serializationSchema = + new AvroRowSerializationSchema(schemaString); + final AvroRowDeserializationSchema deserializationSchema = + new AvroRowDeserializationSchema(schemaString); + + assertThatThrownBy( + () -> { + serializationSchema.serialize(testData.f3); + }) + .isInstanceOf(RuntimeException.class) + .hasMessage("Unsupported local timestamp type."); + + final byte[] bytes = serializationSchema.serialize(testData.f3, false); + + assertThatThrownBy( + () -> { + deserializationSchema.deserialize(bytes); + }) + .isInstanceOf(RuntimeException.class) + .hasMessage("Unsupported local timestamp type."); + } + + @Test + void testTimestampSpecificSerializeDeserializeNewMapping() throws Exception { + final Tuple4<Class<? extends SpecificRecord>, SpecificRecord, GenericRecord, Row> testData = + AvroTestUtils.getTimestampTestData(); + + final String schemaString = testData.f1.getSchema().toString(); + + final AvroRowSerializationSchema serializationSchema = + new AvroRowSerializationSchema(schemaString); + final AvroRowDeserializationSchema deserializationSchema = + new AvroRowDeserializationSchema(schemaString); + + final byte[] bytes = serializationSchema.serialize(testData.f3, false); + final Row actual = deserializationSchema.deserialize(bytes, false); + assertThat(actual).isEqualTo(testData.f3); + } + + @Test + void testTimestampGenericSerializeDeserializeNewMapping() throws Exception { + final Tuple4<Class<? extends SpecificRecord>, SpecificRecord, GenericRecord, Row> testData = + AvroTestUtils.getTimestampTestData(); + + final String schemaString = testData.f2.getSchema().toString(); + + final AvroRowSerializationSchema serializationSchema = + new AvroRowSerializationSchema(schemaString); + final AvroRowDeserializationSchema deserializationSchema = + new AvroRowDeserializationSchema(schemaString); + + final byte[] bytes = serializationSchema.serialize(testData.f3, false); + final Row actual = deserializationSchema.deserialize(bytes, false); + assertThat(actual).isEqualTo(testData.f3); + } + + @Test + void testTimestampClassSerializeDeserializeNewMapping() throws Exception { + final Tuple4<Class<?
extends SpecificRecord>, SpecificRecord, GenericRecord, Row> testData = + AvroTestUtils.getTimestampTestData(); + + final AvroRowSerializationSchema serializationSchema = + new AvroRowSerializationSchema(testData.f0); + final AvroRowDeserializationSchema deserializationSchema = + new AvroRowDeserializationSchema(testData.f0); + + final byte[] bytes = serializationSchema.serialize(testData.f3, false); + final Row actual = deserializationSchema.deserialize(bytes, false); + assertThat(actual).isEqualTo(testData.f3); } private void testSerializability( - AvroRowSerializationSchema ser, AvroRowDeserializationSchema deser, Row data) + AvroRowSerializationSchema ser, + AvroRowDeserializationSchema deser, + Row data, + boolean legacyTimestampMapping) throws Exception { final byte[] serBytes = InstantiationUtil.serializeObject(ser); final byte[] deserBytes = InstantiationUtil.serializeObject(deser); @@ -223,11 +357,18 @@ class AvroRowDeSerializationSchemaTest { InstantiationUtil.deserializeObject( deserBytes, Thread.currentThread().getContextClassLoader()); - final byte[] bytes = serCopy.serialize(data); - deserCopy.deserialize(bytes); - deserCopy.deserialize(bytes); - final Row actual = deserCopy.deserialize(bytes); - - assertThat(actual).isEqualTo(data); + if (legacyTimestampMapping) { + final byte[] bytes = serCopy.serialize(data); + deserCopy.deserialize(bytes); + deserCopy.deserialize(bytes); + final Row actual = deserCopy.deserialize(bytes); + assertThat(actual).isEqualTo(data); + } else { + final byte[] bytes = serCopy.serialize(data, false); + deserCopy.deserialize(bytes, false); + deserCopy.deserialize(bytes, false); + final Row actual = deserCopy.deserialize(bytes, false); + assertThat(actual).isEqualTo(data); + } } } diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java index 259f05c2ce9..f13a8200b15 100644 --- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java @@ -21,6 +21,7 @@ package org.apache.flink.formats.avro.typeutils; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.common.typeutils.base.VoidSerializer; +import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.formats.avro.AvroFormatOptions.AvroEncoding; import org.apache.flink.formats.avro.generated.User; @@ -41,6 +42,7 @@ import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecordBuilder; import org.apache.avro.io.Decoder; import org.apache.avro.io.DecoderFactory; +import org.apache.avro.specific.SpecificRecord; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; @@ -533,6 +535,47 @@ class AvroSchemaConverterTest { assertThat(schema).isEqualTo(new Schema.Parser().parse(schemaStr)); } + @Test + void testTimestampsSchemaToDataTypeToSchemaLegacyTimestampMapping() { + final Tuple4<Class<?
extends SpecificRecord>, SpecificRecord, GenericRecord, Row> testData = + AvroTestUtils.getTimestampTestData(); + String schemaStr = testData.f1.getSchema().toString(); + DataType dataType = AvroSchemaConverter.convertToDataType(schemaStr); + assertThatThrownBy(() -> AvroSchemaConverter.convertToSchema(dataType.getLogicalType())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "Avro does not support TIMESTAMP type with precision: 6, it only supports precision less than 3."); + } + + @Test + void testTimestampsSchemaToTypeInfoLegacyTimestampMapping() { + final Tuple4<Class<? extends SpecificRecord>, SpecificRecord, GenericRecord, Row> testData = + AvroTestUtils.getTimestampTestData(); + String schemaStr = testData.f1.getSchema().toString(); + TypeInformation<Row> typeInfo = AvroSchemaConverter.convertToTypeInfo(schemaStr); + validateLegacyTimestampsSchema(typeInfo); + } + + @Test + void testTimestampsSchemaToDataTypeToSchemaNewMapping() { + final Tuple4<Class<? extends SpecificRecord>, SpecificRecord, GenericRecord, Row> testData = + AvroTestUtils.getTimestampTestData(); + String schemaStr = testData.f1.getSchema().toString(); + DataType dataType = AvroSchemaConverter.convertToDataType(schemaStr, false); + Schema schema = AvroSchemaConverter.convertToSchema(dataType.getLogicalType(), false); + DataType dataType2 = AvroSchemaConverter.convertToDataType(schema.toString(), false); + validateTimestampsSchema(dataType2); + } + + @Test + void testTimestampsSchemaToTypeInfoNewMapping() { + final Tuple4<Class<? extends SpecificRecord>, SpecificRecord, GenericRecord, Row> testData = + AvroTestUtils.getTimestampTestData(); + String schemaStr = testData.f1.getSchema().toString(); + TypeInformation<Row> typeInfo = AvroSchemaConverter.convertToTypeInfo(schemaStr, false); + validateTimestampsSchema(typeInfo); + } + private void validateUserSchema(TypeInformation<?> actual) { final TypeInformation<Row> address = Types.ROW_NAMED( @@ -600,6 +643,78 @@ class AvroSchemaConverterTest { assertThat(userRowInfo.schemaEquals(actual)).isTrue(); } + private void validateTimestampsSchema(TypeInformation<?> actual) { + final TypeInformation<Row> timestamps = + Types.ROW_NAMED( + new String[] { + "type_timestamp_millis", + "type_timestamp_micros", + "type_local_timestamp_millis", + "type_local_timestamp_micros" + }, + Types.INSTANT, + Types.INSTANT, + Types.LOCAL_DATE_TIME, + Types.LOCAL_DATE_TIME); + final RowTypeInfo timestampsRowTypeInfo = (RowTypeInfo) timestamps; + assertThat(timestampsRowTypeInfo.schemaEquals(actual)).isTrue(); + } + + private void validateLegacyTimestampsSchema(TypeInformation<?> actual) { + final TypeInformation<Row> timestamps = + Types.ROW_NAMED( + new String[] { + "type_timestamp_millis", + "type_timestamp_micros", + "type_local_timestamp_millis", + "type_local_timestamp_micros" + }, + Types.SQL_TIMESTAMP, + Types.SQL_TIMESTAMP, + Types.LONG, + Types.LONG); + final RowTypeInfo timestampsRowTypeInfo = (RowTypeInfo) timestamps; + assertThat(timestampsRowTypeInfo.schemaEquals(actual)).isTrue(); + } + + private void validateLegacyTimestampsSchema(DataType actual) { + final DataType timestamps = + DataTypes.ROW( + DataTypes.FIELD( + "type_timestamp_millis", DataTypes.TIMESTAMP(3).notNull()), + DataTypes.FIELD( + "type_timestamp_micros", DataTypes.TIMESTAMP(6).notNull()), + DataTypes.FIELD( + "type_local_timestamp_millis", + DataTypes.BIGINT().notNull()), + DataTypes.FIELD( + "type_local_timestamp_micros", + DataTypes.BIGINT().notNull())) + .notNull(); + + 
assertThat(actual).isEqualTo(timestamps); + } + + private void validateTimestampsSchema(DataType actual) { + final DataType timestamps = + DataTypes.ROW( + DataTypes.FIELD( + "type_timestamp_millis", + DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull()), + DataTypes.FIELD( + "type_timestamp_micros", + DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6).notNull()), + DataTypes.FIELD( + "type_local_timestamp_millis", + DataTypes.TIMESTAMP(3).notNull()), + DataTypes.FIELD( + "type_local_timestamp_micros", + DataTypes.TIMESTAMP(6).notNull())) + .notNull(); + + assertThat(actual).isEqualTo(timestamps); + } + private void validateUserSchema(DataType actual) { final DataType address = DataTypes.ROW( diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java index 289d9ca0ce3..7827569df9a 100644 --- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java @@ -111,8 +111,8 @@ class AvroTypeExtractionTest { + "\"type_bytes\": \"\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\", " + "\"type_date\": \"2014-03-01\", \"type_time_millis\": \"12:12:12\", \"type_time_micros\": \"00:00:00.123456\", " + "\"type_timestamp_millis\": \"2014-03-01T12:12:12.321Z\", " - + "\"type_timestamp_micros\": \"1970-01-01T00:00:00.123456Z\", \"type_decimal_bytes\": \"\\u0007Ð\", " - + "\"type_decimal_fixed\": [7, -48]}\n" + + "\"type_timestamp_micros\": \"1970-01-01T00:00:00.123456Z\", " + + "\"type_decimal_bytes\": \"\\u0007Ð\", \"type_decimal_fixed\": [7, -48]}\n" + "{\"name\": \"Charlie\", \"favorite_number\": null, " + "\"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, " + "\"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], " @@ -123,7 +123,8 @@ class AvroTypeExtractionTest { + "\"type_bytes\": \"\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\", " + "\"type_date\": \"2014-03-01\", \"type_time_millis\": \"12:12:12\", \"type_time_micros\": \"00:00:00.123456\", " + "\"type_timestamp_millis\": \"2014-03-01T12:12:12.321Z\", " - + "\"type_timestamp_micros\": \"1970-01-01T00:00:00.123456Z\", \"type_decimal_bytes\": \"\\u0007Ð\", " + + "\"type_timestamp_micros\": \"1970-01-01T00:00:00.123456Z\", " + + "\"type_decimal_bytes\": \"\\u0007Ð\", " + "\"type_decimal_fixed\": [7, -48]}\n"; } @@ -162,8 +163,8 @@ class AvroTypeExtractionTest { + " \"type_bytes\": \"\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\", " + "\"type_date\": \"2014-03-01\", \"type_time_millis\": \"12:12:12\", \"type_time_micros\": \"00:00:00.123456\", " + "\"type_timestamp_millis\": \"2014-03-01T12:12:12.321Z\", " - + "\"type_timestamp_micros\": \"1970-01-01T00:00:00.123456Z\", \"type_decimal_bytes\": \"\\u0007Ð\", " - + "\"type_decimal_fixed\": [7, -48]}\n" + + "\"type_timestamp_micros\": \"1970-01-01T00:00:00.123456Z\", " + + "\"type_decimal_bytes\": \"\\u0007Ð\", \"type_decimal_fixed\": [7, -48]}\n" + "{\"name\": \"Charlie\", \"favorite_number\": null, " + "\"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, " + "\"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], " @@ -174,8 +175,8 @@ class AvroTypeExtractionTest { + "\"type_bytes\": 
\"\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\", " + "\"type_date\": \"2014-03-01\", \"type_time_millis\": \"12:12:12\", \"type_time_micros\": \"00:00:00.123456\", " + "\"type_timestamp_millis\": \"2014-03-01T12:12:12.321Z\", " - + "\"type_timestamp_micros\": \"1970-01-01T00:00:00.123456Z\", \"type_decimal_bytes\": \"\\u0007Ð\", " - + "\"type_decimal_fixed\": [7, -48]}\n"; + + "\"type_timestamp_micros\": \"1970-01-01T00:00:00.123456Z\", " + + "\"type_decimal_bytes\": \"\\u0007Ð\", \"type_decimal_fixed\": [7, -48]}\n"; } @ParameterizedTest diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java index 47fc67e710f..b4e12f8ca10 100644 --- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java @@ -19,11 +19,13 @@ package org.apache.flink.formats.avro.utils; import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.formats.avro.AvroFormatOptions.AvroEncoding; import org.apache.flink.formats.avro.generated.Address; import org.apache.flink.formats.avro.generated.Colors; import org.apache.flink.formats.avro.generated.Fixed16; import org.apache.flink.formats.avro.generated.Fixed2; +import org.apache.flink.formats.avro.generated.Timestamps; import org.apache.flink.formats.avro.generated.User; import org.apache.flink.formats.avro.typeutils.AvroSerializerLargeGenericRecordTest; import org.apache.flink.types.Row; @@ -48,6 +50,7 @@ import java.sql.Time; import java.sql.Timestamp; import java.time.Instant; import java.time.LocalDate; +import java.time.LocalDateTime; import java.time.LocalTime; import java.time.temporal.ChronoUnit; import java.util.Arrays; @@ -263,6 +266,53 @@ public final class AvroTestUtils { return t; } + public static Tuple4<Class<? 
extends SpecificRecord>, SpecificRecord, GenericRecord, Row> + getTimestampTestData() { + + final String schemaString = + "{\"type\":\"record\",\"name\":\"GenericTimestamps\",\"namespace\":\"org.apache.flink.formats.avro.generated\"," + + "\"fields\": [{\"name\":\"type_timestamp_millis\",\"type\":{\"type\":\"long\"," + + "\"logicalType\":\"timestamp-millis\"}},{\"name\":\"type_timestamp_micros\",\"type\":{\"type\":\"long\"," + + "\"logicalType\":\"timestamp-micros\"}},{\"name\": \"type_local_timestamp_millis\", \"type\": {\"type\": \"long\", \"logicalType\": \"local-timestamp-millis\"}}," + + "{\"name\": \"type_local_timestamp_micros\", \"type\": {\"type\": \"long\", \"logicalType\": \"local-timestamp-micros\"}}]}"; + final Schema schema = new Schema.Parser().parse(schemaString); + final GenericRecord timestampRecord = new GenericData.Record(schema); + timestampRecord.put("type_timestamp_millis", Instant.parse("2014-03-01T12:12:12.321Z")); + timestampRecord.put( + "type_timestamp_micros", Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS)); + timestampRecord.put( + "type_local_timestamp_millis", LocalDateTime.parse("2014-03-01T12:12:12.321")); + timestampRecord.put( + "type_local_timestamp_micros", LocalDateTime.parse("1970-01-01T00:00:00.123456")); + + final Timestamps timestamps = + Timestamps.newBuilder() + .setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z")) + .setTypeTimestampMicros( + Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS)) + .setTypeLocalTimestampMillis(LocalDateTime.parse("2014-03-01T12:12:12.321")) + .setTypeLocalTimestampMicros( + LocalDateTime.parse("1970-01-01T00:00:00.123456")) + .build(); + + final Row timestampRow = new Row(4); + timestampRow.setField(0, Timestamp.valueOf("2014-03-01 12:12:12.321")); + timestampRow.setField( + 1, Timestamp.from(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS))); + timestampRow.setField(2, Timestamp.valueOf(LocalDateTime.parse("2014-03-01T12:12:12.321"))); + timestampRow.setField( + 3, Timestamp.valueOf(LocalDateTime.parse("1970-01-01T00:00:00.123456"))); + + final Tuple4<Class<? extends SpecificRecord>, SpecificRecord, GenericRecord, Row> t = + new Tuple4<>(); + t.f0 = Timestamps.class; + t.f1 = timestamps; + t.f2 = timestampRecord; + t.f3 = timestampRow; + + return t; + } + /** * Craft a large Avro Schema which contains more than 0xFFFF characters. * diff --git a/flink-formats/flink-avro/src/test/resources/avro/user.avsc b/flink-formats/flink-avro/src/test/resources/avro/user.avsc index 858cf6face6..0462ec12539 100644 --- a/flink-formats/flink-avro/src/test/resources/avro/user.avsc +++ b/flink-formats/flink-avro/src/test/resources/avro/user.avsc @@ -115,4 +115,15 @@ {"name": "type_date", "type": {"type": "int", "logicalType": "date"}}, {"name": "type_time_millis", "type": {"type": "int", "logicalType": "time-millis"}} ] - }] +}, +{"namespace": "org.apache.flink.formats.avro.generated", + "type": "record", + "name": "Timestamps", + "fields": [ + {"name": "type_timestamp_millis", "type": {"type": "long", "logicalType": "timestamp-millis"}}, + {"name": "type_timestamp_micros", "type": {"type": "long", "logicalType": "timestamp-micros"}}, + {"name": "type_local_timestamp_millis", "type": {"type": "long", "logicalType": "local-timestamp-millis"}}, + {"name": "type_local_timestamp_micros", "type": {"type": "long", "logicalType": "local-timestamp-micros"}} + ] +} +]
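
A quick usage note for reviewers (illustrative only, not part of the commit): the new behavior is opted into either per table, via 'avro.timestamp_mapping.legacy' = 'false' in the format options, or programmatically via the trailing legacyTimestampMapping flag on the (de)serialization schema constructors added above. Below is a minimal sketch assuming only the constructors and option key introduced in this diff; the class name and field names are made up for the example:

    import org.apache.flink.formats.avro.AvroFormatOptions.AvroEncoding;
    import org.apache.flink.formats.avro.AvroRowDataDeserializationSchema;
    import org.apache.flink.formats.avro.AvroRowDataSerializationSchema;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
    import org.apache.flink.table.types.logical.RowType;

    public class TimestampMappingSketch {
        public static void main(String[] args) throws Exception {
            // A row type that only the new mapping can represent in Avro:
            // TIMESTAMP(6) -> local-timestamp-micros, TIMESTAMP_LTZ(3) -> timestamp-millis.
            final RowType rowType =
                    (RowType)
                            DataTypes.ROW(
                                            DataTypes.FIELD("ts_micros", DataTypes.TIMESTAMP(6)),
                                            DataTypes.FIELD(
                                                    "ltz_millis",
                                                    DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)))
                                    .getLogicalType();

            // The trailing boolean is the legacyTimestampMapping flag; passing true
            // (or using the shorter legacy constructors) rejects TIMESTAMP(6) with an
            // IllegalArgumentException, as the factory tests above assert.
            final AvroRowDataSerializationSchema ser =
                    new AvroRowDataSerializationSchema(rowType, AvroEncoding.BINARY, false);
            final AvroRowDataDeserializationSchema deser =
                    new AvroRowDataDeserializationSchema(
                            rowType, InternalTypeInfo.of(rowType), AvroEncoding.BINARY, false);
            ser.open(null);
            deser.open(null);
        }
    }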