vinothchandar commented on a change in pull request #2311:
URL: https://github.com/apache/hudi/pull/2311#discussion_r543467964



##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePayloadProps.java
##########
@@ -0,0 +1,12 @@
+package org.apache.hudi.common.model;
+
+/**
+ * Since both payload classes and HoodiePayloadConfig needs to access these 
props, storing it here.
+ */
+public class HoodiePayloadProps {
+
+  // payload ordering field

Review comment:
       more descriptive doc?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import static org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro;
+import static org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldVal;
+
+import java.util.Collections;
+import java.util.Map;
+import org.apache.hudi.common.util.Option;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Default payload used for delta streamer.
+ * <p>
+ * 1. preCombine - Picks the latest delta record for a key, based on an 
ordering field
+ * 2. combineAndGetUpdateValue/getInsertValue - Chooses the latest record 
based on ordering field value.
+ */
+public class DefaultHoodieRecordPayload extends BaseAvroPayload
+    implements HoodieRecordPayload<DefaultHoodieRecordPayload> {
+
+  public DefaultHoodieRecordPayload(GenericRecord record, Comparable 
orderingVal) {
+    super(record, orderingVal);
+  }
+
+  public DefaultHoodieRecordPayload(Option<GenericRecord> record) {
+    this(record.isPresent() ? record.get() : null, (record1) -> 0); // natural 
order
+  }
+
+  @Override
+  public DefaultHoodieRecordPayload preCombine(DefaultHoodieRecordPayload 
another) {
+    // pick the payload with greatest ordering value
+    if (another.orderingVal.compareTo(orderingVal) > 0) {
+      return another;
+    } else {
+      return this;
+    }
+  }
+
+  @Override
+  public Option<IndexedRecord> getInsertValue(Schema schema) throws 
IOException {
+    if (recordBytes.length == 0) {
+      return Option.empty();
+    }
+    IndexedRecord indexedRecord = bytesToAvro(recordBytes, schema);
+    if (isDeleteRecord((GenericRecord) indexedRecord)) {
+      return Option.empty();
+    } else {
+      return Option.of(indexedRecord);
+    }
+  }
+
+  @Override
+  public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord 
currentValue, Schema schema) throws IOException{
+    return getInsertValue(schema);
+  }
+
+  @Override
+  public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord 
currentValue, Schema schema, Properties properties) throws IOException {
+    if (recordBytes.length == 0) {
+      return Option.empty();
+    }
+    GenericRecord incomingRecord = bytesToAvro(recordBytes, schema);
+    /*
+     * Combining strategy here returns currentValue on disk if incoming record 
is older.
+     * The incoming record can be either a delete (sent as an upsert with 
_hoodie_is_deleted set to true)
+     * or an insert/update record. In any case, if it is older than the record 
in disk, the currentValue
+     * in disk is returned (to be rewritten with new commit time).
+     *
+     * NOTE: Deletes sent via EmptyHoodieRecordPayload and/or Delete operation 
type do not hit this code path
+     * and need to be dealt with separately.
+     */
+    Object persistedOrderingVal = getNestedFieldVal((GenericRecord) 
currentValue, 
properties.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP), true);
+    Comparable incomingOrderingVal = (Comparable) 
getNestedFieldVal(incomingRecord, 
properties.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP), false);
+
+    // Null check is needed here to support schema evolution. The record in 
storage may be from old schema where
+    // the new ordering column might not be present and hence returns null.
+    if (persistedOrderingVal != null && ((Comparable) 
persistedOrderingVal).compareTo(incomingOrderingVal) > 0) {

Review comment:
       Note to self: the check means persistedOrderingVal > incomingOrderingVal, i.e., we 
should retain the value on disk 

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
##########
@@ -29,47 +29,93 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Map;
+import java.util.Properties;
 
 /**
- * Every Hoodie table has an implementation of the 
<code>HoodieRecordPayload</code> This abstracts out callbacks which
- * depend on record specific logic.
+ * Every Hoodie table has an implementation of the 
<code>HoodieRecordPayload</code> This abstracts out callbacks which depend on 
record specific logic.
  */
 @PublicAPIClass(maturity = ApiMaturityLevel.STABLE)
 public interface HoodieRecordPayload<T extends HoodieRecordPayload> extends 
Serializable {
 
   /**
-   * When more than one HoodieRecord have the same HoodieKey, this function 
combines them before attempting to
-   * insert/upsert (if combining turned on in HoodieClientConfig).
+   * When more than one HoodieRecord have the same HoodieKey, this function 
combines them before attempting to insert/upsert (if combining turned on
+   * in HoodieClientConfig).
    */
+  @Deprecated
   @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
   T preCombine(T another);
 
   /**
-   * This methods lets you write custom merging/combining logic to produce new 
values as a function of current value on
-   * storage and whats contained in this object.
+   * When more than one HoodieRecord have the same HoodieKey, this function 
combines them before attempting to insert/upsert (if combining turned on
+   * in HoodieClientConfig) by taking in a property map. Implementation can 
leverage the property to decide their business logic to do preCombine.
+   * @param another instance of another {@link HoodieRecordPayload} to be 
combined with.
+   * @param properties Payload related properties. For example pass the 
ordering field(s) name to extract from value in storage.
+   * @return the combined value
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
+  default T preCombine(T another, Properties properties) {
+    return preCombine(another);
+  }
+
+  /**
+   * This methods lets you write custom merging/combining logic to produce new 
values as a function of current value on storage and whats contained
+   * in this object.
    * <p>
-   * eg: 1) You are updating counters, you may want to add counts to 
currentValue and write back updated counts 2) You
-   * may be reading DB redo logs, and merge them with current image for a 
database row on storage
+   * eg: 1) You are updating counters, you may want to add counts to 
currentValue and write back updated counts 2) You may be reading DB redo logs,

Review comment:
       I think we should just refer to the other method here in the javadoc, 
instead of repeating the entire description here again. 

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
##########
@@ -29,47 +29,93 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Map;
+import java.util.Properties;
 
 /**
- * Every Hoodie table has an implementation of the 
<code>HoodieRecordPayload</code> This abstracts out callbacks which
- * depend on record specific logic.
+ * Every Hoodie table has an implementation of the 
<code>HoodieRecordPayload</code> This abstracts out callbacks which depend on 
record specific logic.
  */
 @PublicAPIClass(maturity = ApiMaturityLevel.STABLE)
 public interface HoodieRecordPayload<T extends HoodieRecordPayload> extends 
Serializable {
 
   /**
-   * When more than one HoodieRecord have the same HoodieKey, this function 
combines them before attempting to
-   * insert/upsert (if combining turned on in HoodieClientConfig).
+   * When more than one HoodieRecord have the same HoodieKey, this function 
combines them before attempting to insert/upsert (if combining turned on
+   * in HoodieClientConfig).
    */
+  @Deprecated
   @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
   T preCombine(T another);
 
   /**
-   * This methods lets you write custom merging/combining logic to produce new 
values as a function of current value on
-   * storage and whats contained in this object.
+   * When more than one HoodieRecord have the same HoodieKey, this function 
combines them before attempting to insert/upsert (if combining turned on
+   * in HoodieClientConfig) by taking in a property map. Implementation can 
leverage the property to decide their business logic to do preCombine.
+   * @param another instance of another {@link HoodieRecordPayload} to be 
combined with.
+   * @param properties Payload related properties. For example pass the 
ordering field(s) name to extract from value in storage.
+   * @return the combined value
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
+  default T preCombine(T another, Properties properties) {
+    return preCombine(another);
+  }
+
+  /**
+   * This methods lets you write custom merging/combining logic to produce new 
values as a function of current value on storage and whats contained
+   * in this object.
    * <p>
-   * eg: 1) You are updating counters, you may want to add counts to 
currentValue and write back updated counts 2) You
-   * may be reading DB redo logs, and merge them with current image for a 
database row on storage
+   * eg: 1) You are updating counters, you may want to add counts to 
currentValue and write back updated counts 2) You may be reading DB redo logs,

Review comment:
       close the `</p>`

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import static org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro;
+import static org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldVal;
+
+import java.util.Collections;
+import java.util.Map;
+import org.apache.hudi.common.util.Option;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Default payload used for delta streamer.
+ * <p>
+ * 1. preCombine - Picks the latest delta record for a key, based on an 
ordering field
+ * 2. combineAndGetUpdateValue/getInsertValue - Chooses the latest record 
based on ordering field value.
+ */
+public class DefaultHoodieRecordPayload extends BaseAvroPayload
+    implements HoodieRecordPayload<DefaultHoodieRecordPayload> {
+
+  public DefaultHoodieRecordPayload(GenericRecord record, Comparable 
orderingVal) {
+    super(record, orderingVal);
+  }
+
+  public DefaultHoodieRecordPayload(Option<GenericRecord> record) {
+    this(record.isPresent() ? record.get() : null, (record1) -> 0); // natural 
order
+  }
+
+  @Override
+  public DefaultHoodieRecordPayload preCombine(DefaultHoodieRecordPayload 
another) {
+    // pick the payload with greatest ordering value
+    if (another.orderingVal.compareTo(orderingVal) > 0) {
+      return another;
+    } else {
+      return this;
+    }
+  }
+
+  @Override
+  public Option<IndexedRecord> getInsertValue(Schema schema) throws 
IOException {
+    if (recordBytes.length == 0) {
+      return Option.empty();
+    }
+    IndexedRecord indexedRecord = bytesToAvro(recordBytes, schema);
+    if (isDeleteRecord((GenericRecord) indexedRecord)) {
+      return Option.empty();
+    } else {
+      return Option.of(indexedRecord);
+    }
+  }
+
+  @Override
+  public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord 
currentValue, Schema schema) throws IOException{
+    return getInsertValue(schema);
+  }
+
+  @Override
+  public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord 
currentValue, Schema schema, Properties properties) throws IOException {
+    if (recordBytes.length == 0) {
+      return Option.empty();
+    }
+    GenericRecord incomingRecord = bytesToAvro(recordBytes, schema);
+    /*
+     * Combining strategy here returns currentValue on disk if incoming record 
is older.
+     * The incoming record can be either a delete (sent as an upsert with 
_hoodie_is_deleted set to true)
+     * or an insert/update record. In any case, if it is older than the record 
in disk, the currentValue
+     * in disk is returned (to be rewritten with new commit time).
+     *
+     * NOTE: Deletes sent via EmptyHoodieRecordPayload and/or Delete operation 
type do not hit this code path
+     * and need to be dealt with separately.
+     */
+    Object persistedOrderingVal = getNestedFieldVal((GenericRecord) 
currentValue, 
properties.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP), true);

Review comment:
       just do the `Comparable` cast here itself?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
##########
@@ -29,47 +29,93 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Map;
+import java.util.Properties;
 
 /**
- * Every Hoodie table has an implementation of the 
<code>HoodieRecordPayload</code> This abstracts out callbacks which
- * depend on record specific logic.
+ * Every Hoodie table has an implementation of the 
<code>HoodieRecordPayload</code> This abstracts out callbacks which depend on 
record specific logic.
  */
 @PublicAPIClass(maturity = ApiMaturityLevel.STABLE)
 public interface HoodieRecordPayload<T extends HoodieRecordPayload> extends 
Serializable {
 
   /**
-   * When more than one HoodieRecord have the same HoodieKey, this function 
combines them before attempting to
-   * insert/upsert (if combining turned on in HoodieClientConfig).
+   * When more than one HoodieRecord have the same HoodieKey, this function 
combines them before attempting to insert/upsert (if combining turned on
+   * in HoodieClientConfig).
    */
+  @Deprecated
   @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
   T preCombine(T another);
 
   /**
-   * This methods lets you write custom merging/combining logic to produce new 
values as a function of current value on
-   * storage and whats contained in this object.
+   * When more than one HoodieRecord have the same HoodieKey, this function 
combines them before attempting to insert/upsert (if combining turned on
+   * in HoodieClientConfig) by taking in a property map. Implementation can 
leverage the property to decide their business logic to do preCombine.
+   * @param another instance of another {@link HoodieRecordPayload} to be 
combined with.
+   * @param properties Payload related properties. For example pass the 
ordering field(s) name to extract from value in storage.
+   * @return the combined value
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
+  default T preCombine(T another, Properties properties) {
+    return preCombine(another);
+  }
+
+  /**
+   * This methods lets you write custom merging/combining logic to produce new 
values as a function of current value on storage and whats contained
+   * in this object.
    * <p>
-   * eg: 1) You are updating counters, you may want to add counts to 
currentValue and write back updated counts 2) You
-   * may be reading DB redo logs, and merge them with current image for a 
database row on storage
+   * eg: 1) You are updating counters, you may want to add counts to 
currentValue and write back updated counts 2) You may be reading DB redo logs,
+   * and merge them with current image for a database row on storage
    *
    * @param currentValue Current value in storage, to merge/combine this 
payload with
    * @param schema Schema used for record
    * @return new combined/merged value to be written back to storage. EMPTY to 
skip writing this record.
    */
+  @Deprecated
   @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
   Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, 
Schema schema) throws IOException;
 
   /**
-   * Generates an avro record out of the given HoodieRecordPayload, to be 
written out to storage. Called when writing a
-   * new value for the given HoodieKey, wherein there is no existing record in 
storage to be combined against. (i.e
-   * insert) Return EMPTY to skip writing this record.
+   * This methods lets you write custom merging/combining logic to produce new 
values as a function of current value on storage and whats contained
+   * in this object. This method takes in a property map as an arg so that 
implementation can decide their business logic based
+   *    * on some properties set.
+   * <p>
+   * eg: 1) You are updating counters, you may want to add counts to 
currentValue and write back updated counts 2) You may be reading DB redo logs,
+   * and merge them with current image for a database row on storage
+   *
+   * @param currentValue Current value in storage, to merge/combine this 
payload with
+   * @param schema Schema used for record
+   * @param properties Payload related properties. For example pass the 
ordering field(s) name to extract from value in storage.
+   * @return new combined/merged value to be written back to storage. EMPTY to 
skip writing this record.
    */
+  default Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord 
currentValue, Schema schema, Properties properties) throws IOException {
+    return combineAndGetUpdateValue(currentValue, schema);
+  }
+
+  /**
+   * Generates an avro record out of the given HoodieRecordPayload, to be 
written out to storage. Called when writing a new value for the given
+   * HoodieKey, wherein there is no existing record in storage to be combined 
against. (i.e insert) Return EMPTY to skip writing this record.
+   * @param schema Schema used for record
+   * @return the {@link IndexedRecord} to be inserted.
+   */
+  @Deprecated
   @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
   Option<IndexedRecord> getInsertValue(Schema schema) throws IOException;
 
   /**
-   * This method can be used to extract some metadata from 
HoodieRecordPayload. The metadata is passed to
-   * {@code WriteStatus.markSuccess()} and {@code WriteStatus.markFailure()} 
in order to compute some aggregate metrics
-   * using the metadata in the context of a write success or failure.
+   * Generates an avro record out of the given HoodieRecordPayload, to be 
written out to storage. Called when writing a new value for the given
+   * HoodieKey, wherein there is no existing record in storage to be combined 
against. (i.e insert) Return EMPTY to skip writing this record.
+   * This method takes in a property map as an arg so that implementation can 
decide their business logic based on some properties set.

Review comment:
       could this line be a bit pithier? 

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import static org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro;
+import static org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldVal;
+
+import java.util.Collections;
+import java.util.Map;
+import org.apache.hudi.common.util.Option;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Default payload used for delta streamer.

Review comment:
       this is not just for delta streamer. 

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import static org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro;
+import static org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldVal;
+
+import java.util.Collections;
+import java.util.Map;
+import org.apache.hudi.common.util.Option;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Default payload used for delta streamer.
+ * <p>
+ * 1. preCombine - Picks the latest delta record for a key, based on an 
ordering field
+ * 2. combineAndGetUpdateValue/getInsertValue - Chooses the latest record 
based on ordering field value.
+ */
+public class DefaultHoodieRecordPayload extends BaseAvroPayload

Review comment:
       is this class pretty much a modified version of 
`OverwriteWithLatestAvroPayload`?  if so, can we reuse some code by having that 
extend from this, and override methods as needed?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
##########
@@ -29,47 +29,93 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Map;
+import java.util.Properties;
 
 /**
- * Every Hoodie table has an implementation of the 
<code>HoodieRecordPayload</code> This abstracts out callbacks which
- * depend on record specific logic.
+ * Every Hoodie table has an implementation of the 
<code>HoodieRecordPayload</code> This abstracts out callbacks which depend on 
record specific logic.
  */
 @PublicAPIClass(maturity = ApiMaturityLevel.STABLE)
 public interface HoodieRecordPayload<T extends HoodieRecordPayload> extends 
Serializable {
 
   /**
-   * When more than one HoodieRecord have the same HoodieKey, this function 
combines them before attempting to
-   * insert/upsert (if combining turned on in HoodieClientConfig).
+   * When more than one HoodieRecord have the same HoodieKey, this function 
combines them before attempting to insert/upsert (if combining turned on
+   * in HoodieClientConfig).
    */
+  @Deprecated
   @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
   T preCombine(T another);
 
   /**
-   * This methods lets you write custom merging/combining logic to produce new 
values as a function of current value on
-   * storage and whats contained in this object.
+   * When more than one HoodieRecord have the same HoodieKey, this function 
combines them before attempting to insert/upsert (if combining turned on

Review comment:
       same here

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -729,6 +731,10 @@ public FileSystemViewStorageConfig 
getClientSpecifiedViewStorageConfig() {
     return clientSpecifiedViewStorageConfig;
   }
 
+  public HoodiePayloadConfig getHoodiePayloadConfig() {

Review comment:
       rename: getPayloadConfig()

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayload.java
##########
@@ -29,6 +29,7 @@
  * Base class for all AVRO record based payloads, that can be ordered based on 
a field.
  */
 public abstract class BaseAvroPayload implements Serializable {
+  

Review comment:
       nit: extra line

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
##########
@@ -29,47 +29,93 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Map;
+import java.util.Properties;
 
 /**
- * Every Hoodie table has an implementation of the 
<code>HoodieRecordPayload</code> This abstracts out callbacks which
- * depend on record specific logic.
+ * Every Hoodie table has an implementation of the 
<code>HoodieRecordPayload</code> This abstracts out callbacks which depend on 
record specific logic.
  */
 @PublicAPIClass(maturity = ApiMaturityLevel.STABLE)
 public interface HoodieRecordPayload<T extends HoodieRecordPayload> extends 
Serializable {
 
   /**
-   * When more than one HoodieRecord have the same HoodieKey, this function 
combines them before attempting to
-   * insert/upsert (if combining turned on in HoodieClientConfig).
+   * When more than one HoodieRecord have the same HoodieKey, this function 
combines them before attempting to insert/upsert (if combining turned on
+   * in HoodieClientConfig).

Review comment:
       Please remove the "if combining turned on ..." comment. We should ideally 
refrain from referring to higher level constructs from lower layers.

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
##########
@@ -29,47 +29,93 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Map;
+import java.util.Properties;
 
 /**
- * Every Hoodie table has an implementation of the 
<code>HoodieRecordPayload</code> This abstracts out callbacks which
- * depend on record specific logic.
+ * Every Hoodie table has an implementation of the 
<code>HoodieRecordPayload</code> This abstracts out callbacks which depend on 
record specific logic.
  */
 @PublicAPIClass(maturity = ApiMaturityLevel.STABLE)
 public interface HoodieRecordPayload<T extends HoodieRecordPayload> extends 
Serializable {
 
   /**
-   * When more than one HoodieRecord have the same HoodieKey, this function 
combines them before attempting to
-   * insert/upsert (if combining turned on in HoodieClientConfig).
+   * When more than one HoodieRecord have the same HoodieKey, this function 
combines them before attempting to insert/upsert (if combining turned on
+   * in HoodieClientConfig).
    */
+  @Deprecated
   @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
   T preCombine(T another);
 
   /**
-   * This methods lets you write custom merging/combining logic to produce new 
values as a function of current value on
-   * storage and whats contained in this object.
+   * When more than one HoodieRecord have the same HoodieKey, this function 
combines them before attempting to insert/upsert (if combining turned on
+   * in HoodieClientConfig) by taking in a property map. Implementation can 
leverage the property to decide their business logic to do preCombine.
+   * @param another instance of another {@link HoodieRecordPayload} to be 
combined with.
+   * @param properties Payload related properties. For example pass the 
ordering field(s) name to extract from value in storage.
+   * @return the combined value
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
+  default T preCombine(T another, Properties properties) {
+    return preCombine(another);
+  }
+
+  /**
+   * This methods lets you write custom merging/combining logic to produce new 
values as a function of current value on storage and whats contained
+   * in this object.
    * <p>
-   * eg: 1) You are updating counters, you may want to add counts to 
currentValue and write back updated counts 2) You
-   * may be reading DB redo logs, and merge them with current image for a 
database row on storage
+   * eg: 1) You are updating counters, you may want to add counts to 
currentValue and write back updated counts 2) You may be reading DB redo logs,
+   * and merge them with current image for a database row on storage
    *
    * @param currentValue Current value in storage, to merge/combine this 
payload with
    * @param schema Schema used for record
    * @return new combined/merged value to be written back to storage. EMPTY to 
skip writing this record.
    */
+  @Deprecated
   @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)

Review comment:
       mark all deprecated methods with the right ApiMaturityLevel.DEPRECATED?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
##########
@@ -29,47 +29,93 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Map;
+import java.util.Properties;
 
 /**
- * Every Hoodie table has an implementation of the 
<code>HoodieRecordPayload</code> This abstracts out callbacks which
- * depend on record specific logic.
+ * Every Hoodie table has an implementation of the 
<code>HoodieRecordPayload</code> This abstracts out callbacks which depend on 
record specific logic.
  */
 @PublicAPIClass(maturity = ApiMaturityLevel.STABLE)
 public interface HoodieRecordPayload<T extends HoodieRecordPayload> extends 
Serializable {
 
   /**
-   * When more than one HoodieRecord have the same HoodieKey, this function 
combines them before attempting to
-   * insert/upsert (if combining turned on in HoodieClientConfig).
+   * When more than one HoodieRecord has the same HoodieKey, this function 
combines them before attempting to insert/upsert (if combining is turned on
+   * in HoodieClientConfig).
    */
+  @Deprecated
   @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
   T preCombine(T another);
 
   /**
-   * This methods lets you write custom merging/combining logic to produce new 
values as a function of current value on
-   * storage and whats contained in this object.
+   * When more than one HoodieRecord has the same HoodieKey, this function 
combines them before attempting to insert/upsert (if combining is turned on
+   * in HoodieClientConfig) by taking in a property map. Implementations can 
leverage the properties to decide their business logic for preCombine.
+   * @param another instance of another {@link HoodieRecordPayload} to be 
combined with.
+   * @param properties Payload related properties. For example pass the 
ordering field(s) name to extract from value in storage.
+   * @return the combined value
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
+  default T preCombine(T another, Properties properties) {
+    return preCombine(another);
+  }
+
+  /**
+   * This method lets you write custom merging/combining logic to produce new 
values as a function of the current value on storage and what's contained
+   * in this object.
    * <p>
-   * eg: 1) You are updating counters, you may want to add counts to 
currentValue and write back updated counts 2) You
-   * may be reading DB redo logs, and merge them with current image for a 
database row on storage
+   * eg: 1) You are updating counters, you may want to add counts to 
currentValue and write back updated counts 2) You may be reading DB redo logs,
+   * and merge them with current image for a database row on storage
    *
    * @param currentValue Current value in storage, to merge/combine this 
payload with
    * @param schema Schema used for record
    * @return new combined/merged value to be written back to storage. EMPTY to 
skip writing this record.
    */
+  @Deprecated
   @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
   Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, 
Schema schema) throws IOException;
 
   /**
-   * Generates an avro record out of the given HoodieRecordPayload, to be 
written out to storage. Called when writing a
-   * new value for the given HoodieKey, wherein there is no existing record in 
storage to be combined against. (i.e
-   * insert) Return EMPTY to skip writing this record.
+   * This method lets you write custom merging/combining logic to produce new 
values as a function of the current value on storage and what's contained
+   * in this object. This method takes in a property map as an arg so that the 
implementation can decide its business logic based
+   * on some properties set.
+   * <p>
+   * eg: 1) You are updating counters, you may want to add counts to 
currentValue and write back updated counts 2) You may be reading DB redo logs,
+   * and merge them with current image for a database row on storage
+   *
+   * @param currentValue Current value in storage, to merge/combine this 
payload with
+   * @param schema Schema used for record
+   * @param properties Payload related properties. For example pass the 
ordering field(s) name to extract from value in storage.
+   * @return new combined/merged value to be written back to storage. EMPTY to 
skip writing this record.
    */
+  default Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord 
currentValue, Schema schema, Properties properties) throws IOException {
+    return combineAndGetUpdateValue(currentValue, schema);
+  }
+
+  /**
+   * Generates an avro record out of the given HoodieRecordPayload, to be 
written out to storage. Called when writing a new value for the given
+   * HoodieKey, wherein there is no existing record in storage to be combined 
against. (i.e insert) Return EMPTY to skip writing this record.
+   * @param schema Schema used for record
+   * @return the {@link IndexedRecord} to be inserted.
+   */
+  @Deprecated
   @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
   Option<IndexedRecord> getInsertValue(Schema schema) throws IOException;
 
   /**
-   * This method can be used to extract some metadata from 
HoodieRecordPayload. The metadata is passed to
-   * {@code WriteStatus.markSuccess()} and {@code WriteStatus.markFailure()} 
in order to compute some aggregate metrics
-   * using the metadata in the context of a write success or failure.
+   * Generates an avro record out of the given HoodieRecordPayload, to be 
written out to storage. Called when writing a new value for the given
+   * HoodieKey, wherein there is no existing record in storage to be combined 
against. (i.e insert) Return EMPTY to skip writing this record.
+   * This method takes in a property map as an arg so that implementation can 
decide their business logic based on some properties set.
+   * @param schema Schema used for record
+   * @param properties Payload related properties. For example pass the 
ordering field(s) name to extract from value in storage.
+   * @return the {@link IndexedRecord} to be inserted.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
+  default Option<IndexedRecord> getInsertValue(Schema schema, Properties 
properties) throws IOException {
+    return getInsertValue(schema);
+  }
+
+  /**
+   * This method can be used to extract some metadata from 
HoodieRecordPayload. The metadata is passed to {@code 
WriteStatus.markSuccess()} and

Review comment:
       thanks for fixing this 

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
##########
@@ -29,47 +29,93 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Map;
+import java.util.Properties;
 
 /**
- * Every Hoodie table has an implementation of the 
<code>HoodieRecordPayload</code> This abstracts out callbacks which
- * depend on record specific logic.
+ * Every Hoodie table has an implementation of the 
<code>HoodieRecordPayload</code> This abstracts out callbacks which depend on 
record specific logic.
  */
 @PublicAPIClass(maturity = ApiMaturityLevel.STABLE)
 public interface HoodieRecordPayload<T extends HoodieRecordPayload> extends 
Serializable {
 
   /**
-   * When more than one HoodieRecord have the same HoodieKey, this function 
combines them before attempting to
-   * insert/upsert (if combining turned on in HoodieClientConfig).
+   * When more than one HoodieRecord has the same HoodieKey, this function 
combines them before attempting to insert/upsert (if combining is turned on
+   * in HoodieClientConfig).
    */
+  @Deprecated
   @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
   T preCombine(T another);
 
   /**
-   * This methods lets you write custom merging/combining logic to produce new 
values as a function of current value on
-   * storage and whats contained in this object.
+   * When more than one HoodieRecord has the same HoodieKey, this function 
combines them before attempting to insert/upsert (if combining is turned on
+   * in HoodieClientConfig) by taking in a property map. Implementations can 
leverage the properties to decide their business logic for preCombine.
+   * @param another instance of another {@link HoodieRecordPayload} to be 
combined with.
+   * @param properties Payload related properties. For example pass the 
ordering field(s) name to extract from value in storage.
+   * @return the combined value
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
+  default T preCombine(T another, Properties properties) {
+    return preCombine(another);
+  }
+
+  /**
+   * This method lets you write custom merging/combining logic to produce new 
values as a function of the current value on storage and what's contained
+   * in this object.
    * <p>
-   * eg: 1) You are updating counters, you may want to add counts to 
currentValue and write back updated counts 2) You
-   * may be reading DB redo logs, and merge them with current image for a 
database row on storage
+   * eg: 1) You are updating counters, you may want to add counts to 
currentValue and write back updated counts 2) You may be reading DB redo logs,

Review comment:
       place 2 on a newline?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to