This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new d8aafb4  fix: handle zero event time ordering (#357)
d8aafb4 is described below

commit d8aafb40407e7431ac141ac45abc5edf5591b051
Author: Shiyan Xu <[email protected]>
AuthorDate: Thu Jun 26 13:17:45 2025 -0500

    fix: handle zero event time ordering (#357)
    
    When the ordering value from a delete block is 0 (a special value
    indicating that the ordering value should not be used), ignore the
    event-time ordering and fall back to the commit time.
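    
    Illustratively (a minimal sketch, not code from this commit; the names
    below are hypothetical but mirror MaxOrderingInfo::is_greater_than in
    the diff below), the delete-vs-record comparison becomes:
    
        // Hypothetical helper: does a tracked delete win over a data record?
        // When the delete's event time is the 0 sentinel, compare commit
        // times only; otherwise compare event times, breaking ties by
        // commit time.
        fn delete_wins(
            delete_event_time_is_zero: bool,
            delete_event_time: u64,
            delete_commit_time: &str,
            record_event_time: u64,
            record_commit_time: &str,
        ) -> bool {
            if delete_event_time_is_zero {
                delete_commit_time > record_commit_time
            } else {
                delete_event_time > record_event_time
                    || (delete_event_time == record_event_time
                        && delete_commit_time > record_commit_time)
            }
        }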
---
 crates/core/src/file_group/record_batches.rs |  28 +-
 crates/core/src/merge/ordering.rs            | 707 ++++++++++++++++++++++++++-
 crates/core/src/merge/record_merger.rs       |  51 +-
 crates/core/src/record/mod.rs                |  18 +
 crates/core/src/schema/delete.rs             |  56 ++-
 5 files changed, 799 insertions(+), 61 deletions(-)

diff --git a/crates/core/src/file_group/record_batches.rs b/crates/core/src/file_group/record_batches.rs
index 5173639..c7aa7ff 100644
--- a/crates/core/src/file_group/record_batches.rs
+++ b/crates/core/src/file_group/record_batches.rs
@@ -143,7 +143,7 @@ mod tests {
 
     // Helper function to create a test data RecordBatch
     fn create_test_data_batch(num_rows: usize) -> RecordBatch {
-        create_test_data_batch_with_ordering_field(num_rows, "value")
+        create_test_data_batch_with_ordering_field(num_rows, "ord_val")
     }
 
     fn create_test_data_batch_with_ordering_field(
@@ -509,7 +509,7 @@ mod tests {
         let record_batches = RecordBatches::new();
         let hudi_configs = Arc::new(HudiConfigs::new([(
             HudiTableConfig::PrecombineField.as_ref(),
-            "orderingVal",
+            "any_ordering_field",
         )]));
 
         let result = record_batches
@@ -525,7 +525,7 @@ mod tests {
         let mut record_batches = RecordBatches::new();
 
         // Need at least one data batch for schema reference
-        let ordering_field = "ord_val";
+        let ordering_field = "seq_num";
         
record_batches.push_data_batch(create_test_data_batch_with_ordering_field(
             1,
             ordering_field,
@@ -544,21 +544,26 @@ mod tests {
             .unwrap();
 
         assert_eq!(result.num_rows(), 3);
-        assert_eq!(result.num_columns(), 3); // commit_time, record_key, partition_path
+        assert_eq!(result.num_columns(), 4); // commit_time, record_key, partition_path, seq_num
 
         // Check schema field names
         let schema = result.schema();
         assert_eq!(schema.field(0).name(), MetaField::CommitTime.as_ref());
         assert_eq!(schema.field(1).name(), MetaField::RecordKey.as_ref());
         assert_eq!(schema.field(2).name(), MetaField::PartitionPath.as_ref());
+        assert_eq!(schema.field(3).name(), ordering_field);
     }
 
     #[test]
     fn test_concat_delete_batches_transformed_multiple() {
         let mut record_batches = RecordBatches::new();
 
+        let ordering_field = "seq_num";
         // Need at least one data batch for schema reference
-        record_batches.push_data_batch(create_test_data_batch(1));
+        record_batches.push_data_batch(create_test_data_batch_with_ordering_field(
+            1,
+            ordering_field,
+        ));
 
         // Add multiple delete batches
         record_batches.push_delete_batch(create_test_delete_batch(2), "20240101000000".to_string());
@@ -567,7 +572,7 @@ mod tests {
 
         let hudi_configs = Arc::new(HudiConfigs::new([(
             HudiTableConfig::PrecombineField.as_ref(),
-            "value",
+            ordering_field,
         )]));
 
         let result = record_batches
@@ -575,7 +580,7 @@ mod tests {
             .unwrap();
 
         assert_eq!(result.num_rows(), 6); // 2 + 3 + 1
-        assert_eq!(result.num_columns(), 3);
+        assert_eq!(result.num_columns(), 4);
 
         // Verify all commit times are preserved correctly
         let commit_time_array = result
@@ -602,12 +607,16 @@ mod tests {
         let mut record_batches = RecordBatches::new();
 
         // Create data batch with custom ordering field
-        record_batches.push_data_batch(create_test_data_batch(1));
+        let ordering_field = "custom_ordering_field";
+        record_batches.push_data_batch(create_test_data_batch_with_ordering_field(
+            1,
+            ordering_field,
+        ));
         record_batches.push_delete_batch(create_test_delete_batch(2), "20240101000000".to_string());
 
         let hudi_configs = Arc::new(HudiConfigs::new([(
             HudiTableConfig::PrecombineField.as_ref(),
-            "custom_ts",
+            ordering_field,
         )]));
 
         let result = record_batches
@@ -619,6 +628,7 @@ mod tests {
         assert_eq!(schema.field(0).name(), MetaField::CommitTime.as_ref());
         assert_eq!(schema.field(1).name(), MetaField::RecordKey.as_ref());
         assert_eq!(schema.field(2).name(), MetaField::PartitionPath.as_ref());
+        assert_eq!(schema.field(3).name(), ordering_field);
     }
 
     #[test]
diff --git a/crates/core/src/merge/ordering.rs b/crates/core/src/merge/ordering.rs
index dba281d..3f279eb 100644
--- a/crates/core/src/merge/ordering.rs
+++ b/crates/core/src/merge/ordering.rs
@@ -16,54 +16,83 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-use crate::record::{extract_commit_time_ordering_values, extract_record_keys};
+use crate::config::table::HudiTableConfig;
+use crate::config::HudiConfigs;
+use crate::record::{
+    extract_commit_time_ordering_values, extract_event_time_ordering_values, extract_record_keys,
+};
 use crate::Result;
-use arrow_array::RecordBatch;
+use arrow_array::{
+    Array, Int16Array, Int32Array, Int64Array, Int8Array, RecordBatch, UInt16Array, UInt32Array,
+    UInt64Array, UInt8Array,
+};
 use arrow_row::{OwnedRow, Row, RowConverter};
+use arrow_schema::DataType;
 use std::collections::HashMap;
+use std::sync::Arc;
 
 #[derive(Debug, Clone)]
 pub struct MaxOrderingInfo {
-    ordering_value: OwnedRow,
-    source_is_delete: bool,
+    event_time_ordering: OwnedRow,
+    commit_time_ordering: OwnedRow,
+    is_event_time_zero: bool,
 }
 
 impl MaxOrderingInfo {
-    pub fn ordering_value(&self) -> Row {
-        self.ordering_value.row()
+    pub fn is_greater_than(&self, event_time: Row, commit_time: Row) -> bool {
+        if self.is_event_time_zero {
+            self.commit_time_ordering.row() > commit_time
+        } else {
+            self.event_time_ordering.row() > event_time
+                || (self.event_time_ordering.row() == event_time
+                    && self.commit_time_ordering.row() > commit_time)
+        }
     }
 }
 
 pub fn process_batch_for_max_orderings(
     batch: &RecordBatch,
     max_ordering: &mut HashMap<OwnedRow, MaxOrderingInfo>,
-    is_delete_batch: bool,
     key_converter: &RowConverter,
-    ordering_converter: &RowConverter,
+    event_time_converter: &RowConverter,
+    commit_time_converter: &RowConverter,
+    hudi_configs: Arc<HudiConfigs>,
 ) -> Result<()> {
     if batch.num_rows() == 0 {
         return Ok(());
     }
 
+    let ordering_field = hudi_configs
+        .get(HudiTableConfig::PrecombineField)?
+        .to::<String>();
+
     let keys = extract_record_keys(key_converter, batch)?;
-    let orderings = extract_commit_time_ordering_values(ordering_converter, batch)?;
+    let event_times =
+        extract_event_time_ordering_values(event_time_converter, batch, &ordering_field)?;
+    let commit_times = extract_commit_time_ordering_values(commit_time_converter, batch)?;
     for i in 0..batch.num_rows() {
         let key = keys.row(i).owned();
-        let ordering = orderings.row(i).owned();
+        let event_time = event_times.row(i).owned();
+        let commit_time = commit_times.row(i).owned();
+        let is_event_time_zero = is_event_time_zero(event_time.row(), event_time_converter)?;
 
         match max_ordering.get_mut(&key) {
             Some(info) => {
-                if ordering > info.ordering_value {
-                    info.ordering_value = ordering;
-                    info.source_is_delete = is_delete_batch;
+                if event_time > info.event_time_ordering {
+                    info.event_time_ordering = event_time;
+                    info.is_event_time_zero = is_event_time_zero;
+                }
+                if commit_time > info.commit_time_ordering {
+                    info.commit_time_ordering = commit_time;
                 }
             }
             None => {
                 max_ordering.insert(
                     key,
                     MaxOrderingInfo {
-                        ordering_value: ordering,
-                        source_is_delete: is_delete_batch,
+                        event_time_ordering: event_time,
+                        commit_time_ordering: commit_time,
+                        is_event_time_zero,
                     },
                 );
             }
@@ -72,3 +101,651 @@ pub fn process_batch_for_max_orderings(
 
     Ok(())
 }
+
+pub fn is_event_time_zero(
+    event_time_row: Row,
+    event_time_converter: &RowConverter,
+) -> Result<bool> {
+    let event_times = event_time_converter.convert_rows([event_time_row])?;
+    assert_eq!(
+        event_times.len(),
+        1,
+        "Expected exactly one row for event time conversion"
+    );
+    let event_time = &event_times[0];
+
+    let is_zero = match event_time.data_type() {
+        DataType::Int8 => {
+            event_time
+                .as_any()
+                .downcast_ref::<Int8Array>()
+                .unwrap()
+                .value(0)
+                == 0
+        }
+        DataType::Int16 => {
+            event_time
+                .as_any()
+                .downcast_ref::<Int16Array>()
+                .unwrap()
+                .value(0)
+                == 0
+        }
+        DataType::Int32 => {
+            event_time
+                .as_any()
+                .downcast_ref::<Int32Array>()
+                .unwrap()
+                .value(0)
+                == 0
+        }
+        DataType::Int64 => {
+            event_time
+                .as_any()
+                .downcast_ref::<Int64Array>()
+                .unwrap()
+                .value(0)
+                == 0
+        }
+        DataType::UInt8 => {
+            event_time
+                .as_any()
+                .downcast_ref::<UInt8Array>()
+                .unwrap()
+                .value(0)
+                == 0
+        }
+        DataType::UInt16 => {
+            event_time
+                .as_any()
+                .downcast_ref::<UInt16Array>()
+                .unwrap()
+                .value(0)
+                == 0
+        }
+        DataType::UInt32 => {
+            event_time
+                .as_any()
+                .downcast_ref::<UInt32Array>()
+                .unwrap()
+                .value(0)
+                == 0
+        }
+        DataType::UInt64 => {
+            event_time
+                .as_any()
+                .downcast_ref::<UInt64Array>()
+                .unwrap()
+                .value(0)
+                == 0
+        }
+        _ => false, // not an integer type
+    };
+
+    Ok(is_zero)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow_row::{RowConverter, SortField};
+    use std::collections::HashMap;
+
+    use arrow_array::{
+        Int16Array, Int32Array, Int64Array, Int8Array, RecordBatch, StringArray, UInt16Array,
+        UInt32Array, UInt64Array, UInt8Array,
+    };
+    use arrow_schema::{DataType, Field, Schema, SchemaRef};
+    use std::sync::Arc;
+
+    // Helper function to create a basic test schema
+    fn create_test_delete_schema() -> SchemaRef {
+        create_test_delete_schema_with_event_time_type(DataType::UInt32)
+    }
+
+    // Helper function to create test schema with different event time data type
+    fn create_test_delete_schema_with_event_time_type(event_time_type: DataType) -> SchemaRef {
+        Arc::new(Schema::new(vec![
+            Field::new("_hoodie_commit_time", DataType::Utf8, false),
+            Field::new("_hoodie_record_key", DataType::Utf8, false),
+            Field::new("_hoodie_partition_path", DataType::Utf8, false),
+            Field::new("an_ordering_field", event_time_type, true),
+        ]))
+    }
+
+    // Helper function to create a test RecordBatch
+    fn create_test_delete_batch() -> RecordBatch {
+        let schema = create_test_delete_schema();
+
+        let commit_times = Arc::new(StringArray::from(vec![
+            "20240101000000",
+            "20240102000000",
+            "20240103000000",
+        ]));
+        let record_keys = Arc::new(StringArray::from(vec!["key1", "key2", 
"key3"]));
+        let partition_paths = Arc::new(StringArray::from(vec!["part1", 
"part2", "part3"]));
+        let ordering_values = Arc::new(UInt32Array::from(vec![100, 0, 300]));
+
+        RecordBatch::try_new(
+            schema,
+            vec![commit_times, record_keys, partition_paths, ordering_values],
+        )
+        .unwrap()
+    }
+
+    // Helper function to create test batch with specific event time type
+    fn create_test_delete_batch_with_event_time_type<T>(
+        event_time_type: DataType,
+        values: Vec<T>,
+    ) -> RecordBatch
+    where
+        T: Into<i64> + Copy,
+    {
+        let schema = create_test_delete_schema_with_event_time_type(event_time_type.clone());
+
+        let commit_times = Arc::new(StringArray::from(vec!["20240101000000", 
"20240102000000"]));
+        let record_keys = Arc::new(StringArray::from(vec!["key1", "key2"]));
+        let partition_paths = Arc::new(StringArray::from(vec!["part1", 
"part2"]));
+
+        let ordering_values: Arc<dyn Array> = match event_time_type {
+            DataType::Int8 => Arc::new(Int8Array::from(
+                values
+                    .into_iter()
+                    .map(|v| v.into() as i8)
+                    .collect::<Vec<_>>(),
+            )),
+            DataType::Int16 => Arc::new(Int16Array::from(
+                values
+                    .into_iter()
+                    .map(|v| v.into() as i16)
+                    .collect::<Vec<_>>(),
+            )),
+            DataType::Int32 => Arc::new(Int32Array::from(
+                values
+                    .into_iter()
+                    .map(|v| v.into() as i32)
+                    .collect::<Vec<_>>(),
+            )),
+            DataType::Int64 => Arc::new(Int64Array::from(
+                values.into_iter().map(|v| v.into()).collect::<Vec<_>>(),
+            )),
+            DataType::UInt8 => Arc::new(UInt8Array::from(
+                values
+                    .into_iter()
+                    .map(|v| v.into() as u8)
+                    .collect::<Vec<_>>(),
+            )),
+            DataType::UInt16 => Arc::new(UInt16Array::from(
+                values
+                    .into_iter()
+                    .map(|v| v.into() as u16)
+                    .collect::<Vec<_>>(),
+            )),
+            DataType::UInt32 => Arc::new(UInt32Array::from(
+                values
+                    .into_iter()
+                    .map(|v| v.into() as u32)
+                    .collect::<Vec<_>>(),
+            )),
+            DataType::UInt64 => Arc::new(UInt64Array::from(
+                values
+                    .into_iter()
+                    .map(|v| v.into() as u64)
+                    .collect::<Vec<_>>(),
+            )),
+            _ => panic!("Unsupported event time type for test"),
+        };
+
+        RecordBatch::try_new(
+            schema,
+            vec![commit_times, record_keys, partition_paths, ordering_values],
+        )
+        .unwrap()
+    }
+
+    // Helper function to create empty test batch
+    fn create_empty_test_delete_batch() -> RecordBatch {
+        let schema = create_test_delete_schema();
+
+        let commit_times = Arc::new(StringArray::from(Vec::<&str>::new()));
+        let record_keys = Arc::new(StringArray::from(Vec::<&str>::new()));
+        let partition_paths = Arc::new(StringArray::from(Vec::<&str>::new()));
+        let ordering_values = Arc::new(UInt32Array::from(Vec::<u32>::new()));
+
+        RecordBatch::try_new(
+            schema,
+            vec![commit_times, record_keys, partition_paths, ordering_values],
+        )
+        .unwrap()
+    }
+
+    // Helper function to create row converters
+    fn create_test_converters(schema: SchemaRef) -> (RowConverter, RowConverter, RowConverter) {
+        let key_converter = RowConverter::new(vec![
+            SortField::new(schema.field(1).data_type().clone()), // _hoodie_record_key
+        ])
+        .unwrap();
+
+        let event_time_converter = RowConverter::new(vec![
+            SortField::new(schema.field(3).data_type().clone()), // an_ordering_field
+        ])
+        .unwrap();
+
+        let commit_time_converter = RowConverter::new(vec![
+            SortField::new(schema.field(0).data_type().clone()), // _hoodie_commit_time
+        ])
+        .unwrap();
+
+        (key_converter, event_time_converter, commit_time_converter)
+    }
+
+    // Helper function to create test HudiConfigs
+    fn create_test_hudi_configs() -> Arc<HudiConfigs> {
+        Arc::new(HudiConfigs::new([(
+            HudiTableConfig::PrecombineField.as_ref(),
+            "an_ordering_field",
+        )]))
+    }
+
+    #[test]
+    fn test_max_ordering_info_is_greater_than_event_time_not_zero() {
+        let schema = create_test_delete_schema();
+        let (_, event_time_converter, commit_time_converter) = create_test_converters(schema);
+
+        // Create test data
+        let event_times = event_time_converter
+            .convert_columns(&[Arc::new(UInt32Array::from(vec![100, 200, 
300]))])
+            .unwrap();
+        let commit_times = commit_time_converter
+            .convert_columns(&[Arc::new(StringArray::from(vec![
+                "20240101000000",
+                "20240102000000",
+                "20240102000000",
+            ]))])
+            .unwrap();
+
+        let max_info = MaxOrderingInfo {
+            event_time_ordering: event_times.row(1).owned(), // 200
+            commit_time_ordering: commit_times.row(1).owned(), // "20240102000000"
+            is_event_time_zero: false,
+        };
+
+        // Test: the info's event time is larger
+        assert!(max_info.is_greater_than(event_times.row(0), commit_times.row(0))); // 200 > 100
+
+        // Test: the info's event time is smaller with the same commit time
+        assert!(!max_info.is_greater_than(event_times.row(2), commit_times.row(1))); // 200 < 300
+
+        // Test: the info's event time is the same, but commit time is larger
+        assert!(max_info.is_greater_than(event_times.row(1), commit_times.row(0)));
+        // same event time, larger commit time
+    }
+
+    #[test]
+    fn test_max_ordering_info_is_greater_than_event_time_zero() {
+        let schema = create_test_delete_schema();
+        let (_, event_time_converter, commit_time_converter) = create_test_converters(schema);
+
+        // Create test data
+        let event_times = event_time_converter
+            .convert_columns(&[Arc::new(UInt32Array::from(vec![0, 0, 100]))])
+            .unwrap();
+        let commit_times = commit_time_converter
+            .convert_columns(&[Arc::new(StringArray::from(vec![
+                "20240101000000",
+                "20240102000000",
+                "20240103000000",
+            ]))])
+            .unwrap();
+
+        let max_info = MaxOrderingInfo {
+            event_time_ordering: event_times.row(0).owned(), // 0
+            commit_time_ordering: commit_times.row(1).owned(), // "20240102000000"
+            is_event_time_zero: true,
+        };
+
+        // When event time is zero, only commit time matters
+        assert!(
+            !max_info.is_greater_than(event_times.row(1), commit_times.row(2)),
+            "Event time is zero, max_info is not greater than later commit"
+        );
+        assert!(
+            max_info.is_greater_than(event_times.row(1), commit_times.row(0)),
+            "Event time is zero, max_info is greater than earlier commit"
+        );
+    }
+
+    #[test]
+    fn test_process_batch_for_max_orderings_basic() {
+        let batch = create_test_delete_batch();
+        let schema = batch.schema();
+        let (key_converter, event_time_converter, commit_time_converter) =
+            create_test_converters(schema);
+        let hudi_configs = create_test_hudi_configs();
+
+        let mut max_ordering = HashMap::new();
+
+        let result = process_batch_for_max_orderings(
+            &batch,
+            &mut max_ordering,
+            &key_converter,
+            &event_time_converter,
+            &commit_time_converter,
+            hudi_configs,
+        );
+
+        assert!(result.is_ok());
+        assert_eq!(max_ordering.len(), 3); // 3 unique keys
+
+        // Verify that all keys are present
+        let keys = key_converter
+            .convert_columns(&[batch.column(1).clone()])
+            .unwrap(); // _hoodie_record_key
+        for i in 0..3 {
+            let key = keys.row(i).owned();
+            assert!(max_ordering.contains_key(&key));
+        }
+    }
+
+    #[test]
+    fn test_process_batch_for_max_orderings_empty_batch() {
+        let batch = create_empty_test_delete_batch();
+        let schema = batch.schema();
+        let (key_converter, event_time_converter, commit_time_converter) =
+            create_test_converters(schema);
+        let hudi_configs = create_test_hudi_configs();
+
+        let mut max_ordering = HashMap::new();
+
+        let result = process_batch_for_max_orderings(
+            &batch,
+            &mut max_ordering,
+            &key_converter,
+            &event_time_converter,
+            &commit_time_converter,
+            hudi_configs,
+        );
+
+        assert!(result.is_ok());
+        assert_eq!(max_ordering.len(), 0);
+    }
+
+    fn validate_max_ordering_info_with_batch(
+        max_ordering: &HashMap<OwnedRow, MaxOrderingInfo>,
+        batch: &RecordBatch,
+        key_converter: &RowConverter,
+        event_time_converter: &RowConverter,
+        commit_time_converter: &RowConverter,
+    ) {
+        let keys = key_converter
+            .convert_columns(&[batch.column(1).clone()])
+            .unwrap(); // _hoodie_record_key
+        let event_times = event_time_converter
+            .convert_columns(&[batch.column(3).clone()])
+            .unwrap();
+        let commit_times = commit_time_converter
+            .convert_columns(&[batch.column(0).clone()])
+            .unwrap();
+
+        for i in 0..batch.num_rows() {
+            let key = keys.row(i).owned();
+            let info = max_ordering
+                .get(&key)
+                .expect("Key should exist in max_ordering");
+
+            assert_eq!(info.event_time_ordering.row(), event_times.row(i));
+            assert_eq!(info.commit_time_ordering.row(), commit_times.row(i));
+        }
+    }
+
+    #[test]
+    fn test_process_batch_for_max_orderings_updates_existing() {
+        let batch = create_test_delete_batch();
+        let schema = batch.schema();
+        let (key_converter, event_time_converter, commit_time_converter) =
+            create_test_converters(schema.clone());
+        let hudi_configs = create_test_hudi_configs();
+
+        let mut max_ordering = HashMap::new();
+
+        // Process first time
+        process_batch_for_max_orderings(
+            &batch,
+            &mut max_ordering,
+            &key_converter,
+            &event_time_converter,
+            &commit_time_converter,
+            hudi_configs.clone(),
+        )
+        .unwrap();
+
+        let initial_count = max_ordering.len();
+        assert_eq!(initial_count, 3);
+
+        validate_max_ordering_info_with_batch(
+            &max_ordering,
+            &batch,
+            &key_converter,
+            &event_time_converter,
+            &commit_time_converter,
+        );
+
+        // Create second batch with same keys but different values
+        let commit_times = Arc::new(StringArray::from(vec!["20240201000000", 
"20240202000000"])); // Later commits
+        let record_keys = Arc::new(StringArray::from(vec!["key1", "key2"])); 
// Same keys
+        let partition_paths = Arc::new(StringArray::from(vec!["part1", 
"part2"]));
+        let ordering_values = Arc::new(UInt32Array::from(vec![500, 600])); // 
Higher event times
+
+        let batch2 = RecordBatch::try_new(
+            schema.clone(),
+            vec![commit_times, record_keys, partition_paths, ordering_values],
+        )
+        .unwrap();
+
+        // Process second batch
+        process_batch_for_max_orderings(
+            &batch2,
+            &mut max_ordering,
+            &key_converter,
+            &event_time_converter,
+            &commit_time_converter,
+            hudi_configs,
+        )
+        .unwrap();
+
+        // Should still have same number of unique keys, but values should be updated
+        assert_eq!(max_ordering.len(), initial_count);
+
+        validate_max_ordering_info_with_batch(
+            &max_ordering,
+            &batch2,
+            &key_converter,
+            &event_time_converter,
+            &commit_time_converter,
+        );
+    }
+
+    #[test]
+    fn test_process_batch_for_max_orderings_missing_config() {
+        let batch = create_test_delete_batch();
+        let schema = batch.schema();
+        let (key_converter, event_time_converter, commit_time_converter) =
+            create_test_converters(schema);
+
+        // Create configs without PrecombineField
+        let hudi_configs = Arc::new(HudiConfigs::empty());
+
+        let mut max_ordering = HashMap::new();
+
+        let result = process_batch_for_max_orderings(
+            &batch,
+            &mut max_ordering,
+            &key_converter,
+            &event_time_converter,
+            &commit_time_converter,
+            hudi_configs,
+        );
+
+        assert!(result.is_err());
+    }
+
+    #[test]
+    fn test_is_event_time_zero_with_integer_types() {
+        let int_types = vec![
+            DataType::Int8,
+            DataType::Int16,
+            DataType::Int32,
+            DataType::Int64,
+            DataType::UInt8,
+            DataType::UInt16,
+            DataType::UInt32,
+            DataType::UInt64,
+        ];
+        for t in int_types {
+            let schema = create_test_delete_schema_with_event_time_type(t.clone());
+            let (_, event_time_converter, _) = create_test_converters(schema);
+
+            // Test zero value
+            let zero_batch = create_test_delete_batch_with_event_time_type(t.clone(), vec![0, 0]);
+            let event_times = event_time_converter
+                .convert_columns(&[zero_batch.column(3).clone()])
+                .unwrap();
+
+            let result = is_event_time_zero(event_times.row(0), &event_time_converter);
+            assert!(result.is_ok());
+            assert!(result.unwrap());
+
+            // Test non-zero value
+            let non_zero_batch =
+                create_test_delete_batch_with_event_time_type(t.clone(), vec![1, 123]);
+            let event_times = event_time_converter
+                .convert_columns(&[non_zero_batch.column(3).clone()])
+                .unwrap();
+
+            let result = is_event_time_zero(event_times.row(0), &event_time_converter);
+            assert!(result.is_ok());
+            assert!(!result.unwrap());
+        }
+    }
+
+    #[test]
+    fn test_is_event_time_zero_unsupported_type() {
+        let schema = create_test_delete_schema_with_event_time_type(DataType::Utf8);
+        let (_, event_time_converter, _) = create_test_converters(schema.clone());
+
+        // Create a batch with string ordering field (unsupported for zero check)
+        let commit_times = Arc::new(StringArray::from(vec!["20240101000000"]));
+        let record_keys = Arc::new(StringArray::from(vec!["key1"]));
+        let partition_paths = Arc::new(StringArray::from(vec!["part1"]));
+        let ordering_values = Arc::new(StringArray::from(vec!["0"])); // 
String "0"
+
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![commit_times, record_keys, partition_paths, ordering_values],
+        )
+        .unwrap();
+
+        let event_times = event_time_converter
+            .convert_columns(&[batch.column(3).clone()])
+            .unwrap();
+
+        let result = is_event_time_zero(event_times.row(0), &event_time_converter);
+        assert!(result.is_ok());
+        assert!(!result.unwrap()); // Should return false for unsupported types
+    }
+
+    #[test]
+    fn test_max_ordering_info_clone() {
+        let schema = create_test_delete_schema();
+        let (_, event_time_converter, commit_time_converter) = create_test_converters(schema);
+
+        let event_times = event_time_converter
+            .convert_columns(&[Arc::new(UInt32Array::from(vec![100]))])
+            .unwrap();
+        let commit_times = commit_time_converter
+            .convert_columns(&[Arc::new(StringArray::from(vec!["20240101000000"]))])
+            .unwrap();
+
+        let original = MaxOrderingInfo {
+            event_time_ordering: event_times.row(0).owned(),
+            commit_time_ordering: commit_times.row(0).owned(),
+            is_event_time_zero: false,
+        };
+
+        let cloned = original.clone();
+
+        assert_eq!(original.is_event_time_zero, cloned.is_event_time_zero);
+        assert_eq!(
+            original.event_time_ordering.row(),
+            cloned.event_time_ordering.row()
+        );
+        assert_eq!(
+            original.commit_time_ordering.row(),
+            cloned.commit_time_ordering.row()
+        );
+    }
+
+    #[test]
+    fn test_process_batch_with_duplicate_keys() {
+        let schema = create_test_delete_schema();
+
+        // Create batch with duplicate keys
+        let commit_times = Arc::new(StringArray::from(vec![
+            "20240101000000",
+            "20240102000000",
+            "20240103000000",
+        ]));
+        let record_keys = Arc::new(StringArray::from(vec!["key1", "key1", 
"key2"])); // key1 appears twice
+        let partition_paths = Arc::new(StringArray::from(vec!["part1", 
"part1", "part2"]));
+        let ordering_values = Arc::new(UInt32Array::from(vec![0, 200, 300])); 
// Different event times
+
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![commit_times, record_keys, partition_paths, ordering_values],
+        )
+        .unwrap();
+
+        let (key_converter, event_time_converter, commit_time_converter) =
+            create_test_converters(schema);
+        let hudi_configs = create_test_hudi_configs();
+
+        let mut max_ordering = HashMap::new();
+
+        let result = process_batch_for_max_orderings(
+            &batch,
+            &mut max_ordering,
+            &key_converter,
+            &event_time_converter,
+            &commit_time_converter,
+            hudi_configs,
+        );
+
+        assert!(result.is_ok());
+        assert_eq!(max_ordering.len(), 2); // Only 2 unique keys
+
+        // Verify that key1 has the maximum values from both records
+        let keys = key_converter
+            .convert_columns(&[batch.column(1).clone()])
+            .unwrap(); // _hoodie_record_key
+        let key1 = keys.row(0).owned(); // Should be "key1"
+        let event_times = event_time_converter
+            .convert_columns(&[batch.column(3).clone()])
+            .unwrap();
+        let commit_times = commit_time_converter
+            .convert_columns(&[batch.column(0).clone()])
+            .unwrap();
+
+        let info = max_ordering.get(&key1).unwrap();
+        assert_eq!(
+            info.event_time_ordering.row(),
+            event_times.row(1),
+            "Event time for key1 should be the max of 0 and 200"
+        );
+        assert_eq!(
+            info.commit_time_ordering.row(),
+            commit_times.row(1),
+            "Commit time for key1 should be the max of 20240101000000 and 
20240102000000"
+        );
+        assert!(!info.is_event_time_zero);
+    }
+}
diff --git a/crates/core/src/merge/record_merger.rs b/crates/core/src/merge/record_merger.rs
index dc11ee1..9ebde9d 100644
--- a/crates/core/src/merge/record_merger.rs
+++ b/crates/core/src/merge/record_merger.rs
@@ -27,8 +27,9 @@ use crate::merge::ordering::{process_batch_for_max_orderings, MaxOrderingInfo};
 use crate::merge::RecordMergeStrategyValue;
 use crate::metadata::meta_field::MetaField;
 use crate::record::{
-    create_commit_time_ordering_converter, create_record_key_converter,
-    extract_commit_time_ordering_values, extract_record_keys,
+    create_commit_time_ordering_converter, create_event_time_ordering_converter,
+    create_record_key_converter, extract_commit_time_ordering_values,
+    extract_event_time_ordering_values, extract_record_keys,
 };
 use crate::util::arrow::lexsort_to_indices;
 use crate::util::arrow::ColumnAsArray;
@@ -111,15 +112,18 @@ impl RecordMerger {
                 let ordering_field = self.hudi_configs.get(PrecombineField)?.to::<String>();
                 let ordering_array = data_batch.get_array(&ordering_field)?;
                 let commit_seqno_array = data_batch.get_array(MetaField::CommitSeqno.as_ref())?;
-                let sorted_indices =
+                let desc_indices =
                     lexsort_to_indices(&[key_array, ordering_array, 
commit_seqno_array], true);
 
                 // Create shared converters for record keys and ordering values
                 let key_converter = create_record_key_converter(data_batch.schema())?;
-                let ordering_converter =
+                let ordering_field = self.hudi_configs.get(PrecombineField)?.to::<String>();
+                let event_time_converter =
+                    create_event_time_ordering_converter(data_batch.schema(), &ordering_field)?;
+                let commit_time_converter =
                     create_commit_time_ordering_converter(data_batch.schema())?;
 
-                // Process the delete batches to get the max ordering of each delete
+                // Process the delete batches to get the max ordering of each deleted key
                 let delete_orderings: HashMap<OwnedRow, MaxOrderingInfo> =
                     if record_batches.num_delete_rows() == 0 {
                         HashMap::new()
@@ -131,9 +135,10 @@ impl RecordMerger {
                         process_batch_for_max_orderings(
                             &delete_batch,
                             &mut delete_orderings,
-                            true,
                             &key_converter,
-                            &ordering_converter,
+                            &event_time_converter,
+                            &commit_time_converter,
+                            self.hudi_configs.clone(),
                         )?;
                         delete_orderings
                     };
@@ -142,25 +147,33 @@ impl RecordMerger {
                 let mut keep_mask_builder = BooleanArray::builder(num_records);
 
                 let record_keys = extract_record_keys(&key_converter, 
&data_batch)?;
-                let ordering_values =
-                    extract_commit_time_ordering_values(&ordering_converter, 
&data_batch)?;
+                let event_times = extract_event_time_ordering_values(
+                    &event_time_converter,
+                    &data_batch,
+                    &ordering_field,
+                )?;
+                let commit_times =
+                    extract_commit_time_ordering_values(&commit_time_converter, &data_batch)?;
 
                 let mut last_key: Option<Row> = None;
                 for i in 0..num_records {
                     // Iterate over sorted indices to process records in desc order
-                    let idx = sorted_indices.value(i) as usize;
-                    let current_key = record_keys.row(idx);
-                    let current_ordering = ordering_values.row(idx);
+                    let idx = desc_indices.value(i) as usize;
+                    let curr_key = record_keys.row(idx);
+                    let curr_event_time = event_times.row(idx);
+                    let curr_commit_time = commit_times.row(idx);
 
-                    let first_seen = last_key != Some(current_key);
+                    let first_seen = last_key != Some(curr_key);
                     if first_seen {
-                        last_key = Some(current_key);
+                        last_key = Some(curr_key);
 
-                        let should_keep = match delete_orderings.get(&current_key.owned()) {
+                        let should_keep = match delete_orderings.get(&curr_key.owned()) {
                             Some(delete_max_ordering) => {
-                                // If the record's ordering >= its delete ordering, keep it.
-                                // Otherwise, it's deleted and it should not be kept.
-                                current_ordering >= delete_max_ordering.ordering_value()
+                                // If the delete ordering is not greater than the record's ordering,
+                                // we keep the record.
+                                // Otherwise, we discard it as the delete is more recent.
+                                !delete_max_ordering
+                                    .is_greater_than(curr_event_time, curr_commit_time)
                             }
                             None => true, // There is no delete for this key, keep it.
                         };
@@ -176,7 +189,7 @@ impl RecordMerger {
                 // Filter the sorted indices based on the keep mask
                 // then take the records
                 let keep_mask = keep_mask_builder.finish();
-                let keep_indices = arrow::compute::filter(&sorted_indices, 
&keep_mask)?;
+                let keep_indices = arrow::compute::filter(&desc_indices, 
&keep_mask)?;
                 Ok(take_record_batch(&data_batch, &keep_indices)?)
             }
         }
diff --git a/crates/core/src/record/mod.rs b/crates/core/src/record/mod.rs
index 13b3433..e519f07 100644
--- a/crates/core/src/record/mod.rs
+++ b/crates/core/src/record/mod.rs
@@ -28,6 +28,13 @@ pub fn create_record_key_converter(schema: SchemaRef) -> Result<RowConverter> {
     create_row_converter(schema, [MetaField::RecordKey.as_ref()])
 }
 
+pub fn create_event_time_ordering_converter(
+    schema: SchemaRef,
+    ordering_field: &str,
+) -> Result<RowConverter> {
+    create_row_converter(schema, [ordering_field])
+}
+
 pub fn create_commit_time_ordering_converter(schema: SchemaRef) -> Result<RowConverter> {
     create_row_converter(schema, [MetaField::CommitTime.as_ref()])
 }
@@ -39,6 +46,17 @@ pub fn extract_record_keys(converter: &RowConverter, batch: 
&RecordBatch) -> Res
         .map_err(CoreError::ArrowError)
 }
 
+pub fn extract_event_time_ordering_values(
+    converter: &RowConverter,
+    batch: &RecordBatch,
+    ordering_field: &str,
+) -> Result<Rows> {
+    let ordering_columns = get_column_arrays(batch, [ordering_field])?;
+    converter
+        .convert_columns(&ordering_columns)
+        .map_err(CoreError::ArrowError)
+}
+
 pub fn extract_commit_time_ordering_values(
     converter: &RowConverter,
     batch: &RecordBatch,
diff --git a/crates/core/src/schema/delete.rs b/crates/core/src/schema/delete.rs
index 4d560c9..838fb9c 100644
--- a/crates/core/src/schema/delete.rs
+++ b/crates/core/src/schema/delete.rs
@@ -128,13 +128,10 @@ pub fn avro_schema_for_delete_record_list() -> Result<&'static AvroSchema> {
 }
 
 /// Transforms a RecordBatch representing delete records into a new RecordBatch
-///
-/// ## Notes
-/// `ordering_field` is ignored as the delete ordering is determined by the commit time.
 pub fn transform_delete_record_batch(
     batch: &RecordBatch,
     commit_time: &str,
-    _ordering_field: &str,
+    ordering_field: &str,
 ) -> Result<RecordBatch> {
     let num_rows = batch.num_rows();
 
@@ -144,9 +141,15 @@ pub fn transform_delete_record_batch(
     // Get the original column data directly by position
     let record_key_array = batch.column(0).clone(); // recordKey at pos 0
     let partition_path_array = batch.column(1).clone(); // partitionPath at pos 1
+    let ordering_val_array = batch.column(2).clone(); // orderingVal at pos 2
 
     // Create new columns vector with the new order
-    let new_columns = vec![commit_time_array, record_key_array, partition_path_array];
+    let new_columns = vec![
+        commit_time_array,
+        record_key_array,
+        partition_path_array,
+        ordering_val_array,
+    ];
 
     let new_fields = vec![
         Arc::new(Field::new(
@@ -164,6 +167,11 @@ pub fn transform_delete_record_batch(
             DataType::Utf8,
             true,
         )),
+        Arc::new(Field::new(
+            ordering_field,
+            batch.schema().field(2).data_type().clone(),
+            batch.schema().field(2).is_nullable(),
+        )),
     ];
     let new_schema = SchemaRef::from(Schema::new(new_fields));
     RecordBatch::try_new(new_schema, new_columns).map_err(CoreError::ArrowError)
@@ -305,7 +313,7 @@ mod tests {
         }
     }
 
-    fn create_test_schema() -> SchemaRef {
+    fn create_test_delete_schema() -> SchemaRef {
         Arc::new(Schema::new(vec![
             Field::new("recordKey", DataType::Utf8, false),
             Field::new("partitionPath", DataType::Utf8, true),
@@ -314,8 +322,8 @@ mod tests {
     }
 
     // Helper function to create a test RecordBatch
-    fn create_test_record_batch() -> RecordBatch {
-        let schema = create_test_schema();
+    fn create_test_delete_record_batch() -> RecordBatch {
+        let schema = create_test_delete_schema();
 
         let record_keys = Arc::new(StringArray::from(vec!["key1", "key2", 
"key3"]));
         let partition_paths = Arc::new(StringArray::from(vec![
@@ -329,8 +337,8 @@ mod tests {
     }
 
     // Helper function to create empty RecordBatch
-    fn create_empty_record_batch() -> RecordBatch {
-        let schema = create_test_schema();
+    fn create_empty_delete_record_batch() -> RecordBatch {
+        let schema = create_test_delete_schema();
 
         let record_keys = Arc::new(StringArray::from(Vec::<&str>::new()));
         let partition_paths = Arc::new(StringArray::from(Vec::<Option<&str>>::new()));
@@ -341,7 +349,7 @@ mod tests {
 
     #[test]
     fn test_transform_delete_record_batch_basic() {
-        let batch = create_test_record_batch();
+        let batch = create_test_delete_record_batch();
         let commit_time = "20240101000000";
         let ordering_field = "sequenceNumber";
 
@@ -350,14 +358,15 @@ mod tests {
         // Check number of rows preserved
         assert_eq!(result.num_rows(), 3);
 
-        // Check number of columns (should be 4: commit_time + original 3)
-        assert_eq!(result.num_columns(), 3);
+        // Check number of columns (should be 4: commit_time + original 2 + renamed ordering field)
+        assert_eq!(result.num_columns(), 4);
 
         // Check schema field names
         let schema = result.schema();
         assert_eq!(schema.field(0).name(), MetaField::CommitTime.as_ref());
         assert_eq!(schema.field(1).name(), MetaField::RecordKey.as_ref());
         assert_eq!(schema.field(2).name(), MetaField::PartitionPath.as_ref());
+        assert_eq!(schema.field(3).name(), ordering_field);
 
         // Check commit_time column values
         let commit_time_array = result
@@ -389,11 +398,21 @@ mod tests {
         assert_eq!(partition_path_array.value(0), "path1");
         assert_eq!(partition_path_array.value(1), "path2");
         assert_eq!(partition_path_array.value(2), "path3");
+
+        // Check ordering value column values preserved
+        let ordering_val_array = result
+            .column(3)
+            .as_any()
+            .downcast_ref::<Int64Array>()
+            .unwrap();
+        assert_eq!(ordering_val_array.value(0), 100);
+        assert_eq!(ordering_val_array.value(1), 200);
+        assert_eq!(ordering_val_array.value(2), 300);
     }
 
     #[test]
     fn test_transform_delete_record_batch_empty() {
-        let batch = create_empty_record_batch();
+        let batch = create_empty_delete_record_batch();
         let commit_time = "20240101000000";
         let ordering_field = "sequenceNumber";
 
@@ -401,27 +420,28 @@ mod tests {
 
         // Check empty batch handling
         assert_eq!(result.num_rows(), 0);
-        assert_eq!(result.num_columns(), 3);
+        assert_eq!(result.num_columns(), 4);
 
         // Check schema is still correct
         let schema = result.schema();
         assert_eq!(schema.field(0).name(), MetaField::CommitTime.as_ref());
         assert_eq!(schema.field(1).name(), MetaField::RecordKey.as_ref());
         assert_eq!(schema.field(2).name(), MetaField::PartitionPath.as_ref());
+        assert_eq!(schema.field(3).name(), "sequenceNumber");
 
         // Check all arrays are empty
-        for i in 0..3 {
+        for i in 0..4 {
             assert_eq!(result.column(i).len(), 0);
         }
     }
 
     #[test]
     fn test_commit_time_values() {
-        let batch = create_test_record_batch();
+        let batch = create_test_delete_record_batch();
         let commit_times = ["20240101000000", "20231225123045", 
"20240630235959"];
 
         for commit_time in commit_times {
-            let result = transform_delete_record_batch(&batch, commit_time, 
"orderingVal").unwrap();
+            let result = transform_delete_record_batch(&batch, commit_time, 
"seq").unwrap();
 
             let commit_time_array = result
                 .column(0)

