vinothchandar commented on code in PR #5627:
URL: https://github.com/apache/hudi/pull/5627#discussion_r909033492


##########
hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.hudi.common.model.HoodieMerge;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.exception.HoodieException;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A utility class for HoodieRecord.
+ */
+public class HoodieRecordUtils {

Review Comment:
   Can we add UTs for this class, if they don't already exist?
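   As a hedged illustration, a UT for a loader utility like this could assert caching and error behavior. Everything below uses hypothetical stand-in names (`RecordUtilsSketch`, `loadInstance`), not the actual `HoodieRecordUtils` API:

   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Hypothetical stand-in for the utility under review; the real
   // HoodieRecordUtils API may differ.
   class RecordUtilsSketch {
     private static final Map<String, Object> INSTANCE_CACHE = new HashMap<>();

     static synchronized Object loadInstance(String clazzName) {
       Object instance = INSTANCE_CACHE.get(clazzName);
       if (instance == null) {
         try {
           instance = Class.forName(clazzName).getDeclaredConstructor().newInstance();
           INSTANCE_CACHE.put(clazzName, instance);
         } catch (ReflectiveOperationException e) {
           throw new RuntimeException("Unable to instantiate " + clazzName, e);
         }
       }
       return instance;
     }
   }

   class RecordUtilsSketchTest {
     static void testCachingAndErrors() {
       // Loading the same class twice should hit the cache and return the same instance.
       Object first = RecordUtilsSketch.loadInstance("java.lang.StringBuilder");
       Object second = RecordUtilsSketch.loadInstance("java.lang.StringBuilder");
       assert first instanceof StringBuilder;
       assert first == second;

       // An unknown class name should surface a clear exception.
       boolean thrown = false;
       try {
         RecordUtilsSketch.loadInstance("no.such.Clazz");
       } catch (RuntimeException e) {
         thrown = true;
       }
       assert thrown;
     }
   }
   ```

   The two cases above (cache hit returns the identical instance; bad class name fails loudly) are the minimum a UT for this class should cover.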



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -59,6 +61,7 @@
    */
   protected final Schema tableSchema;
   protected final Schema tableSchemaWithMetaFields;
+  protected final HoodieMerge hoodieMerge;

Review Comment:
   just `merge`?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java:
##########
@@ -791,6 +792,11 @@ public PropertyBuilder setPayloadClassName(String payloadClassName) {
       return this;
     }
 
+    public PropertyBuilder setMergeClassName(String mergeClassName) {

Review Comment:
   What's our migration plan? Do we replace the built-in payloads with equivalent merge classes? We could ask the community how painful it would be to port over their custom payloads.



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java:
##########
@@ -131,25 +131,35 @@ public String getFieldName() {
    */
   private HoodieOperation operation;
 
+  /**
+   * For purposes of preCombining.
+   */
+  private Comparable<?> orderingVal;

Review Comment:
   cc @danny0405 for comments



##########
hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.hudi.common.model.HoodieMerge;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.exception.HoodieException;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A utility class for HoodieRecord.
+ */
+public class HoodieRecordUtils {
+
+  private static final Map<String, Object> INSTANCE_CACHE = new HashMap<>();

Review Comment:
   Wouldn't ReflectUtils already do this? Can we reuse that?



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSparkRecord.java:
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.keygen.BaseKeyGenerator;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.spark.sql.types.DataTypes.BooleanType;
+
+/**
+ * Spark Engine-specific Implementations of `HoodieRecord`.
+ */
+public class HoodieSparkRecord extends HoodieRecord<InternalRow> {

Review Comment:
   Add a UT for this if one doesn't exist?



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java:
##########
@@ -131,25 +131,35 @@ public String getFieldName() {
    */
   private HoodieOperation operation;
 
+  /**
+   * For purposes of preCombining.
+   */
+  private Comparable<?> orderingVal;

Review Comment:
   Wondering if we should call this `eventTime` (instantTime/commitTime is arrival time); then the concepts become clearer.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java:
##########
@@ -234,6 +235,12 @@ public class HoodieCompactionConfig extends HoodieConfig {
          + "the record payload class to merge records in the log against each other, merge again with the base file and "
          + "produce the final record to be written after compaction.");
 
+  public static final ConfigProperty<String> MERGE_CLASS_NAME = ConfigProperty

Review Comment:
   looks great



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala:
##########
@@ -303,7 +305,12 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
    private def merge(curAvroRecord: GenericRecord, newRecord: HoodieRecord[_ <: HoodieRecordPayload[_]]): Option[IndexedRecord] = {
      // NOTE: We have to pass in Avro Schema used to read from Delta Log file since we invoke combining API
      //       on the record from the Delta Log
-      toScalaOption(newRecord.getData.combineAndGetUpdateValue(curAvroRecord, logFileReaderAvroSchema, payloadProps))
+      val combinedRecord = hoodieMerge.combineAndGetUpdateValue(new HoodieAvroIndexedRecord(curAvroRecord), newRecord, logFileReaderAvroSchema, payloadProps)

Review Comment:
   we should do some sanity checks on performance



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieMerge.java:
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.avro.Schema;
+import org.apache.hudi.common.util.Option;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Properties;
+
+/**
+ * HoodieMerge defines how to merge two records. It is a stateless component.
+ * It can implement the merging logic of HoodieRecord of different engines
+ * and avoid the performance overhead caused by serialization/deserialization of the Avro payload.
+ */
+public interface HoodieMerge extends Serializable {
+  
+  HoodieRecord preCombine(HoodieRecord older, HoodieRecord newer);

Review Comment:
   can we have a single API that can handle both? 
   
   ```
   Option<HoodieRecord> merge(Option<HoodieRecord> older, Option<HoodieRecord> newer) {

   }
   ```
   
   So for CreateHandle/AppendHandle, where we do inserts, we can actually pass `older = Option.empty()`, and in case the merge implementation wants to delete/skip the record, it can return `Option.empty()`.
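   A minimal, self-contained sketch of that single-method shape, using `java.util.Optional` and a stand-in `Record` type instead of Hudi's `Option`/`HoodieRecord` (all names here are hypothetical, not the actual interface):

   ```java
   import java.util.Optional;

   // Illustrative sketch of the proposed single merge() API.
   // Empty `older` models an insert; returning empty models a delete/skip.
   class MergeApiSketch {
     static class Record {
       final String key;
       final long orderingVal;
       final boolean isDelete;
       Record(String key, long orderingVal, boolean isDelete) {
         this.key = key;
         this.orderingVal = orderingVal;
         this.isDelete = isDelete;
       }
     }

     interface Merge {
       Optional<Record> merge(Optional<Record> older, Optional<Record> newer);
     }

     // Latest-wins merge: keep the record with the higher orderingVal,
     // and drop the result entirely when the winner is a delete.
     static final Merge LATEST_WINS = (older, newer) -> {
       Optional<Record> winner =
           older.isPresent() && newer.isPresent()
               ? (newer.get().orderingVal >= older.get().orderingVal ? newer : older)
               : (newer.isPresent() ? newer : older);
       return winner.filter(r -> !r.isDelete);
     };
   }
   ```

   With this shape, the insert path (`older` empty) and the delete path (result empty) fall out of one method, so `preCombine` and `combineAndGetUpdateValue` would collapse into a single API.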
   
   



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerge.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.metadata.HoodieMetadataPayload;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import static org.apache.hudi.TypeUtils.unsafeCast;
+
+public class HoodieAvroRecordMerge implements HoodieMerge {
+  @Override
+  public HoodieRecord preCombine(HoodieRecord older, HoodieRecord newer) {

Review Comment:
   Hmmm. One reason we need separate `preCombine` and `combineAndGetUpdateValue` seems to be to support the payload API fallback? Would love to have a single simple `merge()` method if possible; let me mull this over more.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala:
##########
@@ -301,6 +301,12 @@ object DataSourceWriteOptions {
    */
   val PAYLOAD_CLASS_NAME = HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME
 
+  /**
+   * HoodieMerge replaces the payload to process the merging of data,
+   * providing the same capabilities as the payload.
+   */
+  val MERGE_CLASS_NAME = HoodieWriteConfig.MERGE_CLASS_NAME

Review Comment:
   Do we need a config to control whether we do `SparkRecord`-based writing or Avro-based writing? Is Avro (the old behavior) still the default?
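   If such a switch were added, it could be a boolean config that keeps the Avro path as the default. A sketch under that assumption (the key name and helper below are hypothetical, not actual Hudi configs):

   ```java
   import java.util.Properties;

   // Hypothetical config switch; the key name and default are assumptions,
   // not an existing Hudi ConfigProperty.
   class WriteFormatConfigSketch {
     static final String SPARK_RECORD_WRITE_KEY = "hoodie.datasource.write.use.spark.record";
     static final boolean DEFAULT_USE_SPARK_RECORD = false; // Avro path stays the default

     static boolean useSparkRecord(Properties props) {
       return Boolean.parseBoolean(
           props.getProperty(SPARK_RECORD_WRITE_KEY, String.valueOf(DEFAULT_USE_SPARK_RECORD)));
     }
   }
   ```

   Defaulting to `false` would preserve the old Avro behavior for existing users while letting new pipelines opt in to `SparkRecord`-based writing.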



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -103,6 +106,7 @@ protected HoodieWriteHandle(HoodieWriteConfig config, String instantTime, String
     this.taskContextSupplier = taskContextSupplier;
     this.writeToken = makeWriteToken();
     schemaOnReadEnabled = !isNullOrEmpty(hoodieTable.getConfig().getInternalSchema());
+    this.hoodieMerge = HoodieRecordUtils.loadHoodieMerge(config.getMergeClass());

Review Comment:
   rename to `loadMerge`? We can drop the `hoodie` prefix from method names.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -123,6 +124,12 @@ public class HoodieWriteConfig extends HoodieConfig {
      .withDocumentation("Payload class used. Override this, if you like to roll your own merge logic, when upserting/inserting. "
          + "This will render any value set for PRECOMBINE_FIELD_OPT_VAL in-effective");
 
+  public static final ConfigProperty<String> MERGE_CLASS_NAME = ConfigProperty

Review Comment:
   We can make this just `hoodie.merge.class`. The datasource naming below is probably not a good one to base it off of.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
