the-other-tim-brown commented on code in PR #13654:
URL: https://github.com/apache/hudi/pull/13654#discussion_r2251582224


##########
hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java:
##########
@@ -1146,12 +1146,11 @@ private static Object 
rewriteRecordWithNewSchemaInternal(Object oldRecord,
     }
   }
 
-  @VisibleForTesting
-  public static String createNamePredix(boolean noFieldsRenaming, 
Deque<String> fieldNames) {
+  public static String createNamePrefix(boolean noFieldsRenaming, 
Deque<String> fieldNames) {

Review Comment:
   This logic can be simplified to `noFieldsRenaming || 
fieldNames.isEmpty() ? null : createFullName(fieldNames)`



##########
hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java:
##########
@@ -376,6 +380,160 @@ public static Schema 
createNewSchemaFromFieldsWithReference(Schema schema, List<
     return newSchema;
   }
 
+  /**
+   * If schemas are projection equivalent, then a record with schema1 does not 
need to be projected to schema2
+   * because the projection will be the identity.
+   *
+   *  Two schemas are considered projection equivalent if the field names and 
types are equivalent.
+   *  The names of records, namespaces, or docs do not need to match. 
Nullability is ignored.
+   */
+  public static boolean areSchemasProjectionEquivalent(Schema schema1, Schema 
schema2) {
+    if (Objects.equals(schema1, schema2)) {
+      return true;
+    }
+    if (schema1 == null || schema2 == null) {
+      return false;
+    }
+    return 
areSchemasProjectionEquivalentInternal(resolveNullableSchema(schema1), 
resolveNullableSchema(schema2));
+  }
+
+  @VisibleForTesting
+  static boolean areSchemasProjectionEquivalentInternal(Schema schema1, Schema 
schema2) {
+    if (Objects.equals(schema1, schema2)) {
+      return true;
+    }
+    switch (schema1.getType()) {
+      case RECORD:
+        if (schema2.getType() != Schema.Type.RECORD) {
+          return false;
+        }
+        List<Schema.Field> fields1 = schema1.getFields();
+        List<Schema.Field> fields2 = schema2.getFields();
+        if (fields1.size() != fields2.size()) {
+          return false;
+        }
+        for (int i = 0; i < fields1.size(); i++) {
+          if (!fields1.get(i).name().equalsIgnoreCase(fields2.get(i).name())) {
+            return false;
+          }
+          if (!areSchemasProjectionEquivalent(fields1.get(i).schema(), 
fields2.get(i).schema())) {
+            return false;
+          }
+        }
+        return true;
+
+      case ARRAY:
+        if (schema2.getType() != Schema.Type.ARRAY) {
+          return false;
+        }
+        return areSchemasProjectionEquivalent(schema1.getElementType(), 
schema2.getElementType());
+
+      case MAP:
+        if (schema2.getType() != Schema.Type.MAP) {
+          return false;
+        }
+        return areSchemasProjectionEquivalent(schema1.getValueType(), 
schema2.getValueType());
+      case UNION:
+        throw new IllegalArgumentException("Union schemas are not supported 
besides nullable");
+      default:
+        return areSchemaPrimitivesProjectionEquivalent(schema1, schema2);
+    }
+  }
+
+  @VisibleForTesting
+  static boolean areSchemaPrimitivesProjectionEquivalent(Schema schema1, 
Schema schema2) {
+    if (!areLogicalTypesProjectionEquivalent(schema1.getLogicalType(), 
schema2.getLogicalType())) {
+      return false;
+    }
+    if (Objects.requireNonNull(schema1.getType()) == Schema.Type.FIXED) {
+      return schema2.getType() == Schema.Type.FIXED
+          && schema1.getFixedSize() == schema2.getFixedSize();
+    }
+    if (Objects.requireNonNull(schema1.getType()) == Schema.Type.ENUM) {
+      return schema2.getType() == Schema.Type.ENUM
+          && areEnumSymbolsProjectionEquivalent(schema1.getEnumSymbols(), 
schema2.getEnumSymbols());
+    }
+    return Objects.equals(schema1.getType(), schema2.getType());
+  }
+
+  private static boolean areEnumSymbolsProjectionEquivalent(List<String> 
enumSymbols1, List<String> enumSymbols2) {
+    Set<String> set1 = new HashSet<>(enumSymbols1);
+    Set<String> set2 = new HashSet<>(enumSymbols2);
+    return set2.containsAll(set1);
+  }
+
+  private static boolean areLogicalTypesProjectionEquivalent(LogicalType 
logicalType1, LogicalType logicalType2) {
+    if (Objects.equals(logicalType1, logicalType2)) {
+      return true;
+    }
+    if (logicalType1 == null || logicalType2 == null) {
+      return false;
+    }
+    if (logicalType1 instanceof LogicalTypes.Decimal && logicalType2 
instanceof LogicalTypes.Decimal) {
+      return ((LogicalTypes.Decimal) logicalType1).getScale() == 
((LogicalTypes.Decimal) logicalType2).getScale()
+          && ((LogicalTypes.Decimal) logicalType1).getPrecision() == 
((LogicalTypes.Decimal) logicalType2).getPrecision();
+    }
+    return false;
+  }
+
+  public static Schema pruneDataSchema(Schema dataSchema, Schema 
requiredSchema, Set<String> excludeFields) {

Review Comment:
   Can you add a quick Javadoc on how to use this?



##########
hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHoodieArrayWritableAvroUtils.java:
##########
@@ -118,4 +124,282 @@ public void testCastOrderingField() {
     WritableComparable reflexive = new IntWritable(8675309);
     assertEquals(reflexive, 
readerContext.getRecordContext().convertValueToEngineType(reflexive));
   }
+
+  @Test
+  void testRewriteStringToDateInt() throws AvroSerdeException {
+    Schema oldSchema = Schema.create(Schema.Type.STRING);
+    Schema newSchema = 
LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT));
+    Writable oldWritable = new Text("2023-01-01");
+    Writable result = 
HoodieArrayWritableAvroUtils.rewritePrimaryType(oldWritable, oldSchema, 
newSchema);
+    Writable expected = 
HoodieHiveUtils.getDateWriteable(HoodieAvroUtils.fromJavaDate(Date.valueOf("2023-01-01")));
+    assertEquals(expected, result);
+    validateRewriteWithAvro(oldWritable, oldSchema, result, newSchema);
+  }
+
+  @Test
+  void testRewriteIntToLong() throws AvroSerdeException {
+    Writable oldWritable = new IntWritable(42);
+    Schema oldSchema = Schema.create(Schema.Type.INT);
+    Schema newSchema = Schema.create(Schema.Type.LONG);
+    Writable result = 
HoodieArrayWritableAvroUtils.rewritePrimaryType(oldWritable, oldSchema, 
newSchema);
+    Writable expected = new LongWritable(42);
+    assertEquals(expected, result);
+    validateRewriteWithAvro(oldWritable, oldSchema, result, newSchema);
+  }
+
+  @Test
+  void testRewriteLongToFloat() throws AvroSerdeException {
+    Writable oldWritable = new LongWritable(123);
+    Schema oldSchema = Schema.create(Schema.Type.LONG);
+    Schema newSchema = Schema.create(Schema.Type.FLOAT);
+    Writable result = 
HoodieArrayWritableAvroUtils.rewritePrimaryType(oldWritable, oldSchema, 
newSchema);
+    Writable expected = new FloatWritable(123.0f);
+    assertEquals(expected, result);
+    validateRewriteWithAvro(oldWritable, oldSchema, result, newSchema);
+  }
+
+  @Test
+  void testRewriteFloatToDouble() throws AvroSerdeException {
+    Writable oldWritable = new FloatWritable(3.14f);
+    Schema oldSchema = Schema.create(Schema.Type.FLOAT);
+    Schema newSchema = Schema.create(Schema.Type.DOUBLE);
+    Writable result = 
HoodieArrayWritableAvroUtils.rewritePrimaryType(oldWritable, oldSchema, 
newSchema);
+    Writable expected = new DoubleWritable(3.14d);
+    assertEquals(expected, result);
+    validateRewriteWithAvro(oldWritable, oldSchema, result, newSchema);
+  }
+
+  @Test
+  void testRewriteBytesToString() throws AvroSerdeException {
+    BytesWritable oldWritable = new BytesWritable("hello".getBytes());
+    Schema oldSchema = Schema.create(Schema.Type.BYTES);
+    Schema newSchema = Schema.create(Schema.Type.STRING);
+    Writable result = 
HoodieArrayWritableAvroUtils.rewritePrimaryType(oldWritable, oldSchema, 
newSchema);
+    Writable expected = new Text("hello");
+    assertEquals(expected, result);
+    validateRewriteWithAvro(oldWritable, oldSchema, result, newSchema);
+  }
+
+  @Test
+  void testRewriteIntToString() throws AvroSerdeException {
+    Writable oldWritable = new IntWritable(123);
+    Schema oldSchema = Schema.create(Schema.Type.INT);
+    Schema newSchema = Schema.create(Schema.Type.STRING);
+    Writable result = 
HoodieArrayWritableAvroUtils.rewritePrimaryType(oldWritable, oldSchema, 
newSchema);
+    Writable expected = new Text("123");
+    assertEquals(expected, result);
+    validateRewriteWithAvro(oldWritable, oldSchema, result, newSchema);
+  }
+
+  @Test
+  void testRewriteFixedDecimalToString() throws AvroSerdeException {
+    Schema decimalSchema = LogicalTypes.decimal(10, 
2).addToSchema(Schema.createFixed("decimal", null, null, 5));
+    HiveDecimalWritable oldWritable = new 
HiveDecimalWritable(HiveDecimal.create(new BigDecimal("123.45")));
+    Schema newSchema = Schema.create(Schema.Type.STRING);
+    Writable result = 
HoodieArrayWritableAvroUtils.rewritePrimaryType(oldWritable, decimalSchema, 
newSchema);
+    Writable expected = new Text("123.45");
+    assertEquals(expected, result);
+    validateRewriteWithAvro(oldWritable, decimalSchema, result, newSchema);
+  }
+
+  @Test
+  void testRewriteStringToFixedDecimal() throws AvroSerdeException {
+    Schema decimalSchema = LogicalTypes.decimal(10, 
2).addToSchema(Schema.createFixed("decimal", null, null, 5));
+    Writable oldWritable = new Text("123.45");
+    Writable result = 
HoodieArrayWritableAvroUtils.rewritePrimaryType(oldWritable, 
Schema.create(Schema.Type.STRING), decimalSchema);
+    assertInstanceOf(HiveDecimalWritable.class, result);
+    assertEquals(new BigDecimal("123.45"), ((HiveDecimalWritable) 
result).getHiveDecimal().bigDecimalValue());
+    validateRewriteWithAvro(oldWritable, Schema.create(Schema.Type.STRING), 
result, decimalSchema);
+  }
+
+  @Test
+  void testRewriteBytesToFixedDecimal() throws AvroSerdeException {
+    BigDecimal input = new BigDecimal("123.45");
+    byte[] bytes = input.unscaledValue().toByteArray();
+    BytesWritable oldWritable = new BytesWritable(bytes);
+    Schema decimalSchema = LogicalTypes.decimal(5, 
2).addToSchema(Schema.createFixed("decimal", null, null, 5));
+    Writable result = 
HoodieArrayWritableAvroUtils.rewritePrimaryType(oldWritable, 
Schema.create(Schema.Type.BYTES), decimalSchema);
+    assertEquals(input, ((HiveDecimalWritable) 
result).getHiveDecimal().bigDecimalValue());
+    validateRewriteWithAvro(oldWritable, Schema.create(Schema.Type.BYTES), 
result, decimalSchema);
+  }
+
+  @Test
+  void testUnsupportedTypeConversionThrows() {
+    Schema oldSchema = Schema.createMap(Schema.create(Schema.Type.INT));
+    Schema newSchema = Schema.create(Schema.Type.STRING);
+    assertThrows(HoodieAvroSchemaException.class, () ->
+        HoodieArrayWritableAvroUtils.rewritePrimaryType(null, oldSchema, 
newSchema));
+  }
+
+  @Test
+  void testRewriteEnumToString() throws AvroSerdeException {
+    Schema enumSchema = Schema.createEnum("TestEnum", null, null, 
Arrays.asList("A", "B", "C"));
+    Writable oldWritable = new Text("B");
+    Schema newSchema = Schema.create(Schema.Type.STRING);
+    Writable result = 
HoodieArrayWritableAvroUtils.rewritePrimaryType(oldWritable, enumSchema, 
newSchema);
+    Writable expected = new Text("B");
+    assertEquals(expected, result);
+    validateRewriteWithAvro(oldWritable, enumSchema, result, newSchema);
+  }
+
+  @Test
+  void testRewriteFixedWithSameSizeAndFullName() {
+    Schema oldFixed = Schema.createFixed("decimal", null, "ns", 5);
+    Schema newFixed = Schema.createFixed("decimal", null, "ns", 5);
+    HiveDecimalWritable hdw = new 
HiveDecimalWritable(HiveDecimal.create("123.45"));
+    Writable result = HoodieArrayWritableAvroUtils.rewritePrimaryType(hdw, 
oldFixed, newFixed);
+    assertSame(hdw, result);
+  }
+
+  @Test
+  void testRewriteFixedWithSameSizeButDifferentNameUsesDecimalFallback() 
throws AvroSerdeException {
+    Schema oldFixed = LogicalTypes.decimal(5, 
2).addToSchema(Schema.createFixed("decA", null, "ns1", 5));
+    Schema newFixed = LogicalTypes.decimal(5, 
2).addToSchema(Schema.createFixed("decB", null, "ns2", 5));
+    HiveDecimalWritable oldWritable = new 
HiveDecimalWritable(HiveDecimal.create("123.45"));
+    Writable result = 
HoodieArrayWritableAvroUtils.rewritePrimaryType(oldWritable, oldFixed, 
newFixed);
+    assertInstanceOf(HiveDecimalWritable.class, result);
+    assertEquals(new BigDecimal("123.45"), ((HiveDecimalWritable) 
result).getHiveDecimal().bigDecimalValue());
+    validateRewriteWithAvro(oldWritable, oldFixed, result, newFixed);
+  }
+
+  @Test
+  void testRewriteBooleanPassthrough() {
+    Schema boolSchema = Schema.create(Schema.Type.BOOLEAN);
+    BooleanWritable bool = new BooleanWritable(true);
+    Writable result = HoodieArrayWritableAvroUtils.rewritePrimaryType(bool, 
boolSchema, boolSchema);
+    assertSame(bool, result);
+  }
+
+  @Test
+  void testUnsupportedRewriteMapToIntThrows() {
+    Schema oldSchema = Schema.createMap(Schema.create(Schema.Type.STRING));
+    Schema newSchema = Schema.create(Schema.Type.INT);
+    assertThrows(HoodieAvroSchemaException.class, () ->
+        HoodieArrayWritableAvroUtils.rewritePrimaryType(new Text("foo"), 
oldSchema, newSchema));
+  }
+
+  @Test
+  void testRewriteIntToDecimalFixed() throws AvroSerdeException {
+    Schema fixedDecimal = LogicalTypes.decimal(8, 
2).addToSchema(Schema.createFixed("dec", null, null, 6));
+    IntWritable oldWritable = new IntWritable(12345);
+    Writable result = 
HoodieArrayWritableAvroUtils.rewritePrimaryType(oldWritable, 
Schema.create(Schema.Type.INT), fixedDecimal);
+    assertInstanceOf(HiveDecimalWritable.class, result);
+    assertEquals(new BigDecimal("12345"), ((HiveDecimalWritable) 
result).getHiveDecimal().bigDecimalValue());
+    validateRewriteWithAvro(oldWritable, Schema.create(Schema.Type.INT), 
result, fixedDecimal);
+  }
+
+  @Test
+  void testRewriteDoubleToDecimalFixed() throws AvroSerdeException {
+    Schema fixedDecimal = LogicalTypes.decimal(10, 
3).addToSchema(Schema.createFixed("dec", null, null, 8));
+    DoubleWritable oldWritable = new DoubleWritable(987.654);
+    Writable result = 
HoodieArrayWritableAvroUtils.rewritePrimaryType(oldWritable, 
Schema.create(Schema.Type.DOUBLE), fixedDecimal);
+    assertInstanceOf(HiveDecimalWritable.class, result);
+    assertEquals(new BigDecimal("987.654"), ((HiveDecimalWritable) 
result).getHiveDecimal().bigDecimalValue());
+    validateRewriteWithAvro(oldWritable, Schema.create(Schema.Type.DOUBLE), 
result, fixedDecimal);
+  }
+
+  @Test
+  void testRewriteDecimalBytesToFixed() throws AvroSerdeException {
+    Schema decimalSchema = LogicalTypes.decimal(6, 
2).addToSchema(Schema.createFixed("dec", null, null, 6));
+    BigDecimal value = new BigDecimal("999.99");
+    byte[] unscaledBytes = value.unscaledValue().toByteArray();
+    BytesWritable oldWritable = new BytesWritable(unscaledBytes);
+    Writable result = 
HoodieArrayWritableAvroUtils.rewritePrimaryType(oldWritable, 
Schema.create(Schema.Type.BYTES), decimalSchema);
+    assertEquals(value, ((HiveDecimalWritable) 
result).getHiveDecimal().bigDecimalValue());
+    validateRewriteWithAvro(oldWritable, Schema.create(Schema.Type.BYTES), 
result, decimalSchema);
+  }
+
+  private void validateRewriteWithAvro(
+      Writable oldWritable,
+      Schema oldSchema,
+      Writable newWritable,
+      Schema newSchema
+  ) throws AvroSerdeException {
+    TypeInfo oldTypeInfo = HiveTypeUtils.generateTypeInfo(oldSchema, 
Collections.emptySet());
+    TypeInfo newTypeInfo = HiveTypeUtils.generateTypeInfo(newSchema, 
Collections.emptySet());
+
+    ObjectInspector oldOI = 
TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(oldTypeInfo);

Review Comment:
   Although it may be verbose, my preference is to use ObjectInspector instead 
of `OI` in the variable and method names



##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java:
##########
@@ -71,35 +77,33 @@
  */
 public class HiveHoodieReaderContext extends 
HoodieReaderContext<ArrayWritable> {
   protected final HoodieFileGroupReaderBasedRecordReader.HiveReaderCreator 
readerCreator;
-  protected final Map<String, TypeInfo> columnTypeMap;
   private RecordReader<NullWritable, ArrayWritable> firstRecordReader = null;
 
   private final List<String> partitionCols;
   private final Set<String> partitionColSet;
 
   protected 
HiveHoodieReaderContext(HoodieFileGroupReaderBasedRecordReader.HiveReaderCreator
 readerCreator,
                                     List<String> partitionCols,
-                                    ObjectInspectorCache objectInspectorCache,
                                     StorageConfiguration<?> 
storageConfiguration,
                                     HoodieTableConfig tableConfig) {
-    super(storageConfiguration, tableConfig, Option.empty(), Option.empty(), 
new HiveRecordContext(tableConfig, storageConfiguration, objectInspectorCache));
+    super(storageConfiguration, tableConfig, Option.empty(), Option.empty(), 
new HiveRecordContext(tableConfig));
+    if 
(storageConfiguration.getString(AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS).isEmpty())
 {
+      // Overriding default treatment of repeated groups in Parquet
+      storageConfiguration.set(AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS, 
"false");
+    }
     this.readerCreator = readerCreator;
     this.partitionCols = partitionCols;
     this.partitionColSet = new HashSet<>(this.partitionCols);
-    this.columnTypeMap = objectInspectorCache.getColumnTypeMap();
   }
 
   private void setSchemas(JobConf jobConf, Schema dataSchema, Schema 
requiredSchema) {
     List<String> dataColumnNameList = dataSchema.getFields().stream().map(f -> 
f.name().toLowerCase(Locale.ROOT)).collect(Collectors.toList());
-    List<TypeInfo> dataColumnTypeList = 
dataColumnNameList.stream().map(fieldName -> {
-      TypeInfo type = columnTypeMap.get(fieldName);
-      if (type == null) {
-        throw new IllegalArgumentException("Field: " + fieldName + ", does not 
have a defined type");
-      }
-      return type;
-    }).collect(Collectors.toList());
     jobConf.set(serdeConstants.LIST_COLUMNS, String.join(",", 
dataColumnNameList));
-    jobConf.set(serdeConstants.LIST_COLUMN_TYPES, 
dataColumnTypeList.stream().map(TypeInfo::getQualifiedName).collect(Collectors.joining(",")));
+    try {
+      jobConf.set(serdeConstants.LIST_COLUMN_TYPES, 
HiveTypeUtils.generateColumnTypes(dataSchema).stream().map(TypeInfo::getTypeName).collect(Collectors.joining(",")));
+    } catch (AvroSerdeException e) {
+      throw new RuntimeException(e);

Review Comment:
   nitpick: can we throw a HoodieException with some more context in the error 
message?



##########
hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHoodieArrayWritableAvroUtils.java:
##########
@@ -118,4 +124,282 @@ public void testCastOrderingField() {
     WritableComparable reflexive = new IntWritable(8675309);
     assertEquals(reflexive, 
readerContext.getRecordContext().convertValueToEngineType(reflexive));
   }
+
+  @Test
+  void testRewriteStringToDateInt() throws AvroSerdeException {
+    Schema oldSchema = Schema.create(Schema.Type.STRING);
+    Schema newSchema = 
LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT));
+    Writable oldWritable = new Text("2023-01-01");
+    Writable result = 
HoodieArrayWritableAvroUtils.rewritePrimaryType(oldWritable, oldSchema, 
newSchema);
+    Writable expected = 
HoodieHiveUtils.getDateWriteable(HoodieAvroUtils.fromJavaDate(Date.valueOf("2023-01-01")));
+    assertEquals(expected, result);
+    validateRewriteWithAvro(oldWritable, oldSchema, result, newSchema);
+  }
+
+  @Test
+  void testRewriteIntToLong() throws AvroSerdeException {
+    Writable oldWritable = new IntWritable(42);
+    Schema oldSchema = Schema.create(Schema.Type.INT);
+    Schema newSchema = Schema.create(Schema.Type.LONG);
+    Writable result = 
HoodieArrayWritableAvroUtils.rewritePrimaryType(oldWritable, oldSchema, 
newSchema);
+    Writable expected = new LongWritable(42);
+    assertEquals(expected, result);
+    validateRewriteWithAvro(oldWritable, oldSchema, result, newSchema);
+  }
+
+  @Test
+  void testRewriteLongToFloat() throws AvroSerdeException {
+    Writable oldWritable = new LongWritable(123);
+    Schema oldSchema = Schema.create(Schema.Type.LONG);
+    Schema newSchema = Schema.create(Schema.Type.FLOAT);
+    Writable result = 
HoodieArrayWritableAvroUtils.rewritePrimaryType(oldWritable, oldSchema, 
newSchema);
+    Writable expected = new FloatWritable(123.0f);
+    assertEquals(expected, result);
+    validateRewriteWithAvro(oldWritable, oldSchema, result, newSchema);
+  }
+
+  @Test
+  void testRewriteFloatToDouble() throws AvroSerdeException {
+    Writable oldWritable = new FloatWritable(3.14f);
+    Schema oldSchema = Schema.create(Schema.Type.FLOAT);
+    Schema newSchema = Schema.create(Schema.Type.DOUBLE);
+    Writable result = 
HoodieArrayWritableAvroUtils.rewritePrimaryType(oldWritable, oldSchema, 
newSchema);
+    Writable expected = new DoubleWritable(3.14d);
+    assertEquals(expected, result);
+    validateRewriteWithAvro(oldWritable, oldSchema, result, newSchema);
+  }
+
+  @Test
+  void testRewriteBytesToString() throws AvroSerdeException {
+    BytesWritable oldWritable = new BytesWritable("hello".getBytes());
+    Schema oldSchema = Schema.create(Schema.Type.BYTES);
+    Schema newSchema = Schema.create(Schema.Type.STRING);
+    Writable result = 
HoodieArrayWritableAvroUtils.rewritePrimaryType(oldWritable, oldSchema, 
newSchema);
+    Writable expected = new Text("hello");
+    assertEquals(expected, result);
+    validateRewriteWithAvro(oldWritable, oldSchema, result, newSchema);
+  }
+
+  @Test
+  void testRewriteIntToString() throws AvroSerdeException {
+    Writable oldWritable = new IntWritable(123);
+    Schema oldSchema = Schema.create(Schema.Type.INT);
+    Schema newSchema = Schema.create(Schema.Type.STRING);
+    Writable result = 
HoodieArrayWritableAvroUtils.rewritePrimaryType(oldWritable, oldSchema, 
newSchema);
+    Writable expected = new Text("123");
+    assertEquals(expected, result);
+    validateRewriteWithAvro(oldWritable, oldSchema, result, newSchema);
+  }
+
+  @Test
+  void testRewriteFixedDecimalToString() throws AvroSerdeException {
+    Schema decimalSchema = LogicalTypes.decimal(10, 
2).addToSchema(Schema.createFixed("decimal", null, null, 5));
+    HiveDecimalWritable oldWritable = new 
HiveDecimalWritable(HiveDecimal.create(new BigDecimal("123.45")));
+    Schema newSchema = Schema.create(Schema.Type.STRING);
+    Writable result = 
HoodieArrayWritableAvroUtils.rewritePrimaryType(oldWritable, decimalSchema, 
newSchema);
+    Writable expected = new Text("123.45");
+    assertEquals(expected, result);
+    validateRewriteWithAvro(oldWritable, decimalSchema, result, newSchema);
+  }
+
+  @Test
+  void testRewriteStringToFixedDecimal() throws AvroSerdeException {
+    Schema decimalSchema = LogicalTypes.decimal(10, 
2).addToSchema(Schema.createFixed("decimal", null, null, 5));
+    Writable oldWritable = new Text("123.45");
+    Writable result = 
HoodieArrayWritableAvroUtils.rewritePrimaryType(oldWritable, 
Schema.create(Schema.Type.STRING), decimalSchema);
+    assertInstanceOf(HiveDecimalWritable.class, result);
+    assertEquals(new BigDecimal("123.45"), ((HiveDecimalWritable) 
result).getHiveDecimal().bigDecimalValue());
+    validateRewriteWithAvro(oldWritable, Schema.create(Schema.Type.STRING), 
result, decimalSchema);
+  }
+
+  @Test
+  void testRewriteBytesToFixedDecimal() throws AvroSerdeException {
+    BigDecimal input = new BigDecimal("123.45");
+    byte[] bytes = input.unscaledValue().toByteArray();
+    BytesWritable oldWritable = new BytesWritable(bytes);
+    Schema decimalSchema = LogicalTypes.decimal(5, 
2).addToSchema(Schema.createFixed("decimal", null, null, 5));
+    Writable result = 
HoodieArrayWritableAvroUtils.rewritePrimaryType(oldWritable, 
Schema.create(Schema.Type.BYTES), decimalSchema);
+    assertEquals(input, ((HiveDecimalWritable) 
result).getHiveDecimal().bigDecimalValue());
+    validateRewriteWithAvro(oldWritable, Schema.create(Schema.Type.BYTES), 
result, decimalSchema);
+  }
+
+  @Test
+  void testUnsupportedTypeConversionThrows() {
+    Schema oldSchema = Schema.createMap(Schema.create(Schema.Type.INT));
+    Schema newSchema = Schema.create(Schema.Type.STRING);
+    assertThrows(HoodieAvroSchemaException.class, () ->
+        HoodieArrayWritableAvroUtils.rewritePrimaryType(null, oldSchema, 
newSchema));
+  }
+
+  @Test
+  void testRewriteEnumToString() throws AvroSerdeException {
+    Schema enumSchema = Schema.createEnum("TestEnum", null, null, 
Arrays.asList("A", "B", "C"));
+    Writable oldWritable = new Text("B");
+    Schema newSchema = Schema.create(Schema.Type.STRING);
+    Writable result = 
HoodieArrayWritableAvroUtils.rewritePrimaryType(oldWritable, enumSchema, 
newSchema);
+    Writable expected = new Text("B");
+    assertEquals(expected, result);
+    validateRewriteWithAvro(oldWritable, enumSchema, result, newSchema);
+  }
+
+  @Test
+  void testRewriteFixedWithSameSizeAndFullName() {
+    Schema oldFixed = Schema.createFixed("decimal", null, "ns", 5);
+    Schema newFixed = Schema.createFixed("decimal", null, "ns", 5);
+    HiveDecimalWritable hdw = new 
HiveDecimalWritable(HiveDecimal.create("123.45"));
+    Writable result = HoodieArrayWritableAvroUtils.rewritePrimaryType(hdw, 
oldFixed, newFixed);
+    assertSame(hdw, result);
+  }
+
+  @Test
+  void testRewriteFixedWithSameSizeButDifferentNameUsesDecimalFallback() 
throws AvroSerdeException {
+    Schema oldFixed = LogicalTypes.decimal(5, 
2).addToSchema(Schema.createFixed("decA", null, "ns1", 5));
+    Schema newFixed = LogicalTypes.decimal(5, 
2).addToSchema(Schema.createFixed("decB", null, "ns2", 5));
+    HiveDecimalWritable oldWritable = new 
HiveDecimalWritable(HiveDecimal.create("123.45"));
+    Writable result = 
HoodieArrayWritableAvroUtils.rewritePrimaryType(oldWritable, oldFixed, 
newFixed);
+    assertInstanceOf(HiveDecimalWritable.class, result);
+    assertEquals(new BigDecimal("123.45"), ((HiveDecimalWritable) 
result).getHiveDecimal().bigDecimalValue());
+    validateRewriteWithAvro(oldWritable, oldFixed, result, newFixed);
+  }
+
+  @Test
+  void testRewriteBooleanPassthrough() {
+    Schema boolSchema = Schema.create(Schema.Type.BOOLEAN);
+    BooleanWritable bool = new BooleanWritable(true);
+    Writable result = HoodieArrayWritableAvroUtils.rewritePrimaryType(bool, 
boolSchema, boolSchema);
+    assertSame(bool, result);
+  }
+
+  @Test
+  void testUnsupportedRewriteMapToIntThrows() {
+    Schema oldSchema = Schema.createMap(Schema.create(Schema.Type.STRING));
+    Schema newSchema = Schema.create(Schema.Type.INT);
+    assertThrows(HoodieAvroSchemaException.class, () ->
+        HoodieArrayWritableAvroUtils.rewritePrimaryType(new Text("foo"), 
oldSchema, newSchema));
+  }
+
+  @Test
+  void testRewriteIntToDecimalFixed() throws AvroSerdeException {
+    Schema fixedDecimal = LogicalTypes.decimal(8, 
2).addToSchema(Schema.createFixed("dec", null, null, 6));
+    IntWritable oldWritable = new IntWritable(12345);
+    Writable result = 
HoodieArrayWritableAvroUtils.rewritePrimaryType(oldWritable, 
Schema.create(Schema.Type.INT), fixedDecimal);
+    assertInstanceOf(HiveDecimalWritable.class, result);
+    assertEquals(new BigDecimal("12345"), ((HiveDecimalWritable) 
result).getHiveDecimal().bigDecimalValue());
+    validateRewriteWithAvro(oldWritable, Schema.create(Schema.Type.INT), 
result, fixedDecimal);
+  }
+
+  @Test
+  void testRewriteDoubleToDecimalFixed() throws AvroSerdeException {
+    Schema fixedDecimal = LogicalTypes.decimal(10, 
3).addToSchema(Schema.createFixed("dec", null, null, 8));
+    DoubleWritable oldWritable = new DoubleWritable(987.654);
+    Writable result = 
HoodieArrayWritableAvroUtils.rewritePrimaryType(oldWritable, 
Schema.create(Schema.Type.DOUBLE), fixedDecimal);
+    assertInstanceOf(HiveDecimalWritable.class, result);
+    assertEquals(new BigDecimal("987.654"), ((HiveDecimalWritable) 
result).getHiveDecimal().bigDecimalValue());
+    validateRewriteWithAvro(oldWritable, Schema.create(Schema.Type.DOUBLE), 
result, fixedDecimal);
+  }
+
+  @Test
+  void testRewriteDecimalBytesToFixed() throws AvroSerdeException {
+    Schema decimalSchema = LogicalTypes.decimal(6, 
2).addToSchema(Schema.createFixed("dec", null, null, 6));
+    BigDecimal value = new BigDecimal("999.99");
+    byte[] unscaledBytes = value.unscaledValue().toByteArray();
+    BytesWritable oldWritable = new BytesWritable(unscaledBytes);
+    Writable result = 
HoodieArrayWritableAvroUtils.rewritePrimaryType(oldWritable, 
Schema.create(Schema.Type.BYTES), decimalSchema);
+    assertEquals(value, ((HiveDecimalWritable) 
result).getHiveDecimal().bigDecimalValue());
+    validateRewriteWithAvro(oldWritable, Schema.create(Schema.Type.BYTES), 
result, decimalSchema);
+  }
+
+  private void validateRewriteWithAvro(
+      Writable oldWritable,
+      Schema oldSchema,
+      Writable newWritable,
+      Schema newSchema
+  ) throws AvroSerdeException {
+    TypeInfo oldTypeInfo = HiveTypeUtils.generateTypeInfo(oldSchema, 
Collections.emptySet());
+    TypeInfo newTypeInfo = HiveTypeUtils.generateTypeInfo(newSchema, 
Collections.emptySet());
+
+    ObjectInspector oldOI = 
TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(oldTypeInfo);
+    ObjectInspector newOI = 
TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(newTypeInfo);
+
+    ObjectInspector writableOIOld = getWritableOIForType(oldTypeInfo);
+    ObjectInspector writableOINew = getWritableOIForType(newTypeInfo);
+
+    Object javaInput = ObjectInspectorConverters.getConverter(writableOIOld, 
oldOI).convert(oldWritable);
+    if (isDecimalSchema(oldSchema)) {
+      javaInput = 
HoodieAvroUtils.DECIMAL_CONVERSION.toFixed(getDecimalValue(javaInput, 
oldSchema), oldSchema, oldSchema.getLogicalType());
+    } else if (javaInput instanceof byte[]) {
+      javaInput = ByteBuffer.wrap((byte[]) javaInput);
+    }
+    Object javaOutput = HoodieAvroUtils.rewritePrimaryType(javaInput, 
oldSchema, newSchema);
+    Object javaExpected = 
ObjectInspectorConverters.getConverter(writableOINew, 
newOI).convert(newWritable);
+
+    if (isDecimalSchema(newSchema)) {
+      BigDecimal outputDecimal = getDecimalValue(javaOutput, newSchema);
+      BigDecimal expectedDecimal = getDecimalValue(javaExpected, newSchema);
+      assertEquals(0, outputDecimal.compareTo(expectedDecimal));
+    } else if (newSchema.getLogicalType() instanceof LogicalTypes.Date) {
+      assertEquals(HoodieAvroUtils.toJavaDate((int) javaOutput), javaExpected);
+    } else {
+      assertEquals(javaOutput, javaExpected);
+    }
+  }
+
+  private boolean isDecimalSchema(Schema schema) {
+    return schema.getLogicalType() instanceof LogicalTypes.Decimal;
+  }
+
+  private BigDecimal getDecimalValue(Object value, Schema decimalSchema) {
+    if (value instanceof HiveDecimal) {
+      return ((HiveDecimal) value).bigDecimalValue();
+    } else if (value instanceof HiveDecimalWritable) {
+      return ((HiveDecimalWritable) value).getHiveDecimal().bigDecimalValue();
+    } else if (value instanceof BigDecimal) {
+      return (BigDecimal) value;
+    } else if (value instanceof byte[]) {
+      int scale = ((LogicalTypes.Decimal) 
decimalSchema.getLogicalType()).getScale();
+      return new BigDecimal(new java.math.BigInteger((byte[]) value), scale);

Review Comment:
   Can we import `java.math.BigInteger` instead of using the fully qualified name?



##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieArrayWritableAvroUtils.java:
##########
@@ -19,20 +19,356 @@
 
 package org.apache.hudi.hadoop.utils;
 
+import org.apache.hudi.avro.AvroSchemaUtils;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieAvroSchemaException;
+import org.apache.hudi.exception.SchemaCompatibilityException;
 
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.avro.JsonProperties;
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.serde2.avro.AvroSerdeException;
+import org.apache.hadoop.hive.serde2.avro.HiveTypeUtils;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.Deque;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.avro.AvroSchemaUtils.isNullable;
+import static org.apache.hudi.avro.HoodieAvroUtils.createFullName;
+import static org.apache.hudi.avro.HoodieAvroUtils.createNamePrefix;
+import static org.apache.hudi.avro.HoodieAvroUtils.getOldFieldNameWithRenaming;
+import static org.apache.hudi.avro.HoodieAvroUtils.toJavaDate;
+import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
 
 public class HoodieArrayWritableAvroUtils {
 
+  public static ArrayWritable rewriteRecordWithNewSchema(ArrayWritable 
writable, Schema oldSchema, Schema newSchema, Map<String, String> renameCols) {
+    return (ArrayWritable) rewriteRecordWithNewSchema(writable, oldSchema, 
newSchema, renameCols, new LinkedList<>());
+  }
+
+  private static Writable rewriteRecordWithNewSchema(Writable writable, Schema 
oldAvroSchema, Schema newAvroSchema, Map<String, String> renameCols, 
Deque<String> fieldNames) {
+    if (writable == null) {
+      return null;
+    }
+    Schema oldSchema = AvroSchemaUtils.resolveNullableSchema(oldAvroSchema);
+    Schema newSchema = AvroSchemaUtils.resolveNullableSchema(newAvroSchema);
+    if (oldSchema.equals(newSchema)) {
+      return writable;
+    }
+    return rewriteRecordWithNewSchemaInternal(writable, oldSchema, newSchema, 
renameCols, fieldNames);
+  }
+
+  private static Writable rewriteRecordWithNewSchemaInternal(Writable 
writable, Schema oldSchema, Schema newSchema, Map<String, String> renameCols, 
Deque<String> fieldNames) {
+    switch (newSchema.getType()) {
+      case RECORD:
+        if (!(writable instanceof ArrayWritable)) {
+          throw new SchemaCompatibilityException(String.format("Cannot rewrite 
%s as a record", writable.getClass().getName()));
+        }
+
+        ArrayWritable arrayWritable = (ArrayWritable) writable;
+        List<Schema.Field> fields = newSchema.getFields();
+        // projection will keep the size from the "from" schema because it 
gets recycled
+        // and if the size changes the reader will fail
+        boolean noFieldsRenaming = renameCols.isEmpty();
+        String namePrefix = createNamePrefix(noFieldsRenaming, fieldNames);
+        Writable[] values = new Writable[Math.max(fields.size(), 
arrayWritable.get().length)];
+        for (int i = 0; i < fields.size(); i++) {
+          Schema.Field newField = newSchema.getFields().get(i);
+          String newFieldName = newField.name();
+          fieldNames.push(newFieldName);
+          Schema.Field oldField = noFieldsRenaming
+              ? oldSchema.getField(newFieldName)
+              : oldSchema.getField(getOldFieldNameWithRenaming(namePrefix, 
newFieldName, renameCols));
+          if (oldField != null) {
+            values[i] = 
rewriteRecordWithNewSchema(arrayWritable.get()[oldField.pos()], 
oldField.schema(), newField.schema(), renameCols, fieldNames);
+          } else if (newField.defaultVal() instanceof JsonProperties.Null) {
+            values[i] = NullWritable.get();
+          } else if (!isNullable(newField.schema()) && newField.defaultVal() 
== null) {
+            throw new SchemaCompatibilityException("Field " + 
createFullName(fieldNames) + " has no default value and is non-nullable");
+          } else if (newField.defaultVal() != null) {
+            switch 
(AvroSchemaUtils.resolveNullableSchema(newField.schema()).getType()) {
+              case BOOLEAN:
+                values[i] = new BooleanWritable((Boolean) 
newField.defaultVal());
+                break;
+              case INT:
+                values[i] = new IntWritable((Integer) newField.defaultVal());
+                break;
+              case LONG:
+                values[i] = new LongWritable((Long) newField.defaultVal());
+                break;
+              case FLOAT:
+                values[i] = new FloatWritable((Float) newField.defaultVal());
+                break;
+              case DOUBLE:
+                values[i] = new DoubleWritable((Double) newField.defaultVal());
+                break;
+              case STRING:
+                values[i] = new Text(newField.defaultVal().toString());
+                break;
+              default:
+                throw new SchemaCompatibilityException("Field " + 
createFullName(fieldNames) + " has no default value");
+            }
+          }
+          fieldNames.pop();
+        }
+        return new ArrayWritable(Writable.class, values);
+
+      case ENUM:
+        if ((writable instanceof BytesWritable)) {
+          return writable;
+        }
+        if (oldSchema.getType() != Schema.Type.STRING && oldSchema.getType() 
!= Schema.Type.ENUM) {
+          throw new SchemaCompatibilityException(String.format("Only ENUM or 
STRING type can be converted ENUM type. Schema type was %s", 
oldSchema.getType().getName()));
+        }
+        if (oldSchema.getType() == Schema.Type.STRING) {
+          return new BytesWritable(((Text) writable).copyBytes());
+        }
+        return writable;
+      case ARRAY:
+        if (!(writable instanceof ArrayWritable)) {
+          throw new SchemaCompatibilityException(String.format("Cannot rewrite 
%s as an array", writable.getClass().getName()));
+        }
+        ArrayWritable array = (ArrayWritable) writable;
+        fieldNames.push("element");
+        for (int i = 0; i < array.get().length; i++) {
+          array.get()[i] = rewriteRecordWithNewSchema(array.get()[i], 
oldSchema.getElementType(), newSchema.getElementType(), renameCols, fieldNames);
+        }
+        fieldNames.pop();
+        return array;
+      case MAP:
+        if (!(writable instanceof ArrayWritable)) {
+          throw new SchemaCompatibilityException(String.format("Cannot rewrite 
%s as a map", writable.getClass().getName()));
+        }
+        ArrayWritable map = (ArrayWritable) writable;
+        fieldNames.push("value");
+        for (int i = 0; i < map.get().length; i++) {
+          Writable mapEntry = map.get()[i];
+          ((ArrayWritable) mapEntry).get()[1] = 
rewriteRecordWithNewSchema(((ArrayWritable) mapEntry).get()[1], 
oldSchema.getValueType(), newSchema.getValueType(), renameCols, fieldNames);
+        }
+        return map;
+
+      case UNION:
+        throw new IllegalArgumentException("should not be here?");
+
+      default:
+        return rewritePrimaryType(writable, oldSchema, newSchema);
+    }
+  }
+
+  public static Writable rewritePrimaryType(Writable writable, Schema 
oldSchema, Schema newSchema) {
+    if (oldSchema.getType() == newSchema.getType()) {
+      switch (oldSchema.getType()) {
+        case NULL:
+        case BOOLEAN:
+        case INT:
+        case LONG:
+        case FLOAT:
+        case DOUBLE:
+        case BYTES:
+        case STRING:
+          return writable;
+        case FIXED:
+          if (oldSchema.getFixedSize() != newSchema.getFixedSize()) {
+            // Check whether this is a [[Decimal]]'s precision change
+            if (oldSchema.getLogicalType() instanceof LogicalTypes.Decimal) {
+              LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) 
oldSchema.getLogicalType();
+              return 
HiveDecimalUtils.enforcePrecisionScale((HiveDecimalWritable) writable, new 
DecimalTypeInfo(decimal.getPrecision(), decimal.getScale()));
+            } else {
+              throw new HoodieAvroSchemaException("Fixed type size change is 
not currently supported");
+            }
+          }
+
+          // For [[Fixed]] data type both size and name have to match
+          //
+          // NOTE: That for values wrapped into [[Union]], to make sure that 
reverse lookup (by
+          //       full-name) is working we have to make sure that both 
schema's name and namespace
+          //       do match
+          if (Objects.equals(oldSchema.getFullName(), 
newSchema.getFullName())) {
+            return writable;
+          } else {
+            LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) 
oldSchema.getLogicalType();
+            return 
HiveDecimalUtils.enforcePrecisionScale((HiveDecimalWritable) writable, new 
DecimalTypeInfo(decimal.getPrecision(), decimal.getScale()));
+          }
+
+        default:
+          throw new HoodieAvroSchemaException("Unknown schema type: " + 
newSchema.getType());
+      }
+    } else {
+      return rewritePrimaryTypeWithDiffSchemaType(writable, oldSchema, 
newSchema);
+    }
+  }
+
+  private static Writable rewritePrimaryTypeWithDiffSchemaType(Writable 
writable, Schema oldSchema, Schema newSchema) {
+    switch (newSchema.getType()) {
+      case NULL:
+      case BOOLEAN:
+        break;
+      case INT:
+        if (newSchema.getLogicalType() == LogicalTypes.date() && 
oldSchema.getType() == Schema.Type.STRING) {
+          return 
HoodieHiveUtils.getDateWriteable((HoodieAvroUtils.fromJavaDate(java.sql.Date.valueOf(writable.toString()))));
+        }
+        break;
+      case LONG:
+        if (oldSchema.getType() == Schema.Type.INT) {
+          return new LongWritable(((IntWritable) writable).get());
+        }
+        break;
+      case FLOAT:
+        if ((oldSchema.getType() == Schema.Type.INT)
+            || (oldSchema.getType() == Schema.Type.LONG)) {
+          return oldSchema.getType() == Schema.Type.INT
+              ? new FloatWritable(((IntWritable) writable).get())
+              : new FloatWritable(((LongWritable) writable).get());
+        }
+        break;
+      case DOUBLE:
+        if (oldSchema.getType() == Schema.Type.FLOAT) {
+          // java float cannot convert to double directly, deal with float 
precision change
+          return new DoubleWritable(Double.parseDouble(((FloatWritable) 
writable).get() + ""));
+        } else if (oldSchema.getType() == Schema.Type.INT) {
+          return new DoubleWritable(((IntWritable) writable).get());
+        } else if (oldSchema.getType() == Schema.Type.LONG) {
+          return new DoubleWritable(((LongWritable) writable).get());
+        }
+        break;
+      case BYTES:
+        if (oldSchema.getType() == Schema.Type.STRING) {
+          return new BytesWritable(getUTF8Bytes(writable.toString()));
+        }
+        break;
+      case STRING:
+        if (oldSchema.getType() == Schema.Type.ENUM) {
+          return writable;
+        }
+        if (oldSchema.getType() == Schema.Type.BYTES) {
+          return new Text(StringUtils.fromUTF8Bytes(((BytesWritable) 
writable).getBytes()));
+        }
+        if (oldSchema.getLogicalType() == LogicalTypes.date()) {
+          return new Text(toJavaDate(((IntWritable) 
writable).get()).toString());
+        }
+        if (oldSchema.getType() == Schema.Type.INT
+            || oldSchema.getType() == Schema.Type.LONG
+            || oldSchema.getType() == Schema.Type.FLOAT
+            || oldSchema.getType() == Schema.Type.DOUBLE) {
+          return new Text(writable.toString());
+        }
+        if (oldSchema.getType() == Schema.Type.FIXED && 
oldSchema.getLogicalType() instanceof LogicalTypes.Decimal) {
+          HiveDecimalWritable hdw = (HiveDecimalWritable) writable;
+          return new 
Text(hdw.getHiveDecimal().bigDecimalValue().toPlainString());
+        }
+        break;
+      case FIXED:
+        if (newSchema.getLogicalType() instanceof LogicalTypes.Decimal) {
+          LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) 
newSchema.getLogicalType();
+          DecimalTypeInfo decimalTypeInfo = new 
DecimalTypeInfo(decimal.getPrecision(), decimal.getScale());
+
+          if (oldSchema.getType() == Schema.Type.STRING
+              || oldSchema.getType() == Schema.Type.INT
+              || oldSchema.getType() == Schema.Type.LONG
+              || oldSchema.getType() == Schema.Type.FLOAT
+              || oldSchema.getType() == Schema.Type.DOUBLE) {
+            // loses trailing zeros due to hive issue. Since we only read with 
hive, I think this is fine
+            HiveDecimalWritable converted = new 
HiveDecimalWritable(HiveDecimal.create(new 
java.math.BigDecimal(writable.toString())));
+            return HiveDecimalUtils.enforcePrecisionScale(converted, 
decimalTypeInfo);
+          }
+
+          if (oldSchema.getType() == Schema.Type.BYTES) {
+            ByteBuffer buffer = ByteBuffer.wrap(((BytesWritable) 
writable).getBytes());
+            BigDecimal bd = new BigDecimal(new BigInteger(buffer.array()), 
decimal.getScale());
+            HiveDecimalWritable converted = new 
HiveDecimalWritable(HiveDecimal.create(bd));
+            return HiveDecimalUtils.enforcePrecisionScale(converted, 
decimalTypeInfo);
+          }
+        }
+        break;
+      default:
+    }
+    throw new HoodieAvroSchemaException(String.format("cannot support rewrite 
value for schema type: %s since the old schema type is: %s", newSchema, 
oldSchema));
+  }
+
+  private static final Cache<Schema, ArrayWritableObjectInspector> 
OBJECT_INSPECTOR_CACHE =

Review Comment:
   Previously, the ObjectInspectorCache had this as an instance variable, so the 
lifecycle of the cache was tied to that instance. Now this will live on for the 
life of the JVM it runs in. What is the reason for this change?



##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieArrayWritableAvroUtils.java:
##########
@@ -19,20 +19,356 @@
 
 package org.apache.hudi.hadoop.utils;
 
+import org.apache.hudi.avro.AvroSchemaUtils;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieAvroSchemaException;
+import org.apache.hudi.exception.SchemaCompatibilityException;
 
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.avro.JsonProperties;
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.serde2.avro.AvroSerdeException;
+import org.apache.hadoop.hive.serde2.avro.HiveTypeUtils;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.Deque;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.avro.AvroSchemaUtils.isNullable;
+import static org.apache.hudi.avro.HoodieAvroUtils.createFullName;
+import static org.apache.hudi.avro.HoodieAvroUtils.createNamePrefix;
+import static org.apache.hudi.avro.HoodieAvroUtils.getOldFieldNameWithRenaming;
+import static org.apache.hudi.avro.HoodieAvroUtils.toJavaDate;
+import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
 
 public class HoodieArrayWritableAvroUtils {
 
+  public static ArrayWritable rewriteRecordWithNewSchema(ArrayWritable 
writable, Schema oldSchema, Schema newSchema, Map<String, String> renameCols) {
+    return (ArrayWritable) rewriteRecordWithNewSchema(writable, oldSchema, 
newSchema, renameCols, new LinkedList<>());
+  }
+
+  private static Writable rewriteRecordWithNewSchema(Writable writable, Schema 
oldAvroSchema, Schema newAvroSchema, Map<String, String> renameCols, 
Deque<String> fieldNames) {
+    if (writable == null) {
+      return null;
+    }
+    Schema oldSchema = AvroSchemaUtils.resolveNullableSchema(oldAvroSchema);
+    Schema newSchema = AvroSchemaUtils.resolveNullableSchema(newAvroSchema);
+    if (oldSchema.equals(newSchema)) {
+      return writable;
+    }
+    return rewriteRecordWithNewSchemaInternal(writable, oldSchema, newSchema, 
renameCols, fieldNames);
+  }
+
+  private static Writable rewriteRecordWithNewSchemaInternal(Writable 
writable, Schema oldSchema, Schema newSchema, Map<String, String> renameCols, 
Deque<String> fieldNames) {
+    switch (newSchema.getType()) {
+      case RECORD:
+        if (!(writable instanceof ArrayWritable)) {
+          throw new SchemaCompatibilityException(String.format("Cannot rewrite 
%s as a record", writable.getClass().getName()));
+        }
+
+        ArrayWritable arrayWritable = (ArrayWritable) writable;
+        List<Schema.Field> fields = newSchema.getFields();
+        // projection will keep the size from the "from" schema because it 
gets recycled
+        // and if the size changes the reader will fail
+        boolean noFieldsRenaming = renameCols.isEmpty();
+        String namePrefix = createNamePrefix(noFieldsRenaming, fieldNames);
+        Writable[] values = new Writable[Math.max(fields.size(), 
arrayWritable.get().length)];
+        for (int i = 0; i < fields.size(); i++) {
+          Schema.Field newField = newSchema.getFields().get(i);
+          String newFieldName = newField.name();
+          fieldNames.push(newFieldName);
+          Schema.Field oldField = noFieldsRenaming
+              ? oldSchema.getField(newFieldName)
+              : oldSchema.getField(getOldFieldNameWithRenaming(namePrefix, 
newFieldName, renameCols));
+          if (oldField != null) {
+            values[i] = 
rewriteRecordWithNewSchema(arrayWritable.get()[oldField.pos()], 
oldField.schema(), newField.schema(), renameCols, fieldNames);
+          } else if (newField.defaultVal() instanceof JsonProperties.Null) {
+            values[i] = NullWritable.get();
+          } else if (!isNullable(newField.schema()) && newField.defaultVal() 
== null) {
+            throw new SchemaCompatibilityException("Field " + 
createFullName(fieldNames) + " has no default value and is non-nullable");
+          } else if (newField.defaultVal() != null) {
+            switch 
(AvroSchemaUtils.resolveNullableSchema(newField.schema()).getType()) {
+              case BOOLEAN:
+                values[i] = new BooleanWritable((Boolean) 
newField.defaultVal());
+                break;
+              case INT:
+                values[i] = new IntWritable((Integer) newField.defaultVal());
+                break;
+              case LONG:
+                values[i] = new LongWritable((Long) newField.defaultVal());
+                break;
+              case FLOAT:
+                values[i] = new FloatWritable((Float) newField.defaultVal());
+                break;
+              case DOUBLE:
+                values[i] = new DoubleWritable((Double) newField.defaultVal());
+                break;
+              case STRING:
+                values[i] = new Text(newField.defaultVal().toString());
+                break;
+              default:
+                throw new SchemaCompatibilityException("Field " + 
createFullName(fieldNames) + " has no default value");
+            }
+          }
+          fieldNames.pop();
+        }
+        return new ArrayWritable(Writable.class, values);
+
+      case ENUM:
+        if ((writable instanceof BytesWritable)) {
+          return writable;
+        }
+        if (oldSchema.getType() != Schema.Type.STRING && oldSchema.getType() 
!= Schema.Type.ENUM) {
+          throw new SchemaCompatibilityException(String.format("Only ENUM or 
STRING type can be converted ENUM type. Schema type was %s", 
oldSchema.getType().getName()));
+        }
+        if (oldSchema.getType() == Schema.Type.STRING) {
+          return new BytesWritable(((Text) writable).copyBytes());
+        }
+        return writable;
+      case ARRAY:
+        if (!(writable instanceof ArrayWritable)) {
+          throw new SchemaCompatibilityException(String.format("Cannot rewrite 
%s as an array", writable.getClass().getName()));
+        }
+        ArrayWritable array = (ArrayWritable) writable;
+        fieldNames.push("element");
+        for (int i = 0; i < array.get().length; i++) {
+          array.get()[i] = rewriteRecordWithNewSchema(array.get()[i], 
oldSchema.getElementType(), newSchema.getElementType(), renameCols, fieldNames);
+        }
+        fieldNames.pop();
+        return array;
+      case MAP:
+        if (!(writable instanceof ArrayWritable)) {
+          throw new SchemaCompatibilityException(String.format("Cannot rewrite 
%s as a map", writable.getClass().getName()));
+        }
+        ArrayWritable map = (ArrayWritable) writable;
+        fieldNames.push("value");
+        for (int i = 0; i < map.get().length; i++) {
+          Writable mapEntry = map.get()[i];
+          ((ArrayWritable) mapEntry).get()[1] = 
rewriteRecordWithNewSchema(((ArrayWritable) mapEntry).get()[1], 
oldSchema.getValueType(), newSchema.getValueType(), renameCols, fieldNames);
+        }
+        return map;
+
+      case UNION:
+        throw new IllegalArgumentException("should not be here?");
+
+      default:
+        return rewritePrimaryType(writable, oldSchema, newSchema);
+    }
+  }
+
+  public static Writable rewritePrimaryType(Writable writable, Schema 
oldSchema, Schema newSchema) {
+    if (oldSchema.getType() == newSchema.getType()) {
+      switch (oldSchema.getType()) {
+        case NULL:
+        case BOOLEAN:
+        case INT:
+        case LONG:
+        case FLOAT:
+        case DOUBLE:
+        case BYTES:
+        case STRING:
+          return writable;
+        case FIXED:
+          if (oldSchema.getFixedSize() != newSchema.getFixedSize()) {
+            // Check whether this is a [[Decimal]]'s precision change
+            if (oldSchema.getLogicalType() instanceof LogicalTypes.Decimal) {
+              LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) 
oldSchema.getLogicalType();
+              return 
HiveDecimalUtils.enforcePrecisionScale((HiveDecimalWritable) writable, new 
DecimalTypeInfo(decimal.getPrecision(), decimal.getScale()));
+            } else {
+              throw new HoodieAvroSchemaException("Fixed type size change is 
not currently supported");
+            }
+          }
+
+          // For [[Fixed]] data type both size and name have to match
+          //
+          // NOTE: That for values wrapped into [[Union]], to make sure that 
reverse lookup (by
+          //       full-name) is working we have to make sure that both 
schema's name and namespace
+          //       do match
+          if (Objects.equals(oldSchema.getFullName(), 
newSchema.getFullName())) {
+            return writable;
+          } else {
+            LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) 
oldSchema.getLogicalType();
+            return 
HiveDecimalUtils.enforcePrecisionScale((HiveDecimalWritable) writable, new 
DecimalTypeInfo(decimal.getPrecision(), decimal.getScale()));
+          }
+
+        default:
+          throw new HoodieAvroSchemaException("Unknown schema type: " + 
newSchema.getType());
+      }
+    } else {
+      return rewritePrimaryTypeWithDiffSchemaType(writable, oldSchema, 
newSchema);
+    }
+  }
+
+  private static Writable rewritePrimaryTypeWithDiffSchemaType(Writable 
writable, Schema oldSchema, Schema newSchema) {
+    switch (newSchema.getType()) {
+      case NULL:
+      case BOOLEAN:
+        break;
+      case INT:
+        if (newSchema.getLogicalType() == LogicalTypes.date() && 
oldSchema.getType() == Schema.Type.STRING) {
+          return 
HoodieHiveUtils.getDateWriteable((HoodieAvroUtils.fromJavaDate(java.sql.Date.valueOf(writable.toString()))));
+        }
+        break;
+      case LONG:
+        if (oldSchema.getType() == Schema.Type.INT) {
+          return new LongWritable(((IntWritable) writable).get());
+        }
+        break;
+      case FLOAT:
+        if ((oldSchema.getType() == Schema.Type.INT)
+            || (oldSchema.getType() == Schema.Type.LONG)) {
+          return oldSchema.getType() == Schema.Type.INT
+              ? new FloatWritable(((IntWritable) writable).get())
+              : new FloatWritable(((LongWritable) writable).get());
+        }
+        break;
+      case DOUBLE:
+        if (oldSchema.getType() == Schema.Type.FLOAT) {
+          // java float cannot convert to double directly, deal with float 
precision change
+          return new DoubleWritable(Double.parseDouble(((FloatWritable) 
writable).get() + ""));
+        } else if (oldSchema.getType() == Schema.Type.INT) {
+          return new DoubleWritable(((IntWritable) writable).get());
+        } else if (oldSchema.getType() == Schema.Type.LONG) {
+          return new DoubleWritable(((LongWritable) writable).get());
+        }
+        break;
+      case BYTES:
+        if (oldSchema.getType() == Schema.Type.STRING) {
+          return new BytesWritable(getUTF8Bytes(writable.toString()));
+        }
+        break;
+      case STRING:
+        if (oldSchema.getType() == Schema.Type.ENUM) {
+          return writable;
+        }
+        if (oldSchema.getType() == Schema.Type.BYTES) {
+          return new Text(StringUtils.fromUTF8Bytes(((BytesWritable) 
writable).getBytes()));
+        }
+        if (oldSchema.getLogicalType() == LogicalTypes.date()) {
+          return new Text(toJavaDate(((IntWritable) 
writable).get()).toString());
+        }
+        if (oldSchema.getType() == Schema.Type.INT
+            || oldSchema.getType() == Schema.Type.LONG
+            || oldSchema.getType() == Schema.Type.FLOAT
+            || oldSchema.getType() == Schema.Type.DOUBLE) {
+          return new Text(writable.toString());
+        }
+        if (oldSchema.getType() == Schema.Type.FIXED && 
oldSchema.getLogicalType() instanceof LogicalTypes.Decimal) {
+          HiveDecimalWritable hdw = (HiveDecimalWritable) writable;
+          return new 
Text(hdw.getHiveDecimal().bigDecimalValue().toPlainString());
+        }
+        break;
+      case FIXED:
+        if (newSchema.getLogicalType() instanceof LogicalTypes.Decimal) {
+          LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) 
newSchema.getLogicalType();
+          DecimalTypeInfo decimalTypeInfo = new 
DecimalTypeInfo(decimal.getPrecision(), decimal.getScale());
+
+          if (oldSchema.getType() == Schema.Type.STRING
+              || oldSchema.getType() == Schema.Type.INT
+              || oldSchema.getType() == Schema.Type.LONG
+              || oldSchema.getType() == Schema.Type.FLOAT
+              || oldSchema.getType() == Schema.Type.DOUBLE) {
+            // loses trailing zeros due to hive issue. Since we only read with 
hive, I think this is fine
+            HiveDecimalWritable converted = new 
HiveDecimalWritable(HiveDecimal.create(new 
java.math.BigDecimal(writable.toString())));

Review Comment:
   nit: use `BigDecimal` instead of fully qualified name



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to