nsivabalan commented on code in PR #8956:
URL: https://github.com/apache/hudi/pull/8956#discussion_r1228771179


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala:
##########
@@ -144,20 +144,25 @@ class DefaultSource extends RelationProvider
                               mode: SaveMode,
                               optParams: Map[String, String],
                               df: DataFrame): BaseRelation = {
-    val dfWithoutMetaCols = 
df.drop(HoodieRecord.HOODIE_META_COLUMNS.asScala:_*)
+    val dfPrepped = if (optParams.getOrDefault(DATASOURCE_WRITE_PREPPED_KEY, 
"false")

Review Comment:
   lets call arg in L147 as "rawDf" and rename this too "df" 



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -1113,23 +1121,24 @@ object HoodieSparkSqlWriter {
                                     writerSchema: Schema,
                                     dataFileSchema: Schema,
                                     operation: WriteOperationType,
-                                    instantTime: String) = {
+                                    instantTime: String,

Review Comment:
   is createHoodieRecordRdd static?
   can we create a new class called HoodieCreateRecordUtils and move this 
entire method there. the main class (HoodieSparkSqlWriter) is becoming 
unmanageable. 
   



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala:
##########
@@ -55,8 +55,8 @@ trait ProvidesHoodieConfig extends Logging {
     // TODO(HUDI-3456) clean up
     val preCombineField = Option(tableConfig.getPreCombineField).getOrElse("")
 
-    require(hoodieCatalogTable.primaryKeys.nonEmpty,
-      s"There are no primary key in table 
${hoodieCatalogTable.table.identifier}, cannot execute update operator")
+//    require(hoodieCatalogTable.primaryKeys.nonEmpty,

Review Comment:
   can we remove commented out code if its not required. or revert if its 
applicable 



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala:
##########
@@ -137,6 +137,10 @@ trait ProvidesHoodieConfig extends Logging {
     val isNonStrictMode = insertMode == InsertMode.NON_STRICT
     val isPartitionedTable = hoodieCatalogTable.partitionFields.nonEmpty
     val hasPrecombineColumn = hoodieCatalogTable.preCombineKey.nonEmpty
+    val hasPrimaryKey = hoodieCatalogTable.primaryKeys.nonEmpty
+    if (hoodieCatalogTable.primaryKeys.length == 0) { // auto generation or 
record keys
+      combinedOpts += DataSourceWriteOptions.OPERATION.key -> "insert"

Review Comment:
   Once Jon lands his patch, guess we can revert this change.
   https://github.com/apache/hudi/pull/8875



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -1195,18 +1212,109 @@ object HoodieSparkSqlWriter {
           }
           val sparkKeyGenerator = 
HoodieSparkKeyGeneratorFactory.createKeyGenerator(keyGenProps).asInstanceOf[SparkKeyGeneratorInterface]
           val targetStructType = if (shouldDropPartitionColumns) 
dataFileStructType else writerStructType
+          val finalStructType = if (isPrepped) {
+            val fieldsToExclude = 
HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.toArray()
+            StructType(targetStructType.fields.filterNot(field => 
fieldsToExclude.contains(field.name)))
+          } else {
+            targetStructType
+          }
           // NOTE: To make sure we properly transform records
-          val targetStructTypeRowWriter = 
getCachedUnsafeRowWriter(sourceStructType, targetStructType)
+          val finalStructTypeRowWriter = 
getCachedUnsafeRowWriter(sourceStructType, finalStructType)
 
           it.map { sourceRow =>
-            val recordKey = sparkKeyGenerator.getRecordKey(sourceRow, 
sourceStructType)
-            val partitionPath = sparkKeyGenerator.getPartitionPath(sourceRow, 
sourceStructType)
-            val key = new HoodieKey(recordKey.toString, partitionPath.toString)
-            val targetRow = targetStructTypeRowWriter(sourceRow)
+            val (key: HoodieKey, recordLocation: Option[HoodieRecordLocation]) 
= getHoodieKeyAndMayBeLocationFromSparkRecord(sparkKeyGenerator, sourceRow, 
sourceStructType, isPrepped)
 
-            new HoodieSparkRecord(key, targetRow, dataFileStructType, false)
+            val targetRow = finalStructTypeRowWriter(sourceRow)
+            var hoodieSparkRecord = new HoodieSparkRecord(key, targetRow, 
dataFileStructType, false)
+            if (recordLocation.isDefined) {
+              hoodieSparkRecord.setCurrentLocation(recordLocation.get)
+            }
+            hoodieSparkRecord
           }
         }.toJavaRDD().asInstanceOf[JavaRDD[HoodieRecord[_]]]
     }
   }
+
+  private def getHoodieKeyAndMaybeLocationFromAvroRecord(keyGenerator: 
BaseKeyGenerator, avroRec: GenericRecord,
+                                                         isPrepped: Boolean): 
(HoodieKey, Option[HoodieRecordLocation]) = {
+    val recordKey = if (isPrepped) {
+      avroRec.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString
+    } else {
+      keyGenerator.getRecordKey(avroRec)
+    };
+
+    val partitionPath = if (isPrepped) {
+      avroRec.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString
+    } else {
+      keyGenerator.getPartitionPath(avroRec)
+    };
+
+    val hoodieKey = new HoodieKey(recordKey, partitionPath)
+    val instantTime: Option[String] = if (isPrepped) {
+      
Option(avroRec.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD)).map(_.toString) }
+    else {
+      None
+    }
+    val fileName: Option[String] = if (isPrepped) {
+      
Option(avroRec.get(HoodieRecord.FILENAME_METADATA_FIELD)).map(_.toString) }
+    else {
+      None
+    }
+    val recordLocation: Option[HoodieRecordLocation] = if 
(instantTime.isDefined && fileName.isDefined) {
+      val fileId = FSUtils.getFileId(fileName.get)
+      Some(new HoodieRecordLocation(instantTime.get, fileId))
+    } else {
+      None
+    }
+    (hoodieKey, recordLocation)
+  }
+
+  private def getHoodieKeyAndMayBeLocationFromSparkRecord(sparkKeyGenerator: 
SparkKeyGeneratorInterface,
+                                                          sourceRow: 
InternalRow, schema: StructType,
+                                                          isPrepped: Boolean): 
(HoodieKey, Option[HoodieRecordLocation]) = {
+    def getFieldIndex(fieldName: String): Int = {

Review Comment:
   should we do this once in the driver and once within mapPartitions() rather 
than for very record ? 
   



##########
hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java:
##########
@@ -84,7 +84,7 @@ public JavaRDD<HoodieRecord> generateInputRecords(String 
tableName, String sourc
                   
Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue())));
           try {
             return DataSourceUtils.createHoodieRecord(gr, orderingVal, 
keyGenerator.getKey(gr),
-                props.getString("hoodie.datasource.write.payload.class"));
+                props.getString("hoodie.datasource.write.payload.class"), 
scala.Option.apply(null));

Review Comment:
   you can pass "None" 



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -1195,18 +1212,109 @@ object HoodieSparkSqlWriter {
           }
           val sparkKeyGenerator = 
HoodieSparkKeyGeneratorFactory.createKeyGenerator(keyGenProps).asInstanceOf[SparkKeyGeneratorInterface]
           val targetStructType = if (shouldDropPartitionColumns) 
dataFileStructType else writerStructType
+          val finalStructType = if (isPrepped) {
+            val fieldsToExclude = 
HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.toArray()
+            StructType(targetStructType.fields.filterNot(field => 
fieldsToExclude.contains(field.name)))
+          } else {
+            targetStructType
+          }
           // NOTE: To make sure we properly transform records
-          val targetStructTypeRowWriter = 
getCachedUnsafeRowWriter(sourceStructType, targetStructType)
+          val finalStructTypeRowWriter = 
getCachedUnsafeRowWriter(sourceStructType, finalStructType)
 
           it.map { sourceRow =>
-            val recordKey = sparkKeyGenerator.getRecordKey(sourceRow, 
sourceStructType)
-            val partitionPath = sparkKeyGenerator.getPartitionPath(sourceRow, 
sourceStructType)
-            val key = new HoodieKey(recordKey.toString, partitionPath.toString)
-            val targetRow = targetStructTypeRowWriter(sourceRow)
+            val (key: HoodieKey, recordLocation: Option[HoodieRecordLocation]) 
= getHoodieKeyAndMayBeLocationFromSparkRecord(sparkKeyGenerator, sourceRow, 
sourceStructType, isPrepped)
 
-            new HoodieSparkRecord(key, targetRow, dataFileStructType, false)
+            val targetRow = finalStructTypeRowWriter(sourceRow)
+            var hoodieSparkRecord = new HoodieSparkRecord(key, targetRow, 
dataFileStructType, false)
+            if (recordLocation.isDefined) {
+              hoodieSparkRecord.setCurrentLocation(recordLocation.get)
+            }
+            hoodieSparkRecord
           }
         }.toJavaRDD().asInstanceOf[JavaRDD[HoodieRecord[_]]]
     }
   }
+
+  private def getHoodieKeyAndMaybeLocationFromAvroRecord(keyGenerator: 
BaseKeyGenerator, avroRec: GenericRecord,
+                                                         isPrepped: Boolean): 
(HoodieKey, Option[HoodieRecordLocation]) = {
+    val recordKey = if (isPrepped) {
+      avroRec.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString
+    } else {
+      keyGenerator.getRecordKey(avroRec)
+    };
+
+    val partitionPath = if (isPrepped) {
+      avroRec.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString
+    } else {
+      keyGenerator.getPartitionPath(avroRec)
+    };
+
+    val hoodieKey = new HoodieKey(recordKey, partitionPath)
+    val instantTime: Option[String] = if (isPrepped) {
+      
Option(avroRec.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD)).map(_.toString) }
+    else {
+      None
+    }
+    val fileName: Option[String] = if (isPrepped) {
+      
Option(avroRec.get(HoodieRecord.FILENAME_METADATA_FIELD)).map(_.toString) }
+    else {
+      None
+    }
+    val recordLocation: Option[HoodieRecordLocation] = if 
(instantTime.isDefined && fileName.isDefined) {
+      val fileId = FSUtils.getFileId(fileName.get)
+      Some(new HoodieRecordLocation(instantTime.get, fileId))
+    } else {
+      None
+    }
+    (hoodieKey, recordLocation)
+  }
+
+  private def getHoodieKeyAndMayBeLocationFromSparkRecord(sparkKeyGenerator: 
SparkKeyGeneratorInterface,
+                                                          sourceRow: 
InternalRow, schema: StructType,
+                                                          isPrepped: Boolean): 
(HoodieKey, Option[HoodieRecordLocation]) = {
+    def getFieldIndex(fieldName: String): Int = {
+      if (schema.fieldNames.contains(fieldName)) {
+        HOODIE_META_COLUMNS_NAME_TO_POS.get(fieldName)
+      } else {
+        -1
+      }
+    }
+
+    val hoodieRecordKeyIndex = 
getFieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD)
+    val recordKey = if (isPrepped && hoodieRecordKeyIndex != -1 && 
!sourceRow.isNullAt(hoodieRecordKeyIndex)) {

Review Comment:
   if isPrepped is true and if any of meta fields are not present we should 
throw in my opinion and not fallback to key generator



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -1195,18 +1212,109 @@ object HoodieSparkSqlWriter {
           }
           val sparkKeyGenerator = 
HoodieSparkKeyGeneratorFactory.createKeyGenerator(keyGenProps).asInstanceOf[SparkKeyGeneratorInterface]
           val targetStructType = if (shouldDropPartitionColumns) 
dataFileStructType else writerStructType
+          val finalStructType = if (isPrepped) {
+            val fieldsToExclude = 
HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.toArray()
+            StructType(targetStructType.fields.filterNot(field => 
fieldsToExclude.contains(field.name)))
+          } else {
+            targetStructType
+          }
           // NOTE: To make sure we properly transform records
-          val targetStructTypeRowWriter = 
getCachedUnsafeRowWriter(sourceStructType, targetStructType)
+          val finalStructTypeRowWriter = 
getCachedUnsafeRowWriter(sourceStructType, finalStructType)
 
           it.map { sourceRow =>
-            val recordKey = sparkKeyGenerator.getRecordKey(sourceRow, 
sourceStructType)
-            val partitionPath = sparkKeyGenerator.getPartitionPath(sourceRow, 
sourceStructType)
-            val key = new HoodieKey(recordKey.toString, partitionPath.toString)
-            val targetRow = targetStructTypeRowWriter(sourceRow)
+            val (key: HoodieKey, recordLocation: Option[HoodieRecordLocation]) 
= getHoodieKeyAndMayBeLocationFromSparkRecord(sparkKeyGenerator, sourceRow, 
sourceStructType, isPrepped)
 
-            new HoodieSparkRecord(key, targetRow, dataFileStructType, false)
+            val targetRow = finalStructTypeRowWriter(sourceRow)
+            var hoodieSparkRecord = new HoodieSparkRecord(key, targetRow, 
dataFileStructType, false)
+            if (recordLocation.isDefined) {
+              hoodieSparkRecord.setCurrentLocation(recordLocation.get)
+            }
+            hoodieSparkRecord
           }
         }.toJavaRDD().asInstanceOf[JavaRDD[HoodieRecord[_]]]
     }
   }
+
+  private def getHoodieKeyAndMaybeLocationFromAvroRecord(keyGenerator: 
BaseKeyGenerator, avroRec: GenericRecord,
+                                                         isPrepped: Boolean): 
(HoodieKey, Option[HoodieRecordLocation]) = {
+    val recordKey = if (isPrepped) {
+      avroRec.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString
+    } else {
+      keyGenerator.getRecordKey(avroRec)
+    };
+
+    val partitionPath = if (isPrepped) {
+      avroRec.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString
+    } else {
+      keyGenerator.getPartitionPath(avroRec)
+    };
+
+    val hoodieKey = new HoodieKey(recordKey, partitionPath)
+    val instantTime: Option[String] = if (isPrepped) {
+      
Option(avroRec.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD)).map(_.toString) }
+    else {
+      None
+    }
+    val fileName: Option[String] = if (isPrepped) {
+      
Option(avroRec.get(HoodieRecord.FILENAME_METADATA_FIELD)).map(_.toString) }
+    else {
+      None
+    }
+    val recordLocation: Option[HoodieRecordLocation] = if 
(instantTime.isDefined && fileName.isDefined) {
+      val fileId = FSUtils.getFileId(fileName.get)
+      Some(new HoodieRecordLocation(instantTime.get, fileId))
+    } else {
+      None
+    }
+    (hoodieKey, recordLocation)
+  }
+
+  private def getHoodieKeyAndMayBeLocationFromSparkRecord(sparkKeyGenerator: 
SparkKeyGeneratorInterface,
+                                                          sourceRow: 
InternalRow, schema: StructType,
+                                                          isPrepped: Boolean): 
(HoodieKey, Option[HoodieRecordLocation]) = {
+    def getFieldIndex(fieldName: String): Int = {
+      if (schema.fieldNames.contains(fieldName)) {
+        HOODIE_META_COLUMNS_NAME_TO_POS.get(fieldName)
+      } else {
+        -1
+      }
+    }
+
+    val hoodieRecordKeyIndex = 
getFieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD)
+    val recordKey = if (isPrepped && hoodieRecordKeyIndex != -1 && 
!sourceRow.isNullAt(hoodieRecordKeyIndex)) {
+      sourceRow.getString(hoodieRecordKeyIndex);
+    } else {
+      sparkKeyGenerator.getRecordKey(sourceRow, schema).toString
+    }
+
+    val hoodiePartitionPathIndex = 
getFieldIndex(HoodieRecord.PARTITION_PATH_METADATA_FIELD)
+    val partitionPath = if (isPrepped && hoodiePartitionPathIndex != -1 && 
!sourceRow.isNullAt(hoodiePartitionPathIndex)) {
+      sourceRow.getString(hoodiePartitionPathIndex)
+    } else {
+      sparkKeyGenerator.getPartitionPath(sourceRow, schema).toString
+    };
+
+    val commitTimeIndex = 
getFieldIndex(HoodieRecord.COMMIT_TIME_METADATA_FIELD)
+    val instantTime: Option[String] = if (isPrepped && commitTimeIndex != -1) {
+      Option(sourceRow.getString(commitTimeIndex))
+    } else {
+      None
+    }
+
+    val fileNameIndex = getFieldIndex(HoodieRecord.FILENAME_METADATA_FIELD)

Review Comment:
   same comment as above. lets not call getFieldIndex for every record. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to