the-other-tim-brown commented on code in PR #17763:
URL: https://github.com/apache/hudi/pull/17763#discussion_r2655529568
##########
hudi-hadoop-common/src/main/java/org/apache/parquet/avro/AvroSchemaConverterWithTimestampNTZ.java:
##########
@@ -116,96 +118,112 @@ public AvroSchemaConverterWithTimestampNTZ(Configuration
conf) {
this.pathsToInt96 = new
HashSet<>(Arrays.asList(conf.getStrings("parquet.avro.writeFixedAsInt96", new
String[0])));
}
- /**
- * Given a schema, check to see if it is a union of a null type and a
regular schema,
- * and then return the non-null sub-schema. Otherwise, return the given
schema.
- *
- * @param schema The schema to check
- * @return The non-null portion of a union schema, or the given schema
- */
- public static Schema getNonNull(Schema schema) {
- if (schema.getType().equals(Schema.Type.UNION)) {
- List<Schema> schemas = schema.getTypes();
- if (schemas.size() == 2) {
- if (schemas.get(0).getType().equals(Schema.Type.NULL)) {
- return schemas.get(1);
- } else if (schemas.get(1).getType().equals(Schema.Type.NULL)) {
- return schemas.get(0);
- } else {
- return schema;
- }
- } else {
- return schema;
- }
- } else {
- return schema;
- }
- }
-
@Override
- public MessageType convert(Schema avroSchema) {
- if (!avroSchema.getType().equals(Schema.Type.RECORD)) {
- throw new IllegalArgumentException("Avro schema must be a record.");
+ public MessageType convert(HoodieSchema schema) {
+ if (schema.getType() != HoodieSchemaType.RECORD) {
+ throw new IllegalArgumentException("Hoodie schema must be a record.");
}
- return new MessageType(avroSchema.getFullName(),
convertFields(avroSchema.getFields(), ""));
+ return new MessageType(schema.getFullName(),
convertFields(schema.getFields(), ""));
}
- private List<Type> convertFields(List<Schema.Field> fields, String
schemaPath) {
+ private List<Type> convertFields(List<HoodieSchemaField> fields, String
schemaPath) {
List<Type> types = new ArrayList<Type>();
Review Comment:
Nit: size this list to the incoming fields list's size
##########
hudi-hadoop-common/src/main/java/org/apache/parquet/avro/AvroSchemaConverterWithTimestampNTZ.java:
##########
@@ -267,121 +271,123 @@ private Type convertUnion(String fieldName, Schema
schema, Type.Repetition repet
}
}
- private Type convertUnionToGroupType(String fieldName, Type.Repetition
repetition, List<Schema> nonNullSchemas,
+ private Type convertUnionToGroupType(String fieldName, Type.Repetition
repetition, List<HoodieSchema> nonNullSchemas,
String schemaPath) {
List<Type> unionTypes = new ArrayList<Type>(nonNullSchemas.size());
int index = 0;
- for (Schema childSchema : nonNullSchemas) {
+ for (HoodieSchema childSchema : nonNullSchemas) {
unionTypes.add( convertField("member" + index++, childSchema,
Type.Repetition.OPTIONAL, schemaPath));
}
return new GroupType(repetition, fieldName, unionTypes);
}
- private Type convertField(Schema.Field field, String schemaPath) {
+ private Type convertField(HoodieSchemaField field, String schemaPath) {
return convertField(field.name(), field.schema(), schemaPath);
}
@Override
- public Schema convert(MessageType parquetSchema) {
+ public HoodieSchema convert(MessageType parquetSchema) {
return convertFields(parquetSchema.getName(), parquetSchema.getFields(),
new HashMap<>());
}
- Schema convert(GroupType parquetSchema) {
+ HoodieSchema convert(GroupType parquetSchema) {
return convertFields(parquetSchema.getName(), parquetSchema.getFields(),
new HashMap<>());
}
- private Schema convertFields(String name, List<Type> parquetFields,
Map<String, Integer> names) {
- List<Schema.Field> fields = new ArrayList<Schema.Field>();
+ private HoodieSchema convertFields(String name, List<Type> parquetFields,
Map<String, Integer> names) {
+ List<HoodieSchemaField> fields = new ArrayList<>();
Review Comment:
Similarly, size this list to the incoming list's size
##########
hudi-hadoop-common/src/main/java/org/apache/parquet/avro/AvroSchemaConverterWithTimestampNTZ.java:
##########
@@ -116,96 +118,112 @@ public AvroSchemaConverterWithTimestampNTZ(Configuration
conf) {
this.pathsToInt96 = new
HashSet<>(Arrays.asList(conf.getStrings("parquet.avro.writeFixedAsInt96", new
String[0])));
}
- /**
- * Given a schema, check to see if it is a union of a null type and a
regular schema,
- * and then return the non-null sub-schema. Otherwise, return the given
schema.
- *
- * @param schema The schema to check
- * @return The non-null portion of a union schema, or the given schema
- */
- public static Schema getNonNull(Schema schema) {
- if (schema.getType().equals(Schema.Type.UNION)) {
- List<Schema> schemas = schema.getTypes();
- if (schemas.size() == 2) {
- if (schemas.get(0).getType().equals(Schema.Type.NULL)) {
- return schemas.get(1);
- } else if (schemas.get(1).getType().equals(Schema.Type.NULL)) {
- return schemas.get(0);
- } else {
- return schema;
- }
- } else {
- return schema;
- }
- } else {
- return schema;
- }
- }
-
@Override
- public MessageType convert(Schema avroSchema) {
- if (!avroSchema.getType().equals(Schema.Type.RECORD)) {
- throw new IllegalArgumentException("Avro schema must be a record.");
+ public MessageType convert(HoodieSchema schema) {
+ if (schema.getType() != HoodieSchemaType.RECORD) {
+ throw new IllegalArgumentException("Hoodie schema must be a record.");
}
- return new MessageType(avroSchema.getFullName(),
convertFields(avroSchema.getFields(), ""));
+ return new MessageType(schema.getFullName(),
convertFields(schema.getFields(), ""));
}
- private List<Type> convertFields(List<Schema.Field> fields, String
schemaPath) {
+ private List<Type> convertFields(List<HoodieSchemaField> fields, String
schemaPath) {
List<Type> types = new ArrayList<Type>();
- for (Schema.Field field : fields) {
- if (field.schema().getType().equals(Schema.Type.NULL)) {
+ for (HoodieSchemaField field : fields) {
+ if (field.schema().getType() == HoodieSchemaType.NULL) {
continue; // Avro nulls are not encoded, unless they are null unions
Review Comment:
Nit: update the comment to remove `Avro`
##########
hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroParquetReader.java:
##########
@@ -181,21 +181,21 @@ private static StorageConfiguration<?>
tryOverrideDefaultConfigs(StorageConfigur
return conf;
}
- private ClosableIterator<IndexedRecord>
getIndexedRecordIteratorInternal(Schema schema, Map<String, String>
renamedColumns) throws IOException {
+ private ClosableIterator<IndexedRecord>
getIndexedRecordIteratorInternal(HoodieSchema schema, Map<String, String>
renamedColumns) throws IOException {
// NOTE: We have to set both Avro read-schema and projection schema to make
// sure that in case the file-schema is not equal to read-schema
we'd still
// be able to read that file (in case projection is a proper one)
Configuration hadoopConf =
storage.getConf().unwrapCopyAs(Configuration.class);
//TODO boundary for now to revisit in later pr to use HoodieSchema
- Schema repairedFileSchema =
AvroSchemaRepair.repairLogicalTypes(getSchema().getAvroSchema(), schema);
+ Schema repairedFileSchema =
AvroSchemaRepair.repairLogicalTypes(getSchema().toAvroSchema(),
schema.toAvroSchema());
Option<Schema> promotedSchema = Option.empty();
- if (!renamedColumns.isEmpty() ||
HoodieAvroUtils.recordNeedsRewriteForExtendedAvroTypePromotion(repairedFileSchema,
schema)) {
+ if (!renamedColumns.isEmpty() ||
HoodieAvroUtils.recordNeedsRewriteForExtendedAvroTypePromotion(repairedFileSchema,
schema.toAvroSchema())) {
Review Comment:
Will `recordNeedsRewriteForExtendedAvroTypePromotion` be handled in another
PR?
##########
hudi-hadoop-common/src/main/java/org/apache/parquet/avro/AvroSchemaConverterWithTimestampNTZ.java:
##########
@@ -267,121 +271,123 @@ private Type convertUnion(String fieldName, Schema
schema, Type.Repetition repet
}
}
- private Type convertUnionToGroupType(String fieldName, Type.Repetition
repetition, List<Schema> nonNullSchemas,
+ private Type convertUnionToGroupType(String fieldName, Type.Repetition
repetition, List<HoodieSchema> nonNullSchemas,
String schemaPath) {
List<Type> unionTypes = new ArrayList<Type>(nonNullSchemas.size());
int index = 0;
- for (Schema childSchema : nonNullSchemas) {
+ for (HoodieSchema childSchema : nonNullSchemas) {
unionTypes.add( convertField("member" + index++, childSchema,
Type.Repetition.OPTIONAL, schemaPath));
}
return new GroupType(repetition, fieldName, unionTypes);
}
- private Type convertField(Schema.Field field, String schemaPath) {
+ private Type convertField(HoodieSchemaField field, String schemaPath) {
return convertField(field.name(), field.schema(), schemaPath);
}
@Override
- public Schema convert(MessageType parquetSchema) {
+ public HoodieSchema convert(MessageType parquetSchema) {
return convertFields(parquetSchema.getName(), parquetSchema.getFields(),
new HashMap<>());
}
- Schema convert(GroupType parquetSchema) {
+ HoodieSchema convert(GroupType parquetSchema) {
return convertFields(parquetSchema.getName(), parquetSchema.getFields(),
new HashMap<>());
}
- private Schema convertFields(String name, List<Type> parquetFields,
Map<String, Integer> names) {
- List<Schema.Field> fields = new ArrayList<Schema.Field>();
+ private HoodieSchema convertFields(String name, List<Type> parquetFields,
Map<String, Integer> names) {
+ List<HoodieSchemaField> fields = new ArrayList<>();
Integer nameCount = names.merge(name, 1, (oldValue, value) -> oldValue +
1);
for (Type parquetType : parquetFields) {
- Schema fieldSchema = convertField(parquetType, names);
+ HoodieSchema fieldSchema = convertField(parquetType, names);
if (parquetType.isRepetition(REPEATED)) {
throw new UnsupportedOperationException("REPEATED not supported
outside LIST or MAP. Type: " + parquetType);
} else if (parquetType.isRepetition(Type.Repetition.OPTIONAL)) {
- fields.add(new Schema.Field(
+ fields.add(HoodieSchemaField.of(
parquetType.getName(), optional(fieldSchema), null, NULL_VALUE));
} else { // REQUIRED
- fields.add(new Schema.Field(
+ fields.add(HoodieSchemaField.of(
parquetType.getName(), fieldSchema, null, (Object) null));
}
}
- Schema schema = Schema.createRecord(name, null, nameCount > 1 ? name +
nameCount : null, false);
- schema.setFields(fields);
+ HoodieSchema schema = HoodieSchema.createRecord(name, null, nameCount > 1
? name + nameCount : null, false, fields);
return schema;
}
- private Schema convertField(final Type parquetType, Map<String, Integer>
names) {
+ private HoodieSchema convertField(final Type parquetType, Map<String,
Integer> names) {
if (parquetType.isPrimitive()) {
final PrimitiveType asPrimitive = parquetType.asPrimitiveType();
final PrimitiveTypeName parquetPrimitiveTypeName =
asPrimitive.getPrimitiveTypeName();
final LogicalTypeAnnotation annotation =
parquetType.getLogicalTypeAnnotation();
- Schema schema = parquetPrimitiveTypeName.convert(
- new PrimitiveType.PrimitiveTypeNameConverter<Schema,
RuntimeException>() {
+
+ // Handle logical type annotations directly with HoodieSchema creation
methods
+ if (annotation != null) {
+ HoodieSchema logicalSchema =
convertLogicalTypeAnnotationToHoodieSchema(annotation, parquetType);
+ if (logicalSchema != null) {
+ return logicalSchema;
+ }
+ }
+
+ // Fallback to basic type conversion if no logical type annotation
+ HoodieSchema schema = parquetPrimitiveTypeName.convert(
+ new PrimitiveType.PrimitiveTypeNameConverter<HoodieSchema,
RuntimeException>() {
@Override
- public Schema convertBOOLEAN(PrimitiveTypeName primitiveTypeName) {
- return Schema.create(Schema.Type.BOOLEAN);
+ public HoodieSchema convertBOOLEAN(PrimitiveTypeName
primitiveTypeName) {
+ return HoodieSchema.create(HoodieSchemaType.BOOLEAN);
}
@Override
- public Schema convertINT32(PrimitiveTypeName primitiveTypeName) {
- return Schema.create(Schema.Type.INT);
+ public HoodieSchema convertINT32(PrimitiveTypeName
primitiveTypeName) {
+ return HoodieSchema.create(HoodieSchemaType.INT);
}
@Override
- public Schema convertINT64(PrimitiveTypeName primitiveTypeName) {
- return Schema.create(Schema.Type.LONG);
+ public HoodieSchema convertINT64(PrimitiveTypeName
primitiveTypeName) {
+ return HoodieSchema.create(HoodieSchemaType.LONG);
}
@Override
- public Schema convertINT96(PrimitiveTypeName primitiveTypeName) {
+ public HoodieSchema convertINT96(PrimitiveTypeName
primitiveTypeName) {
if (readInt96AsFixed) {
- return Schema.createFixed("INT96", "INT96 represented as
byte[12]", null, 12);
+ return HoodieSchema.createFixed("INT96", "INT96 represented as
byte[12]", null, 12);
}
throw new IllegalArgumentException(
"INT96 is deprecated. As interim enable READ_INT96_AS_FIXED
flag to read as byte array.");
}
@Override
- public Schema convertFLOAT(PrimitiveTypeName primitiveTypeName) {
- return Schema.create(Schema.Type.FLOAT);
+ public HoodieSchema convertFLOAT(PrimitiveTypeName
primitiveTypeName) {
+ return HoodieSchema.create(HoodieSchemaType.FLOAT);
}
@Override
- public Schema convertDOUBLE(PrimitiveTypeName primitiveTypeName) {
- return Schema.create(Schema.Type.DOUBLE);
+ public HoodieSchema convertDOUBLE(PrimitiveTypeName
primitiveTypeName) {
+ return HoodieSchema.create(HoodieSchemaType.DOUBLE);
}
@Override
- public Schema convertFIXED_LEN_BYTE_ARRAY(PrimitiveTypeName
primitiveTypeName) {
+ public HoodieSchema convertFIXED_LEN_BYTE_ARRAY(PrimitiveTypeName
primitiveTypeName) {
if (annotation instanceof
LogicalTypeAnnotation.UUIDLogicalTypeAnnotation) {
- return Schema.create(Schema.Type.STRING);
+ return HoodieSchema.createUUID();
} else {
int size = parquetType.asPrimitiveType().getTypeLength();
- return Schema.createFixed(parquetType.getName(), null, null,
size);
+ return HoodieSchema.createFixed(parquetType.getName(), null,
null, size);
}
}
@Override
- public Schema convertBINARY(PrimitiveTypeName primitiveTypeName) {
+ public HoodieSchema convertBINARY(PrimitiveTypeName
primitiveTypeName) {
if (annotation instanceof
LogicalTypeAnnotation.StringLogicalTypeAnnotation ||
annotation instanceof
LogicalTypeAnnotation.EnumLogicalTypeAnnotation) {
- return Schema.create(Schema.Type.STRING);
+ return HoodieSchema.create(HoodieSchemaType.STRING);
} else {
- return Schema.create(Schema.Type.BYTES);
+ return HoodieSchema.create(HoodieSchemaType.BYTES);
}
}
});
- LogicalType logicalType = convertLogicalType(annotation);
- if (logicalType != null && (!(annotation instanceof
LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) ||
- parquetPrimitiveTypeName == BINARY ||
- parquetPrimitiveTypeName == FIXED_LEN_BYTE_ARRAY)) {
- schema = logicalType.addToSchema(schema);
- }
-
return schema;
} else {
GroupType parquetGroupType = parquetType.asGroupType();
LogicalTypeAnnotation logicalTypeAnnotation =
parquetGroupType.getLogicalTypeAnnotation();
if (logicalTypeAnnotation != null) {
- return logicalTypeAnnotation.accept(new
LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<Schema>() {
+ return logicalTypeAnnotation.accept(new
LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<HoodieSchema>() {
@Override
- public java.util.Optional<Schema>
visit(LogicalTypeAnnotation.ListLogicalTypeAnnotation listLogicalType) {
+ public java.util.Optional<HoodieSchema>
visit(LogicalTypeAnnotation.ListLogicalTypeAnnotation listLogicalType) {
Review Comment:
While we're here, let's import `Optional` to shorten the line lengths in
this file
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]