alexeykudinkin commented on code in PR #5523:
URL: https://github.com/apache/hudi/pull/5523#discussion_r927207409
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java:
##########
@@ -18,121 +18,334 @@
package org.apache.hudi.keygen;
-import org.apache.hudi.ApiMaturityLevel;
+import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.AvroConversionUtils;
-import org.apache.hudi.PublicAPIMethod;
+import org.apache.hudi.HoodieSparkUtils;
+import org.apache.hudi.client.utils.SparkRowSerDe;
import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.exception.HoodieIOException;
-
-import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.common.util.PartitionPathEncodeUtils;
+import org.apache.hudi.exception.HoodieKeyException;
+import org.apache.hudi.unsafe.UTF8StringBuilder;
+import org.apache.spark.sql.HoodieUnsafeRowUtils;
+import org.apache.spark.sql.HoodieUnsafeRowUtils$;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+import scala.Function1;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
+import javax.annotation.concurrent.ThreadSafe;
-import scala.Function1;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static
org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH_SEPARATOR;
+import static
org.apache.hudi.keygen.KeyGenUtils.DEFAULT_RECORD_KEY_PARTS_SEPARATOR;
+import static org.apache.hudi.keygen.KeyGenUtils.HUDI_DEFAULT_PARTITION_PATH;
/**
- * Base class for the built-in key generators. Contains methods structured for
- * code reuse amongst them.
+ * Base class for all built-in key generators.
+ *
+ * NOTE: By default it implements all the methods of {@link
SparkKeyGeneratorInterface}, which
+ * by default however fallback to Avro implementation. For maximum
performance (to avoid
+ * conversion from Spark's internal data-types to Avro) you should
override these methods
+ * in your implementation.
+ *
+ * TODO rename to AvroFallbackBaseKeyGenerator
*/
+@ThreadSafe
public abstract class BuiltinKeyGenerator extends BaseKeyGenerator implements
SparkKeyGeneratorInterface {
- private static final String STRUCT_NAME = "hoodieRowTopLevelField";
- private static final String NAMESPACE = "hoodieRow";
- private Function1<Row, GenericRecord> converterFn = null;
- private final AtomicBoolean validatePartitionFields = new
AtomicBoolean(false);
- protected StructType structType;
+ protected static final UTF8String HUDI_DEFAULT_PARTITION_PATH_UTF8 =
UTF8String.fromString(HUDI_DEFAULT_PARTITION_PATH);
- protected Map<String, Pair<List<Integer>, DataType>> recordKeySchemaInfo =
new HashMap<>();
- protected Map<String, Pair<List<Integer>, DataType>> partitionPathSchemaInfo
= new HashMap<>();
+ protected transient volatile SparkRowConverter rowConverter;
+ protected transient volatile SparkRowAccessor rowAccessor;
protected BuiltinKeyGenerator(TypedProperties config) {
super(config);
}
- /**
- * Fetch record key from {@link Row}.
- *
- * @param row instance of {@link Row} from which record key is requested.
- * @return the record key of interest from {@link Row}.
- */
@Override
- @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
public String getRecordKey(Row row) {
- if (null == converterFn) {
- converterFn = AvroConversionUtils.createConverterToAvro(row.schema(),
STRUCT_NAME, NAMESPACE);
+ tryInitRowConverter(row.schema());
Review Comment:
It's already annotated in the interface
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java:
##########
@@ -19,132 +19,560 @@
package org.apache.hudi.keygen;
import org.apache.avro.generic.GenericRecord;
-import org.apache.hudi.ApiMaturityLevel;
import org.apache.hudi.AvroConversionUtils;
-import org.apache.hudi.PublicAPIMethod;
+import org.apache.hudi.HoodieSparkUtils;
+import org.apache.hudi.client.utils.SparkRowSerDe;
import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.PartitionPathEncodeUtils;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieKeyException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.sql.HoodieUnsafeRowUtils;
+import org.apache.spark.sql.HoodieUnsafeRowUtils$;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DateType;
import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.unsafe.types.UTF8String;
import scala.Function1;
-import java.util.HashMap;
+import javax.annotation.concurrent.ThreadSafe;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+
+import static org.apache.hudi.common.util.CollectionUtils.tail;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static
org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH_SEPARATOR;
+import static
org.apache.hudi.keygen.KeyGenUtils.DEFAULT_RECORD_KEY_PARTS_SEPARATOR;
+import static org.apache.hudi.keygen.KeyGenUtils.EMPTY_RECORDKEY_PLACEHOLDER;
+import static org.apache.hudi.keygen.KeyGenUtils.HUDI_DEFAULT_PARTITION_PATH;
+import static org.apache.hudi.keygen.KeyGenUtils.NULL_RECORDKEY_PLACEHOLDER;
/**
- * Base class for the built-in key generators. Contains methods structured for
- * code reuse amongst them.
+ * Base class for all built-in key generators.
+ *
+ * NOTE: By default it implements all the methods of {@link
SparkKeyGeneratorInterface}, which
+ * by default however fallback to Avro implementation. For maximum
performance (to avoid
+ * conversion from Spark's internal data-types to Avro) you should
override these methods
+ * in your implementation.
+ *
+ * TODO rename to AvroFallbackBaseKeyGenerator
*/
+@ThreadSafe
public abstract class BuiltinKeyGenerator extends BaseKeyGenerator implements
SparkKeyGeneratorInterface {
- private static final String STRUCT_NAME = "hoodieRowTopLevelField";
- private static final String NAMESPACE = "hoodieRow";
- private Function1<Row, GenericRecord> converterFn = null;
- private final AtomicBoolean validatePartitionFields = new
AtomicBoolean(false);
- protected StructType structType;
+ private static final Logger LOG =
LogManager.getLogger(BuiltinKeyGenerator.class);
+
+ private static final String COMPOSITE_KEY_FIELD_VALUE_INFIX = ":";
+
+ protected static final UTF8String HUDI_DEFAULT_PARTITION_PATH_UTF8 =
UTF8String.fromString(HUDI_DEFAULT_PARTITION_PATH);
+ protected static final UTF8String NULL_RECORD_KEY_PLACEHOLDER_UTF8 =
UTF8String.fromString(NULL_RECORDKEY_PLACEHOLDER);
+ protected static final UTF8String EMPTY_RECORD_KEY_PLACEHOLDER_UTF8 =
UTF8String.fromString(EMPTY_RECORDKEY_PLACEHOLDER);
+
- protected Map<String, Pair<List<Integer>, DataType>> recordKeySchemaInfo =
new HashMap<>();
- protected Map<String, Pair<List<Integer>, DataType>> partitionPathSchemaInfo
= new HashMap<>();
+ protected transient volatile SparkRowConverter rowConverter;
+ protected transient volatile SparkRowAccessor rowAccessor;
protected BuiltinKeyGenerator(TypedProperties config) {
super(config);
}
- /**
- * Fetch record key from {@link Row}.
- *
- * @param row instance of {@link Row} from which record key is requested.
- * @return the record key of interest from {@link Row}.
- */
@Override
- @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
public String getRecordKey(Row row) {
- // TODO avoid conversion to avro
- // since converterFn is transient this will be repeatedly initialized
over and over again
- if (null == converterFn) {
- converterFn = AvroConversionUtils.createConverterToAvro(row.schema(),
STRUCT_NAME, NAMESPACE);
- }
- return getKey(converterFn.apply(row)).getRecordKey();
+ tryInitRowConverter(row.schema());
+ // NOTE: This implementation has considerable computational overhead and
has to be overridden
+ // to provide for optimal performance on Spark. This implementation
provided exclusively
+ // for compatibility reasons.
+ return getRecordKey(rowConverter.convertToAvro(row));
}
@Override
- @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
- public String getRecordKey(InternalRow internalRow, StructType schema) {
- try {
- // TODO fix
- buildFieldSchemaInfoIfNeeded(schema);
- return RowKeyGeneratorHelper.getRecordKeyFromInternalRow(internalRow,
getRecordKeyFields(), recordKeySchemaInfo, false);
- } catch (Exception e) {
- throw new HoodieException("Conversion of InternalRow to Row failed with
exception", e);
+ public UTF8String getRecordKey(InternalRow internalRow, StructType schema) {
+ tryInitRowConverter(schema);
+ // NOTE: This implementation has considerable computational overhead and
has to be overridden
+ // to provide for optimal performance on Spark. This implementation
provided exclusively
+ // for compatibility reasons.
+ return
UTF8String.fromString(getRecordKey(rowConverter.convertToAvro(internalRow)));
+ }
+
+ @Override
+ public String getPartitionPath(Row row) {
+ tryInitRowConverter(row.schema());
+ // NOTE: This implementation has considerable computational overhead and
has to be overridden
+ // to provide for optimal performance on Spark. This implementation
provided exclusively
+ // for compatibility reasons.
+ return getPartitionPath(rowConverter.convertToAvro(row));
+ }
+
+ @Override
+ public UTF8String getPartitionPath(InternalRow internalRow, StructType
schema) {
+ tryInitRowConverter(schema);
+ // NOTE: This implementation has considerable computational overhead and
has to be overridden
+ // to provide for optimal performance on Spark. This implementation
provided exclusively
+ // for compatibility reasons.
+ GenericRecord avroRecord = rowConverter.convertToAvro(internalRow);
+ return UTF8String.fromString(getPartitionPath(avroRecord));
+ }
+
+ protected void tryInitRowAccessor(StructType schema) {
+ if (this.rowAccessor == null) {
+ synchronized (this) {
+ if (this.rowAccessor == null) {
+ this.rowAccessor = new SparkRowAccessor(schema);
+ }
+ }
}
}
+
/**
- * Fetch partition path from {@link Row}.
- *
- * @param row instance of {@link Row} from which partition path is requested
- * @return the partition path of interest from {@link Row}.
+ * NOTE: This method has to stay final (so that it's easier for JIT compiler
to apply certain
+ * optimizations, like inlining)
*/
+ protected final String combinePartitionPath(Object... partitionPathParts) {
Review Comment:
They actually access KG fields (partitionPathFields, hiveStylePartitioning)
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java:
##########
@@ -42,32 +41,34 @@ public class HoodieInternalRowFileWriterFactory {
* Factory method to assist in instantiating an instance of {@link
HoodieInternalRowFileWriter}.
* @param path path of the RowFileWriter.
* @param hoodieTable instance of {@link HoodieTable} in use.
- * @param config instance of {@link HoodieWriteConfig} to use.
+ * @param writeConfig instance of {@link HoodieWriteConfig} to use.
* @param schema schema of the dataset in use.
* @return the instantiated {@link HoodieInternalRowFileWriter}.
* @throws IOException if format is not supported or if any exception during
instantiating the RowFileWriter.
*
*/
- public static HoodieInternalRowFileWriter getInternalRowFileWriter(
- Path path, HoodieTable hoodieTable, HoodieWriteConfig config, StructType
schema)
+ public static HoodieInternalRowFileWriter getInternalRowFileWriter(Path path,
+
HoodieTable hoodieTable,
+
HoodieWriteConfig writeConfig,
+
StructType schema)
throws IOException {
final String extension = FSUtils.getFileExtension(path.getName());
if (PARQUET.getFileExtension().equals(extension)) {
- return newParquetInternalRowFileWriter(path, config, schema,
hoodieTable);
+ return newParquetInternalRowFileWriter(path, hoodieTable, writeConfig,
schema, tryInstantiateBloomFilter(writeConfig));
}
throw new UnsupportedOperationException(extension + " format not supported
yet.");
}
- private static HoodieInternalRowFileWriter newParquetInternalRowFileWriter(
- Path path, HoodieWriteConfig writeConfig, StructType structType,
HoodieTable table)
+ private static HoodieInternalRowFileWriter
newParquetInternalRowFileWriter(Path path,
+
HoodieTable table,
+
HoodieWriteConfig writeConfig,
+
StructType structType,
+
Option<BloomFilter> bloomFilterOpt
+ )
throws IOException {
- BloomFilter filter = BloomFilterFactory.createBloomFilter(
- writeConfig.getBloomFilterNumEntries(),
- writeConfig.getBloomFilterFPP(),
- writeConfig.getDynamicBloomFilterMaxNumEntries(),
- writeConfig.getBloomFilterType());
Review Comment:
Extracting to a method makes things cleaner IMO
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SparkKeyGeneratorInterface.java:
##########
@@ -18,20 +18,65 @@
package org.apache.hudi.keygen;
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.PublicAPIMethod;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
/**
- * Spark key generator interface.
+ * Spark-specific {@link KeyGenerator} interface extension allowing
implementation to
+ * specifically implement record-key, partition-path generation w/o the need
for (expensive)
+ * conversion from Spark internal representation (for ex, to Avro)
*/
public interface SparkKeyGeneratorInterface extends KeyGeneratorInterface {
+ /**
+ * Extracts record key from Spark's {@link Row}
+ *
+ * @param row instance of {@link Row} from which record-key is extracted
+ * @return record's (primary) key
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
String getRecordKey(Row row);
- String getRecordKey(InternalRow row, StructType schema);
+ /**
+ * Extracts record key from Spark's {@link InternalRow}
+ *
+ * NOTE: Difference b/w {@link Row} and {@link InternalRow} is that {@link
InternalRow} could
+ * internally hold just a binary representation of the data, while
{@link Row} has it
+ * deserialized into JVM-native representation (like {@code Integer},
{@code Long},
+ * {@code String}, etc)
+ *
+ * @param row instance of {@link InternalRow} from which record-key is
extracted
+ * @param schema schema {@link InternalRow} is adhering to
+ * @return record-key as instance of {@link UTF8String}
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ UTF8String getRecordKey(InternalRow row, StructType schema);
Review Comment:
Created HUDI-4446. Do we annotate them somehow?
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java:
##########
@@ -19,132 +19,560 @@
package org.apache.hudi.keygen;
import org.apache.avro.generic.GenericRecord;
-import org.apache.hudi.ApiMaturityLevel;
import org.apache.hudi.AvroConversionUtils;
-import org.apache.hudi.PublicAPIMethod;
+import org.apache.hudi.HoodieSparkUtils;
+import org.apache.hudi.client.utils.SparkRowSerDe;
import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.PartitionPathEncodeUtils;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieKeyException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.sql.HoodieUnsafeRowUtils;
+import org.apache.spark.sql.HoodieUnsafeRowUtils$;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DateType;
import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.unsafe.types.UTF8String;
import scala.Function1;
-import java.util.HashMap;
+import javax.annotation.concurrent.ThreadSafe;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+
+import static org.apache.hudi.common.util.CollectionUtils.tail;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static
org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH_SEPARATOR;
+import static
org.apache.hudi.keygen.KeyGenUtils.DEFAULT_RECORD_KEY_PARTS_SEPARATOR;
+import static org.apache.hudi.keygen.KeyGenUtils.EMPTY_RECORDKEY_PLACEHOLDER;
+import static org.apache.hudi.keygen.KeyGenUtils.HUDI_DEFAULT_PARTITION_PATH;
+import static org.apache.hudi.keygen.KeyGenUtils.NULL_RECORDKEY_PLACEHOLDER;
/**
- * Base class for the built-in key generators. Contains methods structured for
- * code reuse amongst them.
+ * Base class for all built-in key generators.
+ *
+ * NOTE: By default it implements all the methods of {@link
SparkKeyGeneratorInterface}, which
+ * by default however fallback to Avro implementation. For maximum
performance (to avoid
+ * conversion from Spark's internal data-types to Avro) you should
override these methods
+ * in your implementation.
+ *
+ * TODO rename to AvroFallbackBaseKeyGenerator
*/
+@ThreadSafe
public abstract class BuiltinKeyGenerator extends BaseKeyGenerator implements
SparkKeyGeneratorInterface {
- private static final String STRUCT_NAME = "hoodieRowTopLevelField";
- private static final String NAMESPACE = "hoodieRow";
- private Function1<Row, GenericRecord> converterFn = null;
- private final AtomicBoolean validatePartitionFields = new
AtomicBoolean(false);
- protected StructType structType;
+ private static final Logger LOG =
LogManager.getLogger(BuiltinKeyGenerator.class);
+
+ private static final String COMPOSITE_KEY_FIELD_VALUE_INFIX = ":";
+
+ protected static final UTF8String HUDI_DEFAULT_PARTITION_PATH_UTF8 =
UTF8String.fromString(HUDI_DEFAULT_PARTITION_PATH);
+ protected static final UTF8String NULL_RECORD_KEY_PLACEHOLDER_UTF8 =
UTF8String.fromString(NULL_RECORDKEY_PLACEHOLDER);
+ protected static final UTF8String EMPTY_RECORD_KEY_PLACEHOLDER_UTF8 =
UTF8String.fromString(EMPTY_RECORDKEY_PLACEHOLDER);
+
- protected Map<String, Pair<List<Integer>, DataType>> recordKeySchemaInfo =
new HashMap<>();
- protected Map<String, Pair<List<Integer>, DataType>> partitionPathSchemaInfo
= new HashMap<>();
+ protected transient volatile SparkRowConverter rowConverter;
+ protected transient volatile SparkRowAccessor rowAccessor;
protected BuiltinKeyGenerator(TypedProperties config) {
super(config);
}
- /**
- * Fetch record key from {@link Row}.
- *
- * @param row instance of {@link Row} from which record key is requested.
- * @return the record key of interest from {@link Row}.
- */
@Override
- @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
public String getRecordKey(Row row) {
- // TODO avoid conversion to avro
- // since converterFn is transient this will be repeatedly initialized
over and over again
- if (null == converterFn) {
- converterFn = AvroConversionUtils.createConverterToAvro(row.schema(),
STRUCT_NAME, NAMESPACE);
- }
- return getKey(converterFn.apply(row)).getRecordKey();
+ tryInitRowConverter(row.schema());
+ // NOTE: This implementation has considerable computational overhead and
has to be overridden
+ // to provide for optimal performance on Spark. This implementation
provided exclusively
+ // for compatibility reasons.
+ return getRecordKey(rowConverter.convertToAvro(row));
}
@Override
- @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
- public String getRecordKey(InternalRow internalRow, StructType schema) {
- try {
- // TODO fix
- buildFieldSchemaInfoIfNeeded(schema);
- return RowKeyGeneratorHelper.getRecordKeyFromInternalRow(internalRow,
getRecordKeyFields(), recordKeySchemaInfo, false);
- } catch (Exception e) {
- throw new HoodieException("Conversion of InternalRow to Row failed with
exception", e);
+ public UTF8String getRecordKey(InternalRow internalRow, StructType schema) {
+ tryInitRowConverter(schema);
+ // NOTE: This implementation has considerable computational overhead and
has to be overridden
+ // to provide for optimal performance on Spark. This implementation
provided exclusively
+ // for compatibility reasons.
+ return
UTF8String.fromString(getRecordKey(rowConverter.convertToAvro(internalRow)));
+ }
+
+ @Override
+ public String getPartitionPath(Row row) {
+ tryInitRowConverter(row.schema());
+ // NOTE: This implementation has considerable computational overhead and
has to be overridden
+ // to provide for optimal performance on Spark. This implementation
provided exclusively
+ // for compatibility reasons.
+ return getPartitionPath(rowConverter.convertToAvro(row));
+ }
+
+ @Override
+ public UTF8String getPartitionPath(InternalRow internalRow, StructType
schema) {
+ tryInitRowConverter(schema);
+ // NOTE: This implementation has considerable computational overhead and
has to be overridden
+ // to provide for optimal performance on Spark. This implementation
provided exclusively
+ // for compatibility reasons.
+ GenericRecord avroRecord = rowConverter.convertToAvro(internalRow);
+ return UTF8String.fromString(getPartitionPath(avroRecord));
+ }
+
+ protected void tryInitRowAccessor(StructType schema) {
+ if (this.rowAccessor == null) {
+ synchronized (this) {
Review Comment:
It won't: if 2 threads will try to assign, they will mutex on a lock and
only 1 will enter 2d condition
. It's a classic [double-checked
locking](https://en.wikipedia.org/wiki/Double-checked_locking) pattern -- we're
avoiding taking locks in the hot-path by taking them only in case the field has
not been init'd yet (which will happen at most twice.
##########
hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java:
##########
@@ -174,7 +174,9 @@ public static Configuration
getDefaultHadoopConf(KafkaConnectConfigs connectConf
* @return Returns the record key columns separated by comma.
*/
public static String getRecordKeyColumns(KeyGenerator keyGenerator) {
- return String.join(",", keyGenerator.getRecordKeyFieldNames());
+ return keyGenerator.getRecordKeyFieldNames().stream()
+ .map(HoodieAvroUtils::getRootLevelFieldName)
Review Comment:
Accidental blanket-refactoring. Reverted
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]