nsivabalan commented on code in PR #5523:
URL: https://github.com/apache/hudi/pull/5523#discussion_r923293257


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/model/HoodieInternalRow.java:
##########
@@ -57,187 +92,153 @@ public int numFields() {
   }
 
   @Override
-  public void setNullAt(int i) {
-    if (i < HoodieRecord.HOODIE_META_COLUMNS.size()) {
-      switch (i) {
-        case 0: {
-          this.commitTime = null;
-          break;
-        }
-        case 1: {
-          this.commitSeqNumber = null;
-          break;
-        }
-        case 2: {
-          this.recordKey = null;
-          break;
-        }
-        case 3: {
-          this.partitionPath = null;
-          break;
-        }
-        case 4: {
-          this.fileName = null;
-          break;
-        }
-        default: throw new IllegalArgumentException("Not expected");
-      }
+  public void setNullAt(int ordinal) {
+    if (ordinal < metaFields.length) {
+      metaFields[ordinal] = null;
     } else {
-      row.setNullAt(i);
+      row.setNullAt(rebaseOrdinal(ordinal));
     }
   }
 
   @Override
-  public void update(int i, Object value) {
-    if (i < HoodieRecord.HOODIE_META_COLUMNS.size()) {
-      switch (i) {
-        case 0: {
-          this.commitTime = value.toString();
-          break;
-        }
-        case 1: {
-          this.commitSeqNumber = value.toString();
-          break;
-        }
-        case 2: {
-          this.recordKey = value.toString();
-          break;
-        }
-        case 3: {
-          this.partitionPath = value.toString();
-          break;
-        }
-        case 4: {
-          this.fileName = value.toString();
-          break;
-        }
-        default: throw new IllegalArgumentException("Not expected");
+  public void update(int ordinal, Object value) {
+    if (ordinal < metaFields.length) {
+      if (value instanceof UTF8String) {
+        metaFields[ordinal] = (UTF8String) value;
+      } else if (value instanceof String) {
+        metaFields[ordinal] = UTF8String.fromString((String) value);
+      } else {
+        throw new IllegalArgumentException(
+            String.format("Could not update the row at (%d) with value of type 
(%s), either UTF8String or String are expected", ordinal, 
value.getClass().getSimpleName()));
       }
     } else {
-      row.update(i, value);
+      row.update(rebaseOrdinal(ordinal), value);
     }
   }
 
-  private String getMetaColumnVal(int ordinal) {
-    switch (ordinal) {
-      case 0: {
-        return commitTime;
-      }
-      case 1: {
-        return commitSeqNumber;
-      }
-      case 2: {
-        return recordKey;
-      }
-      case 3: {
-        return partitionPath;
-      }
-      case 4: {
-        return fileName;
-      }
-      default: throw new IllegalArgumentException("Not expected");
+  @Override
+  public boolean isNullAt(int ordinal) {
+    if (ordinal < metaFields.length) {
+      return metaFields[ordinal] == null;
     }
+    return row.isNullAt(rebaseOrdinal(ordinal));
   }
 
   @Override
-  public boolean isNullAt(int ordinal) {
+  public UTF8String getUTF8String(int ordinal) {
+    if (ordinal < HoodieRecord.HOODIE_META_COLUMNS.size()) {
+      return metaFields[ordinal];
+    }
+    return row.getUTF8String(rebaseOrdinal(ordinal));
+  }
+
+  @Override
+  public Object get(int ordinal, DataType dataType) {
     if (ordinal < HoodieRecord.HOODIE_META_COLUMNS.size()) {
-      return null == getMetaColumnVal(ordinal);
+      validateMetaFieldDataType(dataType);
+      return metaFields[ordinal];
     }
-    return row.isNullAt(ordinal);
+    return row.get(rebaseOrdinal(ordinal), dataType);
   }
 
   @Override
   public boolean getBoolean(int ordinal) {
-    return row.getBoolean(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, Boolean.class);
+    return row.getBoolean(rebaseOrdinal(ordinal));
   }
 
   @Override
   public byte getByte(int ordinal) {
-    return row.getByte(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, Byte.class);
+    return row.getByte(rebaseOrdinal(ordinal));
   }
 
   @Override
   public short getShort(int ordinal) {
-    return row.getShort(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, Short.class);
+    return row.getShort(rebaseOrdinal(ordinal));
   }
 
   @Override
   public int getInt(int ordinal) {
-    return row.getInt(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, Integer.class);
+    return row.getInt(rebaseOrdinal(ordinal));
   }
 
   @Override
   public long getLong(int ordinal) {
-    return row.getLong(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, Long.class);
+    return row.getLong(rebaseOrdinal(ordinal));
   }
 
   @Override
   public float getFloat(int ordinal) {
-    return row.getFloat(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, Float.class);
+    return row.getFloat(rebaseOrdinal(ordinal));
   }
 
   @Override
   public double getDouble(int ordinal) {
-    return row.getDouble(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, Double.class);
+    return row.getDouble(rebaseOrdinal(ordinal));
   }
 
   @Override
   public Decimal getDecimal(int ordinal, int precision, int scale) {
-    return row.getDecimal(ordinal, precision, scale);
-  }
-
-  @Override
-  public UTF8String getUTF8String(int ordinal) {
-    if (ordinal < HoodieRecord.HOODIE_META_COLUMNS.size()) {
-      return UTF8String.fromBytes(getMetaColumnVal(ordinal).getBytes());
-    }
-    return row.getUTF8String(ordinal);
-  }
-
-  @Override
-  public String getString(int ordinal) {
-    if (ordinal < HoodieRecord.HOODIE_META_COLUMNS.size()) {
-      return new String(getMetaColumnVal(ordinal).getBytes());
-    }
-    return row.getString(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, Decimal.class);
+    return row.getDecimal(rebaseOrdinal(ordinal), precision, scale);
   }
 
   @Override
   public byte[] getBinary(int ordinal) {
-    return row.getBinary(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, Byte[].class);
+    return row.getBinary(rebaseOrdinal(ordinal));
   }
 
   @Override
   public CalendarInterval getInterval(int ordinal) {
-    return row.getInterval(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, CalendarInterval.class);
+    return row.getInterval(rebaseOrdinal(ordinal));
   }
 
   @Override
   public InternalRow getStruct(int ordinal, int numFields) {
-    return row.getStruct(ordinal, numFields);
+    ruleOutMetaFieldsAccess(ordinal, InternalRow.class);
+    return row.getStruct(rebaseOrdinal(ordinal), numFields);
   }
 
   @Override
   public ArrayData getArray(int ordinal) {
-    return row.getArray(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, ArrayData.class);
+    return row.getArray(rebaseOrdinal(ordinal));
   }
 
   @Override
   public MapData getMap(int ordinal) {
-    return row.getMap(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, MapData.class);
+    return row.getMap(rebaseOrdinal(ordinal));
   }
 
   @Override
-  public Object get(int ordinal, DataType dataType) {
-    if (ordinal < HoodieRecord.HOODIE_META_COLUMNS.size()) {
-      return UTF8String.fromBytes(getMetaColumnVal(ordinal).getBytes());
+  public InternalRow copy() {
+    return new HoodieInternalRow(Arrays.copyOf(metaFields, metaFields.length), 
row.copy(), containsMetaFields);
+  }
+
+  private int rebaseOrdinal(int ordinal) {
+    // NOTE: In cases when source row does not contain meta fields, we will 
have to
+    //       rebase ordinal onto its indexes
+    return containsMetaFields ? ordinal : ordinal - metaFields.length;
+  }
+
+  private void validateMetaFieldDataType(DataType dataType) {
+    if (!dataType.sameType(StringType$.MODULE$)) {
+      throw new ClassCastException(String.format("Can not cast meta-field of 
type UTF8String to %s", dataType.simpleString()));
     }
-    return row.get(ordinal, dataType);
   }
 
-  @Override
-  public InternalRow copy() {
-    return new HoodieInternalRow(commitTime, commitSeqNumber, recordKey, 
partitionPath, fileName, row.copy());
+  private void ruleOutMetaFieldsAccess(int ordinal, Class<?> expectedDataType) 
{

Review Comment:
   Do we need this validation on the fast path? I.e., every column that is not 
a meta field will go through this check for every record. Is there a way to 
optimize this or reduce the frequency of the validations?



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java:
##########
@@ -18,121 +18,334 @@
 
 package org.apache.hudi.keygen;
 
-import org.apache.hudi.ApiMaturityLevel;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.hudi.AvroConversionUtils;
-import org.apache.hudi.PublicAPIMethod;
+import org.apache.hudi.HoodieSparkUtils;
+import org.apache.hudi.client.utils.SparkRowSerDe;
 import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.exception.HoodieIOException;
-
-import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.common.util.PartitionPathEncodeUtils;
+import org.apache.hudi.exception.HoodieKeyException;
+import org.apache.hudi.unsafe.UTF8StringBuilder;
+import org.apache.spark.sql.HoodieUnsafeRowUtils;
+import org.apache.spark.sql.HoodieUnsafeRowUtils$;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+import scala.Function1;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
+import javax.annotation.concurrent.ThreadSafe;
 
-import scala.Function1;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static 
org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH_SEPARATOR;
+import static 
org.apache.hudi.keygen.KeyGenUtils.DEFAULT_RECORD_KEY_PARTS_SEPARATOR;
+import static org.apache.hudi.keygen.KeyGenUtils.HUDI_DEFAULT_PARTITION_PATH;
 
 /**
- * Base class for the built-in key generators. Contains methods structured for
- * code reuse amongst them.
+ * Base class for all built-in key generators.
+ *
+ * NOTE: By default it implements all the methods of {@link 
SparkKeyGeneratorInterface}, which
+ *       by default however fallback to Avro implementation. For maximum 
performance (to avoid
+ *       conversion from Spark's internal data-types to Avro) you should 
override these methods
+ *       in your implementation.
+ *
+ * TODO rename to AvroFallbackBaseKeyGenerator
  */
+@ThreadSafe
 public abstract class BuiltinKeyGenerator extends BaseKeyGenerator implements 
SparkKeyGeneratorInterface {
 
-  private static final String STRUCT_NAME = "hoodieRowTopLevelField";
-  private static final String NAMESPACE = "hoodieRow";
-  private Function1<Row, GenericRecord> converterFn = null;
-  private final AtomicBoolean validatePartitionFields = new 
AtomicBoolean(false);
-  protected StructType structType;
+  protected static final UTF8String HUDI_DEFAULT_PARTITION_PATH_UTF8 = 
UTF8String.fromString(HUDI_DEFAULT_PARTITION_PATH);
 
-  protected Map<String, Pair<List<Integer>, DataType>> recordKeySchemaInfo = 
new HashMap<>();
-  protected Map<String, Pair<List<Integer>, DataType>> partitionPathSchemaInfo 
= new HashMap<>();
+  protected transient volatile SparkRowConverter rowConverter;
+  protected transient volatile SparkRowAccessor rowAccessor;
 
   protected BuiltinKeyGenerator(TypedProperties config) {
     super(config);
   }
 
-  /**
-   * Fetch record key from {@link Row}.
-   *
-   * @param row instance of {@link Row} from which record key is requested.
-   * @return the record key of interest from {@link Row}.
-   */
   @Override
-  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
   public String getRecordKey(Row row) {
-    if (null == converterFn) {
-      converterFn = AvroConversionUtils.createConverterToAvro(row.schema(), 
STRUCT_NAME, NAMESPACE);
+    tryInitRowConverter(row.schema());
+    // NOTE: This implementation has considerable computational overhead and 
has to be overridden
+    //       to provide for optimal performance on Spark. This implementation 
provided exclusively
+    //       for compatibility reasons.
+    return getRecordKey(rowConverter.convertToAvro(row));
+  }
+
+  @Override
+  public UTF8String getRecordKey(InternalRow internalRow, StructType schema) {
+    tryInitRowConverter(schema);
+    // NOTE: This implementation has considerable computational overhead and 
has to be overridden
+    //       to provide for optimal performance on Spark. This implementation 
provided exclusively
+    //       for compatibility reasons.
+    return 
UTF8String.fromString(getRecordKey(rowConverter.convertToAvro(internalRow)));
+  }
+
+  @Override
+  public String getPartitionPath(Row row) {
+    tryInitRowConverter(row.schema());
+    // NOTE: This implementation has considerable computational overhead and 
has to be overridden
+    //       to provide for optimal performance on Spark. This implementation 
provided exclusively
+    //       for compatibility reasons.
+    return getPartitionPath(rowConverter.convertToAvro(row));
+  }
+
+  @Override
+  public UTF8String getPartitionPath(InternalRow internalRow, StructType 
schema) {
+    tryInitRowConverter(schema);
+    // NOTE: This implementation has considerable computational overhead and 
has to be overridden
+    //       to provide for optimal performance on Spark. This implementation 
provided exclusively
+    //       for compatibility reasons.
+    GenericRecord avroRecord = rowConverter.convertToAvro(internalRow);
+    return UTF8String.fromString(getPartitionPath(avroRecord));
+  }
+
+  protected void tryInitRowAccessor(StructType schema) {
+    if (this.rowAccessor == null) {
+      synchronized (this) {
+        if (this.rowAccessor == null) {
+          this.rowAccessor = new SparkRowAccessor(schema);
+        }
+      }
     }
-    return getKey(converterFn.apply(row)).getRecordKey();
   }
 
   /**
-   * Fetch partition path from {@link Row}.
-   *
-   * @param row instance of {@link Row} from which partition path is requested
-   * @return the partition path of interest from {@link Row}.
+   * NOTE: This method has to stay final (so that it's easier for JIT compiler 
to apply certain
+   *       optimizations, like inlining)
    */
-  @Override
-  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
-  public String getPartitionPath(Row row) {
-    if (null == converterFn) {
-      converterFn = AvroConversionUtils.createConverterToAvro(row.schema(), 
STRUCT_NAME, NAMESPACE);
+  protected final String combinePartitionPath(Object... partitionPathParts) {
+    checkState(partitionPathParts.length == recordKeyFields.size());
+    // Avoid creating [[StringBuilder]] in case there's just one 
partition-path part,
+    // and Hive-style of partitioning is not required
+    if (!hiveStylePartitioning && partitionPathParts.length == 1) {
+      return partitionPathParts[0] != null
+          ? partitionPathParts[0].toString()
+          : HUDI_DEFAULT_PARTITION_PATH;
     }
-    return getKey(converterFn.apply(row)).getPartitionPath();
+
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < partitionPathParts.length; ++i) {
+      String partitionPathPartStr = partitionPathParts[i] != null
+          ? partitionPathParts[i].toString()

Review Comment:
   May I know why encoding is honored in the next method but not here?



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/unsafe/UTF8StringBuilder.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.unsafe;
+
+import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.array.ByteArrayMethods;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A helper class to write {@link UTF8String}s to an internal buffer and build 
the concatenated
+ * {@link UTF8String} at the end.
+ */
+public class UTF8StringBuilder {
+
+  private static final int ARRAY_MAX = 
ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH;
+
+  private byte[] buffer;
+  private int cursor = Platform.BYTE_ARRAY_OFFSET;
+
+  public UTF8StringBuilder() {

Review Comment:
   Do we have tests for this?



##########
hudi-common/src/main/java/org/apache/hudi/keygen/KeyGenerator.java:
##########
@@ -52,7 +52,7 @@ protected KeyGenerator(TypedProperties config) {
    * @return list of field names, when concatenated make up the record key.
    */
   @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
-  public List<String> getRecordKeyFieldNames() {

Review Comment:
   We should be careful about renaming public methods. Can we retain the same 
name?



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java:
##########
@@ -18,121 +18,334 @@
 
 package org.apache.hudi.keygen;
 
-import org.apache.hudi.ApiMaturityLevel;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.hudi.AvroConversionUtils;
-import org.apache.hudi.PublicAPIMethod;
+import org.apache.hudi.HoodieSparkUtils;
+import org.apache.hudi.client.utils.SparkRowSerDe;
 import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.exception.HoodieIOException;
-
-import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.common.util.PartitionPathEncodeUtils;
+import org.apache.hudi.exception.HoodieKeyException;
+import org.apache.hudi.unsafe.UTF8StringBuilder;
+import org.apache.spark.sql.HoodieUnsafeRowUtils;
+import org.apache.spark.sql.HoodieUnsafeRowUtils$;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+import scala.Function1;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
+import javax.annotation.concurrent.ThreadSafe;
 
-import scala.Function1;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static 
org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH_SEPARATOR;
+import static 
org.apache.hudi.keygen.KeyGenUtils.DEFAULT_RECORD_KEY_PARTS_SEPARATOR;
+import static org.apache.hudi.keygen.KeyGenUtils.HUDI_DEFAULT_PARTITION_PATH;
 
 /**
- * Base class for the built-in key generators. Contains methods structured for
- * code reuse amongst them.
+ * Base class for all built-in key generators.
+ *
+ * NOTE: By default it implements all the methods of {@link 
SparkKeyGeneratorInterface}, which
+ *       by default however fallback to Avro implementation. For maximum 
performance (to avoid
+ *       conversion from Spark's internal data-types to Avro) you should 
override these methods
+ *       in your implementation.
+ *
+ * TODO rename to AvroFallbackBaseKeyGenerator
  */
+@ThreadSafe
 public abstract class BuiltinKeyGenerator extends BaseKeyGenerator implements 
SparkKeyGeneratorInterface {
 
-  private static final String STRUCT_NAME = "hoodieRowTopLevelField";
-  private static final String NAMESPACE = "hoodieRow";
-  private Function1<Row, GenericRecord> converterFn = null;
-  private final AtomicBoolean validatePartitionFields = new 
AtomicBoolean(false);
-  protected StructType structType;
+  protected static final UTF8String HUDI_DEFAULT_PARTITION_PATH_UTF8 = 
UTF8String.fromString(HUDI_DEFAULT_PARTITION_PATH);
 
-  protected Map<String, Pair<List<Integer>, DataType>> recordKeySchemaInfo = 
new HashMap<>();
-  protected Map<String, Pair<List<Integer>, DataType>> partitionPathSchemaInfo 
= new HashMap<>();
+  protected transient volatile SparkRowConverter rowConverter;
+  protected transient volatile SparkRowAccessor rowAccessor;
 
   protected BuiltinKeyGenerator(TypedProperties config) {
     super(config);
   }
 
-  /**
-   * Fetch record key from {@link Row}.
-   *
-   * @param row instance of {@link Row} from which record key is requested.
-   * @return the record key of interest from {@link Row}.
-   */
   @Override
-  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
   public String getRecordKey(Row row) {
-    if (null == converterFn) {
-      converterFn = AvroConversionUtils.createConverterToAvro(row.schema(), 
STRUCT_NAME, NAMESPACE);
+    tryInitRowConverter(row.schema());
+    // NOTE: This implementation has considerable computational overhead and 
has to be overridden
+    //       to provide for optimal performance on Spark. This implementation 
provided exclusively
+    //       for compatibility reasons.
+    return getRecordKey(rowConverter.convertToAvro(row));
+  }
+
+  @Override
+  public UTF8String getRecordKey(InternalRow internalRow, StructType schema) {
+    tryInitRowConverter(schema);
+    // NOTE: This implementation has considerable computational overhead and 
has to be overridden
+    //       to provide for optimal performance on Spark. This implementation 
provided exclusively
+    //       for compatibility reasons.
+    return 
UTF8String.fromString(getRecordKey(rowConverter.convertToAvro(internalRow)));
+  }
+
+  @Override
+  public String getPartitionPath(Row row) {
+    tryInitRowConverter(row.schema());
+    // NOTE: This implementation has considerable computational overhead and 
has to be overridden
+    //       to provide for optimal performance on Spark. This implementation 
provided exclusively
+    //       for compatibility reasons.
+    return getPartitionPath(rowConverter.convertToAvro(row));
+  }
+
+  @Override
+  public UTF8String getPartitionPath(InternalRow internalRow, StructType 
schema) {
+    tryInitRowConverter(schema);
+    // NOTE: This implementation has considerable computational overhead and 
has to be overridden
+    //       to provide for optimal performance on Spark. This implementation 
provided exclusively
+    //       for compatibility reasons.
+    GenericRecord avroRecord = rowConverter.convertToAvro(internalRow);
+    return UTF8String.fromString(getPartitionPath(avroRecord));
+  }
+
+  protected void tryInitRowAccessor(StructType schema) {
+    if (this.rowAccessor == null) {
+      synchronized (this) {
+        if (this.rowAccessor == null) {
+          this.rowAccessor = new SparkRowAccessor(schema);
+        }
+      }
     }
-    return getKey(converterFn.apply(row)).getRecordKey();
   }
 
   /**
-   * Fetch partition path from {@link Row}.
-   *
-   * @param row instance of {@link Row} from which partition path is requested
-   * @return the partition path of interest from {@link Row}.
+   * NOTE: This method has to stay final (so that it's easier for JIT compiler 
to apply certain
+   *       optimizations, like inlining)
    */
-  @Override
-  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
-  public String getPartitionPath(Row row) {
-    if (null == converterFn) {
-      converterFn = AvroConversionUtils.createConverterToAvro(row.schema(), 
STRUCT_NAME, NAMESPACE);
+  protected final String combinePartitionPath(Object... partitionPathParts) {
+    checkState(partitionPathParts.length == recordKeyFields.size());
+    // Avoid creating [[StringBuilder]] in case there's just one 
partition-path part,
+    // and Hive-style of partitioning is not required
+    if (!hiveStylePartitioning && partitionPathParts.length == 1) {
+      return partitionPathParts[0] != null
+          ? partitionPathParts[0].toString()
+          : HUDI_DEFAULT_PARTITION_PATH;
     }
-    return getKey(converterFn.apply(row)).getPartitionPath();
+
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < partitionPathParts.length; ++i) {
+      String partitionPathPartStr = partitionPathParts[i] != null
+          ? partitionPathParts[i].toString()
+          : HUDI_DEFAULT_PARTITION_PATH;
+
+      // TODO support url-encoding
+
+      if (hiveStylePartitioning) {
+        sb.append(recordKeyFields.get(i))

Review Comment:
   Again, is this meant to refer to partitionPathFields.get(i)?



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/model/TestHoodieInternalRow.java:
##########
@@ -64,7 +65,13 @@ public void testGet() {
     Object[] values = getRandomValue(true);
 
     InternalRow row = new GenericInternalRow(values);
-    HoodieInternalRow hoodieInternalRow = new HoodieInternalRow("commitTime", 
"commitSeqNo", "recordKey", "partitionPath", "fileName", row);
+    HoodieInternalRow hoodieInternalRow = new 
HoodieInternalRow(UTF8String.fromString("commitTime"),
+        UTF8String.fromString("commitSeqNo"),

Review Comment:
   Can we declare constants for these UTF8String values and use them across all tests?



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java:
##########
@@ -39,49 +39,74 @@
 import org.apache.log4j.Logger;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
 
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
 
 /**
  * Create handle with InternalRow for datasource implementation of bulk insert.
  */
 public class HoodieRowCreateHandle implements Serializable {
 
   private static final long serialVersionUID = 1L;
+
   private static final Logger LOG = 
LogManager.getLogger(HoodieRowCreateHandle.class);
-  private static final AtomicLong SEQGEN = new AtomicLong(1);
+  private static final AtomicLong GLOBAL_SEQ_NO = new AtomicLong(1);
+
+  private static final Integer RECORD_KEY_META_FIELD_ORD =
+      
HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.RECORD_KEY_METADATA_FIELD);

Review Comment:
   PARTITION_PATH_META_FIELD_POS as well



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java:
##########
@@ -18,121 +18,334 @@
 
 package org.apache.hudi.keygen;
 
-import org.apache.hudi.ApiMaturityLevel;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.hudi.AvroConversionUtils;
-import org.apache.hudi.PublicAPIMethod;
+import org.apache.hudi.HoodieSparkUtils;
+import org.apache.hudi.client.utils.SparkRowSerDe;
 import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.exception.HoodieIOException;
-
-import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.common.util.PartitionPathEncodeUtils;
+import org.apache.hudi.exception.HoodieKeyException;
+import org.apache.hudi.unsafe.UTF8StringBuilder;
+import org.apache.spark.sql.HoodieUnsafeRowUtils;
+import org.apache.spark.sql.HoodieUnsafeRowUtils$;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+import scala.Function1;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
+import javax.annotation.concurrent.ThreadSafe;
 
-import scala.Function1;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static 
org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH_SEPARATOR;
+import static 
org.apache.hudi.keygen.KeyGenUtils.DEFAULT_RECORD_KEY_PARTS_SEPARATOR;
+import static org.apache.hudi.keygen.KeyGenUtils.HUDI_DEFAULT_PARTITION_PATH;
 
 /**
- * Base class for the built-in key generators. Contains methods structured for
- * code reuse amongst them.
+ * Base class for all built-in key generators.
+ *
+ * NOTE: By default it implements all the methods of {@link 
SparkKeyGeneratorInterface}, which
+ *       by default however fallback to Avro implementation. For maximum 
performance (to avoid
+ *       conversion from Spark's internal data-types to Avro) you should 
override these methods
+ *       in your implementation.
+ *
+ * TODO rename to AvroFallbackBaseKeyGenerator
  */
+@ThreadSafe
 public abstract class BuiltinKeyGenerator extends BaseKeyGenerator implements 
SparkKeyGeneratorInterface {
 
-  private static final String STRUCT_NAME = "hoodieRowTopLevelField";
-  private static final String NAMESPACE = "hoodieRow";
-  private Function1<Row, GenericRecord> converterFn = null;
-  private final AtomicBoolean validatePartitionFields = new 
AtomicBoolean(false);
-  protected StructType structType;
+  protected static final UTF8String HUDI_DEFAULT_PARTITION_PATH_UTF8 = 
UTF8String.fromString(HUDI_DEFAULT_PARTITION_PATH);
 
-  protected Map<String, Pair<List<Integer>, DataType>> recordKeySchemaInfo = 
new HashMap<>();
-  protected Map<String, Pair<List<Integer>, DataType>> partitionPathSchemaInfo 
= new HashMap<>();
+  protected transient volatile SparkRowConverter rowConverter;
+  protected transient volatile SparkRowAccessor rowAccessor;
 
   protected BuiltinKeyGenerator(TypedProperties config) {
     super(config);
   }
 
-  /**
-   * Fetch record key from {@link Row}.
-   *
-   * @param row instance of {@link Row} from which record key is requested.
-   * @return the record key of interest from {@link Row}.
-   */
   @Override
-  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
   public String getRecordKey(Row row) {
-    if (null == converterFn) {
-      converterFn = AvroConversionUtils.createConverterToAvro(row.schema(), 
STRUCT_NAME, NAMESPACE);
+    tryInitRowConverter(row.schema());
+    // NOTE: This implementation has considerable computational overhead and 
has to be overridden
+    //       to provide for optimal performance on Spark. This implementation 
provided exclusively
+    //       for compatibility reasons.
+    return getRecordKey(rowConverter.convertToAvro(row));
+  }
+
+  @Override
+  public UTF8String getRecordKey(InternalRow internalRow, StructType schema) {
+    tryInitRowConverter(schema);
+    // NOTE: This implementation has considerable computational overhead and 
has to be overridden
+    //       to provide for optimal performance on Spark. This implementation 
provided exclusively
+    //       for compatibility reasons.
+    return 
UTF8String.fromString(getRecordKey(rowConverter.convertToAvro(internalRow)));
+  }
+
+  @Override
+  public String getPartitionPath(Row row) {
+    tryInitRowConverter(row.schema());
+    // NOTE: This implementation has considerable computational overhead and 
has to be overridden
+    //       to provide for optimal performance on Spark. This implementation 
provided exclusively
+    //       for compatibility reasons.
+    return getPartitionPath(rowConverter.convertToAvro(row));
+  }
+
+  @Override
+  public UTF8String getPartitionPath(InternalRow internalRow, StructType 
schema) {
+    tryInitRowConverter(schema);
+    // NOTE: This implementation has considerable computational overhead and 
has to be overridden
+    //       to provide for optimal performance on Spark. This implementation 
provided exclusively
+    //       for compatibility reasons.
+    GenericRecord avroRecord = rowConverter.convertToAvro(internalRow);
+    return UTF8String.fromString(getPartitionPath(avroRecord));
+  }
+
+  protected void tryInitRowAccessor(StructType schema) {
+    if (this.rowAccessor == null) {
+      synchronized (this) {
+        if (this.rowAccessor == null) {
+          this.rowAccessor = new SparkRowAccessor(schema);
+        }
+      }
     }
-    return getKey(converterFn.apply(row)).getRecordKey();
   }
 
   /**
-   * Fetch partition path from {@link Row}.
-   *
-   * @param row instance of {@link Row} from which partition path is requested
-   * @return the partition path of interest from {@link Row}.
+   * NOTE: This method has to stay final (so that it's easier for JIT compiler 
to apply certain
+   *       optimizations, like inlining)
    */
-  @Override
-  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
-  public String getPartitionPath(Row row) {
-    if (null == converterFn) {
-      converterFn = AvroConversionUtils.createConverterToAvro(row.schema(), 
STRUCT_NAME, NAMESPACE);
+  protected final String combinePartitionPath(Object... partitionPathParts) {
+    checkState(partitionPathParts.length == recordKeyFields.size());

Review Comment:
   Did you mean partitionPathFields (rather than recordKeyFields) on the right-hand side of this check?



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java:
##########
@@ -39,49 +39,74 @@
 import org.apache.log4j.Logger;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
 
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
 
 /**
  * Create handle with InternalRow for datasource implementation of bulk insert.
  */
 public class HoodieRowCreateHandle implements Serializable {
 
   private static final long serialVersionUID = 1L;
+
   private static final Logger LOG = 
LogManager.getLogger(HoodieRowCreateHandle.class);
-  private static final AtomicLong SEQGEN = new AtomicLong(1);
+  private static final AtomicLong GLOBAL_SEQ_NO = new AtomicLong(1);
+
+  private static final Integer RECORD_KEY_META_FIELD_ORD =
+      
HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.RECORD_KEY_METADATA_FIELD);

Review Comment:
   We already have HoodieRecord.RECORD_KEY_META_FIELD_POS; we can probably
   re-use it.



##########
hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java:
##########
@@ -157,7 +152,29 @@ public List<HoodieInternalWriteStatus> getWriteStatuses() 
throws IOException {
     return writeStatusList;
   }
 
-  public void abort() {
+  public void abort() {}
+
+  public void close() throws IOException {
+    for (HoodieRowCreateHandle rowCreateHandle : handles.values()) {
+      writeStatusList.add(rowCreateHandle.close());
+    }
+    handles.clear();
+    handle = null;
+  }
+
+  private UTF8String extractPartitionPath(InternalRow row) {
+    if (populateMetaFields) {
+      // In case meta-fields are materialized w/in the table itself, we can 
just simply extract
+      // partition path from there
+      //
+      // NOTE: Helper keeps track of [[lastKnownPartitionPath]] as 
[[UTF8String]] to avoid
+      //       conversion from Catalyst internal representation into a 
[[String]]
+      return row.getUTF8String(HoodieRecord.PARTITION_PATH_META_FIELD_POS);
+    } else if (keyGeneratorOpt.isPresent()) {
+      return keyGeneratorOpt.get().getPartitionPath(row, structType);

Review Comment:
   Prior to this patch, in the case of SimpleKeyGen, we had a special case
   where we directly fetched the field instead of going via the key generator
   (L131 to L136). I guess we missed retaining that. We should go via the
   regular key gen only when it is not the simple key gen.
   



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java:
##########
@@ -46,16 +47,27 @@
  * field in the partition path, use field1:simple 3. If you want your table to 
be non partitioned, simply leave it as blank.
  *
  * RecordKey is internally generated using either SimpleKeyGenerator or 
ComplexKeyGenerator.
+ *
+ * @deprecated
  */
+@Deprecated

Review Comment:
   Do we know why we are deprecating this? What's the suggested alternative?



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeRowUtils.scala:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.{StructField, StructType}
+
+import scala.collection.mutable.ArrayBuffer
+
+object HoodieUnsafeRowUtils {
+
+  /**
+   * Fetches (nested) value w/in provided [[Row]] uniquely identified by the 
provided nested-field path
+   * previously composed by [[composeNestedFieldPath]]
+   */
+  def getNestedRowValue(row: Row, nestedFieldPath: NestedFieldPath): Any = {

Review Comment:
   Did you have to make any changes, or was the code just moved as is? If you
   can point me to the actual changes, I will review only those.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java:
##########
@@ -18,121 +18,334 @@
 
 package org.apache.hudi.keygen;
 
-import org.apache.hudi.ApiMaturityLevel;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.hudi.AvroConversionUtils;
-import org.apache.hudi.PublicAPIMethod;
+import org.apache.hudi.HoodieSparkUtils;
+import org.apache.hudi.client.utils.SparkRowSerDe;
 import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.exception.HoodieIOException;
-
-import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.common.util.PartitionPathEncodeUtils;
+import org.apache.hudi.exception.HoodieKeyException;
+import org.apache.hudi.unsafe.UTF8StringBuilder;
+import org.apache.spark.sql.HoodieUnsafeRowUtils;
+import org.apache.spark.sql.HoodieUnsafeRowUtils$;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+import scala.Function1;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
+import javax.annotation.concurrent.ThreadSafe;
 
-import scala.Function1;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static 
org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH_SEPARATOR;
+import static 
org.apache.hudi.keygen.KeyGenUtils.DEFAULT_RECORD_KEY_PARTS_SEPARATOR;
+import static org.apache.hudi.keygen.KeyGenUtils.HUDI_DEFAULT_PARTITION_PATH;
 
 /**
- * Base class for the built-in key generators. Contains methods structured for
- * code reuse amongst them.
+ * Base class for all built-in key generators.
+ *
+ * NOTE: By default it implements all the methods of {@link 
SparkKeyGeneratorInterface}, which
+ *       by default however fallback to Avro implementation. For maximum 
performance (to avoid
+ *       conversion from Spark's internal data-types to Avro) you should 
override these methods
+ *       in your implementation.
+ *
+ * TODO rename to AvroFallbackBaseKeyGenerator
  */
+@ThreadSafe
 public abstract class BuiltinKeyGenerator extends BaseKeyGenerator implements 
SparkKeyGeneratorInterface {
 
-  private static final String STRUCT_NAME = "hoodieRowTopLevelField";
-  private static final String NAMESPACE = "hoodieRow";
-  private Function1<Row, GenericRecord> converterFn = null;
-  private final AtomicBoolean validatePartitionFields = new 
AtomicBoolean(false);
-  protected StructType structType;
+  protected static final UTF8String HUDI_DEFAULT_PARTITION_PATH_UTF8 = 
UTF8String.fromString(HUDI_DEFAULT_PARTITION_PATH);
 
-  protected Map<String, Pair<List<Integer>, DataType>> recordKeySchemaInfo = 
new HashMap<>();
-  protected Map<String, Pair<List<Integer>, DataType>> partitionPathSchemaInfo 
= new HashMap<>();
+  protected transient volatile SparkRowConverter rowConverter;
+  protected transient volatile SparkRowAccessor rowAccessor;
 
   protected BuiltinKeyGenerator(TypedProperties config) {
     super(config);
   }
 
-  /**
-   * Fetch record key from {@link Row}.
-   *
-   * @param row instance of {@link Row} from which record key is requested.
-   * @return the record key of interest from {@link Row}.
-   */
   @Override
-  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
   public String getRecordKey(Row row) {
-    if (null == converterFn) {
-      converterFn = AvroConversionUtils.createConverterToAvro(row.schema(), 
STRUCT_NAME, NAMESPACE);
+    tryInitRowConverter(row.schema());

Review Comment:
   may I know why we have removed the maturity ("EVOLVING") annotation? 



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/model/HoodieInternalRow.java:
##########
@@ -57,187 +92,153 @@ public int numFields() {
   }
 
   @Override
-  public void setNullAt(int i) {
-    if (i < HoodieRecord.HOODIE_META_COLUMNS.size()) {
-      switch (i) {
-        case 0: {
-          this.commitTime = null;
-          break;
-        }
-        case 1: {
-          this.commitSeqNumber = null;
-          break;
-        }
-        case 2: {
-          this.recordKey = null;
-          break;
-        }
-        case 3: {
-          this.partitionPath = null;
-          break;
-        }
-        case 4: {
-          this.fileName = null;
-          break;
-        }
-        default: throw new IllegalArgumentException("Not expected");
-      }
+  public void setNullAt(int ordinal) {
+    if (ordinal < metaFields.length) {
+      metaFields[ordinal] = null;
     } else {
-      row.setNullAt(i);
+      row.setNullAt(rebaseOrdinal(ordinal));
     }
   }
 
   @Override
-  public void update(int i, Object value) {
-    if (i < HoodieRecord.HOODIE_META_COLUMNS.size()) {
-      switch (i) {
-        case 0: {
-          this.commitTime = value.toString();
-          break;
-        }
-        case 1: {
-          this.commitSeqNumber = value.toString();
-          break;
-        }
-        case 2: {
-          this.recordKey = value.toString();
-          break;
-        }
-        case 3: {
-          this.partitionPath = value.toString();
-          break;
-        }
-        case 4: {
-          this.fileName = value.toString();
-          break;
-        }
-        default: throw new IllegalArgumentException("Not expected");
+  public void update(int ordinal, Object value) {
+    if (ordinal < metaFields.length) {
+      if (value instanceof UTF8String) {
+        metaFields[ordinal] = (UTF8String) value;
+      } else if (value instanceof String) {
+        metaFields[ordinal] = UTF8String.fromString((String) value);
+      } else {
+        throw new IllegalArgumentException(
+            String.format("Could not update the row at (%d) with value of type 
(%s), either UTF8String or String are expected", ordinal, 
value.getClass().getSimpleName()));
       }
     } else {
-      row.update(i, value);
+      row.update(rebaseOrdinal(ordinal), value);
     }
   }
 
-  private String getMetaColumnVal(int ordinal) {
-    switch (ordinal) {
-      case 0: {
-        return commitTime;
-      }
-      case 1: {
-        return commitSeqNumber;
-      }
-      case 2: {
-        return recordKey;
-      }
-      case 3: {
-        return partitionPath;
-      }
-      case 4: {
-        return fileName;
-      }
-      default: throw new IllegalArgumentException("Not expected");
+  @Override
+  public boolean isNullAt(int ordinal) {
+    if (ordinal < metaFields.length) {
+      return metaFields[ordinal] == null;
     }
+    return row.isNullAt(rebaseOrdinal(ordinal));
   }
 
   @Override
-  public boolean isNullAt(int ordinal) {
+  public UTF8String getUTF8String(int ordinal) {
+    if (ordinal < HoodieRecord.HOODIE_META_COLUMNS.size()) {

Review Comment:
   minor. to be consistent w/ other places, can we do 
   ```
   < metaFields.length
   ```



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SparkKeyGeneratorInterface.java:
##########
@@ -18,18 +18,65 @@
 
 package org.apache.hudi.keygen;
 
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.PublicAPIMethod;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
 
 /**
- * Spark key generator interface.
+ * Spark-specific {@link KeyGenerator} interface extension allowing 
implementation to
+ * specifically implement record-key, partition-path generation w/o the need 
for (expensive)
+ * conversion from Spark internal representation (for ex, to Avro)
  */
 public interface SparkKeyGeneratorInterface extends KeyGeneratorInterface {
 
+  /**
+   * Extracts record key from Spark's {@link Row}
+   *
+   * @param row instance of {@link Row} from which record-key is extracted
+   * @return record's (primary) key
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
   String getRecordKey(Row row);
 
+  /**
+   * Extracts record key from Spark's {@link InternalRow}
+   *
+   * NOTE: Difference b/w {@link Row} and {@link InternalRow} is that {@link 
InternalRow} could
+   *       internally hold just a binary representation of the data, while 
{@link Row} has it
+   *       deserialized into JVM-native representation (like {@code Integer}, 
{@code Long},
+   *       {@code String}, etc)
+   *
+   * @param row instance of {@link InternalRow} from which record-key is 
extracted
+   * @param schema schema {@link InternalRow} is adhering to
+   * @return record-key as instance of {@link UTF8String}
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  UTF8String getRecordKey(InternalRow row, StructType schema);
+
+  /**
+   * Extracts partition-path from {@link Row}
+   *
+   * @param row instance of {@link Row} from which record-key is extracted

Review Comment:
   Minor: this should read "from which partition path is extracted".



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeRowUtils.scala:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.{StructField, StructType}
+
+import scala.collection.mutable.ArrayBuffer
+
+object HoodieUnsafeRowUtils {
+
+  /**
+   * Fetches (nested) value w/in provided [[Row]] uniquely identified by the 
provided nested-field path
+   * previously composed by [[composeNestedFieldPath]]
+   */
+  def getNestedRowValue(row: Row, nestedFieldPath: NestedFieldPath): Any = {
+    var curRow = row
+    for (idx <- nestedFieldPath.parts.indices) {
+      val (ord, f) = nestedFieldPath.parts(idx)
+      if (curRow.isNullAt(ord)) {
+        // scalastyle:off return
+        if (f.nullable) return null

Review Comment:
   I see one change: the null placeholder is replaced w/ explicit nulls.



##########
hudi-common/src/main/java/org/apache/hudi/keygen/KeyGenerator.java:
##########
@@ -52,7 +52,7 @@ protected KeyGenerator(TypedProperties config) {
    * @return list of field names, when concatenated make up the record key.
    */
   @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
-  public List<String> getRecordKeyFieldNames() {

Review Comment:
   or add deprecated for older ones.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to