This is an automated email from the ASF dual-hosted git repository.
parthchandra pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new b68c615274 feat: make parse_url compatible (#4413)
b68c615274 is described below
commit b68c61527440d894a3fc73ce7905fd2e54abce23
Author: Parth Chandra <[email protected]>
AuthorDate: Thu May 28 13:07:52 2026 -0700
feat: make parse_url compatible (#4413)
* feat: make parse_url compatible
Co-Authored-By: Claude Opus 4.6 <[email protected]>
---
.../contributor-guide/spark_expressions_support.md | 2 +-
native/core/src/execution/jni_api.rs | 7 +-
native/spark-expr/src/lib.rs | 1 +
native/spark-expr/src/url_funcs/mod.rs | 20 +
native/spark-expr/src/url_funcs/parse_url.rs | 684 +++++++++++++++++++++
.../main/scala/org/apache/comet/serde/url.scala | 10 -
.../sql-tests/expressions/url/parse_url.sql | 143 ++++-
.../sql-tests/expressions/url/parse_url_ansi.sql | 14 +-
8 files changed, 857 insertions(+), 24 deletions(-)
diff --git a/docs/source/contributor-guide/spark_expressions_support.md
b/docs/source/contributor-guide/spark_expressions_support.md
index 0f81d268e7..7093120d51 100644
--- a/docs/source/contributor-guide/spark_expressions_support.md
+++ b/docs/source/contributor-guide/spark_expressions_support.md
@@ -605,7 +605,7 @@
### url_funcs
-- [x] parse_url (Incompatible: native diverges from Spark on edge cases)
+- [x] parse_url
- [x] try_url_decode
- 4.0.1, 2026-05-05
- [x] url_decode
diff --git a/native/core/src/execution/jni_api.rs
b/native/core/src/execution/jni_api.rs
index 0dcd78ba0f..ac223a462b 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -42,6 +42,7 @@ use datafusion::{
prelude::{SessionConfig, SessionContext},
};
use datafusion_comet_proto::spark_operator::Operator;
+use datafusion_comet_spark_expr::url_funcs::{CometParseUrl, CometTryParseUrl};
use datafusion_spark::function::array::array_contains::SparkArrayContains;
use datafusion_spark::function::bitwise::bit_count::SparkBitCount;
use datafusion_spark::function::bitwise::bit_get::SparkBitGet;
@@ -69,8 +70,6 @@ use datafusion_spark::function::string::char::CharFunc;
use datafusion_spark::function::string::concat::SparkConcat;
use datafusion_spark::function::string::luhn_check::SparkLuhnCheck;
use datafusion_spark::function::string::space::SparkSpace;
-use datafusion_spark::function::url::parse_url::ParseUrl as SparkParseUrl;
-use datafusion_spark::function::url::try_parse_url::TryParseUrl as
SparkTryParseUrl;
use datafusion_spark::function::url::try_url_decode::TryUrlDecode as
SparkTryUrlDecode;
use datafusion_spark::function::url::url_decode::UrlDecode as SparkUrlDecode;
use datafusion_spark::function::url::url_encode::UrlEncode as SparkUrlEncode;
@@ -603,8 +602,8 @@ fn register_datafusion_spark_function(session_ctx:
&SessionContext) {
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkUrlEncode::default()));
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkTryUrlDecode::default()));
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkCsc::default()));
-
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkParseUrl::default()));
-
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkTryParseUrl::default()));
+
session_ctx.register_udf(ScalarUDF::new_from_impl(CometParseUrl::default()));
+
session_ctx.register_udf(ScalarUDF::new_from_impl(CometTryParseUrl::default()));
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkFactorial::default()));
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkSec::default()));
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkRint::default()));
diff --git a/native/spark-expr/src/lib.rs b/native/spark-expr/src/lib.rs
index f60f01f48e..d27b15b75d 100644
--- a/native/spark-expr/src/lib.rs
+++ b/native/spark-expr/src/lib.rs
@@ -63,6 +63,7 @@ mod map_funcs;
pub use map_funcs::spark_map_sort;
mod math_funcs;
mod nondetermenistic_funcs;
+pub mod url_funcs;
pub use array_funcs::*;
pub use conditional_funcs::*;
diff --git a/native/spark-expr/src/url_funcs/mod.rs
b/native/spark-expr/src/url_funcs/mod.rs
new file mode 100644
index 0000000000..20a95d97e8
--- /dev/null
+++ b/native/spark-expr/src/url_funcs/mod.rs
@@ -0,0 +1,20 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod parse_url;
+
+pub use parse_url::{CometParseUrl, CometTryParseUrl};
diff --git a/native/spark-expr/src/url_funcs/parse_url.rs
b/native/spark-expr/src/url_funcs/parse_url.rs
new file mode 100644
index 0000000000..3f78d9ce71
--- /dev/null
+++ b/native/spark-expr/src/url_funcs/parse_url.rs
@@ -0,0 +1,684 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Spark-compatible parse_url / try_parse_url UDFs.
+//!
+//! The upstream datafusion-spark crate uses the `url` crate (WHATWG URL
Standard),
+//! which diverges from Spark's java.net.URI (RFC 3986) on several edge cases.
+//! This module uses RFC 3986 Appendix B regex parsing to match Spark exactly.
+
+use std::any::Any;
+use std::sync::{Arc, LazyLock};
+
+use arrow::array::{
+ Array, ArrayRef, LargeStringArray, StringArray, StringArrayType,
StringViewArray,
+};
+use arrow::datatypes::DataType;
+use datafusion::common::cast::{as_large_string_array, as_string_array,
as_string_view_array};
+use datafusion::common::{exec_datafusion_err, exec_err, Result, ScalarValue};
+use datafusion::logical_expr::{
+ ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature,
TypeSignature, Volatility,
+};
+use regex::Regex;
+
+// RFC 3986 Appendix B decomposition regex.
+// Groups: 2=scheme, 4=authority, 5=path, 7=query, 9=fragment
+static URI_REGEX: LazyLock<Regex> = LazyLock::new(|| {
+
Regex::new(r"^(([^:/?#]+):)?(//([^/?#]*))?([^?#]*)(\?([^#]*))?(#(.*))?$").unwrap()
+});
+
+fn extract_host(authority: &str) -> Option<String> {
+ let host_port = match authority.rfind('@') {
+ Some(pos) => &authority[pos + 1..],
+ None => authority,
+ };
+ if host_port.is_empty() {
+ return None;
+ }
+ if host_port.starts_with('[') {
+ // IPv6: [::1] or [::1]:8080
+ let bracket_end = host_port.find(']')?;
+ Some(host_port[..=bracket_end].to_string())
+ } else {
+ // host or host:port - strip port
+ match host_port.rfind(':') {
+ Some(colon_pos) => {
+ let after_colon = &host_port[colon_pos + 1..];
+ if after_colon.is_empty() || after_colon.bytes().all(|b|
b.is_ascii_digit()) {
+ Some(host_port[..colon_pos].to_string())
+ } else {
+ // java.net.URI rejects non-digit ports
+ None
+ }
+ }
+ None => Some(host_port.to_string()),
+ }
+ }
+}
+
+fn extract_userinfo(authority: &str) -> Option<String> {
+ authority.rfind('@').map(|pos| authority[..pos].to_string())
+}
+
+fn extract_query_value(query: &str, key: &str) -> Option<String> {
+ // Spark uses Pattern.compile("(&|^)" + key + "=([^&]*)") with no escaping,
+ // so the key is treated as a regex pattern.
+ let pattern = format!("(&|^){}=([^&]*)", key);
+ match Regex::new(&pattern) {
+ Ok(re) => re
+ .captures(query)
+ .and_then(|caps| caps.get(2).map(|m| m.as_str().to_string())),
+ Err(_) => None,
+ }
+}
+
+fn has_invalid_uri_chars(s: &str) -> bool {
+ s.bytes()
+ .any(|b| b == b' ' || b == b'{' || b == b'}' || b == b'<' || b == b'>'
|| b < 0x20)
+}
+
+fn invalid_url_err(value: &str) -> datafusion::common::DataFusionError {
+ exec_datafusion_err!(
+ "[INVALID_URL] The provided URL '{}' is not valid. Use `try_parse_url`
to tolerate \
+ malformed URLs and return NULL instead. SQLSTATE: 22P02",
+ value
+ )
+}
+
+fn parse_url_component(value: &str, part: &str, key: Option<&str>) ->
Result<Option<String>> {
+ if key.is_some() && part != "QUERY" {
+ return Ok(None);
+ }
+
+ if value.is_empty() {
+ return match part {
+ "PATH" | "FILE" => Ok(Some(String::new())),
+ _ => Ok(None),
+ };
+ }
+
+ if has_invalid_uri_chars(value) {
+ return Err(invalid_url_err(value));
+ }
+
+ let caps = match URI_REGEX.captures(value) {
+ Some(c) => c,
+ None => return Ok(None),
+ };
+
+ let scheme = caps.get(2).map(|m| m.as_str());
+ let authority = caps.get(4).map(|m| m.as_str());
+ let path = caps.get(5).map_or("", |m| m.as_str());
+ let query = caps.get(7).map(|m| m.as_str());
+ let fragment = caps.get(9).map(|m| m.as_str());
+
+ if scheme.is_none() {
+ if value.contains("://") {
+ return Err(invalid_url_err(value));
+ }
+ return Ok(None);
+ }
+
+ // java.net.URI rejects unbalanced brackets in authority
+ if let Some(auth) = authority {
+ let has_open = auth.contains('[');
+ let has_close = auth.contains(']');
+ if has_open != has_close {
+ return Err(invalid_url_err(value));
+ }
+ }
+
+ match part {
+ "HOST" => Ok(authority.and_then(extract_host)),
+ "PATH" => Ok(Some(path.to_string())),
+ "QUERY" => match key {
+ None => Ok(query.map(String::from)),
+ Some(k) => Ok(query.and_then(|q| extract_query_value(q, k))),
+ },
+ "REF" => Ok(fragment.map(String::from)),
+ "PROTOCOL" => Ok(scheme.map(String::from)),
+ "FILE" => match query {
+ Some(q) => Ok(Some(format!("{path}?{q}"))),
+ None => Ok(Some(path.to_string())),
+ },
+ "AUTHORITY" => Ok(authority.filter(|a|
!a.is_empty()).map(String::from)),
+ "USERINFO" => Ok(authority.and_then(extract_userinfo)),
+ _ => Ok(None),
+ }
+}
+
+// ---------------------------------------------------------------------------
+// CometParseUrl UDF (failOnError = true)
+// ---------------------------------------------------------------------------
+
+#[derive(Debug, PartialEq, Eq, Hash)]
+pub struct CometParseUrl {
+ signature: Signature,
+}
+
+impl Default for CometParseUrl {
+ fn default() -> Self {
+ Self {
+ signature: Signature::one_of(
+ vec![TypeSignature::String(2), TypeSignature::String(3)],
+ Volatility::Immutable,
+ ),
+ }
+ }
+}
+
+impl ScalarUDFImpl for CometParseUrl {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+ fn name(&self) -> &str {
+ "parse_url"
+ }
+ fn signature(&self) -> &Signature {
+ &self.signature
+ }
+ fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
+ Ok(arg_types[0].clone())
+ }
+ fn invoke_with_args(&self, args: ScalarFunctionArgs) ->
Result<ColumnarValue> {
+ invoke_parse_url(&args.args, |x| x)
+ }
+}
+
+// ---------------------------------------------------------------------------
+// CometTryParseUrl UDF (failOnError = false)
+// ---------------------------------------------------------------------------
+
+#[derive(Debug, PartialEq, Eq, Hash)]
+pub struct CometTryParseUrl {
+ signature: Signature,
+}
+
+impl Default for CometTryParseUrl {
+ fn default() -> Self {
+ Self {
+ signature: Signature::one_of(
+ vec![TypeSignature::String(2), TypeSignature::String(3)],
+ Volatility::Immutable,
+ ),
+ }
+ }
+}
+
+impl ScalarUDFImpl for CometTryParseUrl {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+ fn name(&self) -> &str {
+ "try_parse_url"
+ }
+ fn signature(&self) -> &Signature {
+ &self.signature
+ }
+ fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
+ Ok(arg_types[0].clone())
+ }
+ fn invoke_with_args(&self, args: ScalarFunctionArgs) ->
Result<ColumnarValue> {
+ invoke_parse_url(&args.args, |x| match x {
+ Err(_) => Ok(None),
+ ok => ok,
+ })
+ }
+}
+
+fn invoke_parse_url(
+ args: &[ColumnarValue],
+ handle: impl Fn(Result<Option<String>>) -> Result<Option<String>>,
+) -> Result<ColumnarValue> {
+ let is_scalar = args.iter().all(|a| matches!(a, ColumnarValue::Scalar(_)));
+ let arrays = ColumnarValue::values_to_arrays(args)?;
+ let result = dispatch_parse_url(&arrays, handle)?;
+ if is_scalar {
+ ScalarValue::try_from_array(&result, 0).map(ColumnarValue::Scalar)
+ } else {
+ Ok(ColumnarValue::Array(result))
+ }
+}
+
+// ---------------------------------------------------------------------------
+// Array dispatch
+// ---------------------------------------------------------------------------
+
+fn dispatch_parse_url(
+ args: &[ArrayRef],
+ handle: impl Fn(Result<Option<String>>) -> Result<Option<String>>,
+) -> Result<ArrayRef> {
+ if args.len() < 2 || args.len() > 3 {
+ return exec_err!("parse_url expects 2 or 3 arguments, but got {}",
args.len());
+ }
+ let url = &args[0];
+ let part = &args[1];
+
+ if args.len() == 3 {
+ let key = &args[2];
+ match (url.data_type(), part.data_type(), key.data_type()) {
+ (DataType::Utf8, DataType::Utf8, DataType::Utf8) => {
+ process_parse_url::<_, _, _, StringArray>(
+ as_string_array(url)?,
+ as_string_array(part)?,
+ as_string_array(key)?,
+ handle,
+ )
+ }
+ (DataType::Utf8View, DataType::Utf8View, DataType::Utf8View) => {
+ process_parse_url::<_, _, _, StringViewArray>(
+ as_string_view_array(url)?,
+ as_string_view_array(part)?,
+ as_string_view_array(key)?,
+ handle,
+ )
+ }
+ (DataType::LargeUtf8, DataType::LargeUtf8, DataType::LargeUtf8) =>
{
+ process_parse_url::<_, _, _, LargeStringArray>(
+ as_large_string_array(url)?,
+ as_large_string_array(part)?,
+ as_large_string_array(key)?,
+ handle,
+ )
+ }
+ _ => exec_err!(
+ "parse_url expects STRING arguments, got ({}, {}, {})",
+ url.data_type(),
+ part.data_type(),
+ key.data_type()
+ ),
+ }
+ } else {
+ let null_keys = StringArray::new_null(url.len());
+
+ match (url.data_type(), part.data_type()) {
+ (DataType::Utf8, DataType::Utf8) => process_parse_url::<_, _, _,
StringArray>(
+ as_string_array(url)?,
+ as_string_array(part)?,
+ &null_keys,
+ handle,
+ ),
+ (DataType::Utf8View, DataType::Utf8View) => {
+ process_parse_url::<_, _, _, StringViewArray>(
+ as_string_view_array(url)?,
+ as_string_view_array(part)?,
+ &null_keys,
+ handle,
+ )
+ }
+ (DataType::LargeUtf8, DataType::LargeUtf8) => {
+ process_parse_url::<_, _, _, LargeStringArray>(
+ as_large_string_array(url)?,
+ as_large_string_array(part)?,
+ &null_keys,
+ handle,
+ )
+ }
+ _ => exec_err!(
+ "parse_url expects STRING arguments, got ({}, {})",
+ url.data_type(),
+ part.data_type()
+ ),
+ }
+ }
+}
+
+fn process_parse_url<'a, A, B, C, T>(
+ url_array: &'a A,
+ part_array: &'a B,
+ key_array: &'a C,
+ handle: impl Fn(Result<Option<String>>) -> Result<Option<String>>,
+) -> Result<ArrayRef>
+where
+ &'a A: StringArrayType<'a>,
+ &'a B: StringArrayType<'a>,
+ &'a C: StringArrayType<'a>,
+ T: Array + FromIterator<Option<String>> + 'static,
+{
+ url_array
+ .iter()
+ .zip(part_array.iter())
+ .zip(key_array.iter())
+ .map(|((url, part), key)| {
+ if let (Some(url), Some(part)) = (url, part) {
+ handle(parse_url_component(url, part, key))
+ } else {
+ Ok(None)
+ }
+ })
+ .collect::<Result<T>>()
+ .map(|array| Arc::new(array) as ArrayRef)
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ fn sa(vals: &[Option<&str>]) -> ArrayRef {
+ Arc::new(StringArray::from(vals.to_vec())) as ArrayRef
+ }
+
+ // -----------------------------------------------------------------------
+ // Unit tests for parse_url_component
+ // -----------------------------------------------------------------------
+
+ #[test]
+ fn test_host() -> Result<()> {
+ assert_eq!(
+ parse_url_component("https://example.com/a?x=1", "HOST", None)?,
+ Some("example.com".into())
+ );
+ Ok(())
+ }
+
+ #[test]
+ fn test_path_normal() -> Result<()> {
+ assert_eq!(
+ parse_url_component("https://example.com/a/b", "PATH", None)?,
+ Some("/a/b".into())
+ );
+ Ok(())
+ }
+
+ #[test]
+ fn test_fix1_empty_string_path() -> Result<()> {
+ assert_eq!(parse_url_component("", "PATH", None)?, Some("".into()));
+ assert_eq!(parse_url_component("", "FILE", None)?, Some("".into()));
+ assert_eq!(parse_url_component("", "HOST", None)?, None);
+ assert_eq!(parse_url_component("", "QUERY", None)?, None);
+ assert_eq!(parse_url_component("", "PROTOCOL", None)?, None);
+ assert_eq!(parse_url_component("", "REF", None)?, None);
+ assert_eq!(parse_url_component("", "AUTHORITY", None)?, None);
+ assert_eq!(parse_url_component("", "USERINFO", None)?, None);
+ Ok(())
+ }
+
+ #[test]
+ fn test_fix2_file_without_path() -> Result<()> {
+ assert_eq!(
+ parse_url_component("http://host?foo=bar", "FILE", None)?,
+ Some("?foo=bar".into())
+ );
+ Ok(())
+ }
+
+ #[test]
+ fn test_fix3_path_trailing_slash() -> Result<()> {
+ assert_eq!(
+ parse_url_component("https://example.com/", "PATH", None)?,
+ Some("/".into())
+ );
+ Ok(())
+ }
+
+ #[test]
+ fn test_fix4_query_key_no_decode() -> Result<()> {
+ assert_eq!(
+ parse_url_component(
+ "http://example.com/path?key=value%20encoded",
+ "QUERY",
+ Some("key")
+ )?,
+ Some("value%20encoded".into())
+ );
+ Ok(())
+ }
+
+ #[test]
+ fn test_query_no_key() -> Result<()> {
+ assert_eq!(
+ parse_url_component("https://ex.com/p?a=1&b=2", "QUERY", None)?,
+ Some("a=1&b=2".into())
+ );
+ Ok(())
+ }
+
+ #[test]
+ fn test_query_with_key() -> Result<()> {
+ assert_eq!(
+ parse_url_component("https://ex.com/p?a=1&b=2", "QUERY",
Some("a"))?,
+ Some("1".into())
+ );
+ assert_eq!(
+ parse_url_component("https://ex.com/p?a=1&b=2", "QUERY",
Some("c"))?,
+ None
+ );
+ Ok(())
+ }
+
+ #[test]
+ fn test_ref_protocol_userinfo_file_authority() -> Result<()> {
+ let url = "ftp://user:[email protected]:21/files?x=1#frag";
+ assert_eq!(parse_url_component(url, "REF", None)?,
Some("frag".into()));
+ assert_eq!(
+ parse_url_component(url, "PROTOCOL", None)?,
+ Some("ftp".into())
+ );
+ assert_eq!(
+ parse_url_component(url, "USERINFO", None)?,
+ Some("user:pwd".into())
+ );
+ assert_eq!(
+ parse_url_component(url, "FILE", None)?,
+ Some("/files?x=1".into())
+ );
+ // Authority includes port (matches java.net.URI)
+ assert_eq!(
+ parse_url_component(url, "AUTHORITY", None)?,
+ Some("user:[email protected]:21".into())
+ );
+ Ok(())
+ }
+
+ #[test]
+ fn test_malformed_no_scheme() -> Result<()> {
+ assert_eq!(parse_url_component("notaurl", "HOST", None)?, None);
+ Ok(())
+ }
+
+ #[test]
+ fn test_malformed_with_invalid_chars() {
+ let result = parse_url_component("not a url at all", "HOST", None);
+ assert!(result.is_err());
+ }
+
+ #[test]
+ fn test_malformed_with_scheme_separator() {
+ let result = parse_url_component("://missing-scheme", "HOST", None);
+ assert!(result.is_err());
+ }
+
+ #[test]
+ fn test_ipv6_host() -> Result<()> {
+ assert_eq!(
+ parse_url_component("http://[::1]:8080/path", "HOST", None)?,
+ Some("[::1]".into())
+ );
+ Ok(())
+ }
+
+ #[test]
+ fn test_null_handling_array() -> Result<()> {
+ let urls = sa(&[Some("https://example.com/path?k=v"), None]);
+ let parts = sa(&[Some("HOST"), Some("HOST")]);
+ let out = dispatch_parse_url(&[urls, parts], |x| x)?;
+ let out_sa = out.as_any().downcast_ref::<StringArray>().unwrap();
+ assert_eq!(out_sa.value(0), "example.com");
+ assert!(out_sa.is_null(1));
+ Ok(())
+ }
+
+ #[test]
+ fn test_three_arg_query_key() -> Result<()> {
+ let urls = sa(&[
+ Some("https://example.com/a?x=1&y=2"),
+ Some("https://ex.com/?a=1"),
+ ]);
+ let parts = sa(&[Some("QUERY"), Some("QUERY")]);
+ let keys = sa(&[Some("y"), Some("b")]);
+ let out = dispatch_parse_url(&[urls, parts, keys], |x| x)?;
+ let out_sa = out.as_any().downcast_ref::<StringArray>().unwrap();
+ assert_eq!(out_sa.value(0), "2");
+ assert!(out_sa.is_null(1));
+ Ok(())
+ }
+
+ #[test]
+ fn test_try_parse_url_tolerates_errors() -> Result<()> {
+ let handler = |x: Result<Option<String>>| match x {
+ Err(_) => Ok(None),
+ ok => ok,
+ };
+ let urls = sa(&[Some("://bad"), Some("http://ok.com/p")]);
+ let parts = sa(&[Some("HOST"), Some("HOST")]);
+ let out = dispatch_parse_url(&[urls, parts], handler)?;
+ let out_sa = out.as_any().downcast_ref::<StringArray>().unwrap();
+ assert!(out_sa.is_null(0));
+ assert_eq!(out_sa.value(1), "ok.com");
+ Ok(())
+ }
+
+ #[test]
+ fn test_spaces_in_url() {
+ let result = parse_url_component("http://ho st/path", "HOST", None);
+ assert!(result.is_err());
+ }
+
+ #[test]
+ fn test_double_slashes_in_path() -> Result<()> {
+ assert_eq!(
+ parse_url_component("http://example.com//double//slashes", "PATH",
None)?,
+ Some("//double//slashes".into())
+ );
+ Ok(())
+ }
+
+ #[test]
+ fn test_empty_query() -> Result<()> {
+ assert_eq!(
+ parse_url_component("http://example.com/path?", "QUERY", None)?,
+ Some("".into())
+ );
+ Ok(())
+ }
+
+ #[test]
+ fn test_file_with_no_fragment() -> Result<()> {
+ assert_eq!(
+ parse_url_component("http://example.com#frag", "FILE", None)?,
+ Some("".into())
+ );
+ Ok(())
+ }
+
+ #[test]
+ fn test_duplicate_query_keys() -> Result<()> {
+ assert_eq!(
+ parse_url_component("http://example.com/path?a=1&a=2", "QUERY",
Some("a"))?,
+ Some("1".into())
+ );
+ Ok(())
+ }
+
+ #[test]
+ fn test_query_value_contains_equals() -> Result<()> {
+ assert_eq!(
+ parse_url_component("http://host/p?a=b=c", "QUERY", Some("a"))?,
+ Some("b=c".into())
+ );
+ assert_eq!(
+ parse_url_component("http://host/p?tok=abc=def=&x=1", "QUERY",
Some("tok"))?,
+ Some("abc=def=".into())
+ );
+ Ok(())
+ }
+
+ #[test]
+ fn test_three_arg_non_query_returns_null() -> Result<()> {
+ assert_eq!(
+ parse_url_component("http://host/path", "HOST", Some("key"))?,
+ None
+ );
+ assert_eq!(
+ parse_url_component("http://host/path", "PATH", Some("key"))?,
+ None
+ );
+ Ok(())
+ }
+
+ #[test]
+ fn test_host_empty_port() -> Result<()> {
+ assert_eq!(
+ parse_url_component("http://host:/path", "HOST", None)?,
+ Some("host".into())
+ );
+ Ok(())
+ }
+
+ #[test]
+ fn test_empty_authority() -> Result<()> {
+ assert_eq!(
+ parse_url_component("http:///path", "AUTHORITY", None)?,
+ None
+ );
+ Ok(())
+ }
+
+ #[test]
+ fn test_regex_metachar_in_query_key() -> Result<()> {
+ // Spark treats key as regex: ".bc" matches "abc"
+ assert_eq!(
+ parse_url_component("http://h/p?abc=1", "QUERY", Some(".bc"))?,
+ Some("1".into())
+ );
+ assert_eq!(
+ parse_url_component("http://h/p?abc=1", "QUERY", Some("a.c"))?,
+ Some("1".into())
+ );
+ // Literal key that doesn't match as regex
+ assert_eq!(
+ parse_url_component("http://h/p?abc=1", "QUERY", Some("x.c"))?,
+ None
+ );
+ Ok(())
+ }
+
+ #[test]
+ fn test_non_digit_port_returns_null_host() -> Result<()> {
+ assert_eq!(parse_url_component("http://host:abc/", "HOST", None)?,
None);
+ // Other parts still work
+ assert_eq!(
+ parse_url_component("http://host:abc/", "PATH", None)?,
+ Some("/".into())
+ );
+ assert_eq!(
+ parse_url_component("http://host:abc/", "AUTHORITY", None)?,
+ Some("host:abc".into())
+ );
+ Ok(())
+ }
+
+ #[test]
+ fn test_unbalanced_ipv6_bracket() {
+ let result = parse_url_component("http://[::1/path", "AUTHORITY",
None);
+ assert!(result.is_err());
+ let result = parse_url_component("http://[::1/path", "PATH", None);
+ assert!(result.is_err());
+ let result = parse_url_component("http://[::1/path", "HOST", None);
+ assert!(result.is_err());
+ }
+}
diff --git a/spark/src/main/scala/org/apache/comet/serde/url.scala
b/spark/src/main/scala/org/apache/comet/serde/url.scala
index fa5742860c..b672cc17bb 100644
--- a/spark/src/main/scala/org/apache/comet/serde/url.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/url.scala
@@ -23,18 +23,8 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute,
ParseUrl}
import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal,
optExprWithInfo, scalarFunctionExprToProto}
-// On Spark 4.x ParseUrl is RuntimeReplaceable and handled via CometExprShim
(ParseUrlEvaluator).
object CometParseUrl extends CometExpressionSerde[ParseUrl] {
- private val incompatibleReason =
- "Native parse_url diverges from Spark on several edge cases " +
- "(https://github.com/apache/datafusion/issues/21943)"
-
- override def getIncompatibleReasons(): Seq[String] = Seq(incompatibleReason)
-
- override def getSupportLevel(expr: ParseUrl): SupportLevel =
- Incompatible(Some(incompatibleReason))
-
override def convert(
expr: ParseUrl,
inputs: Seq[Attribute],
diff --git a/spark/src/test/resources/sql-tests/expressions/url/parse_url.sql
b/spark/src/test/resources/sql-tests/expressions/url/parse_url.sql
index b6882ec19b..fc87e3d77c 100644
--- a/spark/src/test/resources/sql-tests/expressions/url/parse_url.sql
+++ b/spark/src/test/resources/sql-tests/expressions/url/parse_url.sql
@@ -15,6 +15,8 @@
-- specific language governing permissions and limitations
-- under the License.
+-- Test parse_url() - native execution (Compatible)
+
statement
CREATE TABLE test_parse_url(url string) USING parquet
@@ -22,16 +24,149 @@ statement
INSERT INTO test_parse_url VALUES
('http://spark.apache.org/path?query=1'),
('https://user:pass@host:8080/path?k=v#ref'),
+ ('http://example.com/path?a=1&b=2&a=3'),
+ ('ftp://ftp.example.com/dir/file.txt'),
(NULL)
-query expect_fallback(not fully compatible with Spark)
+-- HOST
+query
SELECT parse_url(url, 'HOST') FROM test_parse_url
-query expect_fallback(not fully compatible with Spark)
+-- PATH
+query
SELECT parse_url(url, 'PATH') FROM test_parse_url
-query expect_fallback(not fully compatible with Spark)
+-- QUERY (no key)
+query
SELECT parse_url(url, 'QUERY') FROM test_parse_url
-query expect_fallback(not fully compatible with Spark)
+-- QUERY with key
+query
SELECT parse_url(url, 'QUERY', 'k') FROM test_parse_url
+
+-- PROTOCOL
+query
+SELECT parse_url(url, 'PROTOCOL') FROM test_parse_url
+
+-- REF (fragment)
+query
+SELECT parse_url(url, 'REF') FROM test_parse_url
+
+-- AUTHORITY
+query
+SELECT parse_url(url, 'AUTHORITY') FROM test_parse_url
+
+-- USERINFO
+query
+SELECT parse_url(url, 'USERINFO') FROM test_parse_url
+
+-- FILE
+query
+SELECT parse_url(url, 'FILE') FROM test_parse_url
+
+-- literal arguments
+query
+SELECT parse_url('http://example.com/path?foo=bar', 'HOST')
+
+query
+SELECT parse_url('http://example.com/path?foo=bar', 'PATH')
+
+query
+SELECT parse_url('http://example.com/path?foo=bar', 'QUERY')
+
+query
+SELECT parse_url('http://example.com/path?foo=bar', 'QUERY', 'foo')
+
+query
+SELECT parse_url('http://example.com/path?foo=bar', 'PROTOCOL')
+
+-- NULL handling
+query
+SELECT parse_url(NULL, 'HOST')
+
+query
+SELECT parse_url('http://example.com', NULL)
+
+-- invalid part key
+query
+SELECT parse_url('http://example.com', 'INVALID')
+
+-- malformed URL returns NULL
+query
+SELECT parse_url('not a url at all', 'HOST')
+
+query
+SELECT parse_url('', 'HOST')
+
+-- column-valued part key
+statement
+CREATE TABLE test_parse_url_parts(url string, part string, key string) USING
parquet
+
+statement
+INSERT INTO test_parse_url_parts VALUES
+ ('http://example.com/path?foo=bar', 'HOST', NULL),
+ ('http://example.com/path?foo=bar', 'PATH', NULL),
+ ('http://example.com/path?foo=bar', 'QUERY', 'foo'),
+ ('https://user:pw@host:9090/p?a=1#frag', 'REF', NULL),
+ ('https://user:pw@host:9090/p?a=1#frag', 'USERINFO', NULL)
+
+query
+SELECT parse_url(url, part) FROM test_parse_url_parts
+
+query
+SELECT parse_url(url, 'QUERY', key) FROM test_parse_url_parts WHERE key IS NOT
NULL
+
+-- previously divergent edge cases (now fixed)
+query
+SELECT parse_url('http://example.com//double//slashes', 'PATH')
+
+query
+SELECT parse_url('http://example.com/path?key=value%20encoded', 'QUERY', 'key')
+
+query
+SELECT parse_url('http://example.com/path?', 'QUERY')
+
+query
+SELECT parse_url('http://example.com#frag', 'FILE')
+
+query
+SELECT parse_url('http://[::1]:8080/path', 'HOST')
+
+query
+SELECT parse_url('http://example.com/path?a=1&a=2', 'QUERY', 'a')
+
+-- Fix #2: FILE without explicit path
+query
+SELECT parse_url('http://example.com?foo=bar', 'FILE')
+
+-- Fix #3: PATH on trailing slash
+query
+SELECT parse_url('http://example.com/', 'PATH')
+
+-- query value containing '='
+query
+SELECT parse_url('http://host/p?a=b=c', 'QUERY', 'a')
+
+-- 3-arg with non-QUERY part returns NULL
+query
+SELECT parse_url('http://host/path', 'HOST', 'key')
+
+-- empty port
+query
+SELECT parse_url('http://host:/path', 'HOST')
+
+-- regex metachar in query key (Spark treats key as regex pattern)
+query
+SELECT parse_url('http://h/p?abc=1', 'QUERY', '.bc')
+
+-- non-digit port returns NULL for HOST
+query
+SELECT parse_url('http://host:abc/', 'HOST')
+
+-- non-digit port: other parts still work
+query
+SELECT parse_url('http://host:abc/', 'AUTHORITY')
+
+-- unbalanced IPv6 bracket is invalid
+query
+SELECT parse_url('http://[::1/path', 'AUTHORITY')
diff --git
a/spark/src/test/resources/sql-tests/expressions/url/parse_url_ansi.sql
b/spark/src/test/resources/sql-tests/expressions/url/parse_url_ansi.sql
index 1e71031f41..b3a0ed779d 100644
--- a/spark/src/test/resources/sql-tests/expressions/url/parse_url_ansi.sql
+++ b/spark/src/test/resources/sql-tests/expressions/url/parse_url_ansi.sql
@@ -17,7 +17,6 @@
-- Test parse_url() in ANSI mode (failOnError=true -> native "parse_url" path)
-- Config: spark.sql.ansi.enabled=true
--- Config: spark.comet.expression.ParseUrl.allowIncompatible=true
-- valid URLs should work identically in ANSI mode
query
@@ -42,12 +41,17 @@ SELECT parse_url('https://user:pass@host:8080/p?k=v#ref',
'REF')
query
SELECT parse_url(NULL, 'HOST')
--- invalid URL throws in ANSI mode (native returns NULL instead of throwing)
-query ignore(known divergence: native parse_url does not throw INVALID_URL for
malformed URLs)
+-- invalid URL throws in ANSI mode
+query expect_error(not a url at all)
SELECT parse_url('not a url at all', 'HOST')
-query ignore(known divergence: native parse_url does not throw INVALID_URL for
malformed URLs)
+query expect_error(://missing-scheme)
SELECT parse_url('://missing-scheme', 'HOST')
-query ignore(known divergence: native parse_url does not throw INVALID_URL for
malformed URLs)
+-- empty string is a valid URI (no throw), just returns NULL for HOST
+query
SELECT parse_url('', 'HOST')
+
+-- unbalanced IPv6 bracket throws in ANSI mode
+query expect_error(http://[::1/path)
+SELECT parse_url('http://[::1/path', 'AUTHORITY')
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]