JingsongLi commented on a change in pull request #12133:
URL: https://github.com/apache/flink/pull/12133#discussion_r426131440



##########
File path: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
##########
@@ -157,4 +166,118 @@ private AvroSchemaConverter() {
                }
                throw new IllegalArgumentException("Unsupported Avro type '" + 
schema.getType() + "'.");
        }
+
+       /**
+        * Converts Flink SQL {@link LogicalType} (can be nested) into an Avro 
schema.
+        *
+        * @param logicalType logical type
+        * @return Avro's {@link Schema} matching this logical type.
+        */
+       public static Schema convertToSchema(LogicalType logicalType) {
+               return convertToSchema(logicalType, 0);
+       }
+
+       private static Schema convertToSchema(LogicalType logicalType, int 
rowTypeCounter) {
+               switch (logicalType.getTypeRoot()) {
+                       case NULL:
+                               return SchemaBuilder.builder().nullType();
+                       case BOOLEAN:
+                               return 
getNullableBuilder(logicalType).booleanType();
+                       case INTEGER:
+                               return 
getNullableBuilder(logicalType).intType();
+                       case BIGINT:
+                               return 
getNullableBuilder(logicalType).longType();
+                       case FLOAT:
+                               return 
getNullableBuilder(logicalType).floatType();
+                       case DOUBLE:
+                               return 
getNullableBuilder(logicalType).doubleType();
+                       case CHAR:
+                       case VARCHAR:
+                               return 
getNullableBuilder(logicalType).stringType();
+                       case BINARY:
+                       case VARBINARY:
+                               return 
getNullableBuilder(logicalType).bytesType();
+                       case TIMESTAMP_WITHOUT_TIME_ZONE:
+                               // use long to represent Timestamp
+                               final TimestampType timestampType = 
(TimestampType) logicalType;
+                               int precision = timestampType.getPrecision();
+                               org.apache.avro.LogicalType avroLogicalType;
+                               if (precision == 3) {
+                                       avroLogicalType = 
LogicalTypes.timestampMillis();
+                               } else if (precision == 9) {
+                                       avroLogicalType = 
LogicalTypes.timestampMicros();
+                               } else {
+                                       throw new 
IllegalArgumentException("Avro Timestamp does not support Timestamp with 
precision: " +
+                                               precision +
+                                               ", it only supports precision 
of 3 or 9.");
+                               }
+                               return 
avroLogicalType.addToSchema(SchemaBuilder.builder().longType());
+                       case DATE:
+                               // use int to represent Date
+                               return 
LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType());
+                       case TIME_WITHOUT_TIME_ZONE:
+                               // use int to represent Time; we only support
millisecond precision when deserializing
+                               return 
LogicalTypes.timeMillis().addToSchema(SchemaBuilder.builder().intType());
+                       case DECIMAL:
+                               DecimalType decimalType = (DecimalType) 
logicalType;
+                               // store BigDecimal as byte[]
+                               return LogicalTypes
+                                       .decimal(decimalType.getPrecision(), 
decimalType.getScale())
+                                       
.addToSchema(SchemaBuilder.builder().bytesType());
+                       case ROW:
+                               RowType rowType = (RowType) logicalType;
+                               List<String> fieldNames = 
rowType.getFieldNames();
+                               // we have to make sure each record name is
unique within a Schema
+                               SchemaBuilder.FieldAssembler<Schema> builder = 
SchemaBuilder
+                                       .builder()
+                                       .record("row_" + rowTypeCounter)
+                                       .fields();
+                               rowTypeCounter++;
+                               for (int i = 0; i < rowType.getFieldCount(); 
i++) {
+                                       builder = builder
+                                               .name(fieldNames.get(i))
+                                               
.type(convertToSchema(rowType.getTypeAt(i), rowTypeCounter))
+                                               .noDefault();
+                               }
+                               return builder.endRecord();
+                       case MAP:

Review comment:
       Will correct all multimap type support.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to