This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch recordKeyGenRefactor
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit b26ffe6600ce80dbeed7c43e5de72b6ff3ddeb99
Author: sivabalan <[email protected]>
AuthorDate: Mon Jan 16 15:05:41 2023 -0800

    Fixing record key generation so that any key generator class can have any 
record key generation (simple, custom, auto generation)
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  14 ++
 .../apache/hudi/keygen/AutoRecordKeyGenerator.java | 235 +++++++++++++++++++++
 .../hudi/keygen/ComplexAvroKeyGenerator.java       |  11 +-
 .../hudi/keygen/ComplexAvroRecordKeyGenerator.java |  42 ++++
 .../hudi/keygen/GlobalAvroDeleteKeyGenerator.java  |   5 +-
 .../keygen/NonpartitionedAvroKeyGenerator.java     |   8 +-
 .../apache/hudi/keygen/SimpleAvroKeyGenerator.java |  11 +-
 .../hudi/keygen/SimpleAvroRecordKeyGenerator.java  |  40 ++++
 .../keygen/factory/RecordKeyGeneratorFactory.java  |  45 ++++
 .../hudi/keygen/SparkKeyGeneratorInterface.java    |  26 +--
 ....java => SparkRecordKeyGeneratorInterface.java} |  31 +--
 .../org/apache/hudi/keygen/BaseKeyGenerator.java   |   4 +-
 .../org/apache/hudi/keygen/RecordKeyGenerator.java |  32 +++
 13 files changed, 435 insertions(+), 69 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index b70b13c0833..b907905f99c 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -549,6 +549,11 @@ public class HoodieWriteConfig extends HoodieConfig {
       .withDocumentation("When table is upgraded from pre 0.12 to 0.12, we 
check for \"default\" partition and fail if found one. "
           + "Users are expected to rewrite the data in those partitions. 
Enabling this config will bypass this validation");
 
+  public static final ConfigProperty<Boolean> AUTO_GENERATE_RECORD_KEYS = 
ConfigProperty.key("hoodie.auto.generate.record.keys")
+      .defaultValue(false)
+      .sinceVersion("0.13.0")
+      .withDocumentation("to be added");
+
   private ConsistencyGuardConfig consistencyGuardConfig;
   private FileSystemRetryConfig fileSystemRetryConfig;
 
@@ -2201,6 +2206,10 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getBoolean(SKIP_DEFAULT_PARTITION_VALIDATION);
   }
 
+  public Boolean doAutoGenerateRecordKeys() {
+    return getBooleanOrDefault(AUTO_GENERATE_RECORD_KEYS);
+  }
+
   /**
    * Are any table services configured to run inline for both scheduling and 
execution?
    *
@@ -2723,6 +2732,11 @@ public class HoodieWriteConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder withAutoGenerateRecordKeys(boolean autoGenerateRecordKeys) {
+      writeConfig.setValue(AUTO_GENERATE_RECORD_KEYS, 
String.valueOf(autoGenerateRecordKeys));
+      return this;
+    }
+
     protected void setDefaults() {
       writeConfig.setDefaultValue(MARKERS_TYPE, 
getDefaultMarkersType(engineType));
       // Check for mandatory properties
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/AutoRecordKeyGenerator.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/AutoRecordKeyGenerator.java
new file mode 100644
index 00000000000..6beba686dfc
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/AutoRecordKeyGenerator.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.keygen.CustomAvroKeyGenerator.SPLIT_REGEX;
+
+/**
+ * Auto record key generator. This generator will fetch values from the entire 
record based on some of the fields and determine the record key.
+ * Use cases where users are not able to configure record keys can use 
this auto record key generator.
+ */
+public class AutoRecordKeyGenerator implements RecordKeyGenerator {
+
+  private final TypedProperties config;
+  private static final String HOODIE_PREFIX = "_hoodie";
+  private static final String DOT = ".";
+  private final int maxFieldsToConsider;
+  private final int numFieldsForKey;
+  private final Set<String> partitionFieldNames;
+  private int[][] fieldOrdering;
+
+  public AutoRecordKeyGenerator(TypedProperties config, List<String> 
partitionPathFields) {
+    this.config = config;
+    this.numFieldsForKey = 
config.getInteger(KeyGeneratorOptions.NUM_FIELDS_IN_KEYLESS_GENERATOR.key(), 
KeyGeneratorOptions.NUM_FIELDS_IN_KEYLESS_GENERATOR.defaultValue());
+    // cap the number of fields to order in case of large schemas
+    this.maxFieldsToConsider = numFieldsForKey * 3;
+    this.partitionFieldNames = partitionPathFields.stream().map(field -> 
field.split(SPLIT_REGEX)[0]).collect(Collectors.toSet());
+  }
+
+  @Override
+  public String getRecordKey(GenericRecord record) {
+    return buildKey(getFieldOrdering(record), record);
+  }
+
+  int[][] getFieldOrdering(GenericRecord genericRecord) {
+    if (fieldOrdering == null) {
+      fieldOrdering = 
buildFieldOrdering(genericRecord.getSchema().getFields());
+    }
+    return fieldOrdering;
+  }
+
+  /**
+   * Deterministically builds a key for the input value based on the provided 
fieldOrdering. The first {@link #numFieldsForKey} non-null values will be used 
to generate a string that is passed to
+   * {@link UUID#nameUUIDFromBytes(byte[])}.
+   * @param fieldOrdering an array of integer arrays. The integer arrays 
represent paths to a single field within the input object.
+   * @param input the input object that needs a key
+   * @return the string form of a deterministically generated {@link UUID}
+   * @param <T> the input object type
+   */
+  private <T> String buildKey(int[][] fieldOrdering, GenericRecord input) {
+    StringBuilder key = new StringBuilder();
+    int nonNullFields = 0;
+    for (int[] index : fieldOrdering) {
+      Object value = getFieldForRecord(input, index);
+      if (value == null) {
+        continue;
+      }
+      nonNullFields++;
+      key.append(value.hashCode());
+      if (nonNullFields >= numFieldsForKey) {
+        break;
+      }
+    }
+    return 
UUID.nameUUIDFromBytes(key.toString().getBytes(StandardCharsets.UTF_8)).toString();
+  }
+
+  /**
+   * Gets the value of the field at the specified path within the record.
+   * @param record the input record
+   * @param fieldPath the path to the field as an array of integers 
representing the field position within the object
+   * @return value at the path
+   */
+  private static Object getFieldForRecord(GenericRecord record, int[] 
fieldPath) {
+    Object value = record;
+    for (Integer index : fieldPath) {
+      if (value == null) {
+        return null;
+      }
+      value = ((GenericRecord) value).get(index);
+    }
+    return value;
+  }
+
+  private int[][] buildFieldOrdering(List<Schema.Field> initialFields) {
+    PriorityQueue<Pair<int[], Integer>> queue = new 
PriorityQueue<>(maxFieldsToConsider + 1, RankingComparator.getInstance());
+    Queue<FieldToProcess> fieldsToProcess = new ArrayDeque<>();
+    for (int j = 0; j < initialFields.size(); j++) {
+      fieldsToProcess.offer(new FieldToProcess(new int[]{j}, 
initialFields.get(j), initialFields.get(j).name()));
+    }
+    while (!fieldsToProcess.isEmpty()) {
+      FieldToProcess fieldToProcess = fieldsToProcess.poll();
+      int[] existingPath = fieldToProcess.getIndexPath();
+      Schema fieldSchema = fieldToProcess.getField().schema();
+      if (fieldSchema.getType() == Schema.Type.UNION) {
+        fieldSchema = fieldSchema.getTypes().get(1);
+      }
+      if (fieldSchema.getType() == Schema.Type.RECORD) {
+        List<Schema.Field> nestedFields = fieldSchema.getFields();
+        for (int i = 0; i < nestedFields.size(); i++) {
+          int[] path = Arrays.copyOf(existingPath, existingPath.length + 1);
+          path[existingPath.length] = i;
+          Schema.Field nestedField = nestedFields.get(i);
+          fieldsToProcess.add(new FieldToProcess(path, nestedField, 
fieldToProcess.getNamePath() + DOT + nestedField.name()));
+        }
+      } else {
+        // check that field is not used in partitioning
+        if (!partitionFieldNames.contains(fieldToProcess.getNamePath())) {
+          queue.offer(Pair.of(existingPath, 
getSchemaRanking(fieldToProcess.getField())));
+          if (queue.size() > maxFieldsToConsider) {
+            queue.poll();
+          }
+        }
+      }
+    }
+    Pair<int[], Integer>[] sortedPairs = queue.toArray(new Pair[queue.size()]);
+    Arrays.sort(sortedPairs, RankingComparator.getInstance().reversed());
+    int[][] output = new int[sortedPairs.length][];
+    for (int k = 0; k < sortedPairs.length; k++) {
+      output[k] = sortedPairs[k].getLeft();
+    }
+    return output;
+  }
+
+  private static class FieldToProcess {
+    final int[] indexPath;
+    final Schema.Field field;
+    final String namePath;
+
+    public FieldToProcess(int[] indexPath, Schema.Field field, String 
namePath) {
+      this.indexPath = indexPath;
+      this.field = field;
+      this.namePath = namePath;
+    }
+
+    public int[] getIndexPath() {
+      return indexPath;
+    }
+
+    public Schema.Field getField() {
+      return field;
+    }
+
+    public String getNamePath() {
+      return namePath;
+    }
+  }
+
+  /**
+   * Ranks the fields by their type.
+   * @param field input field
+   * @return a score of 0 to 4
+   */
+  private int getSchemaRanking(Schema.Field field) {
+    if (field.name().startsWith(HOODIE_PREFIX)) {
+      return 0;
+    }
+    Schema schema = field.schema();
+    if (schema.getType() == Schema.Type.UNION) {
+      schema = schema.getTypes().get(0).getType() == Schema.Type.NULL ? 
schema.getTypes().get(1) : schema.getTypes().get(0);
+    }
+    Schema.Type type = schema.getType();
+    switch (type) {
+      case LONG:
+        // assumes long with logical type will be a timestamp
+        return schema.getLogicalType() != null ? 4 : 3;
+      case INT:
+        // assumes int with logical type will be a date which will have low 
variance in a batch
+        return schema.getLogicalType() != null ? 1 : 3;
+      case DOUBLE:
+      case FLOAT:
+        return 3;
+      case BOOLEAN:
+      case MAP:
+      case ARRAY:
+        return 1;
+      default:
+        return 2;
+    }
+  }
+
+  private static class RankingComparator implements Comparator<Pair<int[], 
Integer>> {
+    private static final RankingComparator INSTANCE = new RankingComparator();
+
+    static RankingComparator getInstance() {
+      return INSTANCE;
+    }
+
+    @Override
+    public int compare(Pair<int[], Integer> o1, Pair<int[], Integer> o2) {
+      int initialResult = o1.getRight().compareTo(o2.getRight());
+      if (initialResult == 0) {
+        // favor the smaller list (less nested value) on ties
+        int sizeResult = Integer.compare(o2.getLeft().length, 
o1.getLeft().length);
+        if (sizeResult == 0) {
+          return Integer.compare(o2.getLeft()[0], o1.getLeft()[0]);
+        }
+        return sizeResult;
+      }
+      return initialResult;
+    }
+  }
+}
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java
index 9ff5c522e45..581ddaa90f5 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java
@@ -20,6 +20,7 @@ package org.apache.hudi.keygen;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.hudi.keygen.factory.RecordKeyGeneratorFactory;
 
 import java.util.Arrays;
 import java.util.stream.Collectors;
@@ -29,22 +30,22 @@ import java.util.stream.Collectors;
  */
 public class ComplexAvroKeyGenerator extends BaseKeyGenerator {
   public static final String DEFAULT_RECORD_KEY_SEPARATOR = ":";
+  private final RecordKeyGenerator recordKeyGenerator;
 
   public ComplexAvroKeyGenerator(TypedProperties props) {
     super(props);
-    this.recordKeyFields = 
Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(","))
-        .map(String::trim)
-        .filter(s -> !s.isEmpty())
-        .collect(Collectors.toList());
+    this.recordKeyFields = 
Arrays.asList(config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(","));
     this.partitionPathFields = 
Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()).split(","))
         .map(String::trim)
         .filter(s -> !s.isEmpty())
         .collect(Collectors.toList());
+    this.recordKeyGenerator = 
RecordKeyGeneratorFactory.getRecordKeyGenerator(config, recordKeyFields, 
isConsistentLogicalTimestampEnabled(),
+        partitionPathFields);
   }
 
   @Override
   public String getRecordKey(GenericRecord record) {
-    return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames(), 
isConsistentLogicalTimestampEnabled());
+    return recordKeyGenerator.getRecordKey(record);
   }
 
   @Override
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroRecordKeyGenerator.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroRecordKeyGenerator.java
new file mode 100644
index 00000000000..cd86b8f0834
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroRecordKeyGenerator.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.avro.generic.GenericRecord;
+
+import java.util.List;
+
+/**
+ * Complex record key generator.
+ */
+public class ComplexAvroRecordKeyGenerator implements RecordKeyGenerator {
+
+  private final List<String> recordKeyFields;
+  private final boolean consistentLogicalTimestampEnabled;
+
+  public ComplexAvroRecordKeyGenerator(List<String> recordKeyFields, boolean 
consistentLogicalTimestampEnabled) {
+    this.recordKeyFields = recordKeyFields;
+    this.consistentLogicalTimestampEnabled = consistentLogicalTimestampEnabled;
+  }
+
+  @Override
+  public String getRecordKey(GenericRecord record) {
+    return KeyGenUtils.getRecordKey(record, recordKeyFields, 
consistentLogicalTimestampEnabled);
+  }
+}
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/GlobalAvroDeleteKeyGenerator.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/GlobalAvroDeleteKeyGenerator.java
index dc0bc3cef2f..ba66b1a32f8 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/GlobalAvroDeleteKeyGenerator.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/GlobalAvroDeleteKeyGenerator.java
@@ -20,6 +20,7 @@ package org.apache.hudi.keygen;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.hudi.keygen.factory.RecordKeyGeneratorFactory;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -32,15 +33,17 @@ import java.util.List;
 public class GlobalAvroDeleteKeyGenerator extends BaseKeyGenerator {
 
   private static final String EMPTY_PARTITION = "";
+  private final RecordKeyGenerator recordKeyGenerator;
 
   public GlobalAvroDeleteKeyGenerator(TypedProperties config) {
     super(config);
     this.recordKeyFields = 
Arrays.asList(config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(","));
+    this.recordKeyGenerator = 
RecordKeyGeneratorFactory.getRecordKeyGenerator(config, recordKeyFields, 
isConsistentLogicalTimestampEnabled(), new ArrayList<>());
   }
 
   @Override
   public String getRecordKey(GenericRecord record) {
-    return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames(), 
isConsistentLogicalTimestampEnabled());
+    return recordKeyGenerator.getRecordKey(record);
   }
 
   @Override
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java
index 5b5cedcbf88..4efbaf9b857 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java
@@ -20,6 +20,7 @@ package org.apache.hudi.keygen;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.hudi.keygen.factory.RecordKeyGeneratorFactory;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -33,12 +34,14 @@ public class NonpartitionedAvroKeyGenerator extends 
BaseKeyGenerator {
 
   private static final String EMPTY_PARTITION = "";
   private static final List<String> EMPTY_PARTITION_FIELD_LIST = new 
ArrayList<>();
+  private final RecordKeyGenerator recordKeyGenerator;
 
   public NonpartitionedAvroKeyGenerator(TypedProperties props) {
     super(props);
     this.recordKeyFields = 
Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())
         .split(",")).map(String::trim).filter(s -> 
!s.isEmpty()).collect(Collectors.toList());
     this.partitionPathFields = EMPTY_PARTITION_FIELD_LIST;
+    this.recordKeyGenerator = 
RecordKeyGeneratorFactory.getRecordKeyGenerator(config, recordKeyFields, 
isConsistentLogicalTimestampEnabled(), partitionPathFields);
   }
 
   @Override
@@ -56,10 +59,7 @@ public class NonpartitionedAvroKeyGenerator extends 
BaseKeyGenerator {
     // for backward compatibility, we need to use the right format according 
to the number of record key fields
     // 1. if there is only one record key field, the format of record key is 
just "<value>"
     // 2. if there are multiple record key fields, the format is 
"<field1>:<value1>,<field2>:<value2>,..."
-    if (getRecordKeyFieldNames().size() == 1) {
-      return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames().get(0), 
isConsistentLogicalTimestampEnabled());
-    }
-    return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames(), 
isConsistentLogicalTimestampEnabled());
+    return recordKeyGenerator.getRecordKey(record);
   }
 
   public String getEmptyPartition() {
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java
index c7398e94ece..85a3fb74f27 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java
@@ -20,7 +20,9 @@ package org.apache.hudi.keygen;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.hudi.keygen.factory.RecordKeyGeneratorFactory;
 
+import java.util.Arrays;
 import java.util.Collections;
 
 /**
@@ -28,6 +30,8 @@ import java.util.Collections;
  */
 public class SimpleAvroKeyGenerator extends BaseKeyGenerator {
 
+  private final RecordKeyGenerator recordKeyGenerator;
+
   public SimpleAvroKeyGenerator(TypedProperties props) {
     this(props, 
props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()),
         props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()));
@@ -39,15 +43,14 @@ public class SimpleAvroKeyGenerator extends 
BaseKeyGenerator {
 
   SimpleAvroKeyGenerator(TypedProperties props, String recordKeyField, String 
partitionPathField) {
     super(props);
-    this.recordKeyFields = recordKeyField == null
-        ? Collections.emptyList()
-        : Collections.singletonList(recordKeyField);
+    this.recordKeyFields = 
Arrays.asList(config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(","));
     this.partitionPathFields = Collections.singletonList(partitionPathField);
+    this.recordKeyGenerator = 
RecordKeyGeneratorFactory.getRecordKeyGenerator(config, recordKeyFields, 
isConsistentLogicalTimestampEnabled(), partitionPathFields);
   }
 
   @Override
   public String getRecordKey(GenericRecord record) {
-    return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames().get(0), 
isConsistentLogicalTimestampEnabled());
+    return recordKeyGenerator.getRecordKey(record);
   }
 
   @Override
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroRecordKeyGenerator.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroRecordKeyGenerator.java
new file mode 100644
index 00000000000..52bf7ac872b
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroRecordKeyGenerator.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.avro.generic.GenericRecord;
+
+/**
+ * Simple record key generator.
+ */
+public class SimpleAvroRecordKeyGenerator implements RecordKeyGenerator {
+
+  private final String recordKeyField;
+  private final boolean consistentLogicalTimestampEnabled;
+
+  public SimpleAvroRecordKeyGenerator(String recordKeyField, boolean 
consistentLogicalTimestampEnabled) {
+    this.recordKeyField = recordKeyField;
+    this.consistentLogicalTimestampEnabled = consistentLogicalTimestampEnabled;
+  }
+
+  @Override
+  public String getRecordKey(GenericRecord record) {
+    return KeyGenUtils.getRecordKey(record, recordKeyField, 
consistentLogicalTimestampEnabled);
+  }
+}
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/factory/RecordKeyGeneratorFactory.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/factory/RecordKeyGeneratorFactory.java
new file mode 100644
index 00000000000..e884d4db5c4
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/factory/RecordKeyGeneratorFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen.factory;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.keygen.ComplexAvroRecordKeyGenerator;
+import org.apache.hudi.keygen.AutoRecordKeyGenerator;
+import org.apache.hudi.keygen.RecordKeyGenerator;
+import org.apache.hudi.keygen.SimpleAvroRecordKeyGenerator;
+
+import java.util.List;
+
+/**
+ * Factory to instantiate RecordKeyGenerator.
+ */
+public class RecordKeyGeneratorFactory {
+
+  public static RecordKeyGenerator getRecordKeyGenerator(TypedProperties 
config, List<String> recordKeyFields, boolean consistentLogicalTimestampEnabled,
+                                                         List<String> 
partitionPathFields) {
+    if (config.getBoolean(HoodieWriteConfig.AUTO_GENERATE_RECORD_KEYS.key(), 
HoodieWriteConfig.AUTO_GENERATE_RECORD_KEYS.defaultValue())) {
+      return new AutoRecordKeyGenerator(config, partitionPathFields);
+    } else if (recordKeyFields.size() == 1) {
+      return new SimpleAvroRecordKeyGenerator(recordKeyFields.get(0), 
consistentLogicalTimestampEnabled);
+    } else {
+      return new ComplexAvroRecordKeyGenerator(recordKeyFields, 
consistentLogicalTimestampEnabled);
+    }
+  }
+}
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SparkKeyGeneratorInterface.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SparkKeyGeneratorInterface.java
index 977ff709bb1..38ea518508f 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SparkKeyGeneratorInterface.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SparkKeyGeneratorInterface.java
@@ -30,31 +30,7 @@ import org.apache.spark.unsafe.types.UTF8String;
  * specifically implement record-key, partition-path generation w/o the need 
for (expensive)
  * conversion from Spark internal representation (for ex, to Avro)
  */
-public interface SparkKeyGeneratorInterface extends KeyGeneratorInterface {
-
-  /**
-   * Extracts record key from Spark's {@link Row}
-   *
-   * @param row instance of {@link Row} from which record-key is extracted
-   * @return record's (primary) key
-   */
-  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
-  String getRecordKey(Row row);
-
-  /**
-   * Extracts record key from Spark's {@link InternalRow}
-   *
-   * NOTE: Difference b/w {@link Row} and {@link InternalRow} is that {@link 
InternalRow} could
-   *       internally hold just a binary representation of the data, while 
{@link Row} has it
-   *       deserialized into JVM-native representation (like {@code Integer}, 
{@code Long},
-   *       {@code String}, etc)
-   *
-   * @param row instance of {@link InternalRow} from which record-key is 
extracted
-   * @param schema schema {@link InternalRow} is adhering to
-   * @return record-key as instance of {@link UTF8String}
-   */
-  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
-  UTF8String getRecordKey(InternalRow row, StructType schema);
+public interface SparkKeyGeneratorInterface extends 
SparkRecordKeyGeneratorInterface {
 
   /**
    * Extracts partition-path from {@link Row}
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SparkKeyGeneratorInterface.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SparkRecordKeyGeneratorInterface.java
similarity index 61%
copy from 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SparkKeyGeneratorInterface.java
copy to 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SparkRecordKeyGeneratorInterface.java
index 977ff709bb1..3d4ccc0de0e 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SparkKeyGeneratorInterface.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SparkRecordKeyGeneratorInterface.java
@@ -20,17 +20,16 @@ package org.apache.hudi.keygen;
 
 import org.apache.hudi.ApiMaturityLevel;
 import org.apache.hudi.PublicAPIMethod;
+
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.unsafe.types.UTF8String;
 
 /**
- * Spark-specific {@link KeyGenerator} interface extension allowing 
implementation to
- * specifically implement record-key, partition-path generation w/o the need 
for (expensive)
- * conversion from Spark internal representation (for ex, to Avro)
+ * Spark's record key generator interface to assist in generating record key 
for a given spark row.
  */
-public interface SparkKeyGeneratorInterface extends KeyGeneratorInterface {
+public interface SparkRecordKeyGeneratorInterface {
 
   /**
    * Extracts record key from Spark's {@link Row}
@@ -55,28 +54,4 @@ public interface SparkKeyGeneratorInterface extends 
KeyGeneratorInterface {
    */
   @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
   UTF8String getRecordKey(InternalRow row, StructType schema);
-
-  /**
-   * Extracts partition-path from {@link Row}
-   *
-   * @param row instance of {@link Row} from which partition-path is extracted
-   * @return record's partition-path
-   */
-  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
-  String getPartitionPath(Row row);
-
-  /**
-   * Extracts partition-path from Spark's {@link InternalRow}
-   *
-   * NOTE: Difference b/w {@link Row} and {@link InternalRow} is that {@link 
InternalRow} could
-   *       internally hold just a binary representation of the data, while 
{@link Row} has it
-   *       deserialized into JVM-native representation (like {@code Integer}, 
{@code Long},
-   *       {@code String}, etc)
-   *
-   * @param row instance of {@link InternalRow} from which record-key is 
extracted
-   * @param schema schema {@link InternalRow} is adhering to
-   * @return partition-path as instance of {@link UTF8String}
-   */
-  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
-  UTF8String getPartitionPath(InternalRow row, StructType schema);
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java 
b/hudi-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java
index d0baa903919..e3d5a3b18bb 100644
--- a/hudi-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java
+++ b/hudi-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java
@@ -30,7 +30,7 @@ import java.util.List;
  * Base abstract class to extend for {@link KeyGenerator} with default logic 
of taking
  * partitioning and timestamp configs.
  */
-public abstract class BaseKeyGenerator extends KeyGenerator {
+public abstract class BaseKeyGenerator extends KeyGenerator implements 
RecordKeyGenerator {
 
   protected List<String> recordKeyFields;
   protected List<String> partitionPathFields;
@@ -51,7 +51,7 @@ public abstract class BaseKeyGenerator extends KeyGenerator {
   /**
    * Generate a record Key out of provided generic record.
    */
-  public abstract String getRecordKey(GenericRecord record);
+  //public abstract String getRecordKey(GenericRecord record);
 
   /**
    * Generate a partition path out of provided generic record.
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/keygen/RecordKeyGenerator.java 
b/hudi-common/src/main/java/org/apache/hudi/keygen/RecordKeyGenerator.java
new file mode 100644
index 00000000000..2854c5de2d8
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/keygen/RecordKeyGenerator.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.avro.generic.GenericRecord;
+
+/**
+ * Interface to fetch record key given a GenericRecord.
+ */
+public interface RecordKeyGenerator {
+
+  /**
+   * Generate a record Key out of provided generic record.
+   */
+  String getRecordKey(GenericRecord record);
+}

Reply via email to