This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 899ae70  [HUDI-1587] Add latency and freshness support (#2541)
899ae70 is described below

commit 899ae70fdb70c1511c099a64230fd91b2fe8d4ee
Author: Raymond Xu <[email protected]>
AuthorDate: Wed Mar 3 20:13:12 2021 -0800

    [HUDI-1587] Add latency and freshness support (#2541)
    
    Save min and max of event time in each commit and compute the latency and freshness metrics.
---
 .../java/org/apache/hudi/client/WriteStatus.java   | 20 ++++++++
 .../apache/hudi/config/HoodiePayloadConfig.java    | 10 +++-
 .../org/apache/hudi/metrics/HoodieMetrics.java     | 22 ++++++++-
 .../org/apache/hudi/metrics/TestHoodieMetrics.java |  3 ++
 .../org/apache/hudi/client/TestWriteStatus.java    |  5 +-
 .../common/model/DefaultHoodieRecordPayload.java   | 18 +++++++
 .../hudi/common/model/HoodieCommitMetadata.java    | 18 ++++++-
 .../hudi/common/model/HoodiePayloadProps.java      | 15 +++++-
 .../apache/hudi/common/model/HoodieWriteStat.java  | 36 ++++++++++++++
 .../org/apache/hudi/common/util/DateTimeUtils.java | 40 ++++++++++++++++
 .../model/TestDefaultHoodieRecordPayload.java      | 38 +++++++++++++++
 .../apache/hudi/common/util/TestDateTimeUtils.java | 55 ++++++++++++++++++++++
 .../org/apache/hudi/HoodieMergeOnReadRDD.scala     | 16 +++----
 .../org/apache/hudi/TestDataSourceDefaults.scala   |  9 ++--
 14 files changed, 281 insertions(+), 24 deletions(-)
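
In short: the payload can now surface an event-time field (hoodie.payload.event.time.field, default "ts"), each HoodieWriteStat keeps the minimum and maximum event time it observed, and HoodieMetrics registers two new gauges per action, commitLatencyInMs = commitEpochTimeInMs + durationInMs - minEventTime and commitFreshnessInMs = commitEpochTimeInMs + durationInMs - maxEventTime. Below is a minimal sketch of a writer-side opt-in using the new builder method; the wrapper class is hypothetical, only HoodiePayloadConfig and its builder come from this commit, and how the resulting Properties reach the write path depends on the surrounding job setup.

    import java.util.Properties;

    import org.apache.hudi.config.HoodiePayloadConfig;

    public class EventTimeMetricsSketch {
      public static void main(String[] args) {
        // Point the payload at the event-time column; "ts" is also the default.
        Properties payloadProps = HoodiePayloadConfig.newBuilder()
            .withPayloadOrderingField("ts")    // pre-existing: column used to pick the newest record
            .withPayloadEventTimeField("ts")   // new in this commit: column feeding min/max event time
            .build().getProps();

        // Sets hoodie.payload.ordering.field and hoodie.payload.event.time.field;
        // with metrics enabled, commits then report commitLatencyInMs and commitFreshnessInMs.
        System.out.println(payloadProps);
      }
    }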

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java
index a93f268..6d465d4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java
@@ -21,20 +21,28 @@ package org.apache.hudi.client;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.util.DateTimeUtils;
 import org.apache.hudi.common.util.Option;
 
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
 import java.io.Serializable;
+import java.time.DateTimeException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
+import static org.apache.hudi.common.model.DefaultHoodieRecordPayload.METADATA_EVENT_TIME_KEY;
+
 /**
  * Status of a write operation.
  */
 public class WriteStatus implements Serializable {
 
+  private static final Logger LOG = LogManager.getLogger(WriteStatus.class);
   private static final long serialVersionUID = 1L;
   private static final long RANDOM_SEED = 9038412832L;
 
@@ -77,6 +85,18 @@ public class WriteStatus implements Serializable {
       writtenRecords.add(record);
     }
     totalRecords++;
+
+    // get the min and max event time for calculating latency and freshness
+    if (optionalRecordMetadata.isPresent()) {
+      String eventTimeVal = optionalRecordMetadata.get().getOrDefault(METADATA_EVENT_TIME_KEY, null);
+      try {
+        long eventTime = DateTimeUtils.parseDateTime(eventTimeVal).toEpochMilli();
+        stat.setMinEventTime(eventTime);
+        stat.setMaxEventTime(eventTime);
+      } catch (DateTimeException | IllegalArgumentException e) {
+        LOG.debug(String.format("Fail to parse event time value: %s", eventTimeVal), e);
+      }
+    }
   }
 
   /**
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java
index 8ef5575..489d23c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java
@@ -19,14 +19,15 @@
 package org.apache.hudi.config;
 
 import org.apache.hudi.common.config.DefaultHoodieConfig;
-import org.apache.hudi.config.HoodieMemoryConfig.Builder;
 
 import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
 import java.util.Properties;
 
+import static org.apache.hudi.common.model.HoodiePayloadProps.DEFAULT_PAYLOAD_EVENT_TIME_FIELD_VAL;
 import static org.apache.hudi.common.model.HoodiePayloadProps.DEFAULT_PAYLOAD_ORDERING_FIELD_VAL;
+import static org.apache.hudi.common.model.HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP;
 import static org.apache.hudi.common.model.HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP;
 
 /**
@@ -63,10 +64,17 @@ public class HoodiePayloadConfig extends DefaultHoodieConfig {
       return this;
     }
 
+    public Builder withPayloadEventTimeField(String payloadEventTimeField) {
+      props.setProperty(PAYLOAD_EVENT_TIME_FIELD_PROP, String.valueOf(payloadEventTimeField));
+      return this;
+    }
+
     public HoodiePayloadConfig build() {
       HoodiePayloadConfig config = new HoodiePayloadConfig(props);
       setDefaultOnCondition(props, !props.containsKey(PAYLOAD_ORDERING_FIELD_PROP), PAYLOAD_ORDERING_FIELD_PROP,
           String.valueOf(DEFAULT_PAYLOAD_ORDERING_FIELD_VAL));
+      setDefaultOnCondition(props, !props.containsKey(PAYLOAD_EVENT_TIME_FIELD_PROP), PAYLOAD_EVENT_TIME_FIELD_PROP,
+              String.valueOf(DEFAULT_PAYLOAD_EVENT_TIME_FIELD_VAL));
       return config;
     }
   }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
index c8c112f..5db28e2 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
@@ -20,6 +20,8 @@ package org.apache.hudi.metrics;
 
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 
 import com.codahale.metrics.Timer;
@@ -130,6 +132,7 @@ public class HoodieMetrics {
 
   public void updateCommitMetrics(long commitEpochTimeInMs, long durationInMs, HoodieCommitMetadata metadata,
       String actionType) {
+    updateCommitTimingMetrics(commitEpochTimeInMs, durationInMs, metadata, actionType);
     if (config.isMetricsOn()) {
       long totalPartitionsWritten = metadata.fetchTotalPartitionsWritten();
       long totalFilesInsert = metadata.fetchTotalFilesInsert();
@@ -144,7 +147,6 @@ public class HoodieMetrics {
       long totalCompactedRecordsUpdated = metadata.getTotalCompactedRecordsUpdated();
       long totalLogFilesCompacted = metadata.getTotalLogFilesCompacted();
       long totalLogFilesSize = metadata.getTotalLogFilesSize();
-      Metrics.registerGauge(getMetricsName(actionType, "duration"), durationInMs);
       Metrics.registerGauge(getMetricsName(actionType, "totalPartitionsWritten"), totalPartitionsWritten);
       Metrics.registerGauge(getMetricsName(actionType, "totalFilesInsert"), totalFilesInsert);
       Metrics.registerGauge(getMetricsName(actionType, "totalFilesUpdate"), totalFilesUpdate);
@@ -152,7 +154,6 @@ public class HoodieMetrics {
       Metrics.registerGauge(getMetricsName(actionType, "totalUpdateRecordsWritten"), totalUpdateRecordsWritten);
       Metrics.registerGauge(getMetricsName(actionType, "totalInsertRecordsWritten"), totalInsertRecordsWritten);
       Metrics.registerGauge(getMetricsName(actionType, "totalBytesWritten"), totalBytesWritten);
-      Metrics.registerGauge(getMetricsName(actionType, "commitTime"), commitEpochTimeInMs);
       Metrics.registerGauge(getMetricsName(actionType, "totalScanTime"), totalTimeTakenByScanner);
       Metrics.registerGauge(getMetricsName(actionType, "totalCreateTime"), totalTimeTakenForInsert);
       Metrics.registerGauge(getMetricsName(actionType, "totalUpsertTime"), totalTimeTakenForUpsert);
@@ -162,6 +163,23 @@ public class HoodieMetrics {
     }
   }
 
+  private void updateCommitTimingMetrics(long commitEpochTimeInMs, long durationInMs, HoodieCommitMetadata metadata,
+      String actionType) {
+    if (config.isMetricsOn()) {
+      Pair<Option<Long>, Option<Long>> eventTimePairMinMax = metadata.getMinAndMaxEventTime();
+      if (eventTimePairMinMax.getLeft().isPresent()) {
+        long commitLatencyInMs = commitEpochTimeInMs + durationInMs - eventTimePairMinMax.getLeft().get();
+        Metrics.registerGauge(getMetricsName(actionType, "commitLatencyInMs"), commitLatencyInMs);
+      }
+      if (eventTimePairMinMax.getRight().isPresent()) {
+        long commitFreshnessInMs = commitEpochTimeInMs + durationInMs - eventTimePairMinMax.getRight().get();
+        Metrics.registerGauge(getMetricsName(actionType, "commitFreshnessInMs"), commitFreshnessInMs);
+      }
+      Metrics.registerGauge(getMetricsName(actionType, "commitTime"), commitEpochTimeInMs);
+      Metrics.registerGauge(getMetricsName(actionType, "duration"), durationInMs);
+    }
+  }
+
   public void updateRollbackMetrics(long durationInMs, long numFilesDeleted) {
     if (config.isMetricsOn()) {
       LOG.info(
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java
index 41842b1..e669a67 100755
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java
@@ -19,6 +19,8 @@
 package org.apache.hudi.metrics;
 
 import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 
 import com.codahale.metrics.Timer;
@@ -123,6 +125,7 @@ public class TestHoodieMetrics {
       when(metadata.getTotalCompactedRecordsUpdated()).thenReturn(randomValue + 11);
       when(metadata.getTotalLogFilesCompacted()).thenReturn(randomValue + 12);
       when(metadata.getTotalLogFilesSize()).thenReturn(randomValue + 13);
+      when(metadata.getMinAndMaxEventTime()).thenReturn(Pair.of(Option.empty(), Option.empty()));
       metrics.updateCommitMetrics(randomValue + 14, commitTimer.stop(), metadata, action);
 
       String metricname = metrics.getMetricsName(action, "duration");
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestWriteStatus.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestWriteStatus.java
index 91878e1..78e711e 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestWriteStatus.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestWriteStatus.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.client;
 
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
 
 import org.junit.jupiter.api.Test;
 
@@ -44,8 +45,8 @@ public class TestWriteStatus {
     WriteStatus status = new WriteStatus(false, 1.0);
     Throwable t = new Exception("some error in writing");
     for (int i = 0; i < 1000; i++) {
-      status.markSuccess(mock(HoodieRecord.class), null);
-      status.markFailure(mock(HoodieRecord.class), t, null);
+      status.markSuccess(mock(HoodieRecord.class), Option.empty());
+      status.markFailure(mock(HoodieRecord.class), t, Option.empty());
     }
     assertEquals(1000, status.getFailedRecords().size());
     assertTrue(status.hasErrors());
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
index 77f65cb..db41a3b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
@@ -25,6 +25,8 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 
 import static org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro;
@@ -37,6 +39,9 @@ import static org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldVal;
  */
 public class DefaultHoodieRecordPayload extends OverwriteWithLatestAvroPayload {
 
+  public static final String METADATA_EVENT_TIME_KEY = "metadata.event_time.key";
+  private Option<Object> eventTime = Option.empty();
+
   public DefaultHoodieRecordPayload(GenericRecord record, Comparable orderingVal) {
     super(record, orderingVal);
   }
@@ -71,6 +76,10 @@ public class DefaultHoodieRecordPayload extends OverwriteWithLatestAvroPayload {
 
     /*
      * We reached a point where the value is disk is older than the incoming record.
+     */
+    eventTime = Option.ofNullable(getNestedFieldVal(incomingRecord, properties.getProperty(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP), true));
+
+    /*
      * Now check if the incoming record is a delete record.
      */
     if (isDeleteRecord(incomingRecord)) {
@@ -79,4 +88,13 @@ public class DefaultHoodieRecordPayload extends OverwriteWithLatestAvroPayload {
       return Option.of(incomingRecord);
     }
   }
+
+  @Override
+  public Option<Map<String, String>> getMetadata() {
+    Map<String, String> metadata = new HashMap<>();
+    if (eventTime.isPresent()) {
+      metadata.put(METADATA_EVENT_TIME_KEY, String.valueOf(eventTime.get()));
+    }
+    return metadata.isEmpty() ? Option.empty() : Option.of(metadata);
+  }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
index 3e760f6..bc9a4ba 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
@@ -18,14 +18,16 @@
 
 package org.apache.hudi.common.model;
 
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
 
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.annotation.PropertyAccessor;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -323,6 +325,20 @@ public class HoodieCommitMetadata implements Serializable {
     return totalUpsertTime;
   }
 
+  public Pair<Option<Long>, Option<Long>> getMinAndMaxEventTime() {
+    long minEventTime = Long.MAX_VALUE;
+    long maxEventTime = Long.MIN_VALUE;
+    for (Map.Entry<String, List<HoodieWriteStat>> entry : partitionToWriteStats.entrySet()) {
+      for (HoodieWriteStat writeStat : entry.getValue()) {
+        minEventTime = writeStat.getMinEventTime() != null ? Math.min(writeStat.getMinEventTime(), minEventTime) : minEventTime;
+        maxEventTime = writeStat.getMaxEventTime() != null ? Math.max(writeStat.getMaxEventTime(), maxEventTime) : maxEventTime;
+      }
+    }
+    return Pair.of(
+        minEventTime == Long.MAX_VALUE ? Option.empty() : Option.of(minEventTime),
+        maxEventTime == Long.MIN_VALUE ? Option.empty() : Option.of(maxEventTime));
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePayloadProps.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePayloadProps.java
index 5d71ec3..48dde2a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePayloadProps.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePayloadProps.java
@@ -24,9 +24,20 @@ package org.apache.hudi.common.model;
  */
 public class HoodiePayloadProps {
 
-  // payload ordering field. This could be used to merge incoming record with that in storage. Implementations of
-  // {@link HoodieRecordPayload} can leverage if required.
+  /**
+   * Property for payload ordering field; to be used to merge incoming record with that in storage.
+   * Implementations of {@link HoodieRecordPayload} can leverage if required.
+   *
+   * @see DefaultHoodieRecordPayload
+   */
   public static final String PAYLOAD_ORDERING_FIELD_PROP = "hoodie.payload.ordering.field";
   public static String DEFAULT_PAYLOAD_ORDERING_FIELD_VAL = "ts";
 
+  /**
+   * Property for payload event time field; to be used to extract source event time info.
+   *
+   * @see DefaultHoodieRecordPayload
+   */
+  public static final String PAYLOAD_EVENT_TIME_FIELD_PROP = "hoodie.payload.event.time.field";
+  public static String DEFAULT_PAYLOAD_EVENT_TIME_FIELD_VAL = "ts";
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java
index 9a640be..ba43526 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java
@@ -143,6 +143,18 @@ public class HoodieWriteStat implements Serializable {
    */
   private long fileSizeInBytes;
 
+  /**
+   * The earliest of incoming records' event times (Epoch ms) for calculating latency.
+   */
+  @Nullable
+  private Long minEventTime;
+
+  /**
+   * The latest of incoming records' event times (Epoch ms) for calculating freshness.
+   */
+  @Nullable
+  private Long maxEventTime;
+
   @Nullable
   @JsonIgnore
   private RuntimeStats runtimeStats;
@@ -303,6 +315,30 @@ public class HoodieWriteStat implements Serializable {
     this.fileSizeInBytes = fileSizeInBytes;
   }
 
+  public Long getMinEventTime() {
+    return minEventTime;
+  }
+
+  public void setMinEventTime(Long minEventTime) {
+    if (this.minEventTime == null) {
+      this.minEventTime = minEventTime;
+    } else {
+      this.minEventTime = Math.min(minEventTime, this.minEventTime);
+    }
+  }
+
+  public Long getMaxEventTime() {
+    return maxEventTime;
+  }
+
+  public void setMaxEventTime(Long maxEventTime) {
+    if (this.maxEventTime == null) {
+      this.maxEventTime = maxEventTime;
+    } else {
+      this.maxEventTime = Math.max(maxEventTime, this.maxEventTime);
+    }
+  }
+
   @Nullable
   public RuntimeStats getRuntimeStats() {
     return runtimeStats;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/DateTimeUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/DateTimeUtils.java
new file mode 100644
index 0000000..1c18a77
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/DateTimeUtils.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import java.time.Instant;
+import java.time.format.DateTimeParseException;
+import java.util.Objects;
+
+public class DateTimeUtils {
+
+  /**
+   * Parse input String to a {@link java.time.Instant}.
+   * @param s Input String should be Epoch time in millisecond or ISO-8601 format.
+   */
+  public static Instant parseDateTime(String s) throws DateTimeParseException {
+    ValidationUtils.checkArgument(Objects.nonNull(s), "Input String cannot be null.");
+    try {
+      return Instant.ofEpochMilli(Long.parseLong(s));
+    } catch (NumberFormatException e) {
+      return Instant.parse(s);
+    }
+  }
+}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java
index 7914154..4c43903 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java
@@ -18,12 +18,16 @@
 
 package org.apache.hudi.common.model;
 
+import org.apache.hudi.common.util.Option;
+
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Type;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -31,6 +35,7 @@ import java.util.Properties;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
  * Unit tests {@link DefaultHoodieRecordPayload}.
@@ -50,6 +55,7 @@ public class TestDefaultHoodieRecordPayload {
     ));
     props = new Properties();
     props.setProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP, "ts");
+    props.setProperty(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP, "ts");
   }
 
   @Test
@@ -104,4 +110,36 @@ public class TestDefaultHoodieRecordPayload {
     assertFalse(payload2.combineAndGetUpdateValue(record1, schema, props).isPresent());
   }
 
+  @Test
+  public void testGetEmptyMetadata() {
+    GenericRecord record = new GenericData.Record(schema);
+    record.put("id", "1");
+    record.put("partition", "partition0");
+    record.put("ts", 0L);
+    record.put("_hoodie_is_deleted", false);
+    DefaultHoodieRecordPayload payload = new DefaultHoodieRecordPayload(Option.of(record));
+    assertFalse(payload.getMetadata().isPresent());
+  }
+
+  @ParameterizedTest
+  @ValueSource(longs = {1L, 1612542030000L})
+  public void testGetEventTimeInMetadata(long eventTime) throws IOException {
+    GenericRecord record1 = new GenericData.Record(schema);
+    record1.put("id", "1");
+    record1.put("partition", "partition0");
+    record1.put("ts", 0L);
+    record1.put("_hoodie_is_deleted", false);
+
+    GenericRecord record2 = new GenericData.Record(schema);
+    record2.put("id", "1");
+    record2.put("partition", "partition0");
+    record2.put("ts", eventTime);
+    record2.put("_hoodie_is_deleted", false);
+
+    DefaultHoodieRecordPayload payload2 = new DefaultHoodieRecordPayload(record2, eventTime);
+    payload2.combineAndGetUpdateValue(record1, schema, props);
+    assertTrue(payload2.getMetadata().isPresent());
+    assertEquals(eventTime,
+        Long.parseLong(payload2.getMetadata().get().get(DefaultHoodieRecordPayload.METADATA_EVENT_TIME_KEY)));
+  }
 }
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestDateTimeUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestDateTimeUtils.java
new file mode 100644
index 0000000..996c8ba
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestDateTimeUtils.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.time.format.DateTimeParseException;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class TestDateTimeUtils {
+
+  @ParameterizedTest
+  @ValueSource(strings = {"0", "1612542030000", "2020-01-01T01:01:00Z", "1970-01-01T00:00:00.123456Z"})
+  public void testParseStringIntoInstant(String s) {
+    assertDoesNotThrow(() -> {
+      DateTimeUtils.parseDateTime(s);
+    });
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = {"#", "0L", ""})
+  public void testParseDateTimeThrowsException(String s) {
+    assertThrows(DateTimeParseException.class, () -> {
+      DateTimeUtils.parseDateTime(s);
+    });
+  }
+
+  @Test
+  public void testParseDateTimeWithNull() {
+    assertThrows(IllegalArgumentException.class, () -> {
+      DateTimeUtils.parseDateTime(null);
+    });
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
index d204d2d..8c8655b 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
@@ -18,24 +18,22 @@
 
 package org.apache.hudi
 
+import org.apache.avro.Schema
+import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
+import org.apache.hadoop.conf.Configuration
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner
+import org.apache.hudi.config.HoodiePayloadConfig
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.hadoop.config.HoodieRealtimeConfig
 import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS
-import org.apache.avro.Schema
-import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
-import org.apache.hadoop.conf.Configuration
-import org.apache.hudi.common.model.HoodiePayloadProps
-import org.apache.spark.{Partition, SerializableWritable, SparkContext, TaskContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.avro.{AvroDeserializer, AvroSerializer}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeProjection}
 import org.apache.spark.sql.execution.datasources.PartitionedFile
 import org.apache.spark.sql.vectorized.ColumnarBatch
-
-import java.util.Properties
+import org.apache.spark.{Partition, SerializableWritable, SparkContext, TaskContext}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -53,9 +51,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
   private val confBroadcast = sc.broadcast(new SerializableWritable(config))
   private val preCombineField = tableState.preCombineField
   private val payloadProps = if (preCombineField.isDefined) {
-    val properties = new Properties()
-    properties.put(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP, preCombineField.get)
-    Some(properties)
+    Some(HoodiePayloadConfig.newBuilder.withPayloadOrderingField(preCombineField.get).build.getProps)
   } else {
     None
   }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
index 4c69950..8c3f18c 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
@@ -21,7 +21,7 @@ import org.apache.avro.Schema
 import org.apache.avro.generic.GenericRecord
 import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.common.config.TypedProperties
-import org.apache.hudi.common.model.{BaseAvroPayload, DefaultHoodieRecordPayload, EmptyHoodieRecordPayload, HoodieKey, HoodiePayloadProps, OverwriteWithLatestAvroPayload}
+import org.apache.hudi.common.model._
 import org.apache.hudi.common.testutils.SchemaTestUtil
 import org.apache.hudi.common.util.Option
 import org.apache.hudi.config.HoodiePayloadConfig
@@ -33,8 +33,6 @@ import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.{BeforeEach, Test}
 import org.scalatest.Assertions.fail
 
-import scala.collection.JavaConverters.mapAsJavaMapConverter
-
 /**
  * Tests on the default key generator, payload classes.
  */
@@ -591,10 +589,9 @@ class TestDataSourceDefaults {
   }
 
   @Test def testDefaultHoodieRecordPayloadCombineAndGetUpdateValue() = {
-    val baseOrderingVal: Object = baseRecord.get("favoriteIntNumber")
     val fieldSchema: Schema = baseRecord.getSchema().getField("favoriteIntNumber").schema()
-    val props = new TypedProperties()
-    props.put(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP, "favoriteIntNumber");
+    val props = HoodiePayloadConfig.newBuilder()
+      .withPayloadOrderingField("favoriteIntNumber").build().getProps;
 
     val laterRecord = SchemaTestUtil
       .generateAvroRecordFromJson(schema, 2, "001", "f1")
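
For reference, a short usage sketch of the new DateTimeUtils.parseDateTime helper, using the two input forms exercised in TestDateTimeUtils above; the wrapper class is hypothetical.

    import java.time.Instant;

    import org.apache.hudi.common.util.DateTimeUtils;

    public class ParseEventTimeSketch {
      public static void main(String[] args) {
        // Epoch milliseconds and ISO-8601 strings are both accepted.
        Instant fromEpochMillis = DateTimeUtils.parseDateTime("1612542030000");
        Instant fromIso8601 = DateTimeUtils.parseDateTime("2020-01-01T01:01:00Z");
        System.out.println(fromEpochMillis + " / " + fromIso8601);
        // Null input fails ValidationUtils.checkArgument (IllegalArgumentException) and
        // unparsable strings throw DateTimeParseException; WriteStatus catches both and
        // only logs at debug level, so a bad event-time value never fails the write itself.
      }
    }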
