This is an automated email from the ASF dual-hosted git repository.
milenkovicm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git
The following commit(s) were added to refs/heads/main by this push:
new bca52c334 feat: Add spark-compat mode to integrate datafusion-spark features au… (#1416)
bca52c334 is described below
commit bca52c334e20d56363628def56e3da79d15e761c
Author: Matt Cuento <[email protected]>
AuthorDate: Wed Feb 11 09:43:44 2026 -0800
feat: Add spark-compat mode to integrate datafusion-spark features au… (#1416)
---
Cargo.lock | 35 +++++
Cargo.toml | 1 +
ballista/core/Cargo.toml | 2 +
ballista/core/src/extension.rs | 161 ++++++++++++++++++++-
ballista/core/src/object_store.rs | 8 +
ballista/core/src/registry.rs | 92 ++++++++++--
ballista/core/src/utils.rs | 14 +-
ballista/executor/Cargo.toml | 1 +
ballista/executor/src/standalone.rs | 13 +-
ballista/scheduler/Cargo.toml | 3 +-
docs/source/index.rst | 1 +
docs/source/user-guide/deployment/cargo-install.md | 25 ++++
.../user-guide/spark-compatible-functions.md | 121 ++++++++++++++++
examples/Cargo.toml | 11 +-
examples/README.md | 63 ++++++++
examples/examples/remote-spark-functions.rs | 64 ++++++++
16 files changed, 597 insertions(+), 18 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 392bda460..318e2ef74 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1001,6 +1001,7 @@ dependencies = [
"datafusion",
"datafusion-proto",
"datafusion-proto-common",
+ "datafusion-spark",
"futures",
"itertools 0.14.0",
"log",
@@ -2529,6 +2530,29 @@ dependencies = [
"parking_lot",
]
+[[package]]
+name = "datafusion-spark"
+version = "52.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "556c431f5f2259620c8223254c0ef57aa9a85c576d4da0166157260f71eb0e25"
+dependencies = [
+ "arrow",
+ "bigdecimal",
+ "chrono",
+ "crc32fast",
+ "datafusion-catalog",
+ "datafusion-common",
+ "datafusion-execution",
+ "datafusion-expr",
+ "datafusion-functions",
+ "datafusion-functions-nested",
+ "log",
+ "percent-encoding",
+ "rand",
+ "sha1",
+ "url",
+]
+
[[package]]
name = "datafusion-sql"
version = "52.1.0"
@@ -5266,6 +5290,17 @@ dependencies = [
"unsafe-libyaml",
]
+[[package]]
+name = "sha1"
+version = "0.10.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba"
+dependencies = [
+ "cfg-if",
+ "cpufeatures",
+ "digest",
+]
+
[[package]]
name = "sha2"
version = "0.10.9"
diff --git a/Cargo.toml b/Cargo.toml
index d42a87d65..c7c4f6768 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -33,6 +33,7 @@ datafusion = "52"
datafusion-cli = "52"
datafusion-proto = "52"
datafusion-proto-common = "52"
+datafusion-spark = "52"
datafusion-substrait = "52"
object_store = "0.12"
prost = "0.14"
diff --git a/ballista/core/Cargo.toml b/ballista/core/Cargo.toml
index 615a5e78e..21353adff 100644
--- a/ballista/core/Cargo.toml
+++ b/ballista/core/Cargo.toml
@@ -41,6 +41,7 @@ default = ["arrow-ipc-optimizations"]
docsrs = []
# Used for testing ONLY: causes all values to hash to the same value (test for collisions)
force_hash_collisions = ["datafusion/force_hash_collisions"]
+spark-compat = ["dep:datafusion-spark"]
[dependencies]
arrow-flight = { workspace = true }
@@ -52,6 +53,7 @@ clap = { workspace = true, optional = true }
datafusion = { workspace = true }
datafusion-proto = { workspace = true }
datafusion-proto-common = { workspace = true }
+datafusion-spark = { workspace = true, optional = true }
futures = { workspace = true }
itertools = "0.14"
log = { workspace = true }
diff --git a/ballista/core/src/extension.rs b/ballista/core/src/extension.rs
index f7b02395d..5341020d1 100644
--- a/ballista/core/src/extension.rs
+++ b/ballista/core/src/extension.rs
@@ -27,6 +27,11 @@ use crate::serde::{BallistaLogicalExtensionCodec, BallistaPhysicalExtensionCodec
use datafusion::execution::context::{QueryPlanner, SessionConfig, SessionState};
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::execution::session_state::SessionStateBuilder;
+use datafusion::functions::all_default_functions;
+use datafusion::functions_aggregate::all_default_aggregate_functions;
+use datafusion::functions_nested::all_default_nested_functions;
+use datafusion::functions_window::all_default_window_functions;
+use datafusion::logical_expr::{AggregateUDF, ScalarUDF, WindowUDF};
use datafusion_proto::logical_plan::LogicalExtensionCodec;
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
use datafusion_proto::protobuf::LogicalPlanNode;
@@ -43,6 +48,47 @@ use tonic::{Request, Status};
pub type EndpointOverrideFn =
Arc<dyn Fn(Endpoint) -> Result<Endpoint, Box<dyn Error + Send + Sync>> + Send + Sync>;
+#[cfg(feature = "spark-compat")]
+use datafusion_spark::{
+ all_default_aggregate_functions as spark_aggregate_functions,
+ all_default_scalar_functions as spark_scalar_functions,
+ all_default_window_functions as spark_window_functions,
+};
+
+/// Returns scalar functions for Ballista (DataFusion defaults + Spark when enabled)
+pub fn ballista_scalar_functions() -> Vec<Arc<ScalarUDF>> {
+ #[allow(unused_mut)]
+ let mut functions = all_default_functions();
+ functions.append(&mut all_default_nested_functions());
+
+ #[cfg(feature = "spark-compat")]
+ functions.extend(spark_scalar_functions());
+
+ functions
+}
+
+/// Returns aggregate functions for Ballista (DataFusion defaults + Spark when enabled)
+pub fn ballista_aggregate_functions() -> Vec<Arc<AggregateUDF>> {
+ #[allow(unused_mut)]
+ let mut functions = all_default_aggregate_functions();
+
+ #[cfg(feature = "spark-compat")]
+ functions.extend(spark_aggregate_functions());
+
+ functions
+}
+
+/// Returns window functions for Ballista (DataFusion defaults + Spark when enabled)
+pub fn ballista_window_functions() -> Vec<Arc<WindowUDF>> {
+ #[allow(unused_mut)]
+ let mut functions = all_default_window_functions();
+
+ #[cfg(feature = "spark-compat")]
+ functions.extend(spark_window_functions());
+
+ functions
+}
+
/// Provides methods which adapt [SessionState]
/// for Ballista usage
pub trait SessionStateExt {
@@ -209,6 +255,9 @@ impl SessionStateExt for SessionState {
.with_config(session_config)
.with_runtime_env(Arc::new(runtime_env))
.with_query_planner(Arc::new(planner))
+ .with_scalar_functions(ballista_scalar_functions())
+ .with_aggregate_functions(ballista_aggregate_functions())
+ .with_window_functions(ballista_window_functions())
.build();
Ok(session_state)
@@ -240,7 +289,13 @@ impl SessionStateExt for SessionState {
}
};
- Ok(builder.build())
+ let session_state = builder
+ .with_scalar_functions(ballista_scalar_functions())
+ .with_aggregate_functions(ballista_aggregate_functions())
+ .with_window_functions(ballista_window_functions())
+ .build();
+
+ Ok(session_state)
}
}
@@ -806,4 +861,108 @@ mod test {
.contains("TLS configuration failed")
);
}
+
+ // Tests for helper functions that register DataFusion + Spark functions
+
+ #[test]
+ fn test_ballista_functions_include_datafusion() {
+ use super::{
+ ballista_aggregate_functions, ballista_scalar_functions,
+ ballista_window_functions,
+ };
+
+ // Test scalar functions
+ let scalar_funcs = ballista_scalar_functions();
+ assert!(scalar_funcs.iter().any(|f| f.name() == "abs"));
+ assert!(scalar_funcs.iter().any(|f| f.name() == "ceil"));
+ assert!(scalar_funcs.iter().any(|f| f.name() == "array_length")); // nested function
+
+ // Test aggregate functions
+ let agg_funcs = ballista_aggregate_functions();
+ assert!(agg_funcs.iter().any(|f| f.name() == "count"));
+ assert!(agg_funcs.iter().any(|f| f.name() == "sum"));
+ assert!(agg_funcs.iter().any(|f| f.name() == "avg"));
+
+ // Test window functions
+ let window_funcs = ballista_window_functions();
+ assert!(window_funcs.iter().any(|f| f.name() == "row_number"));
+ assert!(window_funcs.iter().any(|f| f.name() == "rank"));
+ assert!(window_funcs.iter().any(|f| f.name() == "dense_rank"));
+ }
+
+ #[test]
+ #[cfg(not(feature = "spark-compat"))]
+ fn test_ballista_functions_without_spark() {
+ use super::{
+ ballista_aggregate_functions, ballista_scalar_functions,
+ ballista_window_functions,
+ };
+
+ // Scalar functions should NOT include Spark functions
+ let scalar_funcs = ballista_scalar_functions();
+ assert!(!scalar_funcs.iter().any(|f| f.name() == "sha1"));
+ assert!(!scalar_funcs.iter().any(|f| f.name() == "expm1"));
+
+ // All function types should have baseline DataFusion functions
+ assert!(!ballista_aggregate_functions().is_empty());
+ assert!(!ballista_window_functions().is_empty());
+ }
+
+ #[test]
+ #[cfg(feature = "spark-compat")]
+ fn test_ballista_functions_with_spark() {
+ use super::{
+ ballista_aggregate_functions, ballista_scalar_functions,
+ ballista_window_functions,
+ };
+
+ // Scalar functions should include Spark functions
+ let scalar_funcs = ballista_scalar_functions();
+ assert!(scalar_funcs.iter().any(|f| f.name() == "sha1"));
+ assert!(scalar_funcs.iter().any(|f| f.name() == "expm1"));
+
+ // All function types should have functions available
+ assert!(!ballista_aggregate_functions().is_empty());
+ assert!(!ballista_window_functions().is_empty());
+ }
+
+ #[tokio::test]
+ async fn test_ballista_functions_with_session_state() {
+ use super::{
+ ballista_aggregate_functions, ballista_scalar_functions,
+ ballista_window_functions,
+ };
+
+ let state = SessionStateBuilder::new()
+ .with_default_features()
+ .with_scalar_functions(ballista_scalar_functions())
+ .with_aggregate_functions(ballista_aggregate_functions())
+ .with_window_functions(ballista_window_functions())
+ .build();
+
+ // Verify all function types are registered in SessionState
+ assert!(state.scalar_functions().contains_key("abs"));
+ assert!(state.aggregate_functions().contains_key("count"));
+ assert!(state.window_functions().contains_key("row_number"));
+ }
+
+ #[tokio::test]
+ #[cfg(feature = "spark-compat")]
+ async fn test_ballista_spark_functions_with_session_state() {
+ use super::{
+ ballista_aggregate_functions, ballista_scalar_functions,
+ ballista_window_functions,
+ };
+
+ let state = SessionStateBuilder::new()
+ .with_default_features()
+ .with_scalar_functions(ballista_scalar_functions())
+ .with_aggregate_functions(ballista_aggregate_functions())
+ .with_window_functions(ballista_window_functions())
+ .build();
+
+ // Verify Spark functions are registered in SessionState
+ assert!(state.scalar_functions().contains_key("sha1"));
+ assert!(state.scalar_functions().contains_key("expm1"));
+ }
}
diff --git a/ballista/core/src/object_store.rs b/ballista/core/src/object_store.rs
index dc6439e9e..aff9ee1fe 100644
--- a/ballista/core/src/object_store.rs
+++ b/ballista/core/src/object_store.rs
@@ -89,12 +89,20 @@ pub fn runtime_env_with_s3_support(
pub fn session_state_with_s3_support(
session_config: SessionConfig,
) -> datafusion::common::Result<SessionState> {
+ use crate::extension::{
+ ballista_aggregate_functions, ballista_scalar_functions,
+ ballista_window_functions,
+ };
+
let runtime_env = runtime_env_with_s3_support(&session_config)?;
Ok(SessionStateBuilder::new()
.with_runtime_env(runtime_env)
.with_config(session_config)
.with_default_features()
+ .with_scalar_functions(ballista_scalar_functions())
+ .with_aggregate_functions(ballista_aggregate_functions())
+ .with_window_functions(ballista_window_functions())
.build())
}
diff --git a/ballista/core/src/registry.rs b/ballista/core/src/registry.rs
index 70d549f53..1d45e7b58 100644
--- a/ballista/core/src/registry.rs
+++ b/ballista/core/src/registry.rs
@@ -25,7 +25,14 @@ use datafusion::logical_expr::{AggregateUDF, ScalarUDF, WindowUDF};
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
-/// A function registry containing scalar, aggregate, and window functions for Ballista.
+#[cfg(feature = "spark-compat")]
+use datafusion_spark::{
+ all_default_aggregate_functions as spark_aggregate_functions,
+ all_default_scalar_functions as spark_scalar_functions,
+ all_default_window_functions as spark_window_functions,
+};
+
+/// A function registry containing scalar, aggregate, window, and table functions for Ballista.
#[derive(Debug)]
pub struct BallistaFunctionRegistry {
/// Scalar user-defined functions.
@@ -38,20 +45,41 @@ pub struct BallistaFunctionRegistry {
impl Default for BallistaFunctionRegistry {
fn default() -> Self {
- let scalar_functions = all_default_functions()
+ let scalar_functions: HashMap<String, Arc<ScalarUDF>> = all_default_functions()
.into_iter()
.map(|f| (f.name().to_string(), f))
.collect();
- let aggregate_functions = all_default_aggregate_functions()
- .into_iter()
- .map(|f| (f.name().to_string(), f))
- .collect();
-
- let window_functions = all_default_window_functions()
- .into_iter()
- .map(|f| (f.name().to_string(), f))
- .collect();
+ let aggregate_functions: HashMap<String, Arc<AggregateUDF>> =
+ all_default_aggregate_functions()
+ .into_iter()
+ .map(|f| (f.name().to_string(), f))
+ .collect();
+
+ let window_functions: HashMap<String, Arc<WindowUDF>> =
+ all_default_window_functions()
+ .into_iter()
+ .map(|f| (f.name().to_string(), f))
+ .collect();
+
+ #[cfg(feature = "spark-compat")]
+ let (scalar_functions, aggregate_functions, window_functions) = {
+ let mut scalar_functions = scalar_functions;
+ let mut aggregate_functions = aggregate_functions;
+ let mut window_functions = window_functions;
+
+ for f in spark_scalar_functions() {
+ scalar_functions.insert(f.name().to_string(), f);
+ }
+ for f in spark_aggregate_functions() {
+ aggregate_functions.insert(f.name().to_string(), f);
+ }
+ for f in spark_window_functions() {
+ window_functions.insert(f.name().to_string(), f);
+ }
+
+ (scalar_functions, aggregate_functions, window_functions)
+ };
Self {
scalar_functions,
@@ -122,3 +150,45 @@ impl From<&SessionState> for BallistaFunctionRegistry {
}
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_datafusion_functions_available() {
+ let registry = BallistaFunctionRegistry::default();
+
+ // DataFusion's `abs` function should always be available
+ assert!(registry.udf("abs").is_ok());
+ assert!(registry.udfs().contains("abs"));
+ }
+
+ #[test]
+ #[cfg(not(feature = "spark-compat"))]
+ fn test_spark_functions_unavailable_without_feature() {
+ let registry = BallistaFunctionRegistry::default();
+
+ // Spark's `sha1` function should NOT be available without spark-compat
+ assert!(registry.udf("sha1").is_err());
+ assert!(!registry.udfs().contains("sha1"));
+
+ // Spark's `expm1` function should NOT be available without spark-compat
+ assert!(registry.udf("expm1").is_err());
+ assert!(!registry.udfs().contains("expm1"));
+ }
+
+ #[test]
+ #[cfg(feature = "spark-compat")]
+ fn test_spark_functions_available_with_feature() {
+ let registry = BallistaFunctionRegistry::default();
+
+ // Spark's `sha1` function should be available with spark-compat
+ assert!(registry.udf("sha1").is_ok());
+ assert!(registry.udfs().contains("sha1"));
+
+ // Spark's `expm1` function should be available with spark-compat
+ assert!(registry.udf("expm1").is_ok());
+ assert!(registry.udfs().contains("expm1"));
+ }
+}
diff --git a/ballista/core/src/utils.rs b/ballista/core/src/utils.rs
index e7f4e2a57..0d6a3d833 100644
--- a/ballista/core/src/utils.rs
+++ b/ballista/core/src/utils.rs
@@ -131,11 +131,21 @@ impl Default for GrpcServerConfig {
pub fn default_session_builder(
config: SessionConfig,
) -> datafusion::common::Result<SessionState> {
- Ok(SessionStateBuilder::new()
+ use crate::extension::{
+ ballista_aggregate_functions, ballista_scalar_functions,
+ ballista_window_functions,
+ };
+
+ let state = SessionStateBuilder::new()
.with_default_features()
.with_config(config)
.with_runtime_env(Arc::new(RuntimeEnvBuilder::new().build()?))
- .build())
+ .with_scalar_functions(ballista_scalar_functions())
+ .with_aggregate_functions(ballista_aggregate_functions())
+ .with_window_functions(ballista_window_functions())
+ .build();
+
+ Ok(state)
}
/// Creates a default session configuration with Ballista extensions.
diff --git a/ballista/executor/Cargo.toml b/ballista/executor/Cargo.toml
index 43229fade..32816188d 100644
--- a/ballista/executor/Cargo.toml
+++ b/ballista/executor/Cargo.toml
@@ -36,6 +36,7 @@ required-features = ["build-binary"]
arrow-ipc-optimizations = []
build-binary = ["clap", "tracing-subscriber", "tracing-appender", "tracing",
"ballista-core/build-binary"]
default = ["arrow-ipc-optimizations", "build-binary", "mimalloc"]
+spark-compat = ["ballista-core/spark-compat"]
[dependencies]
arrow = { workspace = true }
diff --git a/ballista/executor/src/standalone.rs b/ballista/executor/src/standalone.rs
index 854155c91..257b84017 100644
--- a/ballista/executor/src/standalone.rs
+++ b/ballista/executor/src/standalone.rs
@@ -154,7 +154,18 @@ pub async fn new_standalone_executor(
concurrent_tasks: usize,
codec: BallistaCodec,
) -> Result<()> {
- let session_state = SessionStateBuilder::new().with_default_features().build();
+ use ballista_core::extension::{
+ ballista_aggregate_functions, ballista_scalar_functions,
+ ballista_window_functions,
+ };
+
+ let session_state = SessionStateBuilder::new()
+ .with_default_features()
+ .with_scalar_functions(ballista_scalar_functions())
+ .with_aggregate_functions(ballista_aggregate_functions())
+ .with_window_functions(ballista_window_functions())
+ .build();
+
let runtime = session_state.runtime_env().clone();
let runtime_producer: RuntimeProducer = Arc::new(move |_| Ok(runtime.clone()));
diff --git a/ballista/scheduler/Cargo.toml b/ballista/scheduler/Cargo.toml
index 0252e1d11..13bac80bd 100644
--- a/ballista/scheduler/Cargo.toml
+++ b/ballista/scheduler/Cargo.toml
@@ -35,13 +35,14 @@ required-features = ["build-binary"]
[features]
build-binary = ["clap", "tracing-subscriber", "tracing-appender", "tracing",
"ballista-core/build-binary"]
default = ["build-binary", "substrait"]
-# job info can cache stage plans, in some cases where
+# job info can cache stage plans, in some cases where
# task plans can be re-computed, cache behavior may need to be disabled.
disable-stage-plan-cache = []
graphviz-support = ["dep:graphviz-rust"]
keda-scaler = ["dep:tonic-prost-build", "dep:tonic-prost"]
prometheus-metrics = ["prometheus", "once_cell"]
rest-api = []
+spark-compat = ["ballista-core/spark-compat"]
substrait = ["dep:datafusion-substrait"]
[dependencies]
diff --git a/docs/source/index.rst b/docs/source/index.rst
index 625496198..457fa309b 100644
--- a/docs/source/index.rst
+++ b/docs/source/index.rst
@@ -55,6 +55,7 @@ Table of content
user-guide/metrics
user-guide/faq
user-guide/extending-components
+ user-guide/spark-compatible-functions
user-guide/extensions-example
.. _toc.contributors:
diff --git a/docs/source/user-guide/deployment/cargo-install.md b/docs/source/user-guide/deployment/cargo-install.md
index 9bd99e67a..9069bc0d7 100644
--- a/docs/source/user-guide/deployment/cargo-install.md
+++ b/docs/source/user-guide/deployment/cargo-install.md
@@ -47,3 +47,28 @@ manually specifying a bind port. For example:
```bash
RUST_LOG=info ballista-executor --bind-port 50052
```
+
+## Installing with Optional Features
+
+Ballista supports optional features that can be enabled during installation using the `--features` flag.
+
+### Spark-Compatible Functions
+
+To enable Spark-compatible scalar, aggregate, and window functions from the `datafusion-spark` crate:
+
+```bash
+# Install scheduler with spark-compat feature
+cargo install --locked --features spark-compat ballista-scheduler
+
+# Install executor with spark-compat feature
+cargo install --locked --features spark-compat ballista-executor
+
+# Install CLI with spark-compat feature
+cargo install --locked --features spark-compat ballista-cli
+```
+
+When the `spark-compat` feature is enabled, additional functions like `sha1`, `expm1`, `sha2`, and others become available in SQL queries.
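+
+As a quick check that feature-enabled binaries are in use (a minimal query using two of the functions listed above; any SQL client connected to the cluster will do):
+
+```sql
+SELECT sha1('ballista') AS hash, expm1(0.001) AS delta;
+```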
+
+> **Note:** The `spark-compat` feature provides Spark-compatible expressions and functions only, not full Apache Spark API compatibility.
+
+For more details about Spark-compatible functions, see [Spark-Compatible Functions](../spark-compatible-functions.md).
diff --git a/docs/source/user-guide/spark-compatible-functions.md b/docs/source/user-guide/spark-compatible-functions.md
new file mode 100644
index 000000000..8b518f734
--- /dev/null
+++ b/docs/source/user-guide/spark-compatible-functions.md
@@ -0,0 +1,121 @@
+<!---
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+# Spark-Compatible Functions
+
+Ballista provides an optional `spark-compat` Cargo feature that enables Spark-compatible scalar, aggregate, and window functions from the [datafusion-spark](https://crates.io/crates/datafusion-spark) crate.
+
+## Enabling the Feature
+
+The `spark-compat` feature must be explicitly enabled at build time. It is _not_ enabled by default.
+
+### Building from Source
+
+To build Ballista components with Spark-compatible functions:
+
+```bash
+# Build all components with spark-compat feature
+cargo build --features spark-compat --release
+
+# Build scheduler only
+cargo build -p ballista-scheduler --features spark-compat --release
+
+# Build executor only
+cargo build -p ballista-executor --features spark-compat --release
+
+# Build CLI with spark-compat
+cargo build -p ballista-cli --features spark-compat --release
+```
+
+For more installation options, see [Installing with Cargo](deployment/cargo-install.md).
+
+## What's Included
+
+When the `spark-compat` feature is enabled, Ballista's function registry automatically includes additional functions from the `datafusion-spark` crate:
+
+> **Note:** For a comprehensive list of available functions, refer to the [datafusion-spark crate documentation](https://docs.rs/datafusion-spark/latest/datafusion_spark/). These functions are provided in addition to DataFusion's default functions.
+
+### Scalar Functions
+
+Spark-compatible scalar functions provide additional string, mathematical, and cryptographic operations.
+
+### Aggregate Functions
+
+Spark-compatible aggregate functions extend DataFusion's built-in aggregations with additional statistical and analytical functions.
+
+### Window Functions
+
+Spark-compatible window functions provide additional analytical capabilities for windowed operations.
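+
+The same combined set is exposed programmatically by the helper functions this change adds to `ballista-core` (a minimal sketch, assuming a dependency on `ballista-core` compiled with the `spark-compat` feature):
+
+```rust
+use ballista_core::extension::{
+    ballista_aggregate_functions, ballista_scalar_functions, ballista_window_functions,
+};
+
+fn main() {
+    // Each helper returns DataFusion's defaults plus, when `spark-compat` is
+    // enabled, the datafusion-spark functions (e.g. `sha1`, `expm1`).
+    println!("scalar functions:    {}", ballista_scalar_functions().len());
+    println!("aggregate functions: {}", ballista_aggregate_functions().len());
+    println!("window functions:    {}", ballista_window_functions().len());
+}
+```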
+
+## Usage Examples
+
+Once the `spark-compat` feature is enabled at build time, the functions are automatically available in SQL queries:
+
+### Example 1: Using SHA-1 Hash Function
+
+```sql
+SELECT sha1('Ballista') AS hash_value;
+```
+
+Output:
+
+```
++------------------------------------------+
+| hash_value |
++------------------------------------------+
+| 8b8e1f0e55f8f0e3c7a8... (hex string) |
++------------------------------------------+
+```
+
+### Example 2: Using expm1 for Precision
+
+```sql
+SELECT
+ expm1(0.001) AS precise_value,
+ exp(0.001) - 1 AS standard_value;
+```
+
+The `expm1` function provides better numerical precision for small values compared to computing `exp(x) - 1` directly.
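+
+As a rough illustration (standard floating-point reasoning, not specific to Ballista): for small `x`, `expm1(x) = x + x²/2 + x³/6 + …`, so `expm1(0.001) ≈ 0.0010005001667`. Evaluating `exp(0.001) - 1` instead subtracts two nearly equal numbers (`exp(0.001) ≈ 1.0010005…` and `1`), so several of the available significant digits cancel away.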
+
+### Example 3: Combining with DataFusion Functions
+
+Spark-compatible functions work alongside DataFusion's built-in functions:
+
+```sql
+SELECT
+ name,
+ upper(name) AS name_upper, -- DataFusion function
+ sha1(name) AS name_hash, -- Spark-compat function
+ length(name) AS name_length -- DataFusion function
+FROM users;
+```
+
+## Use Cases
+
+The `spark-compat` feature is useful when:
+
+- **Migrating from Spark**: Easing the transition by providing familiar function names and behaviors
+- **Cross-Platform Queries**: Writing queries that use similar functions across Spark and Ballista environments
+- **Specific Function Needs**: Requiring particular Spark-style functions (like `sha1`, `conv`, etc.) that aren't in DataFusion's default set
+- **Team Familiarity**: Supporting teams that are more familiar with Spark's function library
+
+## See Also
+
+- [datafusion-spark crate](https://crates.io/crates/datafusion-spark) - Source of the Spark-compatible functions
+- [Installing with Cargo](deployment/cargo-install.md) - Detailed installation instructions
diff --git a/examples/Cargo.toml b/examples/Cargo.toml
index b91bfc3a2..e5daadc37 100644
--- a/examples/Cargo.toml
+++ b/examples/Cargo.toml
@@ -38,6 +38,14 @@ name = "mtls-cluster"
path = "examples/mtls-cluster.rs"
required-features = ["tls"]
+[[example]]
+name = "remote-spark-functions"
+required-features = ["ballista-core/spark-compat"]
+
+[[example]]
+name = "standalone-substrait"
+required-features = ["ballista-scheduler/substrait"]
+
[dependencies]
# Optional dependency for TLS support
rustls = { version = "0.23", features = ["ring"], optional = true }
@@ -70,9 +78,8 @@ url = { workspace = true }
uuid = { workspace = true }
[features]
-default = ["substrait", "standalone"]
+default = ["standalone"]
standalone = ["ballista/standalone"]
-substrait = ["ballista-scheduler/substrait"]
testcontainers = []
# Enable TLS support for the mtls-cluster example
tls = ["tonic/tls-ring", "dep:rustls"]
diff --git a/examples/README.md b/examples/README.md
index 04854762b..5c20c54ab 100644
--- a/examples/README.md
+++ b/examples/README.md
@@ -269,3 +269,66 @@ async fn main() -> Result<()> {
Ok(())
}
```
+
+### Distributed datafusion-spark example
+
+The release binaries must be built with the `spark-compat` feature enabled for this example:
+
+```bash
+cargo build --release --features spark-compat
+```
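+
+The example connects to a scheduler at `df://localhost:50050`, so a scheduler and at least one executor built with the feature should already be running. A minimal local setup might look like this (binary paths and default ports are assumptions; adjust for your environment):
+
+```bash
+./target/release/ballista-scheduler &
+./target/release/ballista-executor &
+```
+
+Then run the example: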
+
+```bash
+cargo run --release --example remote-spark-functions --features="spark-compat"
+```
+
+#### Source code for distributed spark functions example
+
+```rust
+use ballista::datafusion::{
+ common::Result,
+ execution::SessionStateBuilder,
+ prelude::{CsvReadOptions, SessionConfig, SessionContext},
+};
+use ballista::prelude::*;
+use ballista_examples::test_util;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+ let config = SessionConfig::new_with_ballista()
+ .with_target_partitions(4)
+ .with_ballista_job_name("Remote Datafusion Spark Example");
+
+ let state = SessionStateBuilder::new()
+ .with_config(config)
+ .with_default_features()
+ .build();
+
+ let ctx = SessionContext::remote_with_state("df://localhost:50050", state).await?;
+
+ let test_data = test_util::examples_test_data();
+
+ ctx.register_csv(
+ "test",
+ &format!("{test_data}/aggregate_test_100.csv"),
+ CsvReadOptions::new(),
+ )
+ .await?;
+
+ let df = ctx
+ .sql(
+ "SELECT
+ sha1(c13) AS hash,
+ upper(c13) AS uppercase,
+ length(c13) AS length,
+ expm1(0.001) AS precise_value,
+ exp(0.001) - 1 AS standard_value
+ FROM test",
+ )
+ .await?;
+
+ df.show().await?;
+
+ Ok(())
+}
+```
diff --git a/examples/examples/remote-spark-functions.rs b/examples/examples/remote-spark-functions.rs
new file mode 100644
index 000000000..2243be27f
--- /dev/null
+++ b/examples/examples/remote-spark-functions.rs
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use ballista::datafusion::{
+ common::Result,
+ execution::SessionStateBuilder,
+ prelude::{CsvReadOptions, SessionConfig, SessionContext},
+};
+use ballista::prelude::*;
+use ballista_examples::test_util;
+
+/// This example demonstrates using DataFusion Spark functions via SQL
+#[tokio::main]
+async fn main() -> Result<()> {
+ let config = SessionConfig::new_with_ballista()
+ .with_target_partitions(4)
+ .with_ballista_job_name("Remote Datafusion Spark Example");
+
+ let state = SessionStateBuilder::new()
+ .with_config(config)
+ .with_default_features()
+ .build();
+
+ let ctx = SessionContext::remote_with_state("df://localhost:50050", state).await?;
+
+ let test_data = test_util::examples_test_data();
+
+ ctx.register_csv(
+ "test",
+ &format!("{test_data}/aggregate_test_100.csv"),
+ CsvReadOptions::new(),
+ )
+ .await?;
+
+ let df = ctx
+ .sql(
+ "SELECT
+ sha1(c13) AS hash,
+ upper(c13) AS uppercase,
+ length(c13) AS length,
+ expm1(0.001) AS precise_value,
+ exp(0.001) - 1 AS standard_value
+ FROM test",
+ )
+ .await?;
+
+ df.show().await?;
+
+ Ok(())
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]