This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 03510ee736e9 [HUDI-9560] Add support for event_time metadata (#13517)
03510ee736e9 is described below
commit 03510ee736e92b7a2335083b6c2429a99bfd48f0
Author: Lin Liu <[email protected]>
AuthorDate: Wed Jul 30 03:56:15 2025 -0700
[HUDI-9560] Add support for event_time metadata (#13517)
* Track event_time metadata
* Use writeHandle to track event time
* Add support for other engine record types
* Add functional test
---------
Co-authored-by: danny0405 <[email protected]>
---
.../org/apache/hudi/io/HoodieAppendHandle.java | 2 +-
.../org/apache/hudi/io/HoodieCreateHandle.java | 2 +-
.../java/org/apache/hudi/io/HoodieWriteHandle.java | 35 ++
.../org/apache/hudi/io/HoodieWriteMergeHandle.java | 8 +-
.../org/apache/hudi/io/TestHoodieWriteHandle.java | 356 +++++++++++++++++++++
.../hudi/client/model/HoodieFlinkRecord.java | 28 ++
.../hudi/common/model/HoodieSparkRecord.java | 25 ++
.../java/org/apache/hudi/avro/HoodieAvroUtils.java | 2 +-
.../hudi/common/model/HoodieAvroIndexedRecord.java | 13 +-
.../apache/hudi/common/model/HoodieAvroRecord.java | 8 +
.../hudi/common/model/HoodieEmptyRecord.java | 7 +
.../org/apache/hudi/common/model/HoodieRecord.java | 12 +
.../org/apache/hudi/common/util/ConfigUtils.java | 28 +-
.../apache/hudi/common/util/TestConfigUtils.java | 79 ++++-
.../org/apache/hudi/hadoop/HoodieHiveRecord.java | 27 ++
.../apache/hudi/hadoop/TestHoodieHiveRecord.java | 178 +++++++++++
.../hudi/functional/TestHoodieActiveTimeline.scala | 60 +++-
17 files changed, 852 insertions(+), 18 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index 8dfa02d3bc8f..e6707a8688ed 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -285,8 +285,8 @@ public class HoodieAppendHandle<T, I, K, O> extends
HoodieWriteHandle<T, I, K, O
}
private void bufferRecord(HoodieRecord<T> hoodieRecord) {
- Option<Map<String, String>> recordMetadata = hoodieRecord.getMetadata();
Schema schema = useWriterSchema ? writeSchemaWithMetaFields : writeSchema;
+ Option<Map<String, String>> recordMetadata =
getRecordMetadata(hoodieRecord, schema, recordProperties);
try {
// Pass the isUpdateRecord to the props for HoodieRecordPayload to judge
// Whether it is an update or insert record.
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
index 16714b2446fe..d51c823e6e8b 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
@@ -132,7 +132,7 @@ public class HoodieCreateHandle<T, I, K, O> extends
HoodieWriteHandle<T, I, K, O
*/
@Override
protected void doWrite(HoodieRecord record, Schema schema, TypedProperties
props) {
- Option<Map<String, String>> recordMetadata = record.getMetadata();
+ Option<Map<String, String>> recordMetadata = getRecordMetadata(record,
schema, props);
try {
if (!HoodieOperation.isDelete(record.getOperation()) &&
!record.isDelete(schema, config.getProps())) {
if (record.shouldIgnore(schema, config.getProps())) {
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
index 61f414b45051..9d631a505ce9 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
@@ -19,6 +19,7 @@
package org.apache.hudi.io;
import org.apache.hudi.avro.AvroSchemaCache;
+import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.TypedProperties;
@@ -36,6 +37,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.LogFileCreationCallback;
+import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
@@ -56,9 +58,14 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Properties;
import java.util.stream.Collectors;
+import static
org.apache.hudi.common.config.RecordMergeMode.EVENT_TIME_ORDERING;
+import static
org.apache.hudi.common.model.DefaultHoodieRecordPayload.METADATA_EVENT_TIME_KEY;
import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
/**
@@ -92,6 +99,9 @@ public abstract class HoodieWriteHandle<T, I, K, O> extends
HoodieIOHandle<T, I,
List<HoodieIndexDefinition> secondaryIndexDefns = Collections.emptyList();
private boolean closed = false;
+ protected boolean isTrackingEventTimeWatermark;
+ protected boolean keepConsistentLogicalTimestamp;
+ protected String eventTimeFieldName;
public HoodieWriteHandle(HoodieWriteConfig config, String instantTime,
String partitionPath,
String fileId, HoodieTable<T, I, K, O> hoodieTable,
TaskContextSupplier taskContextSupplier, boolean preserveMetadata) {
@@ -123,6 +133,13 @@ public abstract class HoodieWriteHandle<T, I, K, O>
extends HoodieIOHandle<T, I,
} else {
this.isSecondaryIndexStatsStreamingWritesEnabled = false;
}
+
+ // For tracking event time watermark.
+ this.eventTimeFieldName =
ConfigUtils.getEventTimeFieldName(config.getProps());
+ this.isTrackingEventTimeWatermark = this.eventTimeFieldName != null
+ && hoodieTable.getMetaClient().getTableConfig().getRecordMergeMode()
== EVENT_TIME_ORDERING
+ && ConfigUtils.isTrackingEventTimeWatermark(config.getProps());
+ this.keepConsistentLogicalTimestamp = isTrackingEventTimeWatermark &&
ConfigUtils.shouldKeepConsistentLogicalTimestamp(config.getProps());
}
private void initSecondaryIndexStats(boolean preserveMetadata) {
@@ -336,4 +353,22 @@ public abstract class HoodieWriteHandle<T, I, K, O>
extends HoodieIOHandle<T, I,
return Option.empty();
}
}
+
+ protected Option<Map<String, String>> getRecordMetadata(HoodieRecord record,
Schema schema, Properties props) {
+ Option<Map<String, String>> recordMetadata = record.getMetadata();
+ if (isTrackingEventTimeWatermark) {
+ Object eventTime = record.getColumnValueAsJava(schema,
eventTimeFieldName, props);
+ if (eventTime != null) {
+ // Append event_time.
+ Option<Schema.Field> field = AvroSchemaUtils.findNestedField(schema,
eventTimeFieldName);
+ // Field should definitely exist.
+ eventTime = record.convertColumnValueForLogicalType(
+ field.get().schema(), eventTime, keepConsistentLogicalTimestamp);
+ Map<String, String> metadata = recordMetadata.orElse(new HashMap<>());
+ metadata.put(METADATA_EVENT_TIME_KEY, String.valueOf(eventTime));
+ return Option.of(metadata);
+ }
+ }
+ return recordMetadata;
+ }
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteMergeHandle.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteMergeHandle.java
index 8d389d7ba88e..1c6438fb7abd 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteMergeHandle.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteMergeHandle.java
@@ -280,7 +280,7 @@ public class HoodieWriteMergeHandle<T, I, K, O> extends
HoodieAbstractMergeHandl
* @param oldRecord The value of old record
* @param combineRecord Record created by merging the old record with the
new incoming record
* @param schema Record schema
- * @param prop Properties
+ * @param props Properties
* @param isDelete Whether the new record is a delete record
*
* @return true if the record was written successfully
@@ -290,9 +290,9 @@ public class HoodieWriteMergeHandle<T, I, K, O> extends
HoodieAbstractMergeHandl
@Nullable HoodieRecord<T> oldRecord,
Option<HoodieRecord> combineRecord,
Schema schema,
- Properties prop,
+ Properties props,
boolean isDelete) {
- Option recordMetadata = newRecord.getMetadata();
+ Option<Map<String, String>> recordMetadata = getRecordMetadata(newRecord,
schema, props);
if (!partitionPath.equals(newRecord.getPartitionPath())) {
HoodieUpsertException failureEx = new HoodieUpsertException("mismatched
partition path, record partition: "
+ newRecord.getPartitionPath() + " but trying to insert into
partition: " + partitionPath);
@@ -311,7 +311,7 @@ public class HoodieWriteMergeHandle<T, I, K, O> extends
HoodieAbstractMergeHandl
SecondaryIndexStreamingTracker.trackSecondaryIndexStats(hoodieKey,
combineRecord, oldRecord, false, writeStatus,
writeSchemaWithMetaFields, this::getNewSchema,
secondaryIndexDefns, keyGeneratorOpt, config);
}
- writeToFile(hoodieKey, combineRecord.get(), schema, prop,
preserveMetadata);
+ writeToFile(hoodieKey, combineRecord.get(), schema, props,
preserveMetadata);
recordsWritten++;
} else {
// CASE (2): A delete operation.
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/TestHoodieWriteHandle.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/TestHoodieWriteHandle.java
new file mode 100644
index 000000000000..f32f89eed241
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/TestHoodieWriteHandle.java
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.common.config.RecordMergeMode;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.LocalTaskContextSupplier;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static
org.apache.hudi.common.model.DefaultHoodieRecordPayload.METADATA_EVENT_TIME_KEY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.LENIENT)
+class TestHoodieWriteHandle {
+ @Mock
+ private HoodieTable mockHoodieTable;
+ @Mock
+ private HoodieTableMetaClient mockMetaClient;
+ @Mock
+ private HoodieTableConfig mockTableConfig;
+ @Mock
+ private HoodieRecordMerger mockRecordMerger;
+ @Mock
+ private HoodieWriteConfig mockWriteConfig;
+
+ @BeforeEach
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+ when(mockHoodieTable.getMetaClient()).thenReturn(mockMetaClient);
+ when(mockMetaClient.getTableConfig()).thenReturn(mockTableConfig);
+ when(mockWriteConfig.getRecordMerger()).thenReturn(mockRecordMerger);
+
+ // Set up a basic schema for the write config
+ String basicSchema =
"{\"type\":\"record\",\"name\":\"test\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}";
+ when(mockWriteConfig.getWriteSchema()).thenReturn(basicSchema);
+ when(mockWriteConfig.getProps()).thenReturn(new TypedProperties());
+ when(mockWriteConfig.allowOperationMetadataField()).thenReturn(false);
+
when(mockWriteConfig.getWriteStatusClassName()).thenReturn("org.apache.hudi.client.WriteStatus");
+ when(mockWriteConfig.getWriteStatusFailureFraction()).thenReturn(0.0);
+ when(mockHoodieTable.shouldTrackSuccessRecords()).thenReturn(true);
+ when(mockHoodieTable.isMetadataTable()).thenReturn(false);
+ when(mockHoodieTable.getConfig()).thenReturn(mockWriteConfig);
+
when(mockTableConfig.getTableVersion()).thenReturn(org.apache.hudi.common.table.HoodieTableVersion.EIGHT);
+
+ when(mockHoodieTable.getBaseFileExtension()).thenReturn(".parquet");
+ }
+
+ @Test
+ void
testShouldTrackEventTimeWaterMarkerAvroRecordTypeWithEventTimeOrderingAndConfigEnabled()
{
+ // Setup: AVRO record type with event time ordering and config enabled
+ boolean result = mockWriteHandle(true,
"ts").isTrackingEventTimeWaterMarker();
+ assertTrue(result, "Should track event time watermark for AVRO records
with event time ordering and config enabled");
+ }
+
+ @Test
+ void
testShouldTrackEventTimeWaterMarkerAvroRecordTypeWithEventTimeOrderingAndConfigDisabled()
{
+ // Setup: AVRO record type with event time ordering but config disabled
+ boolean result = mockWriteHandle(false,
null).isTrackingEventTimeWaterMarker();
+ assertFalse(result, "Should not track event time watermark when config is
disabled");
+ }
+
+ @Test
+ void testShouldTrackEventTimeWaterMarkerNonAvroRecordType() {
+ // Setup: Non-AVRO record type
+ boolean result = mockWriteHandle(true, "ts", false,
HoodieRecord.HoodieRecordType.SPARK, RecordMergeMode.EVENT_TIME_ORDERING)
+ .isTrackingEventTimeWaterMarker();
+ assertTrue(result, "Should track event time watermark for SPARK record
type");
+ }
+
+ @Test
+ void
testShouldTrackEventTimeWaterMarkerAvroRecordTypeWithCommitTimeOrdering() {
+ // Setup: AVRO record type but with commit time ordering
+ boolean result = mockWriteHandle(true, null, false,
HoodieRecord.HoodieRecordType.AVRO, RecordMergeMode.COMMIT_TIME_ORDERING)
+ .isTrackingEventTimeWaterMarker();
+ assertFalse(result, "Should not track event time watermark when using
commit time ordering");
+ }
+
+ @Test
+ void testAppendEventTimeMetadataWithEventTimeField() {
+ // Setup: Create a test record with event time field
+ Schema schema = Schema.createRecord("test", null, null, false);
+ schema.setFields(java.util.Arrays.asList(
+ new Schema.Field("id", Schema.create(Schema.Type.STRING), null, null),
+ new Schema.Field("event_time", Schema.create(Schema.Type.LONG), null,
null)
+ ));
+
+ GenericRecord record = new GenericData.Record(schema);
+ record.put("id", "test_id");
+ record.put("event_time", 1234567890L);
+
+ HoodieRecord hoodieRecord = new HoodieAvroIndexedRecord(null, record);
+
+ DummyHoodieWriteHandle testWriteHandle = mockWriteHandle(true,
"event_time");
+
+ // Test with empty metadata
+ Option<Map<String, String>> result =
+ testWriteHandle.testAppendEventTimeMetadata(hoodieRecord, schema, new
Properties());
+
+ assertTrue(result.isPresent(), "Should return metadata when event time is
present");
+ Map<String, String> metadata = result.get();
+ assertEquals(
+ "1234567890",
+ metadata.get(METADATA_EVENT_TIME_KEY),
+ "Event time should be correctly extracted");
+ }
+
+ @Test
+ void testAppendEventTimeMetadataWithExistingMetadata() {
+ // Setup: Create a test record with event time field
+ Schema schema = Schema.createRecord("test", null, null, false);
+ schema.setFields(java.util.Arrays.asList(
+ new Schema.Field("id", Schema.create(Schema.Type.STRING), null, null),
+ new Schema.Field("event_time", Schema.create(Schema.Type.LONG), null,
null)
+ ));
+
+ GenericRecord record = new GenericData.Record(schema);
+ record.put("id", "test_id");
+ record.put("event_time", 1234567890L);
+
+ Map<String, String> existingMetadata = new HashMap<>();
+ existingMetadata.put("existing_key", "existing_value");
+ HoodieRecord hoodieRecord = new HoodieAvroIndexedRecord(null, record,
HoodieOperation.INSERT, Option.of(existingMetadata));
+
+ DummyHoodieWriteHandle testWriteHandle = mockWriteHandle(true,
"event_time");
+
+ // Test with existing metadata
+ Option<Map<String, String>> result =
+ testWriteHandle.testAppendEventTimeMetadata(hoodieRecord, schema, new
Properties());
+
+ assertTrue(result.isPresent(), "Should return metadata when event time is
present");
+ Map<String, String> metadata = result.get();
+ assertEquals(
+ "1234567890",
+ metadata.get(METADATA_EVENT_TIME_KEY),
+ "Event time should be correctly extracted");
+ assertEquals(
+ "existing_value",
+ metadata.get("existing_key"),
+ "Existing metadata should be preserved");
+ }
+
+ @Test
+ void testAppendEventTimeMetadataWithoutEventTimeField() {
+ // Setup: Create a test record without event time field
+ Schema schema = Schema.createRecord("test", null, null, false);
+ schema.setFields(java.util.Arrays.asList(
+ new Schema.Field("id", Schema.create(Schema.Type.STRING), null, null)
+ ));
+
+ GenericRecord record = new GenericData.Record(schema);
+ record.put("id", "test_id");
+
+ HoodieRecord hoodieRecord = new HoodieAvroIndexedRecord(null, record);
+
+ DummyHoodieWriteHandle testWriteHandle = mockWriteHandle(true,
"event_time");
+
+ Option<Map<String, String>> result =
+ testWriteHandle.testAppendEventTimeMetadata(hoodieRecord, schema, new
Properties());
+
+ assertFalse(result.isPresent(), "Should return empty when event time field
is not present");
+ }
+
+ @Test
+ void testAppendEventTimeMetadataWithNullEventTimeValue() {
+ // Setup: Create a test record with null event time value
+ Schema schema = Schema.createRecord("test", null, null, false);
+ schema.setFields(java.util.Arrays.asList(
+ new Schema.Field("id", Schema.create(Schema.Type.STRING), null, null),
+ new Schema.Field("event_time", Schema.create(Schema.Type.LONG), null,
null)
+ ));
+
+ GenericRecord record = new GenericData.Record(schema);
+ record.put("id", "test_id");
+ record.put("event_time", null);
+
+ HoodieRecord hoodieRecord = new HoodieAvroIndexedRecord(null, record);
+
+ DummyHoodieWriteHandle testWriteHandle = mockWriteHandle(true,
"event_time");
+
+ Option<Map<String, String>> result =
+ testWriteHandle.testAppendEventTimeMetadata(hoodieRecord, schema, new
Properties());
+
+ assertFalse(result.isPresent(), "Should return empty when event time value
is null");
+ }
+
+ @Test
+ void testAppendEventTimeMetadataWithStringEventTime() {
+ // Setup: Create a test record with string event time
+ Schema schema = Schema.createRecord("test", null, null, false);
+ schema.setFields(java.util.Arrays.asList(
+ new Schema.Field("id", Schema.create(Schema.Type.STRING), null, null),
+ new Schema.Field("event_time", Schema.create(Schema.Type.STRING),
null, null)
+ ));
+
+ GenericRecord record = new GenericData.Record(schema);
+ record.put("id", "test_id");
+ record.put("event_time", "2023-01-01T00:00:00Z");
+
+ HoodieRecord hoodieRecord = new HoodieAvroIndexedRecord(null, record);
+
+ DummyHoodieWriteHandle testWriteHandle = mockWriteHandle(true,
"event_time");
+
+ Option<Map<String, String>> result =
+ testWriteHandle.testAppendEventTimeMetadata(hoodieRecord, schema, new
Properties());
+
+ assertTrue(result.isPresent(), "Should return metadata when event time is
present");
+ Map<String, String> metadata = result.get();
+ assertEquals(
+ "2023-01-01T00:00:00Z",
+ metadata.get(METADATA_EVENT_TIME_KEY),
+ "String event time should be correctly extracted");
+ }
+
+ @Test
+ void testAppendEventTimeMetadataWithNestedEventTimeField() {
+ // Setup: Create a test record with nested event time field
+ Schema nestedSchema = Schema.createRecord("nested", null, null, false);
+ nestedSchema.setFields(java.util.Arrays.asList(
+ new Schema.Field("event_time", Schema.create(Schema.Type.LONG), null,
null)
+ ));
+
+ Schema schema = Schema.createRecord("test", null, null, false);
+ schema.setFields(java.util.Arrays.asList(
+ new Schema.Field("id", Schema.create(Schema.Type.STRING), null, null),
+ new Schema.Field("nested", nestedSchema, null, null)
+ ));
+
+ GenericRecord nestedRecord = new GenericData.Record(nestedSchema);
+ nestedRecord.put("event_time", 1234567890L);
+
+ GenericRecord record = new GenericData.Record(schema);
+ record.put("id", "test_id");
+ record.put("nested", nestedRecord);
+
+ DummyHoodieWriteHandle testWriteHandle = mockWriteHandle(true,
"nested.event_time");
+
+ HoodieRecord hoodieRecord = new HoodieAvroIndexedRecord(null, record);
+
+ Option<Map<String, String>> result =
testWriteHandle.testAppendEventTimeMetadata(hoodieRecord, schema, new
Properties());
+
+ assertTrue(result.isPresent(), "Should return metadata when nested event
time is present");
+ Map<String, String> metadata = result.get();
+ assertEquals(
+ "1234567890",
+ metadata.get(METADATA_EVENT_TIME_KEY),
+ "Nested event time should be correctly extracted");
+ }
+
+ private DummyHoodieWriteHandle mockWriteHandle(boolean
isTrackingEventTimeMetadata, String eventTimeField) {
+ return mockWriteHandle(isTrackingEventTimeMetadata, eventTimeField, false,
HoodieRecord.HoodieRecordType.AVRO, RecordMergeMode.EVENT_TIME_ORDERING);
+ }
+
+ private DummyHoodieWriteHandle mockWriteHandle(
+ boolean isTrackingEventTimeMetadata,
+ String eventTimeField,
+ boolean keepConsistentLogicalTimestamp,
+ HoodieRecord.HoodieRecordType recordType,
+ RecordMergeMode mergeMode) {
+ when(mockRecordMerger.getRecordType()).thenReturn(recordType);
+ when(mockTableConfig.getRecordMergeMode()).thenReturn(mergeMode);
+ TypedProperties props = new TypedProperties();
+ props.put("hoodie.write.track.event.time.watermark",
String.valueOf(isTrackingEventTimeMetadata));
+ if (eventTimeField != null) {
+ props.put("hoodie.payload.event.time.field", eventTimeField);
+ }
+
props.put("hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled",
String.valueOf(keepConsistentLogicalTimestamp));
+ when(mockWriteConfig.getProps()).thenReturn(props);
+
+ TaskContextSupplier taskContextSupplier = new LocalTaskContextSupplier();
+ return new DummyHoodieWriteHandle(
+ mockWriteConfig,
+ "test_instant",
+ "test_partition",
+ "test_file_id",
+ mockHoodieTable,
+ taskContextSupplier,
+ false);
+ }
+
+ // Test implementation class to access private methods
+ private static class DummyHoodieWriteHandle extends
HoodieWriteHandle<Object, Object, Object, Object> {
+ public DummyHoodieWriteHandle(HoodieWriteConfig config,
+ String instantTime,
+ String partitionPath,
+ String fileId,
+ HoodieTable<Object, Object, Object, Object>
hoodieTable,
+ TaskContextSupplier taskContextSupplier,
+ boolean preserveMetadata) {
+ super(config, instantTime, partitionPath, fileId, hoodieTable,
taskContextSupplier, preserveMetadata);
+ }
+
+ public boolean isTrackingEventTimeWaterMarker() {
+ return isTrackingEventTimeWatermark;
+ }
+
+ public Option<Map<String, String>>
testAppendEventTimeMetadata(HoodieRecord record, Schema schema, Properties
props) {
+ return getRecordMetadata(record, schema, props);
+ }
+
+ @Override
+ public java.util.List<org.apache.hudi.client.WriteStatus> close() {
+ return java.util.Collections.emptyList();
+ }
+
+ @Override
+ public org.apache.hudi.common.model.IOType getIOType() {
+ return org.apache.hudi.common.model.IOType.MERGE;
+ }
+ }
+}
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java
index 1231bab56b27..f942d4cf8714 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java
@@ -38,14 +38,19 @@ import org.apache.hudi.util.RowProjection;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
+import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.data.utils.JoinedRowData;
import java.io.ByteArrayOutputStream;
+import java.time.LocalDate;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
@@ -145,6 +150,29 @@ public class HoodieFlinkRecord extends
HoodieRecord<RowData> {
throw new UnsupportedOperationException("Not supported for " +
this.getClass().getSimpleName());
}
+ @Override
+ public Object convertColumnValueForLogicalType(Schema fieldSchema,
+ Object fieldValue,
+ boolean
keepConsistentLogicalTimestamp) {
+ if (fieldValue == null) {
+ return null;
+ }
+ LogicalType logicalType = fieldSchema.getLogicalType();
+
+ if (logicalType == LogicalTypes.date()) {
+ return LocalDate.ofEpochDay(((Integer) fieldValue).longValue());
+ } else if (logicalType == LogicalTypes.timestampMillis() &&
keepConsistentLogicalTimestamp) {
+ TimestampData ts = (TimestampData) fieldValue;
+ return ts.getMillisecond();
+ } else if (logicalType == LogicalTypes.timestampMicros() &&
keepConsistentLogicalTimestamp) {
+ TimestampData ts = (TimestampData) fieldValue;
+ return ts.getMillisecond() / 1000;
+ } else if (logicalType instanceof LogicalTypes.Decimal) {
+ return ((DecimalData) fieldValue).toBigDecimal();
+ }
+ return fieldValue;
+ }
+
@Override
public Object[] getColumnValues(Schema recordSchema, String[] columns,
boolean consistentLogicalTimestampEnabled) {
throw new UnsupportedOperationException("Not supported for " +
this.getClass().getSimpleName());
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
index c0a62f2b4bc5..c0e966cc0de4 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
@@ -34,6 +34,8 @@ import org.apache.hudi.keygen.SparkKeyGeneratorInterface;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.sql.HoodieInternalRowUtils;
@@ -47,11 +49,13 @@ import
org.apache.spark.sql.catalyst.expressions.SpecificInternalRow;
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.time.LocalDate;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
@@ -383,6 +387,27 @@ public class HoodieSparkRecord extends
HoodieRecord<InternalRow> {
return kryo.readObjectOrNull(input, UnsafeRow.class);
}
+ @Override
+ public Object convertColumnValueForLogicalType(Schema fieldSchema,
+ Object fieldValue,
+ boolean
keepConsistentLogicalTimestamp) {
+ if (fieldValue == null) {
+ return null;
+ }
+ LogicalType logicalType = fieldSchema.getLogicalType();
+
+ if (logicalType == LogicalTypes.date()) {
+ return LocalDate.ofEpochDay(((Integer) fieldValue).longValue());
+ } else if (logicalType == LogicalTypes.timestampMillis() &&
keepConsistentLogicalTimestamp) {
+ return (Long) fieldValue;
+ } else if (logicalType == LogicalTypes.timestampMicros() &&
keepConsistentLogicalTimestamp) {
+ return ((Long) fieldValue) / 1000;
+ } else if (logicalType instanceof LogicalTypes.Decimal) {
+ return ((Decimal) fieldValue).toJavaBigDecimal();
+ }
+ return fieldValue;
+ }
+
private static UnsafeRow convertToUnsafeRow(InternalRow payload, StructType
schema) {
if (payload == null) {
return null;
diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index d462d63fd340..ca39dad56117 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -879,7 +879,7 @@ public class HoodieAvroUtils {
* @param fieldValue avro field value
* @return field value either converted (for certain data types) or as it is.
*/
- private static Object convertValueForAvroLogicalTypes(Schema fieldSchema,
Object fieldValue, boolean consistentLogicalTimestampEnabled) {
+ public static Object convertValueForAvroLogicalTypes(Schema fieldSchema,
Object fieldValue, boolean consistentLogicalTimestampEnabled) {
if (fieldSchema.getLogicalType() == LogicalTypes.date()) {
return LocalDate.ofEpochDay(Long.parseLong(fieldValue.toString()));
} else if (fieldSchema.getLogicalType() == LogicalTypes.timestampMillis()
&& consistentLogicalTimestampEnabled) {
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
index c5d8ee89094e..83bf23c606dd 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
@@ -206,7 +206,10 @@ public class HoodieAvroIndexedRecord extends
HoodieRecord<IndexedRecord> {
@Override
public Option<Map<String, String>> getMetadata() {
- return Option.empty();
+ if (metaData == null) {
+ return Option.empty();
+ }
+ return metaData;
}
@Override
@@ -259,6 +262,14 @@ public class HoodieAvroIndexedRecord extends
HoodieRecord<IndexedRecord> {
return kryo.readObjectOrNull(input, GenericRecord.class, avroSerializer);
}
+ @Override
+ public Object convertColumnValueForLogicalType(Schema fieldSchema,
+ Object fieldValue,
+ boolean
keepConsistentLogicalTimestamp) {
+ return HoodieAvroUtils.convertValueForAvroLogicalTypes(
+ fieldSchema, fieldValue, keepConsistentLogicalTimestamp);
+ }
+
static void updateMetadataValuesInternal(GenericRecord avroRecord,
MetadataValues metadataValues) {
if (metadataValues.isEmpty()) {
return; // no-op
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
index 07b54229534d..6a871d5b3c19 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
@@ -279,4 +279,12 @@ public class HoodieAvroRecord<T extends
HoodieRecordPayload> extends HoodieRecor
protected final T readRecordPayload(Kryo kryo, Input input) {
return (T) kryo.readClassAndObject(input);
}
+
+ @Override
+ public Object convertColumnValueForLogicalType(Schema fieldSchema,
+ Object fieldValue,
+ boolean
keepConsistentLogicalTimestamp) {
+ return HoodieAvroUtils.convertValueForAvroLogicalTypes(
+ fieldSchema, fieldValue, keepConsistentLogicalTimestamp);
+ }
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java
index 5dadeeb14ed4..a7d0d4903e3d 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java
@@ -188,4 +188,11 @@ public class HoodieEmptyRecord<T> extends HoodieRecord<T> {
// NOTE: [[EmptyRecord]]'s payload is always null
return null;
}
+
+ @Override
+ public Object convertColumnValueForLogicalType(Schema fieldSchema,
+ Object fieldValue,
+ boolean
keepConsistentLogicalTimestamp) {
+ return null;
+ }
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
index bca0e5c963c0..48b0042b5aaa 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
@@ -410,6 +410,18 @@ public abstract class HoodieRecord<T> implements
HoodieRecordCompatibilityInterf
this.sealed = true;
}
+ /**
+ * This method converts a value for a column with certain Avro Logical data
types that require special handling.
+ * <p>
+ * E.g., Logical Date Type is converted to actual Date value instead of
Epoch Integer which is how it is
+ * represented/stored in parquet.
+ * <p>
+ * E.g., Decimal Data Type is converted to actual decimal value instead of
bytes/fixed which is how it is
+ * represented/stored in parquet.
+ */
+ public abstract Object convertColumnValueForLogicalType(
+ Schema fieldSchema, Object fieldValue, boolean
keepConsistentLogicalTimestamp);
+
/**
* Get column in record to support RDDCustomColumnsSortPartitioner
* @return
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
index 706d6ac633cd..937748677b45 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
@@ -32,11 +32,10 @@ import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
+import edu.umd.cs.findbugs.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.Nullable;
-
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -53,6 +52,7 @@ import java.util.stream.Collectors;
import static
org.apache.hudi.common.config.HoodieReaderConfig.USE_NATIVE_HFILE_READER;
import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_CHECKSUM;
+import static
org.apache.hudi.keygen.constant.KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED;
public class ConfigUtils {
public static final String STREAMER_CONFIG_PREFIX = "hoodie.streamer.";
@@ -113,6 +113,30 @@ public class ConfigUtils {
return HoodieRecordPayload.getPayloadClassName(props);
}
+ /**
+ * Check if event time metadata should be tracked.
+ */
+ public static boolean isTrackingEventTimeWatermark(TypedProperties props) {
+ return props.getBoolean("hoodie.write.track.event.time.watermark", false);
+ }
+
+ /**
+ * Check if logical timestamp should be made consistent.
+ */
+ public static boolean shouldKeepConsistentLogicalTimestamp(TypedProperties
props) {
+ return Boolean.parseBoolean(props.getProperty(
+ KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
+ KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()));
+ }
+
+ /**
+ * Extract event_time field name from configuration.
+ */
+ @Nullable
+ public static String getEventTimeFieldName(TypedProperties props) {
+ return
props.getProperty(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY);
+ }
+
public static List<String> split2List(String param) {
return StringUtils.split(param, ",").stream()
.map(String::trim).distinct().collect(Collectors.toList());
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestConfigUtils.java
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestConfigUtils.java
index 46ad7910af13..a287092c88c3 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestConfigUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestConfigUtils.java
@@ -22,7 +22,9 @@ package org.apache.hudi.common.util;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodiePayloadProps;
import org.apache.hudi.common.util.collection.ExternalSpillableMap.DiskMapType;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -38,6 +40,8 @@ import java.util.stream.Stream;
import static
org.apache.hudi.common.table.HoodieTableConfig.MERGE_PROPERTIES_PREFIX;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -59,7 +63,7 @@ public class TestConfigUtils {
@ParameterizedTest
@MethodSource("separatorArgs")
- public void testToMapSucceeds(Option<String> separator) {
+ void testToMapSucceeds(Option<String> separator) {
String sepString = separator.isPresent() ? separator.get() : "\n";
Map<String, String> expectedMap = new HashMap<>();
expectedMap.put("k.1.1.2", "v1");
@@ -111,13 +115,84 @@ public class TestConfigUtils {
@ParameterizedTest
@MethodSource("separatorArgs")
- public void testToMapThrowError(Option<String> separator) {
+ void testToMapThrowError(Option<String> separator) {
String sepString = separator.isPresent() ? separator.get() : "\n";
String srcKv = String.format(
"k.1.1.2=v1=v1.1%sk.2.1.2=v2%sk.3.1.2=v3", sepString, sepString);
assertThrows(IllegalArgumentException.class, () -> toMap(srcKv,
separator));
}
+ @Test
+ void testShouldTrackEventTimeWaterMarkByConfig() {
+ TypedProperties props = new TypedProperties();
+
+ // Test default value (should be false)
+ assertFalse(ConfigUtils.isTrackingEventTimeWatermark(props));
+
+ // Test when explicitly set to true
+ props.put("hoodie.write.track.event.time.watermark", "true");
+ assertTrue(ConfigUtils.isTrackingEventTimeWatermark(props));
+
+ // Test when explicitly set to false
+ props.put("hoodie.write.track.event.time.watermark", "false");
+ assertFalse(ConfigUtils.isTrackingEventTimeWatermark(props));
+
+ // Test with boolean value
+ props.put("hoodie.write.track.event.time.watermark", true);
+ assertTrue(ConfigUtils.isTrackingEventTimeWatermark(props));
+
+ props.put("hoodie.write.track.event.time.watermark", false);
+ assertFalse(ConfigUtils.isTrackingEventTimeWatermark(props));
+ }
+
+ @Test
+ void testShouldKeepConsistentLogicalTimestamp() {
+ TypedProperties props = new TypedProperties();
+
+ // Test default value (should be false based on KeyGeneratorOptions)
+ assertFalse(ConfigUtils.shouldKeepConsistentLogicalTimestamp(props));
+
+ // Test when explicitly set to true
+
props.put(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
"true");
+ assertTrue(ConfigUtils.shouldKeepConsistentLogicalTimestamp(props));
+
+ // Test when explicitly set to false
+
props.put(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
"false");
+ assertFalse(ConfigUtils.shouldKeepConsistentLogicalTimestamp(props));
+
+ // Test with boolean value
+
props.put(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
true);
+ assertTrue(ConfigUtils.shouldKeepConsistentLogicalTimestamp(props));
+
+
props.put(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
false);
+ assertFalse(ConfigUtils.shouldKeepConsistentLogicalTimestamp(props));
+ }
+
+ @Test
+ void testGetEventTimeFieldName() {
+ TypedProperties props = new TypedProperties();
+
+ // Test when property is not set (should return empty Option)
+ assertFalse(ConfigUtils.getEventTimeFieldName(props) != null);
+
+ // Test when property is set to a field name
+ String eventTimeField = "event_timestamp";
+ props.put(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY,
eventTimeField);
+ assertNotNull(ConfigUtils.getEventTimeFieldName(props));
+ assertEquals(eventTimeField, ConfigUtils.getEventTimeFieldName(props));
+
+ // Test with different field name
+ String anotherField = "created_at";
+ props.put(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY,
anotherField);
+ assertNotNull(ConfigUtils.getEventTimeFieldName(props));
+ assertEquals(anotherField, ConfigUtils.getEventTimeFieldName(props));
+
+ // Test with empty string
+ props.put(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY, "");
+ assertNotNull(ConfigUtils.getEventTimeFieldName(props));
+ assertEquals("", ConfigUtils.getEventTimeFieldName(props));
+ }
+
private Map<String, String> toMap(String config, Option<String> separator) {
if (separator.isEmpty()) {
return ConfigUtils.toMap(config);
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java
index ddb5dbb2f24d..faa4384f7fc9 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java
@@ -35,15 +35,21 @@ import org.apache.hudi.keygen.BaseKeyGenerator;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.time.LocalDate;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
@@ -135,6 +141,27 @@ public class HoodieHiveRecord extends
HoodieRecord<ArrayWritable> {
throw new UnsupportedOperationException("Not supported for
HoodieHiveRecord");
}
+ @Override
+ public Object convertColumnValueForLogicalType(Schema fieldSchema,
+ Object fieldValue,
+ boolean
keepConsistentLogicalTimestamp) {
+ if (fieldValue == null) {
+ return null;
+ }
+ LogicalType logicalType = fieldSchema.getLogicalType();
+
+ if (logicalType == LogicalTypes.date()) {
+ return LocalDate.ofEpochDay(((IntWritable) fieldValue).get());
+ } else if (logicalType == LogicalTypes.timestampMillis() &&
keepConsistentLogicalTimestamp) {
+ return ((LongWritable) fieldValue).get();
+ } else if (logicalType == LogicalTypes.timestampMicros() &&
keepConsistentLogicalTimestamp) {
+ return ((LongWritable) fieldValue).get() / 1000;
+ } else if (logicalType instanceof LogicalTypes.Decimal) {
+ return ((HiveDecimalWritable)
fieldValue).getHiveDecimal().bigDecimalValue();
+ }
+ return fieldValue;
+ }
+
@Override
public Object[] getColumnValues(Schema recordSchema, String[] columns,
boolean consistentLogicalTimestampEnabled) {
Object[] objects = new Object[columns.length];
diff --git
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieHiveRecord.java
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieHiveRecord.java
new file mode 100644
index 000000000000..d759d06c14c5
--- /dev/null
+++
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieHiveRecord.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the specific language
governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.hadoop.utils.ObjectInspectorCache;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.math.BigDecimal;
+import java.time.LocalDate;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+class TestHoodieHiveRecord {
+ private HoodieHiveRecord hoodieHiveRecord;
+ @Mock
+ private ObjectInspectorCache mockObjectInspectorCache;
+
+ @BeforeEach
+ void setUp() {
+ MockitoAnnotations.openMocks(this);
+
+ // Create a minimal HoodieHiveRecord instance with mocked dependencies
+ HoodieKey key = new HoodieKey("test-key", "test-partition");
+ ArrayWritable data = new ArrayWritable(Writable.class, new Writable[]{new
Text("test")});
+ Schema schema = Schema.create(Schema.Type.STRING);
+
+ // Create HoodieHiveRecord with mocked dependencies
+ hoodieHiveRecord = new HoodieHiveRecord(key, data, schema,
mockObjectInspectorCache);
+ }
+
+ @Test
+ void testConvertColumnValueForLogicalTypeWithNullValue() {
+ Schema dateSchema = Schema.create(Schema.Type.INT);
+ LogicalTypes.date().addToSchema(dateSchema);
+
+ Object result =
hoodieHiveRecord.convertColumnValueForLogicalType(dateSchema, null, true);
+ assertNull(result);
+ }
+
+ @Test
+ void testConvertColumnValueForLogicalTypeWithDate() {
+ Schema dateSchema = Schema.create(Schema.Type.INT);
+ LogicalTypes.date().addToSchema(dateSchema);
+
+ IntWritable dateValue = new IntWritable(18628); // 2021-01-01
+ Object result =
hoodieHiveRecord.convertColumnValueForLogicalType(dateSchema, dateValue, true);
+
+ assertEquals(LocalDate.class, result.getClass());
+ assertEquals("2021-01-01", String.valueOf(result));
+ }
+
+ @Test
+ void testConvertColumnValueForLogicalTypeWithTimestampMillis() {
+ Schema timestampMillisSchema = Schema.create(Schema.Type.LONG);
+ LogicalTypes.timestampMillis().addToSchema(timestampMillisSchema);
+
+ LongWritable timestampValue = new LongWritable(1609459200000L); //
2021-01-01 00:00:00 UTC
+ Object result =
hoodieHiveRecord.convertColumnValueForLogicalType(timestampMillisSchema,
timestampValue, true);
+
+ assertEquals(Long.class, result.getClass());
+ assertEquals("1609459200000", String.valueOf(result));
+ }
+
+ @Test
+ void testConvertColumnValueForLogicalTypeWithTimestampMillisDisabled() {
+ Schema timestampMillisSchema = Schema.create(Schema.Type.LONG);
+ LogicalTypes.timestampMillis().addToSchema(timestampMillisSchema);
+
+ LongWritable timestampValue = new LongWritable(1609459200000L);
+ Object result =
hoodieHiveRecord.convertColumnValueForLogicalType(timestampMillisSchema,
timestampValue, false);
+
+ // Should return original value when keepConsistentLogicalTimestamp is
false
+ assertEquals(LongWritable.class, result.getClass());
+ assertEquals("1609459200000", String.valueOf(result));
+ }
+
+ @Test
+ void testConvertColumnValueForLogicalTypeWithTimestampMicros() {
+ Schema timestampMicrosSchema = Schema.create(Schema.Type.LONG);
+ LogicalTypes.timestampMicros().addToSchema(timestampMicrosSchema);
+
+ LongWritable timestampValue = new LongWritable(1609459200000000L); //
2021-01-01 00:00:00 UTC in microseconds
+ Object result =
hoodieHiveRecord.convertColumnValueForLogicalType(timestampMicrosSchema,
timestampValue, true);
+
+ assertEquals(Long.class, result.getClass());
+ assertEquals("1609459200000", String.valueOf(result));
+ }
+
+ @Test
+ void testConvertColumnValueForLogicalTypeWithTimestampMicrosDisabled() {
+ Schema timestampMicrosSchema = Schema.create(Schema.Type.LONG);
+ LogicalTypes.timestampMicros().addToSchema(timestampMicrosSchema);
+
+ LongWritable timestampValue = new LongWritable(1609459200000000L);
+ Object result =
hoodieHiveRecord.convertColumnValueForLogicalType(timestampMicrosSchema,
timestampValue, false);
+
+ // Should return original value when keepConsistentLogicalTimestamp is
false
+ assertEquals(LongWritable.class, result.getClass());
+ assertEquals("1609459200000000", String.valueOf(result));
+ }
+
+ @Test
+ void testConvertColumnValueForLogicalTypeWithDecimal() {
+ Schema decimalSchema = Schema.create(Schema.Type.BYTES);
+ LogicalTypes.decimal(10, 2).addToSchema(decimalSchema);
+
+ HiveDecimalWritable decimalValue = new HiveDecimalWritable("123.45");
+ Object result =
hoodieHiveRecord.convertColumnValueForLogicalType(decimalSchema, decimalValue,
true);
+
+ assertEquals(BigDecimal.class, result.getClass());
+ assertEquals("123.45", String.valueOf(result));
+ }
+
+ @Test
+ void testConvertColumnValueForLogicalTypeWithString() {
+ Schema stringSchema = Schema.create(Schema.Type.STRING);
+
+ Text stringValue = new Text("test string");
+ Object result =
hoodieHiveRecord.convertColumnValueForLogicalType(stringSchema, stringValue,
true);
+
+ // Should return original value for non-logical types
+ assertEquals(Text.class, result.getClass());
+ assertEquals("test string", result.toString());
+ }
+
+ @Test
+ void testConvertColumnValueForLogicalTypeWithIntWritable() {
+ Schema stringSchema = Schema.create(Schema.Type.STRING);
+
+ IntWritable intValue = new IntWritable(42);
+ Object result =
hoodieHiveRecord.convertColumnValueForLogicalType(stringSchema, intValue, true);
+
+ // Should return original value for non-logical types
+ assertEquals(IntWritable.class, result.getClass());
+ assertEquals("42", String.valueOf(result));
+ }
+
+ @Test
+ void testConvertColumnValueForLogicalTypeWithLongWritable() {
+ Schema stringSchema = Schema.create(Schema.Type.STRING);
+
+ LongWritable longValue = new LongWritable(12345L);
+ Object result =
hoodieHiveRecord.convertColumnValueForLogicalType(stringSchema, longValue,
true);
+
+ // Should return original value for non-logical types
+ assertEquals(LongWritable.class, result.getClass());
+ assertEquals("12345", String.valueOf(result));
+ }
+}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestHoodieActiveTimeline.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestHoodieActiveTimeline.scala
index 9aaee8b0a8c8..0e581fdbfbf1 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestHoodieActiveTimeline.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestHoodieActiveTimeline.scala
@@ -30,8 +30,7 @@ import org.apache.hudi.testutils.HoodieSparkClientTestBase
import org.apache.spark.sql._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue, fail}
-import org.slf4j.LoggerFactory
+import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull,
assertTrue, fail}
import java.io.FileNotFoundException
@@ -41,11 +40,7 @@ import scala.collection.JavaConverters._
* Tests on HoodieTimeline using the real hudi table.
*/
class TestHoodieActiveTimeline extends HoodieSparkClientTestBase {
-
var spark: SparkSession = null
-
- private val log = LoggerFactory.getLogger(classOf[TestHoodieActiveTimeline])
-
val commonOpts = Map(
"hoodie.insert.shuffle.parallelism" -> "4",
"hoodie.upsert.shuffle.parallelism" -> "4",
@@ -259,4 +254,57 @@ class TestHoodieActiveTimeline extends
HoodieSparkClientTestBase {
case _ => fail("Should have failed with FileNotFoundException")
}
}
+
+ @Test
+ def testEventTimeTracking(): Unit = {
+ val eventTimeOpts = commonOpts ++ Map(
+ HoodieWriteConfig.TRACK_EVENT_TIME_WATERMARK.key -> "true",
+ HoodieWriteConfig.RECORD_MERGE_MODE.key -> "EVENT_TIME_ORDERING",
+ "hoodie.payload.event.time.field" -> "current_ts"
+ )
+
+ // Initial insert
+ val records1 = recordsToStrings(dataGen.generateInserts("001",
100)).asScala.toList
+ val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+ inputDF1.write.format("org.apache.hudi")
+ .options(eventTimeOpts)
+ .option(DataSourceWriteOptions.OPERATION.key,
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+ .option(DataSourceWriteOptions.TABLE_TYPE.key,
DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+ val metaClient = createMetaClient(basePath)
+ validateEventTimeMetadata(metaClient)
+
+ // Insert again
+ inputDF1.write.format("org.apache.hudi")
+ .options(eventTimeOpts)
+ .option(DataSourceWriteOptions.OPERATION.key,
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+ .option(DataSourceWriteOptions.TABLE_TYPE.key,
DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
+ .mode(SaveMode.Append)
+ .save(basePath)
+ validateEventTimeMetadata(metaClient)
+
+ // Update records
+ val records3 = recordsToStrings(dataGen.generateUniqueUpdates("003",
50)).asScala.toList
+ val inputDF3: Dataset[Row] =
spark.read.json(spark.sparkContext.parallelize(records3, 2))
+ inputDF3.write.format("org.apache.hudi")
+ .options(eventTimeOpts)
+ .mode(SaveMode.Append)
+ .save(basePath)
+ validateEventTimeMetadata(metaClient)
+ }
+
+ def validateEventTimeMetadata(metaClient: HoodieTableMetaClient): Unit = {
+ val localMetaClient = HoodieTableMetaClient.reload(metaClient)
+ val ret =
localMetaClient.getActiveTimeline.getLastCommitMetadataWithValidData
+ assertTrue(ret.isPresent)
+ val stats = ret.get().getRight.getPartitionToWriteStats.values().asScala
+ for {
+ statList <- stats
+ stat <- statList.asScala
+ } {
+ assertNotNull(stat.getMaxEventTime)
+ assertNotNull(stat.getMinEventTime)
+ }
+ }
}