nsivabalan commented on code in PR #7769:
URL: https://github.com/apache/hudi/pull/7769#discussion_r1089682826


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -1078,28 +1049,28 @@ object HoodieSparkSqlWriter {
           }
         }).toJavaRDD()
       case HoodieRecord.HoodieRecordType.SPARK =>
-        // ut will use AvroKeyGenerator, so we need to cast it in spark record
         val sparkKeyGenerator = 
keyGenerator.asInstanceOf[SparkKeyGeneratorInterface]
         val dataFileSchema = new Schema.Parser().parse(dataFileSchemaStr)
         val dataFileStructType = 
HoodieInternalRowUtils.getCachedSchema(dataFileSchema)
         val writerStructType = 
HoodieInternalRowUtils.getCachedSchema(writerSchema)
         val sourceStructType = df.schema
-        df.queryExecution.toRdd.mapPartitions { iter =>
 
-          iter.map { internalRow =>
-            val recordKey = sparkKeyGenerator.getRecordKey(internalRow, 
sourceStructType)
-            val partitionPath = 
sparkKeyGenerator.getPartitionPath(internalRow, sourceStructType)
+        df.queryExecution.toRdd.mapPartitions { it =>
+          // TODO elaborate

Review Comment:
   Please elaborate: replace the `TODO elaborate` placeholder above with an
   actual explanation of what this block does.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -1078,28 +1049,28 @@ object HoodieSparkSqlWriter {
           }
         }).toJavaRDD()
       case HoodieRecord.HoodieRecordType.SPARK =>
-        // ut will use AvroKeyGenerator, so we need to cast it in spark record
         val sparkKeyGenerator = 
keyGenerator.asInstanceOf[SparkKeyGeneratorInterface]
         val dataFileSchema = new Schema.Parser().parse(dataFileSchemaStr)
         val dataFileStructType = 
HoodieInternalRowUtils.getCachedSchema(dataFileSchema)
         val writerStructType = 
HoodieInternalRowUtils.getCachedSchema(writerSchema)
         val sourceStructType = df.schema
-        df.queryExecution.toRdd.mapPartitions { iter =>
 
-          iter.map { internalRow =>
-            val recordKey = sparkKeyGenerator.getRecordKey(internalRow, 
sourceStructType)
-            val partitionPath = 
sparkKeyGenerator.getPartitionPath(internalRow, sourceStructType)
+        df.queryExecution.toRdd.mapPartitions { it =>
+          // TODO elaborate
+          val (unsafeProjection, transformer) = if 
(shouldDropPartitionColumns) {
+            (generateUnsafeProjection(dataFileStructType, dataFileStructType), 
genUnsafeRowWriter(sourceStructType, dataFileStructType))

Review Comment:
   nice optimization



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieInternalRowUtils.scala:
##########
@@ -65,25 +61,32 @@ class TestHoodieInternalRowUtils extends FunSuite with 
Matchers with BeforeAndAf
   }
 
   test("test rewrite") {
-    val data = sparkSession.sparkContext.parallelize(Seq(Row("like", 18, 
"like1", 181)))
+    val rows = Seq(
+      Row("Andrew", 18, Row("Mission st", "SF"), "John", 19)
+    )
+    val data = sparkSession.sparkContext.parallelize(rows)
     val oldRow = sparkSession.createDataFrame(data, 
schemaMerge).queryExecution.toRdd.first()
-    val newRow1 = HoodieInternalRowUtils.rewriteRecord(oldRow, schemaMerge, 
schema1)
-    val newRow2 = HoodieInternalRowUtils.rewriteRecord(oldRow, schemaMerge, 
schema2)
-    assert(newRow1.get(0, StringType).toString.equals("like"))
-    assert(newRow1.get(1, IntegerType) == 18)
-    assert(newRow2.get(0, StringType).toString.equals("like1"))
-    assert(newRow2.get(1, IntegerType) == 181)
+
+    val rowWriter1 = HoodieInternalRowUtils.genUnsafeRowWriter(schemaMerge, 
schema1)
+    val newRow1 = rowWriter1(oldRow)
+
+    val serDe1 = sparkAdapter.createSparkRowSerDe(schema1)
+    assertEquals(serDe1.deserializeRow(newRow1), Row("Andrew", 18, 
Row("Mission st", "SF")));
+
+    val rowWriter2 = HoodieInternalRowUtils.genUnsafeRowWriter(schemaMerge, 
schema2)
+    val newRow2 = rowWriter2(oldRow)
+
+    val serDe2 = sparkAdapter.createSparkRowSerDe(schema2)
+    assertEquals(serDe2.deserializeRow(newRow2), Row("John", 19));
   }
 
   test("test rewrite with nullable value") {
-    val data = sparkSession.sparkContext.parallelize(Seq(Row("like", 18)))
+    val data = sparkSession.sparkContext.parallelize(Seq(Row("Rob", 18, 
null.asInstanceOf[StructType])))
     val oldRow = sparkSession.createDataFrame(data, 
schema1).queryExecution.toRdd.first()
-    val newRow = HoodieInternalRowUtils.rewriteRecord(oldRow, schema1, 
schemaMerge)
-    assert(newRow.get(0, StringType).toString.equals("like"))
-    assert(newRow.get(1, IntegerType) == 18)
-    assert(newRow.get(2, StringType) == null)
-    assert(newRow.get(3, IntegerType) == null)
-  }
-
+    val rowWriter = HoodieInternalRowUtils.genUnsafeRowWriter(schema1, 
schemaMerge)
+    val newRow = rowWriter(oldRow)
 
+    val serDe = sparkAdapter.createSparkRowSerDe(schemaMerge)

Review Comment:
   Let's test as many data types as possible (all primitives, arrays, maps,
   etc.).
   Also, let's test null values for some of the fields.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to