This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new d8aafb4 fix: handle zero event time ordering (#357)
d8aafb4 is described below
commit d8aafb40407e7431ac141ac45abc5edf5591b051
Author: Shiyan Xu <[email protected]>
AuthorDate: Thu Jun 26 13:17:45 2025 -0500
fix: handle zero event time ordering (#357)
When the ordering value from a delete block is 0 (a special value indicating
that the ordering value should not be used), ignore the event-time ordering
and fall back to the commit time.
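In effect, a zero ordering value switches the merge comparison to commit time
only. Below is a minimal standalone sketch of the rule (illustrative types and
names, not the crate's API; the actual implementation is
MaxOrderingInfo::is_greater_than in the diff, which compares arrow-row encoded
values rather than plain integers and strings):

    // Sketch only: models the comparison rule this commit implements.
    struct OrderingInfo<'a> {
        event_time: u64,      // 0 means "ordering value should not be used"
        commit_time: &'a str, // Hudi instant time; lexicographic order
    }

    fn delete_wins(delete: &OrderingInfo, record: &OrderingInfo) -> bool {
        if delete.event_time == 0 {
            // Zero event time: fall back to comparing commit times only.
            delete.commit_time > record.commit_time
        } else {
            // Otherwise compare event times first, commit time as tie-breaker.
            delete.event_time > record.event_time
                || (delete.event_time == record.event_time
                    && delete.commit_time > record.commit_time)
        }
    }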
---
crates/core/src/file_group/record_batches.rs | 28 +-
crates/core/src/merge/ordering.rs | 707 ++++++++++++++++++++++++++-
crates/core/src/merge/record_merger.rs | 51 +-
crates/core/src/record/mod.rs | 18 +
crates/core/src/schema/delete.rs | 56 ++-
5 files changed, 799 insertions(+), 61 deletions(-)
diff --git a/crates/core/src/file_group/record_batches.rs b/crates/core/src/file_group/record_batches.rs
index 5173639..c7aa7ff 100644
--- a/crates/core/src/file_group/record_batches.rs
+++ b/crates/core/src/file_group/record_batches.rs
@@ -143,7 +143,7 @@ mod tests {
// Helper function to create a test data RecordBatch
fn create_test_data_batch(num_rows: usize) -> RecordBatch {
- create_test_data_batch_with_ordering_field(num_rows, "value")
+ create_test_data_batch_with_ordering_field(num_rows, "ord_val")
}
fn create_test_data_batch_with_ordering_field(
@@ -509,7 +509,7 @@ mod tests {
let record_batches = RecordBatches::new();
let hudi_configs = Arc::new(HudiConfigs::new([(
HudiTableConfig::PrecombineField.as_ref(),
- "orderingVal",
+ "any_ordering_field",
)]));
let result = record_batches
@@ -525,7 +525,7 @@ mod tests {
let mut record_batches = RecordBatches::new();
// Need at least one data batch for schema reference
- let ordering_field = "ord_val";
+ let ordering_field = "seq_num";
record_batches.push_data_batch(create_test_data_batch_with_ordering_field(
1,
ordering_field,
@@ -544,21 +544,26 @@ mod tests {
.unwrap();
assert_eq!(result.num_rows(), 3);
- assert_eq!(result.num_columns(), 3); // commit_time, record_key, partition_path
+ assert_eq!(result.num_columns(), 4); // commit_time, record_key, partition_path, seq_num
// Check schema field names
let schema = result.schema();
assert_eq!(schema.field(0).name(), MetaField::CommitTime.as_ref());
assert_eq!(schema.field(1).name(), MetaField::RecordKey.as_ref());
assert_eq!(schema.field(2).name(), MetaField::PartitionPath.as_ref());
+ assert_eq!(schema.field(3).name(), ordering_field);
}
#[test]
fn test_concat_delete_batches_transformed_multiple() {
let mut record_batches = RecordBatches::new();
+ let ordering_field = "seq_num";
// Need at least one data batch for schema reference
- record_batches.push_data_batch(create_test_data_batch(1));
+ record_batches.push_data_batch(create_test_data_batch_with_ordering_field(
+ 1,
+ ordering_field,
+ ));
// Add multiple delete batches
record_batches.push_delete_batch(create_test_delete_batch(2), "20240101000000".to_string());
@@ -567,7 +572,7 @@ mod tests {
let hudi_configs = Arc::new(HudiConfigs::new([(
HudiTableConfig::PrecombineField.as_ref(),
- "value",
+ ordering_field,
)]));
let result = record_batches
@@ -575,7 +580,7 @@ mod tests {
.unwrap();
assert_eq!(result.num_rows(), 6); // 2 + 3 + 1
- assert_eq!(result.num_columns(), 3);
+ assert_eq!(result.num_columns(), 4);
// Verify all commit times are preserved correctly
let commit_time_array = result
@@ -602,12 +607,16 @@ mod tests {
let mut record_batches = RecordBatches::new();
// Create data batch with custom ordering field
- record_batches.push_data_batch(create_test_data_batch(1));
+ let ordering_field = "custom_ordering_field";
+ record_batches.push_data_batch(create_test_data_batch_with_ordering_field(
+ 1,
+ ordering_field,
+ ));
record_batches.push_delete_batch(create_test_delete_batch(2), "20240101000000".to_string());
let hudi_configs = Arc::new(HudiConfigs::new([(
HudiTableConfig::PrecombineField.as_ref(),
- "custom_ts",
+ ordering_field,
)]));
let result = record_batches
@@ -619,6 +628,7 @@ mod tests {
assert_eq!(schema.field(0).name(), MetaField::CommitTime.as_ref());
assert_eq!(schema.field(1).name(), MetaField::RecordKey.as_ref());
assert_eq!(schema.field(2).name(), MetaField::PartitionPath.as_ref());
+ assert_eq!(schema.field(3).name(), ordering_field);
}
#[test]
diff --git a/crates/core/src/merge/ordering.rs b/crates/core/src/merge/ordering.rs
index dba281d..3f279eb 100644
--- a/crates/core/src/merge/ordering.rs
+++ b/crates/core/src/merge/ordering.rs
@@ -16,54 +16,83 @@
* specific language governing permissions and limitations
* under the License.
*/
-use crate::record::{extract_commit_time_ordering_values, extract_record_keys};
+use crate::config::table::HudiTableConfig;
+use crate::config::HudiConfigs;
+use crate::record::{
+ extract_commit_time_ordering_values, extract_event_time_ordering_values, extract_record_keys,
+};
use crate::Result;
-use arrow_array::RecordBatch;
+use arrow_array::{
+ Array, Int16Array, Int32Array, Int64Array, Int8Array, RecordBatch, UInt16Array, UInt32Array,
+ UInt64Array, UInt8Array,
+};
use arrow_row::{OwnedRow, Row, RowConverter};
+use arrow_schema::DataType;
use std::collections::HashMap;
+use std::sync::Arc;
#[derive(Debug, Clone)]
pub struct MaxOrderingInfo {
- ordering_value: OwnedRow,
- source_is_delete: bool,
+ event_time_ordering: OwnedRow,
+ commit_time_ordering: OwnedRow,
+ is_event_time_zero: bool,
}
impl MaxOrderingInfo {
- pub fn ordering_value(&self) -> Row {
- self.ordering_value.row()
+ pub fn is_greater_than(&self, event_time: Row, commit_time: Row) -> bool {
+ if self.is_event_time_zero {
+ self.commit_time_ordering.row() > commit_time
+ } else {
+ self.event_time_ordering.row() > event_time
+ || (self.event_time_ordering.row() == event_time
+ && self.commit_time_ordering.row() > commit_time)
+ }
}
}
pub fn process_batch_for_max_orderings(
batch: &RecordBatch,
max_ordering: &mut HashMap<OwnedRow, MaxOrderingInfo>,
- is_delete_batch: bool,
key_converter: &RowConverter,
- ordering_converter: &RowConverter,
+ event_time_converter: &RowConverter,
+ commit_time_converter: &RowConverter,
+ hudi_configs: Arc<HudiConfigs>,
) -> Result<()> {
if batch.num_rows() == 0 {
return Ok(());
}
+ let ordering_field = hudi_configs
+ .get(HudiTableConfig::PrecombineField)?
+ .to::<String>();
+
let keys = extract_record_keys(key_converter, batch)?;
- let orderings = extract_commit_time_ordering_values(ordering_converter, batch)?;
+ let event_times =
+ extract_event_time_ordering_values(event_time_converter, batch, &ordering_field)?;
+ let commit_times = extract_commit_time_ordering_values(commit_time_converter, batch)?;
for i in 0..batch.num_rows() {
let key = keys.row(i).owned();
- let ordering = orderings.row(i).owned();
+ let event_time = event_times.row(i).owned();
+ let commit_time = commit_times.row(i).owned();
+ let is_event_time_zero = is_event_time_zero(event_time.row(), event_time_converter)?;
match max_ordering.get_mut(&key) {
Some(info) => {
- if ordering > info.ordering_value {
- info.ordering_value = ordering;
- info.source_is_delete = is_delete_batch;
+ if event_time > info.event_time_ordering {
+ info.event_time_ordering = event_time;
+ info.is_event_time_zero = is_event_time_zero;
+ }
+ if commit_time > info.commit_time_ordering {
+ info.commit_time_ordering = commit_time;
}
}
None => {
max_ordering.insert(
key,
MaxOrderingInfo {
- ordering_value: ordering,
- source_is_delete: is_delete_batch,
+ event_time_ordering: event_time,
+ commit_time_ordering: commit_time,
+ is_event_time_zero,
},
);
}
@@ -72,3 +101,651 @@ pub fn process_batch_for_max_orderings(
Ok(())
}
+
+pub fn is_event_time_zero(
+ event_time_row: Row,
+ event_time_converter: &RowConverter,
+) -> Result<bool> {
+ let event_times = event_time_converter.convert_rows([event_time_row])?;
+ assert_eq!(
+ event_times.len(),
+ 1,
+ "Expected exactly one row for event time conversion"
+ );
+ let event_time = &event_times[0];
+
+ let is_zero = match event_time.data_type() {
+ DataType::Int8 => {
+ event_time
+ .as_any()
+ .downcast_ref::<Int8Array>()
+ .unwrap()
+ .value(0)
+ == 0
+ }
+ DataType::Int16 => {
+ event_time
+ .as_any()
+ .downcast_ref::<Int16Array>()
+ .unwrap()
+ .value(0)
+ == 0
+ }
+ DataType::Int32 => {
+ event_time
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .unwrap()
+ .value(0)
+ == 0
+ }
+ DataType::Int64 => {
+ event_time
+ .as_any()
+ .downcast_ref::<Int64Array>()
+ .unwrap()
+ .value(0)
+ == 0
+ }
+ DataType::UInt8 => {
+ event_time
+ .as_any()
+ .downcast_ref::<UInt8Array>()
+ .unwrap()
+ .value(0)
+ == 0
+ }
+ DataType::UInt16 => {
+ event_time
+ .as_any()
+ .downcast_ref::<UInt16Array>()
+ .unwrap()
+ .value(0)
+ == 0
+ }
+ DataType::UInt32 => {
+ event_time
+ .as_any()
+ .downcast_ref::<UInt32Array>()
+ .unwrap()
+ .value(0)
+ == 0
+ }
+ DataType::UInt64 => {
+ event_time
+ .as_any()
+ .downcast_ref::<UInt64Array>()
+ .unwrap()
+ .value(0)
+ == 0
+ }
+ _ => false, // not an integer type
+ };
+
+ Ok(is_zero)
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use arrow_row::{RowConverter, SortField};
+ use std::collections::HashMap;
+
+ use arrow_array::{
+ Int16Array, Int32Array, Int64Array, Int8Array, RecordBatch,
StringArray, UInt16Array,
+ UInt32Array, UInt64Array, UInt8Array,
+ };
+ use arrow_schema::{DataType, Field, Schema, SchemaRef};
+ use std::sync::Arc;
+
+ // Helper function to create a basic test schema
+ fn create_test_delete_schema() -> SchemaRef {
+ create_test_delete_schema_with_event_time_type(DataType::UInt32)
+ }
+
+ // Helper function to create test schema with different event time data type
+ fn create_test_delete_schema_with_event_time_type(event_time_type: DataType) -> SchemaRef {
+ Arc::new(Schema::new(vec![
+ Field::new("_hoodie_commit_time", DataType::Utf8, false),
+ Field::new("_hoodie_record_key", DataType::Utf8, false),
+ Field::new("_hoodie_partition_path", DataType::Utf8, false),
+ Field::new("an_ordering_field", event_time_type, true),
+ ]))
+ }
+
+ // Helper function to create a test RecordBatch
+ fn create_test_delete_batch() -> RecordBatch {
+ let schema = create_test_delete_schema();
+
+ let commit_times = Arc::new(StringArray::from(vec![
+ "20240101000000",
+ "20240102000000",
+ "20240103000000",
+ ]));
+ let record_keys = Arc::new(StringArray::from(vec!["key1", "key2", "key3"]));
+ let partition_paths = Arc::new(StringArray::from(vec!["part1", "part2", "part3"]));
+ let ordering_values = Arc::new(UInt32Array::from(vec![100, 0, 300]));
+
+ RecordBatch::try_new(
+ schema,
+ vec![commit_times, record_keys, partition_paths, ordering_values],
+ )
+ .unwrap()
+ }
+
+ // Helper function to create test batch with specific event time type
+ fn create_test_delete_batch_with_event_time_type<T>(
+ event_time_type: DataType,
+ values: Vec<T>,
+ ) -> RecordBatch
+ where
+ T: Into<i64> + Copy,
+ {
+ let schema = create_test_delete_schema_with_event_time_type(event_time_type.clone());
+
+ let commit_times = Arc::new(StringArray::from(vec!["20240101000000", "20240102000000"]));
+ let record_keys = Arc::new(StringArray::from(vec!["key1", "key2"]));
+ let partition_paths = Arc::new(StringArray::from(vec!["part1", "part2"]));
+
+ let ordering_values: Arc<dyn Array> = match event_time_type {
+ DataType::Int8 => Arc::new(Int8Array::from(
+ values
+ .into_iter()
+ .map(|v| v.into() as i8)
+ .collect::<Vec<_>>(),
+ )),
+ DataType::Int16 => Arc::new(Int16Array::from(
+ values
+ .into_iter()
+ .map(|v| v.into() as i16)
+ .collect::<Vec<_>>(),
+ )),
+ DataType::Int32 => Arc::new(Int32Array::from(
+ values
+ .into_iter()
+ .map(|v| v.into() as i32)
+ .collect::<Vec<_>>(),
+ )),
+ DataType::Int64 => Arc::new(Int64Array::from(
+ values.into_iter().map(|v| v.into()).collect::<Vec<_>>(),
+ )),
+ DataType::UInt8 => Arc::new(UInt8Array::from(
+ values
+ .into_iter()
+ .map(|v| v.into() as u8)
+ .collect::<Vec<_>>(),
+ )),
+ DataType::UInt16 => Arc::new(UInt16Array::from(
+ values
+ .into_iter()
+ .map(|v| v.into() as u16)
+ .collect::<Vec<_>>(),
+ )),
+ DataType::UInt32 => Arc::new(UInt32Array::from(
+ values
+ .into_iter()
+ .map(|v| v.into() as u32)
+ .collect::<Vec<_>>(),
+ )),
+ DataType::UInt64 => Arc::new(UInt64Array::from(
+ values
+ .into_iter()
+ .map(|v| v.into() as u64)
+ .collect::<Vec<_>>(),
+ )),
+ _ => panic!("Unsupported event time type for test"),
+ };
+
+ RecordBatch::try_new(
+ schema,
+ vec![commit_times, record_keys, partition_paths, ordering_values],
+ )
+ .unwrap()
+ }
+
+ // Helper function to create empty test batch
+ fn create_empty_test_delete_batch() -> RecordBatch {
+ let schema = create_test_delete_schema();
+
+ let commit_times = Arc::new(StringArray::from(Vec::<&str>::new()));
+ let record_keys = Arc::new(StringArray::from(Vec::<&str>::new()));
+ let partition_paths = Arc::new(StringArray::from(Vec::<&str>::new()));
+ let ordering_values = Arc::new(UInt32Array::from(Vec::<u32>::new()));
+
+ RecordBatch::try_new(
+ schema,
+ vec![commit_times, record_keys, partition_paths, ordering_values],
+ )
+ .unwrap()
+ }
+
+ // Helper function to create row converters
+ fn create_test_converters(schema: SchemaRef) -> (RowConverter, RowConverter, RowConverter) {
+ let key_converter = RowConverter::new(vec![
+ SortField::new(schema.field(1).data_type().clone()), // _hoodie_record_key
+ ])
+ .unwrap();
+
+ let event_time_converter = RowConverter::new(vec![
+ SortField::new(schema.field(3).data_type().clone()), // an_ordering_field
+ ])
+ .unwrap();
+
+ let commit_time_converter = RowConverter::new(vec![
+ SortField::new(schema.field(0).data_type().clone()), // _hoodie_commit_time
+ ])
+ .unwrap();
+
+ (key_converter, event_time_converter, commit_time_converter)
+ }
+
+ // Helper function to create test HudiConfigs
+ fn create_test_hudi_configs() -> Arc<HudiConfigs> {
+ Arc::new(HudiConfigs::new([(
+ HudiTableConfig::PrecombineField.as_ref(),
+ "an_ordering_field",
+ )]))
+ }
+
+ #[test]
+ fn test_max_ordering_info_is_greater_than_event_time_not_zero() {
+ let schema = create_test_delete_schema();
+ let (_, event_time_converter, commit_time_converter) = create_test_converters(schema);
+
+ // Create test data
+ let event_times = event_time_converter
+ .convert_columns(&[Arc::new(UInt32Array::from(vec![100, 200, 300]))])
+ .unwrap();
+ let commit_times = commit_time_converter
+ .convert_columns(&[Arc::new(StringArray::from(vec![
+ "20240101000000",
+ "20240102000000",
+ "20240102000000",
+ ]))])
+ .unwrap();
+
+ let max_info = MaxOrderingInfo {
+ event_time_ordering: event_times.row(1).owned(), // 200
+ commit_time_ordering: commit_times.row(1).owned(), // "20240102000000"
+ is_event_time_zero: false,
+ };
+
+ // Test: the info's event time is larger
+ assert!(max_info.is_greater_than(event_times.row(0), commit_times.row(0))); // 200 > 100
+
+ // Test: the info's event time is smaller with the same commit time
+ assert!(!max_info.is_greater_than(event_times.row(2), commit_times.row(1))); // 200 < 300
+
+ // Test: the info's event time is the same, but commit time is larger
+ assert!(max_info.is_greater_than(event_times.row(1), commit_times.row(0)));
+ // same event, greater commit time
+ }
+
+ #[test]
+ fn test_max_ordering_info_is_greater_than_event_time_zero() {
+ let schema = create_test_delete_schema();
+ let (_, event_time_converter, commit_time_converter) = create_test_converters(schema);
+
+ // Create test data
+ let event_times = event_time_converter
+ .convert_columns(&[Arc::new(UInt32Array::from(vec![0, 0, 100]))])
+ .unwrap();
+ let commit_times = commit_time_converter
+ .convert_columns(&[Arc::new(StringArray::from(vec![
+ "20240101000000",
+ "20240102000000",
+ "20240103000000",
+ ]))])
+ .unwrap();
+
+ let max_info = MaxOrderingInfo {
+ event_time_ordering: event_times.row(0).owned(), // 0
+ commit_time_ordering: commit_times.row(1).owned(), // "20240102000000"
+ is_event_time_zero: true,
+ };
+
+ // When event time is zero, only commit time matters
+ assert!(
+ !max_info.is_greater_than(event_times.row(1), commit_times.row(2)),
+ "Event time is zero, max_info is not greater than later commit"
+ );
+ assert!(
+ max_info.is_greater_than(event_times.row(1), commit_times.row(0)),
+ "Event time is zero, max_info is greater than earlier commit"
+ );
+ }
+
+ #[test]
+ fn test_process_batch_for_max_orderings_basic() {
+ let batch = create_test_delete_batch();
+ let schema = batch.schema();
+ let (key_converter, event_time_converter, commit_time_converter) =
+ create_test_converters(schema);
+ let hudi_configs = create_test_hudi_configs();
+
+ let mut max_ordering = HashMap::new();
+
+ let result = process_batch_for_max_orderings(
+ &batch,
+ &mut max_ordering,
+ &key_converter,
+ &event_time_converter,
+ &commit_time_converter,
+ hudi_configs,
+ );
+
+ assert!(result.is_ok());
+ assert_eq!(max_ordering.len(), 3); // 3 unique keys
+
+ // Verify that all keys are present
+ let keys = key_converter
+ .convert_columns(&[batch.column(1).clone()])
+ .unwrap(); // _hoodie_record_key
+ for i in 0..3 {
+ let key = keys.row(i).owned();
+ assert!(max_ordering.contains_key(&key));
+ }
+ }
+
+ #[test]
+ fn test_process_batch_for_max_orderings_empty_batch() {
+ let batch = create_empty_test_delete_batch();
+ let schema = batch.schema();
+ let (key_converter, event_time_converter, commit_time_converter) =
+ create_test_converters(schema);
+ let hudi_configs = create_test_hudi_configs();
+
+ let mut max_ordering = HashMap::new();
+
+ let result = process_batch_for_max_orderings(
+ &batch,
+ &mut max_ordering,
+ &key_converter,
+ &event_time_converter,
+ &commit_time_converter,
+ hudi_configs,
+ );
+
+ assert!(result.is_ok());
+ assert_eq!(max_ordering.len(), 0);
+ }
+
+ fn validate_max_ordering_info_with_batch(
+ max_ordering: &HashMap<OwnedRow, MaxOrderingInfo>,
+ batch: &RecordBatch,
+ key_converter: &RowConverter,
+ event_time_converter: &RowConverter,
+ commit_time_converter: &RowConverter,
+ ) {
+ let keys = key_converter
+ .convert_columns(&[batch.column(1).clone()])
+ .unwrap(); // _hoodie_record_key
+ let event_times = event_time_converter
+ .convert_columns(&[batch.column(3).clone()])
+ .unwrap();
+ let commit_times = commit_time_converter
+ .convert_columns(&[batch.column(0).clone()])
+ .unwrap();
+
+ for i in 0..batch.num_rows() {
+ let key = keys.row(i).owned();
+ let info = max_ordering
+ .get(&key)
+ .expect("Key should exist in max_ordering");
+
+ assert_eq!(info.event_time_ordering.row(), event_times.row(i));
+ assert_eq!(info.commit_time_ordering.row(), commit_times.row(i));
+ }
+ }
+
+ #[test]
+ fn test_process_batch_for_max_orderings_updates_existing() {
+ let batch = create_test_delete_batch();
+ let schema = batch.schema();
+ let (key_converter, event_time_converter, commit_time_converter) =
+ create_test_converters(schema.clone());
+ let hudi_configs = create_test_hudi_configs();
+
+ let mut max_ordering = HashMap::new();
+
+ // Process first time
+ process_batch_for_max_orderings(
+ &batch,
+ &mut max_ordering,
+ &key_converter,
+ &event_time_converter,
+ &commit_time_converter,
+ hudi_configs.clone(),
+ )
+ .unwrap();
+
+ let initial_count = max_ordering.len();
+ assert_eq!(initial_count, 3);
+
+ validate_max_ordering_info_with_batch(
+ &max_ordering,
+ &batch,
+ &key_converter,
+ &event_time_converter,
+ &commit_time_converter,
+ );
+
+ // Create second batch with same keys but different values
+ let commit_times = Arc::new(StringArray::from(vec!["20240201000000",
"20240202000000"])); // Later commits
+ let record_keys = Arc::new(StringArray::from(vec!["key1", "key2"])); // Same keys
+ let partition_paths = Arc::new(StringArray::from(vec!["part1", "part2"]));
+ let ordering_values = Arc::new(UInt32Array::from(vec![500, 600])); // Higher event times
+
+ let batch2 = RecordBatch::try_new(
+ schema.clone(),
+ vec![commit_times, record_keys, partition_paths, ordering_values],
+ )
+ .unwrap();
+
+ // Process second batch
+ process_batch_for_max_orderings(
+ &batch2,
+ &mut max_ordering,
+ &key_converter,
+ &event_time_converter,
+ &commit_time_converter,
+ hudi_configs,
+ )
+ .unwrap();
+
+ // Should still have same number of unique keys, but values should be updated
+ assert_eq!(max_ordering.len(), initial_count);
+
+ validate_max_ordering_info_with_batch(
+ &max_ordering,
+ &batch2,
+ &key_converter,
+ &event_time_converter,
+ &commit_time_converter,
+ );
+ }
+
+ #[test]
+ fn test_process_batch_for_max_orderings_missing_config() {
+ let batch = create_test_delete_batch();
+ let schema = batch.schema();
+ let (key_converter, event_time_converter, commit_time_converter) =
+ create_test_converters(schema);
+
+ // Create configs without PrecombineField
+ let hudi_configs = Arc::new(HudiConfigs::empty());
+
+ let mut max_ordering = HashMap::new();
+
+ let result = process_batch_for_max_orderings(
+ &batch,
+ &mut max_ordering,
+ &key_converter,
+ &event_time_converter,
+ &commit_time_converter,
+ hudi_configs,
+ );
+
+ assert!(result.is_err());
+ }
+
+ #[test]
+ fn test_is_event_time_zero_with_integer_types() {
+ let int_types = vec![
+ DataType::Int8,
+ DataType::Int16,
+ DataType::Int32,
+ DataType::Int64,
+ DataType::UInt8,
+ DataType::UInt16,
+ DataType::UInt32,
+ DataType::UInt64,
+ ];
+ for t in int_types {
+ let schema = create_test_delete_schema_with_event_time_type(t.clone());
+ let (_, event_time_converter, _) = create_test_converters(schema);
+
+ // Test zero value
+ let zero_batch = create_test_delete_batch_with_event_time_type(t.clone(), vec![0, 0]);
+ let event_times = event_time_converter
+ .convert_columns(&[zero_batch.column(3).clone()])
+ .unwrap();
+
+ let result = is_event_time_zero(event_times.row(0), &event_time_converter);
+ assert!(result.is_ok());
+ assert!(result.unwrap());
+
+ // Test non-zero value
+ let non_zero_batch =
+ create_test_delete_batch_with_event_time_type(t.clone(), vec![1, 123]);
+ let event_times = event_time_converter
+ .convert_columns(&[non_zero_batch.column(3).clone()])
+ .unwrap();
+
+ let result = is_event_time_zero(event_times.row(0), &event_time_converter);
+ assert!(result.is_ok());
+ assert!(!result.unwrap());
+ }
+ }
+
+ #[test]
+ fn test_is_event_time_zero_unsupported_type() {
+ let schema = create_test_delete_schema_with_event_time_type(DataType::Utf8);
+ let (_, event_time_converter, _) = create_test_converters(schema.clone());
+
+ // Create a batch with string ordering field (unsupported for zero check)
+ let commit_times = Arc::new(StringArray::from(vec!["20240101000000"]));
+ let record_keys = Arc::new(StringArray::from(vec!["key1"]));
+ let partition_paths = Arc::new(StringArray::from(vec!["part1"]));
+ let ordering_values = Arc::new(StringArray::from(vec!["0"])); // String "0"
+
+ let batch = RecordBatch::try_new(
+ schema.clone(),
+ vec![commit_times, record_keys, partition_paths, ordering_values],
+ )
+ .unwrap();
+
+ let event_times = event_time_converter
+ .convert_columns(&[batch.column(3).clone()])
+ .unwrap();
+
+ let result = is_event_time_zero(event_times.row(0), &event_time_converter);
+ assert!(result.is_ok());
+ assert!(!result.unwrap()); // Should return false for unsupported types
+ }
+
+ #[test]
+ fn test_max_ordering_info_clone() {
+ let schema = create_test_delete_schema();
+ let (_, event_time_converter, commit_time_converter) = create_test_converters(schema);
+
+ let event_times = event_time_converter
+ .convert_columns(&[Arc::new(UInt32Array::from(vec![100]))])
+ .unwrap();
+ let commit_times = commit_time_converter
+ .convert_columns(&[Arc::new(StringArray::from(vec!["20240101000000"]))])
+ .unwrap();
+
+ let original = MaxOrderingInfo {
+ event_time_ordering: event_times.row(0).owned(),
+ commit_time_ordering: commit_times.row(0).owned(),
+ is_event_time_zero: false,
+ };
+
+ let cloned = original.clone();
+
+ assert_eq!(original.is_event_time_zero, cloned.is_event_time_zero);
+ assert_eq!(
+ original.event_time_ordering.row(),
+ cloned.event_time_ordering.row()
+ );
+ assert_eq!(
+ original.commit_time_ordering.row(),
+ cloned.commit_time_ordering.row()
+ );
+ }
+
+ #[test]
+ fn test_process_batch_with_duplicate_keys() {
+ let schema = create_test_delete_schema();
+
+ // Create batch with duplicate keys
+ let commit_times = Arc::new(StringArray::from(vec![
+ "20240101000000",
+ "20240102000000",
+ "20240103000000",
+ ]));
+ let record_keys = Arc::new(StringArray::from(vec!["key1", "key1", "key2"])); // key1 appears twice
+ let partition_paths = Arc::new(StringArray::from(vec!["part1", "part1", "part2"]));
+ let ordering_values = Arc::new(UInt32Array::from(vec![0, 200, 300])); // Different event times
+
+ let batch = RecordBatch::try_new(
+ schema.clone(),
+ vec![commit_times, record_keys, partition_paths, ordering_values],
+ )
+ .unwrap();
+
+ let (key_converter, event_time_converter, commit_time_converter) =
+ create_test_converters(schema);
+ let hudi_configs = create_test_hudi_configs();
+
+ let mut max_ordering = HashMap::new();
+
+ let result = process_batch_for_max_orderings(
+ &batch,
+ &mut max_ordering,
+ &key_converter,
+ &event_time_converter,
+ &commit_time_converter,
+ hudi_configs,
+ );
+
+ assert!(result.is_ok());
+ assert_eq!(max_ordering.len(), 2); // Only 2 unique keys
+
+ // Verify that key1 has the maximum values from both records
+ let keys = key_converter
+ .convert_columns(&[batch.column(1).clone()])
+ .unwrap(); // _hoodie_record_key
+ let key1 = keys.row(0).owned(); // Should be "key1"
+ let event_times = event_time_converter
+ .convert_columns(&[batch.column(3).clone()])
+ .unwrap();
+ let commit_times = commit_time_converter
+ .convert_columns(&[batch.column(0).clone()])
+ .unwrap();
+
+ let info = max_ordering.get(&key1).unwrap();
+ assert_eq!(
+ info.event_time_ordering.row(),
+ event_times.row(1),
+ "Event time for key1 should be the max of 0 and 200"
+ );
+ assert_eq!(
+ info.commit_time_ordering.row(),
+ commit_times.row(1),
+ "Commit time for key1 should be the max of 20240101000000 and
20240102000000"
+ );
+ assert!(!info.is_event_time_zero);
+ }
+}
diff --git a/crates/core/src/merge/record_merger.rs b/crates/core/src/merge/record_merger.rs
index dc11ee1..9ebde9d 100644
--- a/crates/core/src/merge/record_merger.rs
+++ b/crates/core/src/merge/record_merger.rs
@@ -27,8 +27,9 @@ use crate::merge::ordering::{process_batch_for_max_orderings, MaxOrderingInfo};
use crate::merge::RecordMergeStrategyValue;
use crate::metadata::meta_field::MetaField;
use crate::record::{
- create_commit_time_ordering_converter, create_record_key_converter,
- extract_commit_time_ordering_values, extract_record_keys,
+ create_commit_time_ordering_converter, create_event_time_ordering_converter,
+ create_record_key_converter, extract_commit_time_ordering_values,
+ extract_event_time_ordering_values, extract_record_keys,
};
use crate::util::arrow::lexsort_to_indices;
use crate::util::arrow::ColumnAsArray;
@@ -111,15 +112,18 @@ impl RecordMerger {
let ordering_field = self.hudi_configs.get(PrecombineField)?.to::<String>();
let ordering_array = data_batch.get_array(&ordering_field)?;
let commit_seqno_array = data_batch.get_array(MetaField::CommitSeqno.as_ref())?;
- let sorted_indices =
+ let desc_indices =
lexsort_to_indices(&[key_array, ordering_array, commit_seqno_array], true);
// Create shared converters for record keys and ordering values
let key_converter = create_record_key_converter(data_batch.schema())?;
- let ordering_converter =
+ let ordering_field = self.hudi_configs.get(PrecombineField)?.to::<String>();
+ let event_time_converter =
+ create_event_time_ordering_converter(data_batch.schema(), &ordering_field)?;
+ let commit_time_converter =
create_commit_time_ordering_converter(data_batch.schema())?;
- // Process the delete batches to get the max ordering of each delete
+ // Process the delete batches to get the max ordering of each deleting key
let delete_orderings: HashMap<OwnedRow, MaxOrderingInfo> =
if record_batches.num_delete_rows() == 0 {
HashMap::new()
@@ -131,9 +135,10 @@ impl RecordMerger {
process_batch_for_max_orderings(
&delete_batch,
&mut delete_orderings,
- true,
&key_converter,
- &ordering_converter,
+ &event_time_converter,
+ &commit_time_converter,
+ self.hudi_configs.clone(),
)?;
delete_orderings
};
@@ -142,25 +147,33 @@ impl RecordMerger {
let mut keep_mask_builder = BooleanArray::builder(num_records);
let record_keys = extract_record_keys(&key_converter, &data_batch)?;
- let ordering_values =
- extract_commit_time_ordering_values(&ordering_converter, &data_batch)?;
+ let event_times = extract_event_time_ordering_values(
+ &event_time_converter,
+ &data_batch,
+ &ordering_field,
+ )?;
+ let commit_times =
+ extract_commit_time_ordering_values(&commit_time_converter, &data_batch)?;
let mut last_key: Option<Row> = None;
for i in 0..num_records {
// Iterator over sorted indices to process records in desc order
- let idx = sorted_indices.value(i) as usize;
- let current_key = record_keys.row(idx);
- let current_ordering = ordering_values.row(idx);
+ let idx = desc_indices.value(i) as usize;
+ let curr_key = record_keys.row(idx);
+ let curr_event_time = event_times.row(idx);
+ let curr_commit_time = commit_times.row(idx);
- let first_seen = last_key != Some(current_key);
+ let first_seen = last_key != Some(curr_key);
if first_seen {
- last_key = Some(current_key);
+ last_key = Some(curr_key);
- let should_keep = match delete_orderings.get(&current_key.owned()) {
+ let should_keep = match delete_orderings.get(&curr_key.owned()) {
Some(delete_max_ordering) => {
- // If the record's ordering >= its delete ordering, keep it.
- // Otherwise, it's deleted and it should not be kept.
- current_ordering >= delete_max_ordering.ordering_value()
+ // If the delete ordering is not greater than the record's ordering,
+ // we keep the record.
+ // Otherwise, we discard it as the delete is more recent.
+ !delete_max_ordering
+ .is_greater_than(curr_event_time, curr_commit_time)
}
None => true, // There is no delete for this key, keep it.
};
@@ -176,7 +189,7 @@ impl RecordMerger {
// Filter the sorted indices based on the keep mask
// then take the records
let keep_mask = keep_mask_builder.finish();
- let keep_indices = arrow::compute::filter(&sorted_indices, &keep_mask)?;
+ let keep_indices = arrow::compute::filter(&desc_indices, &keep_mask)?;
Ok(take_record_batch(&data_batch, &keep_indices)?)
}
}
diff --git a/crates/core/src/record/mod.rs b/crates/core/src/record/mod.rs
index 13b3433..e519f07 100644
--- a/crates/core/src/record/mod.rs
+++ b/crates/core/src/record/mod.rs
@@ -28,6 +28,13 @@ pub fn create_record_key_converter(schema: SchemaRef) -> Result<RowConverter> {
create_row_converter(schema, [MetaField::RecordKey.as_ref()])
}
+pub fn create_event_time_ordering_converter(
+ schema: SchemaRef,
+ ordering_field: &str,
+) -> Result<RowConverter> {
+ create_row_converter(schema, [ordering_field])
+}
+
pub fn create_commit_time_ordering_converter(schema: SchemaRef) -> Result<RowConverter> {
create_row_converter(schema, [MetaField::CommitTime.as_ref()])
}
@@ -39,6 +46,17 @@ pub fn extract_record_keys(converter: &RowConverter, batch: &RecordBatch) -> Res
.map_err(CoreError::ArrowError)
}
+pub fn extract_event_time_ordering_values(
+ converter: &RowConverter,
+ batch: &RecordBatch,
+ ordering_field: &str,
+) -> Result<Rows> {
+ let ordering_columns = get_column_arrays(batch, [ordering_field])?;
+ converter
+ .convert_columns(&ordering_columns)
+ .map_err(CoreError::ArrowError)
+}
+
pub fn extract_commit_time_ordering_values(
converter: &RowConverter,
batch: &RecordBatch,
diff --git a/crates/core/src/schema/delete.rs b/crates/core/src/schema/delete.rs
index 4d560c9..838fb9c 100644
--- a/crates/core/src/schema/delete.rs
+++ b/crates/core/src/schema/delete.rs
@@ -128,13 +128,10 @@ pub fn avro_schema_for_delete_record_list() -> Result<&'static AvroSchema> {
}
/// Transforms a RecordBatch representing delete records into a new RecordBatch
-///
-/// ## Notes
-/// `ordering_field` is ignored as the delete ordering is determined by the commit time.
pub fn transform_delete_record_batch(
batch: &RecordBatch,
commit_time: &str,
- _ordering_field: &str,
+ ordering_field: &str,
) -> Result<RecordBatch> {
let num_rows = batch.num_rows();
@@ -144,9 +141,15 @@ pub fn transform_delete_record_batch(
// Get the original column data directly by position
let record_key_array = batch.column(0).clone(); // recordKey at pos 0
let partition_path_array = batch.column(1).clone(); // partitionPath at pos 1
+ let ordering_val_array = batch.column(2).clone(); // orderingVal at pos 2
// Create new columns vector with the new order
- let new_columns = vec![commit_time_array, record_key_array, partition_path_array];
+ let new_columns = vec![
+ commit_time_array,
+ record_key_array,
+ partition_path_array,
+ ordering_val_array,
+ ];
let new_fields = vec![
Arc::new(Field::new(
@@ -164,6 +167,11 @@ pub fn transform_delete_record_batch(
DataType::Utf8,
true,
)),
+ Arc::new(Field::new(
+ ordering_field,
+ batch.schema().field(2).data_type().clone(),
+ batch.schema().field(2).is_nullable(),
+ )),
];
let new_schema = SchemaRef::from(Schema::new(new_fields));
RecordBatch::try_new(new_schema, new_columns).map_err(CoreError::ArrowError)
@@ -305,7 +313,7 @@ mod tests {
}
}
- fn create_test_schema() -> SchemaRef {
+ fn create_test_delete_schema() -> SchemaRef {
Arc::new(Schema::new(vec![
Field::new("recordKey", DataType::Utf8, false),
Field::new("partitionPath", DataType::Utf8, true),
@@ -314,8 +322,8 @@ mod tests {
}
// Helper function to create a test RecordBatch
- fn create_test_record_batch() -> RecordBatch {
- let schema = create_test_schema();
+ fn create_test_delete_record_batch() -> RecordBatch {
+ let schema = create_test_delete_schema();
let record_keys = Arc::new(StringArray::from(vec!["key1", "key2",
"key3"]));
let partition_paths = Arc::new(StringArray::from(vec![
@@ -329,8 +337,8 @@ mod tests {
}
// Helper function to create empty RecordBatch
- fn create_empty_record_batch() -> RecordBatch {
- let schema = create_test_schema();
+ fn create_empty_delete_record_batch() -> RecordBatch {
+ let schema = create_test_delete_schema();
let record_keys = Arc::new(StringArray::from(Vec::<&str>::new()));
let partition_paths = Arc::new(StringArray::from(Vec::<Option<&str>>::new()));
@@ -341,7 +349,7 @@ mod tests {
#[test]
fn test_transform_delete_record_batch_basic() {
- let batch = create_test_record_batch();
+ let batch = create_test_delete_record_batch();
let commit_time = "20240101000000";
let ordering_field = "sequenceNumber";
@@ -350,14 +358,15 @@ mod tests {
// Check number of rows preserved
assert_eq!(result.num_rows(), 3);
- // Check number of columns (should be 4: commit_time + original 3)
- assert_eq!(result.num_columns(), 3);
+ // Check number of columns (should be 4: commit_time + original 2 + renamed ordering field)
+ assert_eq!(result.num_columns(), 4);
// Check schema field names
let schema = result.schema();
assert_eq!(schema.field(0).name(), MetaField::CommitTime.as_ref());
assert_eq!(schema.field(1).name(), MetaField::RecordKey.as_ref());
assert_eq!(schema.field(2).name(), MetaField::PartitionPath.as_ref());
+ assert_eq!(schema.field(3).name(), ordering_field);
// Check commit_time column values
let commit_time_array = result
@@ -389,11 +398,21 @@ mod tests {
assert_eq!(partition_path_array.value(0), "path1");
assert_eq!(partition_path_array.value(1), "path2");
assert_eq!(partition_path_array.value(2), "path3");
+
+ // Check ordering value column values preserved
+ let ordering_val_array = result
+ .column(3)
+ .as_any()
+ .downcast_ref::<Int64Array>()
+ .unwrap();
+ assert_eq!(ordering_val_array.value(0), 100);
+ assert_eq!(ordering_val_array.value(1), 200);
+ assert_eq!(ordering_val_array.value(2), 300);
}
#[test]
fn test_transform_delete_record_batch_empty() {
- let batch = create_empty_record_batch();
+ let batch = create_empty_delete_record_batch();
let commit_time = "20240101000000";
let ordering_field = "sequenceNumber";
@@ -401,27 +420,28 @@ mod tests {
// Check empty batch handling
assert_eq!(result.num_rows(), 0);
- assert_eq!(result.num_columns(), 3);
+ assert_eq!(result.num_columns(), 4);
// Check schema is still correct
let schema = result.schema();
assert_eq!(schema.field(0).name(), MetaField::CommitTime.as_ref());
assert_eq!(schema.field(1).name(), MetaField::RecordKey.as_ref());
assert_eq!(schema.field(2).name(), MetaField::PartitionPath.as_ref());
+ assert_eq!(schema.field(3).name(), "sequenceNumber");
// Check all arrays are empty
- for i in 0..3 {
+ for i in 0..4 {
assert_eq!(result.column(i).len(), 0);
}
}
#[test]
fn test_commit_time_values() {
- let batch = create_test_record_batch();
+ let batch = create_test_delete_record_batch();
let commit_times = ["20240101000000", "20231225123045",
"20240630235959"];
for commit_time in commit_times {
- let result = transform_delete_record_batch(&batch, commit_time, "orderingVal").unwrap();
+ let result = transform_delete_record_batch(&batch, commit_time, "seq").unwrap();
let commit_time_array = result
.column(0)