This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 6c9f499 fix: include commit_seqno for merge order (#250)
6c9f499 is described below
commit 6c9f499678a7c8dd26394cb7b76f9a0b21c2c328
Author: Shiyan Xu <[email protected]>
AuthorDate: Fri Jan 17 22:26:36 2025 -0600
fix: include commit_seqno for merge order (#250)
- Add `MetaField` enum for Hudi meta fields.
- Use `_hoodie_commit_seqno` as a tie-breaker for merge order when
precombine values are equal
---
crates/core/src/error.rs | 3 +
crates/core/src/lib.rs | 1 +
crates/core/src/merge/record_merger.rs | 169 ++++++++++++++++----------------
crates/core/src/metadata/meta_field.rs | 174 +++++++++++++++++++++++++++++++++
crates/core/src/metadata/mod.rs | 19 ++++
crates/core/src/util/arrow.rs | 150 ++++++++++++++++++++++++++++
crates/core/src/util/mod.rs | 1 +
7 files changed, 434 insertions(+), 83 deletions(-)
diff --git a/crates/core/src/error.rs b/crates/core/src/error.rs
index ed319e3..25e0aec 100644
--- a/crates/core/src/error.rs
+++ b/crates/core/src/error.rs
@@ -54,6 +54,9 @@ pub enum CoreError {
#[error("{0}")]
InvalidPartitionPath(String),
+ #[error("{0}")]
+ InvalidValue(String),
+
#[error(transparent)]
ParquetError(#[from] parquet::errors::ParquetError),
diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs
index 4c2e036..97be437 100644
--- a/crates/core/src/lib.rs
+++ b/crates/core/src/lib.rs
@@ -48,6 +48,7 @@ pub mod error;
pub mod expr;
pub mod file_group;
pub mod merge;
+pub mod metadata;
pub mod storage;
pub mod table;
pub mod timeline;
diff --git a/crates/core/src/merge/record_merger.rs b/crates/core/src/merge/record_merger.rs
index 325f3b0..3a1b6ac 100644
--- a/crates/core/src/merge/record_merger.rs
+++ b/crates/core/src/merge/record_merger.rs
@@ -23,11 +23,13 @@ use crate::config::table::HudiTableConfig::{
};
use crate::config::HudiConfigs;
use crate::merge::RecordMergeStrategyValue;
+use crate::metadata::meta_field::MetaField;
+use crate::util::arrow::lexsort_to_indices;
+use crate::util::arrow::ColumnAsArray;
use crate::Result;
-use arrow::compute::{sort_to_indices, take_record_batch};
-use arrow::error::ArrowError;
-use arrow_array::{Array, RecordBatch, StringArray, UInt32Array};
-use arrow_schema::{SchemaRef, SortOptions};
+use arrow::compute::take_record_batch;
+use arrow_array::{Array, RecordBatch, UInt32Array};
+use arrow_schema::SchemaRef;
use arrow_select::concat::concat_batches;
use std::collections::HashMap;
use std::str::FromStr;
@@ -114,39 +116,26 @@ impl RecordMerger {
}
let precombine_field =
self.hudi_configs.get(PrecombineField)?.to::<String>();
- let precombine_array =
- concat_batch
- .column_by_name(&precombine_field)
- .ok_or_else(|| {
- ArrowError::SchemaError(format!("Column
{precombine_field} not found."))
- })?;
-
- // Sort the precombine values in descending order, and put
nulls last.
- let sort_options = SortOptions::new(true, false);
- let sorted_indices = sort_to_indices(precombine_array,
Some(sort_options), None)?;
-
- let record_key_field = "_hoodie_record_key";
- let record_key_array = concat_batch
- .column_by_name(record_key_field)
- .ok_or_else(|| {
- ArrowError::SchemaError(format!("Column
{record_key_field} not found."))
- })?
- .as_any()
- .downcast_ref::<StringArray>()
- .ok_or_else(|| {
- ArrowError::CastError(format!(
- "Column {record_key_field} cannot be cast to
StringArray."
- ))
- })?;
-
+ let precombine_array =
concat_batch.get_array(&precombine_field)?;
+ let commit_seqno_array =
concat_batch.get_array(MetaField::CommitSeqno.as_ref())?;
+ let sorted_indices = lexsort_to_indices(
+ &[precombine_array.clone(), commit_seqno_array.clone()],
+ true,
+ );
+
+ let record_key_array =
+
concat_batch.get_string_array(MetaField::RecordKey.as_ref())?;
let mut keys_and_latest_indices: HashMap<&str, u32> =
HashMap::with_capacity(record_key_array.len());
for i in sorted_indices.values() {
let record_key = record_key_array.value(*i as usize);
if keys_and_latest_indices.contains_key(record_key) {
- // We sorted the precombine field in descending order,
so if the record key
- // is already in the map, the associated index will be
already pointing to
- // the latest version of that record.
+ // We sorted the precombine and commit seqno in
descending order,
+ // so if the record key is already in the map, the
associated row index
+ // will be already pointing to the latest version of
that record.
+ // Note that records with the same record key,
precombine value,
+ // and commit seqno are considered duplicates, and we
keep whichever
+ // comes first in the sorted indices.
continue;
} else {
keys_and_latest_indices.insert(record_key, *i);
@@ -163,20 +152,24 @@ impl RecordMerger {
#[cfg(test)]
mod tests {
use super::*;
- use arrow_array::Int32Array;
+ use arrow_array::{Int32Array, StringArray};
use arrow_schema::{DataType, Field, Schema};
- fn create_configs(strategy: &str, meta_fields: bool, precombine:
Option<&str>) -> HudiConfigs {
+ fn create_configs(
+ strategy: &str,
+ populates_meta_fields: bool,
+ precombine: Option<&str>,
+ ) -> HudiConfigs {
if let Some(precombine) = precombine {
HudiConfigs::new([
(RecordMergeStrategy, strategy.to_string()),
- (PopulatesMetaFields, meta_fields.to_string()),
+ (PopulatesMetaFields, populates_meta_fields.to_string()),
(PrecombineField, precombine.to_string()),
])
} else {
HudiConfigs::new([
(RecordMergeStrategy, strategy.to_string()),
- (PopulatesMetaFields, meta_fields.to_string()),
+ (PopulatesMetaFields, populates_meta_fields.to_string()),
])
}
}
@@ -209,40 +202,56 @@ mod tests {
SchemaRef::from(schema)
}
- fn get_sorted_rows(batch: &RecordBatch) -> Vec<(String, i32, i32)> {
+ fn create_test_schema(ts_nullable: bool) -> SchemaRef {
+ create_schema(vec![
+ (MetaField::CommitSeqno.as_ref(), DataType::Utf8, false),
+ (MetaField::RecordKey.as_ref(), DataType::Utf8, false),
+ ("ts", DataType::Int32, ts_nullable),
+ ("value", DataType::Int32, false),
+ ])
+ }
+
+ fn get_sorted_rows(batch: &RecordBatch) -> Vec<(String, String, i32, i32)>
{
+ let seqno = batch
+ .get_string_array(MetaField::CommitSeqno.as_ref())
+ .unwrap();
let keys = batch
- .column(0)
- .as_any()
- .downcast_ref::<StringArray>()
- .expect("First column must be strings");
- let timestamps = batch
- .column(1)
+ .get_string_array(MetaField::RecordKey.as_ref())
+ .unwrap();
+ let timestamps = batch.get_array("ts").unwrap();
+ let timestamps = timestamps
.as_any()
.downcast_ref::<Int32Array>()
- .expect("Second column must be i32");
- let values = batch
- .column(2)
+ .unwrap()
+ .clone();
+ let values = batch.get_array("value").unwrap();
+ let values = values
.as_any()
.downcast_ref::<Int32Array>()
- .expect("Third column must be i32");
+ .unwrap()
+ .clone();
- let mut result: Vec<(String, i32, i32)> = keys
+ let mut result: Vec<(String, String, i32, i32)> = seqno
.iter()
+ .zip(keys.iter())
.zip(timestamps.iter())
.zip(values.iter())
- .map(|((k, t), v)| (k.unwrap().to_string(), t.unwrap(),
v.unwrap()))
+ .map(|(((s, k), t), v)| {
+ (
+ s.unwrap().to_string(),
+ k.unwrap().to_string(),
+ t.unwrap(),
+ v.unwrap(),
+ )
+ })
.collect();
- result.sort_unstable_by_key(|(k, ts, _)| (k.clone(), *ts));
+ result.sort_unstable_by_key(|(s, k, ts, _)| (k.clone(), *ts,
s.clone()));
result
}
#[test]
fn test_merge_records_empty() {
- let schema = create_schema(vec![
- ("_hoodie_record_key", DataType::Utf8, false),
- ("ts", DataType::Int32, false),
- ("value", DataType::Int32, false),
- ]);
+ let schema = create_test_schema(false);
let configs = create_configs("OVERWRITE_WITH_LATEST", true,
Some("ts"));
let merger = RecordMerger::new(Arc::new(configs));
@@ -261,16 +270,13 @@ mod tests {
#[test]
fn test_merge_records_append_only() {
- let schema = create_schema(vec![
- ("_hoodie_record_key", DataType::Utf8, false),
- ("ts", DataType::Int32, false),
- ("value", DataType::Int32, false),
- ]);
+ let schema = create_test_schema(false);
// First batch
let batch1 = RecordBatch::try_new(
schema.clone(),
vec![
+ Arc::new(StringArray::from(vec!["s1", "s1"])),
Arc::new(StringArray::from(vec!["k1", "k2"])),
Arc::new(Int32Array::from(vec![1, 2])),
Arc::new(Int32Array::from(vec![10, 20])),
@@ -282,6 +288,7 @@ mod tests {
let batch2 = RecordBatch::try_new(
schema.clone(),
vec![
+ Arc::new(StringArray::from(vec!["s2", "s2"])),
Arc::new(StringArray::from(vec!["k1", "k3"])),
Arc::new(Int32Array::from(vec![3, 4])),
Arc::new(Int32Array::from(vec![30, 40])),
@@ -302,26 +309,23 @@ mod tests {
assert_eq!(
result,
vec![
- ("k1".to_string(), 1, 10),
- ("k1".to_string(), 3, 30),
- ("k2".to_string(), 2, 20),
- ("k3".to_string(), 4, 40),
+ ("s1".to_string(), "k1".to_string(), 1, 10),
+ ("s2".to_string(), "k1".to_string(), 3, 30),
+ ("s1".to_string(), "k2".to_string(), 2, 20),
+ ("s2".to_string(), "k3".to_string(), 4, 40),
]
);
}
#[test]
fn test_merge_records_nulls() {
- let schema = create_schema(vec![
- ("_hoodie_record_key", DataType::Utf8, false),
- ("ts", DataType::Int32, true), // Nullable timestamp
- ("value", DataType::Int32, false),
- ]);
+ let schema = create_test_schema(true);
// First batch with some null timestamps
let batch1 = RecordBatch::try_new(
schema.clone(),
vec![
+ Arc::new(StringArray::from(vec!["s1", "s1", "s1"])),
Arc::new(StringArray::from(vec!["k1", "k2", "k3"])),
Arc::new(Int32Array::from(vec![Some(1), None, Some(3)])),
Arc::new(Int32Array::from(vec![10, 20, 30])),
@@ -333,6 +337,7 @@ mod tests {
let batch2 = RecordBatch::try_new(
schema.clone(),
vec![
+ Arc::new(StringArray::from(vec!["s2", "s2"])),
Arc::new(StringArray::from(vec!["k1", "k2"])),
Arc::new(Int32Array::from(vec![None, Some(5)])),
Arc::new(Int32Array::from(vec![40, 50])),
@@ -352,25 +357,22 @@ mod tests {
assert_eq!(
result,
vec![
- ("k1".to_string(), 1, 10), // Keep original since both updates
have null ts
- ("k2".to_string(), 5, 50), // Take second value due to higher
ts
- ("k3".to_string(), 3, 30), // Unchanged
+ ("s1".to_string(), "k1".to_string(), 1, 10), // Keep original
since ts is null in 2nd batch
+ ("s2".to_string(), "k2".to_string(), 5, 50), // Take second
value due to higher ts
+ ("s1".to_string(), "k3".to_string(), 3, 30), // Unchanged
]
);
}
#[test]
fn test_merge_records_overwrite_with_latest() {
- let schema = create_schema(vec![
- ("_hoodie_record_key", DataType::Utf8, false),
- ("ts", DataType::Int32, false),
- ("value", DataType::Int32, false),
- ]);
+ let schema = create_test_schema(false);
// First batch
let batch1 = RecordBatch::try_new(
schema.clone(),
vec![
+ Arc::new(StringArray::from(vec!["s1", "s1", "s1"])),
Arc::new(StringArray::from(vec!["k1", "k2", "k3"])),
Arc::new(Int32Array::from(vec![1, 2, 3])),
Arc::new(Int32Array::from(vec![10, 20, 30])),
@@ -382,9 +384,10 @@ mod tests {
let batch2 = RecordBatch::try_new(
schema.clone(),
vec![
- Arc::new(StringArray::from(vec!["k1", "k2"])),
- Arc::new(Int32Array::from(vec![4, 1])),
- Arc::new(Int32Array::from(vec![40, 50])),
+ Arc::new(StringArray::from(vec!["s2", "s2", "s2"])),
+ Arc::new(StringArray::from(vec!["k1", "k2", "k3"])),
+ Arc::new(Int32Array::from(vec![4, 1, 3])),
+ Arc::new(Int32Array::from(vec![40, 50, 60])),
],
)
.unwrap();
@@ -401,9 +404,9 @@ mod tests {
assert_eq!(
result,
vec![
- ("k1".to_string(), 4, 40), // Latest value due to ts=4
- ("k2".to_string(), 2, 20), // Original value since ts=1 < ts=2
- ("k3".to_string(), 3, 30), // Unchanged
+ ("s2".to_string(), "k1".to_string(), 4, 40), // Latest value
due to ts=4
+ ("s1".to_string(), "k2".to_string(), 2, 20), // Original value
since ts=1 < ts=2
+ ("s2".to_string(), "k3".to_string(), 3, 60), // Latest value
due to equal ts and seqno=s2
]
);
}
diff --git a/crates/core/src/metadata/meta_field.rs b/crates/core/src/metadata/meta_field.rs
new file mode 100644
index 0000000..240aeff
--- /dev/null
+++ b/crates/core/src/metadata/meta_field.rs
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use crate::error::CoreError;
+use crate::Result;
+use std::fmt::Display;
+use std::str::FromStr;
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum MetaField {
+ CommitTime = 0,
+ CommitSeqno = 1,
+ RecordKey = 2,
+ PartitionPath = 3,
+ FileName = 4,
+ Operation = 5,
+}
+
+impl AsRef<str> for MetaField {
+ fn as_ref(&self) -> &str {
+ match self {
+ MetaField::CommitTime => "_hoodie_commit_time",
+ MetaField::CommitSeqno => "_hoodie_commit_seqno",
+ MetaField::RecordKey => "_hoodie_record_key",
+ MetaField::PartitionPath => "_hoodie_partition_path",
+ MetaField::FileName => "_hoodie_file_name",
+ MetaField::Operation => "_hoodie_operation",
+ }
+ }
+}
+
+impl Display for MetaField {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(f, "{}", self.as_ref())
+ }
+}
+
+impl FromStr for MetaField {
+ type Err = CoreError;
+
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ match s {
+ "_hoodie_commit_time" => Ok(MetaField::CommitTime),
+ "_hoodie_commit_seqno" => Ok(MetaField::CommitSeqno),
+ "_hoodie_record_key" => Ok(MetaField::RecordKey),
+ "_hoodie_partition_path" => Ok(MetaField::PartitionPath),
+ "_hoodie_file_name" => Ok(MetaField::FileName),
+ "_hoodie_operation" => Ok(MetaField::Operation),
+ _ => Err(CoreError::InvalidValue(s.to_string())),
+ }
+ }
+}
+
+impl MetaField {
+ #[inline]
+ pub fn field_index(&self) -> usize {
+ self.clone() as usize
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_field_index() {
+ assert_eq!(MetaField::CommitTime.field_index(), 0);
+ assert_eq!(MetaField::CommitSeqno.field_index(), 1);
+ assert_eq!(MetaField::RecordKey.field_index(), 2);
+ assert_eq!(MetaField::PartitionPath.field_index(), 3);
+ assert_eq!(MetaField::FileName.field_index(), 4);
+ assert_eq!(MetaField::Operation.field_index(), 5);
+ }
+
+ #[test]
+ fn test_as_ref() {
+ assert_eq!(MetaField::CommitTime.as_ref(), "_hoodie_commit_time");
+ assert_eq!(MetaField::CommitSeqno.as_ref(), "_hoodie_commit_seqno");
+ assert_eq!(MetaField::RecordKey.as_ref(), "_hoodie_record_key");
+ assert_eq!(MetaField::PartitionPath.as_ref(),
"_hoodie_partition_path");
+ assert_eq!(MetaField::FileName.as_ref(), "_hoodie_file_name");
+ assert_eq!(MetaField::Operation.as_ref(), "_hoodie_operation");
+ }
+
+ #[test]
+ fn test_display() {
+ assert_eq!(MetaField::CommitTime.to_string(), "_hoodie_commit_time");
+ assert_eq!(
+ format!("{}", MetaField::CommitSeqno),
+ "_hoodie_commit_seqno"
+ );
+ assert_eq!(MetaField::RecordKey.to_string(), "_hoodie_record_key");
+ }
+
+ #[test]
+ fn test_from_str_valid() -> Result<(), CoreError> {
+ assert_eq!(
+ MetaField::from_str("_hoodie_commit_time")?,
+ MetaField::CommitTime
+ );
+ assert_eq!(
+ MetaField::from_str("_hoodie_commit_seqno")?,
+ MetaField::CommitSeqno
+ );
+ assert_eq!(
+ MetaField::from_str("_hoodie_record_key")?,
+ MetaField::RecordKey
+ );
+ assert_eq!(
+ MetaField::from_str("_hoodie_partition_path")?,
+ MetaField::PartitionPath
+ );
+ assert_eq!(
+ MetaField::from_str("_hoodie_file_name")?,
+ MetaField::FileName
+ );
+ assert_eq!(
+ MetaField::from_str("_hoodie_operation")?,
+ MetaField::Operation
+ );
+ Ok(())
+ }
+
+ #[test]
+ fn test_from_str_invalid() {
+ assert!(matches!(
+ MetaField::from_str(""),
+ Err(CoreError::InvalidValue(_))
+ ));
+ assert!(matches!(
+ MetaField::from_str("_hoodie_invalid"),
+ Err(CoreError::InvalidValue(_))
+ ));
+ assert!(matches!(
+ MetaField::from_str("invalid"),
+ Err(CoreError::InvalidValue(_))
+ ));
+ }
+
+ #[test]
+ fn test_roundtrip() -> Result<(), CoreError> {
+ // Test conversion from enum -> string -> enum
+ let fields = [
+ MetaField::CommitTime,
+ MetaField::CommitSeqno,
+ MetaField::RecordKey,
+ MetaField::PartitionPath,
+ MetaField::FileName,
+ MetaField::Operation,
+ ];
+
+ for field in fields {
+ let s = field.to_string();
+ let parsed = MetaField::from_str(&s)?;
+ assert_eq!(field, parsed);
+ }
+ Ok(())
+ }
+}
diff --git a/crates/core/src/metadata/mod.rs b/crates/core/src/metadata/mod.rs
new file mode 100644
index 0000000..48133fd
--- /dev/null
+++ b/crates/core/src/metadata/mod.rs
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+pub mod meta_field;
diff --git a/crates/core/src/util/arrow.rs b/crates/core/src/util/arrow.rs
new file mode 100644
index 0000000..fd8c1f4
--- /dev/null
+++ b/crates/core/src/util/arrow.rs
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::Result;
+use arrow::array::ArrayRef;
+use arrow::array::RecordBatch;
+use arrow::array::StringArray;
+use arrow_array::{Array, UInt32Array};
+use arrow_row::{RowConverter, SortField};
+use arrow_schema::ArrowError;
+
+pub trait ColumnAsArray {
+ fn get_array(&self, column_name: &str) -> Result<ArrayRef>;
+
+ fn get_string_array(&self, column_name: &str) -> Result<StringArray>;
+}
+
+impl ColumnAsArray for RecordBatch {
+ fn get_array(&self, column_name: &str) -> Result<ArrayRef> {
+ let index = self.schema().index_of(column_name)?;
+ let array = self.column(index);
+ Ok(array.clone())
+ }
+
+ fn get_string_array(&self, column_name: &str) -> Result<StringArray> {
+ let array = self.get_array(column_name)?;
+ let array = array
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .ok_or_else(|| {
+ ArrowError::CastError(format!(
+ "Column {column_name} cannot be cast to StringArray."
+ ))
+ })?;
+ Ok(array.clone())
+ }
+}
+
+pub fn lexsort_to_indices(arrays: &[ArrayRef], desc: bool) -> UInt32Array {
+ let fields = arrays
+ .iter()
+ .map(|a| SortField::new(a.data_type().clone()))
+ .collect();
+ let converter = RowConverter::new(fields).unwrap();
+ let rows = converter.convert_columns(arrays).unwrap();
+ let mut sort: Vec<_> = rows.iter().enumerate().collect();
+ if desc {
+ sort.sort_unstable_by(|(_, a), (_, b)| b.cmp(a));
+ } else {
+ sort.sort_unstable_by(|(_, a), (_, b)| a.cmp(b));
+ }
+ UInt32Array::from_iter_values(sort.iter().map(|(i, _)| *i as u32))
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use arrow::array::{Int32Array, StringArray};
+ use arrow_array::Float64Array;
+ use std::sync::Arc;
+
+ #[test]
+ fn test_basic_int_sort() {
+ let arr = Int32Array::from(vec![3, 1, 4, 1, 5]);
+ let arrays = vec![Arc::new(arr) as ArrayRef];
+
+ // Test ascending
+ let result = lexsort_to_indices(&arrays, false);
+ assert_eq!(
+ result.values(),
+ &[1, 3, 0, 2, 4] // Indices that would sort to [1,1,3,4,5]
+ );
+
+ // Test descending
+ let result = lexsort_to_indices(&arrays, true);
+ assert_eq!(
+ result.values(),
+ &[4, 2, 0, 1, 3] // Indices that would sort to [5,4,3,1,1]
+ );
+ }
+
+ #[test]
+ fn test_multiple_columns() {
+ let arr1 = Int32Array::from(vec![1, 1, 2, 2]);
+ let arr2 = StringArray::from(vec!["b", "a", "b", "a"]);
+ let arrays = vec![Arc::new(arr1) as ArrayRef, Arc::new(arr2) as
ArrayRef];
+
+ let result = lexsort_to_indices(&arrays, false);
+ assert_eq!(
+ result.values(),
+ &[1, 0, 3, 2] // Should sort by first column then second
+ );
+ }
+
+ #[test]
+ fn test_edge_cases() {
+ // Empty array
+ assert_eq!(lexsort_to_indices(&[], false).len(), 0);
+
+ // Array of empty array
+ let arr = Int32Array::from(vec![] as Vec<i32>);
+ let arrays = vec![Arc::new(arr) as ArrayRef];
+ let result = lexsort_to_indices(&arrays, false);
+ assert_eq!(result.len(), 0);
+
+ // Single element
+ let arr = Int32Array::from(vec![1]);
+ let arrays = vec![Arc::new(arr) as ArrayRef];
+ let result = lexsort_to_indices(&arrays, false);
+ assert_eq!(result.values(), &[0]);
+
+ // All equal values
+ let arr = Int32Array::from(vec![5, 5, 5, 5]);
+ let arrays = vec![Arc::new(arr) as ArrayRef];
+ let result = lexsort_to_indices(&arrays, false);
+ assert_eq!(result.values(), &[0, 1, 2, 3]);
+ }
+
+ #[test]
+ fn test_different_types() {
+ let int_arr = Int32Array::from(vec![1, 2, 1]);
+ let str_arr = StringArray::from(vec!["a", "b", "c"]);
+ let float_arr = Float64Array::from(vec![1.0, 2.0, 3.0]);
+
+ let arrays = vec![
+ Arc::new(int_arr) as ArrayRef,
+ Arc::new(str_arr) as ArrayRef,
+ Arc::new(float_arr) as ArrayRef,
+ ];
+
+ let result = lexsort_to_indices(&arrays, false);
+ assert_eq!(result.values(), &[0, 2, 1]);
+ }
+}
diff --git a/crates/core/src/util/mod.rs b/crates/core/src/util/mod.rs
index 7db2764..84fa313 100644
--- a/crates/core/src/util/mod.rs
+++ b/crates/core/src/util/mod.rs
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+pub mod arrow;
pub fn convert_vec_to_slice(vec: &[(String, String, String)]) -> Vec<(&str,
&str, &str)> {
vec.iter()