This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 899ae70 [HUDI-1587] Add latency and freshness support (#2541)
899ae70 is described below
commit 899ae70fdb70c1511c099a64230fd91b2fe8d4ee
Author: Raymond Xu <[email protected]>
AuthorDate: Wed Mar 3 20:13:12 2021 -0800
[HUDI-1587] Add latency and freshness support (#2541)
Save min and max of event time in each commit and compute the latency and
freshness metrics.
---
.../java/org/apache/hudi/client/WriteStatus.java | 20 ++++++++
.../apache/hudi/config/HoodiePayloadConfig.java | 10 +++-
.../org/apache/hudi/metrics/HoodieMetrics.java | 22 ++++++++-
.../org/apache/hudi/metrics/TestHoodieMetrics.java | 3 ++
.../org/apache/hudi/client/TestWriteStatus.java | 5 +-
.../common/model/DefaultHoodieRecordPayload.java | 18 +++++++
.../hudi/common/model/HoodieCommitMetadata.java | 18 ++++++-
.../hudi/common/model/HoodiePayloadProps.java | 15 +++++-
.../apache/hudi/common/model/HoodieWriteStat.java | 36 ++++++++++++++
.../org/apache/hudi/common/util/DateTimeUtils.java | 40 ++++++++++++++++
.../model/TestDefaultHoodieRecordPayload.java | 38 +++++++++++++++
.../apache/hudi/common/util/TestDateTimeUtils.java | 55 ++++++++++++++++++++++
.../org/apache/hudi/HoodieMergeOnReadRDD.scala | 16 +++----
.../org/apache/hudi/TestDataSourceDefaults.scala | 9 ++--
14 files changed, 281 insertions(+), 24 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java
index a93f268..6d465d4 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java
@@ -21,20 +21,28 @@ package org.apache.hudi.client;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.util.DateTimeUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
import java.io.Serializable;
+import java.time.DateTimeException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import static
org.apache.hudi.common.model.DefaultHoodieRecordPayload.METADATA_EVENT_TIME_KEY;
+
/**
* Status of a write operation.
*/
public class WriteStatus implements Serializable {
+ private static final Logger LOG = LogManager.getLogger(WriteStatus.class);
private static final long serialVersionUID = 1L;
private static final long RANDOM_SEED = 9038412832L;
@@ -77,6 +85,18 @@ public class WriteStatus implements Serializable {
writtenRecords.add(record);
}
totalRecords++;
+
+ // get the min and max event time for calculating latency and freshness
+ if (optionalRecordMetadata.isPresent()) {
+ String eventTimeVal =
optionalRecordMetadata.get().getOrDefault(METADATA_EVENT_TIME_KEY, null);
+ try {
+ long eventTime =
DateTimeUtils.parseDateTime(eventTimeVal).toEpochMilli();
+ stat.setMinEventTime(eventTime);
+ stat.setMaxEventTime(eventTime);
+ } catch (DateTimeException | IllegalArgumentException e) {
+ LOG.debug(String.format("Fail to parse event time value: %s",
eventTimeVal), e);
+ }
+ }
}
/**
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java
index 8ef5575..489d23c 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java
@@ -19,14 +19,15 @@
package org.apache.hudi.config;
import org.apache.hudi.common.config.DefaultHoodieConfig;
-import org.apache.hudi.config.HoodieMemoryConfig.Builder;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;
+import static
org.apache.hudi.common.model.HoodiePayloadProps.DEFAULT_PAYLOAD_EVENT_TIME_FIELD_VAL;
import static
org.apache.hudi.common.model.HoodiePayloadProps.DEFAULT_PAYLOAD_ORDERING_FIELD_VAL;
+import static
org.apache.hudi.common.model.HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP;
import static
org.apache.hudi.common.model.HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP;
/**
@@ -63,10 +64,17 @@ public class HoodiePayloadConfig extends
DefaultHoodieConfig {
return this;
}
+ public Builder withPayloadEventTimeField(String payloadEventTimeField) {
+ props.setProperty(PAYLOAD_EVENT_TIME_FIELD_PROP,
String.valueOf(payloadEventTimeField));
+ return this;
+ }
+
public HoodiePayloadConfig build() {
HoodiePayloadConfig config = new HoodiePayloadConfig(props);
setDefaultOnCondition(props,
!props.containsKey(PAYLOAD_ORDERING_FIELD_PROP), PAYLOAD_ORDERING_FIELD_PROP,
String.valueOf(DEFAULT_PAYLOAD_ORDERING_FIELD_VAL));
+ setDefaultOnCondition(props,
!props.containsKey(PAYLOAD_EVENT_TIME_FIELD_PROP),
PAYLOAD_EVENT_TIME_FIELD_PROP,
+ String.valueOf(DEFAULT_PAYLOAD_EVENT_TIME_FIELD_VAL));
return config;
}
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
index c8c112f..5db28e2 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
@@ -20,6 +20,8 @@ package org.apache.hudi.metrics;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import com.codahale.metrics.Timer;
@@ -130,6 +132,7 @@ public class HoodieMetrics {
public void updateCommitMetrics(long commitEpochTimeInMs, long durationInMs,
HoodieCommitMetadata metadata,
String actionType) {
+ updateCommitTimingMetrics(commitEpochTimeInMs, durationInMs, metadata,
actionType);
if (config.isMetricsOn()) {
long totalPartitionsWritten = metadata.fetchTotalPartitionsWritten();
long totalFilesInsert = metadata.fetchTotalFilesInsert();
@@ -144,7 +147,6 @@ public class HoodieMetrics {
long totalCompactedRecordsUpdated =
metadata.getTotalCompactedRecordsUpdated();
long totalLogFilesCompacted = metadata.getTotalLogFilesCompacted();
long totalLogFilesSize = metadata.getTotalLogFilesSize();
- Metrics.registerGauge(getMetricsName(actionType, "duration"),
durationInMs);
Metrics.registerGauge(getMetricsName(actionType,
"totalPartitionsWritten"), totalPartitionsWritten);
Metrics.registerGauge(getMetricsName(actionType, "totalFilesInsert"),
totalFilesInsert);
Metrics.registerGauge(getMetricsName(actionType, "totalFilesUpdate"),
totalFilesUpdate);
@@ -152,7 +154,6 @@ public class HoodieMetrics {
Metrics.registerGauge(getMetricsName(actionType,
"totalUpdateRecordsWritten"), totalUpdateRecordsWritten);
Metrics.registerGauge(getMetricsName(actionType,
"totalInsertRecordsWritten"), totalInsertRecordsWritten);
Metrics.registerGauge(getMetricsName(actionType, "totalBytesWritten"),
totalBytesWritten);
- Metrics.registerGauge(getMetricsName(actionType, "commitTime"),
commitEpochTimeInMs);
Metrics.registerGauge(getMetricsName(actionType, "totalScanTime"),
totalTimeTakenByScanner);
Metrics.registerGauge(getMetricsName(actionType, "totalCreateTime"),
totalTimeTakenForInsert);
Metrics.registerGauge(getMetricsName(actionType, "totalUpsertTime"),
totalTimeTakenForUpsert);
@@ -162,6 +163,23 @@ public class HoodieMetrics {
}
}
+ private void updateCommitTimingMetrics(long commitEpochTimeInMs, long
durationInMs, HoodieCommitMetadata metadata,
+ String actionType) {
+ if (config.isMetricsOn()) {
+ Pair<Option<Long>, Option<Long>> eventTimePairMinMax =
metadata.getMinAndMaxEventTime();
+ if (eventTimePairMinMax.getLeft().isPresent()) {
+ long commitLatencyInMs = commitEpochTimeInMs + durationInMs -
eventTimePairMinMax.getLeft().get();
+ Metrics.registerGauge(getMetricsName(actionType, "commitLatencyInMs"),
commitLatencyInMs);
+ }
+ if (eventTimePairMinMax.getRight().isPresent()) {
+ long commitFreshnessInMs = commitEpochTimeInMs + durationInMs -
eventTimePairMinMax.getRight().get();
+ Metrics.registerGauge(getMetricsName(actionType,
"commitFreshnessInMs"), commitFreshnessInMs);
+ }
+ Metrics.registerGauge(getMetricsName(actionType, "commitTime"),
commitEpochTimeInMs);
+ Metrics.registerGauge(getMetricsName(actionType, "duration"),
durationInMs);
+ }
+ }
+
public void updateRollbackMetrics(long durationInMs, long numFilesDeleted) {
if (config.isMetricsOn()) {
LOG.info(
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java
index 41842b1..e669a67 100755
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java
@@ -19,6 +19,8 @@
package org.apache.hudi.metrics;
import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import com.codahale.metrics.Timer;
@@ -123,6 +125,7 @@ public class TestHoodieMetrics {
when(metadata.getTotalCompactedRecordsUpdated()).thenReturn(randomValue
+ 11);
when(metadata.getTotalLogFilesCompacted()).thenReturn(randomValue + 12);
when(metadata.getTotalLogFilesSize()).thenReturn(randomValue + 13);
+
when(metadata.getMinAndMaxEventTime()).thenReturn(Pair.of(Option.empty(),
Option.empty()));
metrics.updateCommitMetrics(randomValue + 14, commitTimer.stop(),
metadata, action);
String metricname = metrics.getMetricsName(action, "duration");
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestWriteStatus.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestWriteStatus.java
index 91878e1..78e711e 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestWriteStatus.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestWriteStatus.java
@@ -19,6 +19,7 @@
package org.apache.hudi.client;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
import org.junit.jupiter.api.Test;
@@ -44,8 +45,8 @@ public class TestWriteStatus {
WriteStatus status = new WriteStatus(false, 1.0);
Throwable t = new Exception("some error in writing");
for (int i = 0; i < 1000; i++) {
- status.markSuccess(mock(HoodieRecord.class), null);
- status.markFailure(mock(HoodieRecord.class), t, null);
+ status.markSuccess(mock(HoodieRecord.class), Option.empty());
+ status.markFailure(mock(HoodieRecord.class), t, Option.empty());
}
assertEquals(1000, status.getFailedRecords().size());
assertTrue(status.hasErrors());
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
index 77f65cb..db41a3b 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
@@ -25,6 +25,8 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Properties;
import static org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro;
@@ -37,6 +39,9 @@ import static
org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldVal;
*/
public class DefaultHoodieRecordPayload extends OverwriteWithLatestAvroPayload
{
+ public static final String METADATA_EVENT_TIME_KEY =
"metadata.event_time.key";
+ private Option<Object> eventTime = Option.empty();
+
public DefaultHoodieRecordPayload(GenericRecord record, Comparable
orderingVal) {
super(record, orderingVal);
}
@@ -71,6 +76,10 @@ public class DefaultHoodieRecordPayload extends
OverwriteWithLatestAvroPayload {
/*
* We reached a point where the value on disk is older than the incoming
record.
+ */
+ eventTime = Option.ofNullable(getNestedFieldVal(incomingRecord,
properties.getProperty(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP),
true));
+
+ /*
* Now check if the incoming record is a delete record.
*/
if (isDeleteRecord(incomingRecord)) {
@@ -79,4 +88,13 @@ public class DefaultHoodieRecordPayload extends
OverwriteWithLatestAvroPayload {
return Option.of(incomingRecord);
}
}
+
+ @Override
+ public Option<Map<String, String>> getMetadata() {
+ Map<String, String> metadata = new HashMap<>();
+ if (eventTime.isPresent()) {
+ metadata.put(METADATA_EVENT_TIME_KEY, String.valueOf(eventTime.get()));
+ }
+ return metadata.isEmpty() ? Option.empty() : Option.of(metadata);
+ }
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
index 3e760f6..bc9a4ba 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
@@ -18,14 +18,16 @@
package org.apache.hudi.common.model;
-import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -323,6 +325,20 @@ public class HoodieCommitMetadata implements Serializable {
return totalUpsertTime;
}
+ public Pair<Option<Long>, Option<Long>> getMinAndMaxEventTime() {
+ long minEventTime = Long.MAX_VALUE;
+ long maxEventTime = Long.MIN_VALUE;
+ for (Map.Entry<String, List<HoodieWriteStat>> entry :
partitionToWriteStats.entrySet()) {
+ for (HoodieWriteStat writeStat : entry.getValue()) {
+ minEventTime = writeStat.getMinEventTime() != null ?
Math.min(writeStat.getMinEventTime(), minEventTime) : minEventTime;
+ maxEventTime = writeStat.getMaxEventTime() != null ?
Math.max(writeStat.getMaxEventTime(), maxEventTime) : maxEventTime;
+ }
+ }
+ return Pair.of(
+ minEventTime == Long.MAX_VALUE ? Option.empty() :
Option.of(minEventTime),
+ maxEventTime == Long.MIN_VALUE ? Option.empty() :
Option.of(maxEventTime));
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePayloadProps.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePayloadProps.java
index 5d71ec3..48dde2a 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePayloadProps.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePayloadProps.java
@@ -24,9 +24,20 @@ package org.apache.hudi.common.model;
*/
public class HoodiePayloadProps {
- // payload ordering field. This could be used to merge incoming record with
that in storage. Implementations of
- // {@link HoodieRecordPayload} can leverage if required.
+ /**
+ * Property for payload ordering field; to be used to merge incoming record
with that in storage.
+ * Implementations of {@link HoodieRecordPayload} can leverage if required.
+ *
+ * @see DefaultHoodieRecordPayload
+ */
public static final String PAYLOAD_ORDERING_FIELD_PROP =
"hoodie.payload.ordering.field";
public static String DEFAULT_PAYLOAD_ORDERING_FIELD_VAL = "ts";
+ /**
+ * Property for payload event time field; to be used to extract source event
time info.
+ *
+ * @see DefaultHoodieRecordPayload
+ */
+ public static final String PAYLOAD_EVENT_TIME_FIELD_PROP =
"hoodie.payload.event.time.field";
+ public static String DEFAULT_PAYLOAD_EVENT_TIME_FIELD_VAL = "ts";
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java
index 9a640be..ba43526 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java
@@ -143,6 +143,18 @@ public class HoodieWriteStat implements Serializable {
*/
private long fileSizeInBytes;
+ /**
+ * The earliest of incoming records' event times (Epoch ms) for calculating
latency.
+ */
+ @Nullable
+ private Long minEventTime;
+
+ /**
+ * The latest of incoming records' event times (Epoch ms) for calculating
freshness.
+ */
+ @Nullable
+ private Long maxEventTime;
+
@Nullable
@JsonIgnore
private RuntimeStats runtimeStats;
@@ -303,6 +315,30 @@ public class HoodieWriteStat implements Serializable {
this.fileSizeInBytes = fileSizeInBytes;
}
+ public Long getMinEventTime() {
+ return minEventTime;
+ }
+
+ public void setMinEventTime(Long minEventTime) {
+ if (this.minEventTime == null) {
+ this.minEventTime = minEventTime;
+ } else {
+ this.minEventTime = Math.min(minEventTime, this.minEventTime);
+ }
+ }
+
+ public Long getMaxEventTime() {
+ return maxEventTime;
+ }
+
+ public void setMaxEventTime(Long maxEventTime) {
+ if (this.maxEventTime == null) {
+ this.maxEventTime = maxEventTime;
+ } else {
+ this.maxEventTime = Math.max(maxEventTime, this.maxEventTime);
+ }
+ }
+
@Nullable
public RuntimeStats getRuntimeStats() {
return runtimeStats;
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/DateTimeUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/DateTimeUtils.java
new file mode 100644
index 0000000..1c18a77
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/DateTimeUtils.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import java.time.Instant;
+import java.time.format.DateTimeParseException;
+import java.util.Objects;
+
+public class DateTimeUtils {
+
+ /**
+ * Parse input String to a {@link java.time.Instant}.
+ * @param s Input String should be Epoch time in milliseconds or ISO-8601
format.
+ */
+ public static Instant parseDateTime(String s) throws DateTimeParseException {
+ ValidationUtils.checkArgument(Objects.nonNull(s), "Input String cannot be
null.");
+ try {
+ return Instant.ofEpochMilli(Long.parseLong(s));
+ } catch (NumberFormatException e) {
+ return Instant.parse(s);
+ }
+ }
+}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java
index 7914154..4c43903 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java
@@ -18,12 +18,16 @@
package org.apache.hudi.common.model;
+import org.apache.hudi.common.util.Option;
+
import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.util.Arrays;
@@ -31,6 +35,7 @@ import java.util.Properties;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Unit tests {@link DefaultHoodieRecordPayload}.
@@ -50,6 +55,7 @@ public class TestDefaultHoodieRecordPayload {
));
props = new Properties();
props.setProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP, "ts");
+ props.setProperty(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP, "ts");
}
@Test
@@ -104,4 +110,36 @@ public class TestDefaultHoodieRecordPayload {
assertFalse(payload2.combineAndGetUpdateValue(record1, schema,
props).isPresent());
}
+ @Test
+ public void testGetEmptyMetadata() {
+ GenericRecord record = new GenericData.Record(schema);
+ record.put("id", "1");
+ record.put("partition", "partition0");
+ record.put("ts", 0L);
+ record.put("_hoodie_is_deleted", false);
+ DefaultHoodieRecordPayload payload = new
DefaultHoodieRecordPayload(Option.of(record));
+ assertFalse(payload.getMetadata().isPresent());
+ }
+
+ @ParameterizedTest
+ @ValueSource(longs = {1L, 1612542030000L})
+ public void testGetEventTimeInMetadata(long eventTime) throws IOException {
+ GenericRecord record1 = new GenericData.Record(schema);
+ record1.put("id", "1");
+ record1.put("partition", "partition0");
+ record1.put("ts", 0L);
+ record1.put("_hoodie_is_deleted", false);
+
+ GenericRecord record2 = new GenericData.Record(schema);
+ record2.put("id", "1");
+ record2.put("partition", "partition0");
+ record2.put("ts", eventTime);
+ record2.put("_hoodie_is_deleted", false);
+
+ DefaultHoodieRecordPayload payload2 = new
DefaultHoodieRecordPayload(record2, eventTime);
+ payload2.combineAndGetUpdateValue(record1, schema, props);
+ assertTrue(payload2.getMetadata().isPresent());
+ assertEquals(eventTime,
+
Long.parseLong(payload2.getMetadata().get().get(DefaultHoodieRecordPayload.METADATA_EVENT_TIME_KEY)));
+ }
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestDateTimeUtils.java
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestDateTimeUtils.java
new file mode 100644
index 0000000..996c8ba
--- /dev/null
+++
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestDateTimeUtils.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.time.format.DateTimeParseException;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class TestDateTimeUtils {
+
+ @ParameterizedTest
+ @ValueSource(strings = {"0", "1612542030000", "2020-01-01T01:01:00Z",
"1970-01-01T00:00:00.123456Z"})
+ public void testParseStringIntoInstant(String s) {
+ assertDoesNotThrow(() -> {
+ DateTimeUtils.parseDateTime(s);
+ });
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {"#", "0L", ""})
+ public void testParseDateTimeThrowsException(String s) {
+ assertThrows(DateTimeParseException.class, () -> {
+ DateTimeUtils.parseDateTime(s);
+ });
+ }
+
+ @Test
+ public void testParseDateTimeWithNull() {
+ assertThrows(IllegalArgumentException.class, () -> {
+ DateTimeUtils.parseDateTime(null);
+ });
+ }
+}
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
index d204d2d..8c8655b 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
@@ -18,24 +18,22 @@
package org.apache.hudi
+import org.apache.avro.Schema
+import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
+import org.apache.hadoop.conf.Configuration
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner
+import org.apache.hudi.config.HoodiePayloadConfig
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hadoop.config.HoodieRealtimeConfig
import
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS
-import org.apache.avro.Schema
-import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
-import org.apache.hadoop.conf.Configuration
-import org.apache.hudi.common.model.HoodiePayloadProps
-import org.apache.spark.{Partition, SerializableWritable, SparkContext,
TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.avro.{AvroDeserializer, AvroSerializer}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow,
UnsafeProjection}
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.vectorized.ColumnarBatch
-
-import java.util.Properties
+import org.apache.spark.{Partition, SerializableWritable, SparkContext,
TaskContext}
import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -53,9 +51,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
private val confBroadcast = sc.broadcast(new SerializableWritable(config))
private val preCombineField = tableState.preCombineField
private val payloadProps = if (preCombineField.isDefined) {
- val properties = new Properties()
- properties.put(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP,
preCombineField.get)
- Some(properties)
+
Some(HoodiePayloadConfig.newBuilder.withPayloadOrderingField(preCombineField.get).build.getProps)
} else {
None
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
index 4c69950..8c3f18c 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
@@ -21,7 +21,7 @@ import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.common.config.TypedProperties
-import org.apache.hudi.common.model.{BaseAvroPayload,
DefaultHoodieRecordPayload, EmptyHoodieRecordPayload, HoodieKey,
HoodiePayloadProps, OverwriteWithLatestAvroPayload}
+import org.apache.hudi.common.model._
import org.apache.hudi.common.testutils.SchemaTestUtil
import org.apache.hudi.common.util.Option
import org.apache.hudi.config.HoodiePayloadConfig
@@ -33,8 +33,6 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.{BeforeEach, Test}
import org.scalatest.Assertions.fail
-import scala.collection.JavaConverters.mapAsJavaMapConverter
-
/**
* Tests on the default key generator, payload classes.
*/
@@ -591,10 +589,9 @@ class TestDataSourceDefaults {
}
@Test def testDefaultHoodieRecordPayloadCombineAndGetUpdateValue() = {
- val baseOrderingVal: Object = baseRecord.get("favoriteIntNumber")
val fieldSchema: Schema =
baseRecord.getSchema().getField("favoriteIntNumber").schema()
- val props = new TypedProperties()
- props.put(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP,
"favoriteIntNumber");
+ val props = HoodiePayloadConfig.newBuilder()
+ .withPayloadOrderingField("favoriteIntNumber").build().getProps;
val laterRecord = SchemaTestUtil
.generateAvroRecordFromJson(schema, 2, "001", "f1")