This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 6c9f499  fix: include commit_seqno for merge order (#250)
6c9f499 is described below

commit 6c9f499678a7c8dd26394cb7b76f9a0b21c2c328
Author: Shiyan Xu <[email protected]>
AuthorDate: Fri Jan 17 22:26:36 2025 -0600

    fix: include commit_seqno for merge order (#250)
    
    - Add `MetaField` enum for Hudi meta fields.
    - Consider `_hoodie_commit_seqno` as a tie-breaker for merging when precombine values are equal
---
 crates/core/src/error.rs               |   3 +
 crates/core/src/lib.rs                 |   1 +
 crates/core/src/merge/record_merger.rs | 169 ++++++++++++++++----------------
 crates/core/src/metadata/meta_field.rs | 174 +++++++++++++++++++++++++++++++++
 crates/core/src/metadata/mod.rs        |  19 ++++
 crates/core/src/util/arrow.rs          | 150 ++++++++++++++++++++++++++++
 crates/core/src/util/mod.rs            |   1 +
 7 files changed, 434 insertions(+), 83 deletions(-)

diff --git a/crates/core/src/error.rs b/crates/core/src/error.rs
index ed319e3..25e0aec 100644
--- a/crates/core/src/error.rs
+++ b/crates/core/src/error.rs
@@ -54,6 +54,9 @@ pub enum CoreError {
     #[error("{0}")]
     InvalidPartitionPath(String),
 
+    #[error("{0}")]
+    InvalidValue(String),
+
     #[error(transparent)]
     ParquetError(#[from] parquet::errors::ParquetError),
 
diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs
index 4c2e036..97be437 100644
--- a/crates/core/src/lib.rs
+++ b/crates/core/src/lib.rs
@@ -48,6 +48,7 @@ pub mod error;
 pub mod expr;
 pub mod file_group;
 pub mod merge;
+pub mod metadata;
 pub mod storage;
 pub mod table;
 pub mod timeline;
diff --git a/crates/core/src/merge/record_merger.rs b/crates/core/src/merge/record_merger.rs
index 325f3b0..3a1b6ac 100644
--- a/crates/core/src/merge/record_merger.rs
+++ b/crates/core/src/merge/record_merger.rs
@@ -23,11 +23,13 @@ use crate::config::table::HudiTableConfig::{
 };
 use crate::config::HudiConfigs;
 use crate::merge::RecordMergeStrategyValue;
+use crate::metadata::meta_field::MetaField;
+use crate::util::arrow::lexsort_to_indices;
+use crate::util::arrow::ColumnAsArray;
 use crate::Result;
-use arrow::compute::{sort_to_indices, take_record_batch};
-use arrow::error::ArrowError;
-use arrow_array::{Array, RecordBatch, StringArray, UInt32Array};
-use arrow_schema::{SchemaRef, SortOptions};
+use arrow::compute::take_record_batch;
+use arrow_array::{Array, RecordBatch, UInt32Array};
+use arrow_schema::SchemaRef;
 use arrow_select::concat::concat_batches;
 use std::collections::HashMap;
 use std::str::FromStr;
@@ -114,39 +116,26 @@ impl RecordMerger {
                 }
 
                 let precombine_field = 
self.hudi_configs.get(PrecombineField)?.to::<String>();
-                let precombine_array =
-                    concat_batch
-                        .column_by_name(&precombine_field)
-                        .ok_or_else(|| {
-                            ArrowError::SchemaError(format!("Column 
{precombine_field} not found."))
-                        })?;
-
-                // Sort the precombine values in descending order, and put 
nulls last.
-                let sort_options = SortOptions::new(true, false);
-                let sorted_indices = sort_to_indices(precombine_array, 
Some(sort_options), None)?;
-
-                let record_key_field = "_hoodie_record_key";
-                let record_key_array = concat_batch
-                    .column_by_name(record_key_field)
-                    .ok_or_else(|| {
-                        ArrowError::SchemaError(format!("Column 
{record_key_field} not found."))
-                    })?
-                    .as_any()
-                    .downcast_ref::<StringArray>()
-                    .ok_or_else(|| {
-                        ArrowError::CastError(format!(
-                            "Column {record_key_field} cannot be cast to 
StringArray."
-                        ))
-                    })?;
-
+                let precombine_array = 
concat_batch.get_array(&precombine_field)?;
+                let commit_seqno_array = 
concat_batch.get_array(MetaField::CommitSeqno.as_ref())?;
+                let sorted_indices = lexsort_to_indices(
+                    &[precombine_array.clone(), commit_seqno_array.clone()],
+                    true,
+                );
+
+                let record_key_array =
+                    
concat_batch.get_string_array(MetaField::RecordKey.as_ref())?;
                 let mut keys_and_latest_indices: HashMap<&str, u32> =
                     HashMap::with_capacity(record_key_array.len());
                 for i in sorted_indices.values() {
                     let record_key = record_key_array.value(*i as usize);
                     if keys_and_latest_indices.contains_key(record_key) {
-                        // We sorted the precombine field in descending order, 
so if the record key
-                        // is already in the map, the associated index will be 
already pointing to
-                        // the latest version of that record.
+                        // We sorted the precombine and commit seqno in 
descending order,
+                        // so if the record key is already in the map, the 
associated row index
+                        // will be already pointing to the latest version of 
that record.
+                        // Note that records with the same record key, 
precombine value,
+                        // and commit seqno are considered duplicates, and we 
keep whichever
+                        // comes first in the sorted indices.
                         continue;
                     } else {
                         keys_and_latest_indices.insert(record_key, *i);
@@ -163,20 +152,24 @@ impl RecordMerger {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use arrow_array::Int32Array;
+    use arrow_array::{Int32Array, StringArray};
     use arrow_schema::{DataType, Field, Schema};
 
-    fn create_configs(strategy: &str, meta_fields: bool, precombine: 
Option<&str>) -> HudiConfigs {
+    fn create_configs(
+        strategy: &str,
+        populates_meta_fields: bool,
+        precombine: Option<&str>,
+    ) -> HudiConfigs {
         if let Some(precombine) = precombine {
             HudiConfigs::new([
                 (RecordMergeStrategy, strategy.to_string()),
-                (PopulatesMetaFields, meta_fields.to_string()),
+                (PopulatesMetaFields, populates_meta_fields.to_string()),
                 (PrecombineField, precombine.to_string()),
             ])
         } else {
             HudiConfigs::new([
                 (RecordMergeStrategy, strategy.to_string()),
-                (PopulatesMetaFields, meta_fields.to_string()),
+                (PopulatesMetaFields, populates_meta_fields.to_string()),
             ])
         }
     }
@@ -209,40 +202,56 @@ mod tests {
         SchemaRef::from(schema)
     }
 
-    fn get_sorted_rows(batch: &RecordBatch) -> Vec<(String, i32, i32)> {
+    fn create_test_schema(ts_nullable: bool) -> SchemaRef {
+        create_schema(vec![
+            (MetaField::CommitSeqno.as_ref(), DataType::Utf8, false),
+            (MetaField::RecordKey.as_ref(), DataType::Utf8, false),
+            ("ts", DataType::Int32, ts_nullable),
+            ("value", DataType::Int32, false),
+        ])
+    }
+
+    fn get_sorted_rows(batch: &RecordBatch) -> Vec<(String, String, i32, i32)> 
{
+        let seqno = batch
+            .get_string_array(MetaField::CommitSeqno.as_ref())
+            .unwrap();
         let keys = batch
-            .column(0)
-            .as_any()
-            .downcast_ref::<StringArray>()
-            .expect("First column must be strings");
-        let timestamps = batch
-            .column(1)
+            .get_string_array(MetaField::RecordKey.as_ref())
+            .unwrap();
+        let timestamps = batch.get_array("ts").unwrap();
+        let timestamps = timestamps
             .as_any()
             .downcast_ref::<Int32Array>()
-            .expect("Second column must be i32");
-        let values = batch
-            .column(2)
+            .unwrap()
+            .clone();
+        let values = batch.get_array("value").unwrap();
+        let values = values
             .as_any()
             .downcast_ref::<Int32Array>()
-            .expect("Third column must be i32");
+            .unwrap()
+            .clone();
 
-        let mut result: Vec<(String, i32, i32)> = keys
+        let mut result: Vec<(String, String, i32, i32)> = seqno
             .iter()
+            .zip(keys.iter())
             .zip(timestamps.iter())
             .zip(values.iter())
-            .map(|((k, t), v)| (k.unwrap().to_string(), t.unwrap(), 
v.unwrap()))
+            .map(|(((s, k), t), v)| {
+                (
+                    s.unwrap().to_string(),
+                    k.unwrap().to_string(),
+                    t.unwrap(),
+                    v.unwrap(),
+                )
+            })
             .collect();
-        result.sort_unstable_by_key(|(k, ts, _)| (k.clone(), *ts));
+        result.sort_unstable_by_key(|(s, k, ts, _)| (k.clone(), *ts, 
s.clone()));
         result
     }
 
     #[test]
     fn test_merge_records_empty() {
-        let schema = create_schema(vec![
-            ("_hoodie_record_key", DataType::Utf8, false),
-            ("ts", DataType::Int32, false),
-            ("value", DataType::Int32, false),
-        ]);
+        let schema = create_test_schema(false);
 
         let configs = create_configs("OVERWRITE_WITH_LATEST", true, 
Some("ts"));
         let merger = RecordMerger::new(Arc::new(configs));
@@ -261,16 +270,13 @@ mod tests {
 
     #[test]
     fn test_merge_records_append_only() {
-        let schema = create_schema(vec![
-            ("_hoodie_record_key", DataType::Utf8, false),
-            ("ts", DataType::Int32, false),
-            ("value", DataType::Int32, false),
-        ]);
+        let schema = create_test_schema(false);
 
         // First batch
         let batch1 = RecordBatch::try_new(
             schema.clone(),
             vec![
+                Arc::new(StringArray::from(vec!["s1", "s1"])),
                 Arc::new(StringArray::from(vec!["k1", "k2"])),
                 Arc::new(Int32Array::from(vec![1, 2])),
                 Arc::new(Int32Array::from(vec![10, 20])),
@@ -282,6 +288,7 @@ mod tests {
         let batch2 = RecordBatch::try_new(
             schema.clone(),
             vec![
+                Arc::new(StringArray::from(vec!["s2", "s2"])),
                 Arc::new(StringArray::from(vec!["k1", "k3"])),
                 Arc::new(Int32Array::from(vec![3, 4])),
                 Arc::new(Int32Array::from(vec![30, 40])),
@@ -302,26 +309,23 @@ mod tests {
         assert_eq!(
             result,
             vec![
-                ("k1".to_string(), 1, 10),
-                ("k1".to_string(), 3, 30),
-                ("k2".to_string(), 2, 20),
-                ("k3".to_string(), 4, 40),
+                ("s1".to_string(), "k1".to_string(), 1, 10),
+                ("s2".to_string(), "k1".to_string(), 3, 30),
+                ("s1".to_string(), "k2".to_string(), 2, 20),
+                ("s2".to_string(), "k3".to_string(), 4, 40),
             ]
         );
     }
 
     #[test]
     fn test_merge_records_nulls() {
-        let schema = create_schema(vec![
-            ("_hoodie_record_key", DataType::Utf8, false),
-            ("ts", DataType::Int32, true), // Nullable timestamp
-            ("value", DataType::Int32, false),
-        ]);
+        let schema = create_test_schema(true);
 
         // First batch with some null timestamps
         let batch1 = RecordBatch::try_new(
             schema.clone(),
             vec![
+                Arc::new(StringArray::from(vec!["s1", "s1", "s1"])),
                 Arc::new(StringArray::from(vec!["k1", "k2", "k3"])),
                 Arc::new(Int32Array::from(vec![Some(1), None, Some(3)])),
                 Arc::new(Int32Array::from(vec![10, 20, 30])),
@@ -333,6 +337,7 @@ mod tests {
         let batch2 = RecordBatch::try_new(
             schema.clone(),
             vec![
+                Arc::new(StringArray::from(vec!["s2", "s2"])),
                 Arc::new(StringArray::from(vec!["k1", "k2"])),
                 Arc::new(Int32Array::from(vec![None, Some(5)])),
                 Arc::new(Int32Array::from(vec![40, 50])),
@@ -352,25 +357,22 @@ mod tests {
         assert_eq!(
             result,
             vec![
-                ("k1".to_string(), 1, 10), // Keep original since both updates 
have null ts
-                ("k2".to_string(), 5, 50), // Take second value due to higher 
ts
-                ("k3".to_string(), 3, 30), // Unchanged
+                ("s1".to_string(), "k1".to_string(), 1, 10), // Keep original 
since ts is null in 2nd batch
+                ("s2".to_string(), "k2".to_string(), 5, 50), // Take second 
value due to higher ts
+                ("s1".to_string(), "k3".to_string(), 3, 30), // Unchanged
             ]
         );
     }
 
     #[test]
     fn test_merge_records_overwrite_with_latest() {
-        let schema = create_schema(vec![
-            ("_hoodie_record_key", DataType::Utf8, false),
-            ("ts", DataType::Int32, false),
-            ("value", DataType::Int32, false),
-        ]);
+        let schema = create_test_schema(false);
 
         // First batch
         let batch1 = RecordBatch::try_new(
             schema.clone(),
             vec![
+                Arc::new(StringArray::from(vec!["s1", "s1", "s1"])),
                 Arc::new(StringArray::from(vec!["k1", "k2", "k3"])),
                 Arc::new(Int32Array::from(vec![1, 2, 3])),
                 Arc::new(Int32Array::from(vec![10, 20, 30])),
@@ -382,9 +384,10 @@ mod tests {
         let batch2 = RecordBatch::try_new(
             schema.clone(),
             vec![
-                Arc::new(StringArray::from(vec!["k1", "k2"])),
-                Arc::new(Int32Array::from(vec![4, 1])),
-                Arc::new(Int32Array::from(vec![40, 50])),
+                Arc::new(StringArray::from(vec!["s2", "s2", "s2"])),
+                Arc::new(StringArray::from(vec!["k1", "k2", "k3"])),
+                Arc::new(Int32Array::from(vec![4, 1, 3])),
+                Arc::new(Int32Array::from(vec![40, 50, 60])),
             ],
         )
         .unwrap();
@@ -401,9 +404,9 @@ mod tests {
         assert_eq!(
             result,
             vec![
-                ("k1".to_string(), 4, 40), // Latest value due to ts=4
-                ("k2".to_string(), 2, 20), // Original value since ts=1 < ts=2
-                ("k3".to_string(), 3, 30), // Unchanged
+                ("s2".to_string(), "k1".to_string(), 4, 40), // Latest value 
due to ts=4
+                ("s1".to_string(), "k2".to_string(), 2, 20), // Original value 
since ts=1 < ts=2
+                ("s2".to_string(), "k3".to_string(), 3, 60), // Latest value 
due to equal ts and seqno=s2
             ]
         );
     }
diff --git a/crates/core/src/metadata/meta_field.rs b/crates/core/src/metadata/meta_field.rs
new file mode 100644
index 0000000..240aeff
--- /dev/null
+++ b/crates/core/src/metadata/meta_field.rs
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use crate::error::CoreError;
+use crate::Result;
+use std::fmt::Display;
+use std::str::FromStr;
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum MetaField {
+    CommitTime = 0,
+    CommitSeqno = 1,
+    RecordKey = 2,
+    PartitionPath = 3,
+    FileName = 4,
+    Operation = 5,
+}
+
+impl AsRef<str> for MetaField {
+    fn as_ref(&self) -> &str {
+        match self {
+            MetaField::CommitTime => "_hoodie_commit_time",
+            MetaField::CommitSeqno => "_hoodie_commit_seqno",
+            MetaField::RecordKey => "_hoodie_record_key",
+            MetaField::PartitionPath => "_hoodie_partition_path",
+            MetaField::FileName => "_hoodie_file_name",
+            MetaField::Operation => "_hoodie_operation",
+        }
+    }
+}
+
+impl Display for MetaField {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.as_ref())
+    }
+}
+
+impl FromStr for MetaField {
+    type Err = CoreError;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s {
+            "_hoodie_commit_time" => Ok(MetaField::CommitTime),
+            "_hoodie_commit_seqno" => Ok(MetaField::CommitSeqno),
+            "_hoodie_record_key" => Ok(MetaField::RecordKey),
+            "_hoodie_partition_path" => Ok(MetaField::PartitionPath),
+            "_hoodie_file_name" => Ok(MetaField::FileName),
+            "_hoodie_operation" => Ok(MetaField::Operation),
+            _ => Err(CoreError::InvalidValue(s.to_string())),
+        }
+    }
+}
+
+impl MetaField {
+    #[inline]
+    pub fn field_index(&self) -> usize {
+        self.clone() as usize
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_field_index() {
+        assert_eq!(MetaField::CommitTime.field_index(), 0);
+        assert_eq!(MetaField::CommitSeqno.field_index(), 1);
+        assert_eq!(MetaField::RecordKey.field_index(), 2);
+        assert_eq!(MetaField::PartitionPath.field_index(), 3);
+        assert_eq!(MetaField::FileName.field_index(), 4);
+        assert_eq!(MetaField::Operation.field_index(), 5);
+    }
+
+    #[test]
+    fn test_as_ref() {
+        assert_eq!(MetaField::CommitTime.as_ref(), "_hoodie_commit_time");
+        assert_eq!(MetaField::CommitSeqno.as_ref(), "_hoodie_commit_seqno");
+        assert_eq!(MetaField::RecordKey.as_ref(), "_hoodie_record_key");
+        assert_eq!(MetaField::PartitionPath.as_ref(), 
"_hoodie_partition_path");
+        assert_eq!(MetaField::FileName.as_ref(), "_hoodie_file_name");
+        assert_eq!(MetaField::Operation.as_ref(), "_hoodie_operation");
+    }
+
+    #[test]
+    fn test_display() {
+        assert_eq!(MetaField::CommitTime.to_string(), "_hoodie_commit_time");
+        assert_eq!(
+            format!("{}", MetaField::CommitSeqno),
+            "_hoodie_commit_seqno"
+        );
+        assert_eq!(MetaField::RecordKey.to_string(), "_hoodie_record_key");
+    }
+
+    #[test]
+    fn test_from_str_valid() -> Result<(), CoreError> {
+        assert_eq!(
+            MetaField::from_str("_hoodie_commit_time")?,
+            MetaField::CommitTime
+        );
+        assert_eq!(
+            MetaField::from_str("_hoodie_commit_seqno")?,
+            MetaField::CommitSeqno
+        );
+        assert_eq!(
+            MetaField::from_str("_hoodie_record_key")?,
+            MetaField::RecordKey
+        );
+        assert_eq!(
+            MetaField::from_str("_hoodie_partition_path")?,
+            MetaField::PartitionPath
+        );
+        assert_eq!(
+            MetaField::from_str("_hoodie_file_name")?,
+            MetaField::FileName
+        );
+        assert_eq!(
+            MetaField::from_str("_hoodie_operation")?,
+            MetaField::Operation
+        );
+        Ok(())
+    }
+
+    #[test]
+    fn test_from_str_invalid() {
+        assert!(matches!(
+            MetaField::from_str(""),
+            Err(CoreError::InvalidValue(_))
+        ));
+        assert!(matches!(
+            MetaField::from_str("_hoodie_invalid"),
+            Err(CoreError::InvalidValue(_))
+        ));
+        assert!(matches!(
+            MetaField::from_str("invalid"),
+            Err(CoreError::InvalidValue(_))
+        ));
+    }
+
+    #[test]
+    fn test_roundtrip() -> Result<(), CoreError> {
+        // Test conversion from enum -> string -> enum
+        let fields = [
+            MetaField::CommitTime,
+            MetaField::CommitSeqno,
+            MetaField::RecordKey,
+            MetaField::PartitionPath,
+            MetaField::FileName,
+            MetaField::Operation,
+        ];
+
+        for field in fields {
+            let s = field.to_string();
+            let parsed = MetaField::from_str(&s)?;
+            assert_eq!(field, parsed);
+        }
+        Ok(())
+    }
+}
diff --git a/crates/core/src/metadata/mod.rs b/crates/core/src/metadata/mod.rs
new file mode 100644
index 0000000..48133fd
--- /dev/null
+++ b/crates/core/src/metadata/mod.rs
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+pub mod meta_field;
diff --git a/crates/core/src/util/arrow.rs b/crates/core/src/util/arrow.rs
new file mode 100644
index 0000000..fd8c1f4
--- /dev/null
+++ b/crates/core/src/util/arrow.rs
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::Result;
+use arrow::array::ArrayRef;
+use arrow::array::RecordBatch;
+use arrow::array::StringArray;
+use arrow_array::{Array, UInt32Array};
+use arrow_row::{RowConverter, SortField};
+use arrow_schema::ArrowError;
+
+pub trait ColumnAsArray {
+    fn get_array(&self, column_name: &str) -> Result<ArrayRef>;
+
+    fn get_string_array(&self, column_name: &str) -> Result<StringArray>;
+}
+
+impl ColumnAsArray for RecordBatch {
+    fn get_array(&self, column_name: &str) -> Result<ArrayRef> {
+        let index = self.schema().index_of(column_name)?;
+        let array = self.column(index);
+        Ok(array.clone())
+    }
+
+    fn get_string_array(&self, column_name: &str) -> Result<StringArray> {
+        let array = self.get_array(column_name)?;
+        let array = array
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .ok_or_else(|| {
+                ArrowError::CastError(format!(
+                    "Column {column_name} cannot be cast to StringArray."
+                ))
+            })?;
+        Ok(array.clone())
+    }
+}
+
+pub fn lexsort_to_indices(arrays: &[ArrayRef], desc: bool) -> UInt32Array {
+    let fields = arrays
+        .iter()
+        .map(|a| SortField::new(a.data_type().clone()))
+        .collect();
+    let converter = RowConverter::new(fields).unwrap();
+    let rows = converter.convert_columns(arrays).unwrap();
+    let mut sort: Vec<_> = rows.iter().enumerate().collect();
+    if desc {
+        sort.sort_unstable_by(|(_, a), (_, b)| b.cmp(a));
+    } else {
+        sort.sort_unstable_by(|(_, a), (_, b)| a.cmp(b));
+    }
+    UInt32Array::from_iter_values(sort.iter().map(|(i, _)| *i as u32))
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow::array::{Int32Array, StringArray};
+    use arrow_array::Float64Array;
+    use std::sync::Arc;
+
+    #[test]
+    fn test_basic_int_sort() {
+        let arr = Int32Array::from(vec![3, 1, 4, 1, 5]);
+        let arrays = vec![Arc::new(arr) as ArrayRef];
+
+        // Test ascending
+        let result = lexsort_to_indices(&arrays, false);
+        assert_eq!(
+            result.values(),
+            &[1, 3, 0, 2, 4] // Indices that would sort to [1,1,3,4,5]
+        );
+
+        // Test descending
+        let result = lexsort_to_indices(&arrays, true);
+        assert_eq!(
+            result.values(),
+            &[4, 2, 0, 1, 3] // Indices that would sort to [5,4,3,1,1]
+        );
+    }
+
+    #[test]
+    fn test_multiple_columns() {
+        let arr1 = Int32Array::from(vec![1, 1, 2, 2]);
+        let arr2 = StringArray::from(vec!["b", "a", "b", "a"]);
+        let arrays = vec![Arc::new(arr1) as ArrayRef, Arc::new(arr2) as 
ArrayRef];
+
+        let result = lexsort_to_indices(&arrays, false);
+        assert_eq!(
+            result.values(),
+            &[1, 0, 3, 2] // Should sort by first column then second
+        );
+    }
+
+    #[test]
+    fn test_edge_cases() {
+        // Empty array
+        assert_eq!(lexsort_to_indices(&[], false).len(), 0);
+
+        // Array of empty array
+        let arr = Int32Array::from(vec![] as Vec<i32>);
+        let arrays = vec![Arc::new(arr) as ArrayRef];
+        let result = lexsort_to_indices(&arrays, false);
+        assert_eq!(result.len(), 0);
+
+        // Single element
+        let arr = Int32Array::from(vec![1]);
+        let arrays = vec![Arc::new(arr) as ArrayRef];
+        let result = lexsort_to_indices(&arrays, false);
+        assert_eq!(result.values(), &[0]);
+
+        // All equal values
+        let arr = Int32Array::from(vec![5, 5, 5, 5]);
+        let arrays = vec![Arc::new(arr) as ArrayRef];
+        let result = lexsort_to_indices(&arrays, false);
+        assert_eq!(result.values(), &[0, 1, 2, 3]);
+    }
+
+    #[test]
+    fn test_different_types() {
+        let int_arr = Int32Array::from(vec![1, 2, 1]);
+        let str_arr = StringArray::from(vec!["a", "b", "c"]);
+        let float_arr = Float64Array::from(vec![1.0, 2.0, 3.0]);
+
+        let arrays = vec![
+            Arc::new(int_arr) as ArrayRef,
+            Arc::new(str_arr) as ArrayRef,
+            Arc::new(float_arr) as ArrayRef,
+        ];
+
+        let result = lexsort_to_indices(&arrays, false);
+        assert_eq!(result.values(), &[0, 2, 1]);
+    }
+}
diff --git a/crates/core/src/util/mod.rs b/crates/core/src/util/mod.rs
index 7db2764..84fa313 100644
--- a/crates/core/src/util/mod.rs
+++ b/crates/core/src/util/mod.rs
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+pub mod arrow;
 
 pub fn convert_vec_to_slice(vec: &[(String, String, String)]) -> Vec<(&str, 
&str, &str)> {
     vec.iter()

Reply via email to