wuchong commented on a change in pull request #13472: URL: https://github.com/apache/flink/pull/13472#discussion_r502180974
########## File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java ########## @@ -359,13 +359,13 @@ public static Schema convertToSchema(LogicalType logicalType, int rowTypeCounter // we have to make sure the record name is different in a Schema SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder .builder() - .record("row_" + rowTypeCounter) + .record(rowName) .fields(); - rowTypeCounter++; for (int i = 0; i < rowType.getFieldCount(); i++) { + String fieldName = fieldNames.get(i); builder = builder - .name(fieldNames.get(i)) - .type(convertToSchema(rowType.getTypeAt(i), rowTypeCounter)) + .name(fieldName) + .type(convertToSchema(rowType.getTypeAt(i), fieldName)) Review comment: I think this doesn't work. Because field names may conflict. We may need to concat the parent row name and the field name as the child name. ########## File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java ########## @@ -297,10 +297,10 @@ private static DataType convertToDataType(Schema schema) { * @return Avro's {@link Schema} matching this logical type. */ public static Schema convertToSchema(LogicalType logicalType) { - return convertToSchema(logicalType, 0); + return convertToSchema(logicalType, "row_0"); Review comment: I think this will conflict if a field name is "row_0". ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org