danny0405 commented on code in PR #8107:
URL: https://github.com/apache/hudi/pull/8107#discussion_r1184700259


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/factory/HoodieAvroKeyGeneratorFactory.java:
##########
@@ -71,22 +73,31 @@ public static KeyGenerator createAvroKeyGeneratorByType(TypedProperties props) t
      throw new HoodieKeyGeneratorException("Unsupported keyGenerator Type " + keyGeneratorType);
     }
 
+    BaseKeyGenerator keyGenerator = null;
+
     switch (keyGeneratorTypeEnum) {
       case SIMPLE:
-        return new SimpleAvroKeyGenerator(props);
+        keyGenerator = new SimpleAvroKeyGenerator(props);
+        break;
       case COMPLEX:
-        return new ComplexAvroKeyGenerator(props);
+        keyGenerator = new ComplexAvroKeyGenerator(props);
+        break;
       case TIMESTAMP:
-        return new TimestampBasedAvroKeyGenerator(props);
+        keyGenerator = new TimestampBasedAvroKeyGenerator(props);
+        break;
       case CUSTOM:
-        return new CustomAvroKeyGenerator(props);
+        keyGenerator = new CustomAvroKeyGenerator(props);
+        break;
       case NON_PARTITION:
-        return new NonpartitionedAvroKeyGenerator(props);
+        keyGenerator = new NonpartitionedAvroKeyGenerator(props);
+        break;
       case GLOBAL_DELETE:
-        return new GlobalAvroDeleteKeyGenerator(props);
+        keyGenerator = new GlobalAvroDeleteKeyGenerator(props);
+        break;
       default:
        throw new HoodieKeyGeneratorException("Unsupported keyGenerator Type " + keyGeneratorType);
     }
+    return new HoodieWrapperAvroKeyGenerator(props, keyGenerator);
   }

Review Comment:
   Can we return the wrapper generator only when necessary, i.e. when we are sure it is the key-less use case?
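   A minimal sketch of the conditional wrapping the comment asks for (the class and property names here are illustrative stand-ins, not the real Hudi API):

   ```java
   import java.util.Properties;

   public class KeyGenFactorySketch {

     interface KeyGenerator {}

     static class SimpleAvroKeyGen implements KeyGenerator {}

     // Wrapper that would intercept getRecordKey in the key-less case.
     static class WrapperAvroKeyGen implements KeyGenerator {
       final KeyGenerator delegate;

       WrapperAvroKeyGen(KeyGenerator delegate) {
         this.delegate = delegate;
       }
     }

     // Wrap only when no record key field is configured (the key-less case);
     // otherwise hand back the concrete generator untouched.
     static KeyGenerator create(Properties props) {
       KeyGenerator base = new SimpleAvroKeyGen();
       boolean autoGenerateRecordKeys =
           !props.containsKey("hoodie.datasource.write.recordkey.field");
       return autoGenerateRecordKeys ? new WrapperAvroKeyGen(base) : base;
     }
   }
   ```

   With this shape, callers that configure a record key field never pay for (or see) the wrapper.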



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/HoodieWrapperKeyGenerator.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.StringUtils;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+import java.util.List;
+
+/**
+ * A wrapper key generator to intercept getRecordKey calls for auto record key generator.
+ */
+public class HoodieWrapperKeyGenerator extends BuiltinKeyGenerator {
+
+  private final BuiltinKeyGenerator builtinKeyGenerator;
+
+  public HoodieWrapperKeyGenerator(TypedProperties config, BuiltinKeyGenerator builtinKeyGenerator) {
+    super(config);
+    this.builtinKeyGenerator = builtinKeyGenerator;
+  }
+
+  @Override
+  public String getRecordKey(GenericRecord record) {
+    if (autoGenerateRecordKeys()) {
+      return StringUtils.EMPTY_STRING;
+    } else {

Review Comment:
   The `autoGenerateRecordKeys()` behavior should be restricted to this class only: move it out of the base class.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -1118,44 +1124,70 @@ object HoodieSparkSqlWriter {
           Some(writerSchema))
 
         avroRecords.mapPartitions(it => {
+          val sparkPartitionId = TaskContext.getPartitionId()
+
           val dataFileSchema = new Schema.Parser().parse(dataFileSchemaStr)
            val consistentLogicalTimestampEnabled = parameters.getOrElse(
              DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
              DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()).toBoolean
 
-          it.map { avroRecord =>
+          // generate record keys if auto generation is enabled.
+          val recordsWithRecordKeyOverride = mayBeAutoGenerateRecordKeys(autoGenerateRecordKeys, it, instantTime, sparkPartitionId)
+
+          // handle dropping partition columns
+          recordsWithRecordKeyOverride.map { avroRecordRecordKeyOverRide =>
             val processedRecord = if (shouldDropPartitionColumns) {
-              HoodieAvroUtils.rewriteRecord(avroRecord, dataFileSchema)
+              HoodieAvroUtils.rewriteRecord(avroRecordRecordKeyOverRide._1, dataFileSchema)
+            } else {
+              avroRecordRecordKeyOverRide._1
+            }
+
+            // Generate HoodieKey for records

Review Comment:
   Can we conceal all the auto key-gen logic/changes in the wrapper key generator? If you need `TaskContext.getPartitionId()` as an auxiliary input, just pass it into the wrapper key generator (this may need some code refactoring here).
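   One possible shape for that refactoring: resolve the partition id once per partition and hand it to the wrapper at construction time, so all key-less state lives inside it. The names and the key layout below are assumptions for illustration, not the actual Hudi implementation:

   ```java
   public class AutoKeyWrapperSketch {
     private final String instantTime;
     private final int sparkPartitionId;
     private long rowId = 0;

     // The caller resolves TaskContext.getPartitionId() once per partition and
     // passes it in, keeping the wrapper itself Spark-agnostic.
     public AutoKeyWrapperSketch(String instantTime, int sparkPartitionId) {
       this.instantTime = instantTime;
       this.sparkPartitionId = sparkPartitionId;
     }

     // One key per record, unique across partitions of the same commit; the
     // "<instant>_<partition>_<row>" layout here is illustrative.
     public String nextRecordKey() {
       return instantTime + "_" + sparkPartitionId + "_" + (rowId++);
     }
   }
   ```

   The writer-side `mapPartitions` would then only construct this wrapper and call it, instead of interleaving key generation with the record-rewrite logic.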



##########
hudi-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java:
##########
@@ -63,10 +67,14 @@ protected BaseKeyGenerator(TypedProperties config) {
    */
   @Override
   public final HoodieKey getKey(GenericRecord record) {
-    if (getRecordKeyFieldNames() == null || getPartitionPathFields() == null) {
+    if (!autoGenerateRecordKeys() && (getRecordKeyFieldNames() == null || getPartitionPathFields() == null)) {
      throw new HoodieKeyException("Unable to find field names for record key or partition path in cfg");
     }
-    return new HoodieKey(getRecordKey(record), getPartitionPath(record));
+    String recordKey = StringUtils.EMPTY_STRING;
+    if (!autoGenerateRecordKeys()) {

Review Comment:
   The base class should never introduce any key-less related changes.



##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestComplexKeyGenerator.java:
##########
@@ -75,11 +75,6 @@ public void testNullPartitionPathFields() {
    Assertions.assertThrows(IllegalArgumentException.class, () -> new ComplexKeyGenerator(getPropertiesWithoutPartitionPathProp()));
   }
 
-  @Test
-  public void testNullRecordKeyFields() {
-    Assertions.assertThrows(IllegalArgumentException.class, () -> new ComplexKeyGenerator(getPropertiesWithoutRecordKeyProp()));
-  }

Review Comment:
   We need to keep these tests; just set the operation to `upsert` to make them pass.
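   A hedged sketch of how the kept test could look. It is stubbed here to be self-contained: the validation logic and the config keys are stand-ins, and the real test would keep using `ComplexKeyGenerator` and `getPropertiesWithoutRecordKeyProp()`. The point is that with the operation pinned to `upsert`, a missing record key field must still be rejected:

   ```java
   import java.util.Properties;

   public class NullRecordKeySketch {

     // Stand-in for ComplexKeyGenerator's validation: a missing record key
     // field is only tolerated outside of upsert (the auto-key-gen case).
     static void validate(Properties props) {
       boolean hasRecordKey = props.containsKey("hoodie.datasource.write.recordkey.field");
       boolean upsert = "upsert".equals(props.getProperty("hoodie.datasource.write.operation"));
       if (!hasRecordKey && upsert) {
         throw new IllegalArgumentException("record key field is required for upsert");
       }
     }

     // Mirror of the removed testNullRecordKeyFields, with the operation pinned.
     public static boolean testNullRecordKeyFields() {
       Properties props = new Properties(); // no record key configured
       props.setProperty("hoodie.datasource.write.operation", "upsert");
       try {
         validate(props);
         return false; // should not reach here
       } catch (IllegalArgumentException expected) {
         return true;
       }
     }
   }
   ```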



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java:
##########
@@ -32,7 +33,8 @@ public class ComplexAvroKeyGenerator extends BaseKeyGenerator {
 
   public ComplexAvroKeyGenerator(TypedProperties props) {
     super(props);
-    this.recordKeyFields = Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(","))
+    this.recordKeyFields = autoGenerateRecordKeys() ? Collections.emptyList() :

Review Comment:
   Why do we still keep the changes in these key generators? Shouldn't all the key-less related changes happen only in the wrapper key generator?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/HoodieWrapperAvroKeyGenerator.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.StringUtils;
+
+import org.apache.avro.generic.GenericRecord;
+
+import java.util.List;
+
+/**
+ * A wrapper key generator to intercept getRecordKey calls for auto record key generator.
+ */
+public class HoodieWrapperAvroKeyGenerator extends BaseKeyGenerator {
+
+  private BaseKeyGenerator keyGenerator;
+
+  public HoodieWrapperAvroKeyGenerator(TypedProperties config, BaseKeyGenerator keyGenerator) {
+    super(config);
+    this.keyGenerator = keyGenerator;
+  }
+
+  @Override
+  public String getRecordKey(GenericRecord record) {
+    if (autoGenerateRecordKeys()) {
+      return StringUtils.EMPTY_STRING;

Review Comment:
   The `autoGenerateRecordKeys()` behavior should be restricted to this class only: move it out of the base class.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala:
##########
@@ -150,8 +150,9 @@ object HoodieWriterUtils {
       if (null != tableConfig) {
         val datasourceRecordKey = params.getOrElse(RECORDKEY_FIELD.key(), null)
        val tableConfigRecordKey = tableConfig.getString(HoodieTableConfig.RECORDKEY_FIELDS)
-        if (null != datasourceRecordKey && null != tableConfigRecordKey
-          && datasourceRecordKey != tableConfigRecordKey) {
+        if ((null != datasourceRecordKey && null != tableConfigRecordKey
+          && datasourceRecordKey != tableConfigRecordKey) || (null != datasourceRecordKey && !datasourceRecordKey.isEmpty
+          && tableConfigRecordKey == null)) {
          diffConfigs.append(s"RecordKey:\t$datasourceRecordKey\t$tableConfigRecordKey\n")

Review Comment:
   Can we add some documentation on what the new condition represents?
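   One way to document it is to name the two sub-conditions. The sketch below is a pure decomposition of the boolean expression in the diff (variable and method names are mine, and it is written in Java rather than the Scala of the original file):

   ```java
   public class RecordKeyDiffSketch {

     static boolean isRecordKeyMismatch(String datasourceRecordKey, String tableConfigRecordKey) {
       // Case 1: both the writer and the table config define record key
       // fields, but they disagree.
       boolean bothSetButDifferent = datasourceRecordKey != null
           && tableConfigRecordKey != null
           && !datasourceRecordKey.equals(tableConfigRecordKey);
       // Case 2: the writer supplies a non-empty record key, but the table was
       // created key-less (no record key fields persisted in the table config).
       boolean keySuppliedForKeylessTable = datasourceRecordKey != null
           && !datasourceRecordKey.isEmpty()
           && tableConfigRecordKey == null;
       return bothSetButDifferent || keySuppliedForKeylessTable;
     }
   }
   ```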



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -528,9 +527,13 @@ public String getPreCombineField() {
   }
 
   public Option<String[]> getRecordKeyFields() {
-    String keyFieldsValue = getStringOrDefault(RECORDKEY_FIELDS, HoodieRecord.RECORD_KEY_METADATA_FIELD);
-    return Option.of(Arrays.stream(keyFieldsValue.split(","))
-        .filter(p -> p.length() > 0).collect(Collectors.toList()).toArray(new String[] {}));
+    String keyFieldsValue = getStringOrDefault(RECORDKEY_FIELDS, null);
+    if (keyFieldsValue == null) {
+      return Option.empty();

Review Comment:
   We should be cautious with this change; please make sure no invoker explicitly relies on fetching the default metadata field `HoodieRecord.RECORD_KEY_METADATA_FIELD`.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java:
##########
@@ -610,14 +616,31 @@ private Pair<SchemaProvider, Pair<String, 
JavaRDD<HoodieRecord>>> fetchFromSourc
    SerializableSchema avroSchema = new SerializableSchema(schemaProvider.getTargetSchema());
    SerializableSchema processedAvroSchema = new SerializableSchema(isDropPartitionColumns() ? HoodieAvroUtils.removeMetadataFields(avroSchema.get()) : avroSchema.get());
     if (recordType == HoodieRecordType.AVRO) {
-      records = avroRDD.map(record -> {
-        GenericRecord gr = isDropPartitionColumns() ? HoodieAvroUtils.removeFields(record, partitionColumns) : record;
+      // if auto generation of keys is enabled, lets generate one
+      JavaRDD<Tuple2<GenericRecord, Option<String>>> recordRecordKeyOverrideRdd = avroRDD.mapPartitions(
+          (FlatMapFunction<Iterator<GenericRecord>, Tuple2<GenericRecord, Option<String>>>) genericRecordIterator -> {
+            int rowId = 0;
+            int sparkPartitionId = TaskContext.getPartitionId();
+            List<Tuple2<GenericRecord, Option<String>>> recordWithRecordKeyOverride = new ArrayList<>();
+            while (genericRecordIterator.hasNext()) {
+              GenericRecord genRec = genericRecordIterator.next();
+              Option<String> recordKeyOpt = autoGenerateRecordKeys ? Option.of(HoodieRecord.generateSequenceId(instantTime, sparkPartitionId, rowId++))
+                  : Option.empty();
+              recordWithRecordKeyOverride.add(new Tuple2(genRec, recordKeyOpt));
+            }

Review Comment:
   Ditto with `HoodieSparkSqlWriter`.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java:
##########
@@ -61,10 +61,11 @@ public class KeyGenUtils {
    */
   public static KeyGeneratorType inferKeyGeneratorType(
       String recordsKeyFields, String partitionFields) {
+    boolean autoGenerateRecordKeys = recordsKeyFields == null;
     if (!StringUtils.isNullOrEmpty(partitionFields)) {
       int numPartFields = partitionFields.split(",").length;
-      int numRecordKeyFields = recordsKeyFields.split(",").length;
-      if (numPartFields == 1 && numRecordKeyFields == 1) {
+      int numRecordKeyFields = autoGenerateRecordKeys ? 0 : recordsKeyFields.split(",").length;

Review Comment:
   Not sure why we need this change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
