nsivabalan commented on code in PR #8107:
URL: https://github.com/apache/hudi/pull/8107#discussion_r1151448306


##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -260,6 +260,18 @@ public class HoodieTableConfig extends HoodieConfig {
       .sinceVersion("0.13.0")
       .withDocumentation("The metadata of secondary indexes");
 
+  public static final ConfigProperty<String> AUTO_GENERATE_RECORD_KEYS = 
ConfigProperty
+      .key("hoodie.table.auto.generate.record.keys")
+      .defaultValue("false")
+      .withDocumentation("Enables automatic generation of the record-keys in 
cases when dataset bears "

Review Comment:
   Yes. @yihua had a tracking ticket for this. Ethan: I could not locate it.
   Can you share the JIRA link?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AutoRecordKeyGenerationUtils.scala:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.avro.generic.GenericRecord
+import org.apache.hudi.DataSourceWriteOptions.INSERT_DROP_DUPS
+import org.apache.hudi.common.config.HoodieConfig
+import org.apache.hudi.common.model.{HoodieRecord, WriteOperationType}
+import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.exception.HoodieException
+import org.apache.spark.TaskContext
+
+object AutoRecordKeyGenerationUtils {
+
+   // supported operation types when auto generation of record keys is enabled.
+   val supportedOperations: Set[String] =
+    Set(WriteOperationType.INSERT, WriteOperationType.BULK_INSERT, 
WriteOperationType.DELETE,
+      WriteOperationType.INSERT_OVERWRITE, 
WriteOperationType.INSERT_OVERWRITE_TABLE,
+      WriteOperationType.DELETE_PARTITION).map(_.name())

Review Comment:
   Yeah, we can't auto-generate keys for an upsert. For spark-datasource
   writes, it's only insert or bulk_insert. But with spark-sql, we might want
   to support MERGE INTO (MIT), updates, and deletes. So, I will fix this
   accordingly.
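
   A minimal sketch of the kind of check being discussed, assuming only
   insert and bulk_insert are allowed for spark-datasource writes. The class
   name and the `Op` enum are hypothetical stand-ins (not Hudi's
   `WriteOperationType`), and this is not the code in the PR.

```java
import java.util.EnumSet;
import java.util.Set;

public class AutoKeyGenOperationCheck {

  // Hypothetical stand-in for Hudi's WriteOperationType; only a few values are listed.
  public enum Op { INSERT, BULK_INSERT, UPSERT, DELETE }

  // Assumption taken from the comment above: spark-datasource writes
  // with auto-generated keys allow only insert and bulk_insert.
  public static final Set<Op> SUPPORTED_FOR_DATASOURCE =
      EnumSet.of(Op.INSERT, Op.BULK_INSERT);

  // True when the operation may be used with auto-generated record keys.
  public static boolean isSupported(Op op) {
    return SUPPORTED_FOR_DATASOURCE.contains(op);
  }

  // Fails fast for operations that need a user-provided record key.
  public static void validate(Op op) {
    if (!isSupported(op)) {
      throw new IllegalArgumentException(
          "Operation " + op + " is not supported when record keys are auto-generated");
    }
  }
}
```

   A spark-sql path could use a wider set (for example one that also admits
   deletes) without changing the shape of the check.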



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java:
##########
@@ -29,24 +31,29 @@
 public class SimpleAvroKeyGenerator extends BaseKeyGenerator {
 
   public SimpleAvroKeyGenerator(TypedProperties props) {
-    this(props, 
props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()),
+    this(props, 
Option.ofNullable(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(),
 null)),
         props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()));
   }
 
   SimpleAvroKeyGenerator(TypedProperties props, String partitionPathField) {
-    this(props, null, partitionPathField);
+    this(props, Option.empty(), partitionPathField);
   }
 
-  SimpleAvroKeyGenerator(TypedProperties props, String recordKeyField, String 
partitionPathField) {
+  SimpleAvroKeyGenerator(TypedProperties props, Option<String> recordKeyField, 
String partitionPathField) {

Review Comment:
   We are good. Everywhere, we use the constructor that takes just
   TypedProperties.



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala:
##########
@@ -82,9 +85,19 @@ object HoodieDatasetBulkInsertHelper
           val keyGenerator =
             ReflectionUtils.loadClass(keyGeneratorClassName, new 
TypedProperties(config.getProps))
               .asInstanceOf[SparkKeyGeneratorInterface]
+          val partitionId = TaskContext.getPartitionId()

Review Comment:
   Yeah. As long as the entire source RDD is not re-computed, we should be OK.
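
   To illustrate why re-computation matters, here is a minimal sketch of a
   key derived from the task's partition ID and a per-partition row counter,
   as in the hunk above. The `<instantTime>_<partitionId>_<rowId>` layout,
   the class name, and both method names are assumptions for illustration,
   not the PR's implementation.

```java
import java.util.ArrayList;
import java.util.List;

public class AutoKeySketch {

  // Hypothetical key layout: <instantTime>_<partitionId>_<rowId>.
  public static String autoKey(String instantTime, int partitionId, long rowId) {
    return instantTime + "_" + partitionId + "_" + rowId;
  }

  // Generates keys for one partition by counting rows from 0,
  // mirroring the `var rowId = 0` counter in the diff.
  public static List<String> keysForPartition(String instantTime, int partitionId, int rowCount) {
    List<String> keys = new ArrayList<>(rowCount);
    for (long rowId = 0; rowId < rowCount; rowId++) {
      keys.add(autoKey(instantTime, partitionId, rowId));
    }
    return keys;
  }
}
```

   A key built this way identifies a row only by where it sits, so it stays
   stable only if the same rows land in the same partition in the same order
   whenever the source is evaluated.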



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AutoRecordKeyGenerationUtils.scala:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.avro.generic.GenericRecord
+import org.apache.hudi.DataSourceWriteOptions.INSERT_DROP_DUPS
+import org.apache.hudi.common.config.HoodieConfig
+import org.apache.hudi.common.model.{HoodieRecord, WriteOperationType}
+import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.exception.{HoodieAutoRecordKeyException, 
HoodieException}
+import org.apache.spark.TaskContext
+
+object AutoRecordKeyGenerationUtils {
+
+   // supported operation types when auto generation of record keys is enabled.
+   val supportedOperations: Set[String] =

Review Comment:
   So, when a user does not configure a record key explicitly, we can turn on
   auto key generation automatically?
   Yes, we could do that.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -260,6 +260,18 @@ public class HoodieTableConfig extends HoodieConfig {
       .sinceVersion("0.13.0")
       .withDocumentation("The metadata of secondary indexes");
 
+  public static final ConfigProperty<String> AUTO_GENERATE_RECORD_KEYS = 
ConfigProperty
+      .key("hoodie.table.auto.generate.record.keys")
+      .defaultValue("false")
+      .withDocumentation("Enables automatic generation of the record-keys in 
cases when dataset bears "
+          + "no natural record key satisfying requirements of being the 
primary-key in the Hudi table. "
+          + "Record key auto-gen is generally recommended for 'append-only' 
workloads, ie ones leveraging 'insert' or "

Review Comment:
   Just so we are on the same page: your suggestion is to make this a writer
   config, but still keep it immutable, i.e. once set, the user can never
   unset it?
   I feel this somewhat contradicts how write configs behave, since they can
   change from one commit to another. Let me know what you think. I don't
   have a strong opinion.
   



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala:
##########
@@ -82,9 +85,19 @@ object HoodieDatasetBulkInsertHelper
           val keyGenerator =
             ReflectionUtils.loadClass(keyGeneratorClassName, new 
TypedProperties(config.getProps))
               .asInstanceOf[SparkKeyGeneratorInterface]
+          val partitionId = TaskContext.getPartitionId()
+          var rowId = 0

Review Comment:
   Yes, you are right.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -260,6 +260,18 @@ public class HoodieTableConfig extends HoodieConfig {
       .sinceVersion("0.13.0")
       .withDocumentation("The metadata of secondary indexes");
 
+  public static final ConfigProperty<String> AUTO_GENERATE_RECORD_KEYS = 
ConfigProperty
+      .key("hoodie.table.auto.generate.record.keys")
+      .defaultValue("false")
+      .withDocumentation("Enables automatic generation of the record-keys in 
cases when dataset bears "
+          + "no natural record key satisfying requirements of being the 
primary-key in the Hudi table. "
+          + "Record key auto-gen is generally recommended for 'append-only' 
workloads, ie ones leveraging 'insert' or "

Review Comment:
   Sure, I will take a stab at removing it as a table property.
   It's just that table properties are immutable, whereas writer properties
   are mutable. Since it does not make sense for a table with auto generation
   enabled to go back to other key generators, I thought it made sense to go
   with a table property.
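
   If this does become a writer config that must still behave as immutable,
   one option is to compare the writer's value against what the table has
   already persisted. A minimal sketch under that assumption follows; the
   class and method names are hypothetical, and plain `java.util.Properties`
   stands in for Hudi's config classes.

```java
import java.util.Properties;

public class ImmutableTableConfigCheck {

  // Config key taken from the diff above.
  public static final String AUTO_GEN_KEY = "hoodie.table.auto.generate.record.keys";

  // Returns true when the writer's requested value does not conflict
  // with the value already persisted for the table.
  public static boolean isCompatible(Properties tableProps, Properties writerProps) {
    String persisted = tableProps.getProperty(AUTO_GEN_KEY);
    String requested = writerProps.getProperty(AUTO_GEN_KEY);
    // Nothing persisted yet, or the writer did not set it: no conflict.
    if (persisted == null || requested == null) {
      return true;
    }
    return persisted.equalsIgnoreCase(requested);
  }
}
```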
   



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java:
##########
@@ -41,7 +42,7 @@ public class TimestampBasedKeyGenerator extends 
SimpleKeyGenerator {
   private final TimestampBasedAvroKeyGenerator timestampBasedAvroKeyGenerator;
 
   public TimestampBasedKeyGenerator(TypedProperties config) throws IOException 
{
-    this(config, 
config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()),
+    this(config, 
config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), null),

Review Comment:
   Nope. I have responded elsewhere.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java:
##########
@@ -608,14 +617,31 @@ private Pair<SchemaProvider, Pair<String, 
JavaRDD<HoodieRecord>>> fetchFromSourc
     SerializableSchema avroSchema = new 
SerializableSchema(schemaProvider.getTargetSchema());
     SerializableSchema processedAvroSchema = new 
SerializableSchema(isDropPartitionColumns() ? 
HoodieAvroUtils.removeMetadataFields(avroSchema.get()) : avroSchema.get());
     if (recordType == HoodieRecordType.AVRO) {
-      records = avroRDD.map(record -> {
-        GenericRecord gr = isDropPartitionColumns() ? 
HoodieAvroUtils.removeFields(record, partitionColumns) : record;
+      // if auto generation of keys is enabled, lets generate one
+      JavaRDD<Tuple2<GenericRecord, Option<String>>> 
recordRecordKeyOverrideRdd = avroRDD.mapPartitions(
+          (FlatMapFunction<Iterator<GenericRecord>, Tuple2<GenericRecord, 
Option<String>>>) genericRecordIterator -> {
+            int rowId = 0;

Review Comment:
   It's in just 2 places, right? Deltastreamer and the other Spark writers.
   Yeah, it's not easy to avoid de-dup here.


