This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 221f5d2fe9 Datum based like kernels (#4595) (#4732)
221f5d2fe9 is described below

commit 221f5d2fe910afe15d7f7d35a87a803914451d29
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Fri Aug 25 20:24:14 2023 +0100

    Datum based like kernels (#4595) (#4732)
    
    * Datum based like kernels (#4595)
    
    * Clippy
    
    * More Clippy
    
    * Review feedback
---
 arrow-flight/src/sql/metadata/db_schemas.rs |  14 +-
 arrow-flight/src/sql/metadata/tables.rs     |  22 +-
 arrow-string/src/lib.rs                     |   1 +
 arrow-string/src/like.rs                    | 961 +++++++++-------------------
 arrow-string/src/predicate.rs               | 229 +++++++
 arrow/benches/comparison_kernels.rs         |  33 +-
 6 files changed, 555 insertions(+), 705 deletions(-)

diff --git a/arrow-flight/src/sql/metadata/db_schemas.rs 
b/arrow-flight/src/sql/metadata/db_schemas.rs
index 20780a1160..642802b058 100644
--- a/arrow-flight/src/sql/metadata/db_schemas.rs
+++ b/arrow-flight/src/sql/metadata/db_schemas.rs
@@ -22,11 +22,11 @@
 use std::sync::Arc;
 
 use arrow_arith::boolean::and;
-use arrow_array::{builder::StringBuilder, ArrayRef, RecordBatch, Scalar, 
StringArray};
+use arrow_array::{builder::StringBuilder, ArrayRef, RecordBatch, StringArray};
 use arrow_ord::cmp::eq;
 use arrow_schema::{DataType, Field, Schema, SchemaRef};
 use arrow_select::{filter::filter_record_batch, take::take};
-use arrow_string::like::like_utf8_scalar;
+use arrow_string::like::like;
 use once_cell::sync::Lazy;
 
 use super::lexsort_to_indices;
@@ -122,15 +122,13 @@ impl GetDbSchemasBuilder {
 
         if let Some(db_schema_filter_pattern) = db_schema_filter_pattern {
             // use like kernel to get wildcard matching
-            filters.push(like_utf8_scalar(
-                &db_schema_name,
-                &db_schema_filter_pattern,
-            )?)
+            let scalar = StringArray::new_scalar(db_schema_filter_pattern);
+            filters.push(like(&db_schema_name, &scalar)?)
         }
 
         if let Some(catalog_filter_name) = catalog_filter {
-            let scalar = StringArray::from_iter_values([catalog_filter_name]);
-            filters.push(eq(&catalog_name, &Scalar::new(&scalar))?);
+            let scalar = StringArray::new_scalar(catalog_filter_name);
+            filters.push(eq(&catalog_name, &scalar)?);
         }
 
         // `AND` any filters together
diff --git a/arrow-flight/src/sql/metadata/tables.rs 
b/arrow-flight/src/sql/metadata/tables.rs
index de55f0624f..00502a76db 100644
--- a/arrow-flight/src/sql/metadata/tables.rs
+++ b/arrow-flight/src/sql/metadata/tables.rs
@@ -23,11 +23,11 @@ use std::sync::Arc;
 
 use arrow_arith::boolean::{and, or};
 use arrow_array::builder::{BinaryBuilder, StringBuilder};
-use arrow_array::{ArrayRef, RecordBatch, Scalar, StringArray};
+use arrow_array::{ArrayRef, RecordBatch, StringArray};
 use arrow_ord::cmp::eq;
 use arrow_schema::{DataType, Field, Schema, SchemaRef};
 use arrow_select::{filter::filter_record_batch, take::take};
-use arrow_string::like::like_utf8_scalar;
+use arrow_string::like::like;
 use once_cell::sync::Lazy;
 
 use super::lexsort_to_indices;
@@ -184,16 +184,13 @@ impl GetTablesBuilder {
         let mut filters = vec![];
 
         if let Some(catalog_filter_name) = catalog_filter {
-            let scalar = StringArray::from_iter_values([catalog_filter_name]);
-            filters.push(eq(&catalog_name, &Scalar::new(&scalar))?);
+            let scalar = StringArray::new_scalar(catalog_filter_name);
+            filters.push(eq(&catalog_name, &scalar)?);
         }
 
         let tt_filter = table_types_filter
             .into_iter()
-            .map(|tt| {
-                let scalar = StringArray::from_iter_values([tt]);
-                eq(&table_type, &Scalar::new(&scalar))
-            })
+            .map(|tt| eq(&table_type, &StringArray::new_scalar(tt)))
             .collect::<std::result::Result<Vec<_>, _>>()?
             .into_iter()
             // We know the arrays are of same length as they are produced 
from the same root array
@@ -204,15 +201,14 @@ impl GetTablesBuilder {
 
         if let Some(db_schema_filter_pattern) = db_schema_filter_pattern {
             // use like kernel to get wildcard matching
-            filters.push(like_utf8_scalar(
-                &db_schema_name,
-                &db_schema_filter_pattern,
-            )?)
+            let scalar = StringArray::new_scalar(db_schema_filter_pattern);
+            filters.push(like(&db_schema_name, &scalar)?)
         }
 
         if let Some(table_name_filter_pattern) = table_name_filter_pattern {
             // use like kernel to get wildcard matching
-            filters.push(like_utf8_scalar(&table_name, 
&table_name_filter_pattern)?)
+            let scalar = StringArray::new_scalar(table_name_filter_pattern);
+            filters.push(like(&table_name, &scalar)?)
         }
 
         let batch = if let Some(table_schema) = table_schema {
diff --git a/arrow-string/src/lib.rs b/arrow-string/src/lib.rs
index 4bd4d28265..4444b37a77 100644
--- a/arrow-string/src/lib.rs
+++ b/arrow-string/src/lib.rs
@@ -20,5 +20,6 @@
 pub mod concat_elements;
 pub mod length;
 pub mod like;
+mod predicate;
 pub mod regexp;
 pub mod substring;
diff --git a/arrow-string/src/like.rs b/arrow-string/src/like.rs
index 57cc22f2c5..412f1e6cc8 100644
--- a/arrow-string/src/like.rs
+++ b/arrow-string/src/like.rs
@@ -15,227 +15,37 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use arrow_array::builder::BooleanBufferBuilder;
-use arrow_array::cast::*;
+use crate::predicate::Predicate;
+use arrow_array::cast::AsArray;
 use arrow_array::*;
-use arrow_buffer::NullBuffer;
-use arrow_data::ArrayDataBuilder;
 use arrow_schema::*;
 use arrow_select::take::take;
-use regex::Regex;
-use std::collections::HashMap;
-
-/// Helper function to perform boolean lambda function on values from two 
array accessors, this
-/// version does not attempt to use SIMD.
-///
-/// Duplicated from `arrow_ord::comparison`
-fn compare_op<T: ArrayAccessor, S: ArrayAccessor, F>(
-    left: T,
-    right: S,
-    op: F,
-) -> Result<BooleanArray, ArrowError>
-where
-    F: Fn(T::Item, S::Item) -> bool,
-{
-    if left.len() != right.len() {
-        return Err(ArrowError::ComputeError(
-            "Cannot perform comparison operation on arrays of different length"
-                .to_string(),
-        ));
-    }
-
-    Ok(BooleanArray::from_binary(left, right, op))
-}
-
-/// Helper function to perform boolean lambda function on values from array 
accessor, this
-/// version does not attempt to use SIMD.
-///
-/// Duplicated from `arrow_ord::comparison`
-fn compare_op_scalar<T: ArrayAccessor, F>(
-    left: T,
-    op: F,
-) -> Result<BooleanArray, ArrowError>
-where
-    F: Fn(T::Item) -> bool,
-{
-    Ok(BooleanArray::from_unary(left, op))
-}
-
-macro_rules! dyn_function {
-    ($sql:tt, $fn_name:tt, $fn_utf8:tt, $fn_dict:tt) => {
-#[doc = concat!("Perform SQL `", $sql ,"` operation on [`StringArray`] /")]
-/// [`LargeStringArray`], or [`DictionaryArray`] with values
-/// [`StringArray`]/[`LargeStringArray`].
-///
-/// See the documentation on [`like_utf8`] for more details.
-pub fn $fn_name(left: &dyn Array, right: &dyn Array) -> Result<BooleanArray, 
ArrowError> {
-    match (left.data_type(), right.data_type()) {
-        (DataType::Utf8, DataType::Utf8)  => {
-            let left = left.as_string::<i32>();
-            let right = right.as_string::<i32>();
-            $fn_utf8(left, right)
-        }
-        (DataType::LargeUtf8, DataType::LargeUtf8) => {
-            let left = left.as_string::<i64>();
-            let right = right.as_string::<i64>();
-            $fn_utf8(left, right)
-        }
-        #[cfg(feature = "dyn_cmp_dict")]
-        (DataType::Dictionary(_, _), DataType::Dictionary(_, _)) => {
-            downcast_dictionary_array!(
-                left => {
-                    let right = as_dictionary_array(right);
-                    $fn_dict(left, right)
-                }
-                t => Err(ArrowError::ComputeError(format!(
-                    "Should be DictionaryArray but got: {}", t
-                )))
-            )
-        }
-        _ => {
-            Err(ArrowError::ComputeError(format!(
-                "{} only supports Utf8, LargeUtf8 or DictionaryArray (with 
feature `dyn_cmp_dict`) with Utf8 or LargeUtf8 values",
-                stringify!($fn_name)
-            )))
-        }
-    }
-}
-
-    }
-}
-dyn_function!("left LIKE right", like_dyn, like_utf8, like_dict);
-dyn_function!("left NOT LIKE right", nlike_dyn, nlike_utf8, nlike_dict);
-dyn_function!("left ILIKE right", ilike_dyn, ilike_utf8, ilike_dict);
-dyn_function!("left NOT ILIKE right", nilike_dyn, nilike_utf8, nilike_dict);
-dyn_function!(
-    "STARTSWITH(left, right)",
-    starts_with_dyn,
-    starts_with_utf8,
-    starts_with_dict
-);
-dyn_function!(
-    "ENDSWITH(left, right)",
-    ends_with_dyn,
-    ends_with_utf8,
-    ends_with_dict
-);
-dyn_function!(
-    "CONTAINS(left, right)",
-    contains_dyn,
-    contains_utf8,
-    contains_dict
-);
-
-macro_rules! scalar_dyn_function {
-    ($sql:tt, $fn_name:tt, $fn_scalar:tt) => {
-#[doc = concat!("Perform SQL `", $sql ,"` operation on [`StringArray`] /")]
-/// [`LargeStringArray`], or [`DictionaryArray`] with values
-/// [`StringArray`]/[`LargeStringArray`] and a scalar.
-///
-/// See the documentation on [`like_utf8`] for more details.
-pub fn $fn_name(
-    left: &dyn Array,
-    right: &str,
-) -> Result<BooleanArray, ArrowError> {
-    match left.data_type() {
-        DataType::Utf8 => {
-            let left = left.as_string::<i32>();
-            $fn_scalar(left, right)
-        }
-        DataType::LargeUtf8 => {
-            let left = left.as_string::<i64>();
-            $fn_scalar(left, right)
-        }
-        DataType::Dictionary(_, _) => {
-            downcast_dictionary_array!(
-                left => {
-                    let dict_comparison = $fn_name(left.values().as_ref(), 
right)?;
-                    // TODO: Use take_boolean (#2967)
-                    let array = take(&dict_comparison, left.keys(), None)?;
-                    Ok(BooleanArray::from(array.to_data()))
-                }
-                t => Err(ArrowError::ComputeError(format!(
-                    "Should be DictionaryArray but got: {}", t
-                )))
-            )
-        }
-        _ => {
-            Err(ArrowError::ComputeError(format!(
-                "{} only supports Utf8, LargeUtf8 or DictionaryArray with Utf8 
or LargeUtf8 values",
-                stringify!($fn_name)
-            )))
-        }
-    }
-}
-    }
+use std::sync::Arc;
+
+#[derive(Debug)]
+enum Op {
+    Like(bool),
+    ILike(bool),
+    Contains,
+    StartsWith,
+    EndsWith,
 }
-scalar_dyn_function!("left LIKE right", like_utf8_scalar_dyn, like_scalar);
-scalar_dyn_function!("left NOT LIKE right", nlike_utf8_scalar_dyn, 
nlike_scalar);
-scalar_dyn_function!("left ILIKE right", ilike_utf8_scalar_dyn, ilike_scalar);
-scalar_dyn_function!(
-    "left NOT ILIKE right",
-    nilike_utf8_scalar_dyn,
-    nilike_scalar
-);
-scalar_dyn_function!(
-    "STARTSWITH(left, right)",
-    starts_with_utf8_scalar_dyn,
-    starts_with_scalar
-);
-scalar_dyn_function!(
-    "ENDSWITH(left, right)",
-    ends_with_utf8_scalar_dyn,
-    ends_with_scalar
-);
-scalar_dyn_function!(
-    "CONTAINS(left, right)",
-    contains_utf8_scalar_dyn,
-    contains_scalar
-);
-
-macro_rules! dict_function {
-    ($sql:tt, $fn_name:tt, $fn_impl:tt) => {
-
-#[doc = concat!("Perform SQL `", $sql ,"` operation on [`DictionaryArray`] 
with values")]
-/// [`StringArray`]/[`LargeStringArray`].
-///
-/// See the documentation on [`like_utf8`] for more details.
-#[cfg(feature = "dyn_cmp_dict")]
-fn $fn_name<K: arrow_array::types::ArrowDictionaryKeyType>(
-    left: &DictionaryArray<K>,
-    right: &DictionaryArray<K>,
-) -> Result<BooleanArray, ArrowError> {
-    match (left.value_type(), right.value_type()) {
-        (DataType::Utf8, DataType::Utf8) => {
-            let left = 
left.downcast_dict::<GenericStringArray<i32>>().unwrap();
-            let right = 
right.downcast_dict::<GenericStringArray<i32>>().unwrap();
 
-            $fn_impl(left, right)
+impl std::fmt::Display for Op {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Op::Like(false) => write!(f, "LIKE"),
+            Op::Like(true) => write!(f, "NLIKE"),
+            Op::ILike(false) => write!(f, "ILIKE"),
+            Op::ILike(true) => write!(f, "NILIKE"),
+            Op::Contains => write!(f, "CONTAINS"),
+            Op::StartsWith => write!(f, "STARTS_WITH"),
+            Op::EndsWith => write!(f, "ENDS_WITH"),
         }
-        (DataType::LargeUtf8, DataType::LargeUtf8) => {
-            let left = 
left.downcast_dict::<GenericStringArray<i64>>().unwrap();
-            let right = 
right.downcast_dict::<GenericStringArray<i64>>().unwrap();
-
-            $fn_impl(left, right)
-        }
-        _ => Err(ArrowError::ComputeError(format!(
-            "{} only supports DictionaryArray with Utf8 or LargeUtf8 values",
-            stringify!($fn_name)
-        ))),
     }
 }
-    }
-}
-
-dict_function!("left LIKE right", like_dict, like);
-dict_function!("left NOT LIKE right", nlike_dict, nlike);
-dict_function!("left ILIKE right", ilike_dict, ilike);
-dict_function!("left NOT ILIKE right", nilike_dict, nilike);
-dict_function!("STARTSWITH(left, right)", starts_with_dict, starts_with);
-dict_function!("ENDSWITH(left, right)", ends_with_dict, ends_with);
-dict_function!("CONTAINS(left, right)", contains_dict, contains);
 
-/// Perform SQL `left LIKE right` operation on [`StringArray`] / 
[`LargeStringArray`].
+/// Perform SQL `left LIKE right`
 ///
 /// There are two wildcards supported with the LIKE operator:
 ///
@@ -244,490 +54,337 @@ dict_function!("CONTAINS(left, right)", contains_dict, 
contains);
 ///
 /// For example:
 /// ```
-/// use arrow_array::{StringArray, BooleanArray};
-/// use arrow_string::like::like_utf8;
-///
+/// # use arrow_array::{StringArray, BooleanArray};
+/// # use arrow_string::like::like;
+/// #
 /// let strings = StringArray::from(vec!["Arrow", "Arrow", "Arrow", "Ar"]);
 /// let patterns = StringArray::from(vec!["A%", "B%", "A.", "A_"]);
 ///
-/// let result = like_utf8(&strings, &patterns).unwrap();
+/// let result = like(&strings, &patterns).unwrap();
 /// assert_eq!(result, BooleanArray::from(vec![true, false, false, true]));
 /// ```
-pub fn like_utf8<OffsetSize: OffsetSizeTrait>(
-    left: &GenericStringArray<OffsetSize>,
-    right: &GenericStringArray<OffsetSize>,
-) -> Result<BooleanArray, ArrowError> {
-    like(left, right)
-}
-
-#[inline]
-fn like<'a, S: ArrayAccessor<Item = &'a str>>(
-    left: S,
-    right: S,
-) -> Result<BooleanArray, ArrowError> {
-    regex_like(left, right, false, |re_pattern| {
-        Regex::new(&format!("(?s)^{re_pattern}$")).map_err(|e| {
-            ArrowError::ComputeError(format!(
-                "Unable to build regex from LIKE pattern: {e}"
-            ))
-        })
-    })
-}
-
-#[inline]
-fn like_scalar_op<'a, F: Fn(bool) -> bool, L: ArrayAccessor<Item = &'a str>>(
-    left: L,
-    right: &str,
-    op: F,
-) -> Result<BooleanArray, ArrowError> {
-    if !right.contains(is_like_pattern) {
-        // fast path, can use equals
-        Ok(BooleanArray::from_unary(left, |item| op(item == right)))
-    } else if right.ends_with('%')
-        && !right.ends_with("\\%")
-        && !right[..right.len() - 1].contains(is_like_pattern)
-    {
-        // fast path, can use starts_with
-        let starts_with = &right[..right.len() - 1];
-
-        Ok(BooleanArray::from_unary(left, |item| {
-            op(item.starts_with(starts_with))
-        }))
-    } else if right.starts_with('%') && !right[1..].contains(is_like_pattern) {
-        // fast path, can use ends_with
-        let ends_with = &right[1..];
-
-        Ok(BooleanArray::from_unary(left, |item| {
-            op(item.ends_with(ends_with))
-        }))
-    } else if right.starts_with('%')
-        && right.ends_with('%')
-        && !right.ends_with("\\%")
-        && !right[1..right.len() - 1].contains(is_like_pattern)
-    {
-        let contains = &right[1..right.len() - 1];
-
-        Ok(BooleanArray::from_unary(left, |item| {
-            op(item.contains(contains))
-        }))
-    } else {
-        let re_pattern = replace_like_wildcards(right)?;
-        let re = Regex::new(&format!("(?s)^{re_pattern}$")).map_err(|e| {
-            ArrowError::ComputeError(format!(
-                "Unable to build regex from LIKE pattern: {e}"
-            ))
-        })?;
-
-        Ok(BooleanArray::from_unary(left, |item| op(re.is_match(item))))
-    }
-}
-
-#[inline]
-fn like_scalar<'a, L: ArrayAccessor<Item = &'a str>>(
-    left: L,
-    right: &str,
-) -> Result<BooleanArray, ArrowError> {
-    like_scalar_op(left, right, |x| x)
-}
-
-/// Perform SQL `left LIKE right` operation on [`StringArray`] /
-/// [`LargeStringArray`] and a scalar.
-///
-/// See the documentation on [`like_utf8`] for more details.
-pub fn like_utf8_scalar<OffsetSize: OffsetSizeTrait>(
-    left: &GenericStringArray<OffsetSize>,
-    right: &str,
-) -> Result<BooleanArray, ArrowError> {
-    like_scalar(left, right)
-}
-
-/// Transforms a like `pattern` to a regex compatible pattern. To achieve 
that, it does:
-///
-/// 1. Replace like wildcards for regex expressions as the pattern will be 
evaluated using regex match: `%` => `.*` and `_` => `.`
-/// 2. Escape regex meta characters to match them and not be evaluated as 
regex special chars. For example: `.` => `\\.`
-/// 3. Replace escaped like wildcards removing the escape characters to be 
able to match it as a regex. For example: `\\%` => `%`
-fn replace_like_wildcards(pattern: &str) -> Result<String, ArrowError> {
-    let mut result = String::new();
-    let pattern = String::from(pattern);
-    let mut chars_iter = pattern.chars().peekable();
-    while let Some(c) = chars_iter.next() {
-        if c == '\\' {
-            let next = chars_iter.peek();
-            match next {
-                Some(next) if is_like_pattern(*next) => {
-                    result.push(*next);
-                    // Skipping the next char as it is already appended
-                    chars_iter.next();
-                }
-                _ => {
-                    result.push('\\');
-                    result.push('\\');
-                }
-            }
-        } else if regex_syntax::is_meta_character(c) {
-            result.push('\\');
-            result.push(c);
-        } else if c == '%' {
-            result.push_str(".*");
-        } else if c == '_' {
-            result.push('.');
-        } else {
-            result.push(c);
-        }
-    }
-    Ok(result)
-}
-
-/// Perform SQL `left NOT LIKE right` operation on [`StringArray`] /
-/// [`LargeStringArray`].
-///
-/// See the documentation on [`like_utf8`] for more details.
-pub fn nlike_utf8<OffsetSize: OffsetSizeTrait>(
-    left: &GenericStringArray<OffsetSize>,
-    right: &GenericStringArray<OffsetSize>,
-) -> Result<BooleanArray, ArrowError> {
-    nlike(left, right)
-}
-
-#[inline]
-fn nlike<'a, S: ArrayAccessor<Item = &'a str>>(
-    left: S,
-    right: S,
-) -> Result<BooleanArray, ArrowError> {
-    regex_like(left, right, true, |re_pattern| {
-        Regex::new(&format!("(?s)^{re_pattern}$")).map_err(|e| {
-            ArrowError::ComputeError(format!(
-                "Unable to build regex from LIKE pattern: {e}"
-            ))
-        })
-    })
+pub fn like(left: &dyn Datum, right: &dyn Datum) -> Result<BooleanArray, 
ArrowError> {
+    like_op(Op::Like(false), left, right)
 }
 
-#[inline]
-fn nlike_scalar<'a, L: ArrayAccessor<Item = &'a str>>(
-    left: L,
-    right: &str,
-) -> Result<BooleanArray, ArrowError> {
-    like_scalar_op(left, right, |x| !x)
-}
-
-/// Perform SQL `left NOT LIKE right` operation on [`StringArray`] /
-/// [`LargeStringArray`] and a scalar.
+/// Perform SQL `left ILIKE right`
 ///
-/// See the documentation on [`like_utf8`] for more details.
-pub fn nlike_utf8_scalar<OffsetSize: OffsetSizeTrait>(
-    left: &GenericStringArray<OffsetSize>,
-    right: &str,
-) -> Result<BooleanArray, ArrowError> {
-    nlike_scalar(left, right)
-}
-
-/// Perform SQL `left ILIKE right` operation on [`StringArray`] /
-/// [`LargeStringArray`].
-///
-/// Case insensitive version of [`like_utf8`]
+/// This is a case-insensitive version of [`like`]
 ///
 /// Note: this only implements loose matching as defined by the Unicode 
standard. For example,
 /// the `ff` ligature is not equivalent to `FF` and `ß` is not equivalent to 
`SS`
-pub fn ilike_utf8<OffsetSize: OffsetSizeTrait>(
-    left: &GenericStringArray<OffsetSize>,
-    right: &GenericStringArray<OffsetSize>,
-) -> Result<BooleanArray, ArrowError> {
-    ilike(left, right)
+pub fn ilike(left: &dyn Datum, right: &dyn Datum) -> Result<BooleanArray, 
ArrowError> {
+    like_op(Op::ILike(false), left, right)
 }
 
-#[inline]
-fn ilike<'a, S: ArrayAccessor<Item = &'a str>>(
-    left: S,
-    right: S,
-) -> Result<BooleanArray, ArrowError> {
-    regex_like(left, right, false, |re_pattern| {
-        Regex::new(&format!("(?is)^{re_pattern}$")).map_err(|e| {
-            ArrowError::ComputeError(format!(
-                "Unable to build regex from ILIKE pattern: {e}"
-            ))
-        })
-    })
-}
-
-#[inline]
-fn ilike_scalar_op<O: OffsetSizeTrait, F: Fn(bool) -> bool>(
-    left: &GenericStringArray<O>,
-    right: &str,
-    op: F,
-) -> Result<BooleanArray, ArrowError> {
-    // If not ASCII faster to use case insensitive regex than using 
to_uppercase
-    if right.is_ascii() && left.is_ascii() {
-        if !right.contains(is_like_pattern) {
-            return Ok(BooleanArray::from_unary(left, |item| {
-                op(item.eq_ignore_ascii_case(right))
-            }));
-        } else if right.ends_with('%')
-            && !right.ends_with("\\%")
-            && !right[..right.len() - 1].contains(is_like_pattern)
-        {
-            // fast path, can use starts_with
-            let start_str = &right[..right.len() - 1];
-            return Ok(BooleanArray::from_unary(left, |item| {
-                let end = item.len().min(start_str.len());
-                let result = item.is_char_boundary(end)
-                    && start_str.eq_ignore_ascii_case(&item[..end]);
-                op(result)
-            }));
-        } else if right.starts_with('%') && 
!right[1..].contains(is_like_pattern) {
-            // fast path, can use ends_with
-            let ends_str = &right[1..];
-            return Ok(BooleanArray::from_unary(left, |item| {
-                let start = item.len().saturating_sub(ends_str.len());
-                let result = item.is_char_boundary(start)
-                    && ends_str.eq_ignore_ascii_case(&item[start..]);
-                op(result)
-            }));
-        }
-    }
-
-    let re_pattern = replace_like_wildcards(right)?;
-    let re = Regex::new(&format!("(?is)^{re_pattern}$")).map_err(|e| {
-        ArrowError::ComputeError(format!("Unable to build regex from ILIKE 
pattern: {e}"))
-    })?;
-
-    Ok(BooleanArray::from_unary(left, |item| op(re.is_match(item))))
-}
-
-#[inline]
-fn ilike_scalar<O: OffsetSizeTrait>(
-    left: &GenericStringArray<O>,
-    right: &str,
-) -> Result<BooleanArray, ArrowError> {
-    ilike_scalar_op(left, right, |x| x)
-}
-
-/// Perform SQL `left ILIKE right` operation on [`StringArray`] /
-/// [`LargeStringArray`] and a scalar.
+/// Perform SQL `left NOT LIKE right`
 ///
-/// See the documentation on [`ilike_utf8`] for more details.
-pub fn ilike_utf8_scalar<OffsetSize: OffsetSizeTrait>(
-    left: &GenericStringArray<OffsetSize>,
-    right: &str,
-) -> Result<BooleanArray, ArrowError> {
-    ilike_scalar(left, right)
+/// See the documentation on [`like`] for more details
+pub fn nlike(left: &dyn Datum, right: &dyn Datum) -> Result<BooleanArray, 
ArrowError> {
+    like_op(Op::Like(true), left, right)
 }
 
-/// Perform SQL `left NOT ILIKE right` operation on [`StringArray`] /
-/// [`LargeStringArray`].
+/// Perform SQL `left NOT ILIKE right`
 ///
-/// See the documentation on [`ilike_utf8`] for more details.
-pub fn nilike_utf8<OffsetSize: OffsetSizeTrait>(
-    left: &GenericStringArray<OffsetSize>,
-    right: &GenericStringArray<OffsetSize>,
-) -> Result<BooleanArray, ArrowError> {
-    nilike(left, right)
+/// See the documentation on [`ilike`] for more details
+pub fn nilike(left: &dyn Datum, right: &dyn Datum) -> Result<BooleanArray, 
ArrowError> {
+    like_op(Op::ILike(true), left, right)
 }
 
-#[inline]
-fn nilike<'a, S: ArrayAccessor<Item = &'a str>>(
-    left: S,
-    right: S,
+/// Perform SQL `STARTSWITH(left, right)`
+pub fn starts_with(
+    left: &dyn Datum,
+    right: &dyn Datum,
 ) -> Result<BooleanArray, ArrowError> {
-    regex_like(left, right, true, |re_pattern| {
-        Regex::new(&format!("(?is)^{re_pattern}$")).map_err(|e| {
-            ArrowError::ComputeError(format!(
-                "Unable to build regex from ILIKE pattern: {e}"
-            ))
-        })
-    })
+    like_op(Op::StartsWith, left, right)
 }
 
-#[inline]
-fn nilike_scalar<O: OffsetSizeTrait>(
-    left: &GenericStringArray<O>,
-    right: &str,
+/// Perform SQL `ENDSWITH(left, right)`
+pub fn ends_with(
+    left: &dyn Datum,
+    right: &dyn Datum,
 ) -> Result<BooleanArray, ArrowError> {
-    ilike_scalar_op(left, right, |x| !x)
+    like_op(Op::EndsWith, left, right)
 }
 
-/// Perform SQL `left NOT ILIKE right` operation on [`StringArray`] /
-/// [`LargeStringArray`] and a scalar.
-///
-/// See the documentation on [`ilike_utf8`] for more details.
-pub fn nilike_utf8_scalar<OffsetSize: OffsetSizeTrait>(
-    left: &GenericStringArray<OffsetSize>,
-    right: &str,
-) -> Result<BooleanArray, ArrowError> {
-    nilike_scalar(left, right)
+/// Perform SQL `CONTAINS(left, right)`
+pub fn contains(left: &dyn Datum, right: &dyn Datum) -> Result<BooleanArray, 
ArrowError> {
+    like_op(Op::Contains, left, right)
 }
 
-fn is_like_pattern(c: char) -> bool {
-    c == '%' || c == '_'
-}
-
-/// Evaluate regex `op(left)` matching `right` on [`StringArray`] / 
[`LargeStringArray`]
-///
-/// If `negate_regex` is true, the regex expression will be negated. (for 
example, with `not like`)
-fn regex_like<'a, S: ArrayAccessor<Item = &'a str>, F>(
-    left: S,
-    right: S,
-    negate_regex: bool,
-    op: F,
-) -> Result<BooleanArray, ArrowError>
-where
-    F: Fn(&str) -> Result<Regex, ArrowError>,
-{
-    let mut map = HashMap::new();
-    if left.len() != right.len() {
-        return Err(ArrowError::ComputeError(
-            "Cannot perform comparison operation on arrays of different length"
-                .to_string(),
-        ));
+fn like_op(op: Op, lhs: &dyn Datum, rhs: &dyn Datum) -> Result<BooleanArray, 
ArrowError> {
+    use arrow_schema::DataType::*;
+    let (l, l_s) = lhs.get();
+    let (r, r_s) = rhs.get();
+
+    if l.len() != r.len() && !l_s && !r_s {
+        return Err(ArrowError::InvalidArgumentError(format!(
+            "Cannot compare arrays of different lengths, got {} vs {}",
+            l.len(),
+            r.len()
+        )));
     }
 
-    let nulls = NullBuffer::union(
-        left.logical_nulls().as_ref(),
-        right.logical_nulls().as_ref(),
-    );
+    let l_v = l.as_any_dictionary_opt();
+    let l = l_v.map(|x| x.values().as_ref()).unwrap_or(l);
 
-    let mut result = BooleanBufferBuilder::new(left.len());
-    for i in 0..left.len() {
-        let haystack = left.value(i);
-        let pat = right.value(i);
-        let re = if let Some(ref regex) = map.get(pat) {
-            regex
-        } else {
-            let re_pattern = replace_like_wildcards(pat)?;
-            let re = op(&re_pattern)?;
-            map.insert(pat, re);
-            map.get(pat).unwrap()
-        };
+    let r_v = r.as_any_dictionary_opt();
+    let r = r_v.map(|x| x.values().as_ref()).unwrap_or(r);
 
-        result.append(if negate_regex {
-            !re.is_match(haystack)
-        } else {
-            re.is_match(haystack)
-        });
+    match (l.data_type(), r.data_type()) {
+        (Utf8, Utf8) => {
+            apply::<i32>(op, l.as_string(), l_s, l_v, r.as_string(), r_s, r_v)
+        }
+        (LargeUtf8, LargeUtf8) => {
+            apply::<i64>(op, l.as_string(), l_s, l_v, r.as_string(), r_s, r_v)
+        }
+        (l_t, r_t) => Err(ArrowError::InvalidArgumentError(format!(
+            "Invalid string operation: {l_t} {op} {r_t}"
+        ))),
     }
-
-    let data = unsafe {
-        ArrayDataBuilder::new(DataType::Boolean)
-            .len(left.len())
-            .nulls(nulls)
-            .buffers(vec![result.into()])
-            .build_unchecked()
-    };
-    Ok(BooleanArray::from(data))
 }
 
-/// Perform SQL `STARTSWITH(left, right)` operation on [`StringArray`] / 
[`LargeStringArray`].
-///
-/// See the documentation on [`like_utf8`] for more details.
-pub fn starts_with_utf8<OffsetSize: OffsetSizeTrait>(
-    left: &GenericStringArray<OffsetSize>,
-    right: &GenericStringArray<OffsetSize>,
+fn apply<O: OffsetSizeTrait>(
+    op: Op,
+    l: &GenericStringArray<O>,
+    l_s: bool,
+    l_v: Option<&dyn AnyDictionaryArray>,
+    r: &GenericStringArray<O>,
+    r_s: bool,
+    r_v: Option<&dyn AnyDictionaryArray>,
 ) -> Result<BooleanArray, ArrowError> {
-    starts_with(left, right)
+    let l_len = l_v.map(|l| l.len()).unwrap_or(l.len());
+    if r_s {
+        let scalar = match r_v {
+            Some(dict) => match dict.nulls().filter(|n| n.null_count() != 0) {
+                Some(_) => return Ok(BooleanArray::new_null(l_len)),
+                None => {
+                    let idx = dict.normalized_keys()[0];
+                    if r.is_null(idx) {
+                        return Ok(BooleanArray::new_null(l_len));
+                    }
+                    r.value(idx)
+                }
+            },
+            None => r.value(0),
+        };
+        op_scalar(op, l, l_v, scalar)
+    } else {
+        match (l_s, l_v, r_v) {
+            (true, None, None) => {
+                let v = l.is_valid(0).then(|| l.value(0));
+                op_binary(op, std::iter::repeat(v), r.iter())
+            }
+            (true, Some(l_v), None) => {
+                let idx = l_v.is_valid(0).then(|| l_v.normalized_keys()[0]);
+                let v = idx.and_then(|idx| l.is_valid(idx).then(|| 
l.value(idx)));
+                op_binary(op, std::iter::repeat(v), r.iter())
+            }
+            (true, None, Some(r_v)) => {
+                let v = l.is_valid(0).then(|| l.value(0));
+                op_binary(op, std::iter::repeat(v), vectored_iter(r, r_v))
+            }
+            (true, Some(l_v), Some(r_v)) => {
+                let idx = l_v.is_valid(0).then(|| l_v.normalized_keys()[0]);
+                let v = idx.and_then(|idx| l.is_valid(idx).then(|| 
l.value(idx)));
+                op_binary(op, std::iter::repeat(v), vectored_iter(r, r_v))
+            }
+            (false, None, None) => op_binary(op, l.iter(), r.iter()),
+            (false, Some(l_v), None) => op_binary(op, vectored_iter(l, l_v), 
r.iter()),
+            (false, None, Some(r_v)) => op_binary(op, l.iter(), 
vectored_iter(r, r_v)),
+            (false, Some(l_v), Some(r_v)) => {
+                op_binary(op, vectored_iter(l, l_v), vectored_iter(r, r_v))
+            }
+        }
+    }
 }
 
-#[inline]
-fn starts_with<'a, S: ArrayAccessor<Item = &'a str>>(
-    left: S,
-    right: S,
+#[inline(never)]
+fn op_scalar<O: OffsetSizeTrait>(
+    op: Op,
+    l: &GenericStringArray<O>,
+    l_v: Option<&dyn AnyDictionaryArray>,
+    r: &str,
 ) -> Result<BooleanArray, ArrowError> {
-    compare_op(left, right, |l, r| l.starts_with(r))
-}
+    let r = match op {
+        Op::Like(neg) => Predicate::like(r)?.evaluate_array(l, neg),
+        Op::ILike(neg) => Predicate::ilike(r, l.is_ascii())?.evaluate_array(l, neg),
+        Op::Contains => Predicate::Contains(r).evaluate_array(l, false),
+        Op::StartsWith => Predicate::StartsWith(r).evaluate_array(l, false),
+        Op::EndsWith => Predicate::EndsWith(r).evaluate_array(l, false),
+    };
 
-#[inline]
-fn starts_with_scalar<'a, L: ArrayAccessor<Item = &'a str>>(
-    left: L,
-    right: &str,
-) -> Result<BooleanArray, ArrowError> {
-    compare_op_scalar(left, |item| item.starts_with(right))
+    Ok(match l_v {
+        Some(v) => take(&r, v.keys(), None)?.as_boolean().clone(),
+        None => r,
+    })
 }
 
-/// Perform SQL `STARTSWITH(left, right)` operation on [`StringArray`] /
-/// [`LargeStringArray`] and a scalar.
-///
-/// See the documentation on [`like_utf8`] for more details.
-pub fn starts_with_utf8_scalar<OffsetSize: OffsetSizeTrait>(
-    left: &GenericStringArray<OffsetSize>,
-    right: &str,
-) -> Result<BooleanArray, ArrowError> {
-    starts_with_scalar(left, right)
+fn vectored_iter<'a, O: OffsetSizeTrait>(
+    a: &'a GenericStringArray<O>,
+    a_v: &'a dyn AnyDictionaryArray,
+) -> impl Iterator<Item = Option<&'a str>> + 'a {
+    let nulls = a_v.nulls();
+    let keys = a_v.normalized_keys();
+    keys.into_iter().enumerate().map(move |(idx, key)| {
+        if nulls.map(|n| n.is_null(idx)).unwrap_or_default() || a.is_null(key) {
+            return None;
+        }
+        Some(a.value(key))
+    })
 }
 
-/// Perform SQL `ENDSWITH(left, right)` operation on [`StringArray`] / [`LargeStringArray`].
-///
-/// See the documentation on [`like_utf8`] for more details.
-pub fn ends_with_utf8<OffsetSize: OffsetSizeTrait>(
-    left: &GenericStringArray<OffsetSize>,
-    right: &GenericStringArray<OffsetSize>,
+#[inline(never)]
+fn op_binary<'a>(
+    op: Op,
+    l: impl Iterator<Item = Option<&'a str>>,
+    r: impl Iterator<Item = Option<&'a str>>,
 ) -> Result<BooleanArray, ArrowError> {
-    ends_with(left, right)
+    match op {
+        Op::Like(neg) => binary_predicate(l, r, neg, Predicate::like),
+        Op::ILike(neg) => binary_predicate(l, r, neg, |s| Predicate::ilike(s, false)),
+        Op::Contains => Ok(l.zip(r).map(|(l, r)| Some(l?.contains(r?))).collect()),
+        Op::StartsWith => Ok(l.zip(r).map(|(l, r)| Some(l?.starts_with(r?))).collect()),
+        Op::EndsWith => Ok(l.zip(r).map(|(l, r)| Some(l?.ends_with(r?))).collect()),
+    }
 }
 
-#[inline]
-fn ends_with<'a, S: ArrayAccessor<Item = &'a str>>(
-    left: S,
-    right: S,
+fn binary_predicate<'a>(
+    l: impl Iterator<Item = Option<&'a str>>,
+    r: impl Iterator<Item = Option<&'a str>>,
+    neg: bool,
+    f: impl Fn(&'a str) -> Result<Predicate<'a>, ArrowError>,
 ) -> Result<BooleanArray, ArrowError> {
-    compare_op(left, right, |l, r| l.ends_with(r))
+    let mut previous = None;
+    l.zip(r)
+        .map(|(l, r)| match (l, r) {
+            (Some(l), Some(r)) => {
+                let p: &Predicate = match previous {
+                    Some((expr, ref predicate)) if expr == r => predicate,
+                    _ => &previous.insert((r, f(r)?)).1,
+                };
+                Ok(Some(p.evaluate(l) != neg))
+            }
+            _ => Ok(None),
+        })
+        .collect()
 }
 
-#[inline]
-fn ends_with_scalar<'a, L: ArrayAccessor<Item = &'a str>>(
-    left: L,
-    right: &str,
-) -> Result<BooleanArray, ArrowError> {
-    compare_op_scalar(left, |item| item.ends_with(right))
-}
+// Deprecated kernels
 
-/// Perform SQL `ENDSWITH(left, right)` operation on [`StringArray`] /
-/// [`LargeStringArray`] and a scalar.
-///
-/// See the documentation on [`like_utf8`] for more details.
-pub fn ends_with_utf8_scalar<OffsetSize: OffsetSizeTrait>(
-    left: &GenericStringArray<OffsetSize>,
-    right: &str,
-) -> Result<BooleanArray, ArrowError> {
-    ends_with_scalar(left, right)
+fn make_scalar(data_type: &DataType, scalar: &str) -> Result<ArrayRef, ArrowError> {
+    match data_type {
+        DataType::Utf8 => Ok(Arc::new(StringArray::from_iter_values([scalar]))),
+        DataType::LargeUtf8 => Ok(Arc::new(LargeStringArray::from_iter_values([scalar]))),
+        DataType::Dictionary(_, v) => make_scalar(v.as_ref(), scalar),
+        d => Err(ArrowError::InvalidArgumentError(format!(
+            "Unsupported string scalar data type {d:?}",
+        ))),
+    }
 }
 
-/// Perform SQL `CONTAINS(left, right)` operation on [`StringArray`] / [`LargeStringArray`].
-///
-/// See the documentation on [`like_utf8`] for more details.
-pub fn contains_utf8<OffsetSize: OffsetSizeTrait>(
-    left: &GenericStringArray<OffsetSize>,
-    right: &GenericStringArray<OffsetSize>,
-) -> Result<BooleanArray, ArrowError> {
-    contains(left, right)
-}
+macro_rules! legacy_kernels {
+    ($fn_datum:ident, $fn_array:ident, $fn_scalar:ident, $fn_array_dyn:ident, $fn_scalar_dyn:ident, $deprecation:expr) => {
+        #[doc(hidden)]
+        #[deprecated(note = $deprecation)]
+        pub fn $fn_array<O: OffsetSizeTrait>(
+            left: &GenericStringArray<O>,
+            right: &GenericStringArray<O>,
+        ) -> Result<BooleanArray, ArrowError> {
+            $fn_datum(left, right)
+        }
 
-#[inline]
-fn contains<'a, S: ArrayAccessor<Item = &'a str>>(
-    left: S,
-    right: S,
-) -> Result<BooleanArray, ArrowError> {
-    compare_op(left, right, |l, r| l.contains(r))
-}
+        #[doc(hidden)]
+        #[deprecated(note = $deprecation)]
+        pub fn $fn_scalar<O: OffsetSizeTrait>(
+            left: &GenericStringArray<O>,
+            right: &str,
+        ) -> Result<BooleanArray, ArrowError> {
+            let scalar = GenericStringArray::<O>::from_iter_values([right]);
+            $fn_datum(left, &Scalar::new(&scalar))
+        }
 
-#[inline]
-fn contains_scalar<'a, L: ArrayAccessor<Item = &'a str>>(
-    left: L,
-    right: &str,
-) -> Result<BooleanArray, ArrowError> {
-    compare_op_scalar(left, |item| item.contains(right))
-}
+        #[doc(hidden)]
+        #[deprecated(note = $deprecation)]
+        pub fn $fn_array_dyn(
+            left: &dyn Array,
+            right: &dyn Array,
+        ) -> Result<BooleanArray, ArrowError> {
+            $fn_datum(&left, &right)
+        }
 
-/// Perform SQL `CONTAINS(left, right)` operation on [`StringArray`] /
-/// [`LargeStringArray`] and a scalar.
-///
-/// See the documentation on [`like_utf8`] for more details.
-pub fn contains_utf8_scalar<OffsetSize: OffsetSizeTrait>(
-    left: &GenericStringArray<OffsetSize>,
-    right: &str,
-) -> Result<BooleanArray, ArrowError> {
-    contains_scalar(left, right)
+        #[doc(hidden)]
+        #[deprecated(note = $deprecation)]
+        pub fn $fn_scalar_dyn(
+            left: &dyn Array,
+            right: &str,
+        ) -> Result<BooleanArray, ArrowError> {
+            let scalar = make_scalar(left.data_type(), right)?;
+            $fn_datum(&left, &Scalar::new(&scalar))
+        }
+    };
 }
 
+legacy_kernels!(
+    like,
+    like_utf8,
+    like_utf8_scalar,
+    like_dyn,
+    like_utf8_scalar_dyn,
+    "Use arrow_string::like::like"
+);
+legacy_kernels!(
+    ilike,
+    ilike_utf8,
+    ilike_utf8_scalar,
+    ilike_dyn,
+    ilike_utf8_scalar_dyn,
+    "Use arrow_string::like::ilike"
+);
+legacy_kernels!(
+    nlike,
+    nlike_utf8,
+    nlike_utf8_scalar,
+    nlike_dyn,
+    nlike_utf8_scalar_dyn,
+    "Use arrow_string::like::nlike"
+);
+legacy_kernels!(
+    nilike,
+    nilike_utf8,
+    nilike_utf8_scalar,
+    nilike_dyn,
+    nilike_utf8_scalar_dyn,
+    "Use arrow_string::like::nilike"
+);
+legacy_kernels!(
+    contains,
+    contains_utf8,
+    contains_utf8_scalar,
+    contains_dyn,
+    contains_utf8_scalar_dyn,
+    "Use arrow_string::like::contains"
+);
+legacy_kernels!(
+    starts_with,
+    starts_with_utf8,
+    starts_with_utf8_scalar,
+    starts_with_dyn,
+    starts_with_utf8_scalar_dyn,
+    "Use arrow_string::like::starts_with"
+);
+
+legacy_kernels!(
+    ends_with,
+    ends_with_utf8,
+    ends_with_utf8_scalar,
+    ends_with_dyn,
+    ends_with_utf8_scalar_dyn,
+    "Use arrow_string::like::ends_with"
+);
+
 #[cfg(test)]
+#[allow(deprecated)]
 mod tests {
     use super::*;
     use arrow_array::types::Int8Type;
@@ -936,34 +593,6 @@ mod tests {
         vec![true]
     );
 
-    #[test]
-    fn test_replace_like_wildcards() {
-        let a_eq = "_%";
-        let expected = "..*";
-        assert_eq!(replace_like_wildcards(a_eq).unwrap(), expected);
-    }
-
-    #[test]
-    fn test_replace_like_wildcards_leave_like_meta_chars() {
-        let a_eq = "\\%\\_";
-        let expected = "%_";
-        assert_eq!(replace_like_wildcards(a_eq).unwrap(), expected);
-    }
-
-    #[test]
-    fn test_replace_like_wildcards_with_multiple_escape_chars() {
-        let a_eq = "\\\\%";
-        let expected = "\\\\%";
-        assert_eq!(replace_like_wildcards(a_eq).unwrap(), expected);
-    }
-
-    #[test]
-    fn test_replace_like_wildcards_escape_regex_meta_char() {
-        let a_eq = ".";
-        let expected = "\\.";
-        assert_eq!(replace_like_wildcards(a_eq).unwrap(), expected);
-    }
-
     test_utf8!(
         test_utf8_array_nlike,
         vec!["arrow", "arrow", "arrow", "arrow", "arrow", "arrows", "arrow"],
diff --git a/arrow-string/src/predicate.rs b/arrow-string/src/predicate.rs
new file mode 100644
index 0000000000..162e3c7502
--- /dev/null
+++ b/arrow-string/src/predicate.rs
@@ -0,0 +1,229 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow_array::{BooleanArray, GenericStringArray, OffsetSizeTrait};
+use arrow_schema::ArrowError;
+use regex::{Regex, RegexBuilder};
+
+/// A string based predicate
+pub enum Predicate<'a> {
+    Eq(&'a str),
+    Contains(&'a str),
+    StartsWith(&'a str),
+    EndsWith(&'a str),
+
+    /// Equality ignoring ASCII case
+    IEqAscii(&'a str),
+    /// Starts with ignoring ASCII case
+    IStartsWithAscii(&'a str),
+    /// Ends with ignoring ASCII case
+    IEndsWithAscii(&'a str),
+
+    Regex(Regex),
+}
+
+impl<'a> Predicate<'a> {
+    /// Create a predicate for the given like pattern
+    pub fn like(pattern: &'a str) -> Result<Self, ArrowError> {
+        if !pattern.contains(is_like_pattern) {
+            Ok(Self::Eq(pattern))
+        } else if pattern.ends_with('%')
+            && !pattern.ends_with("\\%")
+            && !pattern[..pattern.len() - 1].contains(is_like_pattern)
+        {
+            Ok(Self::StartsWith(&pattern[..pattern.len() - 1]))
+        } else if pattern.starts_with('%') && !pattern[1..].contains(is_like_pattern) {
+            Ok(Self::EndsWith(&pattern[1..]))
+        } else if pattern.starts_with('%')
+            && pattern.ends_with('%')
+            && !pattern.ends_with("\\%")
+            && !pattern[1..pattern.len() - 1].contains(is_like_pattern)
+        {
+            Ok(Self::Contains(&pattern[1..pattern.len() - 1]))
+        } else {
+            Ok(Self::Regex(regex_like(pattern, false)?))
+        }
+    }
+
+    /// Create a predicate for the given ilike pattern
+    pub fn ilike(pattern: &'a str, is_ascii: bool) -> Result<Self, ArrowError> {
+        if is_ascii && pattern.is_ascii() {
+            if !pattern.contains(is_like_pattern) {
+                return Ok(Self::IEqAscii(pattern));
+            } else if pattern.ends_with('%')
+                && !pattern.ends_with("\\%")
+                && !pattern[..pattern.len() - 1].contains(is_like_pattern)
+            {
+                return Ok(Self::IStartsWithAscii(&pattern[..pattern.len() - 1]));
+            } else if pattern.starts_with('%') && !pattern[1..].contains(is_like_pattern)
+            {
+                return Ok(Self::IEndsWithAscii(&pattern[1..]));
+            }
+        }
+        Ok(Self::Regex(regex_like(pattern, true)?))
+    }
+
+    /// Evaluate this predicate against the given haystack
+    pub fn evaluate(&self, haystack: &str) -> bool {
+        match self {
+            Predicate::Eq(v) => *v == haystack,
+            Predicate::IEqAscii(v) => haystack.eq_ignore_ascii_case(v),
+            Predicate::Contains(v) => haystack.contains(v),
+            Predicate::StartsWith(v) => haystack.starts_with(v),
+            Predicate::IStartsWithAscii(v) => starts_with_ignore_ascii_case(haystack, v),
+            Predicate::EndsWith(v) => haystack.ends_with(v),
+            Predicate::IEndsWithAscii(v) => ends_with_ignore_ascii_case(haystack, v),
+            Predicate::Regex(v) => v.is_match(haystack),
+        }
+    }
+
+    /// Evaluate this predicate against the elements of `array`
+    ///
+    /// If `negate` is true the result of the predicate will be negated
+    #[inline(never)]
+    pub fn evaluate_array<O: OffsetSizeTrait>(
+        &self,
+        array: &GenericStringArray<O>,
+        negate: bool,
+    ) -> BooleanArray {
+        match self {
+            Predicate::Eq(v) => BooleanArray::from_unary(array, |haystack| {
+                (haystack.len() == v.len() && haystack == *v) != negate
+            }),
+            Predicate::IEqAscii(v) => BooleanArray::from_unary(array, |haystack| {
+                haystack.eq_ignore_ascii_case(v) != negate
+            }),
+            Predicate::Contains(v) => {
+                BooleanArray::from_unary(array, |haystack| haystack.contains(v) != negate)
+            }
+            Predicate::StartsWith(v) => BooleanArray::from_unary(array, |haystack| {
+                haystack.starts_with(v) != negate
+            }),
+            Predicate::IStartsWithAscii(v) => {
+                BooleanArray::from_unary(array, |haystack| {
+                    starts_with_ignore_ascii_case(haystack, v) != negate
+                })
+            }
+            Predicate::EndsWith(v) => BooleanArray::from_unary(array, |haystack| {
+                haystack.ends_with(v) != negate
+            }),
+            Predicate::IEndsWithAscii(v) => BooleanArray::from_unary(array, |haystack| {
+                ends_with_ignore_ascii_case(haystack, v) != negate
+            }),
+            Predicate::Regex(v) => {
+                BooleanArray::from_unary(array, |haystack| v.is_match(haystack) != negate)
+            }
+        }
+    }
+}
+
+fn starts_with_ignore_ascii_case(haystack: &str, needle: &str) -> bool {
+    let end = haystack.len().min(needle.len());
+    haystack.is_char_boundary(end) && needle.eq_ignore_ascii_case(&haystack[..end])
+}
+
+fn ends_with_ignore_ascii_case(haystack: &str, needle: &str) -> bool {
+    let start = haystack.len().saturating_sub(needle.len());
+    haystack.is_char_boundary(start) && needle.eq_ignore_ascii_case(&haystack[start..])
+}
+
+/// Transforms a like `pattern` to a regex compatible pattern. To achieve that, it does:
+///
+/// 1. Replace like wildcards for regex expressions as the pattern will be evaluated using regex match: `%` => `.*` and `_` => `.`
+/// 2. Escape regex meta characters to match them and not be evaluated as regex special chars. For example: `.` => `\\.`
+/// 3. Replace escaped like wildcards removing the escape characters to be able to match it as a regex. For example: `\\%` => `%`
+fn regex_like(pattern: &str, case_insensitive: bool) -> Result<Regex, ArrowError> {
+    let mut result = String::with_capacity(pattern.len() * 2);
+    result.push('^');
+    let mut chars_iter = pattern.chars().peekable();
+    while let Some(c) = chars_iter.next() {
+        if c == '\\' {
+            let next = chars_iter.peek();
+            match next {
+                Some(next) if is_like_pattern(*next) => {
+                    result.push(*next);
+                    // Skipping the next char as it is already appended
+                    chars_iter.next();
+                }
+                _ => {
+                    result.push('\\');
+                    result.push('\\');
+                }
+            }
+        } else if regex_syntax::is_meta_character(c) {
+            result.push('\\');
+            result.push(c);
+        } else if c == '%' {
+            result.push_str(".*");
+        } else if c == '_' {
+            result.push('.');
+        } else {
+            result.push(c);
+        }
+    }
+    result.push('$');
+    RegexBuilder::new(&result)
+        .case_insensitive(case_insensitive)
+        .dot_matches_new_line(true)
+        .build()
+        .map_err(|e| {
+            ArrowError::InvalidArgumentError(format!(
+                "Unable to build regex from LIKE pattern: {e}"
+            ))
+        })
+}
+
+fn is_like_pattern(c: char) -> bool {
+    c == '%' || c == '_'
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_replace_like_wildcards() {
+        let a_eq = "_%";
+        let expected = "^..*$";
+        let r = regex_like(a_eq, false).unwrap();
+        assert_eq!(r.to_string(), expected);
+    }
+
+    #[test]
+    fn test_replace_like_wildcards_leave_like_meta_chars() {
+        let a_eq = "\\%\\_";
+        let expected = "^%_$";
+        let r = regex_like(a_eq, false).unwrap();
+        assert_eq!(r.to_string(), expected);
+    }
+
+    #[test]
+    fn test_replace_like_wildcards_with_multiple_escape_chars() {
+        let a_eq = "\\\\%";
+        let expected = "^\\\\%$";
+        let r = regex_like(a_eq, false).unwrap();
+        assert_eq!(r.to_string(), expected);
+    }
+
+    #[test]
+    fn test_replace_like_wildcards_escape_regex_meta_char() {
+        let a_eq = ".";
+        let expected = "^\\.$";
+        let r = regex_like(a_eq, false).unwrap();
+        assert_eq!(r.to_string(), expected);
+    }
+}
diff --git a/arrow/benches/comparison_kernels.rs b/arrow/benches/comparison_kernels.rs
index b9fb6c8e33..02de70c5d7 100644
--- a/arrow/benches/comparison_kernels.rs
+++ b/arrow/benches/comparison_kernels.rs
@@ -32,22 +32,19 @@ use arrow_string::regexp::regexp_is_match_utf8_scalar;
 const SIZE: usize = 65536;
 
 fn bench_like_utf8_scalar(arr_a: &StringArray, value_b: &str) {
-    like_utf8_scalar(criterion::black_box(arr_a), criterion::black_box(value_b)).unwrap();
+    like(arr_a, &StringArray::new_scalar(value_b)).unwrap();
 }
 
 fn bench_nlike_utf8_scalar(arr_a: &StringArray, value_b: &str) {
-    nlike_utf8_scalar(criterion::black_box(arr_a), criterion::black_box(value_b))
-        .unwrap();
+    nlike(arr_a, &StringArray::new_scalar(value_b)).unwrap();
 }
 
 fn bench_ilike_utf8_scalar(arr_a: &StringArray, value_b: &str) {
-    ilike_utf8_scalar(criterion::black_box(arr_a), criterion::black_box(value_b))
-        .unwrap();
+    ilike(arr_a, &StringArray::new_scalar(value_b)).unwrap();
 }
 
 fn bench_nilike_utf8_scalar(arr_a: &StringArray, value_b: &str) {
-    nilike_utf8_scalar(criterion::black_box(arr_a), criterion::black_box(value_b))
-        .unwrap();
+    nilike(arr_a, &StringArray::new_scalar(value_b)).unwrap();
 }
 
 fn bench_regexp_is_match_utf8_scalar(arr_a: &StringArray, value_b: &str) {
@@ -103,45 +100,45 @@ fn add_benchmark(c: &mut Criterion) {
 
     let arr_a = create_primitive_array_with_seed::<Int32Type>(SIZE, 0.0, 42);
     let arr_b = create_primitive_array_with_seed::<Int32Type>(SIZE, 0.0, 43);
-    let scalar = Int32Array::from(vec![1]);
+    let scalar = Int32Array::new_scalar(1);
 
     c.bench_function("eq Int32", |b| b.iter(|| eq(&arr_a, &arr_b)));
     c.bench_function("eq scalar Int32", |b| {
-        b.iter(|| eq(&arr_a, &Scalar::new(&scalar)).unwrap())
+        b.iter(|| eq(&arr_a, &scalar).unwrap())
     });
 
     c.bench_function("neq Int32", |b| b.iter(|| neq(&arr_a, &arr_b)));
     c.bench_function("neq scalar Int32", |b| {
-        b.iter(|| neq(&arr_a, &Scalar::new(&scalar)).unwrap())
+        b.iter(|| neq(&arr_a, &scalar).unwrap())
     });
 
     c.bench_function("lt Int32", |b| b.iter(|| lt(&arr_a, &arr_b)));
     c.bench_function("lt scalar Int32", |b| {
-        b.iter(|| lt(&arr_a, &Scalar::new(&scalar)).unwrap())
+        b.iter(|| lt(&arr_a, &scalar).unwrap())
     });
 
     c.bench_function("lt_eq Int32", |b| b.iter(|| lt_eq(&arr_a, &arr_b)));
     c.bench_function("lt_eq scalar Int32", |b| {
-        b.iter(|| lt_eq(&arr_a, &Scalar::new(&scalar)).unwrap())
+        b.iter(|| lt_eq(&arr_a, &scalar).unwrap())
     });
 
     c.bench_function("gt Int32", |b| b.iter(|| gt(&arr_a, &arr_b)));
     c.bench_function("gt scalar Int32", |b| {
-        b.iter(|| gt(&arr_a, &Scalar::new(&scalar)).unwrap())
+        b.iter(|| gt(&arr_a, &scalar).unwrap())
     });
 
     c.bench_function("gt_eq Int32", |b| b.iter(|| gt_eq(&arr_a, &arr_b)));
     c.bench_function("gt_eq scalar Int32", |b| {
-        b.iter(|| gt_eq(&arr_a, &Scalar::new(&scalar)).unwrap())
+        b.iter(|| gt_eq(&arr_a, &scalar).unwrap())
     });
 
     c.bench_function("eq MonthDayNano", |b| {
         b.iter(|| eq(&arr_month_day_nano_a, &arr_month_day_nano_b))
     });
-    let scalar = IntervalMonthDayNanoArray::from(vec![123]);
+    let scalar = IntervalMonthDayNanoArray::new_scalar(123);
 
     c.bench_function("eq scalar MonthDayNano", |b| {
-        b.iter(|| eq(&arr_month_day_nano_b, &Scalar::new(&scalar)).unwrap())
+        b.iter(|| eq(&arr_month_day_nano_b, &scalar).unwrap())
     });
 
     c.bench_function("like_utf8 scalar equals", |b| {
@@ -246,11 +243,11 @@ fn add_benchmark(c: &mut Criterion) {
     );
 
     c.bench_function("like_utf8_scalar_dyn dictionary[10] string[4])", |b| {
-        b.iter(|| like_utf8_scalar_dyn(&dict_arr_a, "test"))
+        b.iter(|| like(&dict_arr_a, &StringArray::new_scalar("test")))
     });
 
     c.bench_function("ilike_utf8_scalar_dyn dictionary[10] string[4])", |b| {
-        b.iter(|| ilike_utf8_scalar_dyn(&dict_arr_a, "test"))
+        b.iter(|| ilike(&dict_arr_a, &StringArray::new_scalar("test")))
     });
 
     let strings = create_string_array::<i32>(20, 0.);

Reply via email to