the-other-tim-brown commented on code in PR #17535:
URL: https://github.com/apache/hudi/pull/17535#discussion_r2608866464
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java:
##########
@@ -1015,11 +1016,13 @@ static void validateSecondaryIndexSchemaEvolution(
}
// Use AvroSchemaCompatibility's field lookup logic to handle aliases
- Schema.Field writerField =
AvroSchemaCompatibility.lookupWriterField(writerSchema, tableField);
+ HoodieSchemaField writerField =
HoodieSchemaCompatibility.lookupWriterField(writerSchema, tableField);
if (writerField != null &&
!tableField.schema().equals(writerField.schema())) {
// Check if this is just making the field nullable/non-nullable, which
is safe from SI perspective
- if
(getNonNullTypeFromUnion(tableField.schema()).equals(getNonNullTypeFromUnion(writerField.schema())))
{
+ HoodieSchema nonNullTableField =
HoodieSchemaUtils.getNonNullTypeFromUnion(tableField.schema());
Review Comment:
Let's use the `getNonNullType` method on the `HoodieSchema` when possible
##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaCompatibility.java:
##########
@@ -191,4 +215,42 @@ public static boolean
areSchemasProjectionEquivalent(HoodieSchema schema1, Hoodi
}
return
AvroSchemaUtils.areSchemasProjectionEquivalent(schema1.toAvroSchema(),
schema2.toAvroSchema());
}
+
+ /**
+ * Identifies the writer field that corresponds to the specified reader
field.
+ * This function is adapted from AvroSchemaCompatibility#lookupWriterField
+ *
+ * <p>
+ * Matching includes reader name aliases.
+ * </p>
+ *
+ * @param writerSchema Schema of the record where to look for the writer
field.
+ * @param readerField Reader field to identify the corresponding writer
field
+ * of.
+ * @return the writer field, if any does correspond, or None.
+ */
+ public static HoodieSchemaField lookupWriterField(final HoodieSchema
writerSchema, final HoodieSchemaField readerField) {
+ assert (writerSchema.getType() == HoodieSchemaType.RECORD);
+ final List<HoodieSchemaField> writerFields = new ArrayList<>();
Review Comment:
It looks like the list is expected to have 0 or 1 elements, so an Option may
be a better fit for this use case — what do you think?
##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaCompatibility.java:
##########
@@ -191,4 +215,42 @@ public static boolean
areSchemasProjectionEquivalent(HoodieSchema schema1, Hoodi
}
return
AvroSchemaUtils.areSchemasProjectionEquivalent(schema1.toAvroSchema(),
schema2.toAvroSchema());
}
+
+ /**
+ * Identifies the writer field that corresponds to the specified reader
field.
+ * This function is adapted from AvroSchemaCompatibility#lookupWriterField
+ *
+ * <p>
+ * Matching includes reader name aliases.
+ * </p>
+ *
+ * @param writerSchema Schema of the record where to look for the writer
field.
+ * @param readerField Reader field to identify the corresponding writer
field
+ * of.
+ * @return the writer field, if any does correspond, or None.
+ */
+ public static HoodieSchemaField lookupWriterField(final HoodieSchema
writerSchema, final HoodieSchemaField readerField) {
+ assert (writerSchema.getType() == HoodieSchemaType.RECORD);
Review Comment:
Let's use `ValidationUtils` here so we can throw a more descriptive
error message if the type is not `RECORD`
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java:
##########
@@ -116,21 +115,21 @@ public ClosableIterator<HoodieRecord<InternalRow>>
getRecordIterator(HoodieSchem
@Override
public ClosableIterator<HoodieRecord<InternalRow>>
getRecordIterator(HoodieSchema schema) throws IOException {
//TODO boundary to revisit in later pr to use HoodieSchema directly
Review Comment:
Let's remove this TODO now
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java:
##########
@@ -1005,8 +1006,8 @@ static void validateSecondaryIndexSchemaEvolution(
for (Map.Entry<String, String> entry : columnToIndexName.entrySet()) {
String columnName = entry.getKey();
String indexName = entry.getValue();
-
- Schema.Field tableField = tableSchema.getField(columnName);
+
+ HoodieSchemaField tableField = tableSchema.getField(columnName).get();
if (tableField == null) {
Review Comment:
Let's update this to first return an `Option` and then we can check if it is
present instead of checking for `null` here.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/PartitionPathParser.java:
##########
@@ -158,19 +156,15 @@ private static Object inferDateValue(
throw new IllegalArgumentException(
"Unknown date format for partition path: " + partitionPath);
}
- if (fieldSchema.getLogicalType() instanceof LogicalTypes.Date) {
+ if (fieldSchema.getType() == HoodieSchemaType.DATE) {
return Date.valueOf(time.toLocalDate());
}
return Timestamp.from(time.toInstant(ZoneOffset.UTC));
}
- private static boolean isTimeBasedLogicalType(LogicalType logicalType) {
- return logicalType instanceof LogicalTypes.Date
- || logicalType instanceof LogicalTypes.TimestampMillis
- || logicalType instanceof LogicalTypes.TimestampMicros
- || logicalType instanceof LogicalTypes.TimeMillis
- || logicalType instanceof LogicalTypes.TimeMicros
- || logicalType instanceof LogicalTypes.LocalTimestampMicros
- || logicalType instanceof LogicalTypes.LocalTimestampMillis;
+ private static boolean isTimeBasedLogicalType(HoodieSchemaType logicalType) {
Review Comment:
```suggestion
private static boolean isTimeBasedType(HoodieSchemaType type) {
```
The logical type is an Avro concept. We have proper types for date, time,
timestamp, etc now
##########
hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java:
##########
@@ -83,28 +84,27 @@ class TestTableSchemaResolver {
@Test
void testRecreateSchemaWhenDropPartitionColumns() {
- Schema originSchema = new
Schema.Parser().parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
+ HoodieSchema originSchema =
HoodieSchema.parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
// case2
String[] pts1 = new String[0];
- Schema s2 = TableSchemaResolver.appendPartitionColumns(originSchema,
Option.of(pts1));
+ HoodieSchema s2 = TableSchemaResolver.appendPartitionColumns(originSchema,
Option.of(pts1));
assertEquals(originSchema, s2);
// case3: partition_path is in originSchema
String[] pts2 = {"partition_path"};
- Schema s3 = TableSchemaResolver.appendPartitionColumns(originSchema,
Option.of(pts2));
+ HoodieSchema s3 = TableSchemaResolver.appendPartitionColumns(originSchema,
Option.of(pts2));
assertEquals(originSchema, s3);
// case4: user_partition is not in originSchema
String[] pts3 = {"user_partition"};
- Schema s4 = TableSchemaResolver.appendPartitionColumns(originSchema,
Option.of(pts3));
+ HoodieSchema s4 = TableSchemaResolver.appendPartitionColumns(originSchema,
Option.of(pts3));
assertNotEquals(originSchema, s4);
assertTrue(s4.getFields().stream().anyMatch(f ->
f.name().equals("user_partition")));
- Schema.Field f = s4.getField("user_partition");
- assertEquals(f.schema(),
AvroSchemaUtils.createNullableSchema(Schema.Type.STRING));
+ HoodieSchemaField f = s4.getField("user_partition").get();
+ assertEquals(f.schema(),
HoodieSchema.createNullable(HoodieSchemaType.STRING));
// case5: user_partition is in originSchema, but partition_path is in
originSchema
- String[] pts4 = {"user_partition", "partition_path"};
Review Comment:
Do you think this is an error in the original test? It seems like this
should be used below.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/PartitionPathParser.java:
##########
@@ -53,17 +51,17 @@ public Object[] getPartitionFieldVals(Option<String[]>
partitionFields,
private static Object[] getPartitionValues(String[] partitionFields,
String partitionPath,
- Schema schema) {
+ HoodieSchema schema) {
String[] parts = partitionPath.split("/");
int pathSegment = 0;
boolean hasDateField = false;
Object[] partitionValues = new Object[partitionFields.length];
for (int i = 0; i < partitionFields.length; i++) {
String partitionField = partitionFields[i];
- Schema.Field field = schema.getField(partitionField);
+ Option<HoodieSchemaField> fieldOpt = schema.getField(partitionField);
// if the field is not present in the schema, we assume it is a string
- Schema fieldSchema = field == null ? Schema.create(Schema.Type.STRING) :
getNonNullTypeFromUnion(field.schema());
- LogicalType logicalType = fieldSchema.getLogicalType();
+ HoodieSchema fieldSchema = fieldOpt.isEmpty() ?
HoodieSchema.create(HoodieSchemaType.STRING) :
HoodieSchemaUtils.getNonNullTypeFromUnion(fieldOpt.get().schema());
+ HoodieSchemaType logicalType = fieldSchema.getType();
Review Comment:
```suggestion
Option<HoodieSchemaField> field = schema.getField(partitionField);
// if the field is not present in the schema, we assume it is a string
HoodieSchema fieldSchema = field.map(f ->
f.schema().getNonNullType()).orElseGet(() ->
HoodieSchema.create(HoodieSchemaType.STRING));
```
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/KafkaOffsetPostProcessor.java:
##########
@@ -62,6 +66,38 @@ public KafkaOffsetPostProcessor(TypedProperties props,
JavaSparkContext jssc) {
}
@Override
+ public HoodieSchema processSchema(HoodieSchema schema) {
Review Comment:
This method was intentionally left un-overridden, in case a user is
extending the deprecated method.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]