linliu-code commented on code in PR #13288:
URL: https://github.com/apache/hudi/pull/13288#discussion_r2098875255
##########
hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java:
##########
@@ -1321,6 +1323,64 @@ private static boolean needsRewriteToString(Schema
schema, boolean isEnum) {
}
}
+ /**
+ * Used to repair the schema after conversion from StructType to avro.
Timestamp logical types are assumed
+ * to be micros so we use the table schema to repair that field
+ */
+ public static Schema repairSchema(Schema convertedSchema, Schema
tableSchema) {
+ if (convertedSchema.getType() == tableSchema.getType()) {
+ if (convertedSchema.getType() == Schema.Type.RECORD) {
+ List<Schema.Field> fields = new
ArrayList<>(convertedSchema.getFields().size());
+ for (Schema.Field convertedField : convertedSchema.getFields()) {
+ Schema.Field tableField =
tableSchema.getField(convertedField.name());
+ if (tableField == null) {
+ throw new IllegalArgumentException("Missing field: " +
convertedField.name());
+ }
+ fields.add(new Schema.Field(
+ tableField.name(),
+ repairSchema(convertedField.schema(), tableField.schema()),
+ tableField.doc(),
+ tableField.defaultVal()
+ ));
+ }
+ return createNewSchemaFromFieldsWithReference(convertedSchema, fields);
+ } else if (convertedSchema.getType() == Schema.Type.ARRAY) {
+ return
Schema.createArray(repairSchema(convertedSchema.getElementType(),
tableSchema.getElementType()));
+ } else if (convertedSchema.getType() == Schema.Type.MAP) {
+ return Schema.createMap(repairSchema(convertedSchema.getValueType(),
tableSchema.getValueType()));
+ } else if (convertedSchema.getType() == Schema.Type.UNION) {
+ List<Schema> sourceNestedSchemas = convertedSchema.getTypes();
+ List<Schema> targetNestedSchemas = tableSchema.getTypes();
+ if (sourceNestedSchemas.size() != targetNestedSchemas.size()) {
+ throw new IllegalArgumentException("Union sizes do not match.");
+ }
+ List<Schema> types = new ArrayList<>(sourceNestedSchemas.size());
+ for (int i = 0; i < sourceNestedSchemas.size(); i++) {
+ types.add(repairSchema(sourceNestedSchemas.get(i),
targetNestedSchemas.get(i)));
+ }
+ return Schema.createUnion(types);
+ }
+ }
+
+ // Only repair timestamp-millis <-> timestamp-micros logical types
+ if (convertedSchema.getType() == Schema.Type.LONG && tableSchema.getType()
== Schema.Type.LONG) {
Review Comment:
We can also fix the Enum -> String -> Enum round-trip issue here. I think they
are logically the same. CC: @yihua, @nsivabalan
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]