dawidwys commented on a change in pull request #13763:
URL: https://github.com/apache/flink/pull/13763#discussion_r511842204
##########
File path:
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
##########
@@ -417,4 +431,11 @@ public static LogicalType extractValueTypeToAvroMap(LogicalType type) {
}
return builder;
}
+
+ /** Returns schema with nullable true. */
+ private static Schema nullableSchema(Schema schema) {
Review comment:
Could we stick to a single way of making a `Schema` nullable? With this
PR we have two methods for the same purpose:
* `nullableSchema`
* `getNullableBuilder`
Either use `nullableSchema` everywhere or use
`getNullableBuilder(...).type(...)` everywhere.
##########
File path:
flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
##########
@@ -104,48 +104,115 @@ public void testRowTypeAvroSchemaConversion() {
DataTypes.FIELD("row3",
DataTypes.ROW(DataTypes.FIELD("c", DataTypes.STRING())))))
.build().toRowDataType().getLogicalType();
Schema schema = AvroSchemaConverter.convertToSchema(rowType);
- assertEquals("{\n" +
+ assertEquals("[ {\n" +
Review comment:
I think it would be nice to add a test verifying that we can convert back and
forth between `DataType` and `Schema` with respect to the field names.
##########
File path:
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
##########
@@ -362,7 +369,11 @@ public static Schema convertToSchema(LogicalType logicalType, String rowName) {
.record(rowName)
.fields();
for (int i = 0; i < rowType.getFieldCount(); i++) {
- String fieldName = rowName + "_" + fieldNames.get(i);
+ String fieldName = fieldNames.get(i);
+ if (rowName.equals(fieldName)) {
+ // Can not build schema when the record and field have the same name
+ fieldName = rowName + "_" + fieldName;
+ }
Review comment:
I think the correct solution will be:
```
RowType rowType = (RowType) logicalType;
List<String> fieldNames = rowType.getFieldNames();
// we have to make sure the record name is different in a Schema
SchemaBuilder.FieldAssembler<Schema> builder = getNullableBuilder(logicalType)
        .record(rowName)
        .fields();
for (int i = 0; i < rowType.getFieldCount(); i++) {
    String fieldName = fieldNames.get(i);
    builder = builder
            .name(fieldName)
            .type(convertToSchema(rowType.getTypeAt(i), rowName + "_" + fieldName))
            .noDefault();
}
return builder.endRecord();
```
##########
File path:
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
##########
@@ -362,7 +369,11 @@ public static Schema convertToSchema(LogicalType logicalType, String rowName) {
.record(rowName)
.fields();
for (int i = 0; i < rowType.getFieldCount(); i++) {
- String fieldName = rowName + "_" + fieldNames.get(i);
+ String fieldName = fieldNames.get(i);
+ if (rowName.equals(fieldName)) {
+ // Can not build schema when the record and field have the same name
+ fieldName = rowName + "_" + fieldName;
+ }
Review comment:
That is not correct. The builder does support identical field names at
different nesting levels.
What Avro does not support is two record types with the same name but
different schemas, and rightly so. Therefore a schema like:
```
{
"type": "record",
"name": "top",
"fields": [
{
"name": "top",
"type": {
"type": "record",
"name": "nested",
"fields": [
{"type": "string", "name": "top"}
]
}
}
]
}
```
is valid and supported. However, if we rename the `nested` record
to `top`, the schema becomes invalid:
```
{
"type": "record",
"name": "top",
"fields": [
{
"name": "top",
"type": {
"type": "record",
"name": "top",
"fields": [
{"type": "string", "name": "top"}
]
}
}
]
}
```
I think the core problem lies in how the `rowName` is generated. I think we
should never adjust the `fieldName`; instead we should append the `fieldName` to
the `rowName` when naming the nested record.
BTW, another shortcoming I see is that we lose the record name
when converting from `Schema` to `DataType`. I do not think it is a real issue,
though.
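To illustrate the naming rule, here is a minimal sketch that does not depend on Avro or Flink. Everything in it is an assumption for illustration: `RecordNameSketch`, `recordNames`, and `example` are hypothetical names, and a nested row is modeled as a plain `Map`. It only shows that field names stay untouched while nested record names are derived as `rowName + "_" + fieldName`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for the naming logic; not Flink's or Avro's API. */
public class RecordNameSketch {

    /**
     * Collects the record names that would be generated for a row.
     * A nested row is modeled as a Map; any other value is a leaf field.
     */
    public static List<String> recordNames(String rowName, Map<String, Object> row) {
        List<String> names = new ArrayList<>();
        names.add(rowName);
        for (Map.Entry<String, Object> field : row.entrySet()) {
            if (field.getValue() instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, Object> nested = (Map<String, Object>) field.getValue();
                // The field name is left as-is; only the nested record name is derived.
                names.addAll(recordNames(rowName + "_" + field.getKey(), nested));
            }
        }
        return names;
    }

    /** Row "top" with a field "top" whose type is a row with a string field "top". */
    public static Map<String, Object> example() {
        Map<String, Object> nested = new LinkedHashMap<>();
        nested.put("top", "string");
        Map<String, Object> top = new LinkedHashMap<>();
        top.put("top", nested);
        return top;
    }

    public static void main(String[] args) {
        // prints [top, top_top]
        System.out.println(recordNames("top", example()));
    }
}
```

With this rule the problematic case above yields the record names `top` and `top_top`, so the nested record no longer reuses the name of its parent. The sketch does not prove uniqueness in general: field names that themselves contain `_` could still produce colliding record names.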