This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 7bc9906513 Implement Spark `url` function `parse_url` (#16937)
7bc9906513 is described below

commit 7bc9906513c86ad028bab0370fd80451be288f1e
Author: Alan Tang <jmtan...@gmail.com>
AuthorDate: Fri Aug 8 05:01:07 2025 +0800

    Implement Spark `url` function `parse_url` (#16937)
    
    * implement Spark url function parse_url
    
    Signed-off-by: Alan Tang <jmtan...@gmail.com>
    
    * feat: Support only when all three arguments are StringViewArray
    
    Signed-off-by: Alan Tang <jmtan...@gmail.com>
    
    * feat: only support three types on parse_url function
    
    Signed-off-by: Alan Tang <jmtan...@gmail.com>
    
    * chore: fix clippy error
    
    Signed-off-by: Alan Tang <jmtan...@gmail.com>
    
    ---------
    
    Signed-off-by: Alan Tang <jmtan...@gmail.com>
---
 Cargo.lock                                         |   1 +
 datafusion/spark/Cargo.toml                        |   1 +
 datafusion/spark/src/function/url/mod.rs           |  13 +-
 datafusion/spark/src/function/url/parse_url.rs     | 301 +++++++++++++++++++++
 .../test_files/spark/url/parse_url.slt             |  80 ++++--
 5 files changed, 374 insertions(+), 22 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 4e1d29a623..c06d5e6cfc 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2616,6 +2616,7 @@ dependencies = [
  "log",
  "rand 0.9.2",
  "sha1",
+ "url",
  "xxhash-rust",
 ]
 
diff --git a/datafusion/spark/Cargo.toml b/datafusion/spark/Cargo.toml
index 73b406257b..4ed82a453d 100644
--- a/datafusion/spark/Cargo.toml
+++ b/datafusion/spark/Cargo.toml
@@ -47,6 +47,7 @@ datafusion-functions = { workspace = true, features = ["crypto_expressions"] }
 datafusion-macros = { workspace = true }
 log = { workspace = true }
 sha1 = "0.10"
+url = { workspace = true }
 xxhash-rust = { version = "0.8", features = ["xxh3"] }
 
 [dev-dependencies]
diff --git a/datafusion/spark/src/function/url/mod.rs b/datafusion/spark/src/function/url/mod.rs
index a87df9a2c8..7c959572a8 100644
--- a/datafusion/spark/src/function/url/mod.rs
+++ b/datafusion/spark/src/function/url/mod.rs
@@ -16,10 +16,19 @@
 // under the License.
 
 use datafusion_expr::ScalarUDF;
+use datafusion_functions::make_udf_function;
 use std::sync::Arc;
 
-pub mod expr_fn {}
+pub mod parse_url;
+
+// Defines `parse_url()`, returning the `ParseUrl` UDF as an `Arc<ScalarUDF>`;
+// this is the value `functions()` registers for this module.
+make_udf_function!(parse_url::ParseUrl, parse_url);
+
+/// Expression-building helpers for the Spark URL functions.
+pub mod expr_fn {
+    use datafusion_functions::export_functions;
+
+    // Presumably generates `expr_fn::parse_url(args)`, which builds a `parse_url`
+    // call expression from `args` (doc text: "Extracts a part from a URL.").
+    export_functions!((parse_url, "Extracts a part from a URL.", args));
+}
 
+/// Returns the Spark URL scalar functions defined in this module (currently only
+/// `parse_url`).
 pub fn functions() -> Vec<Arc<ScalarUDF>> {
-    vec![]
+    vec![parse_url()]
 }
diff --git a/datafusion/spark/src/function/url/parse_url.rs b/datafusion/spark/src/function/url/parse_url.rs
new file mode 100644
index 0000000000..6892e6a302
--- /dev/null
+++ b/datafusion/spark/src/function/url/parse_url.rs
@@ -0,0 +1,301 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::sync::Arc;
+
+use arrow::array::{
+    Array, ArrayRef, GenericStringBuilder, LargeStringArray, StringArray, StringArrayType,
+    StringViewArray,
+};
+use arrow::datatypes::DataType;
+use datafusion_common::cast::{
+    as_large_string_array, as_string_array, as_string_view_array,
+};
+use datafusion_common::{exec_datafusion_err, exec_err, plan_err, Result};
+use datafusion_expr::{
+    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature,
+    Volatility,
+};
+use datafusion_functions::utils::make_scalar_function;
+use url::Url;
+
+/// Spark-compatible `parse_url` scalar UDF: extracts one component of a URL string.
+#[derive(Debug)]
+pub struct ParseUrl {
+    /// Argument types accepted by the function.
+    signature: Signature,
+}
+
+impl Default for ParseUrl {
+    /// Same as [`ParseUrl::new`].
+    fn default() -> Self {
+        ParseUrl::new()
+    }
+}
+
+impl ParseUrl {
+    /// Creates the UDF. Every accepted form takes 1 to 3 arguments that all share
+    /// one string type (`Utf8View`, `Utf8` or `LargeUtf8`).
+    ///
+    /// A 1-argument call passes this signature check and is then rejected by
+    /// `return_type` with an explicit "expects 2 or 3 arguments" error.
+    pub fn new() -> Self {
+        let string_types = vec![DataType::Utf8View, DataType::Utf8, DataType::LargeUtf8];
+        Self {
+            signature: Signature::one_of(
+                (1..=3)
+                    .map(|arity| TypeSignature::Uniform(arity, string_types.clone()))
+                    .collect(),
+                Volatility::Immutable,
+            ),
+        }
+    }
+
+    /// Parses `value` as a URL and extracts the component named by `part`.
+    ///
+    /// # Arguments
+    ///
+    /// * `value` - The URL string to parse
+    /// * `part` - The component to extract (matched case-sensitively). Valid values:
+    ///   - `"HOST"` - The hostname (e.g., "example.com")
+    ///   - `"PATH"` - The path portion (e.g., "/path/to/resource")
+    ///   - `"QUERY"` - The whole query string, or the value of a single query
+    ///     parameter when `key` is given
+    ///   - `"REF"` - The fragment/anchor (the part after `#`)
+    ///   - `"PROTOCOL"` - The URL scheme (e.g., "https", "http")
+    ///   - `"FILE"` - The path followed by `?` and the query, if there is one
+    ///   - `"AUTHORITY"` - The authority component (`[userinfo@]host[:port]`)
+    ///   - `"USERINFO"` - The user information (`username[:password]`)
+    /// * `key` - Optional query parameter name, used only with `"QUERY"`. The first
+    ///   `key=value` pair of the raw query string is matched and its value is
+    ///   returned verbatim (no percent- or `+`-decoding), mirroring Spark's
+    ///   `(&|^)key=([^&]*)` lookup.
+    ///
+    /// # Returns
+    ///
+    /// * `Ok(Some(String))` - The extracted URL component
+    /// * `Ok(None)` - If `part` is unknown or the requested component is absent
+    ///   (an empty authority or empty user info counts as absent)
+    ///
+    /// # Errors
+    ///
+    /// Returns an execution error if `Url::parse` rejects `value`.
+    fn parse(value: &str, part: &str, key: Option<&str>) -> Result<Option<String>> {
+        let url = Url::parse(value).map_err(|e| exec_datafusion_err!("{e:?}"))?;
+        let extracted = match part {
+            "HOST" => url.host_str().map(String::from),
+            "PATH" => Some(url.path().to_string()),
+            "QUERY" => match key {
+                None => url.query().map(String::from),
+                Some(key) => url
+                    .query()
+                    .and_then(|query| Self::query_param(query, key))
+                    .map(String::from),
+            },
+            "REF" => url.fragment().map(String::from),
+            "PROTOCOL" => Some(url.scheme().to_string()),
+            "FILE" => Some(match url.query() {
+                Some(query) => format!("{}?{query}", url.path()),
+                None => url.path().to_string(),
+            }),
+            // URLs without an authority (e.g. `mailto:`) yield an empty string here;
+            // report them as absent rather than as "".
+            "AUTHORITY" => Some(url.authority())
+                .filter(|authority| !authority.is_empty())
+                .map(String::from),
+            "USERINFO" => {
+                let username = url.username();
+                match url.password() {
+                    // A password alone (`http://:pw@host`) is still user info.
+                    Some(password) => Some(format!("{username}:{password}")),
+                    None if username.is_empty() => None,
+                    None => Some(username.to_string()),
+                }
+            }
+            _ => None,
+        };
+        Ok(extracted)
+    }
+
+    /// Returns the raw value of the first `key=value` pair in `query`.
+    ///
+    /// Pairs are separated by `&` and a value runs up to the next `&`; nothing is
+    /// decoded, so the result is exactly the text that appears in the URL.
+    fn query_param<'q>(query: &'q str, key: &str) -> Option<&'q str> {
+        query
+            .split('&')
+            .find_map(|pair| pair.strip_prefix(key)?.strip_prefix('='))
+    }
+}
+
+impl ScalarUDFImpl for ParseUrl {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "parse_url"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    /// Validates the arity (2 or 3 arguments) and picks the output string type:
+    /// `LargeUtf8` if any argument is `LargeUtf8`, otherwise `Utf8View` if any
+    /// argument is `Utf8View`, otherwise `Utf8`.
+    ///
+    /// # Errors
+    ///
+    /// Returns a planning error for any other number of arguments.
+    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
+        if !(2..=3).contains(&arg_types.len()) {
+            return plan_err!(
+                "{} expects 2 or 3 arguments, but got {}",
+                self.name(),
+                arg_types.len()
+            );
+        }
+        let return_type = if arg_types.contains(&DataType::LargeUtf8) {
+            DataType::LargeUtf8
+        } else if arg_types.contains(&DataType::Utf8View) {
+            DataType::Utf8View
+        } else {
+            DataType::Utf8
+        };
+        Ok(return_type)
+    }
+
+    /// Evaluates the function by passing the argument values to `spark_parse_url`
+    /// through `make_scalar_function`.
+    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
+        let ScalarFunctionArgs { args, .. } = args;
+        make_scalar_function(spark_parse_url, vec![])(&args)
+    }
+}
+
+/// Core implementation of URL parsing function.
+///
+/// # Arguments
+///
+/// * `args` - A slice of ArrayRef containing the input arrays:
+///   - `args[0]` - URL array: The URLs to parse
+///   - `args[1]` - Part array: The URL components to extract (HOST, PATH, 
QUERY, etc.)
+///   - `args[2]` - Key array (optional): For QUERY part, the specific 
parameter names to extract
+///
+/// # Return Value
+///
+/// Returns `Result<ArrayRef>` containing:
+/// - A string array with extracted URL components
+/// - `None` values where extraction failed or component doesn't exist
+/// - The output array type (StringArray or LargeStringArray) is determined by 
input types
+///
+fn spark_parse_url(args: &[ArrayRef]) -> Result<ArrayRef> {
+    if args.len() < 2 || args.len() > 3 {
+        return exec_err!(
+            "{} expects 2 or 3 arguments, but got {}",
+            "`parse_url`",
+            args.len()
+        );
+    }
+    // Required arguments
+    let url = &args[0];
+    let part = &args[1];
+
+    let result = if args.len() == 3 {
+        let key = &args[2];
+
+        match (url.data_type(), part.data_type(), key.data_type()) {
+            (DataType::Utf8, DataType::Utf8, DataType::Utf8) => {
+                process_parse_url::<_, _, _, StringArray>(
+                    as_string_array(url)?,
+                    as_string_array(part)?,
+                    as_string_array(key)?,
+                )
+            }
+            (DataType::Utf8View, DataType::Utf8View, DataType::Utf8View) => {
+                process_parse_url::<_, _, _, StringArray>(
+                    as_string_view_array(url)?,
+                    as_string_view_array(part)?,
+                    as_string_view_array(key)?,
+                )
+            }
+            (DataType::LargeUtf8, DataType::LargeUtf8, DataType::LargeUtf8) => 
{
+                process_parse_url::<_, _, _, LargeStringArray>(
+                    as_large_string_array(url)?,
+                    as_large_string_array(part)?,
+                    as_large_string_array(key)?,
+                )
+            }
+            _ => exec_err!("{} expects STRING arguments, got {:?}", 
"`parse_url`", args),
+        }
+    } else {
+        // The 'key' argument is omitted, assume all values are null
+        // Create 'null' string array for 'key' argument
+        let mut builder: GenericStringBuilder<i32> = 
GenericStringBuilder::new();
+        for _ in 0..args[0].len() {
+            builder.append_null();
+        }
+        let key = builder.finish();
+
+        match (url.data_type(), part.data_type()) {
+            (DataType::Utf8, DataType::Utf8) => {
+                process_parse_url::<_, _, _, StringArray>(
+                    as_string_array(url)?,
+                    as_string_array(part)?,
+                    &key,
+                )
+            }
+            (DataType::Utf8View, DataType::Utf8View) => {
+                process_parse_url::<_, _, _, StringArray>(
+                    as_string_view_array(url)?,
+                    as_string_view_array(part)?,
+                    &key,
+                )
+            }
+            (DataType::LargeUtf8, DataType::LargeUtf8) => {
+                process_parse_url::<_, _, _, LargeStringArray>(
+                    as_large_string_array(url)?,
+                    as_large_string_array(part)?,
+                    &key,
+                )
+            }
+            _ => exec_err!("{} expects STRING arguments, got {:?}", 
"`parse_url`", args),
+        }
+    };
+    result
+}
+
+/// Applies `ParseUrl::parse` row by row over the zipped URL, part and key arrays
+/// and collects the results into an array of type `T`.
+///
+/// A null URL or part produces a null output row. A null key is passed on as
+/// `None`. The first parse error aborts the whole evaluation.
+fn process_parse_url<'a, A, B, C, T>(
+    url_array: &'a A,
+    part_array: &'a B,
+    key_array: &'a C,
+) -> Result<ArrayRef>
+where
+    &'a A: StringArrayType<'a>,
+    &'a B: StringArrayType<'a>,
+    &'a C: StringArrayType<'a>,
+    T: Array + FromIterator<Option<String>> + 'static,
+{
+    let rows = url_array
+        .iter()
+        .zip(part_array.iter())
+        .zip(key_array.iter());
+    let extracted: T = rows
+        .map(|((maybe_url, maybe_part), maybe_key)| match (maybe_url, maybe_part) {
+            (Some(url), Some(part)) => ParseUrl::parse(url, part, maybe_key),
+            _ => Ok(None),
+        })
+        .collect::<Result<T>>()?;
+    Ok(Arc::new(extracted) as ArrayRef)
+}
diff --git a/datafusion/sqllogictest/test_files/spark/url/parse_url.slt b/datafusion/sqllogictest/test_files/spark/url/parse_url.slt
index dc4e0b3ac7..cca07ceb6f 100644
--- a/datafusion/sqllogictest/test_files/spark/url/parse_url.slt
+++ b/datafusion/sqllogictest/test_files/spark/url/parse_url.slt
@@ -15,23 +15,63 @@
 # specific language governing permissions and limitations
 # under the License.
 
-# This file was originally created by a porting script from:
-#   
https://github.com/lakehq/sail/tree/43b6ed8221de5c4c4adbedbb267ae1351158b43c/crates/sail-spark-connect/tests/gold_data/function
-# This file is part of the implementation of the datafusion-spark function 
library.
-# For more information, please see:
-#   https://github.com/apache/datafusion/issues/15914
-
-## Original Query: SELECT parse_url('http://spark.apache.org/path?query=1', 
'HOST');
-## PySpark 3.5.5 Result: {'parse_url(http://spark.apache.org/path?query=1, 
HOST)': 'spark.apache.org', 
'typeof(parse_url(http://spark.apache.org/path?query=1, HOST))': 'string', 
'typeof(http://spark.apache.org/path?query=1)': 'string', 'typeof(HOST)': 
'string'}
-#query
-#SELECT parse_url('http://spark.apache.org/path?query=1'::string, 
'HOST'::string);
-
-## Original Query: SELECT parse_url('http://spark.apache.org/path?query=1', 
'QUERY');
-## PySpark 3.5.5 Result: {'parse_url(http://spark.apache.org/path?query=1, 
QUERY)': 'query=1', 'typeof(parse_url(http://spark.apache.org/path?query=1, 
QUERY))': 'string', 'typeof(http://spark.apache.org/path?query=1)': 'string', 
'typeof(QUERY)': 'string'}
-#query
-#SELECT parse_url('http://spark.apache.org/path?query=1'::string, 
'QUERY'::string);
-
-## Original Query: SELECT parse_url('http://spark.apache.org/path?query=1', 
'QUERY', 'query');
-## PySpark 3.5.5 Result: {'parse_url(http://spark.apache.org/path?query=1, 
QUERY, query)': '1', 'typeof(parse_url(http://spark.apache.org/path?query=1, 
QUERY, query))': 'string', 'typeof(http://spark.apache.org/path?query=1)': 
'string', 'typeof(QUERY)': 'string', 'typeof(query)': 'string'}
-#query
-#SELECT parse_url('http://spark.apache.org/path?query=1'::string, 
'QUERY'::string, 'query'::string);
+query T
+SELECT parse_url('http://spark.apache.org/path?query=1'::string, 'HOST'::string);
+----
+spark.apache.org
+
+query T
+SELECT parse_url('http://spark.apache.org/path?query=1'::string, 'QUERY'::string);
+----
+query=1
+
+query T
+SELECT parse_url('http://spark.apache.org/path?query=1'::string, 'QUERY'::string, 'query'::string);
+----
+1
+
+query T
+SELECT parse_url('http://userinfo@spark.apache.org/path?query=1#Ref'::string, 'HOST'::string);
+----
+spark.apache.org
+
+query T
+SELECT parse_url('http://userinfo@spark.apache.org/path?query=1#Ref'::string, 'PATH'::string);
+----
+/path
+
+query T
+SELECT parse_url('http://userinfo@spark.apache.org/path?query=1#Ref'::string, 'QUERY'::string);
+----
+query=1
+
+query T
+SELECT parse_url('http://userinfo@spark.apache.org/path?query=1#Ref'::string, 'REF'::string);
+----
+Ref
+
+query T
+SELECT parse_url('http://userinfo@spark.apache.org/path?query=1#Ref'::string, 'PROTOCOL'::string);
+----
+http
+
+query T
+SELECT parse_url('http://userinfo@spark.apache.org/path?query=1#Ref'::string, 'FILE'::string);
+----
+/path?query=1
+
+query T
+SELECT parse_url('http://userinfo@spark.apache.org/path?query=1#Ref'::string, 'AUTHORITY'::string);
+----
+userinfo@spark.apache.org
+
+query T
+SELECT parse_url('http://userinfo@spark.apache.org/path?query=1#Ref'::string, 'USERINFO'::string);
+----
+userinfo
+
+statement error parse_url expects 2 or 3 arguments, but got 1
+SELECT parse_url('http://userinfo@spark.apache.org/path?query=1#Ref'::string);
+
+statement error 'parse_url' does not support zero arguments
+SELECT parse_url();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@datafusion.apache.org
For additional commands, e-mail: commits-help@datafusion.apache.org

Reply via email to