wzx140 commented on code in PR #7769:
URL: https://github.com/apache/hudi/pull/7769#discussion_r1089861698


##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala:
##########
@@ -308,14 +175,351 @@ object HoodieInternalRowUtils {
         }
       case _ =>
     }
+
     if (value == None) {
       throw new HoodieException(String.format("cannot support rewrite value 
for schema type: %s since the old schema type is: %s", newSchema, oldSchema))
     } else {
       CatalystTypeConverters.convertToCatalyst(value)
     }
   }
+   */
+  
+  private type RowFieldUpdater = (CatalystDataUpdater, Int, Any) => Unit
+
+  private def genUnsafeRowWriterRenaming(prevStructType: StructType, 
newStructType: StructType, renamedColumnsMap: JMap[String, String], fieldNames: 
JDeque[String]): (CatalystDataUpdater, Any) => Unit = {
+    // TODO need to canonicalize schemas (casing)
+    val fieldWriters = ArrayBuffer.empty[RowFieldUpdater]
+    val positionMap = ArrayBuffer.empty[Int]
+
+    for (newField <- newStructType.fields) {
+      fieldNames.push(newField.name)
+
+      val (fieldWriter, prevFieldPos): (RowFieldUpdater, Int) =
+        prevStructType.getFieldIndex(newField.name) match {
+          case Some(prevFieldPos) =>
+            val prevField = prevStructType(prevFieldPos)
+            (newWriterRenaming(prevField.dataType, newField.dataType, 
renamedColumnsMap, fieldNames), prevFieldPos)
+
+          case None =>
+            val newFieldQualifiedName = createFullName(fieldNames)
+            val prevFieldName: String = 
lookupRenamedField(newFieldQualifiedName, renamedColumnsMap)
+
+            // Handle rename
+            prevStructType.getFieldIndex(prevFieldName) match {
+              case Some(prevFieldPos) =>
+                val prevField = prevStructType.fields(prevFieldPos)
+                (newWriterRenaming(prevField.dataType, newField.dataType, 
renamedColumnsMap, fieldNames), prevFieldPos)
+
+              case None =>
+                // TODO handle defaults
+                val updater: RowFieldUpdater = (fieldUpdater, ordinal, _) => 
fieldUpdater.setNullAt(ordinal)
+                (updater, -1)
+            }
+        }
+
+      fieldWriters += fieldWriter
+      positionMap += prevFieldPos
+
+      fieldNames.pop()
+    }
+
+    (fieldUpdater, row) => {
+      var pos = 0
+      while (pos < fieldWriters.length) {
+        val prevPos = positionMap(pos)
+        val prevValue = if (prevPos >= 0) {
+          row.asInstanceOf[InternalRow].get(prevPos, 
prevStructType.fields(prevPos).dataType)
+        } else {
+          null
+        }
+
+        fieldWriters(pos)(fieldUpdater, pos, prevValue)
+        pos += 1
+      }
+    }
+  }
+
+  private def newWriterRenaming(prevDataType: DataType,
+                                newDataType: DataType,
+                                renamedColumnsMap: JMap[String, String],
+                                fieldNames: JDeque[String]): RowFieldUpdater = 
{
+    (prevDataType, newDataType) match {
+      case (prevType, newType) if prevType == newType =>
+        (fieldUpdater, ordinal, value) => fieldUpdater.set(ordinal, value)
+
+      case (prevStructType: StructType, newStructType: StructType) =>
+        val writer = genUnsafeRowWriterRenaming(prevStructType, newStructType, 
renamedColumnsMap, fieldNames)
+        val newRow = new 
SpecificInternalRow(newStructType.fields.map(_.dataType))
+        val rowUpdater = new RowUpdater(newRow)
+
+        (fieldUpdater, ordinal, value) => {
+          // TODO elaborate

Review Comment:
   What does "elaborate" mean in this TODO? Does it mean more comments are needed here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to