JingsongLi commented on a change in pull request #12133:
URL: https://github.com/apache/flink/pull/12133#discussion_r426129528
##########
File path:
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
##########
@@ -157,4 +166,118 @@ private AvroSchemaConverter() {
}
throw new IllegalArgumentException("Unsupported Avro type '" +
schema.getType() + "'.");
}
+
+ /**
+ * Converts Flink SQL {@link LogicalType} (can be nested) into an Avro
schema.
+ *
+ * @param logicalType logical type
+ * @return Avro's {@link Schema} matching this logical type.
+ */
+ public static Schema convertToSchema(LogicalType logicalType) {
+ return convertToSchema(logicalType, 0);
+ }
+
+ private static Schema convertToSchema(LogicalType logicalType, int
rowTypeCounter) {
+ switch (logicalType.getTypeRoot()) {
+ case NULL:
+ return SchemaBuilder.builder().nullType();
+ case BOOLEAN:
+ return
getNullableBuilder(logicalType).booleanType();
+ case INTEGER:
+ return
getNullableBuilder(logicalType).intType();
+ case BIGINT:
+ return
getNullableBuilder(logicalType).longType();
+ case FLOAT:
+ return
getNullableBuilder(logicalType).floatType();
+ case DOUBLE:
+ return
getNullableBuilder(logicalType).doubleType();
+ case CHAR:
+ case VARCHAR:
+ return
getNullableBuilder(logicalType).stringType();
+ case BINARY:
+ case VARBINARY:
+ return
getNullableBuilder(logicalType).bytesType();
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
Review comment:
Not support now, we can support later.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]