vinothchandar commented on code in PR #8107:
URL: https://github.com/apache/hudi/pull/8107#discussion_r1182499972


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AutoRecordKeyGenerationUtils.scala:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.avro.generic.GenericRecord
+import org.apache.hudi.DataSourceWriteOptions.INSERT_DROP_DUPS
+import org.apache.hudi.common.config.HoodieConfig
+import org.apache.hudi.common.model.{HoodieRecord, WriteOperationType}
+import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.exception.HoodieKeyGeneratorException
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions
+
+object AutoRecordKeyGenerationUtils {
+
+  def validateParamsForAutoGenerationOfRecordKeys(parameters: Map[String, 
String], hoodieConfig: HoodieConfig): Unit = {
+    val autoGenerateRecordKeys = 
!parameters.contains(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()) // if 
record key is not configured,
+    // hudi will auto generate.
+
+    if (autoGenerateRecordKeys) {

Review Comment:
   In this case, the dupes will never be found, that's all right?  I don't see 
a reason to fail the writes per se. In general, we should avoid these 
interdependencies between configs. Can we WARN for these cases and let the 
write go through?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -1118,31 +1124,47 @@ object HoodieSparkSqlWriter {
           Some(writerSchema))
 
         avroRecords.mapPartitions(it => {
+          val sparkPartitionId = TaskContext.getPartitionId()
+
           val dataFileSchema = new Schema.Parser().parse(dataFileSchemaStr)
           val consistentLogicalTimestampEnabled = parameters.getOrElse(
             
DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
             
DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()).toBoolean
 
-          it.map { avroRecord =>
+          // generate record keys if auto generation is enabled.
+          val recordsWithRecordKeyOverride = 
mayBeAutoGenerateRecordKeys(autoGenerateRecordKeys, it, instantTime, 
sparkPartitionId)
+
+          // handle dropping partition columns
+          recordsWithRecordKeyOverride.map { avroRecordRecordKeyOverRide =>
             val processedRecord = if (shouldDropPartitionColumns) {
-              HoodieAvroUtils.rewriteRecord(avroRecord, dataFileSchema)
+              HoodieAvroUtils.rewriteRecord(avroRecordRecordKeyOverRide._1, 
dataFileSchema)
+            } else {
+              avroRecordRecordKeyOverRide._1
+            }
+
+            // Generate HoodieKey for records
+            val hoodieKey = if (autoGenerateRecordKeys) {
+              // fetch record key from the recordKeyOverride if auto 
generation is enabled.
+              new HoodieKey(avroRecordRecordKeyOverRide._2.get, 
keyGenerator.getKey(avroRecordRecordKeyOverRide._1).getPartitionPath)
             } else {
-              avroRecord
+              keyGenerator.getKey(avroRecordRecordKeyOverRide._1)
             }
+
             val hoodieRecord = if (shouldCombine) {
-              val orderingVal = HoodieAvroUtils.getNestedFieldVal(avroRecord, 
config.getString(PRECOMBINE_FIELD),
+              val orderingVal = 
HoodieAvroUtils.getNestedFieldVal(avroRecordRecordKeyOverRide._1, 
config.getString(PRECOMBINE_FIELD),
                 false, 
consistentLogicalTimestampEnabled).asInstanceOf[Comparable[_]]
-              DataSourceUtils.createHoodieRecord(processedRecord, orderingVal, 
keyGenerator.getKey(avroRecord),
+              DataSourceUtils.createHoodieRecord(processedRecord, orderingVal, 
hoodieKey,
                 config.getString(PAYLOAD_CLASS_NAME))
             } else {
-              DataSourceUtils.createHoodieRecord(processedRecord, 
keyGenerator.getKey(avroRecord),
+              DataSourceUtils.createHoodieRecord(processedRecord, hoodieKey,
                 config.getString(PAYLOAD_CLASS_NAME))
             }
             hoodieRecord
           }
         }).toJavaRDD()
 
       case HoodieRecord.HoodieRecordType.SPARK =>
+        // TODO: fix auto generation of record keys

Review Comment:
   Should we do this for this path in the same PR? I am scared of introducing 
more divergences.



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala:
##########
@@ -1267,4 +1267,39 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
       })
     }
   }
+
+
+  test("Test Insert Into with auto generate record keys") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      // Create a partitioned table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  dt string,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | partitioned by (dt)
+           | location '${tmp.getCanonicalPath}'
+       """.stripMargin)
+
+      // Note: Do not write the field alias, the partition field must be 
placed last.
+      spark.sql(
+        s"""
+           | insert into $tableName values
+           | (1, 'a1', 10, 1000, "2021-01-05"),
+           | (2, 'a2', 20, 2000, "2021-01-06"),
+           | (3, 'a3', 30, 3000, "2021-01-07")
+              """.stripMargin)
+
+      checkAnswer(s"select id, name, price, ts, dt from $tableName")(

Review Comment:
   don't we have to read the keys out?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala:
##########
@@ -196,10 +196,12 @@ object HoodieOptionConfig {
     // validate primary key
     val primaryKeys = sqlOptions.get(SQL_KEY_TABLE_PRIMARY_KEY.sqlKeyName)
       .map(_.split(",").filter(_.length > 0))
-    ValidationUtils.checkArgument(primaryKeys.nonEmpty, "No `primaryKey` is 
specified.")
-    primaryKeys.get.foreach { primaryKey =>
-      ValidationUtils.checkArgument(schema.exists(f => resolver(f.name, 
getRootLevelFieldName(primaryKey))),
-        s"Can't find primaryKey `$primaryKey` in ${schema.treeString}.")
+    // ValidationUtils.checkArgument(primaryKeys.nonEmpty, "No `primaryKey` is 
specified.")

Review Comment:
   Remove this commented-out check.



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala:
##########
@@ -1267,4 +1267,39 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
       })
     }
   }
+
+
+  test("Test Insert Into with auto generate record keys") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      // Create a partitioned table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  dt string,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | partitioned by (dt)
+           | location '${tmp.getCanonicalPath}'
+       """.stripMargin)
+
+      // Note: Do not write the field alias, the partition field must be 
placed last.
+      spark.sql(
+        s"""
+           | insert into $tableName values
+           | (1, 'a1', 10, 1000, "2021-01-05"),
+           | (2, 'a2', 20, 2000, "2021-01-06"),
+           | (3, 'a3', 30, 3000, "2021-01-07")
+              """.stripMargin)
+
+      checkAnswer(s"select id, name, price, ts, dt from $tableName")(

Review Comment:
   This test is not testing that the keys were actually generated and are 
unique. Love to see more comprehensive SQL tests that include the entire set 
of DMLs.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java:
##########
@@ -44,6 +48,9 @@ public ComplexAvroKeyGenerator(TypedProperties props) {
 
   @Override
   public String getRecordKey(GenericRecord record) {
+    if (autoGenerateRecordKeys()) {
+      return StringUtils.EMPTY_STRING;
+    }

Review Comment:
   Do we have alignment on the wrapping KeyGenerator? @nsivabalan, what 
@danny0405 is saying is: have a `HoodieInternalKeyGenerator` class and do all 
these checks in that one file, while delegating to the inner key generator. 
That makes sense to me; it will avoid touching all the key generators, no?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to