This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 03510ee736e9 [HUDI-9560] Add support event_time metadata (#13517)
03510ee736e9 is described below

commit 03510ee736e92b7a2335083b6c2429a99bfd48f0
Author: Lin Liu <[email protected]>
AuthorDate: Wed Jul 30 03:56:15 2025 -0700

    [HUDI-9560] Add support event_time metadata (#13517)
    
    * Track event_time metadata
    * Use writeHandle to track event time
    * Add support for other engine record types
    * Add functional test
    
    ---------
    
    Co-authored-by: danny0405 <[email protected]>
---
 .../org/apache/hudi/io/HoodieAppendHandle.java     |   2 +-
 .../org/apache/hudi/io/HoodieCreateHandle.java     |   2 +-
 .../java/org/apache/hudi/io/HoodieWriteHandle.java |  35 ++
 .../org/apache/hudi/io/HoodieWriteMergeHandle.java |   8 +-
 .../org/apache/hudi/io/TestHoodieWriteHandle.java  | 356 +++++++++++++++++++++
 .../hudi/client/model/HoodieFlinkRecord.java       |  28 ++
 .../hudi/common/model/HoodieSparkRecord.java       |  25 ++
 .../java/org/apache/hudi/avro/HoodieAvroUtils.java |   2 +-
 .../hudi/common/model/HoodieAvroIndexedRecord.java |  13 +-
 .../apache/hudi/common/model/HoodieAvroRecord.java |   8 +
 .../hudi/common/model/HoodieEmptyRecord.java       |   7 +
 .../org/apache/hudi/common/model/HoodieRecord.java |  12 +
 .../org/apache/hudi/common/util/ConfigUtils.java   |  28 +-
 .../apache/hudi/common/util/TestConfigUtils.java   |  79 ++++-
 .../org/apache/hudi/hadoop/HoodieHiveRecord.java   |  27 ++
 .../apache/hudi/hadoop/TestHoodieHiveRecord.java   | 178 +++++++++++
 .../hudi/functional/TestHoodieActiveTimeline.scala |  60 +++-
 17 files changed, 852 insertions(+), 18 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index 8dfa02d3bc8f..e6707a8688ed 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -285,8 +285,8 @@ public class HoodieAppendHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O
   }
 
   private void bufferRecord(HoodieRecord<T> hoodieRecord) {
-    Option<Map<String, String>> recordMetadata = hoodieRecord.getMetadata();
     Schema schema = useWriterSchema ? writeSchemaWithMetaFields : writeSchema;
+    Option<Map<String, String>> recordMetadata = 
getRecordMetadata(hoodieRecord, schema, recordProperties);
     try {
       // Pass the isUpdateRecord to the props for HoodieRecordPayload to judge
       // Whether it is an update or insert record.
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
index 16714b2446fe..d51c823e6e8b 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
@@ -132,7 +132,7 @@ public class HoodieCreateHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O
    */
   @Override
   protected void doWrite(HoodieRecord record, Schema schema, TypedProperties 
props) {
-    Option<Map<String, String>> recordMetadata = record.getMetadata();
+    Option<Map<String, String>> recordMetadata = getRecordMetadata(record, 
schema, props);
     try {
       if (!HoodieOperation.isDelete(record.getOperation()) && 
!record.isDelete(schema, config.getProps())) {
         if (record.shouldIgnore(schema, config.getProps())) {
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
index 61f414b45051..9d631a505ce9 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.io;
 
 import org.apache.hudi.avro.AvroSchemaCache;
+import org.apache.hudi.avro.AvroSchemaUtils;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.config.TypedProperties;
@@ -36,6 +37,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.LogFileCreationCallback;
+import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
@@ -56,9 +58,14 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Properties;
 import java.util.stream.Collectors;
 
+import static 
org.apache.hudi.common.config.RecordMergeMode.EVENT_TIME_ORDERING;
+import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.METADATA_EVENT_TIME_KEY;
 import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
 
 /**
@@ -92,6 +99,9 @@ public abstract class HoodieWriteHandle<T, I, K, O> extends 
HoodieIOHandle<T, I,
   List<HoodieIndexDefinition> secondaryIndexDefns = Collections.emptyList();
 
   private boolean closed = false;
+  protected boolean isTrackingEventTimeWatermark;
+  protected boolean keepConsistentLogicalTimestamp;
+  protected String eventTimeFieldName;
 
   public HoodieWriteHandle(HoodieWriteConfig config, String instantTime, 
String partitionPath,
                            String fileId, HoodieTable<T, I, K, O> hoodieTable, 
TaskContextSupplier taskContextSupplier, boolean preserveMetadata) {
@@ -123,6 +133,13 @@ public abstract class HoodieWriteHandle<T, I, K, O> 
extends HoodieIOHandle<T, I,
     } else {
       this.isSecondaryIndexStatsStreamingWritesEnabled = false;
     }
+
+    // For tracking event time watermark.
+    this.eventTimeFieldName = 
ConfigUtils.getEventTimeFieldName(config.getProps());
+    this.isTrackingEventTimeWatermark = this.eventTimeFieldName != null
+        && hoodieTable.getMetaClient().getTableConfig().getRecordMergeMode() 
== EVENT_TIME_ORDERING
+        && ConfigUtils.isTrackingEventTimeWatermark(config.getProps());
+    this.keepConsistentLogicalTimestamp = isTrackingEventTimeWatermark && 
ConfigUtils.shouldKeepConsistentLogicalTimestamp(config.getProps());
   }
 
   private void initSecondaryIndexStats(boolean preserveMetadata) {
@@ -336,4 +353,22 @@ public abstract class HoodieWriteHandle<T, I, K, O> 
extends HoodieIOHandle<T, I,
       return Option.empty();
     }
   }
+
+  protected Option<Map<String, String>> getRecordMetadata(HoodieRecord record, 
Schema schema, Properties props) {
+    Option<Map<String, String>> recordMetadata = record.getMetadata();
+    if (isTrackingEventTimeWatermark) {
+      Object eventTime = record.getColumnValueAsJava(schema, 
eventTimeFieldName, props);
+      if (eventTime != null) {
+        // Append event_time.
+        Option<Schema.Field> field = AvroSchemaUtils.findNestedField(schema, 
eventTimeFieldName);
+        // Field should definitely exist.
+        eventTime = record.convertColumnValueForLogicalType(
+            field.get().schema(), eventTime, keepConsistentLogicalTimestamp);
+        Map<String, String> metadata = recordMetadata.orElse(new HashMap<>());
+        metadata.put(METADATA_EVENT_TIME_KEY, String.valueOf(eventTime));
+        return Option.of(metadata);
+      }
+    }
+    return recordMetadata;
+  }
 }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteMergeHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteMergeHandle.java
index 8d389d7ba88e..1c6438fb7abd 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteMergeHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteMergeHandle.java
@@ -280,7 +280,7 @@ public class HoodieWriteMergeHandle<T, I, K, O> extends 
HoodieAbstractMergeHandl
    * @param oldRecord     The value of old record
    * @param combineRecord Record created by merging the old record with the 
new incoming record
    * @param schema        Record schema
-   * @param prop          Properties
+   * @param props         Properties
    * @param isDelete      Whether the new record is a delete record
    *
    * @return true if the record was written successfully
@@ -290,9 +290,9 @@ public class HoodieWriteMergeHandle<T, I, K, O> extends 
HoodieAbstractMergeHandl
                               @Nullable HoodieRecord<T> oldRecord,
                               Option<HoodieRecord> combineRecord,
                               Schema schema,
-                              Properties prop,
+                              Properties props,
                               boolean isDelete) {
-    Option recordMetadata = newRecord.getMetadata();
+    Option<Map<String, String>> recordMetadata = getRecordMetadata(newRecord, 
schema, props);
     if (!partitionPath.equals(newRecord.getPartitionPath())) {
       HoodieUpsertException failureEx = new HoodieUpsertException("mismatched 
partition path, record partition: "
           + newRecord.getPartitionPath() + " but trying to insert into 
partition: " + partitionPath);
@@ -311,7 +311,7 @@ public class HoodieWriteMergeHandle<T, I, K, O> extends 
HoodieAbstractMergeHandl
             SecondaryIndexStreamingTracker.trackSecondaryIndexStats(hoodieKey, 
combineRecord, oldRecord, false, writeStatus,
                 writeSchemaWithMetaFields, this::getNewSchema, 
secondaryIndexDefns, keyGeneratorOpt, config);
           }
-          writeToFile(hoodieKey, combineRecord.get(), schema, prop, 
preserveMetadata);
+          writeToFile(hoodieKey, combineRecord.get(), schema, props, 
preserveMetadata);
           recordsWritten++;
         } else {
           // CASE (2): A delete operation.
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/TestHoodieWriteHandle.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/TestHoodieWriteHandle.java
new file mode 100644
index 000000000000..f32f89eed241
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/TestHoodieWriteHandle.java
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.common.config.RecordMergeMode;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.LocalTaskContextSupplier;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.METADATA_EVENT_TIME_KEY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.LENIENT)
+class TestHoodieWriteHandle {
+  @Mock
+  private HoodieTable mockHoodieTable;
+  @Mock
+  private HoodieTableMetaClient mockMetaClient;
+  @Mock
+  private HoodieTableConfig mockTableConfig;
+  @Mock
+  private HoodieRecordMerger mockRecordMerger;
+  @Mock
+  private HoodieWriteConfig mockWriteConfig;
+
+  @BeforeEach
+  public void setUp() {
+    MockitoAnnotations.initMocks(this);
+    when(mockHoodieTable.getMetaClient()).thenReturn(mockMetaClient);
+    when(mockMetaClient.getTableConfig()).thenReturn(mockTableConfig);
+    when(mockWriteConfig.getRecordMerger()).thenReturn(mockRecordMerger);
+
+    // Set up a basic schema for the write config
+    String basicSchema = 
"{\"type\":\"record\",\"name\":\"test\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}";
+    when(mockWriteConfig.getWriteSchema()).thenReturn(basicSchema);
+    when(mockWriteConfig.getProps()).thenReturn(new TypedProperties());
+    when(mockWriteConfig.allowOperationMetadataField()).thenReturn(false);
+    
when(mockWriteConfig.getWriteStatusClassName()).thenReturn("org.apache.hudi.client.WriteStatus");
+    when(mockWriteConfig.getWriteStatusFailureFraction()).thenReturn(0.0);
+    when(mockHoodieTable.shouldTrackSuccessRecords()).thenReturn(true);
+    when(mockHoodieTable.isMetadataTable()).thenReturn(false);
+    when(mockHoodieTable.getConfig()).thenReturn(mockWriteConfig);
+    
when(mockTableConfig.getTableVersion()).thenReturn(org.apache.hudi.common.table.HoodieTableVersion.EIGHT);
+
+    when(mockHoodieTable.getBaseFileExtension()).thenReturn(".parquet");
+  }
+
+  @Test
+  void 
testShouldTrackEventTimeWaterMarkerAvroRecordTypeWithEventTimeOrderingAndConfigEnabled()
 {
+    // Setup: AVRO record type with event time ordering and config enabled
+    boolean result = mockWriteHandle(true, 
"ts").isTrackingEventTimeWaterMarker();
+    assertTrue(result, "Should track event time watermark for AVRO records 
with event time ordering and config enabled");
+  }
+
+  @Test
+  void 
testShouldTrackEventTimeWaterMarkerAvroRecordTypeWithEventTimeOrderingAndConfigDisabled()
 {
+    // Setup: AVRO record type with event time ordering but config disabled
+    boolean result = mockWriteHandle(false, 
null).isTrackingEventTimeWaterMarker();
+    assertFalse(result, "Should not track event time watermark when config is 
disabled");
+  }
+
+  @Test
+  void testShouldTrackEventTimeWaterMarkerNonAvroRecordType() {
+    // Setup: Non-AVRO record type
+    boolean result = mockWriteHandle(true, "ts", false, 
HoodieRecord.HoodieRecordType.SPARK, RecordMergeMode.EVENT_TIME_ORDERING)
+        .isTrackingEventTimeWaterMarker();
+    assertTrue(result, "Should track event time watermark for SPARK record 
type");
+  }
+
+  @Test
+  void 
testShouldTrackEventTimeWaterMarkerAvroRecordTypeWithCommitTimeOrdering() {
+    // Setup: AVRO record type but with commit time ordering
+    boolean result = mockWriteHandle(true, null, false, 
HoodieRecord.HoodieRecordType.AVRO, RecordMergeMode.COMMIT_TIME_ORDERING)
+        .isTrackingEventTimeWaterMarker();
+    assertFalse(result, "Should not track event time watermark when using 
commit time ordering");
+  }
+
+  @Test
+  void testAppendEventTimeMetadataWithEventTimeField() {
+    // Setup: Create a test record with event time field
+    Schema schema = Schema.createRecord("test", null, null, false);
+    schema.setFields(java.util.Arrays.asList(
+        new Schema.Field("id", Schema.create(Schema.Type.STRING), null, null),
+        new Schema.Field("event_time", Schema.create(Schema.Type.LONG), null, 
null)
+    ));
+
+    GenericRecord record = new GenericData.Record(schema);
+    record.put("id", "test_id");
+    record.put("event_time", 1234567890L);
+
+    HoodieRecord hoodieRecord = new HoodieAvroIndexedRecord(null, record);
+
+    DummyHoodieWriteHandle testWriteHandle = mockWriteHandle(true, 
"event_time");
+
+    // Test with empty metadata
+    Option<Map<String, String>> result =
+        testWriteHandle.testAppendEventTimeMetadata(hoodieRecord, schema, new 
Properties());
+
+    assertTrue(result.isPresent(), "Should return metadata when event time is 
present");
+    Map<String, String> metadata = result.get();
+    assertEquals(
+        "1234567890",
+        metadata.get(METADATA_EVENT_TIME_KEY),
+        "Event time should be correctly extracted");
+  }
+
+  @Test
+  void testAppendEventTimeMetadataWithExistingMetadata() {
+    // Setup: Create a test record with event time field
+    Schema schema = Schema.createRecord("test", null, null, false);
+    schema.setFields(java.util.Arrays.asList(
+        new Schema.Field("id", Schema.create(Schema.Type.STRING), null, null),
+        new Schema.Field("event_time", Schema.create(Schema.Type.LONG), null, 
null)
+    ));
+
+    GenericRecord record = new GenericData.Record(schema);
+    record.put("id", "test_id");
+    record.put("event_time", 1234567890L);
+
+    Map<String, String> existingMetadata = new HashMap<>();
+    existingMetadata.put("existing_key", "existing_value");
+    HoodieRecord hoodieRecord = new HoodieAvroIndexedRecord(null, record, 
HoodieOperation.INSERT, Option.of(existingMetadata));
+
+    DummyHoodieWriteHandle testWriteHandle = mockWriteHandle(true, 
"event_time");
+
+    // Test with existing metadata
+    Option<Map<String, String>> result =
+        testWriteHandle.testAppendEventTimeMetadata(hoodieRecord, schema, new 
Properties());
+
+    assertTrue(result.isPresent(), "Should return metadata when event time is 
present");
+    Map<String, String> metadata = result.get();
+    assertEquals(
+        "1234567890",
+        metadata.get(METADATA_EVENT_TIME_KEY),
+        "Event time should be correctly extracted");
+    assertEquals(
+        "existing_value",
+        metadata.get("existing_key"),
+        "Existing metadata should be preserved");
+  }
+
+  @Test
+  void testAppendEventTimeMetadataWithoutEventTimeField() {
+    // Setup: Create a test record without event time field
+    Schema schema = Schema.createRecord("test", null, null, false);
+    schema.setFields(java.util.Arrays.asList(
+        new Schema.Field("id", Schema.create(Schema.Type.STRING), null, null)
+    ));
+
+    GenericRecord record = new GenericData.Record(schema);
+    record.put("id", "test_id");
+
+    HoodieRecord hoodieRecord = new HoodieAvroIndexedRecord(null, record);
+
+    DummyHoodieWriteHandle testWriteHandle = mockWriteHandle(true, 
"event_time");
+
+    Option<Map<String, String>> result =
+        testWriteHandle.testAppendEventTimeMetadata(hoodieRecord, schema, new 
Properties());
+
+    assertFalse(result.isPresent(), "Should return empty when event time field 
is not present");
+  }
+
+  @Test
+  void testAppendEventTimeMetadataWithNullEventTimeValue() {
+    // Setup: Create a test record with null event time value
+    Schema schema = Schema.createRecord("test", null, null, false);
+    schema.setFields(java.util.Arrays.asList(
+        new Schema.Field("id", Schema.create(Schema.Type.STRING), null, null),
+        new Schema.Field("event_time", Schema.create(Schema.Type.LONG), null, 
null)
+    ));
+
+    GenericRecord record = new GenericData.Record(schema);
+    record.put("id", "test_id");
+    record.put("event_time", null);
+
+    HoodieRecord hoodieRecord = new HoodieAvroIndexedRecord(null, record);
+
+    DummyHoodieWriteHandle testWriteHandle = mockWriteHandle(true, 
"event_time");
+
+    Option<Map<String, String>> result =
+        testWriteHandle.testAppendEventTimeMetadata(hoodieRecord, schema, new 
Properties());
+
+    assertFalse(result.isPresent(), "Should return empty when event time value 
is null");
+  }
+
+  @Test
+  void testAppendEventTimeMetadataWithStringEventTime() {
+    // Setup: Create a test record with string event time
+    Schema schema = Schema.createRecord("test", null, null, false);
+    schema.setFields(java.util.Arrays.asList(
+        new Schema.Field("id", Schema.create(Schema.Type.STRING), null, null),
+        new Schema.Field("event_time", Schema.create(Schema.Type.STRING), 
null, null)
+    ));
+
+    GenericRecord record = new GenericData.Record(schema);
+    record.put("id", "test_id");
+    record.put("event_time", "2023-01-01T00:00:00Z");
+
+    HoodieRecord hoodieRecord = new HoodieAvroIndexedRecord(null, record);
+
+    DummyHoodieWriteHandle testWriteHandle = mockWriteHandle(true, 
"event_time");
+
+    Option<Map<String, String>> result =
+        testWriteHandle.testAppendEventTimeMetadata(hoodieRecord, schema, new 
Properties());
+
+    assertTrue(result.isPresent(), "Should return metadata when event time is 
present");
+    Map<String, String> metadata = result.get();
+    assertEquals(
+        "2023-01-01T00:00:00Z",
+        metadata.get(METADATA_EVENT_TIME_KEY),
+        "String event time should be correctly extracted");
+  }
+
+  @Test
+  void testAppendEventTimeMetadataWithNestedEventTimeField() {
+    // Setup: Create a test record with nested event time field
+    Schema nestedSchema = Schema.createRecord("nested", null, null, false);
+    nestedSchema.setFields(java.util.Arrays.asList(
+        new Schema.Field("event_time", Schema.create(Schema.Type.LONG), null, 
null)
+    ));
+
+    Schema schema = Schema.createRecord("test", null, null, false);
+    schema.setFields(java.util.Arrays.asList(
+        new Schema.Field("id", Schema.create(Schema.Type.STRING), null, null),
+        new Schema.Field("nested", nestedSchema, null, null)
+    ));
+
+    GenericRecord nestedRecord = new GenericData.Record(nestedSchema);
+    nestedRecord.put("event_time", 1234567890L);
+
+    GenericRecord record = new GenericData.Record(schema);
+    record.put("id", "test_id");
+    record.put("nested", nestedRecord);
+
+    DummyHoodieWriteHandle testWriteHandle = mockWriteHandle(true, 
"nested.event_time");
+
+    HoodieRecord hoodieRecord = new HoodieAvroIndexedRecord(null, record);
+
+    Option<Map<String, String>> result = 
testWriteHandle.testAppendEventTimeMetadata(hoodieRecord, schema, new 
Properties());
+
+    assertTrue(result.isPresent(), "Should return metadata when nested event 
time is present");
+    Map<String, String> metadata = result.get();
+    assertEquals(
+        "1234567890",
+        metadata.get(METADATA_EVENT_TIME_KEY),
+        "Nested event time should be correctly extracted");
+  }
+
+  private DummyHoodieWriteHandle mockWriteHandle(boolean 
isTrackingEventTimeMetadata, String eventTimeField) {
+    return mockWriteHandle(isTrackingEventTimeMetadata, eventTimeField, false, 
HoodieRecord.HoodieRecordType.AVRO, RecordMergeMode.EVENT_TIME_ORDERING);
+  }
+
+  private DummyHoodieWriteHandle mockWriteHandle(
+      boolean isTrackingEventTimeMetadata,
+      String eventTimeField,
+      boolean keepConsistentLogicalTimestamp,
+      HoodieRecord.HoodieRecordType recordType,
+      RecordMergeMode mergeMode) {
+    when(mockRecordMerger.getRecordType()).thenReturn(recordType);
+    when(mockTableConfig.getRecordMergeMode()).thenReturn(mergeMode);
+    TypedProperties props = new TypedProperties();
+    props.put("hoodie.write.track.event.time.watermark", 
String.valueOf(isTrackingEventTimeMetadata));
+    if (eventTimeField != null) {
+      props.put("hoodie.payload.event.time.field", eventTimeField);
+    }
+    
props.put("hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled",
 String.valueOf(keepConsistentLogicalTimestamp));
+    when(mockWriteConfig.getProps()).thenReturn(props);
+
+    TaskContextSupplier taskContextSupplier = new LocalTaskContextSupplier();
+    return new DummyHoodieWriteHandle(
+        mockWriteConfig,
+        "test_instant",
+        "test_partition",
+        "test_file_id",
+        mockHoodieTable,
+        taskContextSupplier,
+        false);
+  }
+
+  // Test implementation class to access private methods
+  private static class DummyHoodieWriteHandle extends 
HoodieWriteHandle<Object, Object, Object, Object> {
+    public DummyHoodieWriteHandle(HoodieWriteConfig config,
+                                  String instantTime,
+                                  String partitionPath,
+                                  String fileId,
+                                  HoodieTable<Object, Object, Object, Object> 
hoodieTable,
+                                  TaskContextSupplier taskContextSupplier,
+                                  boolean preserveMetadata) {
+      super(config, instantTime, partitionPath, fileId, hoodieTable, 
taskContextSupplier, preserveMetadata);
+    }
+
+    public boolean isTrackingEventTimeWaterMarker() {
+      return isTrackingEventTimeWatermark;
+    }
+
+    public Option<Map<String, String>> 
testAppendEventTimeMetadata(HoodieRecord record, Schema schema, Properties 
props) {
+      return getRecordMetadata(record, schema, props);
+    }
+
+    @Override
+    public java.util.List<org.apache.hudi.client.WriteStatus> close() {
+      return java.util.Collections.emptyList();
+    }
+
+    @Override
+    public org.apache.hudi.common.model.IOType getIOType() {
+      return org.apache.hudi.common.model.IOType.MERGE;
+    }
+  }
+}
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java
index 1231bab56b27..f942d4cf8714 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java
@@ -38,14 +38,19 @@ import org.apache.hudi.util.RowProjection;
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
+import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.data.utils.JoinedRowData;
 
 import java.io.ByteArrayOutputStream;
+import java.time.LocalDate;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Properties;
@@ -145,6 +150,29 @@ public class HoodieFlinkRecord extends 
HoodieRecord<RowData> {
     throw new UnsupportedOperationException("Not supported for " + 
this.getClass().getSimpleName());
   }
 
+  @Override
+  public Object convertColumnValueForLogicalType(Schema fieldSchema,
+                                                 Object fieldValue,
+                                                 boolean 
keepConsistentLogicalTimestamp) {
+    if (fieldValue == null) {
+      return null;
+    }
+    LogicalType logicalType = fieldSchema.getLogicalType();
+
+    if (logicalType == LogicalTypes.date()) {
+      return LocalDate.ofEpochDay(((Integer) fieldValue).longValue());
+    } else if (logicalType == LogicalTypes.timestampMillis() && 
keepConsistentLogicalTimestamp) {
+      TimestampData ts = (TimestampData) fieldValue;
+      return ts.getMillisecond();
+    } else if (logicalType == LogicalTypes.timestampMicros() && 
keepConsistentLogicalTimestamp) {
+      TimestampData ts = (TimestampData) fieldValue;
+      return ts.getMillisecond() / 1000;
+    } else if (logicalType instanceof LogicalTypes.Decimal) {
+      return ((DecimalData) fieldValue).toBigDecimal();
+    }
+    return fieldValue;
+  }
+
   @Override
   public Object[] getColumnValues(Schema recordSchema, String[] columns, 
boolean consistentLogicalTimestampEnabled) {
     throw new UnsupportedOperationException("Not supported for " + 
this.getClass().getSimpleName());
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
index c0a62f2b4bc5..c0e966cc0de4 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
@@ -34,6 +34,8 @@ import org.apache.hudi.keygen.SparkKeyGeneratorInterface;
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.spark.sql.HoodieInternalRowUtils;
@@ -47,11 +49,13 @@ import 
org.apache.spark.sql.catalyst.expressions.SpecificInternalRow;
 import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
 import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Decimal;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.unsafe.types.UTF8String;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.time.LocalDate;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Properties;
@@ -383,6 +387,27 @@ public class HoodieSparkRecord extends 
HoodieRecord<InternalRow> {
     return kryo.readObjectOrNull(input, UnsafeRow.class);
   }
 
+  @Override
+  public Object convertColumnValueForLogicalType(Schema fieldSchema,
+                                                 Object fieldValue,
+                                                 boolean 
keepConsistentLogicalTimestamp) {
+    if (fieldValue == null) {
+      return null;
+    }
+    LogicalType logicalType = fieldSchema.getLogicalType();
+
+    if (logicalType == LogicalTypes.date()) {
+      return LocalDate.ofEpochDay(((Integer) fieldValue).longValue());
+    } else if (logicalType == LogicalTypes.timestampMillis() && 
keepConsistentLogicalTimestamp) {
+      return (Long) fieldValue;
+    } else if (logicalType == LogicalTypes.timestampMicros() && 
keepConsistentLogicalTimestamp) {
+      return ((Long) fieldValue) / 1000;
+    } else if (logicalType instanceof LogicalTypes.Decimal) {
+      return ((Decimal) fieldValue).toJavaBigDecimal();
+    }
+    return fieldValue;
+  }
+
   private static UnsafeRow convertToUnsafeRow(InternalRow payload, StructType 
schema) {
     if (payload == null) {
       return null;
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index d462d63fd340..ca39dad56117 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -879,7 +879,7 @@ public class HoodieAvroUtils {
    * @param fieldValue  avro field value
    * @return field value either converted (for certain data types) or as it is.
    */
-  private static Object convertValueForAvroLogicalTypes(Schema fieldSchema, 
Object fieldValue, boolean consistentLogicalTimestampEnabled) {
+  public static Object convertValueForAvroLogicalTypes(Schema fieldSchema, 
Object fieldValue, boolean consistentLogicalTimestampEnabled) {
     if (fieldSchema.getLogicalType() == LogicalTypes.date()) {
       return LocalDate.ofEpochDay(Long.parseLong(fieldValue.toString()));
     } else if (fieldSchema.getLogicalType() == LogicalTypes.timestampMillis() 
&& consistentLogicalTimestampEnabled) {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
index c5d8ee89094e..83bf23c606dd 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
@@ -206,7 +206,10 @@ public class HoodieAvroIndexedRecord extends 
HoodieRecord<IndexedRecord> {
 
   @Override
   public Option<Map<String, String>> getMetadata() {
-    return Option.empty();
+    if (metaData == null) {
+      return Option.empty();
+    }
+    return metaData;
   }
 
   @Override
@@ -259,6 +262,14 @@ public class HoodieAvroIndexedRecord extends 
HoodieRecord<IndexedRecord> {
     return kryo.readObjectOrNull(input, GenericRecord.class, avroSerializer);
   }
 
  @Override
  public Object convertColumnValueForLogicalType(Schema fieldSchema,
                                                 Object fieldValue,
                                                 boolean keepConsistentLogicalTimestamp) {
    // Avro-backed records delegate straight to the shared Avro utility: dates
    // become LocalDate, decimals become BigDecimal, timestamps are normalized
    // when requested.
    return HoodieAvroUtils.convertValueForAvroLogicalTypes(
        fieldSchema, fieldValue, keepConsistentLogicalTimestamp);
  }
+
   static void updateMetadataValuesInternal(GenericRecord avroRecord, 
MetadataValues metadataValues) {
     if (metadataValues.isEmpty()) {
       return; // no-op
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
index 07b54229534d..6a871d5b3c19 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
@@ -279,4 +279,12 @@ public class HoodieAvroRecord<T extends 
HoodieRecordPayload> extends HoodieRecor
   protected final T readRecordPayload(Kryo kryo, Input input) {
     return (T) kryo.readClassAndObject(input);
   }
+
  @Override
  public Object convertColumnValueForLogicalType(Schema fieldSchema,
                                                 Object fieldValue,
                                                 boolean keepConsistentLogicalTimestamp) {
    // Payload-based Avro records use the same shared conversion as
    // HoodieAvroIndexedRecord, keeping logical-type handling uniform.
    return HoodieAvroUtils.convertValueForAvroLogicalTypes(
        fieldSchema, fieldValue, keepConsistentLogicalTimestamp);
  }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java
index 5dadeeb14ed4..a7d0d4903e3d 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java
@@ -188,4 +188,11 @@ public class HoodieEmptyRecord<T> extends HoodieRecord<T> {
     // NOTE: [[EmptyRecord]]'s payload is always null
     return null;
   }
+
  @Override
  public Object convertColumnValueForLogicalType(Schema fieldSchema,
                                                 Object fieldValue,
                                                 boolean keepConsistentLogicalTimestamp) {
    // An empty record carries no payload (see readRecordPayload above), so
    // there is never a column value to convert.
    return null;
  }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
index bca0e5c963c0..48b0042b5aaa 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
@@ -410,6 +410,18 @@ public abstract class HoodieRecord<T> implements 
HoodieRecordCompatibilityInterf
     this.sealed = true;
   }
 
  /**
   * This method converts a value for a column with certain Avro Logical data types that require special handling.
   * <p>
   * E.g., Logical Date Type is converted to actual Date value instead of Epoch Integer which is how it is
   * represented/stored in parquet.
   * <p>
   * E.g., Decimal Data Type is converted to actual decimal value instead of bytes/fixed which is how it is
   * represented/stored in parquet.
   *
   * @param fieldSchema                    Avro schema of the field, carrying the logical type (if any)
   * @param fieldValue                     engine-specific raw value read from the record; may be {@code null}
   * @param keepConsistentLogicalTimestamp whether timestamp-millis/micros values should be normalized to millis
   * @return the converted value, or the original value when no logical-type handling applies
   */
  public abstract Object convertColumnValueForLogicalType(
      Schema fieldSchema, Object fieldValue, boolean keepConsistentLogicalTimestamp);
+
   /**
    * Get column in record to support RDDCustomColumnsSortPartitioner
    * @return
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
index 706d6ac633cd..937748677b45 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
@@ -32,11 +32,10 @@ import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
 
+import edu.umd.cs.findbugs.annotations.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -53,6 +52,7 @@ import java.util.stream.Collectors;
 
 import static 
org.apache.hudi.common.config.HoodieReaderConfig.USE_NATIVE_HFILE_READER;
 import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_CHECKSUM;
+import static 
org.apache.hudi.keygen.constant.KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED;
 
 public class ConfigUtils {
   public static final String STREAMER_CONFIG_PREFIX = "hoodie.streamer.";
@@ -113,6 +113,30 @@ public class ConfigUtils {
     return HoodieRecordPayload.getPayloadClassName(props);
   }
 
  /**
   * Check if event time metadata should be tracked.
   *
   * @param props writer properties
   * @return {@code true} iff {@code hoodie.write.track.event.time.watermark} is enabled; defaults to {@code false}
   */
  public static boolean isTrackingEventTimeWatermark(TypedProperties props) {
    // NOTE(review): the key is a hard-coded literal here; if a ConfigProperty
    // constant exists for it, referencing that would be safer — TODO confirm.
    return props.getBoolean("hoodie.write.track.event.time.watermark", false);
  }
+
  /**
   * Check if logical timestamp should be made consistent.
   *
   * @param props writer properties
   * @return the key-generator consistent-logical-timestamp flag, falling back to its declared default when unset
   */
  public static boolean shouldKeepConsistentLogicalTimestamp(TypedProperties props) {
    return Boolean.parseBoolean(props.getProperty(
        KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
        KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()));
  }
+
  /**
   * Extract event_time field name from configuration.
   *
   * @param props payload properties
   * @return the configured event-time field name, or {@code null} when the property is absent
   */
  @Nullable
  public static String getEventTimeFieldName(TypedProperties props) {
    return props.getProperty(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY);
  }
+
   public static List<String> split2List(String param) {
     return StringUtils.split(param, ",").stream()
         .map(String::trim).distinct().collect(Collectors.toList());
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestConfigUtils.java 
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestConfigUtils.java
index 46ad7910af13..a287092c88c3 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestConfigUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestConfigUtils.java
@@ -22,7 +22,9 @@ package org.apache.hudi.common.util;
 import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodiePayloadProps;
 import org.apache.hudi.common.util.collection.ExternalSpillableMap.DiskMapType;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -38,6 +40,8 @@ import java.util.stream.Stream;
 
 import static 
org.apache.hudi.common.table.HoodieTableConfig.MERGE_PROPERTIES_PREFIX;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -59,7 +63,7 @@ public class TestConfigUtils {
 
   @ParameterizedTest
   @MethodSource("separatorArgs")
-  public void testToMapSucceeds(Option<String> separator) {
+  void testToMapSucceeds(Option<String> separator) {
     String sepString = separator.isPresent() ? separator.get() : "\n";
     Map<String, String> expectedMap = new HashMap<>();
     expectedMap.put("k.1.1.2", "v1");
@@ -111,13 +115,84 @@ public class TestConfigUtils {
 
   @ParameterizedTest
   @MethodSource("separatorArgs")
-  public void testToMapThrowError(Option<String> separator) {
+  void testToMapThrowError(Option<String> separator) {
     String sepString = separator.isPresent() ? separator.get() : "\n";
     String srcKv = String.format(
         "k.1.1.2=v1=v1.1%sk.2.1.2=v2%sk.3.1.2=v3", sepString, sepString);
     assertThrows(IllegalArgumentException.class, () -> toMap(srcKv, 
separator));
   }
 
+  @Test
+  void testShouldTrackEventTimeWaterMarkByConfig() {
+    TypedProperties props = new TypedProperties();
+
+    // Test default value (should be false)
+    assertFalse(ConfigUtils.isTrackingEventTimeWatermark(props));
+
+    // Test when explicitly set to true
+    props.put("hoodie.write.track.event.time.watermark", "true");
+    assertTrue(ConfigUtils.isTrackingEventTimeWatermark(props));
+
+    // Test when explicitly set to false
+    props.put("hoodie.write.track.event.time.watermark", "false");
+    assertFalse(ConfigUtils.isTrackingEventTimeWatermark(props));
+
+    // Test with boolean value
+    props.put("hoodie.write.track.event.time.watermark", true);
+    assertTrue(ConfigUtils.isTrackingEventTimeWatermark(props));
+
+    props.put("hoodie.write.track.event.time.watermark", false);
+    assertFalse(ConfigUtils.isTrackingEventTimeWatermark(props));
+  }
+
+  @Test
+  void testShouldKeepConsistentLogicalTimestamp() {
+    TypedProperties props = new TypedProperties();
+
+    // Test default value (should be false based on KeyGeneratorOptions)
+    assertFalse(ConfigUtils.shouldKeepConsistentLogicalTimestamp(props));
+
+    // Test when explicitly set to true
+    
props.put(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
 "true");
+    assertTrue(ConfigUtils.shouldKeepConsistentLogicalTimestamp(props));
+
+    // Test when explicitly set to false
+    
props.put(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
 "false");
+    assertFalse(ConfigUtils.shouldKeepConsistentLogicalTimestamp(props));
+
+    // Test with boolean value
+    
props.put(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
 true);
+    assertTrue(ConfigUtils.shouldKeepConsistentLogicalTimestamp(props));
+
+    
props.put(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
 false);
+    assertFalse(ConfigUtils.shouldKeepConsistentLogicalTimestamp(props));
+  }
+
+  @Test
+  void testGetEventTimeFieldName() {
+    TypedProperties props = new TypedProperties();
+
+    // Test when property is not set (should return empty Option)
+    assertFalse(ConfigUtils.getEventTimeFieldName(props) != null);
+
+    // Test when property is set to a field name
+    String eventTimeField = "event_timestamp";
+    props.put(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY, 
eventTimeField);
+    assertNotNull(ConfigUtils.getEventTimeFieldName(props));
+    assertEquals(eventTimeField, ConfigUtils.getEventTimeFieldName(props));
+
+    // Test with different field name
+    String anotherField = "created_at";
+    props.put(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY, 
anotherField);
+    assertNotNull(ConfigUtils.getEventTimeFieldName(props));
+    assertEquals(anotherField, ConfigUtils.getEventTimeFieldName(props));
+
+    // Test with empty string
+    props.put(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY, "");
+    assertNotNull(ConfigUtils.getEventTimeFieldName(props));
+    assertEquals("", ConfigUtils.getEventTimeFieldName(props));
+  }
+
   private Map<String, String> toMap(String config, Option<String> separator) {
     if (separator.isEmpty()) {
       return ConfigUtils.toMap(config);
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java
index ddb5dbb2f24d..faa4384f7fc9 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java
@@ -35,15 +35,21 @@ import org.apache.hudi.keygen.BaseKeyGenerator;
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.time.LocalDate;
 import java.util.Arrays;
 import java.util.Map;
 import java.util.Properties;
@@ -135,6 +141,27 @@ public class HoodieHiveRecord extends 
HoodieRecord<ArrayWritable> {
     throw new UnsupportedOperationException("Not supported for 
HoodieHiveRecord");
   }
 
  @Override
  public Object convertColumnValueForLogicalType(Schema fieldSchema,
                                                 Object fieldValue,
                                                 boolean keepConsistentLogicalTimestamp) {
    // Converts a Hadoop Writable column value carrying an Avro logical type
    // into its logical Java representation (Hive engine counterpart of
    // HoodieAvroUtils.convertValueForAvroLogicalTypes).
    if (fieldValue == null) {
      return null;
    }
    LogicalType logicalType = fieldSchema.getLogicalType();

    if (logicalType == LogicalTypes.date()) {
      // DATE is stored as an epoch-day int.
      return LocalDate.ofEpochDay(((IntWritable) fieldValue).get());
    } else if (logicalType == LogicalTypes.timestampMillis() && keepConsistentLogicalTimestamp) {
      // Unbox to a plain long (epoch millis).
      return ((LongWritable) fieldValue).get();
    } else if (logicalType == LogicalTypes.timestampMicros() && keepConsistentLogicalTimestamp) {
      // Normalize micros down to millis for consistent comparison.
      return ((LongWritable) fieldValue).get() / 1000;
    } else if (logicalType instanceof LogicalTypes.Decimal) {
      return ((HiveDecimalWritable) fieldValue).getHiveDecimal().bigDecimalValue();
    }
    // No logical type (or timestamp normalization disabled): pass the Writable through.
    return fieldValue;
  }
+
   @Override
   public Object[] getColumnValues(Schema recordSchema, String[] columns, 
boolean consistentLogicalTimestampEnabled) {
     Object[] objects = new Object[columns.length];
diff --git 
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieHiveRecord.java 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieHiveRecord.java
new file mode 100644
index 000000000000..d759d06c14c5
--- /dev/null
+++ 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieHiveRecord.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the specific language 
governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.hadoop.utils.ObjectInspectorCache;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.math.BigDecimal;
+import java.time.LocalDate;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
/**
 * Unit tests for {@link HoodieHiveRecord#convertColumnValueForLogicalType},
 * covering the Avro logical types (date, timestamp-millis, timestamp-micros,
 * decimal) plus the pass-through behavior for values without a logical type.
 */
class TestHoodieHiveRecord {
  private HoodieHiveRecord hoodieHiveRecord;
  @Mock
  private ObjectInspectorCache mockObjectInspectorCache;

  @BeforeEach
  void setUp() {
    MockitoAnnotations.openMocks(this);

    // Create a minimal HoodieHiveRecord instance with mocked dependencies
    HoodieKey key = new HoodieKey("test-key", "test-partition");
    ArrayWritable data = new ArrayWritable(Writable.class, new Writable[]{new Text("test")});
    Schema schema = Schema.create(Schema.Type.STRING);

    // Create HoodieHiveRecord with mocked dependencies
    hoodieHiveRecord = new HoodieHiveRecord(key, data, schema, mockObjectInspectorCache);
  }

  @Test
  void testConvertColumnValueForLogicalTypeWithNullValue() {
    Schema dateSchema = Schema.create(Schema.Type.INT);
    LogicalTypes.date().addToSchema(dateSchema);

    // Null values short-circuit to null regardless of the logical type.
    Object result = hoodieHiveRecord.convertColumnValueForLogicalType(dateSchema, null, true);
    assertNull(result);
  }

  @Test
  void testConvertColumnValueForLogicalTypeWithDate() {
    Schema dateSchema = Schema.create(Schema.Type.INT);
    LogicalTypes.date().addToSchema(dateSchema);

    IntWritable dateValue = new IntWritable(18628); // 2021-01-01
    Object result = hoodieHiveRecord.convertColumnValueForLogicalType(dateSchema, dateValue, true);

    assertEquals(LocalDate.class, result.getClass());
    assertEquals("2021-01-01", String.valueOf(result));
  }

  @Test
  void testConvertColumnValueForLogicalTypeWithTimestampMillis() {
    Schema timestampMillisSchema = Schema.create(Schema.Type.LONG);
    LogicalTypes.timestampMillis().addToSchema(timestampMillisSchema);

    LongWritable timestampValue = new LongWritable(1609459200000L); // 2021-01-01 00:00:00 UTC
    Object result = hoodieHiveRecord.convertColumnValueForLogicalType(timestampMillisSchema, timestampValue, true);

    // Writable is unboxed to a plain Long when normalization is enabled.
    assertEquals(Long.class, result.getClass());
    assertEquals("1609459200000", String.valueOf(result));
  }

  @Test
  void testConvertColumnValueForLogicalTypeWithTimestampMillisDisabled() {
    Schema timestampMillisSchema = Schema.create(Schema.Type.LONG);
    LogicalTypes.timestampMillis().addToSchema(timestampMillisSchema);

    LongWritable timestampValue = new LongWritable(1609459200000L);
    Object result = hoodieHiveRecord.convertColumnValueForLogicalType(timestampMillisSchema, timestampValue, false);

    // Should return original value when keepConsistentLogicalTimestamp is false
    assertEquals(LongWritable.class, result.getClass());
    assertEquals("1609459200000", String.valueOf(result));
  }

  @Test
  void testConvertColumnValueForLogicalTypeWithTimestampMicros() {
    Schema timestampMicrosSchema = Schema.create(Schema.Type.LONG);
    LogicalTypes.timestampMicros().addToSchema(timestampMicrosSchema);

    LongWritable timestampValue = new LongWritable(1609459200000000L); // 2021-01-01 00:00:00 UTC in microseconds
    Object result = hoodieHiveRecord.convertColumnValueForLogicalType(timestampMicrosSchema, timestampValue, true);

    // Micros are normalized down to millis.
    assertEquals(Long.class, result.getClass());
    assertEquals("1609459200000", String.valueOf(result));
  }

  @Test
  void testConvertColumnValueForLogicalTypeWithTimestampMicrosDisabled() {
    Schema timestampMicrosSchema = Schema.create(Schema.Type.LONG);
    LogicalTypes.timestampMicros().addToSchema(timestampMicrosSchema);

    LongWritable timestampValue = new LongWritable(1609459200000000L);
    Object result = hoodieHiveRecord.convertColumnValueForLogicalType(timestampMicrosSchema, timestampValue, false);

    // Should return original value when keepConsistentLogicalTimestamp is false
    assertEquals(LongWritable.class, result.getClass());
    assertEquals("1609459200000000", String.valueOf(result));
  }

  @Test
  void testConvertColumnValueForLogicalTypeWithDecimal() {
    Schema decimalSchema = Schema.create(Schema.Type.BYTES);
    LogicalTypes.decimal(10, 2).addToSchema(decimalSchema);

    HiveDecimalWritable decimalValue = new HiveDecimalWritable("123.45");
    Object result = hoodieHiveRecord.convertColumnValueForLogicalType(decimalSchema, decimalValue, true);

    assertEquals(BigDecimal.class, result.getClass());
    assertEquals("123.45", String.valueOf(result));
  }

  @Test
  void testConvertColumnValueForLogicalTypeWithString() {
    Schema stringSchema = Schema.create(Schema.Type.STRING);

    Text stringValue = new Text("test string");
    Object result = hoodieHiveRecord.convertColumnValueForLogicalType(stringSchema, stringValue, true);

    // Should return original value for non-logical types
    assertEquals(Text.class, result.getClass());
    assertEquals("test string", result.toString());
  }

  @Test
  void testConvertColumnValueForLogicalTypeWithIntWritable() {
    Schema stringSchema = Schema.create(Schema.Type.STRING);

    IntWritable intValue = new IntWritable(42);
    Object result = hoodieHiveRecord.convertColumnValueForLogicalType(stringSchema, intValue, true);

    // Should return original value for non-logical types
    assertEquals(IntWritable.class, result.getClass());
    assertEquals("42", String.valueOf(result));
  }

  @Test
  void testConvertColumnValueForLogicalTypeWithLongWritable() {
    Schema stringSchema = Schema.create(Schema.Type.STRING);

    LongWritable longValue = new LongWritable(12345L);
    Object result = hoodieHiveRecord.convertColumnValueForLogicalType(stringSchema, longValue, true);

    // Should return original value for non-logical types
    assertEquals(LongWritable.class, result.getClass());
    assertEquals("12345", String.valueOf(result));
  }
}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestHoodieActiveTimeline.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestHoodieActiveTimeline.scala
index 9aaee8b0a8c8..0e581fdbfbf1 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestHoodieActiveTimeline.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestHoodieActiveTimeline.scala
@@ -30,8 +30,7 @@ import org.apache.hudi.testutils.HoodieSparkClientTestBase
 
 import org.apache.spark.sql._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue, fail}
-import org.slf4j.LoggerFactory
+import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, 
assertTrue, fail}
 
 import java.io.FileNotFoundException
 
@@ -41,11 +40,7 @@ import scala.collection.JavaConverters._
  * Tests on HoodieTimeline using the real hudi table.
  */
 class TestHoodieActiveTimeline extends HoodieSparkClientTestBase {
-
   var spark: SparkSession = null
-
-  private val log = LoggerFactory.getLogger(classOf[TestHoodieActiveTimeline])
-
   val commonOpts = Map(
     "hoodie.insert.shuffle.parallelism" -> "4",
     "hoodie.upsert.shuffle.parallelism" -> "4",
@@ -259,4 +254,57 @@ class TestHoodieActiveTimeline extends 
HoodieSparkClientTestBase {
       case _ => fail("Should have failed with FileNotFoundException")
     }
   }
+
  @Test
  def testEventTimeTracking(): Unit = {
    // End-to-end check that min/max event-time watermarks are recorded in the
    // commit metadata across insert, duplicate insert, and update commits when
    // tracking is enabled.
    val eventTimeOpts = commonOpts ++ Map(
      HoodieWriteConfig.TRACK_EVENT_TIME_WATERMARK.key -> "true",
      HoodieWriteConfig.RECORD_MERGE_MODE.key -> "EVENT_TIME_ORDERING",
      "hoodie.payload.event.time.field" -> "current_ts"
    )

    // Initial insert
    val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).asScala.toList
    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
    inputDF1.write.format("org.apache.hudi")
      .options(eventTimeOpts)
      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
      .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
      .mode(SaveMode.Overwrite)
      .save(basePath)
    val metaClient = createMetaClient(basePath)
    validateEventTimeMetadata(metaClient)

    // Insert again (same rows) — watermarks must still be produced
    inputDF1.write.format("org.apache.hudi")
      .options(eventTimeOpts)
      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
      .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
      .mode(SaveMode.Append)
      .save(basePath)
    validateEventTimeMetadata(metaClient)

    // Update records (default upsert operation)
    val records3 = recordsToStrings(dataGen.generateUniqueUpdates("003", 50)).asScala.toList
    val inputDF3: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records3, 2))
    inputDF3.write.format("org.apache.hudi")
      .options(eventTimeOpts)
      .mode(SaveMode.Append)
      .save(basePath)
    validateEventTimeMetadata(metaClient)
  }
+
  /**
   * Asserts that every write stat of the latest commit with valid data carries
   * both min and max event-time watermarks.
   */
  def validateEventTimeMetadata(metaClient: HoodieTableMetaClient): Unit = {
    // Reload to pick up the timeline state written by the preceding commit.
    val localMetaClient = HoodieTableMetaClient.reload(metaClient)
    val ret = localMetaClient.getActiveTimeline.getLastCommitMetadataWithValidData
    assertTrue(ret.isPresent)
    val stats = ret.get().getRight.getPartitionToWriteStats.values().asScala
    for {
      statList <- stats
      stat <- statList.asScala
    } {
      assertNotNull(stat.getMaxEventTime)
      assertNotNull(stat.getMinEventTime)
    }
  }
 }

Reply via email to