JingsongLi commented on a change in pull request #12133:
URL: https://github.com/apache/flink/pull/12133#discussion_r426129667
##########
File path:
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
##########
@@ -157,4 +166,118 @@ private AvroSchemaConverter() {
}
throw new IllegalArgumentException("Unsupported Avro type '" +
schema.getType() + "'.");
}
+
+ /**
+ * Converts Flink SQL {@link LogicalType} (can be nested) into an Avro
schema.
+ *
+ * @param logicalType logical type
+ * @return Avro's {@link Schema} matching this logical type.
+ */
+ public static Schema convertToSchema(LogicalType logicalType) {
+ return convertToSchema(logicalType, 0);
+ }
+
+ private static Schema convertToSchema(LogicalType logicalType, int
rowTypeCounter) {
+ switch (logicalType.getTypeRoot()) {
+ case NULL:
+ return SchemaBuilder.builder().nullType();
+ case BOOLEAN:
+ return
getNullableBuilder(logicalType).booleanType();
+ case INTEGER:
+ return
getNullableBuilder(logicalType).intType();
+ case BIGINT:
+ return
getNullableBuilder(logicalType).longType();
+ case FLOAT:
+ return
getNullableBuilder(logicalType).floatType();
+ case DOUBLE:
+ return
getNullableBuilder(logicalType).doubleType();
+ case CHAR:
+ case VARCHAR:
+ return
getNullableBuilder(logicalType).stringType();
+ case BINARY:
+ case VARBINARY:
+ return
getNullableBuilder(logicalType).bytesType();
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ // use long to represents Timestamp
+ final TimestampType timestampType =
(TimestampType) logicalType;
+ int precision = timestampType.getPrecision();
+ org.apache.avro.LogicalType avroLogicalType;
+ if (precision == 3) {
+ avroLogicalType =
LogicalTypes.timestampMillis();
+ } else if (precision == 9) {
+ avroLogicalType =
LogicalTypes.timestampMicros();
+ } else {
+ throw new
IllegalArgumentException("Avro Timestamp does not support Timestamp with
precision: " +
+ precision +
+ ", it only supports precision
of 3 or 9.");
+ }
+ return
avroLogicalType.addToSchema(SchemaBuilder.builder().longType());
+ case DATE:
+ // use int to represents Date
+ return
LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType());
+ case TIME_WITHOUT_TIME_ZONE:
+ // use int to represents Time, we only support
millisecond when deserialization
+ return
LogicalTypes.timeMillis().addToSchema(SchemaBuilder.builder().intType());
+ case DECIMAL:
+ DecimalType decimalType = (DecimalType)
logicalType;
+ // store BigDecimal as byte[]
+ return LogicalTypes
+ .decimal(decimalType.getPrecision(),
decimalType.getScale())
+
.addToSchema(SchemaBuilder.builder().bytesType());
+ case ROW:
+ RowType rowType = (RowType) logicalType;
+ List<String> fieldNames =
rowType.getFieldNames();
+ // we have to make sure the record name is
different in a Schema
+ SchemaBuilder.FieldAssembler<Schema> builder =
SchemaBuilder
+ .builder()
+ .record("row_" + rowTypeCounter)
+ .fields();
+ rowTypeCounter++;
+ for (int i = 0; i < rowType.getFieldCount();
i++) {
+ builder = builder
+ .name(fieldNames.get(i))
+
.type(convertToSchema(rowType.getTypeAt(i), rowTypeCounter))
+ .noDefault();
+ }
+ return builder.endRecord();
+ case MAP:
+ MapType mapType = (MapType) logicalType;
+ if (!hasFamily(mapType.getKeyType(),
LogicalTypeFamily.CHARACTER_STRING)) {
+ throw new IllegalArgumentException(
+ "Avro assumes map keys are
strings, " + mapType.getKeyType() +
+ " type as key type is
not supported.");
+ }
+ return SchemaBuilder
+ .builder()
+ .nullable() // will be UNION of Array
and null
Review comment:
Remove this comments.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]