This is an automated email from the ASF dual-hosted git repository. alamb pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push: new 7bc9906513 Implement Spark `url` function `parse_url` (#16937) 7bc9906513 is described below commit 7bc9906513c86ad028bab0370fd80451be288f1e Author: Alan Tang <jmtan...@gmail.com> AuthorDate: Fri Aug 8 05:01:07 2025 +0800 Implement Spark `url` function `parse_url` (#16937) * implement Spark url function parse_url Signed-off-by: Alan Tang <jmtan...@gmail.com> * feat: Support only when all three arguments are StringViewArray Signed-off-by: Alan Tang <jmtan...@gmail.com> * feat: only support three types on parse_url function Signed-off-by: Alan Tang <jmtan...@gmail.com> * chore: fix clippy error Signed-off-by: Alan Tang <jmtan...@gmail.com> --------- Signed-off-by: Alan Tang <jmtan...@gmail.com> --- Cargo.lock | 1 + datafusion/spark/Cargo.toml | 1 + datafusion/spark/src/function/url/mod.rs | 13 +- datafusion/spark/src/function/url/parse_url.rs | 301 +++++++++++++++++++++ .../test_files/spark/url/parse_url.slt | 80 ++++-- 5 files changed, 374 insertions(+), 22 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4e1d29a623..c06d5e6cfc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2616,6 +2616,7 @@ dependencies = [ "log", "rand 0.9.2", "sha1", + "url", "xxhash-rust", ] diff --git a/datafusion/spark/Cargo.toml b/datafusion/spark/Cargo.toml index 73b406257b..4ed82a453d 100644 --- a/datafusion/spark/Cargo.toml +++ b/datafusion/spark/Cargo.toml @@ -47,6 +47,7 @@ datafusion-functions = { workspace = true, features = ["crypto_expressions"] } datafusion-macros = { workspace = true } log = { workspace = true } sha1 = "0.10" +url = { workspace = true } xxhash-rust = { version = "0.8", features = ["xxh3"] } [dev-dependencies] diff --git a/datafusion/spark/src/function/url/mod.rs b/datafusion/spark/src/function/url/mod.rs index a87df9a2c8..7c959572a8 100644 --- a/datafusion/spark/src/function/url/mod.rs +++ b/datafusion/spark/src/function/url/mod.rs @@ -16,10 +16,19 @@ // under the License. 
use datafusion_expr::ScalarUDF; +use datafusion_functions::make_udf_function; use std::sync::Arc; -pub mod expr_fn {} +pub mod parse_url; + +make_udf_function!(parse_url::ParseUrl, parse_url); + +pub mod expr_fn { + use datafusion_functions::export_functions; + + export_functions!((parse_url, "Extracts a part from a URL.", args)); +} pub fn functions() -> Vec<Arc<ScalarUDF>> { - vec![] + vec![parse_url()] } diff --git a/datafusion/spark/src/function/url/parse_url.rs b/datafusion/spark/src/function/url/parse_url.rs new file mode 100644 index 0000000000..6892e6a302 --- /dev/null +++ b/datafusion/spark/src/function/url/parse_url.rs @@ -0,0 +1,301 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::any::Any; +use std::sync::Arc; + +use arrow::array::{ + Array, ArrayRef, GenericStringBuilder, LargeStringArray, StringArray, StringArrayType, +}; +use arrow::datatypes::DataType; +use datafusion_common::cast::{ + as_large_string_array, as_string_array, as_string_view_array, +}; +use datafusion_common::{exec_datafusion_err, exec_err, plan_err, Result}; +use datafusion_expr::{ + ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, + Volatility, +}; +use datafusion_functions::utils::make_scalar_function; +use url::Url; + +#[derive(Debug)] +pub struct ParseUrl { + signature: Signature, +} + +impl Default for ParseUrl { + fn default() -> Self { + Self::new() + } +} + +impl ParseUrl { + pub fn new() -> Self { + Self { + signature: Signature::one_of( + vec![ + TypeSignature::Uniform( + 1, + vec![DataType::Utf8View, DataType::Utf8, DataType::LargeUtf8], + ), + TypeSignature::Uniform( + 2, + vec![DataType::Utf8View, DataType::Utf8, DataType::LargeUtf8], + ), + TypeSignature::Uniform( + 3, + vec![DataType::Utf8View, DataType::Utf8, DataType::LargeUtf8], + ), + ], + Volatility::Immutable, + ), + } + } + /// Parses a URL and extracts the specified component. + /// + /// This function takes a URL string and extracts different parts of it based on the + /// `part` parameter. For query parameters, an optional `key` can be specified to + /// extract a specific query parameter value. + /// + /// # Arguments + /// + /// * `value` - The URL string to parse + /// * `part` - The component of the URL to extract. 
Valid values are: + /// - `"HOST"` - The hostname (e.g., "example.com") + /// - `"PATH"` - The path portion (e.g., "/path/to/resource") + /// - `"QUERY"` - The query string or a specific query parameter + /// - `"REF"` - The fragment/anchor (the part after #) + /// - `"PROTOCOL"` - The URL scheme (e.g., "https", "http") + /// - `"FILE"` - The path with query string (e.g., "/path?query=value") + /// - `"AUTHORITY"` - The authority component (host:port) + /// - `"USERINFO"` - The user information (username:password) + /// * `key` - Optional parameter used only with `"QUERY"`. When provided, extracts + /// the value of the specific query parameter with this key name. + /// + /// # Returns + /// + /// * `Ok(Some(String))` - The extracted URL component as a string + /// * `Ok(None)` - If the requested component doesn't exist or is empty + /// * `Err(DataFusionError)` - If the URL is malformed and cannot be parsed + /// + fn parse(value: &str, part: &str, key: Option<&str>) -> Result<Option<String>> { + Url::parse(value) + .map_err(|e| exec_datafusion_err!("{e:?}")) + .map(|url| match part { + "HOST" => url.host_str().map(String::from), + "PATH" => Some(url.path().to_string()), + "QUERY" => match key { + None => url.query().map(String::from), + Some(key) => url + .query_pairs() + .find(|(k, _)| k == key) + .map(|(_, v)| v.into_owned()), + }, + "REF" => url.fragment().map(String::from), + "PROTOCOL" => Some(url.scheme().to_string()), + "FILE" => { + let path = url.path(); + match url.query() { + Some(query) => Some(format!("{path}?{query}")), + None => Some(path.to_string()), + } + } + "AUTHORITY" => Some(url.authority().to_string()), + "USERINFO" => { + let username = url.username(); + if username.is_empty() { + return None; + } + match url.password() { + Some(password) => Some(format!("{username}:{password}")), + None => Some(username.to_string()), + } + } + _ => None, + }) + } +} + +impl ScalarUDFImpl for ParseUrl { + fn as_any(&self) -> &dyn Any { + self + } + + fn 
name(&self) -> &str { + "parse_url" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> { + if arg_types.len() < 2 || arg_types.len() > 3 { + return plan_err!( + "{} expects 2 or 3 arguments, but got {}", + self.name(), + arg_types.len() + ); + } + match arg_types.len() { + 2 | 3 => { + if arg_types + .iter() + .any(|arg| matches!(arg, DataType::LargeUtf8)) + { + Ok(DataType::LargeUtf8) + } else if arg_types + .iter() + .any(|arg| matches!(arg, DataType::Utf8View)) + { + Ok(DataType::Utf8View) + } else { + Ok(DataType::Utf8) + } + } + _ => plan_err!( + "`{}` expects 2 or 3 arguments, got {}", + &self.name(), + arg_types.len() + ), + } + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> { + let ScalarFunctionArgs { args, .. } = args; + make_scalar_function(spark_parse_url, vec![])(&args) + } +} + +/// Core implementation of URL parsing function. +/// +/// # Arguments +/// +/// * `args` - A slice of ArrayRef containing the input arrays: +/// - `args[0]` - URL array: The URLs to parse +/// - `args[1]` - Part array: The URL components to extract (HOST, PATH, QUERY, etc.) 
+/// - `args[2]` - Key array (optional): For QUERY part, the specific parameter names to extract +/// +/// # Return Value +/// +/// Returns `Result<ArrayRef>` containing: +/// - A string array with extracted URL components +/// - `None` values where extraction failed or component doesn't exist +/// - The output array type (StringArray or LargeStringArray) is determined by input types +/// +fn spark_parse_url(args: &[ArrayRef]) -> Result<ArrayRef> { + if args.len() < 2 || args.len() > 3 { + return exec_err!( + "{} expects 2 or 3 arguments, but got {}", + "`parse_url`", + args.len() + ); + } + // Required arguments + let url = &args[0]; + let part = &args[1]; + + let result = if args.len() == 3 { + let key = &args[2]; + + match (url.data_type(), part.data_type(), key.data_type()) { + (DataType::Utf8, DataType::Utf8, DataType::Utf8) => { + process_parse_url::<_, _, _, StringArray>( + as_string_array(url)?, + as_string_array(part)?, + as_string_array(key)?, + ) + } + (DataType::Utf8View, DataType::Utf8View, DataType::Utf8View) => { + process_parse_url::<_, _, _, StringArray>( + as_string_view_array(url)?, + as_string_view_array(part)?, + as_string_view_array(key)?, + ) + } + (DataType::LargeUtf8, DataType::LargeUtf8, DataType::LargeUtf8) => { + process_parse_url::<_, _, _, LargeStringArray>( + as_large_string_array(url)?, + as_large_string_array(part)?, + as_large_string_array(key)?, + ) + } + _ => exec_err!("{} expects STRING arguments, got {:?}", "`parse_url`", args), + } + } else { + // The 'key' argument is omitted, assume all values are null + // Create 'null' string array for 'key' argument + let mut builder: GenericStringBuilder<i32> = GenericStringBuilder::new(); + for _ in 0..args[0].len() { + builder.append_null(); + } + let key = builder.finish(); + + match (url.data_type(), part.data_type()) { + (DataType::Utf8, DataType::Utf8) => { + process_parse_url::<_, _, _, StringArray>( + as_string_array(url)?, + as_string_array(part)?, + &key, + ) + } + 
(DataType::Utf8View, DataType::Utf8View) => { + process_parse_url::<_, _, _, StringArray>( + as_string_view_array(url)?, + as_string_view_array(part)?, + &key, + ) + } + (DataType::LargeUtf8, DataType::LargeUtf8) => { + process_parse_url::<_, _, _, LargeStringArray>( + as_large_string_array(url)?, + as_large_string_array(part)?, + &key, + ) + } + _ => exec_err!("{} expects STRING arguments, got {:?}", "`parse_url`", args), + } + }; + result +} + +fn process_parse_url<'a, A, B, C, T>( + url_array: &'a A, + part_array: &'a B, + key_array: &'a C, +) -> Result<ArrayRef> +where + &'a A: StringArrayType<'a>, + &'a B: StringArrayType<'a>, + &'a C: StringArrayType<'a>, + T: Array + FromIterator<Option<String>> + 'static, +{ + url_array + .iter() + .zip(part_array.iter()) + .zip(key_array.iter()) + .map(|((url, part), key)| { + if let (Some(url), Some(part), key) = (url, part, key) { + ParseUrl::parse(url, part, key) + } else { + Ok(None) + } + }) + .collect::<Result<T>>() + .map(|array| Arc::new(array) as ArrayRef) +} diff --git a/datafusion/sqllogictest/test_files/spark/url/parse_url.slt b/datafusion/sqllogictest/test_files/spark/url/parse_url.slt index dc4e0b3ac7..cca07ceb6f 100644 --- a/datafusion/sqllogictest/test_files/spark/url/parse_url.slt +++ b/datafusion/sqllogictest/test_files/spark/url/parse_url.slt @@ -15,23 +15,63 @@ # specific language governing permissions and limitations # under the License. -# This file was originally created by a porting script from: -# https://github.com/lakehq/sail/tree/43b6ed8221de5c4c4adbedbb267ae1351158b43c/crates/sail-spark-connect/tests/gold_data/function -# This file is part of the implementation of the datafusion-spark function library. 
-# For more information, please see: -# https://github.com/apache/datafusion/issues/15914 - -## Original Query: SELECT parse_url('http://spark.apache.org/path?query=1', 'HOST'); -## PySpark 3.5.5 Result: {'parse_url(http://spark.apache.org/path?query=1, HOST)': 'spark.apache.org', 'typeof(parse_url(http://spark.apache.org/path?query=1, HOST))': 'string', 'typeof(http://spark.apache.org/path?query=1)': 'string', 'typeof(HOST)': 'string'} -#query -#SELECT parse_url('http://spark.apache.org/path?query=1'::string, 'HOST'::string); - -## Original Query: SELECT parse_url('http://spark.apache.org/path?query=1', 'QUERY'); -## PySpark 3.5.5 Result: {'parse_url(http://spark.apache.org/path?query=1, QUERY)': 'query=1', 'typeof(parse_url(http://spark.apache.org/path?query=1, QUERY))': 'string', 'typeof(http://spark.apache.org/path?query=1)': 'string', 'typeof(QUERY)': 'string'} -#query -#SELECT parse_url('http://spark.apache.org/path?query=1'::string, 'QUERY'::string); - -## Original Query: SELECT parse_url('http://spark.apache.org/path?query=1', 'QUERY', 'query'); -## PySpark 3.5.5 Result: {'parse_url(http://spark.apache.org/path?query=1, QUERY, query)': '1', 'typeof(parse_url(http://spark.apache.org/path?query=1, QUERY, query))': 'string', 'typeof(http://spark.apache.org/path?query=1)': 'string', 'typeof(QUERY)': 'string', 'typeof(query)': 'string'} -#query -#SELECT parse_url('http://spark.apache.org/path?query=1'::string, 'QUERY'::string, 'query'::string); +query T +SELECT parse_url('http://spark.apache.org/path?query=1'::string, 'HOST'::string); +---- +spark.apache.org + +query T +SELECT parse_url('http://spark.apache.org/path?query=1'::string, 'QUERY'::string); +---- +query=1 + +query T +SELECT parse_url('http://spark.apache.org/path?query=1'::string, 'QUERY'::string, 'query'::string); +---- +1 + +query T +SELECT parse_url('http://userinfo@spark.apache.org/path?query=1#Ref'::string, 'HOST'::string); +---- +spark.apache.org + +query T +SELECT 
parse_url('http://userinfo@spark.apache.org/path?query=1#Ref'::string, 'PATH'::string); +---- +/path + +query T +SELECT parse_url('http://userinfo@spark.apache.org/path?query=1#Ref'::string, 'QUERY'::string); +---- +query=1 + +query T +SELECT parse_url('http://userinfo@spark.apache.org/path?query=1#Ref'::string, 'REF'::string); +---- +Ref + +query T +SELECT parse_url('http://userinfo@spark.apache.org/path?query=1#Ref'::string, 'PROTOCOL'::string); +---- +http + +query T +SELECT parse_url('http://userinfo@spark.apache.org/path?query=1#Ref'::string, 'FILE'::string); +---- +/path?query=1 + +query T +SELECT parse_url('http://userinfo@spark.apache.org/path?query=1#Ref'::string, 'AUTHORITY'::string); +---- +userinfo@spark.apache.org + +query T +SELECT parse_url('http://userinfo@spark.apache.org/path?query=1#Ref'::string, 'USERINFO'::string); +---- +userinfo + +statement error parse_url expects 2 or 3 arguments, but got 1 +SELECT parse_url('http://userinfo@spark.apache.org/path?query=1#Ref'::string); + +statement error 'parse_url' does not support zero arguments +SELECT parse_url(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org