alexeykudinkin commented on code in PR #5629:
URL: https://github.com/apache/hudi/pull/5629#discussion_r974403023
##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordCompatibilityInterface.java:
##########
@@ -18,21 +18,28 @@
package org.apache.hudi.common.model;
+import java.io.IOException;
+import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.keygen.BaseKeyGenerator;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Properties;
+public interface HoodieRecordCompatibilityInterface {
-/**
- * HoodieMerge defines how to merge two records. It is a stateless component.
- * It can implement the merging logic of HoodieRecord of different engines
- * and avoid the performance consumption caused by the serialization/deserialization of Avro payload.
- */
-public interface HoodieMerge extends Serializable {
-
- HoodieRecord preCombine(HoodieRecord older, HoodieRecord newer);
+ /**
+ * This method is used to extract the HoodieKey without going through the keyGenerator.
+ */
+ HoodieRecord wrapIntoHoodieRecordPayloadWithParams(
Review Comment:
👍
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java:
##########
@@ -147,19 +141,19 @@ public static HoodieMergedLogRecordScanner.Builder newBuilder() {
}
@Override
- protected void processNextRecord(HoodieRecord hoodieRecord) throws IOException {
+ protected <T> void processNextRecord(HoodieRecord<T> hoodieRecord) throws IOException {
String key = hoodieRecord.getRecordKey();
if (records.containsKey(key)) {
// Merge and store the merged record. The HoodieRecordPayload implementation is free to decide what should be
// done when a DELETE (empty payload) is encountered before or after an insert/update.
- HoodieRecord<? extends HoodieRecordPayload> oldRecord = records.get(key);
- HoodieRecordPayload oldValue = oldRecord.getData();
- HoodieRecordPayload combinedValue = (HoodieRecordPayload) merge.preCombine(oldRecord, hoodieRecord).getData();
+ HoodieRecord<T> oldRecord = records.get(key);
+ T oldValue = oldRecord.getData();
+ T combinedValue = ((HoodieRecord<T>) recordMerger.merge(oldRecord, hoodieRecord, readerSchema, this.hoodieTableMetaClient.getTableConfig().getProps()).get()).getData();
// If combinedValue is oldValue, no need to re-put oldRecord
if (combinedValue != oldValue) {
- HoodieOperation operation = hoodieRecord.getOperation();
- records.put(key, new HoodieAvroRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()), combinedValue, operation));
+ hoodieRecord.setData(combinedValue);
Review Comment:
Why are we resetting the data instead of using the new `HoodieRecord` returned by the Merger?
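For illustration, a sketch of the alternative being suggested -- keep the record instance the merger returns instead of mutating `hoodieRecord` in place (names mirror the hunk above; this is a suggestion, not the PR's code):

```java
HoodieRecord<T> merged = (HoodieRecord<T>) recordMerger.merge(oldRecord, hoodieRecord, readerSchema,
    this.hoodieTableMetaClient.getTableConfig().getProps()).get();
// No setData(...) needed: store the merger's result directly
if (merged.getData() != oldValue) {
  records.put(key, merged);
}
```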
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -126,11 +129,17 @@ public class HoodieWriteConfig extends HoodieConfig {
.withDocumentation("Payload class used. Override this, if you like to
roll your own merge logic, when upserting/inserting. "
+ "This will render any value set for PRECOMBINE_FIELD_OPT_VAL
in-effective");
- public static final ConfigProperty<String> MERGE_CLASS_NAME = ConfigProperty
- .key("hoodie.datasource.write.merge.class")
- .defaultValue(HoodieAvroRecordMerge.class.getName())
- .withDocumentation("Merge class provide stateless component interface
for merging records, and support various HoodieRecord "
- + "types, such as Spark records or Flink records.");
+ public static final ConfigProperty<String> MERGER_IMPLS = ConfigProperty
+ .key("hoodie.datasource.write.merger.impls")
+ .defaultValue(HoodieAvroRecordMerger.class.getName())
+ .withDocumentation("List of HoodieMerger implementations constituting
Hudi's merging strategy -- based on the engine used. "
+ + "These merger impls will filter by
hoodie.datasource.write.merger.strategy "
+ + "Hudi will pick most efficient implementation to perform
merging/combining of the records (during update, reading MOR table, etc)");
+
+ public static final ConfigProperty<String> MERGER_STRATEGY = ConfigProperty
+ .key("hoodie.datasource.write.merger.strategy")
+ .defaultValue(StringUtils.DEFAULT_MERGER_STRATEGY_UUID)
Review Comment:
Let's move this to HoodieMerger, rather than `StringUtils` (we can do it in
a follow-up)
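A sketch of that follow-up, assuming the constant keeps its name (the UUID literal below is a placeholder, not the real default value):

```java
public interface HoodieRecordMerger extends Serializable {
  // Moved from StringUtils; placeholder UUID shown for illustration only
  String DEFAULT_MERGER_STRATEGY_UUID = "00000000-0000-0000-0000-000000000000";
}
```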
##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -156,11 +155,10 @@ public class HoodieTableConfig extends HoodieConfig {
.withDocumentation("Payload class to use for performing compactions, i.e
merge delta logs with current base file and then "
+ " produce a new base file.");
- public static final ConfigProperty<String> MERGE_CLASS_NAME = ConfigProperty
- .key("hoodie.compaction.merge.class")
- .defaultValue(HoodieAvroRecordMerge.class.getName())
- .withDocumentation("Merge class provide stateless component interface
for merging records, and support various HoodieRecord "
- + "types, such as Spark records or Flink records.");
+ public static final ConfigProperty<String> MERGER_STRATEGY = ConfigProperty
+ .key("hoodie.compaction.merger.strategy")
+ .defaultValue(StringUtils.DEFAULT_MERGER_STRATEGY_UUID)
+ .withDocumentation("Id of merger strategy. Hudi will pick RecordMergers
in hoodie.datasource.write.merger.impls which has the same merger strategy id");
Review Comment:
nit: `HoodieRecordMerger` implementations
##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -244,15 +242,24 @@ public class HoodieTableConfig extends HoodieConfig {
private static final String TABLE_CHECKSUM_FORMAT = "%s.%s"; // <database_name>.<table_name>
- public HoodieTableConfig(FileSystem fs, String metaPath, String payloadClassName) {
+ public HoodieTableConfig(FileSystem fs, String metaPath, String payloadClassName, String mergerStrategy) {
Review Comment:
nit: `mergerStrategyId`
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/HoodieSparkRecordUtils.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.hudi.HoodieInternalRowUtils;
+import org.apache.hudi.commmon.model.HoodieSparkRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.spark.sql.HoodieCatalystExpressionUtils$;
+import org.apache.spark.sql.HoodieUnsafeRowUtils;
+import org.apache.spark.sql.HoodieUnsafeRowUtils.NestedFieldPath;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+
+public class HoodieSparkRecordUtils {
+
+ /**
+ * Utility method to convert InternalRow to HoodieRecord using schema and payload class.
+ */
+ public static HoodieRecord<InternalRow> convertToHoodieSparkRecord(StructType structType, InternalRow data, boolean withOperationField) {
+ return convertToHoodieSparkRecord(structType, data,
+ Pair.of(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD),
+ withOperationField, Option.empty());
+ }
+
+ public static HoodieRecord<InternalRow> convertToHoodieSparkRecord(StructType structType, InternalRow data, boolean withOperationField,
+ Option<String> partitionName) {
+ return convertToHoodieSparkRecord(structType, data,
+ Pair.of(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD),
+ withOperationField, partitionName);
+ }
+
+ /**
+ * Utility method to convert an InternalRow to HoodieRecord using schema and payload class.
+ */
+ public static HoodieRecord<InternalRow> convertToHoodieSparkRecord(StructType structType, InternalRow data, Pair<String, String> recordKeyPartitionPathFieldPair,
+ boolean withOperationField, Option<String> partitionName) {
+ final String recKey = getValue(structType, recordKeyPartitionPathFieldPair.getKey(), data).toString();
+ final String partitionPath = (partitionName.isPresent() ? partitionName.get() :
+ getValue(structType, recordKeyPartitionPathFieldPair.getRight(), data).toString());
+
+ HoodieOperation operation = withOperationField
+ ? HoodieOperation.fromName(getNullableValAsString(structType, data, HoodieRecord.OPERATION_METADATA_FIELD)) : null;
+ return new HoodieSparkRecord(new HoodieKey(recKey, partitionPath), data, structType, operation);
+ }
+
+ private static Object getValue(StructType structType, String fieldName, InternalRow row) {
+ NestedFieldPath posList = HoodieInternalRowUtils.getCachedPosList(structType, fieldName);
+ return HoodieUnsafeRowUtils.getNestedInternalRowValue(row, posList);
+ }
+
+ /**
+ * Returns the string value of the given record {@code row} and field {@code fieldName}. The field and value both could be missing.
+ *
+ * @param row The record
+ * @param fieldName The field name
+ * @return the string form of the field or empty if the schema does not contain the field name or the value is null
+ */
+ private static Option<String> getNullableValAsString(StructType structType, InternalRow row, String fieldName) {
Review Comment:
I don't think we need this method (we can use `getValue(...).toString`
instead)
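Roughly, the call site would become something like this (hedged sketch; note `getValue` throws if the field is absent from the schema, so the `existField` guard may still be needed for the operation meta field):

```java
HoodieOperation operation = withOperationField
    ? HoodieOperation.fromName(Option.ofNullable(
        getValue(structType, HoodieRecord.OPERATION_METADATA_FIELD, data)).map(Object::toString))
    : null;
```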
##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java:
##########
@@ -291,59 +284,51 @@ public void checkState() {
}
}
-
//////////////////////////////////////////////////////////////////////////////
-
- //
- // NOTE: This method duplicates those ones of the HoodieRecordPayload and are placed here
- // for the duration of RFC-46 implementation, until migration off `HoodieRecordPayload`
- // is complete
- //
- public abstract HoodieRecord mergeWith(HoodieRecord other, Schema readerSchema, Schema writerSchema) throws IOException;
+ /**
+ * Get column in record to support RDDCustomColumnsSortPartitioner
+ */
+ public abstract Object getRecordColumnValues(Schema recordSchema, String[] columns, boolean consistentLogicalTimestampEnabled);
- public abstract HoodieRecord rewriteRecord(Schema recordSchema, Schema targetSchema, TypedProperties props) throws IOException;
+ /**
+ * Support bootstrap.
+ */
+ public abstract HoodieRecord mergeWith(HoodieRecord other, Schema targetSchema) throws IOException;
/**
- * Rewrite the GenericRecord with the Schema containing the Hoodie Metadata fields.
+ * Rewrite record into new schema (add meta columns)
*/
- public abstract HoodieRecord rewriteRecord(Schema recordSchema, Properties prop, boolean schemaOnReadEnabled, Schema writeSchemaWithMetaFields) throws IOException;
+ public abstract HoodieRecord rewriteRecord(Schema recordSchema, Properties props, Schema targetSchema) throws IOException;
- public abstract HoodieRecord rewriteRecordWithMetadata(Schema recordSchema, Properties prop, boolean schemaOnReadEnabled, Schema writeSchemaWithMetaFields, String fileName) throws IOException;
+ /**
+ * Support schema evolution.
+ */
+ public abstract HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema, Map<String, String> renameCols) throws IOException;
- public abstract HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties prop, Schema newSchema, Map<String, String> renameCols) throws IOException;
+ public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema) throws IOException {
+ return rewriteRecordWithNewSchema(recordSchema, props, newSchema, Collections.emptyMap());
+ }
- public abstract HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties prop, Schema newSchema, Map<String, String> renameCols, Mapper mapper) throws IOException;
+ /**
+ * This method could change in the future.
+ * @temporary
+ */
+ public abstract HoodieRecord updateValues(Schema recordSchema, Properties props, Map<String, String> metadataValues) throws IOException;
- public abstract HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties prop, Schema newSchema) throws IOException;
+ public abstract boolean isDelete(Schema schema, Properties props) throws IOException;
- public abstract HoodieRecord overrideMetadataFieldValue(Schema recordSchema, Properties prop, int pos, String newValue) throws IOException;
+ /**
+ * Is EmptyRecord. Generated by ExpressionPayload.
+ */
+ public abstract boolean shouldIgnore(Schema schema, Properties props) throws IOException;
- public abstract HoodieRecord addMetadataValues(Schema recordSchema, Properties prop, Map<HoodieMetadataField, String> metadataValues) throws IOException;
+ public abstract Option<HoodieAvroIndexedRecord> toIndexedRecord(Schema schema, Properties props) throws IOException;
Review Comment:
Let's move this method to `HoodieCompatibilityInterface`
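A sketch of the interface after that move (the `wrapIntoHoodieRecordPayloadWithParams` signature is copied from the `HoodieSparkRecord` hunk further down; the `throws` clause on it is an assumption):

```java
public interface HoodieRecordCompatibilityInterface {

  HoodieRecord wrapIntoHoodieRecordPayloadWithParams(
      Schema schema, Properties props,
      Option<Pair<String, String>> simpleKeyGenFieldsOpt,
      Boolean withOperation,
      Option<String> partitionNameOp,
      Boolean populateMetaFields) throws IOException;

  // Relocated from HoodieRecord, per the comment above
  Option<HoodieAvroIndexedRecord> toIndexedRecord(Schema schema, Properties props) throws IOException;
}
```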
##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java:
##########
@@ -20,33 +20,27 @@
package org.apache.hudi.common.model;
import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.util.HoodieRecordUtils;
+import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
-import javax.annotation.Nonnull;
-
import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
-import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
-
public class HoodieAvroRecord<T extends HoodieRecordPayload> extends HoodieRecord<T> {
Review Comment:
We should rename this to be `HoodieLegacyAvroRecord` (to make it more clear
that this will eventually be going away)
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/HoodieSparkRecordUtils.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.hudi.HoodieInternalRowUtils;
+import org.apache.hudi.commmon.model.HoodieSparkRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.spark.sql.HoodieCatalystExpressionUtils$;
+import org.apache.spark.sql.HoodieUnsafeRowUtils;
+import org.apache.spark.sql.HoodieUnsafeRowUtils.NestedFieldPath;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+
+public class HoodieSparkRecordUtils {
+
+ /**
+ * Utility method to convert InternalRow to HoodieRecord using schema and payload class.
+ */
+ public static HoodieRecord<InternalRow> convertToHoodieSparkRecord(StructType structType, InternalRow data, boolean withOperationField) {
+ return convertToHoodieSparkRecord(structType, data,
+ Pair.of(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD),
+ withOperationField, Option.empty());
+ }
+
+ public static HoodieRecord<InternalRow> convertToHoodieSparkRecord(StructType structType, InternalRow data, boolean withOperationField,
+ Option<String> partitionName) {
+ return convertToHoodieSparkRecord(structType, data,
+ Pair.of(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD),
+ withOperationField, partitionName);
+ }
+
+ /**
+ * Utility method to convert an InternalRow to HoodieRecord using schema and payload class.
+ */
+ public static HoodieRecord<InternalRow> convertToHoodieSparkRecord(StructType structType, InternalRow data, Pair<String, String> recordKeyPartitionPathFieldPair,
+ boolean withOperationField, Option<String> partitionName) {
+ final String recKey = getValue(structType, recordKeyPartitionPathFieldPair.getKey(), data).toString();
Review Comment:
We can't do that; we need to use the key-gen to fetch both the record-key and the partition-path
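Sketch of the key-gen based variant (hedged; it assumes `SparkKeyGeneratorInterface` exposes a `getPartitionPath(InternalRow, StructType)` counterpart to the `getRecordKey` call used in `HoodieSparkRecord` below):

```java
SparkKeyGeneratorInterface keyGen = (SparkKeyGeneratorInterface) keyGeneratorOpt.get();
final String recKey = keyGen.getRecordKey(data, structType).toString();
final String partitionPath = partitionName.isPresent()
    ? partitionName.get()
    : keyGen.getPartitionPath(data, structType).toString();
```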
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/HoodieSparkRecordUtils.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.hudi.HoodieInternalRowUtils;
+import org.apache.hudi.commmon.model.HoodieSparkRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.spark.sql.HoodieCatalystExpressionUtils$;
+import org.apache.spark.sql.HoodieUnsafeRowUtils;
+import org.apache.spark.sql.HoodieUnsafeRowUtils.NestedFieldPath;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+
+public class HoodieSparkRecordUtils {
+
+ /**
+ * Utility method to convert InternalRow to HoodieRecord using schema and payload class.
+ */
+ public static HoodieRecord<InternalRow> convertToHoodieSparkRecord(StructType structType, InternalRow data, boolean withOperationField) {
+ return convertToHoodieSparkRecord(structType, data,
+ Pair.of(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD),
+ withOperationField, Option.empty());
+ }
+
+ public static HoodieRecord<InternalRow> convertToHoodieSparkRecord(StructType structType, InternalRow data, boolean withOperationField,
+ Option<String> partitionName) {
+ return convertToHoodieSparkRecord(structType, data,
+ Pair.of(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD),
+ withOperationField, partitionName);
+ }
+
+ /**
+ * Utility method to convert an InternalRow to HoodieRecord using schema and payload class.
+ */
+ public static HoodieRecord<InternalRow> convertToHoodieSparkRecord(StructType structType, InternalRow data, Pair<String, String> recordKeyPartitionPathFieldPair,
+ boolean withOperationField, Option<String> partitionName) {
+ final String recKey = getValue(structType, recordKeyPartitionPathFieldPair.getKey(), data).toString();
+ final String partitionPath = (partitionName.isPresent() ? partitionName.get() :
+ getValue(structType, recordKeyPartitionPathFieldPair.getRight(), data).toString());
+
+ HoodieOperation operation = withOperationField
+ ? HoodieOperation.fromName(getNullableValAsString(structType, data, HoodieRecord.OPERATION_METADATA_FIELD)) : null;
+ return new HoodieSparkRecord(new HoodieKey(recKey, partitionPath), data, structType, operation);
+ }
+
+ private static Object getValue(StructType structType, String fieldName, InternalRow row) {
+ NestedFieldPath posList = HoodieInternalRowUtils.getCachedPosList(structType, fieldName);
+ return HoodieUnsafeRowUtils.getNestedInternalRowValue(row, posList);
+ }
+
+ /**
+ * Returns the string value of the given record {@code row} and field {@code fieldName}. The field and value both could be missing.
+ *
+ * @param row The record
+ * @param fieldName The field name
+ * @return the string form of the field or empty if the schema does not contain the field name or the value is null
+ */
+ private static Option<String> getNullableValAsString(StructType structType, InternalRow row, String fieldName) {
+ String fieldVal = !HoodieCatalystExpressionUtils$.MODULE$.existField(structType, fieldName)
+ ? null : StringUtils.objToString(getValue(structType, fieldName, row));
+ return Option.ofNullable(fieldVal);
+ }
+
+ /**
+ * Gets record column values into one object.
+ *
+ * @param row InternalRow.
+ * @param columns Names of the columns to get values.
+ * @param structType {@link StructType} instance.
+ * @return Column value if a single column, or concatenated String values by comma.
+ */
+ public static Object getRecordColumnValues(InternalRow row,
Review Comment:
Please check my other comment regarding making this method return an array of objects, instead of a concatenated string
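For reference, the array-returning shape would be roughly (hypothetical variant, reusing the class's own `getValue` helper):

```java
public static Object[] getRecordColumnValues(InternalRow row, String[] columns, StructType structType) {
  Object[] vals = new Object[columns.length];
  for (int i = 0; i < columns.length; i++) {
    // One entry per column instead of a comma-concatenated String
    vals[i] = getValue(structType, columns[i], row);
  }
  return vals;
}
```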
##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java:
##########
@@ -291,59 +284,51 @@ public void checkState() {
}
}
-
//////////////////////////////////////////////////////////////////////////////
-
- //
- // NOTE: This method duplicates those ones of the HoodieRecordPayload and are placed here
- // for the duration of RFC-46 implementation, until migration off `HoodieRecordPayload`
- // is complete
- //
- public abstract HoodieRecord mergeWith(HoodieRecord other, Schema readerSchema, Schema writerSchema) throws IOException;
+ /**
+ * Get column in record to support RDDCustomColumnsSortPartitioner
+ */
+ public abstract Object getRecordColumnValues(Schema recordSchema, String[] columns, boolean consistentLogicalTimestampEnabled);
- public abstract HoodieRecord rewriteRecord(Schema recordSchema, Schema targetSchema, TypedProperties props) throws IOException;
+ /**
+ * Support bootstrap.
+ */
+ public abstract HoodieRecord mergeWith(HoodieRecord other, Schema targetSchema) throws IOException;
Review Comment:
We should not have this method in the `HoodieRecord` -- this method should
be implemented w/in `HoodieRecordMerger` itself
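One transitional sketch of that move (illustrative only; it still delegates to the existing stitching logic until that code is itself relocated, and the class name is hypothetical):

```java
public class HoodieSparkBootstrapMerger implements HoodieRecordMerger {
  @Override
  public Option<HoodieRecord> merge(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException {
    // Bootstrap case: stitch the skeleton record (older) with the data record (newer)
    return Option.of(older.mergeWith(newer, schema));
  }
}
```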
##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java:
##########
@@ -213,6 +200,10 @@ public HoodieRecord
setCurrentLocation(HoodieRecordLocation location) {
return this;
}
+ public void setData(T data) {
Review Comment:
We should avoid any mutating methods in the interface
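If `setData` is dropped, the scanner above could instead rely on an immutable, data-carrying factory method; a hypothetical addition, not part of this PR:

```java
// Returns a copy of this record carrying the given data (hypothetical overload)
public abstract HoodieRecord<T> newInstance(HoodieKey key, T data);
```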
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java:
##########
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.commmon.model;
+
+import org.apache.hudi.HoodieInternalRowUtils;
+import org.apache.hudi.client.model.HoodieInternalRow;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.ConfigUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.keygen.BaseKeyGenerator;
+import org.apache.hudi.keygen.SparkKeyGeneratorInterface;
+import org.apache.hudi.util.HoodieSparkRecordUtils;
+
+import org.apache.avro.Schema;
+import org.apache.spark.sql.HoodieCatalystExpressionUtils$;
+import org.apache.spark.sql.HoodieUnsafeRowUtils;
+import org.apache.spark.sql.HoodieUnsafeRowUtils.NestedFieldPath;
+import org.apache.spark.sql.catalyst.CatalystTypeConverters;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.hudi.common.table.HoodieTableConfig.POPULATE_META_FIELDS;
+import static org.apache.spark.sql.types.DataTypes.BooleanType;
+import static org.apache.spark.sql.types.DataTypes.StringType;
+
+/**
+ * Spark Engine-specific Implementations of `HoodieRecord`.
+ */
+public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
+
+ private StructType structType = null;
Review Comment:
We should make this `transient` to make sure we don't accidentally serialize
schema along w/ every record
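i.e., along these lines (sketch; the schema would then be rebuilt lazily on the read side, e.g. via the existing `schemaFingerPrint` field):

```java
// Excluded from serialization; every record would otherwise drag the full schema along
private transient StructType structType = null;
```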
##########
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java:
##########
@@ -20,18 +20,41 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.exception.HoodieException;
import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
import static org.apache.hudi.common.model.HoodieFileFormat.HFILE;
import static org.apache.hudi.common.model.HoodieFileFormat.ORC;
import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
public class HoodieFileReaderFactory {
- public static HoodieAvroFileReader getFileReader(Configuration conf, Path path) throws IOException {
+ public static HoodieFileReaderFactory getReaderFactory(HoodieRecord.HoodieRecordType recordType) {
+ switch (recordType) {
+ case AVRO:
+ return HoodieAvroFileReaderFactory.getFileReaderFactory();
+ case SPARK:
+ try {
+ Class<?> clazz = ReflectionUtils.getClass("org.apache.hudi.io.storage.HoodieSparkFileReaderFactory");
+ Method method = clazz.getMethod("getFileReaderFactory", null);
Review Comment:
Few nits:
- We don't need to make the FileFactory a singleton, we can instantiate it every time
- That makes this much simpler: we can just call `newInstance`
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java:
##########
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.commmon.model;
+
+import org.apache.hudi.HoodieInternalRowUtils;
+import org.apache.hudi.client.model.HoodieInternalRow;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.ConfigUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.keygen.BaseKeyGenerator;
+import org.apache.hudi.keygen.SparkKeyGeneratorInterface;
+import org.apache.hudi.util.HoodieSparkRecordUtils;
+
+import org.apache.avro.Schema;
+import org.apache.spark.sql.HoodieCatalystExpressionUtils$;
+import org.apache.spark.sql.HoodieUnsafeRowUtils;
+import org.apache.spark.sql.HoodieUnsafeRowUtils.NestedFieldPath;
+import org.apache.spark.sql.catalyst.CatalystTypeConverters;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.hudi.common.table.HoodieTableConfig.POPULATE_META_FIELDS;
+import static org.apache.spark.sql.types.DataTypes.BooleanType;
+import static org.apache.spark.sql.types.DataTypes.StringType;
+
+/**
+ * Spark Engine-specific Implementations of `HoodieRecord`.
+ */
+public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
+
+ private StructType structType = null;
+ private Option<Long> schemaFingerPrint = Option.empty();
+
+ public HoodieSparkRecord(InternalRow data, StructType schema) {
+ super(null, data);
+ initSchema(schema);
+ }
+
+ public HoodieSparkRecord(HoodieKey key, InternalRow data, StructType schema) {
+ super(key, data);
+ initSchema(schema);
+ }
+
+ public HoodieSparkRecord(HoodieKey key, InternalRow data, StructType schema, HoodieOperation operation) {
+ super(key, data, operation);
+ initSchema(schema);
+ }
+
+ public HoodieSparkRecord(HoodieSparkRecord record) {
+ super(record);
+ initSchema(record.getStructType());
+ }
+
+ @Override
+ public HoodieRecord<InternalRow> newInstance() {
+ return new HoodieSparkRecord(this);
+ }
+
+ @Override
+ public HoodieRecord<InternalRow> newInstance(HoodieKey key, HoodieOperation op) {
+ return new HoodieSparkRecord(key, data, getStructType(), op);
+ }
+
+ @Override
+ public HoodieRecord<InternalRow> newInstance(HoodieKey key) {
+ return new HoodieSparkRecord(key, data, getStructType());
+ }
+
+ @Override
+ public String getRecordKey(Option<BaseKeyGenerator> keyGeneratorOpt) {
+ if (key != null) {
+ return getRecordKey();
+ }
+ return keyGeneratorOpt.isPresent() ? ((SparkKeyGeneratorInterface) keyGeneratorOpt.get())
+ .getRecordKey(data, getStructType()).toString() : data.getString(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.ordinal());
+ }
+
+ @Override
+ public String getRecordKey(String keyFieldName) {
+ if (key != null) {
+ return getRecordKey();
+ }
+ DataType dataType = getStructType().apply(keyFieldName).dataType();
+ int pos = getStructType().fieldIndex(keyFieldName);
+ return data.get(pos, dataType).toString();
+ }
+
+ @Override
+ public HoodieRecordType getRecordType() {
+ return HoodieRecordType.SPARK;
+ }
+
+ @Override
+ public Object getRecordColumnValues(Schema recordSchema, String[] columns, boolean consistentLogicalTimestampEnabled) {
+ return HoodieSparkRecordUtils.getRecordColumnValues(data, columns, getStructType(), consistentLogicalTimestampEnabled);
+ }
+
+ @Override
+ public HoodieRecord mergeWith(HoodieRecord other, Schema targetSchema) throws IOException {
+ StructType otherStructType = ((HoodieSparkRecord) other).getStructType();
+ StructType writerStructType = HoodieInternalRowUtils.getCachedSchema(targetSchema);
+ InternalRow mergeRow = HoodieInternalRowUtils.stitchRecords(data, getStructType(), (InternalRow) other.getData(), otherStructType, writerStructType);
+ return new HoodieSparkRecord(getKey(), mergeRow, writerStructType, getOperation());
+ }
+
+ @Override
+ public HoodieRecord rewriteRecord(Schema recordSchema, Properties props, Schema targetSchema) throws IOException {
+ StructType targetStructType = HoodieInternalRowUtils.getCachedSchema(targetSchema);
+ UTF8String[] metaFields = extractMetaField(targetStructType);
+ if (metaFields.length == 0) {
+ throw new UnsupportedOperationException();
+ }
+
+ InternalRow resultRow;
+ if (extractMetaField(getStructType()).length == 0) {
+ resultRow = new HoodieInternalRow(metaFields, data, false);
+ } else {
+ resultRow = new HoodieInternalRow(metaFields, data, true);
+ }
+
+ return new HoodieSparkRecord(getKey(), resultRow, targetStructType, getOperation());
+ }
+
+ @Override
+ public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema, Map<String, String> renameCols) throws IOException {
+ StructType newStructType = HoodieInternalRowUtils.getCachedSchema(newSchema);
+ InternalRow rewriteRow = HoodieInternalRowUtils.rewriteRecordWithNewSchema(data, getStructType(), newStructType, renameCols);
+ UnsafeProjection unsafeConvert = HoodieInternalRowUtils.getCachedUnsafeConvert(newStructType);
+ InternalRow resultRow = unsafeConvert.apply(rewriteRow);
+ UTF8String[] metaFields = extractMetaField(newStructType);
+ if (metaFields.length > 0) {
+ resultRow = new HoodieInternalRow(metaFields, data, true);
+ }
+
+ return new HoodieSparkRecord(getKey(), resultRow, newStructType, getOperation());
+ }
+
+ @Override
+ public HoodieRecord updateValues(Schema recordSchema, Properties props, Map<String, String> metadataValues) throws IOException {
+ metadataValues.forEach((key, value) -> {
+ int pos = getStructType().fieldIndex(key);
+ if (value != null) {
+ data.update(pos, CatalystTypeConverters.convertToCatalyst(value));
+ }
+ });
+
+ return new HoodieSparkRecord(getKey(), data, getStructType(), getOperation());
+ }
+
+ @Override
+ public boolean isDelete(Schema schema, Properties props) throws IOException {
+ if (null == data) {
+ return true;
+ }
+ if (schema.getField(HoodieRecord.HOODIE_IS_DELETED_FIELD) == null) {
+ return false;
+ }
+ Object deleteMarker = data.get(schema.getField(HoodieRecord.HOODIE_IS_DELETED_FIELD).pos(), BooleanType);
+ return deleteMarker instanceof Boolean && (boolean) deleteMarker;
+ }
+
+ @Override
+ public boolean shouldIgnore(Schema schema, Properties props) throws
IOException {
+ if (data != null && data.equals(SENTINEL)) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public HoodieRecord wrapIntoHoodieRecordPayloadWithParams(
+ Schema schema, Properties props,
+ Option<Pair<String, String>> simpleKeyGenFieldsOpt,
+ Boolean withOperation,
+ Option<String> partitionNameOp,
+ Boolean populateMetaFields) {
+ if (populateMetaFields) {
+ return HoodieSparkRecordUtils.convertToHoodieSparkRecord(getStructType(), data, withOperation);
Review Comment:
Let's move these methods into this class and make them private to limit their access (they should also be deprecated/removed after we deprecate this method)
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -505,7 +514,9 @@ public class HoodieWriteConfig extends HoodieConfig {
private HoodieMetadataConfig metadataConfig;
private HoodieMetastoreConfig metastoreConfig;
private HoodieCommonConfig commonConfig;
+ private HoodieStorageConfig storageConfig;
private EngineType engineType;
+ private HoodieRecordMerger recordMerger;
Review Comment:
I don't think we need to hold `recordMerger` -- we should instantiate it on
the fly
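i.e., derive it on demand instead of caching a field; a rough sketch (the `HoodieRecordUtils` helper name here is hypothetical):

```java
public HoodieRecordMerger getRecordMerger() {
  // Built per call from the configured impl list and strategy id
  return HoodieRecordUtils.createRecordMerger(getString(MERGER_IMPLS), getString(MERGER_STRATEGY));
}
```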
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkStructTypeSerializer.scala:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import com.esotericsoftware.kryo.Kryo
+import com.esotericsoftware.kryo.io.{Input, Output}
+import com.twitter.chill.KSerializer
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets
+import org.apache.avro.SchemaNormalization
+import org.apache.commons.io.IOUtils
+import org.apache.hudi.commmon.model.HoodieSparkRecord
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.Utils
+import org.apache.spark.{SparkEnv, SparkException}
+import scala.collection.mutable
+
+/**
+ * Custom serializer used for generic Spark records. If the user registers the schemas
+ * ahead of time, then the schema's fingerprint will be sent with each message instead of the actual
+ * schema, so as to reduce network IO.
+ * Actions like parsing or compressing schemas are computationally expensive, so the serializer
+ * caches all previously seen values so as to reduce the amount of work needed.
+ * @param schemas a map where the keys are unique IDs for spark schemas and the values are the
+ *                string representation of the Avro schema, used to decrease the amount of data
+ *                that needs to be serialized.
+ */
+class SparkStructTypeSerializer(schemas: Map[Long, StructType]) extends KSerializer[HoodieSparkRecord] {
Review Comment:
Let's not forget that we most importantly need this serializer to be
registered w/ Spark:
https://spark.incubator.apache.org/docs/0.6.0/tuning.html#data-serialization
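For reference, wiring it up would look roughly like this (the registrator class is illustrative -- it would call `kryo.register` with `HoodieSparkRecord` and `new SparkStructTypeSerializer(schemas)`, and needs access to the fingerprint-to-StructType map the serializer's constructor expects):

```java
SparkConf conf = new SparkConf()
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryo.registrator", "org.apache.hudi.HoodieSparkKryoRegistrar"); // hypothetical registrator
```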
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkStructTypeSerializer.scala:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import com.esotericsoftware.kryo.Kryo
+import com.esotericsoftware.kryo.io.{Input, Output}
+import com.twitter.chill.KSerializer
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets
+import org.apache.avro.SchemaNormalization
+import org.apache.commons.io.IOUtils
+import org.apache.hudi.commmon.model.HoodieSparkRecord
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.Utils
+import org.apache.spark.{SparkEnv, SparkException}
+import scala.collection.mutable
+
+/**
+ * Custom serializer used for generic Spark records. If the user registers the schemas
+ * ahead of time, then the schema's fingerprint will be sent with each message instead of the actual
+ * schema, so as to reduce network IO.
+ * Actions like parsing or compressing schemas are computationally expensive, so the serializer
+ * caches all previously seen values so as to reduce the amount of work needed.
+ * @param schemas a map where the keys are unique IDs for spark schemas and the values are the
+ *                string representation of the Avro schema, used to decrease the amount of data
+ *                that needs to be serialized.
+ */
+class SparkStructTypeSerializer(schemas: Map[Long, StructType]) extends KSerializer[HoodieSparkRecord] {
Review Comment:
This is rather `HoodieSparkRecordSerializer`
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java:
##########
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.commmon.model;
+
+import org.apache.hudi.HoodieInternalRowUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.keygen.BaseKeyGenerator;
+import org.apache.hudi.keygen.SparkKeyGeneratorInterface;
+import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
+import org.apache.hudi.util.HoodieSparkRecordUtils;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.spark.sql.catalyst.CatalystTypeConverters;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import scala.Tuple2;
+
+import static org.apache.hudi.common.table.HoodieTableConfig.POPULATE_META_FIELDS;
+import static org.apache.spark.sql.types.DataTypes.BooleanType;
+import static org.apache.spark.sql.types.DataTypes.StringType;
+
+/**
+ * Spark Engine-specific Implementations of `HoodieRecord`.
+ */
+public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
Review Comment:
@wzx140 I think we might have missed this comment
##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java:
##########
@@ -30,9 +34,19 @@
* It can implement the merging logic of HoodieRecord of different engines
* and avoid the performance consumption caused by the serialization/deserialization of Avro payload.
*/
-public interface HoodieMerge extends Serializable {
-
- HoodieRecord preCombine(HoodieRecord older, HoodieRecord newer);
+@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
+public interface HoodieRecordMerger extends Serializable {
+
+ /**
+ * This method converges combineAndGetUpdateValue and precombine from HoodiePayload.
+ * It'd be an associative operation: f(a, f(b, c)) = f(f(a, b), c) (which we can translate as having 3 versions A, B, C
+ * of the single record, where both orders of operation application have to yield the same result)
+ */
+ Option<HoodieRecord> merge(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException;
Review Comment:
@vinothchandar we've touched upon this w/ @prasannarajaperumal recently:
- Initial take is that we're planning to have this method invoked only when 2 records are merged (deletion is a sub-type of merge, where the second record is a sentinel). Insertions will bypass this method
- Your concern regarding users who have custom logic in `getInsertValue` is valid, but we don't want to overload the API out of the gate; we actually want to start w/ a simple API and increase complexity as we get more signals in terms of the other ways people use it (if they do)
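To make the contract concrete, a hedged sketch of the caller's side under these semantics (empty result = the newer record carried the delete sentinel; names are illustrative):

```java
Option<HoodieRecord> merged = recordMerger.merge(older, newer, schema, props);
if (merged.isPresent()) {
  records.put(key, merged.get());
} else {
  // Delete: the merge produced no surviving record
  records.remove(key);
}
```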
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]