vinothchandar commented on code in PR #8107:
URL: https://github.com/apache/hudi/pull/8107#discussion_r1144114597


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java:
##########
@@ -36,8 +39,9 @@ public class NonpartitionedAvroKeyGenerator extends 
BaseKeyGenerator {
 
   public NonpartitionedAvroKeyGenerator(TypedProperties props) {
     super(props);
-    this.recordKeyFields = 
Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())
-        .split(",")).map(String::trim).filter(s -> 
!s.isEmpty()).collect(Collectors.toList());
+    this.recordKeyFields = autoGenerateRecordKeys() ? Collections.emptyList() :

Review Comment:
   should this code be pushed to the super class?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java:
##########
@@ -32,7 +34,8 @@ public class ComplexAvroKeyGenerator extends BaseKeyGenerator 
{
 
   public ComplexAvroKeyGenerator(TypedProperties props) {
     super(props);
-    this.recordKeyFields = 
Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(","))
+    this.recordKeyFields = autoGenerateRecordKeys() ? Collections.emptyList() :

Review Comment:
   Maybe a simple `if` instead of the ternary would be more readable? IMO a 
ternary is best used for simple expressions. 
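
   A rough sketch of what the `if` form could look like, reusing the split/trim/filter logic from the removed lines. The class and method names here are hypothetical, and in the PR the flag would come from `autoGenerateRecordKeys()` rather than a parameter:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper, not part of the PR: shows the early-return `if`
// in place of the ternary.
final class RecordKeyFieldParsing {

    static List<String> resolveRecordKeyFields(boolean autoGenerateRecordKeys,
                                               String commaSeparatedFields) {
        // Auto-generated keys need no user-supplied record key fields.
        if (autoGenerateRecordKeys) {
            return Collections.emptyList();
        }
        // Otherwise parse the configured fields, dropping blanks.
        return Arrays.stream(commaSeparatedFields.split(","))
            .map(String::trim)
            .filter(s -> !s.isEmpty())
            .collect(Collectors.toList());
    }
}
```

   A shared helper along these lines might also be one way to avoid repeating the same expression in each key generator constructor.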
   
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java:
##########
@@ -29,24 +31,29 @@
 public class SimpleAvroKeyGenerator extends BaseKeyGenerator {
 
   public SimpleAvroKeyGenerator(TypedProperties props) {
-    this(props, 
props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()),
+    this(props, 
Option.ofNullable(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(),
 null)),

Review Comment:
   won't it return `null` if the key is not found in `props` anyways?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java:
##########
@@ -29,24 +31,29 @@
 public class SimpleAvroKeyGenerator extends BaseKeyGenerator {
 
   public SimpleAvroKeyGenerator(TypedProperties props) {
-    this(props, 
props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()),
+    this(props, 
Option.ofNullable(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(),
 null)),
         props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()));
   }
 
   SimpleAvroKeyGenerator(TypedProperties props, String partitionPathField) {
-    this(props, null, partitionPathField);
+    this(props, Option.empty(), partitionPathField);
   }
 
-  SimpleAvroKeyGenerator(TypedProperties props, String recordKeyField, String 
partitionPathField) {
+  SimpleAvroKeyGenerator(TypedProperties props, Option<String> recordKeyField, 
String partitionPathField) {
     super(props);
-    this.recordKeyFields = recordKeyField == null
+    this.recordKeyFields = !recordKeyField.isPresent()
         ? Collections.emptyList()
-        : Collections.singletonList(recordKeyField);
+        : Collections.singletonList(recordKeyField.get());
     this.partitionPathFields = Collections.singletonList(partitionPathField);
   }
 
   @Override
   public String getRecordKey(GenericRecord record) {
+    if (autoGenerateRecordKeys()) {
+      // To fetch partition path, caller will have to call getKey() on 
KeyGenerator and call .getPartitionPath. Hence we have to pass empty field to 
support

Review Comment:
   avoid this comment everywhere?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java:
##########
@@ -29,24 +31,29 @@
 public class SimpleAvroKeyGenerator extends BaseKeyGenerator {
 
   public SimpleAvroKeyGenerator(TypedProperties props) {
-    this(props, 
props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()),
+    this(props, 
Option.ofNullable(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(),
 null)),
         props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()));
   }
 
   SimpleAvroKeyGenerator(TypedProperties props, String partitionPathField) {
-    this(props, null, partitionPathField);
+    this(props, Option.empty(), partitionPathField);
   }
 
-  SimpleAvroKeyGenerator(TypedProperties props, String recordKeyField, String 
partitionPathField) {
+  SimpleAvroKeyGenerator(TypedProperties props, Option<String> recordKeyField, 
String partitionPathField) {

Review Comment:
   This does not break any reflection based instantiation?



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala:
##########
@@ -82,9 +85,19 @@ object HoodieDatasetBulkInsertHelper
           val keyGenerator =
             ReflectionUtils.loadClass(keyGeneratorClassName, new 
TypedProperties(config.getProps))
               .asInstanceOf[SparkKeyGeneratorInterface]
+          val partitionId = TaskContext.getPartitionId()
+          var rowId = 0

Review Comment:
   nts: this var gets copied over to each executor and we generate the `rowId` 
independently for each partition?
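
   To make the question concrete, a minimal sketch without Spark, assuming each partition's task starts its own counter at 0. The `<instantTime>_<partitionId>_<rowId>` layout and all names below are assumptions for illustration, not the format the PR uses:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the per-partition closure in the bulk insert path.
final class PerPartitionRowIds {

    // Generates keys for one partition; the key layout is an assumed example.
    static List<String> generateKeys(String instantTime, int partitionId, int recordCount) {
        List<String> keys = new ArrayList<>();
        int rowId = 0; // local to this call, i.e. to a single partition's task
        for (int i = 0; i < recordCount; i++) {
            keys.add(instantTime + "_" + partitionId + "_" + rowId);
            rowId++;
        }
        return keys;
    }
}
```

   Under that assumption, row ids repeat across partitions and uniqueness comes only from combining them with the partition id.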



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java:
##########
@@ -41,7 +42,7 @@ public class TimestampBasedKeyGenerator extends 
SimpleKeyGenerator {
   private final TimestampBasedAvroKeyGenerator timestampBasedAvroKeyGenerator;
 
   public TimestampBasedKeyGenerator(TypedProperties config) throws IOException 
{
-    this(config, 
config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()),
+    this(config, 
config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), null),

Review Comment:
   Same here: will `getString()` not return `null` if the key is not found?



##########
hudi-common/src/main/java/org/apache/hudi/exception/HoodieAutoRecordKeyException.java:
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.exception;
+
+/**
+ * Used for exceptions related to Auto generation of record keys.
+ */
+public class HoodieAutoRecordKeyException extends HoodieException {

Review Comment:
   rename: HoodieKeyGenerationException



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AutoRecordKeyGenerationUtils.scala:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.avro.generic.GenericRecord
+import org.apache.hudi.DataSourceWriteOptions.INSERT_DROP_DUPS
+import org.apache.hudi.common.config.HoodieConfig
+import org.apache.hudi.common.model.{HoodieRecord, WriteOperationType}
+import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.exception.{HoodieAutoRecordKeyException, 
HoodieException}
+import org.apache.spark.TaskContext
+
+object AutoRecordKeyGenerationUtils {
+
+   // supported operation types when auto generation of record keys is enabled.
+   val supportedOperations: Set[String] =

Review Comment:
   Can we simplify this more for the user. automatically turn on the config, if 
the user does `bulk_insert` from all Spark write paths. If they upsert the 
table later, then we generate keys for inserted records.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java:
##########
@@ -29,24 +31,29 @@
 public class SimpleAvroKeyGenerator extends BaseKeyGenerator {
 
   public SimpleAvroKeyGenerator(TypedProperties props) {
-    this(props, 
props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()),
+    this(props, 
Option.ofNullable(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(),
 null)),

Review Comment:
   
https://docs.oracle.com/javase/8/docs/api/java/util/Properties.html#getProperty-java.lang.String-
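
   For reference, a small sketch of the linked `java.util.Properties` behavior: a lookup of a missing key returns `null`, with or without an explicit `null` default. The key names are made up for illustration. Whether `TypedProperties.getString(key)` behaves the same way depends on Hudi's wrapper, which this snippet does not exercise:

```java
import java.util.Properties;

// Illustrates plain java.util.Properties only; TypedProperties may differ.
final class MissingKeyLookup {

    // Single-argument lookup: returns null when the key is absent.
    static String lookup(Properties props, String key) {
        return props.getProperty(key);
    }

    // Two-argument lookup: returns the supplied default when the key is absent.
    static String lookupWithDefault(Properties props, String key, String defaultValue) {
        return props.getProperty(key, defaultValue);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("example.partitionpath.field", "region"); // hypothetical key
        System.out.println(lookup(props, "example.recordkey.field"));                  // prints "null"
        System.out.println(lookupWithDefault(props, "example.recordkey.field", null)); // prints "null"
    }
}
```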



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AutoRecordKeyGenerationUtils.scala:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.avro.generic.GenericRecord
+import org.apache.hudi.DataSourceWriteOptions.INSERT_DROP_DUPS
+import org.apache.hudi.common.config.HoodieConfig
+import org.apache.hudi.common.model.{HoodieRecord, WriteOperationType}
+import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.exception.HoodieException
+import org.apache.spark.TaskContext
+
+object AutoRecordKeyGenerationUtils {
+
+   // supported operation types when auto generation of record keys is enabled.
+   val supportedOperations: Set[String] =
+    Set(WriteOperationType.INSERT, WriteOperationType.BULK_INSERT, 
WriteOperationType.DELETE,
+      WriteOperationType.INSERT_OVERWRITE, 
WriteOperationType.INSERT_OVERWRITE_TABLE,
+      WriteOperationType.DELETE_PARTITION).map(_.name())

Review Comment:
   idk how we would autogenerate keys for inserted records within an upsert, 
e.g., without implementing this at the HoodieMerge/Create handle layers? Could 
you sketch that impl?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AutoRecordKeyGenerationUtils.scala:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.avro.generic.GenericRecord
+import org.apache.hudi.DataSourceWriteOptions.INSERT_DROP_DUPS
+import org.apache.hudi.common.config.HoodieConfig
+import org.apache.hudi.common.model.{HoodieRecord, WriteOperationType}
+import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.exception.{HoodieAutoRecordKeyException, 
HoodieException}
+import org.apache.spark.TaskContext
+
+object AutoRecordKeyGenerationUtils {
+
+   // supported operation types when auto generation of record keys is enabled.
+   val supportedOperations: Set[String] =

Review Comment:
   I think making this a top-level choice introduces the need to care about a 
new config (auto generation of keys) just to avoid configuring an existing 
config (the key generator).



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java:
##########
@@ -29,24 +31,29 @@
 public class SimpleAvroKeyGenerator extends BaseKeyGenerator {
 
   public SimpleAvroKeyGenerator(TypedProperties props) {
-    this(props, 
props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()),
+    this(props, 
Option.ofNullable(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(),
 null)),
         props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()));
   }
 
   SimpleAvroKeyGenerator(TypedProperties props, String partitionPathField) {
-    this(props, null, partitionPathField);
+    this(props, Option.empty(), partitionPathField);
   }
 
-  SimpleAvroKeyGenerator(TypedProperties props, String recordKeyField, String 
partitionPathField) {
+  SimpleAvroKeyGenerator(TypedProperties props, Option<String> recordKeyField, 
String partitionPathField) {
     super(props);
-    this.recordKeyFields = recordKeyField == null
+    this.recordKeyFields = !recordKeyField.isPresent()

Review Comment:
   use `Option.ifPresent`? or `Option.map.orElse` ?
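
   Something like the following, sketched with `java.util.Optional` as a stand-in. This assumes Hudi's `Option` offers equivalent `map` and `orElse` methods; the class and method names are hypothetical:

```java
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Stand-in sketch using java.util.Optional instead of Hudi's Option.
final class RecordKeyFieldsSketch {

    // Maps a present field to a singleton list, or falls back to an empty list.
    static List<String> toRecordKeyFields(Optional<String> recordKeyField) {
        return recordKeyField
            .map(Collections::singletonList)
            .orElse(Collections.<String>emptyList());
    }
}
```

   This drops the negated `isPresent()` check and the explicit `get()` call.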



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -260,6 +260,18 @@ public class HoodieTableConfig extends HoodieConfig {
       .sinceVersion("0.13.0")
       .withDocumentation("The metadata of secondary indexes");
 
+  public static final ConfigProperty<String> AUTO_GENERATE_RECORD_KEYS = 
ConfigProperty
+      .key("hoodie.table.auto.generate.record.keys")
+      .defaultValue("false")
+      .withDocumentation("Enables automatic generation of the record-keys in 
cases when dataset bears "

Review Comment:
   +1 we should track this in the separate usability improvement track



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala:
##########
@@ -82,9 +85,19 @@ object HoodieDatasetBulkInsertHelper
           val keyGenerator =
             ReflectionUtils.loadClass(keyGeneratorClassName, new 
TypedProperties(config.getProps))
               .asInstanceOf[SparkKeyGeneratorInterface]
+          val partitionId = TaskContext.getPartitionId()

Review Comment:
   We should be resilient to driver failures as well? The entire Hudi commit 
will abort if the driver is restarted? I am wondering if the RDD recomputation 
introduces any non-determinism here. In general, as long as we don't recompute 
the source RDD, it should be resilient.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java:
##########
@@ -64,12 +67,22 @@ public String getPartitionPath(GenericRecord record) {
 
   @Override
   public String getRecordKey(Row row) {
+    if (autoGenerateRecordKeys()) {
+      // To fetch partition path, caller will have to call getKey() on 
KeyGenerator and call .getPartitionPath. Hence we have to pass empty field to 
support
+      // returning partition path for the callers.
+      return StringUtils.EMPTY_STRING;
+    }
     tryInitRowAccessor(row.schema());
     return combineCompositeRecordKey(rowAccessor.getRecordKeyParts(row));
   }
 
   @Override
   public UTF8String getRecordKey(InternalRow internalRow, StructType schema) {
+    if (autoGenerateRecordKeys()) {
+      // To fetch partition path, caller will have to call getKey() on 
KeyGenerator and call .getPartitionPath. Hence we have to pass empty field to 
support

Review Comment:
   remove comment



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java:
##########
@@ -608,14 +617,31 @@ private Pair<SchemaProvider, Pair<String, 
JavaRDD<HoodieRecord>>> fetchFromSourc
     SerializableSchema avroSchema = new 
SerializableSchema(schemaProvider.getTargetSchema());
     SerializableSchema processedAvroSchema = new 
SerializableSchema(isDropPartitionColumns() ? 
HoodieAvroUtils.removeMetadataFields(avroSchema.get()) : avroSchema.get());
     if (recordType == HoodieRecordType.AVRO) {
-      records = avroRDD.map(record -> {
-        GenericRecord gr = isDropPartitionColumns() ? 
HoodieAvroUtils.removeFields(record, partitionColumns) : record;
+      // if auto generation of keys is enabled, lets generate one
+      JavaRDD<Tuple2<GenericRecord, Option<String>>> 
recordRecordKeyOverrideRdd = avroRDD.mapPartitions(
+          (FlatMapFunction<Iterator<GenericRecord>, Tuple2<GenericRecord, 
Option<String>>>) genericRecordIterator -> {
+            int rowId = 0;

Review Comment:
   We are repeating this in 3 places now? I see that it's hard to restructure 
though. hmmm.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -260,6 +260,18 @@ public class HoodieTableConfig extends HoodieConfig {
       .sinceVersion("0.13.0")
       .withDocumentation("The metadata of secondary indexes");
 
+  public static final ConfigProperty<String> AUTO_GENERATE_RECORD_KEYS = 
ConfigProperty
+      .key("hoodie.table.auto.generate.record.keys")
+      .defaultValue("false")
+      .withDocumentation("Enables automatic generation of the record-keys in 
cases when dataset bears "
+          + "no natural record key satisfying requirements of being the 
primary-key in the Hudi table. "
+          + "Record key auto-gen is generally recommended for 'append-only' 
workloads, ie ones leveraging 'insert' or "

Review Comment:
   IMO auto-generation should happen at the writer level. Do we need a table 
property? if there is no key generator, then the keys were auto generated?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -1116,31 +1124,47 @@ object HoodieSparkSqlWriter {
           Some(writerSchema))
 
         avroRecords.mapPartitions(it => {
+          val sparkPartitionId = TaskContext.getPartitionId()
+
           val dataFileSchema = new Schema.Parser().parse(dataFileSchemaStr)
           val consistentLogicalTimestampEnabled = parameters.getOrElse(
             
DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
             
DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()).toBoolean
 
-          it.map { avroRecord =>
+          // generate record keys is auto generation is enabled.
+          val recordsWithRecordKeyOverride = 
mayBeAutoGenerateRecordKeys(autoGenerateRecordKeys, it, instantTime, 
sparkPartitionId)

Review Comment:
   nts: need to dig deeper here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
