This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch release-0.13.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 46aa15bd564e823ca6984992a110534de9611ece
Author: 苏承祥 <[email protected]>
AuthorDate: Fri Feb 3 16:02:41 2023 +0800

    [HUDI-5551] support seconds unit on event_time metrics (#7664)

    * event_time support seconds

    Co-authored-by: 苏承祥 <[email protected]>
---
 .../java/org/apache/hudi/client/WriteStatus.java   | 13 ++-
 .../org/apache/hudi/client/TestWriteStatus.java    | 92 ++++++++++++++++++++++
 2 files changed, 104 insertions(+), 1 deletion(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java
index b306d6c5400..54e88fcca22 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java
@@ -101,7 +101,18 @@ public class WriteStatus implements Serializable {
       String eventTimeVal = optionalRecordMetadata.get().getOrDefault(METADATA_EVENT_TIME_KEY, null);
       try {
         if (!StringUtils.isNullOrEmpty(eventTimeVal)) {
-          long eventTime = DateTimeUtils.parseDateTime(eventTimeVal).toEpochMilli();
+          int length = eventTimeVal.length();
+          long millisEventTime;
+          // eventTimeVal in seconds unit
+          if (length == 10) {
+            millisEventTime = Long.parseLong(eventTimeVal) * 1000;
+          } else if (length == 13) {
+            // eventTimeVal in millis unit
+            millisEventTime = Long.parseLong(eventTimeVal);
+          } else {
+            throw new IllegalArgumentException("not support event_time format:" + eventTimeVal);
+          }
+          long eventTime = DateTimeUtils.parseDateTime(Long.toString(millisEventTime)).toEpochMilli();
           stat.setMinEventTime(eventTime);
           stat.setMaxEventTime(eventTime);
         }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestWriteStatus.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestWriteStatus.java
index 78e711ed701..99fb76650f9 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestWriteStatus.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestWriteStatus.java
@@ -18,12 +18,19 @@
 
 package org.apache.hudi.client;
 
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.util.Option;
 
 import org.junit.jupiter.api.Test;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 
@@ -53,4 +60,89 @@ public class TestWriteStatus {
     assertTrue(status.getWrittenRecords().isEmpty());
     assertEquals(2000, status.getTotalRecords());
   }
+
+  @Test
+  public void testSuccessWithEventTime() {
+    // test with empty eventTime
+    WriteStatus status = new WriteStatus(false, 1.0);
+    status.setStat(new HoodieWriteStat());
+    for (int i = 0; i < 1000; i++) {
+      Map<String, String> metadata = new HashMap<>();
+      metadata.put(DefaultHoodieRecordPayload.METADATA_EVENT_TIME_KEY, "");
+      status.markSuccess(mock(HoodieRecord.class), Option.of(metadata));
+    }
+    assertEquals(1000, status.getTotalRecords());
+    assertFalse(status.hasErrors());
+    assertNull(status.getStat().getMaxEventTime());
+    assertNull(status.getStat().getMinEventTime());
+
+    // test with null eventTime
+    status = new WriteStatus(false, 1.0);
+    status.setStat(new HoodieWriteStat());
+    for (int i = 0; i < 1000; i++) {
+      Map<String, String> metadata = new HashMap<>();
+      metadata.put(DefaultHoodieRecordPayload.METADATA_EVENT_TIME_KEY, null);
+      status.markSuccess(mock(HoodieRecord.class), Option.of(metadata));
+    }
+    assertEquals(1000, status.getTotalRecords());
+    assertFalse(status.hasErrors());
+    assertNull(status.getStat().getMaxEventTime());
+    assertNull(status.getStat().getMinEventTime());
+
+    // test with seconds eventTime
+    status = new WriteStatus(false, 1.0);
+    status.setStat(new HoodieWriteStat());
+    long minSeconds = 0L;
+    long maxSeconds = 0L;
+    for (int i = 0; i < 1000; i++) {
+      Map<String, String> metadata = new HashMap<>();
+      long eventTime = System.currentTimeMillis() / 1000;
+      if (i == 0) {
+        minSeconds = eventTime;
+      } else if (i == 999) {
+        maxSeconds = eventTime;
+      }
+      metadata.put(DefaultHoodieRecordPayload.METADATA_EVENT_TIME_KEY, String.valueOf(eventTime));
+      status.markSuccess(mock(HoodieRecord.class), Option.of(metadata));
+    }
+    assertEquals(1000, status.getTotalRecords());
+    assertFalse(status.hasErrors());
+    assertEquals(maxSeconds * 1000L, status.getStat().getMaxEventTime());
+    assertEquals(minSeconds * 1000L, status.getStat().getMinEventTime());
+
+    // test with millis eventTime
+    status = new WriteStatus(false, 1.0);
+    status.setStat(new HoodieWriteStat());
+    minSeconds = 0L;
+    maxSeconds = 0L;
+    for (int i = 0; i < 1000; i++) {
+      Map<String, String> metadata = new HashMap<>();
+      long eventTime = System.currentTimeMillis();
+      if (i == 0) {
+        minSeconds = eventTime;
+      } else if (i == 999) {
+        maxSeconds = eventTime;
+      }
+      metadata.put(DefaultHoodieRecordPayload.METADATA_EVENT_TIME_KEY, String.valueOf(eventTime));
+      status.markSuccess(mock(HoodieRecord.class), Option.of(metadata));
+    }
+    assertEquals(1000, status.getTotalRecords());
+    assertFalse(status.hasErrors());
+    assertEquals(maxSeconds, status.getStat().getMaxEventTime());
+    assertEquals(minSeconds, status.getStat().getMinEventTime());
+
+    // test with error format eventTime
+    status = new WriteStatus(false, 1.0);
+    status.setStat(new HoodieWriteStat());
+    for (int i = 0; i < 1000; i++) {
+      Map<String, String> metadata = new HashMap<>();
+      metadata.put(DefaultHoodieRecordPayload.METADATA_EVENT_TIME_KEY, String.valueOf(i));
+      status.markSuccess(mock(HoodieRecord.class), Option.of(metadata));
+    }
+    assertEquals(1000, status.getTotalRecords());
+    assertFalse(status.hasErrors());
+    assertNull(status.getStat().getMaxEventTime());
+    assertNull(status.getStat().getMinEventTime());
+
+  }
 }
