the-other-tim-brown commented on code in PR #17535:
URL: https://github.com/apache/hudi/pull/17535#discussion_r2614521506
##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieArrayWritableAvroUtils.java:
##########
@@ -65,65 +65,66 @@
public class HoodieArrayWritableAvroUtils {
- public static ArrayWritable rewriteRecordWithNewSchema(ArrayWritable
writable, Schema oldSchema, Schema newSchema, Map<String, String> renameCols) {
+ public static ArrayWritable rewriteRecordWithNewSchema(ArrayWritable
writable, HoodieSchema oldSchema, HoodieSchema newSchema, Map<String, String>
renameCols) {
return (ArrayWritable) rewriteRecordWithNewSchema(writable, oldSchema,
newSchema, renameCols, new LinkedList<>());
}
- private static Writable rewriteRecordWithNewSchema(Writable writable, Schema
oldAvroSchema, Schema newAvroSchema, Map<String, String> renameCols,
Deque<String> fieldNames) {
+ private static Writable rewriteRecordWithNewSchema(Writable writable,
HoodieSchema oldAvroSchema, HoodieSchema newAvroSchema, Map<String, String>
renameCols, Deque<String> fieldNames) {
if (writable == null) {
return null;
}
- Schema oldSchema = AvroSchemaUtils.getNonNullTypeFromUnion(oldAvroSchema);
- Schema newSchema = AvroSchemaUtils.getNonNullTypeFromUnion(newAvroSchema);
- if (areSchemasProjectionEquivalent(oldSchema, newSchema)) {
+ HoodieSchema oldSchema =
HoodieSchemaUtils.getNonNullTypeFromUnion(oldAvroSchema);
+ HoodieSchema newSchema =
HoodieSchemaUtils.getNonNullTypeFromUnion(newAvroSchema);
+ if (HoodieSchemaCompatibility.areSchemasProjectionEquivalent(oldSchema,
newSchema)) {
return writable;
}
return rewriteRecordWithNewSchemaInternal(writable, oldSchema, newSchema,
renameCols, fieldNames);
}
- private static Writable rewriteRecordWithNewSchemaInternal(Writable
writable, Schema oldSchema, Schema newSchema, Map<String, String> renameCols,
Deque<String> fieldNames) {
+ private static Writable rewriteRecordWithNewSchemaInternal(Writable
writable, HoodieSchema oldSchema, HoodieSchema newSchema, Map<String, String>
renameCols, Deque<String> fieldNames) {
switch (newSchema.getType()) {
case RECORD:
if (!(writable instanceof ArrayWritable)) {
throw new SchemaCompatibilityException(String.format("Cannot rewrite
%s as a record", writable.getClass().getName()));
}
ArrayWritable arrayWritable = (ArrayWritable) writable;
- List<Schema.Field> fields = newSchema.getFields();
+ List<HoodieSchemaField> fields = newSchema.getFields();
// projection will keep the size from the "from" schema because it
gets recycled
// and if the size changes the reader will fail
boolean noFieldsRenaming = renameCols.isEmpty();
String namePrefix = createNamePrefix(noFieldsRenaming, fieldNames);
Writable[] values = new Writable[Math.max(fields.size(),
arrayWritable.get().length)];
for (int i = 0; i < fields.size(); i++) {
- Schema.Field newField = newSchema.getFields().get(i);
+ HoodieSchemaField newField = newSchema.getFields().get(i);
String newFieldName = newField.name();
fieldNames.push(newFieldName);
- Schema.Field oldField = noFieldsRenaming
+ Option<HoodieSchemaField> oldFieldOpt = noFieldsRenaming
? oldSchema.getField(newFieldName)
: oldSchema.getField(getOldFieldNameWithRenaming(namePrefix,
newFieldName, renameCols));
- if (oldField != null) {
+ if (oldFieldOpt.isPresent()) {
+ HoodieSchemaField oldField = oldFieldOpt.get();
values[i] =
rewriteRecordWithNewSchema(arrayWritable.get()[oldField.pos()],
oldField.schema(), newField.schema(), renameCols, fieldNames);
- } else if (newField.defaultVal() instanceof JsonProperties.Null) {
+ } else if (newField.defaultVal().isPresent() &&
newField.defaultVal().get() instanceof JsonProperties.Null) {
Review Comment:
`JsonProperties` needs to be `HoodieJsonProperties` now
##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaCompatibility.java:
##########
@@ -121,6 +123,26 @@ public static boolean isSchemaCompatible(HoodieSchema
readerSchema, HoodieSchema
return isSchemaCompatible(readerSchema, writerSchema, true);
}
+ /**
+ * Checks if two schemas are compatible with projection support.
+ * This allows the reader schema to have fewer fields than the writer schema.
+ *
+ * @param readerSchema the schema used to read the data
+ * @param writerSchema the schema used to write the data
+   * @param checkNaming controls whether the schemas' fully-qualified names
should be checked
+ * @param allowProjection whether to allow fewer fields in reader schema
+ * @return true if reader schema can read data written with writer schema
+ * @throws IllegalArgumentException if schemas are null
+ */
+ public static boolean isSchemaCompatible(HoodieSchema readerSchema,
HoodieSchema writerSchema,
Review Comment:
Are there existing test cases we can migrate to
TestHoodieSchemaCompatibility as part of this PR?
##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieArrayWritableAvroUtils.java:
##########
@@ -65,65 +65,66 @@
public class HoodieArrayWritableAvroUtils {
Review Comment:
Should we update this class name to just be `HoodieArrayWritableSchemaUtils`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]