nsivabalan commented on code in PR #5523:
URL: https://github.com/apache/hudi/pull/5523#discussion_r923293257
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/model/HoodieInternalRow.java:
##########
@@ -57,187 +92,153 @@ public int numFields() {
}
@Override
- public void setNullAt(int i) {
- if (i < HoodieRecord.HOODIE_META_COLUMNS.size()) {
- switch (i) {
- case 0: {
- this.commitTime = null;
- break;
- }
- case 1: {
- this.commitSeqNumber = null;
- break;
- }
- case 2: {
- this.recordKey = null;
- break;
- }
- case 3: {
- this.partitionPath = null;
- break;
- }
- case 4: {
- this.fileName = null;
- break;
- }
- default: throw new IllegalArgumentException("Not expected");
- }
+ public void setNullAt(int ordinal) {
+ if (ordinal < metaFields.length) {
+ metaFields[ordinal] = null;
} else {
- row.setNullAt(i);
+ row.setNullAt(rebaseOrdinal(ordinal));
}
}
@Override
- public void update(int i, Object value) {
- if (i < HoodieRecord.HOODIE_META_COLUMNS.size()) {
- switch (i) {
- case 0: {
- this.commitTime = value.toString();
- break;
- }
- case 1: {
- this.commitSeqNumber = value.toString();
- break;
- }
- case 2: {
- this.recordKey = value.toString();
- break;
- }
- case 3: {
- this.partitionPath = value.toString();
- break;
- }
- case 4: {
- this.fileName = value.toString();
- break;
- }
- default: throw new IllegalArgumentException("Not expected");
+ public void update(int ordinal, Object value) {
+ if (ordinal < metaFields.length) {
+ if (value instanceof UTF8String) {
+ metaFields[ordinal] = (UTF8String) value;
+ } else if (value instanceof String) {
+ metaFields[ordinal] = UTF8String.fromString((String) value);
+ } else {
+ throw new IllegalArgumentException(
+ String.format("Could not update the row at (%d) with value of type
(%s), either UTF8String or String are expected", ordinal,
value.getClass().getSimpleName()));
}
} else {
- row.update(i, value);
+ row.update(rebaseOrdinal(ordinal), value);
}
}
- private String getMetaColumnVal(int ordinal) {
- switch (ordinal) {
- case 0: {
- return commitTime;
- }
- case 1: {
- return commitSeqNumber;
- }
- case 2: {
- return recordKey;
- }
- case 3: {
- return partitionPath;
- }
- case 4: {
- return fileName;
- }
- default: throw new IllegalArgumentException("Not expected");
+ @Override
+ public boolean isNullAt(int ordinal) {
+ if (ordinal < metaFields.length) {
+ return metaFields[ordinal] == null;
}
+ return row.isNullAt(rebaseOrdinal(ordinal));
}
@Override
- public boolean isNullAt(int ordinal) {
+ public UTF8String getUTF8String(int ordinal) {
+ if (ordinal < HoodieRecord.HOODIE_META_COLUMNS.size()) {
+ return metaFields[ordinal];
+ }
+ return row.getUTF8String(rebaseOrdinal(ordinal));
+ }
+
+ @Override
+ public Object get(int ordinal, DataType dataType) {
if (ordinal < HoodieRecord.HOODIE_META_COLUMNS.size()) {
- return null == getMetaColumnVal(ordinal);
+ validateMetaFieldDataType(dataType);
+ return metaFields[ordinal];
}
- return row.isNullAt(ordinal);
+ return row.get(rebaseOrdinal(ordinal), dataType);
}
@Override
public boolean getBoolean(int ordinal) {
- return row.getBoolean(ordinal);
+ ruleOutMetaFieldsAccess(ordinal, Boolean.class);
+ return row.getBoolean(rebaseOrdinal(ordinal));
}
@Override
public byte getByte(int ordinal) {
- return row.getByte(ordinal);
+ ruleOutMetaFieldsAccess(ordinal, Byte.class);
+ return row.getByte(rebaseOrdinal(ordinal));
}
@Override
public short getShort(int ordinal) {
- return row.getShort(ordinal);
+ ruleOutMetaFieldsAccess(ordinal, Short.class);
+ return row.getShort(rebaseOrdinal(ordinal));
}
@Override
public int getInt(int ordinal) {
- return row.getInt(ordinal);
+ ruleOutMetaFieldsAccess(ordinal, Integer.class);
+ return row.getInt(rebaseOrdinal(ordinal));
}
@Override
public long getLong(int ordinal) {
- return row.getLong(ordinal);
+ ruleOutMetaFieldsAccess(ordinal, Long.class);
+ return row.getLong(rebaseOrdinal(ordinal));
}
@Override
public float getFloat(int ordinal) {
- return row.getFloat(ordinal);
+ ruleOutMetaFieldsAccess(ordinal, Float.class);
+ return row.getFloat(rebaseOrdinal(ordinal));
}
@Override
public double getDouble(int ordinal) {
- return row.getDouble(ordinal);
+ ruleOutMetaFieldsAccess(ordinal, Double.class);
+ return row.getDouble(rebaseOrdinal(ordinal));
}
@Override
public Decimal getDecimal(int ordinal, int precision, int scale) {
- return row.getDecimal(ordinal, precision, scale);
- }
-
- @Override
- public UTF8String getUTF8String(int ordinal) {
- if (ordinal < HoodieRecord.HOODIE_META_COLUMNS.size()) {
- return UTF8String.fromBytes(getMetaColumnVal(ordinal).getBytes());
- }
- return row.getUTF8String(ordinal);
- }
-
- @Override
- public String getString(int ordinal) {
- if (ordinal < HoodieRecord.HOODIE_META_COLUMNS.size()) {
- return new String(getMetaColumnVal(ordinal).getBytes());
- }
- return row.getString(ordinal);
+ ruleOutMetaFieldsAccess(ordinal, Decimal.class);
+ return row.getDecimal(rebaseOrdinal(ordinal), precision, scale);
}
@Override
public byte[] getBinary(int ordinal) {
- return row.getBinary(ordinal);
+ ruleOutMetaFieldsAccess(ordinal, Byte[].class);
+ return row.getBinary(rebaseOrdinal(ordinal));
}
@Override
public CalendarInterval getInterval(int ordinal) {
- return row.getInterval(ordinal);
+ ruleOutMetaFieldsAccess(ordinal, CalendarInterval.class);
+ return row.getInterval(rebaseOrdinal(ordinal));
}
@Override
public InternalRow getStruct(int ordinal, int numFields) {
- return row.getStruct(ordinal, numFields);
+ ruleOutMetaFieldsAccess(ordinal, InternalRow.class);
+ return row.getStruct(rebaseOrdinal(ordinal), numFields);
}
@Override
public ArrayData getArray(int ordinal) {
- return row.getArray(ordinal);
+ ruleOutMetaFieldsAccess(ordinal, ArrayData.class);
+ return row.getArray(rebaseOrdinal(ordinal));
}
@Override
public MapData getMap(int ordinal) {
- return row.getMap(ordinal);
+ ruleOutMetaFieldsAccess(ordinal, MapData.class);
+ return row.getMap(rebaseOrdinal(ordinal));
}
@Override
- public Object get(int ordinal, DataType dataType) {
- if (ordinal < HoodieRecord.HOODIE_META_COLUMNS.size()) {
- return UTF8String.fromBytes(getMetaColumnVal(ordinal).getBytes());
+ public InternalRow copy() {
+ return new HoodieInternalRow(Arrays.copyOf(metaFields, metaFields.length),
row.copy(), containsMetaFields);
+ }
+
+ private int rebaseOrdinal(int ordinal) {
+ // NOTE: In cases when source row does not contain meta fields, we will
have to
+ // rebase ordinal onto its indexes
+ return containsMetaFields ? ordinal : ordinal - metaFields.length;
+ }
+
+ private void validateMetaFieldDataType(DataType dataType) {
+ if (!dataType.sameType(StringType$.MODULE$)) {
+ throw new ClassCastException(String.format("Can not cast meta-field of
type UTF8String to %s", dataType.simpleString()));
}
- return row.get(ordinal, dataType);
}
- @Override
- public InternalRow copy() {
- return new HoodieInternalRow(commitTime, commitSeqNumber, recordKey,
partitionPath, fileName, row.copy());
+ private void ruleOutMetaFieldsAccess(int ordinal, Class<?> expectedDataType)
{
Review Comment:
Do we need to have this validation in the fast path? I.e., every column that is not a meta field will go through this check for every record. Is there a way to optimize or reduce the frequency of these validations?
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java:
##########
@@ -18,121 +18,334 @@
package org.apache.hudi.keygen;
-import org.apache.hudi.ApiMaturityLevel;
+import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.AvroConversionUtils;
-import org.apache.hudi.PublicAPIMethod;
+import org.apache.hudi.HoodieSparkUtils;
+import org.apache.hudi.client.utils.SparkRowSerDe;
import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.exception.HoodieIOException;
-
-import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.common.util.PartitionPathEncodeUtils;
+import org.apache.hudi.exception.HoodieKeyException;
+import org.apache.hudi.unsafe.UTF8StringBuilder;
+import org.apache.spark.sql.HoodieUnsafeRowUtils;
+import org.apache.spark.sql.HoodieUnsafeRowUtils$;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+import scala.Function1;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
+import javax.annotation.concurrent.ThreadSafe;
-import scala.Function1;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static
org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH_SEPARATOR;
+import static
org.apache.hudi.keygen.KeyGenUtils.DEFAULT_RECORD_KEY_PARTS_SEPARATOR;
+import static org.apache.hudi.keygen.KeyGenUtils.HUDI_DEFAULT_PARTITION_PATH;
/**
- * Base class for the built-in key generators. Contains methods structured for
- * code reuse amongst them.
+ * Base class for all built-in key generators.
+ *
+ * NOTE: By default it implements all the methods of {@link
SparkKeyGeneratorInterface}, which
+ * by default however fallback to Avro implementation. For maximum
performance (to avoid
+ * conversion from Spark's internal data-types to Avro) you should
override these methods
+ * in your implementation.
+ *
+ * TODO rename to AvroFallbackBaseKeyGenerator
*/
+@ThreadSafe
public abstract class BuiltinKeyGenerator extends BaseKeyGenerator implements
SparkKeyGeneratorInterface {
- private static final String STRUCT_NAME = "hoodieRowTopLevelField";
- private static final String NAMESPACE = "hoodieRow";
- private Function1<Row, GenericRecord> converterFn = null;
- private final AtomicBoolean validatePartitionFields = new
AtomicBoolean(false);
- protected StructType structType;
+ protected static final UTF8String HUDI_DEFAULT_PARTITION_PATH_UTF8 =
UTF8String.fromString(HUDI_DEFAULT_PARTITION_PATH);
- protected Map<String, Pair<List<Integer>, DataType>> recordKeySchemaInfo =
new HashMap<>();
- protected Map<String, Pair<List<Integer>, DataType>> partitionPathSchemaInfo
= new HashMap<>();
+ protected transient volatile SparkRowConverter rowConverter;
+ protected transient volatile SparkRowAccessor rowAccessor;
protected BuiltinKeyGenerator(TypedProperties config) {
super(config);
}
- /**
- * Fetch record key from {@link Row}.
- *
- * @param row instance of {@link Row} from which record key is requested.
- * @return the record key of interest from {@link Row}.
- */
@Override
- @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
public String getRecordKey(Row row) {
- if (null == converterFn) {
- converterFn = AvroConversionUtils.createConverterToAvro(row.schema(),
STRUCT_NAME, NAMESPACE);
+ tryInitRowConverter(row.schema());
+ // NOTE: This implementation has considerable computational overhead and
has to be overridden
+ // to provide for optimal performance on Spark. This implementation
provided exclusively
+ // for compatibility reasons.
+ return getRecordKey(rowConverter.convertToAvro(row));
+ }
+
+ @Override
+ public UTF8String getRecordKey(InternalRow internalRow, StructType schema) {
+ tryInitRowConverter(schema);
+ // NOTE: This implementation has considerable computational overhead and
has to be overridden
+ // to provide for optimal performance on Spark. This implementation
provided exclusively
+ // for compatibility reasons.
+ return
UTF8String.fromString(getRecordKey(rowConverter.convertToAvro(internalRow)));
+ }
+
+ @Override
+ public String getPartitionPath(Row row) {
+ tryInitRowConverter(row.schema());
+ // NOTE: This implementation has considerable computational overhead and
has to be overridden
+ // to provide for optimal performance on Spark. This implementation
provided exclusively
+ // for compatibility reasons.
+ return getPartitionPath(rowConverter.convertToAvro(row));
+ }
+
+ @Override
+ public UTF8String getPartitionPath(InternalRow internalRow, StructType
schema) {
+ tryInitRowConverter(schema);
+ // NOTE: This implementation has considerable computational overhead and
has to be overridden
+ // to provide for optimal performance on Spark. This implementation
provided exclusively
+ // for compatibility reasons.
+ GenericRecord avroRecord = rowConverter.convertToAvro(internalRow);
+ return UTF8String.fromString(getPartitionPath(avroRecord));
+ }
+
+ protected void tryInitRowAccessor(StructType schema) {
+ if (this.rowAccessor == null) {
+ synchronized (this) {
+ if (this.rowAccessor == null) {
+ this.rowAccessor = new SparkRowAccessor(schema);
+ }
+ }
}
- return getKey(converterFn.apply(row)).getRecordKey();
}
/**
- * Fetch partition path from {@link Row}.
- *
- * @param row instance of {@link Row} from which partition path is requested
- * @return the partition path of interest from {@link Row}.
+ * NOTE: This method has to stay final (so that it's easier for JIT compiler
to apply certain
+ * optimizations, like inlining)
*/
- @Override
- @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
- public String getPartitionPath(Row row) {
- if (null == converterFn) {
- converterFn = AvroConversionUtils.createConverterToAvro(row.schema(),
STRUCT_NAME, NAMESPACE);
+ protected final String combinePartitionPath(Object... partitionPathParts) {
+ checkState(partitionPathParts.length == recordKeyFields.size());
+ // Avoid creating [[StringBuilder]] in case there's just one
partition-path part,
+ // and Hive-style of partitioning is not required
+ if (!hiveStylePartitioning && partitionPathParts.length == 1) {
+ return partitionPathParts[0] != null
+ ? partitionPathParts[0].toString()
+ : HUDI_DEFAULT_PARTITION_PATH;
}
- return getKey(converterFn.apply(row)).getPartitionPath();
+
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < partitionPathParts.length; ++i) {
+ String partitionPathPartStr = partitionPathParts[i] != null
+ ? partitionPathParts[i].toString()
Review Comment:
May I know why URL encoding is honored in the next method but not here?
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/unsafe/UTF8StringBuilder.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.unsafe;
+
+import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.array.ByteArrayMethods;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A helper class to write {@link UTF8String}s to an internal buffer and build
the concatenated
+ * {@link UTF8String} at the end.
+ */
+public class UTF8StringBuilder {
+
+ private static final int ARRAY_MAX =
ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH;
+
+ private byte[] buffer;
+ private int cursor = Platform.BYTE_ARRAY_OFFSET;
+
+ public UTF8StringBuilder() {
Review Comment:
Do we have unit tests covering this class?
##########
hudi-common/src/main/java/org/apache/hudi/keygen/KeyGenerator.java:
##########
@@ -52,7 +52,7 @@ protected KeyGenerator(TypedProperties config) {
* @return list of field names, when concatenated make up the record key.
*/
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
- public List<String> getRecordKeyFieldNames() {
Review Comment:
We should be careful about renaming public methods. Can we retain the original name to preserve backward compatibility?
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java:
##########
@@ -18,121 +18,334 @@
package org.apache.hudi.keygen;
-import org.apache.hudi.ApiMaturityLevel;
+import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.AvroConversionUtils;
-import org.apache.hudi.PublicAPIMethod;
+import org.apache.hudi.HoodieSparkUtils;
+import org.apache.hudi.client.utils.SparkRowSerDe;
import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.exception.HoodieIOException;
-
-import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.common.util.PartitionPathEncodeUtils;
+import org.apache.hudi.exception.HoodieKeyException;
+import org.apache.hudi.unsafe.UTF8StringBuilder;
+import org.apache.spark.sql.HoodieUnsafeRowUtils;
+import org.apache.spark.sql.HoodieUnsafeRowUtils$;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+import scala.Function1;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
+import javax.annotation.concurrent.ThreadSafe;
-import scala.Function1;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static
org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH_SEPARATOR;
+import static
org.apache.hudi.keygen.KeyGenUtils.DEFAULT_RECORD_KEY_PARTS_SEPARATOR;
+import static org.apache.hudi.keygen.KeyGenUtils.HUDI_DEFAULT_PARTITION_PATH;
/**
- * Base class for the built-in key generators. Contains methods structured for
- * code reuse amongst them.
+ * Base class for all built-in key generators.
+ *
+ * NOTE: By default it implements all the methods of {@link
SparkKeyGeneratorInterface}, which
+ * by default however fallback to Avro implementation. For maximum
performance (to avoid
+ * conversion from Spark's internal data-types to Avro) you should
override these methods
+ * in your implementation.
+ *
+ * TODO rename to AvroFallbackBaseKeyGenerator
*/
+@ThreadSafe
public abstract class BuiltinKeyGenerator extends BaseKeyGenerator implements
SparkKeyGeneratorInterface {
- private static final String STRUCT_NAME = "hoodieRowTopLevelField";
- private static final String NAMESPACE = "hoodieRow";
- private Function1<Row, GenericRecord> converterFn = null;
- private final AtomicBoolean validatePartitionFields = new
AtomicBoolean(false);
- protected StructType structType;
+ protected static final UTF8String HUDI_DEFAULT_PARTITION_PATH_UTF8 =
UTF8String.fromString(HUDI_DEFAULT_PARTITION_PATH);
- protected Map<String, Pair<List<Integer>, DataType>> recordKeySchemaInfo =
new HashMap<>();
- protected Map<String, Pair<List<Integer>, DataType>> partitionPathSchemaInfo
= new HashMap<>();
+ protected transient volatile SparkRowConverter rowConverter;
+ protected transient volatile SparkRowAccessor rowAccessor;
protected BuiltinKeyGenerator(TypedProperties config) {
super(config);
}
- /**
- * Fetch record key from {@link Row}.
- *
- * @param row instance of {@link Row} from which record key is requested.
- * @return the record key of interest from {@link Row}.
- */
@Override
- @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
public String getRecordKey(Row row) {
- if (null == converterFn) {
- converterFn = AvroConversionUtils.createConverterToAvro(row.schema(),
STRUCT_NAME, NAMESPACE);
+ tryInitRowConverter(row.schema());
+ // NOTE: This implementation has considerable computational overhead and
has to be overridden
+ // to provide for optimal performance on Spark. This implementation
provided exclusively
+ // for compatibility reasons.
+ return getRecordKey(rowConverter.convertToAvro(row));
+ }
+
+ @Override
+ public UTF8String getRecordKey(InternalRow internalRow, StructType schema) {
+ tryInitRowConverter(schema);
+ // NOTE: This implementation has considerable computational overhead and
has to be overridden
+ // to provide for optimal performance on Spark. This implementation
provided exclusively
+ // for compatibility reasons.
+ return
UTF8String.fromString(getRecordKey(rowConverter.convertToAvro(internalRow)));
+ }
+
+ @Override
+ public String getPartitionPath(Row row) {
+ tryInitRowConverter(row.schema());
+ // NOTE: This implementation has considerable computational overhead and
has to be overridden
+ // to provide for optimal performance on Spark. This implementation
provided exclusively
+ // for compatibility reasons.
+ return getPartitionPath(rowConverter.convertToAvro(row));
+ }
+
+ @Override
+ public UTF8String getPartitionPath(InternalRow internalRow, StructType
schema) {
+ tryInitRowConverter(schema);
+ // NOTE: This implementation has considerable computational overhead and
has to be overridden
+ // to provide for optimal performance on Spark. This implementation
provided exclusively
+ // for compatibility reasons.
+ GenericRecord avroRecord = rowConverter.convertToAvro(internalRow);
+ return UTF8String.fromString(getPartitionPath(avroRecord));
+ }
+
+ protected void tryInitRowAccessor(StructType schema) {
+ if (this.rowAccessor == null) {
+ synchronized (this) {
+ if (this.rowAccessor == null) {
+ this.rowAccessor = new SparkRowAccessor(schema);
+ }
+ }
}
- return getKey(converterFn.apply(row)).getRecordKey();
}
/**
- * Fetch partition path from {@link Row}.
- *
- * @param row instance of {@link Row} from which partition path is requested
- * @return the partition path of interest from {@link Row}.
+ * NOTE: This method has to stay final (so that it's easier for JIT compiler
to apply certain
+ * optimizations, like inlining)
*/
- @Override
- @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
- public String getPartitionPath(Row row) {
- if (null == converterFn) {
- converterFn = AvroConversionUtils.createConverterToAvro(row.schema(),
STRUCT_NAME, NAMESPACE);
+ protected final String combinePartitionPath(Object... partitionPathParts) {
+ checkState(partitionPathParts.length == recordKeyFields.size());
+ // Avoid creating [[StringBuilder]] in case there's just one
partition-path part,
+ // and Hive-style of partitioning is not required
+ if (!hiveStylePartitioning && partitionPathParts.length == 1) {
+ return partitionPathParts[0] != null
+ ? partitionPathParts[0].toString()
+ : HUDI_DEFAULT_PARTITION_PATH;
}
- return getKey(converterFn.apply(row)).getPartitionPath();
+
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < partitionPathParts.length; ++i) {
+ String partitionPathPartStr = partitionPathParts[i] != null
+ ? partitionPathParts[i].toString()
+ : HUDI_DEFAULT_PARTITION_PATH;
+
+ // TODO support url-encoding
+
+ if (hiveStylePartitioning) {
+ sb.append(recordKeyFields.get(i))
Review Comment:
Again, should this be referring to partitionPathFields.get(i) instead of recordKeyFields.get(i)?
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/model/TestHoodieInternalRow.java:
##########
@@ -64,7 +65,13 @@ public void testGet() {
Object[] values = getRandomValue(true);
InternalRow row = new GenericInternalRow(values);
- HoodieInternalRow hoodieInternalRow = new HoodieInternalRow("commitTime",
"commitSeqNo", "recordKey", "partitionPath", "fileName", row);
+ HoodieInternalRow hoodieInternalRow = new
HoodieInternalRow(UTF8String.fromString("commitTime"),
+ UTF8String.fromString("commitSeqNo"),
Review Comment:
Can we declare constants for these UTF8Strings and reuse them across all tests?
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java:
##########
@@ -39,49 +39,74 @@
import org.apache.log4j.Logger;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
import java.io.IOException;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
/**
* Create handle with InternalRow for datasource implementation of bulk insert.
*/
public class HoodieRowCreateHandle implements Serializable {
private static final long serialVersionUID = 1L;
+
private static final Logger LOG =
LogManager.getLogger(HoodieRowCreateHandle.class);
- private static final AtomicLong SEQGEN = new AtomicLong(1);
+ private static final AtomicLong GLOBAL_SEQ_NO = new AtomicLong(1);
+
+ private static final Integer RECORD_KEY_META_FIELD_ORD =
+
HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.RECORD_KEY_METADATA_FIELD);
Review Comment:
Likewise, consider reusing the existing PARTITION_PATH_META_FIELD_POS constant as well.
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java:
##########
@@ -18,121 +18,334 @@
package org.apache.hudi.keygen;
-import org.apache.hudi.ApiMaturityLevel;
+import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.AvroConversionUtils;
-import org.apache.hudi.PublicAPIMethod;
+import org.apache.hudi.HoodieSparkUtils;
+import org.apache.hudi.client.utils.SparkRowSerDe;
import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.exception.HoodieIOException;
-
-import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.common.util.PartitionPathEncodeUtils;
+import org.apache.hudi.exception.HoodieKeyException;
+import org.apache.hudi.unsafe.UTF8StringBuilder;
+import org.apache.spark.sql.HoodieUnsafeRowUtils;
+import org.apache.spark.sql.HoodieUnsafeRowUtils$;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+import scala.Function1;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
+import javax.annotation.concurrent.ThreadSafe;
-import scala.Function1;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static
org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH_SEPARATOR;
+import static
org.apache.hudi.keygen.KeyGenUtils.DEFAULT_RECORD_KEY_PARTS_SEPARATOR;
+import static org.apache.hudi.keygen.KeyGenUtils.HUDI_DEFAULT_PARTITION_PATH;
/**
- * Base class for the built-in key generators. Contains methods structured for
- * code reuse amongst them.
+ * Base class for all built-in key generators.
+ *
+ * NOTE: By default it implements all the methods of {@link
SparkKeyGeneratorInterface}, which
+ * by default however fallback to Avro implementation. For maximum
performance (to avoid
+ * conversion from Spark's internal data-types to Avro) you should
override these methods
+ * in your implementation.
+ *
+ * TODO rename to AvroFallbackBaseKeyGenerator
*/
+@ThreadSafe
public abstract class BuiltinKeyGenerator extends BaseKeyGenerator implements
SparkKeyGeneratorInterface {
- private static final String STRUCT_NAME = "hoodieRowTopLevelField";
- private static final String NAMESPACE = "hoodieRow";
- private Function1<Row, GenericRecord> converterFn = null;
- private final AtomicBoolean validatePartitionFields = new
AtomicBoolean(false);
- protected StructType structType;
+ protected static final UTF8String HUDI_DEFAULT_PARTITION_PATH_UTF8 =
UTF8String.fromString(HUDI_DEFAULT_PARTITION_PATH);
- protected Map<String, Pair<List<Integer>, DataType>> recordKeySchemaInfo =
new HashMap<>();
- protected Map<String, Pair<List<Integer>, DataType>> partitionPathSchemaInfo
= new HashMap<>();
+ protected transient volatile SparkRowConverter rowConverter;
+ protected transient volatile SparkRowAccessor rowAccessor;
protected BuiltinKeyGenerator(TypedProperties config) {
super(config);
}
- /**
- * Fetch record key from {@link Row}.
- *
- * @param row instance of {@link Row} from which record key is requested.
- * @return the record key of interest from {@link Row}.
- */
@Override
- @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
public String getRecordKey(Row row) {
- if (null == converterFn) {
- converterFn = AvroConversionUtils.createConverterToAvro(row.schema(),
STRUCT_NAME, NAMESPACE);
+ tryInitRowConverter(row.schema());
+ // NOTE: This implementation has considerable computational overhead and
has to be overridden
+ // to provide for optimal performance on Spark. This implementation
provided exclusively
+ // for compatibility reasons.
+ return getRecordKey(rowConverter.convertToAvro(row));
+ }
+
+ @Override
+ public UTF8String getRecordKey(InternalRow internalRow, StructType schema) {
+ tryInitRowConverter(schema);
+ // NOTE: This implementation has considerable computational overhead and
has to be overridden
+ // to provide for optimal performance on Spark. This implementation
provided exclusively
+ // for compatibility reasons.
+ return
UTF8String.fromString(getRecordKey(rowConverter.convertToAvro(internalRow)));
+ }
+
+ @Override
+ public String getPartitionPath(Row row) {
+ tryInitRowConverter(row.schema());
+ // NOTE: This implementation has considerable computational overhead and
has to be overridden
+ // to provide for optimal performance on Spark. This implementation
provided exclusively
+ // for compatibility reasons.
+ return getPartitionPath(rowConverter.convertToAvro(row));
+ }
+
+ @Override
+ public UTF8String getPartitionPath(InternalRow internalRow, StructType
schema) {
+ tryInitRowConverter(schema);
+ // NOTE: This implementation has considerable computational overhead and
has to be overridden
+ // to provide for optimal performance on Spark. This implementation
provided exclusively
+ // for compatibility reasons.
+ GenericRecord avroRecord = rowConverter.convertToAvro(internalRow);
+ return UTF8String.fromString(getPartitionPath(avroRecord));
+ }
+
+ protected void tryInitRowAccessor(StructType schema) {
+ if (this.rowAccessor == null) {
+ synchronized (this) {
+ if (this.rowAccessor == null) {
+ this.rowAccessor = new SparkRowAccessor(schema);
+ }
+ }
}
- return getKey(converterFn.apply(row)).getRecordKey();
}
/**
- * Fetch partition path from {@link Row}.
- *
- * @param row instance of {@link Row} from which partition path is requested
- * @return the partition path of interest from {@link Row}.
+ * NOTE: This method has to stay final (so that it's easier for JIT compiler
to apply certain
+ * optimizations, like inlining)
*/
- @Override
- @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
- public String getPartitionPath(Row row) {
- if (null == converterFn) {
- converterFn = AvroConversionUtils.createConverterToAvro(row.schema(),
STRUCT_NAME, NAMESPACE);
+ protected final String combinePartitionPath(Object... partitionPathParts) {
+ checkState(partitionPathParts.length == recordKeyFields.size());
Review Comment:
Did you mean partitionPathFields (not recordKeyFields) as the second argument?
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java:
##########
@@ -39,49 +39,74 @@
import org.apache.log4j.Logger;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
import java.io.IOException;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
/**
* Create handle with InternalRow for datasource implementation of bulk insert.
*/
public class HoodieRowCreateHandle implements Serializable {
private static final long serialVersionUID = 1L;
+
private static final Logger LOG =
LogManager.getLogger(HoodieRowCreateHandle.class);
- private static final AtomicLong SEQGEN = new AtomicLong(1);
+ private static final AtomicLong GLOBAL_SEQ_NO = new AtomicLong(1);
+
+ private static final Integer RECORD_KEY_META_FIELD_ORD =
+
HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.RECORD_KEY_METADATA_FIELD);
Review Comment:
We already have HoodieRecord.RECORD_KEY_META_FIELD_POS; we can probably reuse it here instead of introducing a new constant.
##########
hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java:
##########
@@ -157,7 +152,29 @@ public List<HoodieInternalWriteStatus> getWriteStatuses()
throws IOException {
return writeStatusList;
}
- public void abort() {
+ public void abort() {}
+
+ public void close() throws IOException {
+ for (HoodieRowCreateHandle rowCreateHandle : handles.values()) {
+ writeStatusList.add(rowCreateHandle.close());
+ }
+ handles.clear();
+ handle = null;
+ }
+
+ private UTF8String extractPartitionPath(InternalRow row) {
+ if (populateMetaFields) {
+ // In case meta-fields are materialized w/in the table itself, we can
just simply extract
+ // partition path from there
+ //
+ // NOTE: Helper keeps track of [[lastKnownPartitionPath]] as
[[UTF8String]] to avoid
+ // conversion from Catalyst internal representation into a
[[String]]
+ return row.getUTF8String(HoodieRecord.PARTITION_PATH_META_FIELD_POS);
+ } else if (keyGeneratorOpt.isPresent()) {
+ return keyGeneratorOpt.get().getPartitionPath(row, structType);
Review Comment:
Prior to this patch, in the SimpleKeyGenerator case we had a special case where we fetched the field directly instead of going via the key generator (L131 to L136). I guess we missed retaining that. Only when it is not SimpleKeyGenerator should we go via the regular key-generator path.
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java:
##########
@@ -46,16 +47,27 @@
* field in the partition path, use field1:simple 3. If you want your table to
be non partitioned, simply leave it as blank.
*
* RecordKey is internally generated using either SimpleKeyGenerator or
ComplexKeyGenerator.
+ *
+ * @deprecated
*/
+@Deprecated
Review Comment:
Do we know why we are deprecating this? What's the suggested alternative?
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeRowUtils.scala:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.{StructField, StructType}
+
+import scala.collection.mutable.ArrayBuffer
+
+object HoodieUnsafeRowUtils {
+
+ /**
+ * Fetches (nested) value w/in provided [[Row]] uniquely identified by the
provided nested-field path
+ * previously composed by [[composeNestedFieldPath]]
+ */
+ def getNestedRowValue(row: Row, nestedFieldPath: NestedFieldPath): Any = {
Review Comment:
Did you have to make any changes here, or was the code just moved as-is? If you can point me to the actual changes, I will review only those.
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java:
##########
@@ -18,121 +18,334 @@
package org.apache.hudi.keygen;
-import org.apache.hudi.ApiMaturityLevel;
+import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.AvroConversionUtils;
-import org.apache.hudi.PublicAPIMethod;
+import org.apache.hudi.HoodieSparkUtils;
+import org.apache.hudi.client.utils.SparkRowSerDe;
import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.exception.HoodieIOException;
-
-import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.common.util.PartitionPathEncodeUtils;
+import org.apache.hudi.exception.HoodieKeyException;
+import org.apache.hudi.unsafe.UTF8StringBuilder;
+import org.apache.spark.sql.HoodieUnsafeRowUtils;
+import org.apache.spark.sql.HoodieUnsafeRowUtils$;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+import scala.Function1;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
+import javax.annotation.concurrent.ThreadSafe;
-import scala.Function1;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static
org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH_SEPARATOR;
+import static
org.apache.hudi.keygen.KeyGenUtils.DEFAULT_RECORD_KEY_PARTS_SEPARATOR;
+import static org.apache.hudi.keygen.KeyGenUtils.HUDI_DEFAULT_PARTITION_PATH;
/**
- * Base class for the built-in key generators. Contains methods structured for
- * code reuse amongst them.
+ * Base class for all built-in key generators.
+ *
+ * NOTE: By default it implements all the methods of {@link
SparkKeyGeneratorInterface}, which
+ * by default however fallback to Avro implementation. For maximum
performance (to avoid
+ * conversion from Spark's internal data-types to Avro) you should
override these methods
+ * in your implementation.
+ *
+ * TODO rename to AvroFallbackBaseKeyGenerator
*/
+@ThreadSafe
public abstract class BuiltinKeyGenerator extends BaseKeyGenerator implements
SparkKeyGeneratorInterface {
- private static final String STRUCT_NAME = "hoodieRowTopLevelField";
- private static final String NAMESPACE = "hoodieRow";
- private Function1<Row, GenericRecord> converterFn = null;
- private final AtomicBoolean validatePartitionFields = new
AtomicBoolean(false);
- protected StructType structType;
+ protected static final UTF8String HUDI_DEFAULT_PARTITION_PATH_UTF8 =
UTF8String.fromString(HUDI_DEFAULT_PARTITION_PATH);
- protected Map<String, Pair<List<Integer>, DataType>> recordKeySchemaInfo =
new HashMap<>();
- protected Map<String, Pair<List<Integer>, DataType>> partitionPathSchemaInfo
= new HashMap<>();
+ protected transient volatile SparkRowConverter rowConverter;
+ protected transient volatile SparkRowAccessor rowAccessor;
protected BuiltinKeyGenerator(TypedProperties config) {
super(config);
}
- /**
- * Fetch record key from {@link Row}.
- *
- * @param row instance of {@link Row} from which record key is requested.
- * @return the record key of interest from {@link Row}.
- */
@Override
- @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
public String getRecordKey(Row row) {
- if (null == converterFn) {
- converterFn = AvroConversionUtils.createConverterToAvro(row.schema(),
STRUCT_NAME, NAMESPACE);
+ tryInitRowConverter(row.schema());
Review Comment:
May I know why the maturity annotation ("EVOLVING") was removed?
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/model/HoodieInternalRow.java:
##########
@@ -57,187 +92,153 @@ public int numFields() {
}
@Override
- public void setNullAt(int i) {
- if (i < HoodieRecord.HOODIE_META_COLUMNS.size()) {
- switch (i) {
- case 0: {
- this.commitTime = null;
- break;
- }
- case 1: {
- this.commitSeqNumber = null;
- break;
- }
- case 2: {
- this.recordKey = null;
- break;
- }
- case 3: {
- this.partitionPath = null;
- break;
- }
- case 4: {
- this.fileName = null;
- break;
- }
- default: throw new IllegalArgumentException("Not expected");
- }
+ public void setNullAt(int ordinal) {
+ if (ordinal < metaFields.length) {
+ metaFields[ordinal] = null;
} else {
- row.setNullAt(i);
+ row.setNullAt(rebaseOrdinal(ordinal));
}
}
@Override
- public void update(int i, Object value) {
- if (i < HoodieRecord.HOODIE_META_COLUMNS.size()) {
- switch (i) {
- case 0: {
- this.commitTime = value.toString();
- break;
- }
- case 1: {
- this.commitSeqNumber = value.toString();
- break;
- }
- case 2: {
- this.recordKey = value.toString();
- break;
- }
- case 3: {
- this.partitionPath = value.toString();
- break;
- }
- case 4: {
- this.fileName = value.toString();
- break;
- }
- default: throw new IllegalArgumentException("Not expected");
+ public void update(int ordinal, Object value) {
+ if (ordinal < metaFields.length) {
+ if (value instanceof UTF8String) {
+ metaFields[ordinal] = (UTF8String) value;
+ } else if (value instanceof String) {
+ metaFields[ordinal] = UTF8String.fromString((String) value);
+ } else {
+ throw new IllegalArgumentException(
+ String.format("Could not update the row at (%d) with value of type
(%s), either UTF8String or String are expected", ordinal,
value.getClass().getSimpleName()));
}
} else {
- row.update(i, value);
+ row.update(rebaseOrdinal(ordinal), value);
}
}
- private String getMetaColumnVal(int ordinal) {
- switch (ordinal) {
- case 0: {
- return commitTime;
- }
- case 1: {
- return commitSeqNumber;
- }
- case 2: {
- return recordKey;
- }
- case 3: {
- return partitionPath;
- }
- case 4: {
- return fileName;
- }
- default: throw new IllegalArgumentException("Not expected");
+ @Override
+ public boolean isNullAt(int ordinal) {
+ if (ordinal < metaFields.length) {
+ return metaFields[ordinal] == null;
}
+ return row.isNullAt(rebaseOrdinal(ordinal));
}
@Override
- public boolean isNullAt(int ordinal) {
+ public UTF8String getUTF8String(int ordinal) {
+ if (ordinal < HoodieRecord.HOODIE_META_COLUMNS.size()) {
Review Comment:
Minor: to be consistent with the other places in this class, can we compare against
```
ordinal < metaFields.length
```
instead of `HoodieRecord.HOODIE_META_COLUMNS.size()`?
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SparkKeyGeneratorInterface.java:
##########
@@ -18,18 +18,65 @@
package org.apache.hudi.keygen;
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.PublicAPIMethod;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
/**
- * Spark key generator interface.
+ * Spark-specific {@link KeyGenerator} interface extension allowing
implementation to
+ * specifically implement record-key, partition-path generation w/o the need
for (expensive)
+ * conversion from Spark internal representation (for ex, to Avro)
*/
public interface SparkKeyGeneratorInterface extends KeyGeneratorInterface {
+ /**
+ * Extracts record key from Spark's {@link Row}
+ *
+ * @param row instance of {@link Row} from which record-key is extracted
+ * @return record's (primary) key
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
String getRecordKey(Row row);
+ /**
+ * Extracts record key from Spark's {@link InternalRow}
+ *
+ * NOTE: Difference b/w {@link Row} and {@link InternalRow} is that {@link
InternalRow} could
+ * internally hold just a binary representation of the data, while
{@link Row} has it
+ * deserialized into JVM-native representation (like {@code Integer},
{@code Long},
+ * {@code String}, etc)
+ *
+ * @param row instance of {@link InternalRow} from which record-key is
extracted
+ * @param schema schema {@link InternalRow} is adhering to
+ * @return record-key as instance of {@link UTF8String}
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ UTF8String getRecordKey(InternalRow row, StructType schema);
+
+ /**
+ * Extracts partition-path from {@link Row}
+ *
+ * @param row instance of {@link Row} from which record-key is extracted
Review Comment:
Minor: the Javadoc should read "from which partition path is extracted" (it currently says "record-key").
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeRowUtils.scala:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.{StructField, StructType}
+
+import scala.collection.mutable.ArrayBuffer
+
+object HoodieUnsafeRowUtils {
+
+ /**
+ * Fetches (nested) value w/in provided [[Row]] uniquely identified by the
provided nested-field path
+ * previously composed by [[composeNestedFieldPath]]
+ */
+ def getNestedRowValue(row: Row, nestedFieldPath: NestedFieldPath): Any = {
+ var curRow = row
+ for (idx <- nestedFieldPath.parts.indices) {
+ val (ord, f) = nestedFieldPath.parts(idx)
+ if (curRow.isNullAt(ord)) {
+ // scalastyle:off return
+ if (f.nullable) return null
Review Comment:
I see one change: the null placeholder is replaced with explicit nulls.
##########
hudi-common/src/main/java/org/apache/hudi/keygen/KeyGenerator.java:
##########
@@ -52,7 +52,7 @@ protected KeyGenerator(TypedProperties config) {
* @return list of field names, when concatenated make up the record key.
*/
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
- public List<String> getRecordKeyFieldNames() {
Review Comment:
Or add `@Deprecated` to the older ones.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]