the-other-tim-brown commented on code in PR #13654:
URL: https://github.com/apache/hudi/pull/13654#discussion_r2244291635


##########
hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java:
##########
@@ -376,6 +381,157 @@ public static Schema 
createNewSchemaFromFieldsWithReference(Schema schema, List<
     return newSchema;
   }
 
+  /**
+   * If schemas are projection equivalent, then a record with schema1 does not 
need to be projected to schema2

Review Comment:
   Can you explain what "projection equivalent" means as part of the Javadoc?



##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieArrayWritableAvroUtils.java:
##########
@@ -19,20 +19,275 @@
 
 package org.apache.hudi.hadoop.utils;
 
+import org.apache.hudi.avro.AvroSchemaUtils;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieAvroSchemaException;
+import org.apache.hudi.exception.SchemaCompatibilityException;
 
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.avro.JsonProperties;
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils;
 import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 
+import java.util.Deque;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 import java.util.function.UnaryOperator;
 
+import static org.apache.hudi.avro.AvroSchemaUtils.isNullable;
+import static org.apache.hudi.avro.HoodieAvroUtils.toJavaDate;
+import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
+
 public class HoodieArrayWritableAvroUtils {
 
+  public static ArrayWritable rewriteRecordWithNewSchema(ArrayWritable 
writable, Schema oldSchema, Schema newSchema, Map<String, String> renameCols) {
+    return (ArrayWritable) rewriteRecordWithNewSchema(writable, oldSchema, 
newSchema, renameCols, new LinkedList<>());
+  }
+
+  private static Writable rewriteRecordWithNewSchema(Writable writable, Schema 
oldAvroSchema, Schema newAvroSchema, Map<String, String> renameCols, 
Deque<String> fieldNames) {
+    if (writable == null) {
+      return null;
+    }
+    Schema oldSchema = AvroSchemaUtils.resolveNullableSchema(oldAvroSchema);
+    Schema newSchema = AvroSchemaUtils.resolveNullableSchema(newAvroSchema);
+    if (oldSchema.equals(newSchema)) {
+      return writable;
+    }
+    return rewriteRecordWithNewSchemaInternal(writable, oldSchema, newSchema, 
renameCols, fieldNames);
+  }
+
+  private static Writable rewriteRecordWithNewSchemaInternal(Writable 
writable, Schema oldSchema, Schema newSchema, Map<String, String> renameCols, 
Deque<String> fieldNames) {
+    switch (newSchema.getType()) {
+      case RECORD:
+        if (!(writable instanceof ArrayWritable)) {
+          throw new SchemaCompatibilityException("cannot rewrite record with 
different type");
+        }
+
+        ArrayWritable arrayWritable = (ArrayWritable) writable;
+        List<Schema.Field> fields = newSchema.getFields();
+        // projection will keep the size from the "from" schema because it 
gets recycled
+        // and if the size changes the reader will fail
+        Writable[] values = new Writable[Math.max(fields.size(), 
arrayWritable.get().length)];
+        for (int i = 0; i < fields.size(); i++) {
+          Schema.Field field = fields.get(i);
+          String fieldName = field.name();
+          fieldNames.push(fieldName);
+          Schema.Field oldField = oldSchema.getField(field.name());
+          if (oldField != null && !renameCols.containsKey(field.name())) {
+            values[i] = 
rewriteRecordWithNewSchema(arrayWritable.get()[oldField.pos()], 
oldField.schema(), field.schema(), renameCols, fieldNames);
+          } else {
+            String fieldFullName = HoodieAvroUtils.createFullName(fieldNames);
+            String fieldNameFromOldSchema = renameCols.get(fieldFullName);
+            // deal with rename
+            Schema.Field oldFieldRenamed = fieldNameFromOldSchema == null ? 
null : oldSchema.getField(fieldNameFromOldSchema);
+            if (oldFieldRenamed != null) {
+              // find rename
+              values[i] = 
rewriteRecordWithNewSchema(arrayWritable.get()[oldFieldRenamed.pos()], 
oldFieldRenamed.schema(), field.schema(), renameCols, fieldNames);
+            } else {
+              // deal with default value
+              if (field.defaultVal() instanceof JsonProperties.Null) {
+                values[i] = NullWritable.get();
+              } else {
+                if (!isNullable(field.schema()) && field.defaultVal() == null) 
{
+                  throw new SchemaCompatibilityException("Field " + 
fieldFullName + " has no default value and is non-nullable");
+                }
+                if (field.defaultVal() != null) {
+                  switch 
(AvroSchemaUtils.resolveNullableSchema(field.schema()).getType()) {
+                    case BOOLEAN:
+                      values[i] = new BooleanWritable((Boolean) 
field.defaultVal());
+                      break;
+                    case INT:
+                      values[i] = new IntWritable((Integer) 
field.defaultVal());
+                      break;
+                    case LONG:
+                      values[i] = new LongWritable((Long) field.defaultVal());
+                      break;
+                    case FLOAT:
+                      values[i] = new FloatWritable((Float) 
field.defaultVal());
+                      break;
+                    case DOUBLE:
+                      values[i] = new DoubleWritable((Double) 
field.defaultVal());
+                      break;
+                    case STRING:
+                      values[i] = new Text(field.defaultVal().toString());
+                      break;
+                    default:
+                      throw new SchemaCompatibilityException("Field " + 
fieldFullName + " has no default value");
+                  }
+                }
+              }
+            }
+          }
+          fieldNames.pop();
+        }
+        return new ArrayWritable(Writable.class, values);
+      //arrayWritable.set(values);
+      //return arrayWritable;
+
+      case ENUM:
+        if ((writable instanceof BytesWritable)) {
+          return writable;
+        }
+        if (oldSchema.getType() != Schema.Type.STRING && oldSchema.getType() 
!= Schema.Type.ENUM) {
+          throw new SchemaCompatibilityException(String.format("Only ENUM or 
STRING type can be converted ENUM type. Schema type was %s", 
oldSchema.getType().getName()));
+        }
+        if (oldSchema.getType() == Schema.Type.STRING) {
+          return new BytesWritable(((Text) writable).copyBytes());
+        }
+        return writable;
+      case ARRAY:
+        if (!(writable instanceof ArrayWritable)) {
+          throw new SchemaCompatibilityException(String.format("Cannot rewrite 
%s as an array", writable.getClass().getName()));
+        }
+        ArrayWritable array = (ArrayWritable) writable;
+        fieldNames.push("element");
+        for (int i = 0; i < array.get().length; i++) {
+          array.get()[i] = rewriteRecordWithNewSchema(array.get()[i], 
oldSchema.getElementType(), newSchema.getElementType(), renameCols, fieldNames);
+        }
+        fieldNames.pop();
+        return array;
+      case MAP:
+        if (!(writable instanceof ArrayWritable)) {
+          throw new SchemaCompatibilityException(String.format("Cannot rewrite 
%s as a map", writable.getClass().getName()));
+        }
+        ArrayWritable map = (ArrayWritable) writable;
+        fieldNames.push("value");
+        for (int i = 0; i < map.get().length; i++) {
+          Writable mapEntry = map.get()[i];
+          ((ArrayWritable) mapEntry).get()[1] = 
rewriteRecordWithNewSchema(((ArrayWritable) mapEntry).get()[1], 
oldSchema.getValueType(), newSchema.getValueType(), renameCols, fieldNames);
+        }
+        return map;
+
+      case UNION:
+        throw new IllegalArgumentException("should not be here?");
+
+      default:
+        return rewritePrimaryType(writable, oldSchema, newSchema);
+    }
+  }
+
+  public static Writable rewritePrimaryType(Writable writable, Schema 
oldSchema, Schema newSchema) {
+    if (oldSchema.getType() == newSchema.getType()) {
+      switch (oldSchema.getType()) {
+        case NULL:
+        case BOOLEAN:
+        case INT:
+        case LONG:
+        case FLOAT:
+        case DOUBLE:
+        case BYTES:
+        case STRING:
+          return writable;
+        case FIXED:
+          if (oldSchema.getFixedSize() != newSchema.getFixedSize()) {
+            // Check whether this is a [[Decimal]]'s precision change
+            if (oldSchema.getLogicalType() instanceof LogicalTypes.Decimal) {
+              LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) 
oldSchema.getLogicalType();
+              return 
HiveDecimalUtils.enforcePrecisionScale((HiveDecimalWritable) writable, new 
DecimalTypeInfo(decimal.getPrecision(), decimal.getScale()));
+            } else {
+              throw new HoodieAvroSchemaException("Fixed type size change is 
not currently supported");
+            }
+          }
+
+          // For [[Fixed]] data type both size and name have to match
+          //
+          // NOTE: That for values wrapped into [[Union]], to make sure that 
reverse lookup (by
+          //       full-name) is working we have to make sure that both 
schema's name and namespace
+          //       do match
+          if (Objects.equals(oldSchema.getFullName(), 
newSchema.getFullName())) {
+            return writable;
+          } else {
+            LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) 
oldSchema.getLogicalType();
+            return 
HiveDecimalUtils.enforcePrecisionScale((HiveDecimalWritable) writable, new 
DecimalTypeInfo(decimal.getPrecision(), decimal.getScale()));
+          }
+
+        default:
+          throw new HoodieAvroSchemaException("Unknown schema type: " + 
newSchema.getType());
+      }
+    } else {
+      return rewritePrimaryTypeWithDiffSchemaType(writable, oldSchema, 
newSchema);
+    }
+  }
+
+  private static Writable rewritePrimaryTypeWithDiffSchemaType(Writable 
writable, Schema oldSchema, Schema newSchema) {
+    switch (newSchema.getType()) {
+      case NULL:
+      case BOOLEAN:
+        break;
+      case INT:
+        if (newSchema.getLogicalType() == LogicalTypes.date() && 
oldSchema.getType() == Schema.Type.STRING) {
+          return new 
IntWritable(HoodieAvroUtils.fromJavaDate(java.sql.Date.valueOf(writable.toString())));
+        }
+        break;
+      case LONG:
+        if (oldSchema.getType() == Schema.Type.INT) {
+          return new LongWritable(((IntWritable) writable).get());
+        }
+        break;
+      case FLOAT:
+        if ((oldSchema.getType() == Schema.Type.INT)
+            || (oldSchema.getType() == Schema.Type.LONG)) {
+          return oldSchema.getType() == Schema.Type.INT ? new 
FloatWritable(((IntWritable) writable).get()) : new 
FloatWritable(((LongWritable) writable).get());
+        }
+        break;
+      case DOUBLE:
+        if (oldSchema.getType() == Schema.Type.FLOAT) {
+          // java float cannot convert to double directly, deal with float 
precision change
+          return new DoubleWritable(Double.parseDouble(((FloatWritable) 
writable).get() + ""));
+        } else if (oldSchema.getType() == Schema.Type.INT) {
+          return new DoubleWritable(((IntWritable) writable).get());
+        } else if (oldSchema.getType() == Schema.Type.LONG) {
+          return new DoubleWritable(((LongWritable) writable).get());
+        }
+        break;
+      case BYTES:
+        if (oldSchema.getType() == Schema.Type.STRING) {
+          return new BytesWritable(getUTF8Bytes(writable.toString()));
+        }
+        break;
+      case STRING:
+        if (oldSchema.getType() == Schema.Type.ENUM) {
+          return writable;
+        }
+        if (oldSchema.getType() == Schema.Type.BYTES) {
+          return new Text(StringUtils.fromUTF8Bytes(((BytesWritable) 
writable).getBytes()));
+        }
+        if (oldSchema.getLogicalType() == LogicalTypes.date()) {
+          return new Text(toJavaDate(((IntWritable) 
writable).get()).toString());
+        }
+        if (oldSchema.getType() == Schema.Type.INT
+            || oldSchema.getType() == Schema.Type.LONG
+            || oldSchema.getType() == Schema.Type.FLOAT
+            || oldSchema.getType() == Schema.Type.DOUBLE) {
+          return new Text(writable.toString());
+        }
+        throw new IllegalStateException("need to do this");
+      case FIXED:
+        throw new UnsupportedOperationException("need to do this");

Review Comment:
   Let's update these placeholder error messages to describe what actually went wrong.



##########
hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java:
##########
@@ -376,6 +381,157 @@ public static Schema 
createNewSchemaFromFieldsWithReference(Schema schema, List<
     return newSchema;
   }
 
+  /**
+   * If schemas are projection equivalent, then a record with schema1 does not 
need to be projected to schema2
+   * because the projection will be the identity.
+   */
+  public static boolean areSchemasProjectionEquivalent(Schema schema1, Schema 
schema2) {
+    if (Objects.equals(schema1, schema2)) {
+      return true;
+    }
+    if (schema1 == null || schema2 == null) {
+      return false;
+    }
+    return 
areSchemasProjectionEquivalentInternal(resolveNullableSchema(schema1), 
resolveNullableSchema(schema2));
+  }
+
+  @VisibleForTesting
+  static boolean areSchemasProjectionEquivalentInternal(Schema schema1, Schema 
schema2) {
+    if (Objects.equals(schema1, schema2)) {
+      return true;
+    }
+    switch (schema1.getType()) {
+      case RECORD:
+        if (schema2.getType() != Schema.Type.RECORD) {
+          return false;
+        }
+        List<Schema.Field> fields1 = schema1.getFields();
+        List<Schema.Field> fields2 = schema2.getFields();
+        if (fields1.size() != fields2.size()) {
+          return false;
+        }
+        for (int i = 0; i < fields1.size(); i++) {
+          if 
(!fields1.get(i).name().toLowerCase(Locale.ROOT).equals(fields2.get(i).name().toLowerCase(Locale.ROOT)))
 {

Review Comment:
   You can use `equalsIgnoreCase` to shorten this.



##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieArrayWritableAvroUtils.java:
##########
@@ -19,20 +19,275 @@
 
 package org.apache.hudi.hadoop.utils;
 
+import org.apache.hudi.avro.AvroSchemaUtils;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieAvroSchemaException;
+import org.apache.hudi.exception.SchemaCompatibilityException;
 
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.avro.JsonProperties;
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils;
 import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 
+import java.util.Deque;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 import java.util.function.UnaryOperator;
 
+import static org.apache.hudi.avro.AvroSchemaUtils.isNullable;
+import static org.apache.hudi.avro.HoodieAvroUtils.toJavaDate;
+import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
+
 public class HoodieArrayWritableAvroUtils {
 
+  public static ArrayWritable rewriteRecordWithNewSchema(ArrayWritable 
writable, Schema oldSchema, Schema newSchema, Map<String, String> renameCols) {
+    return (ArrayWritable) rewriteRecordWithNewSchema(writable, oldSchema, 
newSchema, renameCols, new LinkedList<>());
+  }
+
+  private static Writable rewriteRecordWithNewSchema(Writable writable, Schema 
oldAvroSchema, Schema newAvroSchema, Map<String, String> renameCols, 
Deque<String> fieldNames) {
+    if (writable == null) {
+      return null;
+    }
+    Schema oldSchema = AvroSchemaUtils.resolveNullableSchema(oldAvroSchema);
+    Schema newSchema = AvroSchemaUtils.resolveNullableSchema(newAvroSchema);
+    if (oldSchema.equals(newSchema)) {
+      return writable;
+    }
+    return rewriteRecordWithNewSchemaInternal(writable, oldSchema, newSchema, 
renameCols, fieldNames);
+  }
+
+  private static Writable rewriteRecordWithNewSchemaInternal(Writable 
writable, Schema oldSchema, Schema newSchema, Map<String, String> renameCols, 
Deque<String> fieldNames) {
+    switch (newSchema.getType()) {
+      case RECORD:
+        if (!(writable instanceof ArrayWritable)) {
+          throw new SchemaCompatibilityException("cannot rewrite record with 
different type");
+        }
+
+        ArrayWritable arrayWritable = (ArrayWritable) writable;
+        List<Schema.Field> fields = newSchema.getFields();
+        // projection will keep the size from the "from" schema because it 
gets recycled
+        // and if the size changes the reader will fail
+        Writable[] values = new Writable[Math.max(fields.size(), 
arrayWritable.get().length)];
+        for (int i = 0; i < fields.size(); i++) {
+          Schema.Field field = fields.get(i);
+          String fieldName = field.name();
+          fieldNames.push(fieldName);
+          Schema.Field oldField = oldSchema.getField(field.name());
+          if (oldField != null && !renameCols.containsKey(field.name())) {
+            values[i] = 
rewriteRecordWithNewSchema(arrayWritable.get()[oldField.pos()], 
oldField.schema(), field.schema(), renameCols, fieldNames);
+          } else {
+            String fieldFullName = HoodieAvroUtils.createFullName(fieldNames);
+            String fieldNameFromOldSchema = renameCols.get(fieldFullName);
+            // deal with rename
+            Schema.Field oldFieldRenamed = fieldNameFromOldSchema == null ? 
null : oldSchema.getField(fieldNameFromOldSchema);
+            if (oldFieldRenamed != null) {
+              // find rename
+              values[i] = 
rewriteRecordWithNewSchema(arrayWritable.get()[oldFieldRenamed.pos()], 
oldFieldRenamed.schema(), field.schema(), renameCols, fieldNames);
+            } else {
+              // deal with default value
+              if (field.defaultVal() instanceof JsonProperties.Null) {
+                values[i] = NullWritable.get();
+              } else {
+                if (!isNullable(field.schema()) && field.defaultVal() == null) 
{
+                  throw new SchemaCompatibilityException("Field " + 
fieldFullName + " has no default value and is non-nullable");
+                }
+                if (field.defaultVal() != null) {
+                  switch 
(AvroSchemaUtils.resolveNullableSchema(field.schema()).getType()) {
+                    case BOOLEAN:
+                      values[i] = new BooleanWritable((Boolean) 
field.defaultVal());
+                      break;
+                    case INT:
+                      values[i] = new IntWritable((Integer) 
field.defaultVal());
+                      break;
+                    case LONG:
+                      values[i] = new LongWritable((Long) field.defaultVal());
+                      break;
+                    case FLOAT:
+                      values[i] = new FloatWritable((Float) 
field.defaultVal());
+                      break;
+                    case DOUBLE:
+                      values[i] = new DoubleWritable((Double) 
field.defaultVal());
+                      break;
+                    case STRING:
+                      values[i] = new Text(field.defaultVal().toString());
+                      break;
+                    default:
+                      throw new SchemaCompatibilityException("Field " + 
fieldFullName + " has no default value");
+                  }
+                }
+              }
+            }
+          }
+          fieldNames.pop();
+        }
+        return new ArrayWritable(Writable.class, values);
+      //arrayWritable.set(values);
+      //return arrayWritable;
+
+      case ENUM:
+        if ((writable instanceof BytesWritable)) {
+          return writable;
+        }
+        if (oldSchema.getType() != Schema.Type.STRING && oldSchema.getType() 
!= Schema.Type.ENUM) {
+          throw new SchemaCompatibilityException(String.format("Only ENUM or 
STRING type can be converted ENUM type. Schema type was %s", 
oldSchema.getType().getName()));
+        }
+        if (oldSchema.getType() == Schema.Type.STRING) {
+          return new BytesWritable(((Text) writable).copyBytes());
+        }
+        return writable;
+      case ARRAY:
+        if (!(writable instanceof ArrayWritable)) {
+          throw new SchemaCompatibilityException(String.format("Cannot rewrite 
%s as an array", writable.getClass().getName()));
+        }
+        ArrayWritable array = (ArrayWritable) writable;
+        fieldNames.push("element");
+        for (int i = 0; i < array.get().length; i++) {
+          array.get()[i] = rewriteRecordWithNewSchema(array.get()[i], 
oldSchema.getElementType(), newSchema.getElementType(), renameCols, fieldNames);
+        }
+        fieldNames.pop();
+        return array;
+      case MAP:
+        if (!(writable instanceof ArrayWritable)) {
+          throw new SchemaCompatibilityException(String.format("Cannot rewrite 
%s as a map", writable.getClass().getName()));
+        }
+        ArrayWritable map = (ArrayWritable) writable;
+        fieldNames.push("value");
+        for (int i = 0; i < map.get().length; i++) {
+          Writable mapEntry = map.get()[i];
+          ((ArrayWritable) mapEntry).get()[1] = 
rewriteRecordWithNewSchema(((ArrayWritable) mapEntry).get()[1], 
oldSchema.getValueType(), newSchema.getValueType(), renameCols, fieldNames);
+        }
+        return map;
+
+      case UNION:
+        throw new IllegalArgumentException("should not be here?");
+
+      default:
+        return rewritePrimaryType(writable, oldSchema, newSchema);
+    }
+  }
+
+  public static Writable rewritePrimaryType(Writable writable, Schema 
oldSchema, Schema newSchema) {
+    if (oldSchema.getType() == newSchema.getType()) {
+      switch (oldSchema.getType()) {
+        case NULL:
+        case BOOLEAN:
+        case INT:
+        case LONG:
+        case FLOAT:
+        case DOUBLE:
+        case BYTES:
+        case STRING:
+          return writable;
+        case FIXED:
+          if (oldSchema.getFixedSize() != newSchema.getFixedSize()) {
+            // Check whether this is a [[Decimal]]'s precision change
+            if (oldSchema.getLogicalType() instanceof LogicalTypes.Decimal) {
+              LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) 
oldSchema.getLogicalType();
+              return 
HiveDecimalUtils.enforcePrecisionScale((HiveDecimalWritable) writable, new 
DecimalTypeInfo(decimal.getPrecision(), decimal.getScale()));
+            } else {
+              throw new HoodieAvroSchemaException("Fixed type size change is 
not currently supported");
+            }
+          }
+
+          // For [[Fixed]] data type both size and name have to match
+          //
+          // NOTE: That for values wrapped into [[Union]], to make sure that 
reverse lookup (by
+          //       full-name) is working we have to make sure that both 
schema's name and namespace
+          //       do match
+          if (Objects.equals(oldSchema.getFullName(), 
newSchema.getFullName())) {
+            return writable;
+          } else {
+            LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) 
oldSchema.getLogicalType();
+            return 
HiveDecimalUtils.enforcePrecisionScale((HiveDecimalWritable) writable, new 
DecimalTypeInfo(decimal.getPrecision(), decimal.getScale()));
+          }
+
+        default:
+          throw new HoodieAvroSchemaException("Unknown schema type: " + 
newSchema.getType());
+      }
+    } else {
+      return rewritePrimaryTypeWithDiffSchemaType(writable, oldSchema, 
newSchema);
+    }
+  }
+
+  private static Writable rewritePrimaryTypeWithDiffSchemaType(Writable 
writable, Schema oldSchema, Schema newSchema) {
+    switch (newSchema.getType()) {
+      case NULL:
+      case BOOLEAN:
+        break;
+      case INT:
+        if (newSchema.getLogicalType() == LogicalTypes.date() && 
oldSchema.getType() == Schema.Type.STRING) {
+          return new 
IntWritable(HoodieAvroUtils.fromJavaDate(java.sql.Date.valueOf(writable.toString())));
+        }
+        break;
+      case LONG:
+        if (oldSchema.getType() == Schema.Type.INT) {
+          return new LongWritable(((IntWritable) writable).get());
+        }
+        break;
+      case FLOAT:
+        if ((oldSchema.getType() == Schema.Type.INT)
+            || (oldSchema.getType() == Schema.Type.LONG)) {
+          return oldSchema.getType() == Schema.Type.INT ? new 
FloatWritable(((IntWritable) writable).get()) : new 
FloatWritable(((LongWritable) writable).get());
+        }
+        break;
+      case DOUBLE:
+        if (oldSchema.getType() == Schema.Type.FLOAT) {
+          // java float cannot convert to double directly, deal with float 
precision change
+          return new DoubleWritable(Double.parseDouble(((FloatWritable) 
writable).get() + ""));
+        } else if (oldSchema.getType() == Schema.Type.INT) {
+          return new DoubleWritable(((IntWritable) writable).get());
+        } else if (oldSchema.getType() == Schema.Type.LONG) {
+          return new DoubleWritable(((LongWritable) writable).get());
+        }
+        break;
+      case BYTES:
+        if (oldSchema.getType() == Schema.Type.STRING) {
+          return new BytesWritable(getUTF8Bytes(writable.toString()));
+        }
+        break;
+      case STRING:
+        if (oldSchema.getType() == Schema.Type.ENUM) {
+          return writable;
+        }
+        if (oldSchema.getType() == Schema.Type.BYTES) {
+          return new Text(StringUtils.fromUTF8Bytes(((BytesWritable) 
writable).getBytes()));
+        }
+        if (oldSchema.getLogicalType() == LogicalTypes.date()) {
+          return new Text(toJavaDate(((IntWritable) 
writable).get()).toString());
+        }
+        if (oldSchema.getType() == Schema.Type.INT
+            || oldSchema.getType() == Schema.Type.LONG
+            || oldSchema.getType() == Schema.Type.FLOAT
+            || oldSchema.getType() == Schema.Type.DOUBLE) {
+          return new Text(writable.toString());
+        }
+        throw new IllegalStateException("need to do this");

Review Comment:
   Instead of throwing here, can it just default to `new Text(writable.toString())`?



##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieArrayWritableAvroUtils.java:
##########
@@ -19,20 +19,275 @@
 
 package org.apache.hudi.hadoop.utils;
 
+import org.apache.hudi.avro.AvroSchemaUtils;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieAvroSchemaException;
+import org.apache.hudi.exception.SchemaCompatibilityException;
 
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.avro.JsonProperties;
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils;
 import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 
+import java.util.Deque;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 import java.util.function.UnaryOperator;
 
+import static org.apache.hudi.avro.AvroSchemaUtils.isNullable;
+import static org.apache.hudi.avro.HoodieAvroUtils.toJavaDate;
+import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
+
 public class HoodieArrayWritableAvroUtils {
 
+  public static ArrayWritable rewriteRecordWithNewSchema(ArrayWritable 
writable, Schema oldSchema, Schema newSchema, Map<String, String> renameCols) {
+    return (ArrayWritable) rewriteRecordWithNewSchema(writable, oldSchema, 
newSchema, renameCols, new LinkedList<>());
+  }
+
+  private static Writable rewriteRecordWithNewSchema(Writable writable, Schema 
oldAvroSchema, Schema newAvroSchema, Map<String, String> renameCols, 
Deque<String> fieldNames) {
+    if (writable == null) {
+      return null;
+    }
+    Schema oldSchema = AvroSchemaUtils.resolveNullableSchema(oldAvroSchema);
+    Schema newSchema = AvroSchemaUtils.resolveNullableSchema(newAvroSchema);
+    if (oldSchema.equals(newSchema)) {
+      return writable;
+    }
+    return rewriteRecordWithNewSchemaInternal(writable, oldSchema, newSchema, 
renameCols, fieldNames);
+  }
+
+  private static Writable rewriteRecordWithNewSchemaInternal(Writable 
writable, Schema oldSchema, Schema newSchema, Map<String, String> renameCols, 
Deque<String> fieldNames) {
+    switch (newSchema.getType()) {
+      case RECORD:
+        if (!(writable instanceof ArrayWritable)) {
+          throw new SchemaCompatibilityException("cannot rewrite record with 
different type");
+        }
+
+        ArrayWritable arrayWritable = (ArrayWritable) writable;
+        List<Schema.Field> fields = newSchema.getFields();
+        // projection will keep the size from the "from" schema because it 
gets recycled
+        // and if the size changes the reader will fail
+        Writable[] values = new Writable[Math.max(fields.size(), 
arrayWritable.get().length)];
+        for (int i = 0; i < fields.size(); i++) {
+          Schema.Field field = fields.get(i);
+          String fieldName = field.name();
+          fieldNames.push(fieldName);
+          Schema.Field oldField = oldSchema.getField(field.name());
+          if (oldField != null && !renameCols.containsKey(field.name())) {
+            values[i] = 
rewriteRecordWithNewSchema(arrayWritable.get()[oldField.pos()], 
oldField.schema(), field.schema(), renameCols, fieldNames);
+          } else {
+            String fieldFullName = HoodieAvroUtils.createFullName(fieldNames);
+            String fieldNameFromOldSchema = renameCols.get(fieldFullName);
+            // deal with rename
+            Schema.Field oldFieldRenamed = fieldNameFromOldSchema == null ? 
null : oldSchema.getField(fieldNameFromOldSchema);
+            if (oldFieldRenamed != null) {
+              // find rename
+              values[i] = 
rewriteRecordWithNewSchema(arrayWritable.get()[oldFieldRenamed.pos()], 
oldFieldRenamed.schema(), field.schema(), renameCols, fieldNames);
+            } else {
+              // deal with default value
+              if (field.defaultVal() instanceof JsonProperties.Null) {
+                values[i] = NullWritable.get();
+              } else {
+                if (!isNullable(field.schema()) && field.defaultVal() == null) 
{
+                  throw new SchemaCompatibilityException("Field " + 
fieldFullName + " has no default value and is non-nullable");
+                }
+                if (field.defaultVal() != null) {
+                  switch 
(AvroSchemaUtils.resolveNullableSchema(field.schema()).getType()) {
+                    case BOOLEAN:
+                      values[i] = new BooleanWritable((Boolean) 
field.defaultVal());
+                      break;
+                    case INT:
+                      values[i] = new IntWritable((Integer) 
field.defaultVal());
+                      break;
+                    case LONG:
+                      values[i] = new LongWritable((Long) field.defaultVal());
+                      break;
+                    case FLOAT:
+                      values[i] = new FloatWritable((Float) 
field.defaultVal());
+                      break;
+                    case DOUBLE:
+                      values[i] = new DoubleWritable((Double) 
field.defaultVal());
+                      break;
+                    case STRING:
+                      values[i] = new Text(field.defaultVal().toString());
+                      break;
+                    default:
+                      throw new SchemaCompatibilityException("Field " + 
fieldFullName + " has no default value");
+                  }
+                }
+              }
+            }
+          }
+          fieldNames.pop();
+        }
+        return new ArrayWritable(Writable.class, values);
+      //arrayWritable.set(values);
+      //return arrayWritable;
+
+      case ENUM:
+        if ((writable instanceof BytesWritable)) {
+          return writable;
+        }
+        if (oldSchema.getType() != Schema.Type.STRING && oldSchema.getType() 
!= Schema.Type.ENUM) {
+          throw new SchemaCompatibilityException(String.format("Only ENUM or 
STRING type can be converted ENUM type. Schema type was %s", 
oldSchema.getType().getName()));
+        }
+        if (oldSchema.getType() == Schema.Type.STRING) {
+          return new BytesWritable(((Text) writable).copyBytes());
+        }
+        return writable;
+      case ARRAY:
+        if (!(writable instanceof ArrayWritable)) {
+          throw new SchemaCompatibilityException(String.format("Cannot rewrite 
%s as an array", writable.getClass().getName()));
+        }
+        ArrayWritable array = (ArrayWritable) writable;
+        fieldNames.push("element");
+        for (int i = 0; i < array.get().length; i++) {
+          array.get()[i] = rewriteRecordWithNewSchema(array.get()[i], 
oldSchema.getElementType(), newSchema.getElementType(), renameCols, fieldNames);
+        }
+        fieldNames.pop();
+        return array;
+      case MAP:
+        if (!(writable instanceof ArrayWritable)) {
+          throw new SchemaCompatibilityException(String.format("Cannot rewrite 
%s as a map", writable.getClass().getName()));
+        }
+        ArrayWritable map = (ArrayWritable) writable;
+        fieldNames.push("value");
+        for (int i = 0; i < map.get().length; i++) {
+          Writable mapEntry = map.get()[i];
+          ((ArrayWritable) mapEntry).get()[1] = 
rewriteRecordWithNewSchema(((ArrayWritable) mapEntry).get()[1], 
oldSchema.getValueType(), newSchema.getValueType(), renameCols, fieldNames);
+        }
+        return map;
+
+      case UNION:
+        throw new IllegalArgumentException("should not be here?");
+
+      default:
+        return rewritePrimaryType(writable, oldSchema, newSchema);
+    }
+  }
+
+  public static Writable rewritePrimaryType(Writable writable, Schema 
oldSchema, Schema newSchema) {
+    if (oldSchema.getType() == newSchema.getType()) {
+      switch (oldSchema.getType()) {
+        case NULL:
+        case BOOLEAN:
+        case INT:
+        case LONG:
+        case FLOAT:
+        case DOUBLE:
+        case BYTES:
+        case STRING:
+          return writable;
+        case FIXED:
+          if (oldSchema.getFixedSize() != newSchema.getFixedSize()) {
+            // Check whether this is a [[Decimal]]'s precision change
+            if (oldSchema.getLogicalType() instanceof LogicalTypes.Decimal) {
+              LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) 
oldSchema.getLogicalType();
+              return 
HiveDecimalUtils.enforcePrecisionScale((HiveDecimalWritable) writable, new 
DecimalTypeInfo(decimal.getPrecision(), decimal.getScale()));
+            } else {
+              throw new HoodieAvroSchemaException("Fixed type size change is 
not currently supported");
+            }
+          }
+
+          // For [[Fixed]] data type both size and name have to match
+          //
+          // NOTE: That for values wrapped into [[Union]], to make sure that 
reverse lookup (by
+          //       full-name) is working we have to make sure that both 
schema's name and namespace
+          //       do match
+          if (Objects.equals(oldSchema.getFullName(), 
newSchema.getFullName())) {
+            return writable;
+          } else {
+            LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) 
oldSchema.getLogicalType();
+            return 
HiveDecimalUtils.enforcePrecisionScale((HiveDecimalWritable) writable, new 
DecimalTypeInfo(decimal.getPrecision(), decimal.getScale()));
+          }
+
+        default:
+          throw new HoodieAvroSchemaException("Unknown schema type: " + 
newSchema.getType());
+      }
+    } else {
+      return rewritePrimaryTypeWithDiffSchemaType(writable, oldSchema, 
newSchema);
+    }
+  }
+
+  private static Writable rewritePrimaryTypeWithDiffSchemaType(Writable 
writable, Schema oldSchema, Schema newSchema) {
+    switch (newSchema.getType()) {
+      case NULL:
+      case BOOLEAN:
+        break;
+      case INT:
+        if (newSchema.getLogicalType() == LogicalTypes.date() && 
oldSchema.getType() == Schema.Type.STRING) {
+          return new 
IntWritable(HoodieAvroUtils.fromJavaDate(java.sql.Date.valueOf(writable.toString())));
+        }
+        break;
+      case LONG:
+        if (oldSchema.getType() == Schema.Type.INT) {
+          return new LongWritable(((IntWritable) writable).get());
+        }
+        break;
+      case FLOAT:
+        if ((oldSchema.getType() == Schema.Type.INT)
+            || (oldSchema.getType() == Schema.Type.LONG)) {
+          return oldSchema.getType() == Schema.Type.INT ? new 
FloatWritable(((IntWritable) writable).get()) : new 
FloatWritable(((LongWritable) writable).get());
+        }
+        break;
+      case DOUBLE:
+        if (oldSchema.getType() == Schema.Type.FLOAT) {
+          // java float cannot convert to double directly, deal with float 
precision change
+          return new DoubleWritable(Double.parseDouble(((FloatWritable) 
writable).get() + ""));
+        } else if (oldSchema.getType() == Schema.Type.INT) {
+          return new DoubleWritable(((IntWritable) writable).get());
+        } else if (oldSchema.getType() == Schema.Type.LONG) {
+          return new DoubleWritable(((LongWritable) writable).get());
+        }
+        break;
+      case BYTES:
+        if (oldSchema.getType() == Schema.Type.STRING) {
+          return new BytesWritable(getUTF8Bytes(writable.toString()));
+        }
+        break;
+      case STRING:
+        if (oldSchema.getType() == Schema.Type.ENUM) {
+          return writable;
+        }
+        if (oldSchema.getType() == Schema.Type.BYTES) {
+          return new Text(StringUtils.fromUTF8Bytes(((BytesWritable) 
writable).getBytes()));
+        }
+        if (oldSchema.getLogicalType() == LogicalTypes.date()) {

Review Comment:
   Should there be any special handling of timestamps in this conversion code?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to