vinothchandar commented on code in PR #5523:
URL: https://github.com/apache/hudi/pull/5523#discussion_r928805168
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java:
##########
@@ -137,47 +135,59 @@ public HoodieRowCreateHandle(HoodieTable table,
* @throws IOException
*/
public void write(InternalRow row) throws IOException {
+ if (populateMetaFields) {
+ writeRow(row);
+ } else {
+ writeRowNoMetaFields(row);
+ }
+ }
+
+ private void writeRow(InternalRow row) {
try {
// NOTE: PLEASE READ THIS CAREFULLY BEFORE MODIFYING
// This code lays in the hot-path, and substantial caution should
be
// exercised making changes to it to minimize amount of excessive:
- // - Conversions b/w Spark internal (low-level) types and JVM
native ones (like
- // [[UTF8String]] and [[String]])
+ // - Conversions b/w Spark internal types and JVM native ones
(like [[UTF8String]]
+ // and [[String]])
// - Repeated computations (for ex, converting file-path to
[[UTF8String]] over and
// over again)
- UTF8String recordKey = row.getUTF8String(RECORD_KEY_META_FIELD_ORD);
-
- InternalRow updatedRow;
- // In cases when no meta-fields need to be added we simply relay
provided row to
- // the writer as is
- if (!populateMetaFields) {
- updatedRow = row;
- } else {
- UTF8String partitionPath =
row.getUTF8String(PARTITION_PATH_META_FIELD_ORD);
- // This is the only meta-field that is generated dynamically, hence
conversion b/w
- // [[String]] and [[UTF8String]] is unavoidable
- UTF8String seqId =
UTF8String.fromString(seqIdGenerator.apply(GLOBAL_SEQ_NO.getAndIncrement()));
-
- updatedRow = new HoodieInternalRow(commitTime, seqId, recordKey,
- partitionPath, fileName, row, true);
- }
+ UTF8String recordKey =
row.getUTF8String(HoodieRecord.RECORD_KEY_META_FIELD_ORD);
+ UTF8String partitionPath =
row.getUTF8String(HoodieRecord.PARTITION_PATH_META_FIELD_ORD);
+ // This is the only meta-field that is generated dynamically, hence
conversion b/w
+ // [[String]] and [[UTF8String]] is unavoidable
+ UTF8String seqId =
UTF8String.fromString(seqIdGenerator.apply(GLOBAL_SEQ_NO.getAndIncrement()));
+
+ InternalRow updatedRow = new HoodieInternalRow(commitTime, seqId,
recordKey,
+ partitionPath, fileName, row, true);
try {
fileWriter.writeRow(recordKey, updatedRow);
// NOTE: To avoid conversion on the hot-path we only convert
[[UTF8String]] into [[String]]
// in cases when successful records' writes are being tracked
writeStatus.markSuccess(writeStatus.isTrackingSuccessfulWrites() ?
recordKey.toString() : null);
- } catch (Throwable t) {
+ } catch (Exception t) {
Review Comment:
Why do we reduce this to Exception? Throwable can catch any unexpected
runtime errors as well, right?
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java:
##########
@@ -137,47 +135,59 @@ public HoodieRowCreateHandle(HoodieTable table,
* @throws IOException
*/
public void write(InternalRow row) throws IOException {
+ if (populateMetaFields) {
+ writeRow(row);
+ } else {
+ writeRowNoMetaFields(row);
Review Comment:
nit: rename to `writeRowWithoutMetaFields`
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java:
##########
@@ -19,132 +19,560 @@
package org.apache.hudi.keygen;
import org.apache.avro.generic.GenericRecord;
-import org.apache.hudi.ApiMaturityLevel;
import org.apache.hudi.AvroConversionUtils;
-import org.apache.hudi.PublicAPIMethod;
+import org.apache.hudi.HoodieSparkUtils;
+import org.apache.hudi.client.utils.SparkRowSerDe;
import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.PartitionPathEncodeUtils;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieKeyException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.sql.HoodieUnsafeRowUtils;
+import org.apache.spark.sql.HoodieUnsafeRowUtils$;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DateType;
import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.unsafe.types.UTF8String;
import scala.Function1;
-import java.util.HashMap;
+import javax.annotation.concurrent.ThreadSafe;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+
+import static org.apache.hudi.common.util.CollectionUtils.tail;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static
org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH_SEPARATOR;
+import static
org.apache.hudi.keygen.KeyGenUtils.DEFAULT_RECORD_KEY_PARTS_SEPARATOR;
+import static org.apache.hudi.keygen.KeyGenUtils.EMPTY_RECORDKEY_PLACEHOLDER;
+import static org.apache.hudi.keygen.KeyGenUtils.HUDI_DEFAULT_PARTITION_PATH;
+import static org.apache.hudi.keygen.KeyGenUtils.NULL_RECORDKEY_PLACEHOLDER;
/**
- * Base class for the built-in key generators. Contains methods structured for
- * code reuse amongst them.
+ * Base class for all built-in key generators.
+ *
+ * NOTE: By default it implements all the methods of {@link
SparkKeyGeneratorInterface}, which
+ * by default however fallback to Avro implementation. For maximum
performance (to avoid
+ * conversion from Spark's internal data-types to Avro) you should
override these methods
+ * in your implementation.
+ *
+ * TODO rename to AvroFallbackBaseKeyGenerator
*/
+@ThreadSafe
public abstract class BuiltinKeyGenerator extends BaseKeyGenerator implements
SparkKeyGeneratorInterface {
- private static final String STRUCT_NAME = "hoodieRowTopLevelField";
- private static final String NAMESPACE = "hoodieRow";
- private Function1<Row, GenericRecord> converterFn = null;
- private final AtomicBoolean validatePartitionFields = new
AtomicBoolean(false);
- protected StructType structType;
+ private static final Logger LOG =
LogManager.getLogger(BuiltinKeyGenerator.class);
+
+ private static final String COMPOSITE_KEY_FIELD_VALUE_INFIX = ":";
+
+ protected static final UTF8String HUDI_DEFAULT_PARTITION_PATH_UTF8 =
UTF8String.fromString(HUDI_DEFAULT_PARTITION_PATH);
+ protected static final UTF8String NULL_RECORD_KEY_PLACEHOLDER_UTF8 =
UTF8String.fromString(NULL_RECORDKEY_PLACEHOLDER);
+ protected static final UTF8String EMPTY_RECORD_KEY_PLACEHOLDER_UTF8 =
UTF8String.fromString(EMPTY_RECORDKEY_PLACEHOLDER);
+
- protected Map<String, Pair<List<Integer>, DataType>> recordKeySchemaInfo =
new HashMap<>();
- protected Map<String, Pair<List<Integer>, DataType>> partitionPathSchemaInfo
= new HashMap<>();
+ protected transient volatile SparkRowConverter rowConverter;
+ protected transient volatile SparkRowAccessor rowAccessor;
protected BuiltinKeyGenerator(TypedProperties config) {
super(config);
}
- /**
- * Fetch record key from {@link Row}.
- *
- * @param row instance of {@link Row} from which record key is requested.
- * @return the record key of interest from {@link Row}.
- */
@Override
- @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
public String getRecordKey(Row row) {
- // TODO avoid conversion to avro
- // since converterFn is transient this will be repeatedly initialized
over and over again
- if (null == converterFn) {
- converterFn = AvroConversionUtils.createConverterToAvro(row.schema(),
STRUCT_NAME, NAMESPACE);
- }
- return getKey(converterFn.apply(row)).getRecordKey();
+ tryInitRowConverter(row.schema());
+ // NOTE: This implementation has considerable computational overhead and
has to be overridden
+ // to provide for optimal performance on Spark. This implementation
provided exclusively
+ // for compatibility reasons.
+ return getRecordKey(rowConverter.convertToAvro(row));
}
@Override
- @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
- public String getRecordKey(InternalRow internalRow, StructType schema) {
- try {
- // TODO fix
- buildFieldSchemaInfoIfNeeded(schema);
- return RowKeyGeneratorHelper.getRecordKeyFromInternalRow(internalRow,
getRecordKeyFields(), recordKeySchemaInfo, false);
- } catch (Exception e) {
- throw new HoodieException("Conversion of InternalRow to Row failed with
exception", e);
+ public UTF8String getRecordKey(InternalRow internalRow, StructType schema) {
Review Comment:
Is this a new API change compared to 0.11.1? If not, and the return type is
a breaking change, it should be called out loud and clear in the release notes.
@codope
##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java:
##########
@@ -57,9 +57,9 @@
IntStream.range(0, HOODIE_META_COLUMNS.size()).mapToObj(idx ->
Pair.of(HOODIE_META_COLUMNS.get(idx), idx))
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
- public static int RECORD_KEY_META_FIELD_POS =
HOODIE_META_COLUMNS_NAME_TO_POS.get(RECORD_KEY_METADATA_FIELD);
- public static int PARTITION_PATH_META_FIELD_POS =
HOODIE_META_COLUMNS_NAME_TO_POS.get(PARTITION_PATH_METADATA_FIELD);
- public static int FILENAME_META_FIELD_POS =
HOODIE_META_COLUMNS_NAME_TO_POS.get(FILENAME_METADATA_FIELD);
+ public static int RECORD_KEY_META_FIELD_ORD =
HOODIE_META_COLUMNS_NAME_TO_POS.get(RECORD_KEY_METADATA_FIELD);
Review Comment:
-1 on these renames as well.
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java:
##########
@@ -19,132 +19,560 @@
package org.apache.hudi.keygen;
import org.apache.avro.generic.GenericRecord;
-import org.apache.hudi.ApiMaturityLevel;
import org.apache.hudi.AvroConversionUtils;
-import org.apache.hudi.PublicAPIMethod;
+import org.apache.hudi.HoodieSparkUtils;
+import org.apache.hudi.client.utils.SparkRowSerDe;
import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.PartitionPathEncodeUtils;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieKeyException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.sql.HoodieUnsafeRowUtils;
+import org.apache.spark.sql.HoodieUnsafeRowUtils$;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DateType;
import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.unsafe.types.UTF8String;
import scala.Function1;
-import java.util.HashMap;
+import javax.annotation.concurrent.ThreadSafe;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+
+import static org.apache.hudi.common.util.CollectionUtils.tail;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static
org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH_SEPARATOR;
+import static
org.apache.hudi.keygen.KeyGenUtils.DEFAULT_RECORD_KEY_PARTS_SEPARATOR;
+import static org.apache.hudi.keygen.KeyGenUtils.EMPTY_RECORDKEY_PLACEHOLDER;
+import static org.apache.hudi.keygen.KeyGenUtils.HUDI_DEFAULT_PARTITION_PATH;
+import static org.apache.hudi.keygen.KeyGenUtils.NULL_RECORDKEY_PLACEHOLDER;
/**
- * Base class for the built-in key generators. Contains methods structured for
- * code reuse amongst them.
+ * Base class for all built-in key generators.
+ *
+ * NOTE: By default it implements all the methods of {@link
SparkKeyGeneratorInterface}, which
+ * by default however fallback to Avro implementation. For maximum
performance (to avoid
+ * conversion from Spark's internal data-types to Avro) you should
override these methods
+ * in your implementation.
+ *
+ * TODO rename to AvroFallbackBaseKeyGenerator
*/
+@ThreadSafe
public abstract class BuiltinKeyGenerator extends BaseKeyGenerator implements
SparkKeyGeneratorInterface {
- private static final String STRUCT_NAME = "hoodieRowTopLevelField";
- private static final String NAMESPACE = "hoodieRow";
- private Function1<Row, GenericRecord> converterFn = null;
- private final AtomicBoolean validatePartitionFields = new
AtomicBoolean(false);
- protected StructType structType;
+ private static final Logger LOG =
LogManager.getLogger(BuiltinKeyGenerator.class);
+
+ private static final String COMPOSITE_KEY_FIELD_VALUE_INFIX = ":";
+
+ protected static final UTF8String HUDI_DEFAULT_PARTITION_PATH_UTF8 =
UTF8String.fromString(HUDI_DEFAULT_PARTITION_PATH);
+ protected static final UTF8String NULL_RECORD_KEY_PLACEHOLDER_UTF8 =
UTF8String.fromString(NULL_RECORDKEY_PLACEHOLDER);
+ protected static final UTF8String EMPTY_RECORD_KEY_PLACEHOLDER_UTF8 =
UTF8String.fromString(EMPTY_RECORDKEY_PLACEHOLDER);
+
- protected Map<String, Pair<List<Integer>, DataType>> recordKeySchemaInfo =
new HashMap<>();
- protected Map<String, Pair<List<Integer>, DataType>> partitionPathSchemaInfo
= new HashMap<>();
+ protected transient volatile SparkRowConverter rowConverter;
+ protected transient volatile SparkRowAccessor rowAccessor;
protected BuiltinKeyGenerator(TypedProperties config) {
super(config);
}
- /**
- * Fetch record key from {@link Row}.
- *
- * @param row instance of {@link Row} from which record key is requested.
- * @return the record key of interest from {@link Row}.
- */
@Override
- @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
public String getRecordKey(Row row) {
- // TODO avoid conversion to avro
- // since converterFn is transient this will be repeatedly initialized
over and over again
- if (null == converterFn) {
- converterFn = AvroConversionUtils.createConverterToAvro(row.schema(),
STRUCT_NAME, NAMESPACE);
- }
- return getKey(converterFn.apply(row)).getRecordKey();
+ tryInitRowConverter(row.schema());
+ // NOTE: This implementation has considerable computational overhead and
has to be overridden
+ // to provide for optimal performance on Spark. This implementation
provided exclusively
+ // for compatibility reasons.
+ return getRecordKey(rowConverter.convertToAvro(row));
}
@Override
- @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
- public String getRecordKey(InternalRow internalRow, StructType schema) {
- try {
- // TODO fix
- buildFieldSchemaInfoIfNeeded(schema);
- return RowKeyGeneratorHelper.getRecordKeyFromInternalRow(internalRow,
getRecordKeyFields(), recordKeySchemaInfo, false);
- } catch (Exception e) {
- throw new HoodieException("Conversion of InternalRow to Row failed with
exception", e);
+ public UTF8String getRecordKey(InternalRow internalRow, StructType schema) {
+ tryInitRowConverter(schema);
+ // NOTE: This implementation has considerable computational overhead and
has to be overridden
+ // to provide for optimal performance on Spark. This implementation
provided exclusively
+ // for compatibility reasons.
+ return
UTF8String.fromString(getRecordKey(rowConverter.convertToAvro(internalRow)));
+ }
+
+ @Override
+ public String getPartitionPath(Row row) {
+ tryInitRowConverter(row.schema());
+ // NOTE: This implementation has considerable computational overhead and
has to be overridden
+ // to provide for optimal performance on Spark. This implementation
provided exclusively
+ // for compatibility reasons.
+ return getPartitionPath(rowConverter.convertToAvro(row));
+ }
+
+ @Override
+ public UTF8String getPartitionPath(InternalRow internalRow, StructType
schema) {
+ tryInitRowConverter(schema);
+ // NOTE: This implementation has considerable computational overhead and
has to be overridden
+ // to provide for optimal performance on Spark. This implementation
provided exclusively
+ // for compatibility reasons.
+ GenericRecord avroRecord = rowConverter.convertToAvro(internalRow);
+ return UTF8String.fromString(getPartitionPath(avroRecord));
+ }
+
+ protected void tryInitRowAccessor(StructType schema) {
+ if (this.rowAccessor == null) {
+ synchronized (this) {
Review Comment:
Does any code path have concurrent access to key generator instance
creation, which I assume calls tryInitRowAccessor()?
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java:
##########
@@ -19,132 +19,560 @@
package org.apache.hudi.keygen;
import org.apache.avro.generic.GenericRecord;
-import org.apache.hudi.ApiMaturityLevel;
import org.apache.hudi.AvroConversionUtils;
-import org.apache.hudi.PublicAPIMethod;
+import org.apache.hudi.HoodieSparkUtils;
+import org.apache.hudi.client.utils.SparkRowSerDe;
import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.PartitionPathEncodeUtils;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieKeyException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.sql.HoodieUnsafeRowUtils;
+import org.apache.spark.sql.HoodieUnsafeRowUtils$;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DateType;
import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.unsafe.types.UTF8String;
import scala.Function1;
-import java.util.HashMap;
+import javax.annotation.concurrent.ThreadSafe;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+
+import static org.apache.hudi.common.util.CollectionUtils.tail;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static
org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH_SEPARATOR;
+import static
org.apache.hudi.keygen.KeyGenUtils.DEFAULT_RECORD_KEY_PARTS_SEPARATOR;
+import static org.apache.hudi.keygen.KeyGenUtils.EMPTY_RECORDKEY_PLACEHOLDER;
+import static org.apache.hudi.keygen.KeyGenUtils.HUDI_DEFAULT_PARTITION_PATH;
+import static org.apache.hudi.keygen.KeyGenUtils.NULL_RECORDKEY_PLACEHOLDER;
/**
- * Base class for the built-in key generators. Contains methods structured for
- * code reuse amongst them.
+ * Base class for all built-in key generators.
+ *
+ * NOTE: By default it implements all the methods of {@link
SparkKeyGeneratorInterface}, which
+ * by default however fallback to Avro implementation. For maximum
performance (to avoid
+ * conversion from Spark's internal data-types to Avro) you should
override these methods
+ * in your implementation.
+ *
+ * TODO rename to AvroFallbackBaseKeyGenerator
*/
+@ThreadSafe
public abstract class BuiltinKeyGenerator extends BaseKeyGenerator implements
SparkKeyGeneratorInterface {
- private static final String STRUCT_NAME = "hoodieRowTopLevelField";
- private static final String NAMESPACE = "hoodieRow";
- private Function1<Row, GenericRecord> converterFn = null;
- private final AtomicBoolean validatePartitionFields = new
AtomicBoolean(false);
- protected StructType structType;
+ private static final Logger LOG =
LogManager.getLogger(BuiltinKeyGenerator.class);
+
+ private static final String COMPOSITE_KEY_FIELD_VALUE_INFIX = ":";
+
+ protected static final UTF8String HUDI_DEFAULT_PARTITION_PATH_UTF8 =
UTF8String.fromString(HUDI_DEFAULT_PARTITION_PATH);
+ protected static final UTF8String NULL_RECORD_KEY_PLACEHOLDER_UTF8 =
UTF8String.fromString(NULL_RECORDKEY_PLACEHOLDER);
+ protected static final UTF8String EMPTY_RECORD_KEY_PLACEHOLDER_UTF8 =
UTF8String.fromString(EMPTY_RECORDKEY_PLACEHOLDER);
+
- protected Map<String, Pair<List<Integer>, DataType>> recordKeySchemaInfo =
new HashMap<>();
- protected Map<String, Pair<List<Integer>, DataType>> partitionPathSchemaInfo
= new HashMap<>();
+ protected transient volatile SparkRowConverter rowConverter;
+ protected transient volatile SparkRowAccessor rowAccessor;
protected BuiltinKeyGenerator(TypedProperties config) {
super(config);
}
- /**
- * Fetch record key from {@link Row}.
- *
- * @param row instance of {@link Row} from which record key is requested.
- * @return the record key of interest from {@link Row}.
- */
@Override
- @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
public String getRecordKey(Row row) {
- // TODO avoid conversion to avro
- // since converterFn is transient this will be repeatedly initialized
over and over again
- if (null == converterFn) {
- converterFn = AvroConversionUtils.createConverterToAvro(row.schema(),
STRUCT_NAME, NAMESPACE);
- }
- return getKey(converterFn.apply(row)).getRecordKey();
+ tryInitRowConverter(row.schema());
+ // NOTE: This implementation has considerable computational overhead and
has to be overridden
+ // to provide for optimal performance on Spark. This implementation
provided exclusively
+ // for compatibility reasons.
+ return getRecordKey(rowConverter.convertToAvro(row));
}
@Override
- @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
- public String getRecordKey(InternalRow internalRow, StructType schema) {
- try {
- // TODO fix
- buildFieldSchemaInfoIfNeeded(schema);
- return RowKeyGeneratorHelper.getRecordKeyFromInternalRow(internalRow,
getRecordKeyFields(), recordKeySchemaInfo, false);
- } catch (Exception e) {
- throw new HoodieException("Conversion of InternalRow to Row failed with
exception", e);
+ public UTF8String getRecordKey(InternalRow internalRow, StructType schema) {
+ tryInitRowConverter(schema);
+ // NOTE: This implementation has considerable computational overhead and
has to be overridden
+ // to provide for optimal performance on Spark. This implementation
provided exclusively
+ // for compatibility reasons.
+ return
UTF8String.fromString(getRecordKey(rowConverter.convertToAvro(internalRow)));
+ }
+
+ @Override
+ public String getPartitionPath(Row row) {
+ tryInitRowConverter(row.schema());
+ // NOTE: This implementation has considerable computational overhead and
has to be overridden
+ // to provide for optimal performance on Spark. This implementation
provided exclusively
+ // for compatibility reasons.
+ return getPartitionPath(rowConverter.convertToAvro(row));
+ }
+
+ @Override
+ public UTF8String getPartitionPath(InternalRow internalRow, StructType
schema) {
+ tryInitRowConverter(schema);
+ // NOTE: This implementation has considerable computational overhead and
has to be overridden
+ // to provide for optimal performance on Spark. This implementation
provided exclusively
+ // for compatibility reasons.
+ GenericRecord avroRecord = rowConverter.convertToAvro(internalRow);
+ return UTF8String.fromString(getPartitionPath(avroRecord));
+ }
+
+ protected void tryInitRowAccessor(StructType schema) {
+ if (this.rowAccessor == null) {
+ synchronized (this) {
Review Comment:
+1 on the pattern. We do this also in other places in hudi-common
##########
hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java:
##########
@@ -157,7 +152,29 @@ public List<HoodieInternalWriteStatus> getWriteStatuses()
throws IOException {
return writeStatusList;
}
- public void abort() {
+ public void abort() {}
+
+ public void close() throws IOException {
+ for (HoodieRowCreateHandle rowCreateHandle : handles.values()) {
+ writeStatusList.add(rowCreateHandle.close());
+ }
+ handles.clear();
+ handle = null;
+ }
+
+ private UTF8String extractPartitionPath(InternalRow row) {
+ if (populateMetaFields) {
+ // In case meta-fields are materialized w/in the table itself, we can
just simply extract
+ // partition path from there
+ //
+ // NOTE: Helper keeps track of [[lastKnownPartitionPath]] as
[[UTF8String]] to avoid
+ // conversion from Catalyst internal representation into a
[[String]]
+ return row.getUTF8String(HoodieRecord.PARTITION_PATH_META_FIELD_POS);
+ } else if (keyGeneratorOpt.isPresent()) {
+ return keyGeneratorOpt.get().getPartitionPath(row, structType);
Review Comment:
+1 on the SoT. The if clauses across the code base are hard to maintain.
IIRC that was done to avoid UDF overhead in the first place, which may not be
needed anymore?
That said, can you both confirm there are no perf side-effects to doing it
either way?
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java:
##########
@@ -46,16 +47,27 @@
* field in the partition path, use field1:simple 3. If you want your table to
be non partitioned, simply leave it as blank.
*
* RecordKey is internally generated using either SimpleKeyGenerator or
ComplexKeyGenerator.
+ *
+ * @deprecated
*/
+@Deprecated
Review Comment:
Is this addressed?
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/unsafe/UTF8StringBuilder.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.unsafe;
+
+import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.array.ByteArrayMethods;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A helper class to write {@link UTF8String}s to an internal buffer and build
the concatenated
+ * {@link UTF8String} at the end.
+ */
+public class UTF8StringBuilder {
+
+ private static final int ARRAY_MAX =
ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH;
+
+ private byte[] buffer;
+ private int cursor = Platform.BYTE_ARRAY_OFFSET;
+
+ public UTF8StringBuilder() {
Review Comment:
Let's add UTs to help maintain this longer term. And if code is re-used from
another project, we need to add a reference to the NOTICE. You'll see other
examples there.
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java:
##########
@@ -19,132 +19,560 @@
package org.apache.hudi.keygen;
import org.apache.avro.generic.GenericRecord;
-import org.apache.hudi.ApiMaturityLevel;
import org.apache.hudi.AvroConversionUtils;
-import org.apache.hudi.PublicAPIMethod;
+import org.apache.hudi.HoodieSparkUtils;
+import org.apache.hudi.client.utils.SparkRowSerDe;
import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.PartitionPathEncodeUtils;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieKeyException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.sql.HoodieUnsafeRowUtils;
+import org.apache.spark.sql.HoodieUnsafeRowUtils$;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DateType;
import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.unsafe.types.UTF8String;
import scala.Function1;
-import java.util.HashMap;
+import javax.annotation.concurrent.ThreadSafe;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+
+import static org.apache.hudi.common.util.CollectionUtils.tail;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static
org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH_SEPARATOR;
+import static
org.apache.hudi.keygen.KeyGenUtils.DEFAULT_RECORD_KEY_PARTS_SEPARATOR;
+import static org.apache.hudi.keygen.KeyGenUtils.EMPTY_RECORDKEY_PLACEHOLDER;
+import static org.apache.hudi.keygen.KeyGenUtils.HUDI_DEFAULT_PARTITION_PATH;
+import static org.apache.hudi.keygen.KeyGenUtils.NULL_RECORDKEY_PLACEHOLDER;
/**
- * Base class for the built-in key generators. Contains methods structured for
- * code reuse amongst them.
+ * Base class for all built-in key generators.
+ *
+ * NOTE: By default it implements all the methods of {@link
SparkKeyGeneratorInterface}, which
+ * by default however fallback to Avro implementation. For maximum
performance (to avoid
+ * conversion from Spark's internal data-types to Avro) you should
override these methods
+ * in your implementation.
+ *
+ * TODO rename to AvroFallbackBaseKeyGenerator
*/
+@ThreadSafe
public abstract class BuiltinKeyGenerator extends BaseKeyGenerator implements
SparkKeyGeneratorInterface {
- private static final String STRUCT_NAME = "hoodieRowTopLevelField";
- private static final String NAMESPACE = "hoodieRow";
- private Function1<Row, GenericRecord> converterFn = null;
- private final AtomicBoolean validatePartitionFields = new
AtomicBoolean(false);
- protected StructType structType;
+ private static final Logger LOG =
LogManager.getLogger(BuiltinKeyGenerator.class);
+
+ private static final String COMPOSITE_KEY_FIELD_VALUE_INFIX = ":";
+
+ protected static final UTF8String HUDI_DEFAULT_PARTITION_PATH_UTF8 =
UTF8String.fromString(HUDI_DEFAULT_PARTITION_PATH);
+ protected static final UTF8String NULL_RECORD_KEY_PLACEHOLDER_UTF8 =
UTF8String.fromString(NULL_RECORDKEY_PLACEHOLDER);
+ protected static final UTF8String EMPTY_RECORD_KEY_PLACEHOLDER_UTF8 =
UTF8String.fromString(EMPTY_RECORDKEY_PLACEHOLDER);
+
- protected Map<String, Pair<List<Integer>, DataType>> recordKeySchemaInfo =
new HashMap<>();
- protected Map<String, Pair<List<Integer>, DataType>> partitionPathSchemaInfo
= new HashMap<>();
+ protected transient volatile SparkRowConverter rowConverter;
+ protected transient volatile SparkRowAccessor rowAccessor;
protected BuiltinKeyGenerator(TypedProperties config) {
super(config);
}
- /**
- * Fetch record key from {@link Row}.
- *
- * @param row instance of {@link Row} from which record key is requested.
- * @return the record key of interest from {@link Row}.
- */
@Override
- @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
public String getRecordKey(Row row) {
- // TODO avoid conversion to avro
- // since converterFn is transient this will be repeatedly initialized
over and over again
- if (null == converterFn) {
- converterFn = AvroConversionUtils.createConverterToAvro(row.schema(),
STRUCT_NAME, NAMESPACE);
- }
- return getKey(converterFn.apply(row)).getRecordKey();
+ tryInitRowConverter(row.schema());
+ // NOTE: This implementation has considerable computational overhead and
has to be overridden
+ // to provide for optimal performance on Spark. This implementation
provided exclusively
+ // for compatibility reasons.
+ return getRecordKey(rowConverter.convertToAvro(row));
}
@Override
- @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
- public String getRecordKey(InternalRow internalRow, StructType schema) {
- try {
- // TODO fix
- buildFieldSchemaInfoIfNeeded(schema);
- return RowKeyGeneratorHelper.getRecordKeyFromInternalRow(internalRow,
getRecordKeyFields(), recordKeySchemaInfo, false);
- } catch (Exception e) {
- throw new HoodieException("Conversion of InternalRow to Row failed with
exception", e);
+ public UTF8String getRecordKey(InternalRow internalRow, StructType schema) {
+ tryInitRowConverter(schema);
+ // NOTE: This implementation has considerable computational overhead and
has to be overridden
+ // to provide for optimal performance on Spark. This implementation
provided exclusively
+ // for compatibility reasons.
+ return
UTF8String.fromString(getRecordKey(rowConverter.convertToAvro(internalRow)));
+ }
+
+ @Override
+ public String getPartitionPath(Row row) {
+ tryInitRowConverter(row.schema());
+ // NOTE: This implementation has considerable computational overhead and
has to be overridden
+ // to provide for optimal performance on Spark. This implementation
provided exclusively
+ // for compatibility reasons.
+ return getPartitionPath(rowConverter.convertToAvro(row));
+ }
+
+ @Override
+ public UTF8String getPartitionPath(InternalRow internalRow, StructType
schema) {
+ tryInitRowConverter(schema);
+ // NOTE: This implementation has considerable computational overhead and
has to be overridden
+ // to provide for optimal performance on Spark. This implementation
provided exclusively
+ // for compatibility reasons.
+ GenericRecord avroRecord = rowConverter.convertToAvro(internalRow);
+ return UTF8String.fromString(getPartitionPath(avroRecord));
+ }
+
+ protected void tryInitRowAccessor(StructType schema) {
+ if (this.rowAccessor == null) {
+ synchronized (this) {
+ if (this.rowAccessor == null) {
+ this.rowAccessor = new SparkRowAccessor(schema);
+ }
+ }
}
}
+
/**
- * Fetch partition path from {@link Row}.
- *
- * @param row instance of {@link Row} from which partition path is requested
- * @return the partition path of interest from {@link Row}.
+ * NOTE: This method has to stay final (so that it's easier for JIT compiler
to apply certain
+ * optimizations, like inlining)
*/
+ protected final String combinePartitionPath(Object... partitionPathParts) {
+ return combinePartitionPathInternal(
+ JavaStringBuilder::new,
+ BuiltinKeyGenerator::toString,
+ this::tryEncodePartitionPath,
+ BuiltinKeyGenerator::handleNullOrEmptyPartitionPathPart,
+ partitionPathParts
+ );
+ }
- @Override
- @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
- public String getPartitionPath(Row row) {
- if (null == converterFn) {
- converterFn = AvroConversionUtils.createConverterToAvro(row.schema(),
STRUCT_NAME, NAMESPACE);
+ /**
+ * NOTE: This method has to stay final (so that it's easier for JIT compiler
to apply certain
+ * optimizations, like inlining)
+ */
+ protected final UTF8String combinePartitionPathUnsafe(Object...
partitionPathParts) {
+ return combinePartitionPathInternal(
+ UTF8StringBuilder::new,
+ BuiltinKeyGenerator::toUTF8String,
+ this::tryEncodePartitionPathUTF8,
+ BuiltinKeyGenerator::handleNullOrEmptyPartitionPathPartUTF8,
+ partitionPathParts
+ );
+ }
+
+ /**
+ * NOTE: This method has to stay final (so that it's easier for JIT compiler
to apply certain
+ * optimizations, like inlining)
+ */
+ protected final String combineRecordKey(Object... recordKeyParts) {
+ return combineRecordKeyInternal(
+ JavaStringBuilder::new,
+ BuiltinKeyGenerator::toString,
+ BuiltinKeyGenerator::handleNullRecordKey,
+ recordKeyParts
+ );
+ }
+
+ /**
+ * NOTE: This method has to stay final (so that it's easier for JIT compiler
to apply certain
+ * optimizations, like inlining)
+ */
+ protected final UTF8String combineRecordKeyUnsafe(Object... recordKeyParts) {
+ return combineRecordKeyInternal(
+ UTF8StringBuilder::new,
+ BuiltinKeyGenerator::toUTF8String,
+ BuiltinKeyGenerator::handleNullRecordKey,
+ recordKeyParts
+ );
+ }
+
+ /**
+ * NOTE: This method has to stay final (so that it's easier for JIT compiler
to apply certain
+ * optimizations, like inlining)
+ */
+ protected final String combineCompositeRecordKey(Object... recordKeyParts) {
+ return combineCompositeRecordKeyInternal(
+ JavaStringBuilder::new,
+ BuiltinKeyGenerator::toString,
+ BuiltinKeyGenerator::handleNullOrEmptyCompositeKeyPart,
+ BuiltinKeyGenerator::isNullOrEmptyCompositeKeyPart,
+ recordKeyParts
+ );
+ }
+
+ /**
+ * NOTE: This method has to stay final (so that it's easier for JIT compiler
to apply certain
+ * optimizations, like inlining)
+ */
+ protected final UTF8String combineCompositeRecordKeyUnsafe(Object...
recordKeyParts) {
+ return combineCompositeRecordKeyInternal(
+ UTF8StringBuilder::new,
+ BuiltinKeyGenerator::toUTF8String,
+ BuiltinKeyGenerator::handleNullOrEmptyCompositeKeyPartUTF8,
+ BuiltinKeyGenerator::isNullOrEmptyCompositeKeyPartUTF8,
+ recordKeyParts
+ );
+ }
+
+ private <S> S combineRecordKeyInternal(
+ Supplier<StringBuilder<S>> builderFactory,
+ Function<Object, S> converter,
+ Function<S, S> emptyKeyPartHandler,
+ Object... recordKeyParts
+ ) {
+ if (recordKeyParts.length == 1) {
+ return emptyKeyPartHandler.apply(converter.apply(recordKeyParts[0]));
}
- return getKey(converterFn.apply(row)).getPartitionPath();
+
+ StringBuilder<S> sb = builderFactory.get();
+ for (int i = 0; i < recordKeyParts.length; ++i) {
+ // NOTE: If record-key part has already been a string [[toString]] will
be a no-op
+ sb.append(emptyKeyPartHandler.apply(converter.apply(recordKeyParts[i])));
+
+ if (i < recordKeyParts.length - 1) {
+ sb.appendJava(DEFAULT_RECORD_KEY_PARTS_SEPARATOR);
+ }
+ }
+
+ return sb.build();
+ }
+
+ private <S> S combineCompositeRecordKeyInternal(
+ Supplier<StringBuilder<S>> builderFactory,
+ Function<Object, S> converter,
+ Function<S, S> emptyKeyPartHandler,
+ Predicate<S> isNullOrEmptyKeyPartPredicate,
+ Object... recordKeyParts
+ ) {
+ boolean hasNonNullNonEmptyPart = false;
+
+ StringBuilder<S> sb = builderFactory.get();
+ for (int i = 0; i < recordKeyParts.length; ++i) {
+ // NOTE: If record-key part has already been a string [[toString]] will
be a no-op
+ S convertedKeyPart =
emptyKeyPartHandler.apply(converter.apply(recordKeyParts[i]));
+
+ sb.appendJava(recordKeyFields.get(i));
+ sb.appendJava(COMPOSITE_KEY_FIELD_VALUE_INFIX);
+ sb.append(convertedKeyPart);
+ // This check is to validate that overall composite-key has at least one
non-null, non-empty
+ // segment
+ hasNonNullNonEmptyPart |=
!isNullOrEmptyKeyPartPredicate.test(convertedKeyPart);
+
+ if (i < recordKeyParts.length - 1) {
+ sb.appendJava(DEFAULT_RECORD_KEY_PARTS_SEPARATOR);
+ }
+ }
+
+ if (hasNonNullNonEmptyPart) {
+ return sb.build();
+ } else {
+ throw new HoodieKeyException(String.format("All of the values for (%s)
were either null or empty", recordKeyFields));
+ }
+ }
+
+ private <S> S combinePartitionPathInternal(Supplier<StringBuilder<S>>
builderFactory,
+ Function<Object, S> converter,
+ Function<S, S> encoder,
+ Function<S, S> emptyHandler,
+ Object... partitionPathParts) {
+ checkState(partitionPathParts.length == partitionPathFields.size());
+ // Avoid creating [[StringBuilder]] in case there's just one
partition-path part,
+ // and Hive-style of partitioning is not required
+ if (!hiveStylePartitioning && partitionPathParts.length == 1) {
+ return emptyHandler.apply(converter.apply(partitionPathParts[0]));
+ }
+
+ StringBuilder<S> sb = builderFactory.get();
+ for (int i = 0; i < partitionPathParts.length; ++i) {
+ S partitionPathPartStr =
encoder.apply(emptyHandler.apply(converter.apply(partitionPathParts[i])));
+
+ if (hiveStylePartitioning) {
+ sb.appendJava(partitionPathFields.get(i))
+ .appendJava("=")
+ .append(partitionPathPartStr);
+ } else {
+ sb.append(partitionPathPartStr);
+ }
+
+ if (i < partitionPathParts.length - 1) {
+ sb.appendJava(DEFAULT_PARTITION_PATH_SEPARATOR);
+ }
+ }
+
+ return sb.build();
+ }
+
+ private String tryEncodePartitionPath(String partitionPathPart) {
+ return encodePartitionPath ?
PartitionPathEncodeUtils.escapePathName(partitionPathPart) : partitionPathPart;
+ }
+
+ private UTF8String tryEncodePartitionPathUTF8(UTF8String partitionPathPart) {
+ // NOTE: This method avoids [[UTF8String]] to [[String]] conversion (and
back) unless
+ // partition-path encoding is enabled
+ return encodePartitionPath ?
UTF8String.fromString(PartitionPathEncodeUtils.escapePathName(partitionPathPart.toString()))
: partitionPathPart;
+ }
+
+ private void tryInitRowConverter(StructType structType) {
+ if (rowConverter == null) {
+ synchronized (this) {
+ if (rowConverter == null) {
+ rowConverter = new SparkRowConverter(structType);
+ }
+ }
+ }
+ }
+
+ protected static String requireNonNullNonEmptyKey(String key) {
+ if (key != null && key.length() > 0) {
+ return key;
+ } else {
+ throw new HoodieKeyException("Record key has to be non-empty string!");
+ }
+ }
+
+ protected static UTF8String requireNonNullNonEmptyKey(UTF8String key) {
+ if (key != null && key.numChars() > 0) {
+ return key;
+ } else {
+ throw new HoodieKeyException("Record key has to be non-empty string!");
+ }
+ }
+
+ protected static <S> S handleNullRecordKey(S s) {
+ if (s == null || s.toString().isEmpty()) {
+ throw new HoodieKeyException("Record key has to be non-null!");
+ }
+
+ return s;
+ }
+
+ private static UTF8String toUTF8String(Object o) {
+ if (o == null) {
+ return null;
+ } else if (o instanceof UTF8String) {
+ return (UTF8String) o;
+ } else {
+ // NOTE: If object is a [[String]], [[toString]] would be a no-op
+ return UTF8String.fromString(o.toString());
+ }
+ }
+
+ private static String toString(Object o) {
+ return o == null ? null : o.toString();
+ }
+
+ private static String handleNullOrEmptyCompositeKeyPart(Object keyPart) {
+ if (keyPart == null) {
+ return NULL_RECORDKEY_PLACEHOLDER;
+ } else {
+ // NOTE: [[toString]] is a no-op if key-part was already a [[String]]
+ String keyPartStr = keyPart.toString();
+ return !keyPartStr.isEmpty() ? keyPartStr : EMPTY_RECORDKEY_PLACEHOLDER;
+ }
+ }
+
+ private static UTF8String handleNullOrEmptyCompositeKeyPartUTF8(UTF8String
keyPart) {
+ if (keyPart == null) {
+ return NULL_RECORD_KEY_PLACEHOLDER_UTF8;
+ } else if (keyPart.numChars() == 0) {
+ return EMPTY_RECORD_KEY_PLACEHOLDER_UTF8;
+ }
+
+ return keyPart;
+ }
+
+ @SuppressWarnings("StringEquality")
+ private static boolean isNullOrEmptyCompositeKeyPart(String keyPart) {
+ // NOTE: Converted key-part is compared against null/empty stub using
ref-equality
+ // for performance reasons (it relies on the fact that we're using
internalized
+ // constants)
+ return keyPart == NULL_RECORDKEY_PLACEHOLDER || keyPart ==
EMPTY_RECORDKEY_PLACEHOLDER;
+ }
+
+ private static boolean isNullOrEmptyCompositeKeyPartUTF8(UTF8String keyPart)
{
+ // NOTE: Converted key-part is compared against null/empty stub using
ref-equality
+ // for performance reasons (it relies on the fact that we're using
internalized
+ // constants)
+ return keyPart == NULL_RECORD_KEY_PLACEHOLDER_UTF8 || keyPart ==
EMPTY_RECORD_KEY_PLACEHOLDER_UTF8;
+ }
+
+ private static String handleNullOrEmptyPartitionPathPart(Object
partitionPathPart) {
+ if (partitionPathPart == null) {
+ return HUDI_DEFAULT_PARTITION_PATH;
+ } else {
+ // NOTE: [[toString]] is a no-op if key-part was already a [[String]]
+ String keyPartStr = partitionPathPart.toString();
+ return keyPartStr.isEmpty() ? HUDI_DEFAULT_PARTITION_PATH : keyPartStr;
+ }
+ }
+
+ private static UTF8String handleNullOrEmptyPartitionPathPartUTF8(UTF8String
keyPart) {
+ if (keyPart == null || keyPart.numChars() == 0) {
+ return HUDI_DEFAULT_PARTITION_PATH_UTF8;
+ }
+
+ return keyPart;
}
/**
- * Fetch partition path from {@link InternalRow}.
+ * Converts provided (raw) value extracted from the {@link InternalRow}
object into a deserialized,
+ * JVM native format (for ex, converting {@code Long} into {@link Instant},
+ * {@code Integer} to {@link LocalDate}, etc)
+ *
+ * This method allows to avoid costly full-row deserialization sequence.
Note, that this method
+ * should be maintained in sync w/
+ *
+ * <ol>
+ * <li>{@code RowEncoder#deserializerFor}, as well as</li>
+ * <li>{@code HoodieAvroUtils#convertValueForAvroLogicalTypes}</li>
+ * </ol>
*
- * @param internalRow {@link InternalRow} instance from which partition path
needs to be fetched from.
- * @param structType schema of the internalRow.
- * @return the partition path.
+ * @param dataType target data-type of the given value
+ * @param value target value to be converted
*/
- @Override
- @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
- public String getPartitionPath(InternalRow internalRow, StructType
structType) {
- try {
- buildFieldSchemaInfoIfNeeded(structType);
- return
RowKeyGeneratorHelper.getPartitionPathFromInternalRow(internalRow,
getPartitionPathFields(),
- hiveStylePartitioning, partitionPathSchemaInfo);
- } catch (Exception e) {
- throw new HoodieException("Conversion of InternalRow to Row failed with
exception", e);
- }
- }
-
- void buildFieldSchemaInfoIfNeeded(StructType structType) {
- if (this.structType == null) {
- this.structType = structType;
- getRecordKeyFields()
- .stream().filter(f -> !f.isEmpty())
- .forEach(f -> recordKeySchemaInfo.put(f,
RowKeyGeneratorHelper.getFieldSchemaInfo(structType, f, true)));
- if (getPartitionPathFields() != null) {
- getPartitionPathFields().stream().filter(f -> !f.isEmpty())
- .forEach(f -> partitionPathSchemaInfo.put(f,
RowKeyGeneratorHelper.getFieldSchemaInfo(structType, f, false)));
+ private static Object convertToLogicalDataType(DataType dataType, Object
value) {
+ if (value == null) {
+ return null;
+ } else if (dataType instanceof TimestampType) {
+ // Provided value have to be [[Long]] in this case, representing micros
since epoch
+ return new Timestamp((Long) value / 1000);
+ } else if (dataType instanceof DateType) {
+ // Provided value have to be [[Int]] in this case
+ return LocalDate.ofEpochDay((Integer) value);
+ }
+
+ return value;
+ }
+
+ protected static class SparkRowConverter {
+ private static final String STRUCT_NAME = "hoodieRowTopLevelField";
+ private static final String NAMESPACE = "hoodieRow";
+
+ private final Function1<Row, GenericRecord> avroConverter;
+ private final SparkRowSerDe rowSerDe;
+
+ SparkRowConverter(StructType schema) {
+ this.rowSerDe = HoodieSparkUtils.getDeserializer(schema);
+ this.avroConverter = AvroConversionUtils.createConverterToAvro(schema,
STRUCT_NAME, NAMESPACE);
+ }
+
+ GenericRecord convertToAvro(Row row) {
+ return avroConverter.apply(row);
+ }
+
+ GenericRecord convertToAvro(InternalRow row) {
+ return avroConverter.apply(rowSerDe.deserializeRow(row));
+ }
+ }
+
+ protected class SparkRowAccessor {
Review Comment:
does this class have a UT?
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/unsafe/UTF8StringBuilder.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.unsafe;
+
+import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.array.ByteArrayMethods;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A helper class to write {@link UTF8String}s to an internal buffer and build
the concatenated
+ * {@link UTF8String} at the end.
+ */
+public class UTF8StringBuilder {
+
+ private static final int ARRAY_MAX =
ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH;
+
+ private byte[] buffer;
+ private int cursor = Platform.BYTE_ARRAY_OFFSET;
+
+ public UTF8StringBuilder() {
Review Comment:
cc @xushiyan this probably should be on the PR template checklist too:
a question on whether the PR reuses code from another project
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java:
##########
@@ -46,11 +46,9 @@ public SimpleKeyGenerator(TypedProperties props) {
SimpleKeyGenerator(TypedProperties props, String recordKeyField, String
partitionPathField) {
super(props);
- this.recordKeyFields = recordKeyField == null
- ? Collections.emptyList() : Collections.singletonList(recordKeyField);
- this.partitionPathFields = partitionPathField == null
- ? Collections.emptyList() :
Collections.singletonList(partitionPathField);
- simpleAvroKeyGenerator = new SimpleAvroKeyGenerator(props, recordKeyField,
partitionPathField);
+ this.recordKeyFields = recordKeyField == null ? Collections.emptyList() :
Collections.singletonList(recordKeyField);
Review Comment:
-1 on this code reformatting being in the same PR as a large functionality
change
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java:
##########
@@ -137,47 +135,59 @@ public HoodieRowCreateHandle(HoodieTable table,
* @throws IOException
*/
public void write(InternalRow row) throws IOException {
+ if (populateMetaFields) {
+ writeRow(row);
+ } else {
+ writeRowNoMetaFields(row);
+ }
+ }
+
+ private void writeRow(InternalRow row) {
try {
// NOTE: PLEASE READ THIS CAREFULLY BEFORE MODIFYING
// This code lays in the hot-path, and substantial caution should
be
// exercised making changes to it to minimize amount of excessive:
- // - Conversions b/w Spark internal (low-level) types and JVM
native ones (like
- // [[UTF8String]] and [[String]])
+ // - Conversions b/w Spark internal types and JVM native ones
(like [[UTF8String]]
+ // and [[String]])
// - Repeated computations (for ex, converting file-path to
[[UTF8String]] over and
// over again)
- UTF8String recordKey = row.getUTF8String(RECORD_KEY_META_FIELD_ORD);
-
- InternalRow updatedRow;
- // In cases when no meta-fields need to be added we simply relay
provided row to
- // the writer as is
- if (!populateMetaFields) {
- updatedRow = row;
- } else {
- UTF8String partitionPath =
row.getUTF8String(PARTITION_PATH_META_FIELD_ORD);
- // This is the only meta-field that is generated dynamically, hence
conversion b/w
- // [[String]] and [[UTF8String]] is unavoidable
- UTF8String seqId =
UTF8String.fromString(seqIdGenerator.apply(GLOBAL_SEQ_NO.getAndIncrement()));
-
- updatedRow = new HoodieInternalRow(commitTime, seqId, recordKey,
- partitionPath, fileName, row, true);
- }
+ UTF8String recordKey =
row.getUTF8String(HoodieRecord.RECORD_KEY_META_FIELD_ORD);
+ UTF8String partitionPath =
row.getUTF8String(HoodieRecord.PARTITION_PATH_META_FIELD_ORD);
+ // This is the only meta-field that is generated dynamically, hence
conversion b/w
+ // [[String]] and [[UTF8String]] is unavoidable
+ UTF8String seqId =
UTF8String.fromString(seqIdGenerator.apply(GLOBAL_SEQ_NO.getAndIncrement()));
+
+ InternalRow updatedRow = new HoodieInternalRow(commitTime, seqId,
recordKey,
+ partitionPath, fileName, row, true);
try {
fileWriter.writeRow(recordKey, updatedRow);
// NOTE: To avoid conversion on the hot-path we only convert
[[UTF8String]] into [[String]]
// in cases when successful records' writes are being tracked
writeStatus.markSuccess(writeStatus.isTrackingSuccessfulWrites() ?
recordKey.toString() : null);
- } catch (Throwable t) {
+ } catch (Exception t) {
Review Comment:
will this change the failure behavior in general for Spark writes? If yes,
let's revert to the old way
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java:
##########
@@ -137,47 +135,59 @@ public HoodieRowCreateHandle(HoodieTable table,
* @throws IOException
*/
public void write(InternalRow row) throws IOException {
+ if (populateMetaFields) {
+ writeRow(row);
+ } else {
+ writeRowNoMetaFields(row);
+ }
+ }
+
+ private void writeRow(InternalRow row) {
try {
// NOTE: PLEASE READ THIS CAREFULLY BEFORE MODIFYING
// This code lays in the hot-path, and substantial caution should
be
// exercised making changes to it to minimize amount of excessive:
- // - Conversions b/w Spark internal (low-level) types and JVM
native ones (like
- // [[UTF8String]] and [[String]])
+ // - Conversions b/w Spark internal types and JVM native ones
(like [[UTF8String]]
+ // and [[String]])
// - Repeated computations (for ex, converting file-path to
[[UTF8String]] over and
// over again)
- UTF8String recordKey = row.getUTF8String(RECORD_KEY_META_FIELD_ORD);
-
- InternalRow updatedRow;
- // In cases when no meta-fields need to be added we simply relay
provided row to
- // the writer as is
- if (!populateMetaFields) {
- updatedRow = row;
- } else {
- UTF8String partitionPath =
row.getUTF8String(PARTITION_PATH_META_FIELD_ORD);
- // This is the only meta-field that is generated dynamically, hence
conversion b/w
- // [[String]] and [[UTF8String]] is unavoidable
- UTF8String seqId =
UTF8String.fromString(seqIdGenerator.apply(GLOBAL_SEQ_NO.getAndIncrement()));
-
- updatedRow = new HoodieInternalRow(commitTime, seqId, recordKey,
- partitionPath, fileName, row, true);
- }
+ UTF8String recordKey =
row.getUTF8String(HoodieRecord.RECORD_KEY_META_FIELD_ORD);
+ UTF8String partitionPath =
row.getUTF8String(HoodieRecord.PARTITION_PATH_META_FIELD_ORD);
+ // This is the only meta-field that is generated dynamically, hence
conversion b/w
+ // [[String]] and [[UTF8String]] is unavoidable
+ UTF8String seqId =
UTF8String.fromString(seqIdGenerator.apply(GLOBAL_SEQ_NO.getAndIncrement()));
+
+ InternalRow updatedRow = new HoodieInternalRow(commitTime, seqId,
recordKey,
+ partitionPath, fileName, row, true);
try {
fileWriter.writeRow(recordKey, updatedRow);
// NOTE: To avoid conversion on the hot-path we only convert
[[UTF8String]] into [[String]]
// in cases when successful records' writes are being tracked
writeStatus.markSuccess(writeStatus.isTrackingSuccessfulWrites() ?
recordKey.toString() : null);
- } catch (Throwable t) {
+ } catch (Exception t) {
writeStatus.markFailure(recordKey.toString(), t);
}
- } catch (Throwable ge) {
- writeStatus.setGlobalError(ge);
- throw ge;
+ } catch (Exception e) {
+ writeStatus.setGlobalError(e);
+ throw e;
+ }
+ }
+
+ private void writeRowNoMetaFields(InternalRow row) {
+ try {
+ // TODO make sure writing w/ and w/o meta fields is consistent
(currently writing w/o
+ // meta-fields would fail if any record will, while when writing w/
meta-fields it won't)
+ fileWriter.writeRow(row);
+ writeStatus.markSuccess();
+ } catch (Exception e) {
Review Comment:
same comment here
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SparkKeyGeneratorInterface.java:
##########
@@ -18,20 +18,65 @@
package org.apache.hudi.keygen;
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.PublicAPIMethod;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
/**
- * Spark key generator interface.
+ * Spark-specific {@link KeyGenerator} interface extension allowing
implementation to
+ * specifically implement record-key, partition-path generation w/o the need
for (expensive)
+ * conversion from Spark internal representation (for ex, to Avro)
*/
public interface SparkKeyGeneratorInterface extends KeyGeneratorInterface {
+ /**
+ * Extracts record key from Spark's {@link Row}
+ *
+ * @param row instance of {@link Row} from which record-key is extracted
+ * @return record's (primary) key
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
String getRecordKey(Row row);
- String getRecordKey(InternalRow row, StructType schema);
+ /**
+ * Extracts record key from Spark's {@link InternalRow}
+ *
+ * NOTE: Difference b/w {@link Row} and {@link InternalRow} is that {@link
InternalRow} could
+ * internally hold just a binary representation of the data, while
{@link Row} has it
+ * deserialized into JVM-native representation (like {@code Integer},
{@code Long},
+ * {@code String}, etc)
+ *
+ * @param row instance of {@link InternalRow} from which record-key is
extracted
+ * @param schema schema {@link InternalRow} is adhering to
+ * @return record-key as instance of {@link UTF8String}
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ UTF8String getRecordKey(InternalRow row, StructType schema);
+ /**
+ * Extracts partition-path from {@link Row}
+ *
+ * @param row instance of {@link Row} from which partition-path is extracted
+ * @return record's partition-path
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
String getPartitionPath(Row row);
- String getPartitionPath(InternalRow internalRow, StructType structType);
+ /**
+ * Extracts partition-path from Spark's {@link InternalRow}
+ *
+ * NOTE: Difference b/w {@link Row} and {@link InternalRow} is that {@link
InternalRow} could
+ * internally hold just a binary representation of the data, while
{@link Row} has it
+ * deserialized into JVM-native representation (like {@code Integer},
{@code Long},
+ * {@code String}, etc)
+ *
+ * @param row instance of {@link InternalRow} from which record-key is
extracted
+ * @param schema schema {@link InternalRow} is adhering to
+ * @return partition-path as instance of {@link UTF8String}
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ UTF8String getPartitionPath(InternalRow row, StructType schema);
Review Comment:
this is a breaking API change to be called out. cc @codope
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/model/HoodieInternalRow.java:
##########
@@ -50,22 +51,27 @@ public class HoodieInternalRow extends InternalRow {
/**
* Collection of meta-fields as defined by {@link
HoodieRecord#HOODIE_META_COLUMNS}
+ *
+ * NOTE: {@code HoodieInternalRow} *always* overlays its own meta-fields
even in case
+ * when source row also contains them, to make sure these fields are
mutable and
+ * can be updated (for ex, {@link UnsafeRow} doesn't support mutations
due to
+ * its memory layout, as it persists field offsets)
*/
private final UTF8String[] metaFields;
- private final InternalRow row;
+ private final InternalRow sourceRow;
/**
- * Specifies whether source {@link #row} contains meta-fields
+ * Specifies whether source {@link #sourceRow} contains meta-fields
*/
- private final boolean containsMetaFields;
+ private final boolean sourceContainsMetaFields;
Review Comment:
+1 this is already part of the guidelines:
https://hudi.apache.org/contribute/developer-setup#coding-guidelines — next time,
let's not land any PRs violating this until it is resolved.
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java:
##########
@@ -19,132 +19,560 @@
package org.apache.hudi.keygen;
import org.apache.avro.generic.GenericRecord;
-import org.apache.hudi.ApiMaturityLevel;
import org.apache.hudi.AvroConversionUtils;
-import org.apache.hudi.PublicAPIMethod;
+import org.apache.hudi.HoodieSparkUtils;
+import org.apache.hudi.client.utils.SparkRowSerDe;
import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.PartitionPathEncodeUtils;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieKeyException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.sql.HoodieUnsafeRowUtils;
+import org.apache.spark.sql.HoodieUnsafeRowUtils$;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DateType;
import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.unsafe.types.UTF8String;
import scala.Function1;
-import java.util.HashMap;
+import javax.annotation.concurrent.ThreadSafe;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+
+import static org.apache.hudi.common.util.CollectionUtils.tail;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static
org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH_SEPARATOR;
+import static
org.apache.hudi.keygen.KeyGenUtils.DEFAULT_RECORD_KEY_PARTS_SEPARATOR;
+import static org.apache.hudi.keygen.KeyGenUtils.EMPTY_RECORDKEY_PLACEHOLDER;
+import static org.apache.hudi.keygen.KeyGenUtils.HUDI_DEFAULT_PARTITION_PATH;
+import static org.apache.hudi.keygen.KeyGenUtils.NULL_RECORDKEY_PLACEHOLDER;
/**
- * Base class for the built-in key generators. Contains methods structured for
- * code reuse amongst them.
+ * Base class for all built-in key generators.
+ *
+ * NOTE: By default it implements all the methods of {@link
SparkKeyGeneratorInterface}, which
+ * by default however fallback to Avro implementation. For maximum
performance (to avoid
+ * conversion from Spark's internal data-types to Avro) you should
override these methods
+ * in your implementation.
+ *
+ * TODO rename to AvroFallbackBaseKeyGenerator
*/
+@ThreadSafe
public abstract class BuiltinKeyGenerator extends BaseKeyGenerator implements
SparkKeyGeneratorInterface {
- private static final String STRUCT_NAME = "hoodieRowTopLevelField";
- private static final String NAMESPACE = "hoodieRow";
- private Function1<Row, GenericRecord> converterFn = null;
- private final AtomicBoolean validatePartitionFields = new
AtomicBoolean(false);
- protected StructType structType;
+ private static final Logger LOG =
LogManager.getLogger(BuiltinKeyGenerator.class);
+
+ private static final String COMPOSITE_KEY_FIELD_VALUE_INFIX = ":";
+
+ protected static final UTF8String HUDI_DEFAULT_PARTITION_PATH_UTF8 =
UTF8String.fromString(HUDI_DEFAULT_PARTITION_PATH);
+ protected static final UTF8String NULL_RECORD_KEY_PLACEHOLDER_UTF8 =
UTF8String.fromString(NULL_RECORDKEY_PLACEHOLDER);
+ protected static final UTF8String EMPTY_RECORD_KEY_PLACEHOLDER_UTF8 =
UTF8String.fromString(EMPTY_RECORDKEY_PLACEHOLDER);
+
- protected Map<String, Pair<List<Integer>, DataType>> recordKeySchemaInfo =
new HashMap<>();
- protected Map<String, Pair<List<Integer>, DataType>> partitionPathSchemaInfo
= new HashMap<>();
+ protected transient volatile SparkRowConverter rowConverter;
+ protected transient volatile SparkRowAccessor rowAccessor;
protected BuiltinKeyGenerator(TypedProperties config) {
super(config);
}
- /**
- * Fetch record key from {@link Row}.
- *
- * @param row instance of {@link Row} from which record key is requested.
- * @return the record key of interest from {@link Row}.
- */
@Override
- @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
public String getRecordKey(Row row) {
- // TODO avoid conversion to avro
- // since converterFn is transient this will be repeatedly initialized
over and over again
- if (null == converterFn) {
- converterFn = AvroConversionUtils.createConverterToAvro(row.schema(),
STRUCT_NAME, NAMESPACE);
- }
- return getKey(converterFn.apply(row)).getRecordKey();
+ tryInitRowConverter(row.schema());
+ // NOTE: This implementation has considerable computational overhead and
has to be overridden
+ // to provide for optimal performance on Spark. This implementation
provided exclusively
+ // for compatibility reasons.
+ return getRecordKey(rowConverter.convertToAvro(row));
}
@Override
- @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
- public String getRecordKey(InternalRow internalRow, StructType schema) {
- try {
- // TODO fix
- buildFieldSchemaInfoIfNeeded(schema);
- return RowKeyGeneratorHelper.getRecordKeyFromInternalRow(internalRow,
getRecordKeyFields(), recordKeySchemaInfo, false);
- } catch (Exception e) {
- throw new HoodieException("Conversion of InternalRow to Row failed with
exception", e);
+ public UTF8String getRecordKey(InternalRow internalRow, StructType schema) {
Review Comment:
cc @xushiyan Could we update PR template to get authors to write out any
breaking changes to API, or table version changes in the PR description.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]