alexeykudinkin commented on code in PR #7769:
URL: https://github.com/apache/hudi/pull/7769#discussion_r1090883889


##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala:
##########
@@ -18,304 +18,413 @@
 
 package org.apache.spark.sql
 
-import java.nio.charset.StandardCharsets
-import java.util.HashMap
-import java.util.concurrent.ConcurrentHashMap
 import org.apache.avro.Schema
 import org.apache.hbase.thirdparty.com.google.common.base.Supplier
-import org.apache.hudi.AvroConversionUtils
+import org.apache.hudi.AvroConversionUtils.convertAvroSchemaToStructType
 import org.apache.hudi.avro.HoodieAvroUtils.{createFullName, toJavaDate}
 import org.apache.hudi.exception.HoodieException
-import org.apache.spark.sql.HoodieUnsafeRowUtils.NestedFieldPath
-import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, 
UnsafeProjection}
+import 
org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
+import org.apache.spark.sql.HoodieUnsafeRowUtils.{NestedFieldPath, 
composeNestedFieldPath}
+import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, 
UnsafeArrayData, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, 
GenericArrayData, MapData}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.types.Decimal.ROUND_HALF_EVEN
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
+
+import java.util.concurrent.ConcurrentHashMap
+import java.util.{ArrayDeque => JArrayDeque, Collections => JCollections, 
Deque => JDeque, Map => JMap}
+import java.util.function.{Function => JFunction}
 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters.collectionAsScalaIterableConverter
 
 object HoodieInternalRowUtils {
 
-  // Projection are all thread local. Projection is not thread-safe
-  val unsafeProjectionThreadLocal: ThreadLocal[HashMap[(StructType, 
StructType), UnsafeProjection]] =
-    ThreadLocal.withInitial(new Supplier[HashMap[(StructType, StructType), 
UnsafeProjection]] {
-      override def get(): HashMap[(StructType, StructType), UnsafeProjection] 
= new HashMap[(StructType, StructType), UnsafeProjection]
+  private type RenamedColumnMap = JMap[String, String]
+  private type UnsafeRowWriter = InternalRow => UnsafeRow
+
+  // NOTE: [[UnsafeRowWriter]] objects cache has to stay [[ThreadLocal]] 
since these are not thread-safe
+  private val unsafeWriterThreadLocal: 
ThreadLocal[mutable.HashMap[(StructType, StructType, RenamedColumnMap), 
UnsafeRowWriter]] =
+    ThreadLocal.withInitial(new Supplier[mutable.HashMap[(StructType, 
StructType, RenamedColumnMap), UnsafeRowWriter]] {
+      override def get(): mutable.HashMap[(StructType, StructType, 
RenamedColumnMap), UnsafeRowWriter] =
+        new mutable.HashMap[(StructType, StructType, RenamedColumnMap), 
UnsafeRowWriter]
     })
-  val schemaMap = new ConcurrentHashMap[Schema, StructType]
-  val orderPosListMap = new ConcurrentHashMap[(StructType, String), 
NestedFieldPath]
 
-  /**
-   * @see 
org.apache.hudi.avro.HoodieAvroUtils#rewriteRecord(org.apache.avro.generic.GenericRecord,
 org.apache.avro.Schema)
-   */
-  def rewriteRecord(oldRecord: InternalRow, oldSchema: StructType, newSchema: 
StructType): InternalRow = {
-    val newRow = new 
GenericInternalRow(Array.fill(newSchema.fields.length)(null).asInstanceOf[Array[Any]])
-
-    for ((field, pos) <- newSchema.fields.zipWithIndex) {
-      var oldValue: AnyRef = null
-      var oldType: DataType = null
-      if (existField(oldSchema, field.name)) {
-        val oldField = oldSchema(field.name)
-        val oldPos = oldSchema.fieldIndex(field.name)
-        oldType = oldField.dataType
-        oldValue = oldRecord.get(oldPos, oldType)
-      }
-      if (oldValue != null) {
-        field.dataType match {
-          case structType: StructType =>
-            val oldType = 
oldSchema(field.name).dataType.asInstanceOf[StructType]
-            val newValue = rewriteRecord(oldValue.asInstanceOf[InternalRow], 
oldType, structType)
-            newRow.update(pos, newValue)
-          case decimalType: DecimalType =>
-            val oldFieldSchema = 
oldSchema(field.name).dataType.asInstanceOf[DecimalType]
-            if (decimalType.scale != oldFieldSchema.scale || 
decimalType.precision != oldFieldSchema.precision) {
-              newRow.update(pos, 
Decimal.fromDecimal(oldValue.asInstanceOf[Decimal].toBigDecimal.setScale(newSchema.asInstanceOf[DecimalType].scale))
-              )
-            } else {
-              newRow.update(pos, oldValue)
-            }
-          case t if t == oldType => newRow.update(pos, oldValue)
-          // Type promotion
-          case _: ShortType =>
-            oldType match {
-              case _: ByteType => newRow.update(pos, 
oldValue.asInstanceOf[Byte].toShort)
-              case _ => throw new IllegalArgumentException(s"$oldSchema and 
$newSchema are incompatible")
-            }
-          case _: IntegerType =>
-            oldType match {
-              case _: ByteType => newRow.update(pos, 
oldValue.asInstanceOf[Byte].toInt)
-              case _: ShortType => newRow.update(pos, 
oldValue.asInstanceOf[Short].toInt)
-              case _ => throw new IllegalArgumentException(s"$oldSchema and 
$newSchema are incompatible")
-            }
-          case _: LongType =>
-            oldType match {
-              case _: ByteType => newRow.update(pos, 
oldValue.asInstanceOf[Byte].toLong)
-              case _: ShortType => newRow.update(pos, 
oldValue.asInstanceOf[Short].toLong)
-              case _: IntegerType => newRow.update(pos, 
oldValue.asInstanceOf[Int].toLong)
-              case _ => throw new IllegalArgumentException(s"$oldSchema and 
$newSchema are incompatible")
-            }
-          case _: FloatType =>
-            oldType match {
-              case _: ByteType => newRow.update(pos, 
oldValue.asInstanceOf[Byte].toFloat)
-              case _: ShortType => newRow.update(pos, 
oldValue.asInstanceOf[Short].toFloat)
-              case _: IntegerType => newRow.update(pos, 
oldValue.asInstanceOf[Int].toFloat)
-              case _: LongType => newRow.update(pos, 
oldValue.asInstanceOf[Long].toFloat)
-              case _ => throw new IllegalArgumentException(s"$oldSchema and 
$newSchema are incompatible")
-            }
-          case _: DoubleType =>
-            oldType match {
-              case _: ByteType => newRow.update(pos, 
oldValue.asInstanceOf[Byte].toDouble)
-              case _: ShortType => newRow.update(pos, 
oldValue.asInstanceOf[Short].toDouble)
-              case _: IntegerType => newRow.update(pos, 
oldValue.asInstanceOf[Int].toDouble)
-              case _: LongType => newRow.update(pos, 
oldValue.asInstanceOf[Long].toDouble)
-              case _: FloatType => newRow.update(pos, 
oldValue.asInstanceOf[Float].toDouble)
-              case _ => throw new IllegalArgumentException(s"$oldSchema and 
$newSchema are incompatible")
-            }
-          case _: BinaryType if oldType.isInstanceOf[StringType] => 
newRow.update(pos, oldValue.asInstanceOf[String].getBytes)
-          case _ => newRow.update(pos, oldValue)
-        }
-      } else {
-        // TODO default value in newSchema
-      }
-    }
+  // NOTE: [[UnsafeProjection]] objects cache has to stay [[ThreadLocal]] 
since these are not thread-safe
+  private val unsafeProjectionThreadLocal: 
ThreadLocal[mutable.HashMap[(StructType, StructType), UnsafeProjection]] =
+    ThreadLocal.withInitial(new Supplier[mutable.HashMap[(StructType, 
StructType), UnsafeProjection]] {
+      override def get(): mutable.HashMap[(StructType, StructType), 
UnsafeProjection] =
+        new mutable.HashMap[(StructType, StructType), UnsafeProjection]
+    })
 
-    newRow
-  }
+  private val schemaMap = new ConcurrentHashMap[Schema, StructType]
+  private val orderPosListMap = new ConcurrentHashMap[(StructType, String), 
Option[NestedFieldPath]]
 
   /**
-   * @see 
org.apache.hudi.avro.HoodieAvroUtils#rewriteRecordWithNewSchema(org.apache.avro.generic.IndexedRecord,
 org.apache.avro.Schema, java.util.Map)
+   * Provides a cached instance of [[UnsafeProjection]] transforming provided 
[[InternalRow]]s from
+   * one [[StructType]] into another [[StructType]]
+   *
+   * For more details regarding its semantics, please check the corresponding 
scala-doc for
+   * [[HoodieCatalystExpressionUtils.generateUnsafeProjection]]
    */
-  def rewriteRecordWithNewSchema(oldRecord: InternalRow, oldSchema: 
StructType, newSchema: StructType, renameCols: java.util.Map[String, String]): 
InternalRow = {
-    rewriteRecordWithNewSchema(oldRecord, oldSchema, newSchema, renameCols, 
new java.util.LinkedList[String]).asInstanceOf[InternalRow]
+  def getCachedUnsafeProjection(from: StructType, to: StructType): 
UnsafeProjection = {
+    unsafeProjectionThreadLocal.get()
+      .getOrElseUpdate((from, to), generateUnsafeProjection(from, to))
   }
 
   /**
-   * @see 
org.apache.hudi.avro.HoodieAvroUtils#rewriteRecordWithNewSchema(java.lang.Object,
 org.apache.avro.Schema, org.apache.avro.Schema, java.util.Map, java.util.Deque)
+   * Provides a cached instance of [[UnsafeRowWriter]] transforming provided 
[[InternalRow]]s from
+   * one [[StructType]] into another [[StructType]]
+   *
+   * Unlike [[UnsafeProjection]], which requires [[from]] to be a proper 
subset of the [[to]] schema,
+   * [[UnsafeRowWriter]] is able to perform the whole spectrum of 
schema-evolution transformations, including:
+   *
+   * <ul>
+   *   <li>Transforming nested structs/maps/arrays</li>
+   *   <li>Handling type promotions (int -> long, etc)</li>
+   *   <li>Handling (field) renames</li>
+   * </ul>
    */
-  private def rewriteRecordWithNewSchema(oldRecord: Any, oldSchema: DataType, 
newSchema: DataType, renameCols: java.util.Map[String, String], fieldNames: 
java.util.Deque[String]): Any = {
-    if (oldRecord == null) {
-      null
-    } else {
-      newSchema match {
-        case targetSchema: StructType =>
-          if (!oldRecord.isInstanceOf[InternalRow]) {
-            throw new IllegalArgumentException("cannot rewrite record with 
different type")
-          }
-          val oldRow = oldRecord.asInstanceOf[InternalRow]
-          val helper = mutable.Map[Integer, Any]()
-
-          val oldStrucType = oldSchema.asInstanceOf[StructType]
-          targetSchema.fields.zipWithIndex.foreach { case (field, i) =>
-            fieldNames.push(field.name)
-            if (existField(oldStrucType, field.name)) {
-              val oldField = oldStrucType(field.name)
-              val oldPos = oldStrucType.fieldIndex(field.name)
-              helper(i) = rewriteRecordWithNewSchema(oldRow.get(oldPos, 
oldField.dataType), oldField.dataType, field.dataType, renameCols, fieldNames)
-            } else {
-              val fieldFullName = createFullName(fieldNames)
-              val colNamePartsFromOldSchema = 
renameCols.getOrDefault(fieldFullName, "").split("\\.")
-              val lastColNameFromOldSchema = 
colNamePartsFromOldSchema(colNamePartsFromOldSchema.length - 1)
-              // deal with rename
-              if (!existField(oldStrucType, field.name) && 
existField(oldStrucType, lastColNameFromOldSchema)) {
-                // find rename
-                val oldField = oldStrucType(lastColNameFromOldSchema)
-                val oldPos = oldStrucType.fieldIndex(lastColNameFromOldSchema)
-                helper(i) = rewriteRecordWithNewSchema(oldRow.get(oldPos, 
oldField.dataType), oldField.dataType, field.dataType, renameCols, fieldNames)
-              }
-            }
-            fieldNames.pop()
-          }
-          val newRow = new 
GenericInternalRow(Array.fill(targetSchema.length)(null).asInstanceOf[Array[Any]])
-          targetSchema.fields.zipWithIndex.foreach { case (_, i) =>
-            if (helper.contains(i)) {
-              newRow.update(i, helper(i))
-            } else {
-              // TODO add default val
-              newRow.update(i, null)
-            }
-          }
+  def getCachedUnsafeRowWriter(from: StructType, to: StructType, 
renamedColumnsMap: JMap[String, String] = JCollections.emptyMap()): 
UnsafeRowWriter = {
+    unsafeWriterThreadLocal.get()
+      .getOrElseUpdate((from, to, renamedColumnsMap), genUnsafeRowWriter(from, 
to, renamedColumnsMap))
+  }
 
-          newRow
-        case targetSchema: ArrayType =>
-          if (!oldRecord.isInstanceOf[ArrayData]) {
-            throw new IllegalArgumentException("cannot rewrite record with 
different type")
-          }
-          val oldElementType = oldSchema.asInstanceOf[ArrayType].elementType
-          val oldArray = oldRecord.asInstanceOf[ArrayData]
-          val newElementType = targetSchema.elementType
-          val newArray = new 
GenericArrayData(Array.fill(oldArray.numElements())(null).asInstanceOf[Array[Any]])
-          fieldNames.push("element")
-          oldArray.toSeq[Any](oldElementType).zipWithIndex.foreach { case 
(value, i) => newArray.update(i, 
rewriteRecordWithNewSchema(value.asInstanceOf[AnyRef], oldElementType, 
newElementType, renameCols, fieldNames)) }
-          fieldNames.pop()
-
-          newArray
-        case targetSchema: MapType =>
-          if (!oldRecord.isInstanceOf[MapData]) {
-            throw new IllegalArgumentException("cannot rewrite record with 
different type")
-          }
-          val oldValueType = oldSchema.asInstanceOf[MapType].valueType
-          val oldKeyType = oldSchema.asInstanceOf[MapType].keyType
-          val oldMap = oldRecord.asInstanceOf[MapData]
-          val newValueType = targetSchema.valueType
-          val newKeyArray = new 
GenericArrayData(Array.fill(oldMap.keyArray().numElements())(null).asInstanceOf[Array[Any]])
-          val newValueArray = new 
GenericArrayData(Array.fill(oldMap.valueArray().numElements())(null).asInstanceOf[Array[Any]])
-          val newMap = new ArrayBasedMapData(newKeyArray, newValueArray)
-          fieldNames.push("value")
-          oldMap.keyArray().toSeq[Any](oldKeyType).zipWithIndex.foreach { case 
(value, i) => newKeyArray.update(i, rewritePrimaryType(value, oldKeyType, 
oldKeyType)) }
-          oldMap.valueArray().toSeq[Any](oldValueType).zipWithIndex.foreach { 
case (value, i) => newValueArray.update(i, 
rewriteRecordWithNewSchema(value.asInstanceOf[AnyRef], oldValueType, 
newValueType, renameCols, fieldNames)) }
-          fieldNames.pop()
-
-          newMap
-        case _ => rewritePrimaryType(oldRecord, oldSchema, newSchema)
-      }
+  def getCachedPosList(structType: StructType, field: String): 
Option[NestedFieldPath] = {
+    val nestedFieldPathOpt = orderPosListMap.get((structType, field))
+    // NOTE: This is specifically designed to do 2 lookups (in case of 
cache-miss) to avoid
+    //       allocating the closure when using [[computeIfAbsent]] on the 
more frequent cache-hit path
+    if (nestedFieldPathOpt != null) {
+      nestedFieldPathOpt
+    } else {
+      orderPosListMap.computeIfAbsent((structType, field), new 
JFunction[(StructType, String), Option[NestedFieldPath]] {
+        override def apply(t: (StructType, String)): Option[NestedFieldPath] =
+          composeNestedFieldPath(structType, field)
+      })
     }
   }
 
-  def getCachedPosList(structType: StructType, field: String): NestedFieldPath 
= {
-    val schemaPair = (structType, field)
-    if (!orderPosListMap.containsKey(schemaPair)) {
-      val posList = HoodieUnsafeRowUtils.composeNestedFieldPath(structType, 
field)
-      orderPosListMap.put(schemaPair, posList)
+  def getCachedSchema(schema: Schema): StructType = {
+    val structType = schemaMap.get(schema)
+    // NOTE: This is specifically designed to do 2 lookups (in case of 
cache-miss) to avoid
+    //       allocating the closure when using [[computeIfAbsent]] on the 
more frequent cache-hit path
+    if (structType != null) {
+      structType
+    } else {
+      schemaMap.computeIfAbsent(schema, new JFunction[Schema, StructType] {
+        override def apply(t: Schema): StructType =
+          convertAvroSchemaToStructType(schema)
+      })
     }
-    orderPosListMap.get(schemaPair)
   }
 
-  def getCachedUnsafeProjection(from: StructType, to: StructType): 
UnsafeProjection = {
-    val schemaPair = (from, to)
-    val map = unsafeProjectionThreadLocal.get()
-    if (!map.containsKey(schemaPair)) {
-      val projection = 
HoodieCatalystExpressionUtils.generateUnsafeProjection(from, to)
-      map.put(schemaPair, projection)
+  private[sql] def genUnsafeRowWriter(prevSchema: StructType,
+                                      newSchema: StructType,
+                                      renamedColumnsMap: JMap[String, 
String]): UnsafeRowWriter = {
+    val writer = newWriterRenaming(prevSchema, newSchema, renamedColumnsMap, 
new JArrayDeque[String]())
+    val unsafeProjection = generateUnsafeProjection(newSchema, newSchema)
+    val phonyUpdater = new CatalystDataUpdater {
+      var value: InternalRow = _
+
+      override def set(ordinal: Int, value: Any): Unit =
+        this.value = value.asInstanceOf[InternalRow]
     }
-    map.get(schemaPair)
-  }
 
-  def getCachedSchema(schema: Schema): StructType = {
-    if (!schemaMap.containsKey(schema)) {
-      val structType = 
AvroConversionUtils.convertAvroSchemaToStructType(schema)
-      schemaMap.put(schema, structType)
+    oldRow => {
+      writer(phonyUpdater, 0, oldRow)
+      unsafeProjection(phonyUpdater.value)
     }
-    schemaMap.get(schema)
   }
 
-  def existField(structType: StructType, name: String): Boolean = {
-    try {
-      HoodieUnsafeRowUtils.composeNestedFieldPath(structType, name)
-      true
-    } catch {
-      case _: IllegalArgumentException => false
+  private type RowFieldUpdater = (CatalystDataUpdater, Int, Any) => Unit
+
+  private def genUnsafeStructWriter(prevStructType: StructType,
+                                    newStructType: StructType,
+                                    renamedColumnsMap: JMap[String, String],
+                                    fieldNamesStack: JDeque[String]): 
(CatalystDataUpdater, Any) => Unit = {
+    // TODO need to canonicalize schemas (casing)
+    val fieldWriters = ArrayBuffer.empty[RowFieldUpdater]
+    val positionMap = ArrayBuffer.empty[Int]
+
+    for (newField <- newStructType.fields) {
+      fieldNamesStack.push(newField.name)
+
+      val (fieldWriter, prevFieldPos): (RowFieldUpdater, Int) =
+        prevStructType.getFieldIndex(newField.name) match {
+          case Some(prevFieldPos) =>
+            val prevField = prevStructType(prevFieldPos)
+            (newWriterRenaming(prevField.dataType, newField.dataType, 
renamedColumnsMap, fieldNamesStack), prevFieldPos)
+
+          case None =>
+            val newFieldQualifiedName = createFullName(fieldNamesStack)
+            val prevFieldName: String = 
lookupRenamedField(newFieldQualifiedName, renamedColumnsMap)
+
+            // Handle rename
+            prevStructType.getFieldIndex(prevFieldName) match {
+              case Some(prevFieldPos) =>
+                val prevField = prevStructType.fields(prevFieldPos)
+                (newWriterRenaming(prevField.dataType, newField.dataType, 
renamedColumnsMap, fieldNamesStack), prevFieldPos)
+
+              case None =>
+                val updater: RowFieldUpdater = (fieldUpdater, ordinal, _) => 
fieldUpdater.setNullAt(ordinal)
+                (updater, -1)
+            }
+        }
+
+      fieldWriters += fieldWriter
+      positionMap += prevFieldPos
+
+      fieldNamesStack.pop()
     }
-  }
 
-  private def rewritePrimaryType(oldValue: Any, oldSchema: DataType, 
newSchema: DataType) = {
-    if (oldSchema.equals(newSchema) || (oldSchema.isInstanceOf[DecimalType] && 
newSchema.isInstanceOf[DecimalType])) {
-      oldSchema match {
-        case NullType | BooleanType | IntegerType | LongType | FloatType | 
DoubleType | DateType | TimestampType | BinaryType =>
-          oldValue
-        // Copy UTF8String before putting into GenericInternalRow
-        case StringType => UTF8String.fromString(oldValue.toString)
-        case DecimalType() =>
-          
Decimal.fromDecimal(oldValue.asInstanceOf[Decimal].toBigDecimal.setScale(newSchema.asInstanceOf[DecimalType].scale))
-        case _ =>
-          throw new HoodieException("Unknown schema type: " + newSchema)
+    (fieldUpdater, row) => {
+      var pos = 0
+      while (pos < fieldWriters.length) {
+        val prevPos = positionMap(pos)
+        val prevValue = if (prevPos >= 0) {
+          row.asInstanceOf[InternalRow].get(prevPos, 
prevStructType.fields(prevPos).dataType)
+        } else {
+          null
+        }
+
+        fieldWriters(pos)(fieldUpdater, pos, prevValue)
+        pos += 1
       }
-    } else {
-      rewritePrimaryTypeWithDiffSchemaType(oldValue, oldSchema, newSchema)
     }
   }
 
-  private def rewritePrimaryTypeWithDiffSchemaType(oldValue: Any, oldSchema: 
DataType, newSchema: DataType): Any = {
-    val value = newSchema match {
-      case NullType | BooleanType =>
-      case DateType if oldSchema.equals(StringType) =>
-        
CatalystTypeConverters.convertToCatalyst(java.sql.Date.valueOf(oldValue.toString))
-      case LongType =>
-        oldSchema match {
-          case IntegerType => 
CatalystTypeConverters.convertToCatalyst(oldValue.asInstanceOf[Int].longValue())
+  private def newWriterRenaming(prevDataType: DataType,
+                                newDataType: DataType,
+                                renamedColumnsMap: JMap[String, String],
+                                fieldNameStack: JDeque[String]): 
RowFieldUpdater = {
+    (newDataType, prevDataType) match {
+      case (newType, prevType) if prevType == newType =>
+        (fieldUpdater, ordinal, value) => fieldUpdater.set(ordinal, value)
+
+      case (newStructType: StructType, prevStructType: StructType) =>
+        val writer = genUnsafeStructWriter(prevStructType, newStructType, 
renamedColumnsMap, fieldNameStack)
+
+        val newRow = new 
SpecificInternalRow(newStructType.fields.map(_.dataType))
+        val rowUpdater = new RowUpdater(newRow)
+
+        (fieldUpdater, ordinal, value) => {
+          // Here the new row is built in 2 stages:
+          //    - First, we pass mutable row (used as buffer/scratchpad) 
created above wrapped into [[RowUpdater]]
created above wrapped into [[RowUpdater]]
+          //      into generated row-writer
+          //    - Upon returning from row-writer, we call back into parent 
row's [[fieldUpdater]] to set returned
+          //      row as a value in it
+          writer(rowUpdater, value)
+          fieldUpdater.set(ordinal, newRow)
+        }
+
+      case (ArrayType(newElementType, _), ArrayType(prevElementType, 
containsNull)) =>
+        fieldNameStack.push("element")
+        val elementWriter = newWriterRenaming(prevElementType, newElementType, 
renamedColumnsMap, fieldNameStack)
+        fieldNameStack.pop()
+
+        (fieldUpdater, ordinal, value) => {
+          val prevArrayData = value.asInstanceOf[ArrayData]
+          val prevArray = prevArrayData.toObjectArray(prevElementType)
+
+          val newArrayData = createArrayData(newElementType, 
prevArrayData.numElements())
+          val elementUpdater = new ArrayDataUpdater(newArrayData)
+
+          var i = 0
+          while (i < prevArray.length) {
+            val element = prevArray(i)
+            if (element == null) {
+              if (!containsNull) {
+                throw new HoodieException(
+                  s"Array value at path 
'${fieldNameStack.asScala.mkString(".")}' is not allowed to be null")
+              } else {
+                elementUpdater.setNullAt(i)
+              }
+            } else {
+              elementWriter(elementUpdater, i, element)
+            }
+            i += 1
+          }
+
+          fieldUpdater.set(ordinal, newArrayData)
+        }
+
+      case (MapType(_, newValueType, _), MapType(_, prevValueType, 
valueContainsNull)) =>
+        fieldNameStack.push("value")
+        val valueWriter = newWriterRenaming(prevValueType, newValueType, 
renamedColumnsMap, fieldNameStack)
+        fieldNameStack.pop()
+
+        (updater, ordinal, value) =>
+          val mapData = value.asInstanceOf[MapData]
+          val prevKeyArrayData = mapData.keyArray
+          val prevValueArrayData = mapData.valueArray
+          val prevValueArray = prevValueArrayData.toObjectArray(prevValueType)
+
+          val newValueArray = createArrayData(newValueType, 
mapData.numElements())
+          val valueUpdater = new ArrayDataUpdater(newValueArray)
+          var i = 0
+          while (i < prevValueArray.length) {
+            val value = prevValueArray(i)
+            if (value == null) {
+              if (!valueContainsNull) {
+                throw new HoodieException(s"Map value at path 
${fieldNameStack.asScala.mkString(".")} is not allowed to be null")
+              } else {
+                valueUpdater.setNullAt(i)
+              }
+            } else {
+              valueWriter(valueUpdater, i, value)
+            }
+            i += 1
+          }
+
+          // NOTE: Keys couldn't be transformed and always have to be of 
[[StringType]]
+          updater.set(ordinal, new ArrayBasedMapData(prevKeyArrayData, 
newValueArray))
+
+      case (newDecimal: DecimalType, _) =>
+        prevDataType match {
+          case IntegerType | LongType | FloatType | DoubleType | StringType =>
+            (fieldUpdater, ordinal, value) =>
+              val scale = newDecimal.scale
+              // TODO this has to be revisited to avoid loss of precision (for 
fps)
+              fieldUpdater.setDecimal(ordinal, 
Decimal.fromDecimal(BigDecimal(value.toString).setScale(scale, 
ROUND_HALF_EVEN)))
+
+          case _: DecimalType =>
+            (fieldUpdater, ordinal, value) =>
+              fieldUpdater.setDecimal(ordinal, 
Decimal.fromDecimal(value.asInstanceOf[Decimal].toBigDecimal.setScale(newDecimal.scale)))
+
           case _ =>
+            throw new IllegalArgumentException(s"$prevDataType and 
$newDataType are incompatible")
         }
-      case FloatType =>
-        oldSchema match {
-          case IntegerType => 
CatalystTypeConverters.convertToCatalyst(oldValue.asInstanceOf[Int].floatValue())
-          case LongType => 
CatalystTypeConverters.convertToCatalyst(oldValue.asInstanceOf[Long].floatValue())
+
+      case (_: ShortType, _) =>
+        prevDataType match {
+          case _: ByteType => (fieldUpdater, ordinal, value) => 
fieldUpdater.setShort(ordinal, value.asInstanceOf[Byte].toShort)
           case _ =>
+            throw new IllegalArgumentException(s"$prevDataType and 
$newDataType are incompatible")
         }
-      case DoubleType =>
-        oldSchema match {
-          case IntegerType => 
CatalystTypeConverters.convertToCatalyst(oldValue.asInstanceOf[Int].doubleValue())
-          case LongType => 
CatalystTypeConverters.convertToCatalyst(oldValue.asInstanceOf[Long].doubleValue())
-          case FloatType => 
CatalystTypeConverters.convertToCatalyst(java.lang.Double.valueOf(oldValue.asInstanceOf[Float]
 + ""))
+
+      case (_: IntegerType, _) =>
+        prevDataType match {
+          case _: ShortType => (fieldUpdater, ordinal, value) => 
fieldUpdater.setInt(ordinal, value.asInstanceOf[Short].toInt)
+          case _: ByteType => (fieldUpdater, ordinal, value) => 
fieldUpdater.setInt(ordinal, value.asInstanceOf[Byte].toInt)
           case _ =>
+            throw new IllegalArgumentException(s"$prevDataType and 
$newDataType are incompatible")
         }
-      case BinaryType =>
-        oldSchema match {
-          case StringType => 
CatalystTypeConverters.convertToCatalyst(oldValue.asInstanceOf[String].getBytes(StandardCharsets.UTF_8))
+
+      case (_: LongType, _) =>
+        prevDataType match {
+          case _: IntegerType => (fieldUpdater, ordinal, value) => 
fieldUpdater.setLong(ordinal, value.asInstanceOf[Int].toLong)
+          case _: ShortType => (fieldUpdater, ordinal, value) => 
fieldUpdater.setLong(ordinal, value.asInstanceOf[Short].toLong)
+          case _: ByteType => (fieldUpdater, ordinal, value) => 
fieldUpdater.setLong(ordinal, value.asInstanceOf[Byte].toLong)
           case _ =>
+            throw new IllegalArgumentException(s"$prevDataType and 
$newDataType are incompatible")
         }
-      case StringType =>
-        oldSchema match {
-          case BinaryType => CatalystTypeConverters.convertToCatalyst(new 
String(oldValue.asInstanceOf[Array[Byte]]))
-          case DateType => 
CatalystTypeConverters.convertToCatalyst(toJavaDate(oldValue.asInstanceOf[Integer]).toString)
-          case IntegerType | LongType | FloatType | DoubleType | DecimalType() 
=> CatalystTypeConverters.convertToCatalyst(oldValue.toString)
+
+      case (_: FloatType, _) =>
+        prevDataType match {
+          case _: LongType => (fieldUpdater, ordinal, value) => 
fieldUpdater.setFloat(ordinal, value.asInstanceOf[Long].toFloat)
+          case _: IntegerType => (fieldUpdater, ordinal, value) => 
fieldUpdater.setFloat(ordinal, value.asInstanceOf[Int].toFloat)
+          case _: ShortType => (fieldUpdater, ordinal, value) => 
fieldUpdater.setFloat(ordinal, value.asInstanceOf[Short].toFloat)
+          case _: ByteType => (fieldUpdater, ordinal, value) => 
fieldUpdater.setFloat(ordinal, value.asInstanceOf[Byte].toFloat)
           case _ =>
+            throw new IllegalArgumentException(s"$prevDataType and 
$newDataType are incompatible")
         }
-      case DecimalType() =>
-        oldSchema match {
-          case IntegerType | LongType | FloatType | DoubleType | StringType =>
-            val scale = newSchema.asInstanceOf[DecimalType].scale
 
-            Decimal.fromDecimal(BigDecimal(oldValue.toString).setScale(scale))
+      case (_: DoubleType, _) =>
+        prevDataType match {
+          case _: FloatType => (fieldUpdater, ordinal, value) => 
fieldUpdater.setDouble(ordinal, value.asInstanceOf[Float].toDouble)
+          case _: LongType => (fieldUpdater, ordinal, value) => 
fieldUpdater.setDouble(ordinal, value.asInstanceOf[Long].toDouble)
+          case _: IntegerType => (fieldUpdater, ordinal, value) => 
fieldUpdater.setDouble(ordinal, value.asInstanceOf[Int].toDouble)
+          case _: ShortType => (fieldUpdater, ordinal, value) => 
fieldUpdater.setDouble(ordinal, value.asInstanceOf[Short].toDouble)
+          case _: ByteType => (fieldUpdater, ordinal, value) => 
fieldUpdater.setDouble(ordinal, value.asInstanceOf[Byte].toDouble)
           case _ =>
+            throw new IllegalArgumentException(s"$prevDataType and 
$newDataType are incompatible")
         }
-      case _ =>
-    }
-    if (value == None) {
-      throw new HoodieException(String.format("cannot support rewrite value 
for schema type: %s since the old schema type is: %s", newSchema, oldSchema))
-    } else {
-      CatalystTypeConverters.convertToCatalyst(value)
+
+      case (_: BinaryType, _: StringType) =>
+        (fieldUpdater, ordinal, value) => fieldUpdater.set(ordinal, 
value.asInstanceOf[UTF8String].getBytes)
+
+      // TODO revisit this (we need to align permitted casting w/ Spark)
+      // NOTE: This is supported to stay compatible w/ 
[[HoodieAvroUtils.rewriteRecordWithNewSchema]]
+      case (_: StringType, _) =>
+        prevDataType match {
+          case BinaryType => (fieldUpdater, ordinal, value) =>
+            fieldUpdater.set(ordinal, 
UTF8String.fromBytes(value.asInstanceOf[Array[Byte]]))
+          case DateType => (fieldUpdater, ordinal, value) =>
+            fieldUpdater.set(ordinal, 
UTF8String.fromString(toJavaDate(value.asInstanceOf[Integer]).toString))
+          case IntegerType | LongType | FloatType | DoubleType | _: 
DecimalType =>
+            (fieldUpdater, ordinal, value) => fieldUpdater.set(ordinal, 
UTF8String.fromString(value.toString))
+
+          case _ =>
+            throw new IllegalArgumentException(s"$prevDataType and 
$newDataType are incompatible")
+        }
+
+      case (DateType, StringType) =>
+        (fieldUpdater, ordinal, value) =>
+          fieldUpdater.set(ordinal, 
CatalystTypeConverters.convertToCatalyst(java.sql.Date.valueOf(value.toString)))
+
+      case (_, _) =>
+        throw new IllegalArgumentException(s"$prevDataType and $newDataType 
are incompatible")
     }
   }
 
-  def removeFields(schema: StructType, fieldsToRemove: 
java.util.List[String]): StructType = {
-    StructType(schema.fields.filter(field => 
!fieldsToRemove.contains(field.name)))
+  private def lookupRenamedField(newFieldQualifiedName: String, 
renamedColumnsMap: JMap[String, String]) = {
+    val prevFieldQualifiedName = 
renamedColumnsMap.getOrDefault(newFieldQualifiedName, "")
+    val prevFieldQualifiedNameParts = prevFieldQualifiedName.split("\\.")
+    val prevFieldName = 
prevFieldQualifiedNameParts(prevFieldQualifiedNameParts.length - 1)
+
+    prevFieldName
   }
+
+  private def createArrayData(elementType: DataType, length: Int): ArrayData = 
elementType match {

Review Comment:
   Yes, they 1:1 w/ Spark type system



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to