leonardBang commented on code in PR #23511:
URL: https://github.com/apache/flink/pull/23511#discussion_r1461418192
##########
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java:
##########
@@ -317,6 +408,21 @@ public static Schema convertToSchema(LogicalType schema) {
     * @return Avro's {@link Schema} matching this logical type.
     */
    public static Schema convertToSchema(LogicalType logicalType, String rowName) {
+        return convertToSchema(logicalType, rowName, true);
+    }
+
+    /**
+     * Converts Flink SQL {@link LogicalType} (can be nested) into an Avro schema.
+     *
+     * <p>The "{rowName}_" is used as the nested row type name prefix in order to generate the
+     * right schema. Nested record types that differ only in type name are still compatible.
+     *
+     * @param logicalType logical type
+     * @param rowName the record name
Review Comment:
also add a `@param` annotation for `legacyTimestampMapping`?
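For example (exact wording is only a suggestion):
```
     * @param legacyTimestampMapping whether to use the legacy mapping between Avro
     *     logical timestamp types and Flink SQL timestamp types
```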
##########
flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroFormatFactoryTest.java:
##########
@@ -87,16 +104,83 @@ void testSeDeSchema(AvroEncoding encoding) {
         assertThat(actualSer).isEqualTo(expectedSer);
     }
+    @Test
+    void testOldSeDeNewSchema() {
+        assertThatThrownBy(
+                () -> {
+                    new AvroRowDataDeserializationSchema(
+                            NEW_ROW_TYPE, InternalTypeInfo.of(NEW_ROW_TYPE));
+                })
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage(
+                        "Avro does not support TIMESTAMP type with precision: 6, it only supports precision less than 3.");
+
+        assertThatThrownBy(
+                () -> {
+                    new AvroRowDataSerializationSchema(NEW_ROW_TYPE);
+                })
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage(
+                        "Avro does not support TIMESTAMP type with precision: 6, it only supports precision less than 3.");
+    }
+
+    @Test
+    void testNewSeDeNewSchema() {
+        testSeDeSchema(NEW_ROW_TYPE, NEW_SCHEMA, false);
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testSeDeSchema(boolean legacyMapping) {
+        testSeDeSchema(ROW_TYPE, SCHEMA, legacyMapping);
+    }
+
+    void testSeDeSchema(RowType rowType, ResolvedSchema schema, boolean legacyMapping) {
+        final AvroRowDataDeserializationSchema expectedDeser =
+                new AvroRowDataDeserializationSchema(
+                        rowType, InternalTypeInfo.of(rowType), AvroEncoding.BINARY, legacyMapping);
+
+        final Map<String, String> options = getAllOptions(legacyMapping);
+
+        final DynamicTableSource actualSource = FactoryMocks.createTableSource(schema, options);
+
+        assertThat(actualSource).isInstanceOf(TestDynamicTableFactory.DynamicTableSourceMock.class);
+        TestDynamicTableFactory.DynamicTableSourceMock scanSourceMock =
+                (TestDynamicTableFactory.DynamicTableSourceMock) actualSource;
+
+        DeserializationSchema<RowData> actualDeser =
+                scanSourceMock.valueFormat.createRuntimeDecoder(
+                        ScanRuntimeProviderContext.INSTANCE, schema.toPhysicalRowDataType());
+
+        assertThat(actualDeser).isEqualTo(expectedDeser);
+
+        final AvroRowDataSerializationSchema expectedSer =
+                new AvroRowDataSerializationSchema(rowType, AvroEncoding.BINARY, legacyMapping);
+
+        final DynamicTableSink actualSink = FactoryMocks.createTableSink(schema, options);
+
+        assertThat(actualSink).isInstanceOf(TestDynamicTableFactory.DynamicTableSinkMock.class);
+        TestDynamicTableFactory.DynamicTableSinkMock sinkMock =
+                (TestDynamicTableFactory.DynamicTableSinkMock) actualSink;
+
+        SerializationSchema<RowData> actualSer =
+                sinkMock.valueFormat.createRuntimeEncoder(null, schema.toPhysicalRowDataType());
+
+        assertThat(actualSer).isEqualTo(expectedSer);
+    }
+
     // ------------------------------------------------------------------------
     // Utilities
     // ------------------------------------------------------------------------
-    private Map<String, String> getAllOptions() {
+    private Map<String, String> getAllOptions(boolean legacyMapping) {
Review Comment:
nit: rename to `legacyTimestampMapping`?
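i.e. something like:
```
private Map<String, String> getAllOptions(boolean legacyTimestampMapping) {
```
so it matches the parameter name suggested for `AvroSchemaConverter`.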
##########
flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java:
##########
@@ -330,23 +338,87 @@ void testSerializationWithTypesMismatch(AvroEncoding encoding) throws Exception
                 .hasStackTraceContaining("Fail to serialize at field: f1");
     }
+    @Test
+    void testTimestampTypeLegacyMapping() throws Exception {
+        final Tuple4<Class<? extends SpecificRecord>, SpecificRecord, GenericRecord, Row> testData =
+                AvroTestUtils.getTimestampTestData();
+
+        SpecificDatumWriter<Timestamps> datumWriter = new SpecificDatumWriter<>(Timestamps.class);
+        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+        Encoder encoder = EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null);
+        datumWriter.write((Timestamps) testData.f1, encoder);
+        encoder.flush();
+
+        DataType dataType =
+                AvroSchemaConverter.convertToDataType(
+                        SpecificData.get().getSchema(Timestamps.class).toString());
+
+        // Timestamp with local timezone is converted to BigIntType
+        assertThat(dataType.getChildren().get(2))
+                .isEqualTo(new AtomicDataType(new BigIntType(false)));
+        assertThat(dataType.getChildren().get(3))
+                .isEqualTo(new AtomicDataType(new BigIntType(false)));
+
+        assertThatThrownBy(() -> createSerializationSchema(dataType, AvroEncoding.BINARY, true))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage(
+                        "Avro does not support TIMESTAMP type with precision: 6, it only supports precision less than 3.");
+
+        assertThatThrownBy(() -> createDeserializationSchema(dataType, AvroEncoding.BINARY, true))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage(
+                        "Avro does not support TIMESTAMP type with precision: 6, it only supports precision less than 3.");
Review Comment:
Duplicated cases?
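The serializer and deserializer blocks assert the same exception and message, and the same checks also appear in `AvroFormatFactoryTest#testOldSeDeNewSchema` above.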
##########
flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatTest.java:
##########
@@ -181,7 +181,6 @@ private void output(final AvroOutputFormat<User> outputFormat) throws IOExceptio
         user.setTypeTimeMicros(LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS));
         user.setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z"));
         user.setTypeTimestampMicros(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS));
-
Review Comment:
nit: unrelated whitespace-only change, please revert
##########
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java:
##########
@@ -270,16 +328,34 @@ private static DataType convertToDataType(Schema schema) {
                 }
                 return DataTypes.INT().notNull();
             case LONG:
-                // logical timestamp type
-                if (schema.getLogicalType() == LogicalTypes.timestampMillis()) {
-                    return DataTypes.TIMESTAMP(3).notNull();
-                } else if (schema.getLogicalType() == LogicalTypes.timestampMicros()) {
-                    return DataTypes.TIMESTAMP(6).notNull();
-                } else if (schema.getLogicalType() == LogicalTypes.timeMillis()) {
-                    return DataTypes.TIME(3).notNull();
-                } else if (schema.getLogicalType() == LogicalTypes.timeMicros()) {
-                    return DataTypes.TIME(6).notNull();
+                if (legacyMapping) {
+                    // logical timestamp type
Review Comment:
minor: how about rewording the comment to "Avro logical timestamp types to Flink SQL timestamp types"?
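i.e. something like:
```
+                if (legacyMapping) {
+                    // Avro logical timestamp types to Flink SQL timestamp types
```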
##########
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java:
##########
@@ -159,12 +190,24 @@ private static TypeInformation<?> convertToTypeInfo(Schema schema) {
                 }
                 return Types.INT;
             case LONG:
-                // logical timestamp type
-                if (schema.getLogicalType() == LogicalTypes.timestampMillis()
-                        || schema.getLogicalType() == LogicalTypes.timestampMicros()) {
-                    return Types.SQL_TIMESTAMP;
-                } else if (schema.getLogicalType() == LogicalTypes.timeMicros()) {
-                    return Types.SQL_TIME;
+                if (legacyMapping) {
+                    if (schema.getLogicalType() == LogicalTypes.timestampMillis()
+                            || schema.getLogicalType() == LogicalTypes.timestampMicros()) {
+                        return Types.SQL_TIMESTAMP;
+                    } else if (schema.getLogicalType() == LogicalTypes.timeMicros()
+                            || schema.getLogicalType() == LogicalTypes.timeMillis()) {
+                        return Types.SQL_TIME;
+                    }
+                } else {
+                    if (schema.getLogicalType() == LogicalTypes.timestampMillis()
+                            || schema.getLogicalType() == LogicalTypes.timestampMicros()
+                            || schema.getLogicalType() == LogicalTypes.localTimestampMillis()
+                            || schema.getLogicalType() == LogicalTypes.localTimestampMicros()) {
+                        return Types.SQL_TIMESTAMP;
Review Comment:
Shouldn't we use the same mapping for the DataStream API, like the following?
```
// Avro logical timestamp types to Flink DataStream timestamp types
if (schema.getLogicalType() == LogicalTypes.timestampMillis()
        || schema.getLogicalType() == LogicalTypes.timestampMicros()) {
    return Types.INSTANT;
} else if (schema.getLogicalType() == LogicalTypes.localTimestampMillis()
        || schema.getLogicalType() == LogicalTypes.localTimestampMicros()) {
    return Types.LOCAL_DATE_TIME;
}
```
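For context: in the Avro spec, `timestamp-millis`/`timestamp-micros` represent an instant on the timeline, while `local-timestamp-millis`/`local-timestamp-micros` represent zone-less wall-clock time, which is what `Instant` and `LocalDateTime` model respectively.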