This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 33d338f  [HUDI-115] Adding DefaultHoodieRecordPayload to honor 
ordering with combineAndGetUpdateValue (#2311)
33d338f is described below

commit 33d338f3923862fdff24443a02d8a33a56d92e63
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Sat Dec 19 22:19:42 2020 -0500

    [HUDI-115] Adding DefaultHoodieRecordPayload to honor ordering with 
combineAndGetUpdateValue (#2311)
    
    * Added ability to pass in `properties` to payload methods, so they can 
perform table/record specific merges
    * Added default methods so existing payload classes are backwards 
compatible.
    * Adding DefaultHoodiePayload to honor ordering while merging two records
    * Fixing default payload based on feedback
---
 .../apache/hudi/config/HoodiePayloadConfig.java    |  74 ++++++++++++++
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  15 +++
 .../java/org/apache/hudi/io/HoodieMergeHandle.java |   3 +-
 .../java/org/apache/hudi/avro/HoodieAvroUtils.java |   2 +-
 .../common/model/DefaultHoodieRecordPayload.java   |  82 ++++++++++++++++
 .../hudi/common/model/HoodiePayloadProps.java      |  32 ++++++
 .../hudi/common/model/HoodieRecordPayload.java     |  73 ++++++++++----
 .../model/TestDefaultHoodieRecordPayload.java      | 107 +++++++++++++++++++++
 .../java/org/apache/hudi/util/StreamerUtil.java    |   3 +
 .../integ/testsuite/HoodieTestSuiteWriter.java     |   4 +
 .../main/java/org/apache/hudi/DataSourceUtils.java |   7 +-
 .../scala/org/apache/hudi/DataSourceOptions.scala  |   1 -
 .../org/apache/hudi/TestDataSourceDefaults.scala   |  62 +++++++++++-
 .../hudi/utilities/deltastreamer/DeltaSync.java    |   3 +
 14 files changed, 444 insertions(+), 24 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java
new file mode 100644
index 0000000..442bd02
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.common.config.DefaultHoodieConfig;
+import org.apache.hudi.config.HoodieMemoryConfig.Builder;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Properties;
+
+import static 
org.apache.hudi.common.model.HoodiePayloadProps.DEFAULT_PAYLOAD_ORDERING_FIELD_VAL;
+import static 
org.apache.hudi.common.model.HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP;
+
+/**
+ * Hoodie payload related configs.
+ */
+public class HoodiePayloadConfig extends DefaultHoodieConfig {
+
+  public HoodiePayloadConfig(Properties props) {
+    super(props);
+  }
+
+  public static HoodiePayloadConfig.Builder newBuilder() {
+    return new HoodiePayloadConfig.Builder();
+  }
+
+  public static class Builder {
+
+    private final Properties props = new Properties();
+
+    public Builder fromFile(File propertiesFile) throws IOException {
+      try (FileReader reader = new FileReader(propertiesFile)) {
+        this.props.load(reader);
+        return this;
+      }
+    }
+
+    public Builder fromProperties(Properties props) {
+      this.props.putAll(props);
+      return this;
+    }
+
+    public Builder withPayloadOrderingField(String payloadOrderingField) {
+      props.setProperty(PAYLOAD_ORDERING_FIELD_PROP, 
String.valueOf(payloadOrderingField));
+      return this;
+    }
+
+    public HoodiePayloadConfig build() {
+      HoodiePayloadConfig config = new HoodiePayloadConfig(props);
+      setDefaultOnCondition(props, 
!props.containsKey(PAYLOAD_ORDERING_FIELD_PROP), 
DEFAULT_PAYLOAD_ORDERING_FIELD_VAL,
+          String.valueOf(DEFAULT_PAYLOAD_ORDERING_FIELD_VAL));
+      return config;
+    }
+  }
+
+}
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 8c22cab..e5baaf6 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -145,6 +145,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
   // We keep track of original config and rewritten config
   private final FileSystemViewStorageConfig clientSpecifiedViewStorageConfig;
   private FileSystemViewStorageConfig viewStorageConfig;
+  private HoodiePayloadConfig hoodiePayloadConfig;
 
   private EngineType engineType;
 
@@ -163,6 +164,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
     this.consistencyGuardConfig = 
ConsistencyGuardConfig.newBuilder().fromProperties(newProps).build();
     this.clientSpecifiedViewStorageConfig = 
FileSystemViewStorageConfig.newBuilder().fromProperties(newProps).build();
     this.viewStorageConfig = clientSpecifiedViewStorageConfig;
+    this.hoodiePayloadConfig = 
HoodiePayloadConfig.newBuilder().fromProperties(newProps).build();
   }
 
   public static HoodieWriteConfig.Builder newBuilder() {
@@ -744,6 +746,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig 
{
     return clientSpecifiedViewStorageConfig;
   }
 
+  public HoodiePayloadConfig getPayloadConfig() {
+    return hoodiePayloadConfig;
+  }
+
   /**
    * Commit call back configs.
    */
@@ -804,6 +810,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
     private boolean isViewConfigSet = false;
     private boolean isConsistencyGuardSet = false;
     private boolean isCallbackConfigSet = false;
+    private boolean isPayloadConfigSet = false;
 
     public Builder withEngineType(EngineType engineType) {
       this.engineType = engineType;
@@ -944,6 +951,12 @@ public class HoodieWriteConfig extends DefaultHoodieConfig 
{
       return this;
     }
 
+    public Builder withPayloadConfig(HoodiePayloadConfig payloadConfig) {
+      props.putAll(payloadConfig.getProps());
+      isPayloadConfigSet = true;
+      return this;
+    }
+
     public Builder withAutoCommit(boolean autoCommit) {
       props.setProperty(HOODIE_AUTO_COMMIT_PROP, String.valueOf(autoCommit));
       return this;
@@ -1084,6 +1097,8 @@ public class HoodieWriteConfig extends 
DefaultHoodieConfig {
           ConsistencyGuardConfig.newBuilder().fromProperties(props).build());
       setDefaultOnCondition(props, !isCallbackConfigSet,
           
HoodieWriteCommitCallbackConfig.newBuilder().fromProperties(props).build());
+      setDefaultOnCondition(props, !isPayloadConfigSet,
+          HoodiePayloadConfig.newBuilder().fromProperties(props).build());
 
       setDefaultOnCondition(props, 
!props.containsKey(EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION),
           EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION, 
DEFAULT_EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION);
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index cab7283..1b98de4 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -225,7 +225,8 @@ public class HoodieMergeHandle<T extends 
HoodieRecordPayload, I, K, O> extends H
       HoodieRecord<T> hoodieRecord = new 
HoodieRecord<>(keyToNewRecords.get(key));
       try {
         Option<IndexedRecord> combinedAvroRecord =
-            hoodieRecord.getData().combineAndGetUpdateValue(oldRecord, 
useWriterSchema ? writerSchemaWithMetafields : writerSchema);
+            hoodieRecord.getData().combineAndGetUpdateValue(oldRecord, 
useWriterSchema ? writerSchemaWithMetafields : writerSchema,
+                config.getPayloadConfig().getProps());
         if (writeUpdateRecord(hoodieRecord, combinedAvroRecord)) {
           /*
            * ONLY WHEN 1) we have an update for this key AND 2) We are able to 
successfully write the the combined new
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 3b356a7..b1dcff9 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -442,7 +442,7 @@ public class HoodieAvroUtils {
    * @param fieldValue avro field value
    * @return field value either converted (for certain data types) or as it is.
    */
-  private static Object convertValueForSpecificDataTypes(Schema fieldSchema, 
Object fieldValue) {
+  public static Object convertValueForSpecificDataTypes(Schema fieldSchema, 
Object fieldValue) {
     if (fieldSchema == null) {
       return fieldValue;
     }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
new file mode 100644
index 0000000..8fc75a1
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hudi.common.util.Option;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import static org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro;
+import static org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldVal;
+
+/**
+ * {@link HoodieRecordPayload} impl that honors ordering field in both 
preCombine and combineAndGetUpdateValue.
+ * <p>
+ * 1. preCombine - Picks the latest delta record for a key, based on an 
ordering field 2. combineAndGetUpdateValue/getInsertValue - Chooses the latest 
record based on ordering field value.
+ */
+public class DefaultHoodieRecordPayload extends OverwriteWithLatestAvroPayload 
{
+
+  public DefaultHoodieRecordPayload(GenericRecord record, Comparable 
orderingVal) {
+    super(record, orderingVal);
+  }
+
+  public DefaultHoodieRecordPayload(Option<GenericRecord> record) {
+    this(record.isPresent() ? record.get() : null, (record1) -> 0); // natural 
order
+  }
+
+  @Override
+  public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord 
currentValue, Schema schema, Properties properties) throws IOException {
+    if (recordBytes.length == 0) {
+      return Option.empty();
+    }
+    GenericRecord incomingRecord = bytesToAvro(recordBytes, schema);
+    /*
+     * Combining strategy here returns currentValue on disk if incoming record 
is older.
+     * The incoming record can be either a delete (sent as an upsert with 
_hoodie_is_deleted set to true)
+     * or an insert/update record. In any case, if it is older than the record 
in disk, the currentValue
+     * in disk is returned (to be rewritten with new commit time).
+     *
+     * NOTE: Deletes sent via EmptyHoodieRecordPayload and/or Delete operation 
type do not hit this code path
+     * and need to be dealt with separately.
+     */
+    Object persistedOrderingVal = getNestedFieldVal((GenericRecord) 
currentValue, 
properties.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP), true);
+    Comparable incomingOrderingVal = (Comparable) 
getNestedFieldVal(incomingRecord, 
properties.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP), false);
+
+    // Null check is needed here to support schema evolution. The record in 
storage may be from old schema where
+    // the new ordering column might not be present and hence returns null.
+    if (persistedOrderingVal != null && ((Comparable) 
persistedOrderingVal).compareTo(incomingOrderingVal) > 0) {
+      return Option.of(currentValue);
+    }
+
+    /*
+     * We reached a point where the value in disk is older than the incoming 
record.
+     * Now check if the incoming record is a delete record.
+     */
+    if (isDeleteRecord(incomingRecord)) {
+      return Option.empty();
+    } else {
+      return Option.of(incomingRecord);
+    }
+  }
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePayloadProps.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePayloadProps.java
new file mode 100644
index 0000000..5d71ec3
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePayloadProps.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+/**
+ * Holds payload properties that implementation of {@link HoodieRecordPayload} 
can leverage.
+ * Since both payload classes and HoodiePayloadConfig need to access these 
props, storing it here in hudi-common.
+ */
+public class HoodiePayloadProps {
+
+  // payload ordering field. This could be used to merge incoming record with 
that in storage. Implementations of
+  // {@link HoodieRecordPayload} can leverage if required.
+  public static final String PAYLOAD_ORDERING_FIELD_PROP = 
"hoodie.payload.ordering.field";
+  public static String DEFAULT_PAYLOAD_ORDERING_FIELD_VAL = "ts";
+
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
index 1afdd1b..53fcca1 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
@@ -29,47 +29,84 @@ import org.apache.avro.generic.IndexedRecord;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Map;
+import java.util.Properties;
 
 /**
- * Every Hoodie table has an implementation of the 
<code>HoodieRecordPayload</code> This abstracts out callbacks which
- * depend on record specific logic.
+ * Every Hoodie table has an implementation of the 
<code>HoodieRecordPayload</code> This abstracts out callbacks which depend on 
record specific logic.
  */
 @PublicAPIClass(maturity = ApiMaturityLevel.STABLE)
 public interface HoodieRecordPayload<T extends HoodieRecordPayload> extends 
Serializable {
 
   /**
-   * When more than one HoodieRecord have the same HoodieKey, this function 
combines them before attempting to
-   * insert/upsert (if combining turned on in HoodieClientConfig).
+   * This method is deprecated. Please use the {@link 
#preCombine(HoodieRecordPayload, Properties)} method instead.
    */
-  @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
+  @Deprecated
+  @PublicAPIMethod(maturity = ApiMaturityLevel.DEPRECATED)
   T preCombine(T another);
 
   /**
-   * This methods lets you write custom merging/combining logic to produce new 
values as a function of current value on
-   * storage and whats contained in this object.
+   * When more than one HoodieRecord have the same HoodieKey, this function 
combines them before attempting to insert/upsert by taking in a property map.
+   * Implementation can leverage the property to decide their business logic 
to do preCombine.
+   * @param another instance of another {@link HoodieRecordPayload} to be 
combined with.
+   * @param properties Payload related properties. For example pass the 
ordering field(s) name to extract from value in storage.
+   * @return the combined value
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
+  default T preCombine(T another, Properties properties) {
+    return preCombine(another);
+  }
+
+  /**
+   * This method is deprecated. Please refer to {@link 
#combineAndGetUpdateValue(IndexedRecord, Schema, Properties)} for Javadocs.
+   */
+  @Deprecated
+  @PublicAPIMethod(maturity = ApiMaturityLevel.DEPRECATED)
+  Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, 
Schema schema) throws IOException;
+
+  /**
+   * This method lets you write custom merging/combining logic to produce new 
values as a function of current value on storage and whats contained
+   * in this object. Implementations can leverage properties if required.
    * <p>
-   * eg: 1) You are updating counters, you may want to add counts to 
currentValue and write back updated counts 2) You
-   * may be reading DB redo logs, and merge them with current image for a 
database row on storage
+   * eg:
+   * 1) You are updating counters, you may want to add counts to currentValue 
and write back updated counts
+   * 2) You may be reading DB redo logs, and merge them with current image for 
a database row on storage
+   * </p>
    *
    * @param currentValue Current value in storage, to merge/combine this 
payload with
    * @param schema Schema used for record
+   * @param properties Payload related properties. For example pass the 
ordering field(s) name to extract from value in storage.
    * @return new combined/merged value to be written back to storage. EMPTY to 
skip writing this record.
    */
-  @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
-  Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, 
Schema schema) throws IOException;
+  default Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord 
currentValue, Schema schema, Properties properties) throws IOException {
+    return combineAndGetUpdateValue(currentValue, schema);
+  }
 
   /**
-   * Generates an avro record out of the given HoodieRecordPayload, to be 
written out to storage. Called when writing a
-   * new value for the given HoodieKey, wherein there is no existing record in 
storage to be combined against. (i.e
-   * insert) Return EMPTY to skip writing this record.
+   * This method is deprecated. Refer to {@link #getInsertValue(Schema, 
Properties)} for java docs.
+   * @param schema Schema used for record
+   * @return the {@link IndexedRecord} to be inserted.
    */
-  @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
+  @Deprecated
+  @PublicAPIMethod(maturity = ApiMaturityLevel.DEPRECATED)
   Option<IndexedRecord> getInsertValue(Schema schema) throws IOException;
 
   /**
-   * This method can be used to extract some metadata from 
HoodieRecordPayload. The metadata is passed to
-   * {@code WriteStatus.markSuccess()} and {@code WriteStatus.markFailure()} 
in order to compute some aggregate metrics
-   * using the metadata in the context of a write success or failure.
+   * Generates an avro record out of the given HoodieRecordPayload, to be 
written out to storage. Called when writing a new value for the given
+   * HoodieKey, wherein there is no existing record in storage to be combined 
against. (i.e insert) Return EMPTY to skip writing this record.
+   * Implementations can leverage properties if required.
+   * @param schema Schema used for record
+   * @param properties Payload related properties. For example pass the 
ordering field(s) name to extract from value in storage.
+   * @return the {@link IndexedRecord} to be inserted.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
+  default Option<IndexedRecord> getInsertValue(Schema schema, Properties 
properties) throws IOException {
+    return getInsertValue(schema);
+  }
+
+  /**
+   * This method can be used to extract some metadata from 
HoodieRecordPayload. The metadata is passed to {@code 
WriteStatus.markSuccess()} and
+   * {@code WriteStatus.markFailure()} in order to compute some aggregate 
metrics using the metadata in the context of a write success or failure.
+   * @return the metadata in the form of Map<String, String> if any.
    */
   @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
   default Option<Map<String, String>> getMetadata() {
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java
new file mode 100644
index 0000000..7914154
--- /dev/null
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+/**
+ * Unit tests {@link DefaultHoodieRecordPayload}.
+ */
+public class TestDefaultHoodieRecordPayload {
+
+  private Schema schema;
+  private Properties props;
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    schema = Schema.createRecord(Arrays.asList(
+        new Schema.Field("id", Schema.create(Schema.Type.STRING), "", null),
+        new Schema.Field("partition", Schema.create(Schema.Type.STRING), "", 
null),
+        new Schema.Field("ts", Schema.create(Schema.Type.LONG), "", null),
+        new Schema.Field("_hoodie_is_deleted", Schema.create(Type.BOOLEAN), 
"", false)
+    ));
+    props = new Properties();
+    props.setProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP, "ts");
+  }
+
+  @Test
+  public void testActiveRecords() throws IOException {
+    GenericRecord record1 = new GenericData.Record(schema);
+    record1.put("id", "1");
+    record1.put("partition", "partition0");
+    record1.put("ts", 0L);
+    record1.put("_hoodie_is_deleted", false);
+
+    GenericRecord record2 = new GenericData.Record(schema);
+    record2.put("id", "2");
+    record2.put("partition", "partition1");
+    record2.put("ts", 1L);
+    record2.put("_hoodie_is_deleted", false);
+
+    DefaultHoodieRecordPayload payload1 = new 
DefaultHoodieRecordPayload(record1, 1);
+    DefaultHoodieRecordPayload payload2 = new 
DefaultHoodieRecordPayload(record2, 2);
+    assertEquals(payload1.preCombine(payload2, props), payload2);
+    assertEquals(payload2.preCombine(payload1, props), payload2);
+
+    assertEquals(record1, payload1.getInsertValue(schema).get());
+    assertEquals(record2, payload2.getInsertValue(schema).get());
+
+    assertEquals(payload1.combineAndGetUpdateValue(record2, schema, 
props).get(), record2);
+    assertEquals(payload2.combineAndGetUpdateValue(record1, schema, 
props).get(), record2);
+  }
+
+  @Test
+  public void testDeletedRecord() throws IOException {
+    GenericRecord record1 = new GenericData.Record(schema);
+    record1.put("id", "1");
+    record1.put("partition", "partition0");
+    record1.put("ts", 0L);
+    record1.put("_hoodie_is_deleted", false);
+
+    GenericRecord delRecord1 = new GenericData.Record(schema);
+    delRecord1.put("id", "2");
+    delRecord1.put("partition", "partition1");
+    delRecord1.put("ts", 1L);
+    delRecord1.put("_hoodie_is_deleted", true);
+
+    DefaultHoodieRecordPayload payload1 = new 
DefaultHoodieRecordPayload(record1, 1);
+    DefaultHoodieRecordPayload payload2 = new 
DefaultHoodieRecordPayload(delRecord1, 2);
+    assertEquals(payload1.preCombine(payload2, props), payload2);
+    assertEquals(payload2.preCombine(payload1, props), payload2);
+
+    assertEquals(record1, payload1.getInsertValue(schema).get());
+    assertFalse(payload2.getInsertValue(schema).isPresent());
+
+    assertEquals(payload1.combineAndGetUpdateValue(delRecord1, schema, 
props).get(), delRecord1);
+    assertFalse(payload2.combineAndGetUpdateValue(record1, schema, 
props).isPresent());
+  }
+
+}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java 
b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index f9dacae..2905a88 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -26,6 +26,7 @@ import org.apache.hudi.client.common.EngineType;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodiePayloadConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
@@ -135,6 +136,8 @@ public class StreamerUtil {
     HoodieWriteConfig.Builder builder =
         
HoodieWriteConfig.newBuilder().withEngineType(EngineType.FLINK).withPath(cfg.targetBasePath).combineInput(cfg.filterDupes,
 true)
             
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withPayloadClass(cfg.payloadClassName).build())
+            
.withPayloadConfig(HoodiePayloadConfig.newBuilder().withPayloadOrderingField(cfg.sourceOrderingField)
+                .build())
             .forTable(cfg.targetTableName)
             .withAutoCommit(false)
             .withProps(readConfig(fs, new Path(cfg.propsFilePath), cfg.configs)
diff --git 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java
 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java
index bf6fca7..a06c281 100644
--- 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java
+++ 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.integ.testsuite;
 
 import org.apache.hadoop.conf.Configuration;
+
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.client.HoodieReadClient;
 import org.apache.hudi.client.SparkRDDWriteClient;
@@ -31,6 +32,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodiePayloadConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.index.HoodieIndex;
 import 
org.apache.hudi.integ.testsuite.HoodieTestSuiteJob.HoodieTestSuiteConfig;
@@ -98,6 +100,8 @@ public class HoodieTestSuiteWriter {
         HoodieWriteConfig.newBuilder().combineInput(true, 
true).withPath(cfg.targetBasePath)
             .withAutoCommit(false)
             
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withPayloadClass(cfg.payloadClassName).build())
+            
.withPayloadConfig(HoodiePayloadConfig.newBuilder().withPayloadOrderingField(cfg.sourceOrderingField)
+                .build())
             .forTable(cfg.targetTableName)
             
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
             .withProps(props);
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
 
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
index 1cb63c9..8d3e81b 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -39,13 +39,14 @@ import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.TablePathUtils;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodiePayloadConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.exception.TableNotFoundException;
 import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
-import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.index.HoodieIndex.IndexType;
 import org.apache.hudi.keygen.KeyGenerator;
 import org.apache.hudi.keygen.parser.AbstractHoodieDateTimeParser;
 import org.apache.hudi.table.BulkInsertPartitioner;
@@ -177,10 +178,12 @@ public class DataSourceUtils {
     }
 
     return builder.forTable(tblName)
-        
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
+        
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(IndexType.BLOOM).build())
         .withCompactionConfig(HoodieCompactionConfig.newBuilder()
             
.withPayloadClass(parameters.get(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY()))
             .withInlineCompaction(inlineCompact).build())
+        
.withPayloadConfig(HoodiePayloadConfig.newBuilder().withPayloadOrderingField(parameters.get(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY()))
+            .build())
         // override above with Hoodie configs specified as options.
         .withProps(parameters).build();
   }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 1b6e49b..0a33583 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -205,7 +205,6 @@ object DataSourceWriteOptions {
   val PRECOMBINE_FIELD_OPT_KEY = "hoodie.datasource.write.precombine.field"
   val DEFAULT_PRECOMBINE_FIELD_OPT_VAL = "ts"
 
-
   /**
     * Payload class used. Override this, if you like to roll your own merge 
logic, when upserting/inserting.
     * This will render any value set for `PRECOMBINE_FIELD_OPT_VAL` 
in-effective
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
index 99e1297..4c69950 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
@@ -17,12 +17,14 @@
 
 package org.apache.hudi
 
+import org.apache.avro.Schema
 import org.apache.avro.generic.GenericRecord
 import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.common.config.TypedProperties
-import org.apache.hudi.common.model.{EmptyHoodieRecordPayload, HoodieKey, 
OverwriteWithLatestAvroPayload}
+import org.apache.hudi.common.model.{BaseAvroPayload, 
DefaultHoodieRecordPayload, EmptyHoodieRecordPayload, HoodieKey, 
HoodiePayloadProps, OverwriteWithLatestAvroPayload}
 import org.apache.hudi.common.testutils.SchemaTestUtil
 import org.apache.hudi.common.util.Option
+import org.apache.hudi.config.HoodiePayloadConfig
 import org.apache.hudi.exception.{HoodieException, HoodieKeyException}
 import org.apache.hudi.keygen._
 import org.apache.hudi.testutils.KeyGeneratorTestUtilities
@@ -31,6 +33,8 @@ import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.{BeforeEach, Test}
 import org.scalatest.Assertions.fail
 
+import scala.collection.JavaConverters.mapAsJavaMapConverter
+
 /**
  * Tests on the default key generator, payload classes.
  */
@@ -567,6 +571,62 @@ class TestDataSourceDefaults {
     assertEquals("field2", combinedGR21.get("field1").toString)
   }
 
+  @Test def testOverwriteWithLatestAvroPayloadCombineAndGetUpdateValue() = {
+    val baseOrderingVal: Object = baseRecord.get("favoriteIntNumber")
+    val fieldSchema: Schema = 
baseRecord.getSchema().getField("favoriteIntNumber").schema()
+    val props = new TypedProperties()
+    props.put(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP, 
"favoriteIntNumber");
+
+    val basePayload = new OverwriteWithLatestAvroPayload(baseRecord, 
HoodieAvroUtils.convertValueForSpecificDataTypes(fieldSchema, 
baseOrderingVal).asInstanceOf[Comparable[_]])
+
+    val laterRecord = SchemaTestUtil
+      .generateAvroRecordFromJson(schema, 2, "001", "f1")
+    val laterOrderingVal: Object = laterRecord.get("favoriteIntNumber")
+    val newerPayload = new OverwriteWithLatestAvroPayload(laterRecord, 
HoodieAvroUtils.convertValueForSpecificDataTypes(fieldSchema, 
laterOrderingVal).asInstanceOf[Comparable[_]])
+
+    // it will provide the record with greatest combine value
+    val preCombinedPayload = basePayload.preCombine(newerPayload)
+    val precombinedGR = 
preCombinedPayload.getInsertValue(schema).get().asInstanceOf[GenericRecord]
+    assertEquals("field2", precombinedGR.get("field1").toString)
+  }
+
+  @Test def testDefaultHoodieRecordPayloadCombineAndGetUpdateValue() = {
+    val baseOrderingVal: Object = baseRecord.get("favoriteIntNumber")
+    val fieldSchema: Schema = 
baseRecord.getSchema().getField("favoriteIntNumber").schema()
+    val props = new TypedProperties()
+    props.put(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP, 
"favoriteIntNumber");
+
+    val laterRecord = SchemaTestUtil
+      .generateAvroRecordFromJson(schema, 2, "001", "f1")
+    val laterOrderingVal: Object = laterRecord.get("favoriteIntNumber")
+
+    val earlierRecord = SchemaTestUtil
+      .generateAvroRecordFromJson(schema, 1, "000", "f1")
+    val earlierOrderingVal: Object = earlierRecord.get("favoriteIntNumber")
+
+    val laterPayload = new DefaultHoodieRecordPayload(laterRecord,
+      HoodieAvroUtils.convertValueForSpecificDataTypes(fieldSchema, 
laterOrderingVal).asInstanceOf[Comparable[_]])
+
+    val earlierPayload = new DefaultHoodieRecordPayload(earlierRecord,
+      HoodieAvroUtils.convertValueForSpecificDataTypes(fieldSchema, 
earlierOrderingVal).asInstanceOf[Comparable[_]])
+
+    // it will provide the record with greatest combine value
+    val preCombinedPayload = laterPayload.preCombine(earlierPayload)
+    val precombinedGR = 
preCombinedPayload.getInsertValue(schema).get().asInstanceOf[GenericRecord]
+    assertEquals("field2", precombinedGR.get("field1").toString)
+    assertEquals(laterOrderingVal, precombinedGR.get("favoriteIntNumber"))
+
+    val earlierWithLater = 
earlierPayload.combineAndGetUpdateValue(laterRecord, schema, props)
+    val earlierwithLaterGR = earlierWithLater.get().asInstanceOf[GenericRecord]
+    assertEquals("field2", earlierwithLaterGR.get("field1").toString)
+    assertEquals(laterOrderingVal, earlierwithLaterGR.get("favoriteIntNumber"))
+
+    val laterWithEarlier = 
laterPayload.combineAndGetUpdateValue(earlierRecord, schema, props)
+    val laterWithEarlierGR = laterWithEarlier.get().asInstanceOf[GenericRecord]
+    assertEquals("field2", laterWithEarlierGR.get("field1").toString)
+    assertEquals(laterOrderingVal, laterWithEarlierGR.get("favoriteIntNumber"))
+  }
+
   @Test def testEmptyHoodieRecordPayload() = {
     val emptyPayload1 = new EmptyHoodieRecordPayload(baseRecord, 1)
     val laterRecord = SchemaTestUtil
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index e17c1f0..50488bc 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -41,6 +41,7 @@ import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodiePayloadConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.hive.HiveSyncConfig;
@@ -619,6 +620,8 @@ public class DeltaSync implements Serializable {
             
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withPayloadClass(cfg.payloadClassName)
                 // Inline compaction is disabled for continuous mode. 
otherwise enabled for MOR
                 .withInlineCompaction(cfg.isInlineCompactionEnabled()).build())
+            
.withPayloadConfig(HoodiePayloadConfig.newBuilder().withPayloadOrderingField(cfg.sourceOrderingField)
+                .build())
             .forTable(cfg.targetTableName)
             .withAutoCommit(autoCommit).withProps(props);
 

Reply via email to