nsivabalan commented on code in PR #7668:
URL: https://github.com/apache/hudi/pull/7668#discussion_r1073828099


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -77,6 +77,28 @@ object HoodieSparkSqlWriter {
   private var asyncCompactionTriggerFnDefined: Boolean = false
   private var asyncClusteringTriggerFnDefined: Boolean = false
 
+  def changeOperationToInsertIfRequired(writeOperationType: 
WriteOperationType, hoodieConfig: HoodieConfig)

Review Comment:
   handleConfigsForAutoGenerationOfRecordKeys



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala:
##########
@@ -159,7 +164,15 @@ object HoodieWriterUtils {
           
diffConfigs.append(s"KeyGenerator:\t$datasourceKeyGen\t$tableConfigKeyGen\n")
         }
 
-        val datasourcePartitionFields = 
params.getOrElse(PARTITIONPATH_FIELD.key(), null)
+        val typedProperties = new TypedProperties()
+        typedProperties.putAll(params)
+        val keyGeneratorClass = 
HoodieSparkKeyGeneratorFactory.getKeyGeneratorClassName(typedProperties)
+        val datasourcePartitionFields = if (keyGeneratorClass == 
classOf[KeylessKeyGenerator].getName) {
+          val keyGenerator = 
HoodieSparkKeyGeneratorFactory.createKeyGenerator(typedProperties)
+          SparkKeyGenUtils.getPartitionColumns(keyGenerator, 
toProperties(params))

Review Comment:
   lets revisit this after 5574. 



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala:
##########
@@ -1179,6 +1177,88 @@ class TestHoodieSparkSqlWriter {
     assertTrue(kg2 == classOf[SimpleKeyGenerator].getName)
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ"))
+  def testAutoGenerationOfRecordKeysWithCombineBeforeInsert(tableType: 
String): Unit = {
+    val _spark = spark
+    import _spark.implicits._
+
+    val initialOpts = Map(
+      HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,
+      "hoodie.insert.shuffle.parallelism" -> "1",
+      "hoodie.upsert.shuffle.parallelism" -> "1",
+      DataSourceWriteOptions.TABLE_TYPE.key -> tableType,
+      DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key() -> 
"org.apache.hudi.keygen.KeylessKeyGenerator",
+      DataSourceWriteOptions.OPERATION.key() -> INSERT_OPERATION_OPT_VAL,
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key() -> "partition:simple",
+      HoodieWriteConfig.MERGE_ALLOW_DUPLICATE_ON_INSERTS_ENABLE.key() -> 
"false")
+
+    try {
+      // verify exception is thrown when 
HoodieWriteConfig.COMBINE_BEFORE_INSERT is enabled
+      val tmpDF = Seq((1, "a1", 10, 1000, "2021-10-16")).toDF("id", "name", 
"value", "ts", "dt")
+      tmpDF.write.format("org.apache.hudi")
+        .options(initialOpts + (HoodieWriteConfig.COMBINE_BEFORE_INSERT.key() 
-> "true"))
+        .mode(SaveMode.Append)
+        .save(tempBasePath)
+    } catch {
+      case e: HoodieException =>
+        
assertTrue(e.getMessage.contains(s"${classOf[KeylessKeyGenerator].getName} can 
not be used with config enabled " +
+          s"{${HoodieWriteConfig.COMBINE_BEFORE_INSERT.key()}}"), e.getMessage)
+    }
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ"))
+  def testAutoGenerationOfRecordKeys(tableType: String): Unit = {
+
+    val dataGen = new HoodieTestDataGenerator()
+    val initialOpts = Map(
+      HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,
+      "hoodie.insert.shuffle.parallelism" -> "1",
+      "hoodie.upsert.shuffle.parallelism" -> "1",
+      DataSourceWriteOptions.TABLE_TYPE.key -> tableType,
+      DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key() -> 
"org.apache.hudi.keygen.KeylessKeyGenerator",
+      DataSourceWriteOptions.OPERATION.key() -> UPSERT_OPERATION_OPT_VAL,
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key() -> "partition:simple",
+      HoodieWriteConfig.MERGE_ALLOW_DUPLICATE_ON_INSERTS_ENABLE.key() -> 
"false")
+
+    var totalRecs = 0
+    val extraOptsList = List(
+      Map.empty,
+      Map(
+        DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
+        DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key")
+    )
+    for (extraOpt <- extraOptsList) {
+      val opts = initialOpts ++ extraOpt
+      for (x <- 1 to 5) {
+        val instantTime = "00" + x
+        val genRecsList = if (x == 1) {
+          totalRecs += 100 * 2
+          val inserts = dataGen.generateInserts(instantTime, 100)
+          inserts.addAll(inserts)
+          inserts
+        } else {
+          totalRecs += 10
+          dataGen.generateUniqueUpdates(instantTime, 10)
+        }
+        val records = recordsToStrings(genRecsList).toList
+        val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 
2))
+        inputDF.write.format("org.apache.hudi")
+          .options(opts)
+          .mode(SaveMode.Append)
+          .save(tempBasePath)
+
+        val snapshotDF = spark.read.format("org.apache.hudi")
+          .load(tempBasePath)
+        assertEquals(totalRecs, snapshotDF.count())
+      }
+    }
+
+    val metaClient = 
HoodieTableMetaClient.builder().setConf(spark.sparkContext.hadoopConfiguration).setBasePath(tempBasePath).build()
+    assertEquals(metaClient.getTableConfig.getTableType, 
HoodieTableType.COPY_ON_WRITE)

Review Comment:
   can we write tests for exception/non happy paths. 
   1. if combine.before.insert is set, and try to send duplicates records, all 
duplicates should be present in snapshot read. 
   2. verify that preCombine field is not set in tableConfig. ensure tests sets 
it explicitly.
   3. if record key field config is set, it should not throw if keyless is 
enabled. and try to inject records w/ same record keys. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to