This is an automated email from the ASF dual-hosted git repository.

milenkovicm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new bca52c334 feat: Add spark-compat mode to integrate datafusion-spark features au… (#1416)
bca52c334 is described below

commit bca52c334e20d56363628def56e3da79d15e761c
Author: Matt Cuento <[email protected]>
AuthorDate: Wed Feb 11 09:43:44 2026 -0800

    feat: Add spark-compat mode to integrate datafusion-spark features au… (#1416)
---
 Cargo.lock                                         |  35 +++++
 Cargo.toml                                         |   1 +
 ballista/core/Cargo.toml                           |   2 +
 ballista/core/src/extension.rs                     | 161 ++++++++++++++++++++-
 ballista/core/src/object_store.rs                  |   8 +
 ballista/core/src/registry.rs                      |  92 ++++++++++--
 ballista/core/src/utils.rs                         |  14 +-
 ballista/executor/Cargo.toml                       |   1 +
 ballista/executor/src/standalone.rs                |  13 +-
 ballista/scheduler/Cargo.toml                      |   3 +-
 docs/source/index.rst                              |   1 +
 docs/source/user-guide/deployment/cargo-install.md |  25 ++++
 .../user-guide/spark-compatible-functions.md       | 121 ++++++++++++++++
 examples/Cargo.toml                                |  11 +-
 examples/README.md                                 |  63 ++++++++
 examples/examples/remote-spark-functions.rs        |  64 ++++++++
 16 files changed, 597 insertions(+), 18 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 392bda460..318e2ef74 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1001,6 +1001,7 @@ dependencies = [
  "datafusion",
  "datafusion-proto",
  "datafusion-proto-common",
+ "datafusion-spark",
  "futures",
  "itertools 0.14.0",
  "log",
@@ -2529,6 +2530,29 @@ dependencies = [
  "parking_lot",
 ]
 
+[[package]]
+name = "datafusion-spark"
+version = "52.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "556c431f5f2259620c8223254c0ef57aa9a85c576d4da0166157260f71eb0e25"
+dependencies = [
+ "arrow",
+ "bigdecimal",
+ "chrono",
+ "crc32fast",
+ "datafusion-catalog",
+ "datafusion-common",
+ "datafusion-execution",
+ "datafusion-expr",
+ "datafusion-functions",
+ "datafusion-functions-nested",
+ "log",
+ "percent-encoding",
+ "rand",
+ "sha1",
+ "url",
+]
+
 [[package]]
 name = "datafusion-sql"
 version = "52.1.0"
@@ -5266,6 +5290,17 @@ dependencies = [
  "unsafe-libyaml",
 ]
 
+[[package]]
+name = "sha1"
+version = "0.10.6"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba"
+dependencies = [
+ "cfg-if",
+ "cpufeatures",
+ "digest",
+]
+
 [[package]]
 name = "sha2"
 version = "0.10.9"
diff --git a/Cargo.toml b/Cargo.toml
index d42a87d65..c7c4f6768 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -33,6 +33,7 @@ datafusion = "52"
 datafusion-cli = "52"
 datafusion-proto = "52"
 datafusion-proto-common = "52"
+datafusion-spark = "52"
 datafusion-substrait = "52"
 object_store = "0.12"
 prost = "0.14"
diff --git a/ballista/core/Cargo.toml b/ballista/core/Cargo.toml
index 615a5e78e..21353adff 100644
--- a/ballista/core/Cargo.toml
+++ b/ballista/core/Cargo.toml
@@ -41,6 +41,7 @@ default = ["arrow-ipc-optimizations"]
 docsrs = []
 # Used for testing ONLY: causes all values to hash to the same value (test for collisions)
 force_hash_collisions = ["datafusion/force_hash_collisions"]
+spark-compat = ["dep:datafusion-spark"]
 
 [dependencies]
 arrow-flight = { workspace = true }
@@ -52,6 +53,7 @@ clap = { workspace = true, optional = true }
 datafusion = { workspace = true }
 datafusion-proto = { workspace = true }
 datafusion-proto-common = { workspace = true }
+datafusion-spark = { workspace = true, optional = true }
 futures = { workspace = true }
 itertools = "0.14"
 log = { workspace = true }
diff --git a/ballista/core/src/extension.rs b/ballista/core/src/extension.rs
index f7b02395d..5341020d1 100644
--- a/ballista/core/src/extension.rs
+++ b/ballista/core/src/extension.rs
@@ -27,6 +27,11 @@ use crate::serde::{BallistaLogicalExtensionCodec, BallistaPhysicalExtensionCodec
 use datafusion::execution::context::{QueryPlanner, SessionConfig, SessionState};
 use datafusion::execution::runtime_env::RuntimeEnvBuilder;
 use datafusion::execution::session_state::SessionStateBuilder;
+use datafusion::functions::all_default_functions;
+use datafusion::functions_aggregate::all_default_aggregate_functions;
+use datafusion::functions_nested::all_default_nested_functions;
+use datafusion::functions_window::all_default_window_functions;
+use datafusion::logical_expr::{AggregateUDF, ScalarUDF, WindowUDF};
 use datafusion_proto::logical_plan::LogicalExtensionCodec;
 use datafusion_proto::physical_plan::PhysicalExtensionCodec;
 use datafusion_proto::protobuf::LogicalPlanNode;
@@ -43,6 +48,47 @@ use tonic::{Request, Status};
 pub type EndpointOverrideFn =
     Arc<dyn Fn(Endpoint) -> Result<Endpoint, Box<dyn Error + Send + Sync>> + Send + Sync>;
 
+#[cfg(feature = "spark-compat")]
+use datafusion_spark::{
+    all_default_aggregate_functions as spark_aggregate_functions,
+    all_default_scalar_functions as spark_scalar_functions,
+    all_default_window_functions as spark_window_functions,
+};
+
+/// Returns scalar functions for Ballista (DataFusion defaults + Spark when enabled)
+pub fn ballista_scalar_functions() -> Vec<Arc<ScalarUDF>> {
+    #[allow(unused_mut)]
+    let mut functions = all_default_functions();
+    functions.append(&mut all_default_nested_functions());
+
+    #[cfg(feature = "spark-compat")]
+    functions.extend(spark_scalar_functions());
+
+    functions
+}
+
+/// Returns aggregate functions for Ballista (DataFusion defaults + Spark when enabled)
+pub fn ballista_aggregate_functions() -> Vec<Arc<AggregateUDF>> {
+    #[allow(unused_mut)]
+    let mut functions = all_default_aggregate_functions();
+
+    #[cfg(feature = "spark-compat")]
+    functions.extend(spark_aggregate_functions());
+
+    functions
+}
+
+/// Returns window functions for Ballista (DataFusion defaults + Spark when enabled)
+pub fn ballista_window_functions() -> Vec<Arc<WindowUDF>> {
+    #[allow(unused_mut)]
+    let mut functions = all_default_window_functions();
+
+    #[cfg(feature = "spark-compat")]
+    functions.extend(spark_window_functions());
+
+    functions
+}
+
 /// Provides methods which adapt [SessionState]
 /// for Ballista usage
 pub trait SessionStateExt {
@@ -209,6 +255,9 @@ impl SessionStateExt for SessionState {
             .with_config(session_config)
             .with_runtime_env(Arc::new(runtime_env))
             .with_query_planner(Arc::new(planner))
+            .with_scalar_functions(ballista_scalar_functions())
+            .with_aggregate_functions(ballista_aggregate_functions())
+            .with_window_functions(ballista_window_functions())
             .build();
 
         Ok(session_state)
@@ -240,7 +289,13 @@ impl SessionStateExt for SessionState {
             }
         };
 
-        Ok(builder.build())
+        let session_state = builder
+            .with_scalar_functions(ballista_scalar_functions())
+            .with_aggregate_functions(ballista_aggregate_functions())
+            .with_window_functions(ballista_window_functions())
+            .build();
+
+        Ok(session_state)
     }
 }
 
@@ -806,4 +861,108 @@ mod test {
                 .contains("TLS configuration failed")
         );
     }
+
+    // Tests for helper functions that register DataFusion + Spark functions
+
+    #[test]
+    fn test_ballista_functions_include_datafusion() {
+        use super::{
+            ballista_aggregate_functions, ballista_scalar_functions,
+            ballista_window_functions,
+        };
+
+        // Test scalar functions
+        let scalar_funcs = ballista_scalar_functions();
+        assert!(scalar_funcs.iter().any(|f| f.name() == "abs"));
+        assert!(scalar_funcs.iter().any(|f| f.name() == "ceil"));
+        assert!(scalar_funcs.iter().any(|f| f.name() == "array_length")); // nested function
+
+        // Test aggregate functions
+        let agg_funcs = ballista_aggregate_functions();
+        assert!(agg_funcs.iter().any(|f| f.name() == "count"));
+        assert!(agg_funcs.iter().any(|f| f.name() == "sum"));
+        assert!(agg_funcs.iter().any(|f| f.name() == "avg"));
+
+        // Test window functions
+        let window_funcs = ballista_window_functions();
+        assert!(window_funcs.iter().any(|f| f.name() == "row_number"));
+        assert!(window_funcs.iter().any(|f| f.name() == "rank"));
+        assert!(window_funcs.iter().any(|f| f.name() == "dense_rank"));
+    }
+
+    #[test]
+    #[cfg(not(feature = "spark-compat"))]
+    fn test_ballista_functions_without_spark() {
+        use super::{
+            ballista_aggregate_functions, ballista_scalar_functions,
+            ballista_window_functions,
+        };
+
+        // Scalar functions should NOT include Spark functions
+        let scalar_funcs = ballista_scalar_functions();
+        assert!(!scalar_funcs.iter().any(|f| f.name() == "sha1"));
+        assert!(!scalar_funcs.iter().any(|f| f.name() == "expm1"));
+
+        // All function types should have baseline DataFusion functions
+        assert!(!ballista_aggregate_functions().is_empty());
+        assert!(!ballista_window_functions().is_empty());
+    }
+
+    #[test]
+    #[cfg(feature = "spark-compat")]
+    fn test_ballista_functions_with_spark() {
+        use super::{
+            ballista_aggregate_functions, ballista_scalar_functions,
+            ballista_window_functions,
+        };
+
+        // Scalar functions should include Spark functions
+        let scalar_funcs = ballista_scalar_functions();
+        assert!(scalar_funcs.iter().any(|f| f.name() == "sha1"));
+        assert!(scalar_funcs.iter().any(|f| f.name() == "expm1"));
+
+        // All function types should have functions available
+        assert!(!ballista_aggregate_functions().is_empty());
+        assert!(!ballista_window_functions().is_empty());
+    }
+
+    #[tokio::test]
+    async fn test_ballista_functions_with_session_state() {
+        use super::{
+            ballista_aggregate_functions, ballista_scalar_functions,
+            ballista_window_functions,
+        };
+
+        let state = SessionStateBuilder::new()
+            .with_default_features()
+            .with_scalar_functions(ballista_scalar_functions())
+            .with_aggregate_functions(ballista_aggregate_functions())
+            .with_window_functions(ballista_window_functions())
+            .build();
+
+        // Verify all function types are registered in SessionState
+        assert!(state.scalar_functions().contains_key("abs"));
+        assert!(state.aggregate_functions().contains_key("count"));
+        assert!(state.window_functions().contains_key("row_number"));
+    }
+
+    #[tokio::test]
+    #[cfg(feature = "spark-compat")]
+    async fn test_ballista_spark_functions_with_session_state() {
+        use super::{
+            ballista_aggregate_functions, ballista_scalar_functions,
+            ballista_window_functions,
+        };
+
+        let state = SessionStateBuilder::new()
+            .with_default_features()
+            .with_scalar_functions(ballista_scalar_functions())
+            .with_aggregate_functions(ballista_aggregate_functions())
+            .with_window_functions(ballista_window_functions())
+            .build();
+
+        // Verify Spark functions are registered in SessionState
+        assert!(state.scalar_functions().contains_key("sha1"));
+        assert!(state.scalar_functions().contains_key("expm1"));
+    }
 }
diff --git a/ballista/core/src/object_store.rs b/ballista/core/src/object_store.rs
index dc6439e9e..aff9ee1fe 100644
--- a/ballista/core/src/object_store.rs
+++ b/ballista/core/src/object_store.rs
@@ -89,12 +89,20 @@ pub fn runtime_env_with_s3_support(
 pub fn session_state_with_s3_support(
     session_config: SessionConfig,
 ) -> datafusion::common::Result<SessionState> {
+    use crate::extension::{
+        ballista_aggregate_functions, ballista_scalar_functions,
+        ballista_window_functions,
+    };
+
     let runtime_env = runtime_env_with_s3_support(&session_config)?;
 
     Ok(SessionStateBuilder::new()
         .with_runtime_env(runtime_env)
         .with_config(session_config)
         .with_default_features()
+        .with_scalar_functions(ballista_scalar_functions())
+        .with_aggregate_functions(ballista_aggregate_functions())
+        .with_window_functions(ballista_window_functions())
         .build())
 }
 
diff --git a/ballista/core/src/registry.rs b/ballista/core/src/registry.rs
index 70d549f53..1d45e7b58 100644
--- a/ballista/core/src/registry.rs
+++ b/ballista/core/src/registry.rs
@@ -25,7 +25,14 @@ use datafusion::logical_expr::{AggregateUDF, ScalarUDF, WindowUDF};
 use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
 
-/// A function registry containing scalar, aggregate, and window functions for Ballista.
+#[cfg(feature = "spark-compat")]
+use datafusion_spark::{
+    all_default_aggregate_functions as spark_aggregate_functions,
+    all_default_scalar_functions as spark_scalar_functions,
+    all_default_window_functions as spark_window_functions,
+};
+
+/// A function registry containing scalar, aggregate, window, and table functions for Ballista.
 #[derive(Debug)]
 pub struct BallistaFunctionRegistry {
     /// Scalar user-defined functions.
@@ -38,20 +45,41 @@ pub struct BallistaFunctionRegistry {
 
 impl Default for BallistaFunctionRegistry {
     fn default() -> Self {
-        let scalar_functions = all_default_functions()
+        let scalar_functions: HashMap<String, Arc<ScalarUDF>> = all_default_functions()
             .into_iter()
             .map(|f| (f.name().to_string(), f))
             .collect();
 
-        let aggregate_functions = all_default_aggregate_functions()
-            .into_iter()
-            .map(|f| (f.name().to_string(), f))
-            .collect();
-
-        let window_functions = all_default_window_functions()
-            .into_iter()
-            .map(|f| (f.name().to_string(), f))
-            .collect();
+        let aggregate_functions: HashMap<String, Arc<AggregateUDF>> =
+            all_default_aggregate_functions()
+                .into_iter()
+                .map(|f| (f.name().to_string(), f))
+                .collect();
+
+        let window_functions: HashMap<String, Arc<WindowUDF>> =
+            all_default_window_functions()
+                .into_iter()
+                .map(|f| (f.name().to_string(), f))
+                .collect();
+
+        #[cfg(feature = "spark-compat")]
+        let (scalar_functions, aggregate_functions, window_functions) = {
+            let mut scalar_functions = scalar_functions;
+            let mut aggregate_functions = aggregate_functions;
+            let mut window_functions = window_functions;
+
+            for f in spark_scalar_functions() {
+                scalar_functions.insert(f.name().to_string(), f);
+            }
+            for f in spark_aggregate_functions() {
+                aggregate_functions.insert(f.name().to_string(), f);
+            }
+            for f in spark_window_functions() {
+                window_functions.insert(f.name().to_string(), f);
+            }
+
+            (scalar_functions, aggregate_functions, window_functions)
+        };
 
         Self {
             scalar_functions,
@@ -122,3 +150,45 @@ impl From<&SessionState> for BallistaFunctionRegistry {
         }
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_datafusion_functions_available() {
+        let registry = BallistaFunctionRegistry::default();
+
+        // DataFusion's `abs` function should always be available
+        assert!(registry.udf("abs").is_ok());
+        assert!(registry.udfs().contains("abs"));
+    }
+
+    #[test]
+    #[cfg(not(feature = "spark-compat"))]
+    fn test_spark_functions_unavailable_without_feature() {
+        let registry = BallistaFunctionRegistry::default();
+
+        // Spark's `sha1` function should NOT be available without spark-compat
+        assert!(registry.udf("sha1").is_err());
+        assert!(!registry.udfs().contains("sha1"));
+
+        // Spark's `expm1` function should NOT be available without spark-compat
+        assert!(registry.udf("expm1").is_err());
+        assert!(!registry.udfs().contains("expm1"));
+    }
+
+    #[test]
+    #[cfg(feature = "spark-compat")]
+    fn test_spark_functions_available_with_feature() {
+        let registry = BallistaFunctionRegistry::default();
+
+        // Spark's `sha1` function should be available with spark-compat
+        assert!(registry.udf("sha1").is_ok());
+        assert!(registry.udfs().contains("sha1"));
+
+        // Spark's `expm1` function should be available with spark-compat
+        assert!(registry.udf("expm1").is_ok());
+        assert!(registry.udfs().contains("expm1"));
+    }
+}
diff --git a/ballista/core/src/utils.rs b/ballista/core/src/utils.rs
index e7f4e2a57..0d6a3d833 100644
--- a/ballista/core/src/utils.rs
+++ b/ballista/core/src/utils.rs
@@ -131,11 +131,21 @@ impl Default for GrpcServerConfig {
 pub fn default_session_builder(
     config: SessionConfig,
 ) -> datafusion::common::Result<SessionState> {
-    Ok(SessionStateBuilder::new()
+    use crate::extension::{
+        ballista_aggregate_functions, ballista_scalar_functions,
+        ballista_window_functions,
+    };
+
+    let state = SessionStateBuilder::new()
         .with_default_features()
         .with_config(config)
         .with_runtime_env(Arc::new(RuntimeEnvBuilder::new().build()?))
-        .build())
+        .with_scalar_functions(ballista_scalar_functions())
+        .with_aggregate_functions(ballista_aggregate_functions())
+        .with_window_functions(ballista_window_functions())
+        .build();
+
+    Ok(state)
 }
 
 /// Creates a default session configuration with Ballista extensions.
diff --git a/ballista/executor/Cargo.toml b/ballista/executor/Cargo.toml
index 43229fade..32816188d 100644
--- a/ballista/executor/Cargo.toml
+++ b/ballista/executor/Cargo.toml
@@ -36,6 +36,7 @@ required-features = ["build-binary"]
 arrow-ipc-optimizations = []
 build-binary = ["clap", "tracing-subscriber", "tracing-appender", "tracing", "ballista-core/build-binary"]
 default = ["arrow-ipc-optimizations", "build-binary", "mimalloc"]
+spark-compat = ["ballista-core/spark-compat"]
 
 [dependencies]
 arrow = { workspace = true }
diff --git a/ballista/executor/src/standalone.rs b/ballista/executor/src/standalone.rs
index 854155c91..257b84017 100644
--- a/ballista/executor/src/standalone.rs
+++ b/ballista/executor/src/standalone.rs
@@ -154,7 +154,18 @@ pub async fn new_standalone_executor(
     concurrent_tasks: usize,
     codec: BallistaCodec,
 ) -> Result<()> {
-    let session_state = SessionStateBuilder::new().with_default_features().build();
+    use ballista_core::extension::{
+        ballista_aggregate_functions, ballista_scalar_functions,
+        ballista_window_functions,
+    };
+
+    let session_state = SessionStateBuilder::new()
+        .with_default_features()
+        .with_scalar_functions(ballista_scalar_functions())
+        .with_aggregate_functions(ballista_aggregate_functions())
+        .with_window_functions(ballista_window_functions())
+        .build();
+
     let runtime = session_state.runtime_env().clone();
     let runtime_producer: RuntimeProducer = Arc::new(move |_| Ok(runtime.clone()));
 
diff --git a/ballista/scheduler/Cargo.toml b/ballista/scheduler/Cargo.toml
index 0252e1d11..13bac80bd 100644
--- a/ballista/scheduler/Cargo.toml
+++ b/ballista/scheduler/Cargo.toml
@@ -35,13 +35,14 @@ required-features = ["build-binary"]
 [features]
 build-binary = ["clap", "tracing-subscriber", "tracing-appender", "tracing", "ballista-core/build-binary"]
 default = ["build-binary", "substrait"]
-# job info can cache stage plans, in some cases where 
+# job info can cache stage plans, in some cases where
 # task plans can be re-computed, cache behavior may need to be disabled.
 disable-stage-plan-cache = []
 graphviz-support = ["dep:graphviz-rust"]
 keda-scaler = ["dep:tonic-prost-build", "dep:tonic-prost"]
 prometheus-metrics = ["prometheus", "once_cell"]
 rest-api = []
+spark-compat = ["ballista-core/spark-compat"]
 substrait = ["dep:datafusion-substrait"]
 
 [dependencies]
diff --git a/docs/source/index.rst b/docs/source/index.rst
index 625496198..457fa309b 100644
--- a/docs/source/index.rst
+++ b/docs/source/index.rst
@@ -55,6 +55,7 @@ Table of content
    user-guide/metrics
    user-guide/faq
    user-guide/extending-components
+   user-guide/spark-compatible-functions
    user-guide/extensions-example
 
 .. _toc.contributors:
diff --git a/docs/source/user-guide/deployment/cargo-install.md b/docs/source/user-guide/deployment/cargo-install.md
index 9bd99e67a..9069bc0d7 100644
--- a/docs/source/user-guide/deployment/cargo-install.md
+++ b/docs/source/user-guide/deployment/cargo-install.md
@@ -47,3 +47,28 @@ manually specifying a bind port. For example:
 ```bash
 RUST_LOG=info ballista-executor --bind-port 50052
 ```
+
+## Installing with Optional Features
+
+Ballista supports optional features that can be enabled during installation using the `--features` flag.
+
+### Spark-Compatible Functions
+
+To enable Spark-compatible scalar, aggregate, and window functions from the `datafusion-spark` crate:
+
+```bash
+# Install scheduler with spark-compat feature
+cargo install --locked --features spark-compat ballista-scheduler
+
+# Install executor with spark-compat feature
+cargo install --locked --features spark-compat ballista-executor
+
+# Install CLI with spark-compat feature
+cargo install --locked --features spark-compat ballista-cli
+```
+
+When the `spark-compat` feature is enabled, additional functions like `sha1`, `expm1`, `sha2`, and others become available in SQL queries.
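+
+For example, a query along these lines should work against a `spark-compat` build (illustrative only; `sha1` and `expm1` come from `datafusion-spark`):
+
+```sql
+SELECT sha1('Ballista') AS hash_value, expm1(0.001) AS precise_value;
+```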
+
+> **Note:** The `spark-compat` feature provides Spark-compatible expressions and functions only, not full Apache Spark API compatibility.
+
+For more details about Spark-compatible functions, see [Spark-Compatible Functions](../spark-compatible-functions.md).
diff --git a/docs/source/user-guide/spark-compatible-functions.md b/docs/source/user-guide/spark-compatible-functions.md
new file mode 100644
index 000000000..8b518f734
--- /dev/null
+++ b/docs/source/user-guide/spark-compatible-functions.md
@@ -0,0 +1,121 @@
+<!---
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+# Spark-Compatible Functions
+
+Ballista provides an optional `spark-compat` Cargo feature that enables Spark-compatible scalar, aggregate, and window functions from the [datafusion-spark](https://crates.io/crates/datafusion-spark) crate.
+
+## Enabling the Feature
+
+The `spark-compat` feature must be explicitly enabled at build time. It is _not_ enabled by default.
+
+### Building from Source
+
+To build Ballista components with Spark-compatible functions:
+
+```bash
+# Build all components with spark-compat feature
+cargo build --features spark-compat --release
+
+# Build scheduler only
+cargo build -p ballista-scheduler --features spark-compat --release
+
+# Build executor only
+cargo build -p ballista-executor --features spark-compat --release
+
+# Build CLI with spark-compat
+cargo build -p ballista-cli --features spark-compat --release
+```
+
+For more installation options, see [Installing with Cargo](deployment/cargo-install.md).
+
+## What's Included
+
+When the `spark-compat` feature is enabled, Ballista's function registry automatically includes additional functions from the `datafusion-spark` crate:
+
+> **Note:** For a comprehensive list of available functions, refer to the [datafusion-spark crate documentation](https://docs.rs/datafusion-spark/latest/datafusion_spark/). These functions are provided in addition to DataFusion's default functions.
+
+### Scalar Functions
+
+Spark-compatible scalar functions provide additional string, mathematical, and cryptographic operations.
+
+### Aggregate Functions
+
+Spark-compatible aggregate functions extend DataFusion's built-in aggregations with additional statistical and analytical functions.
+
+### Window Functions
+
+Spark-compatible window functions provide additional analytical capabilities for windowed operations.
+
+## Usage Examples
+
+Once the `spark-compat` feature is enabled at build time, the functions are automatically available in SQL queries:
+
+### Example 1: Using SHA-1 Hash Function
+
+```sql
+SELECT sha1('Ballista') AS hash_value;
+```
+
+Output:
+
+```
++------------------------------------------+
+| hash_value                               |
++------------------------------------------+
+| 8b8e1f0e55f8f0e3c7a8... (hex string)    |
++------------------------------------------+
+```
+
+### Example 2: Using expm1 for Precision
+
+```sql
+SELECT
+    expm1(0.001) AS precise_value,
+    exp(0.001) - 1 AS standard_value;
+```
+
+The `expm1` function provides better numerical precision for small values compared to computing `exp(x) - 1` directly.
+
+### Example 3: Combining with DataFusion Functions
+
+Spark-compatible functions work alongside DataFusion's built-in functions:
+
+```sql
+SELECT
+    name,
+    upper(name) AS name_upper,           -- DataFusion function
+    sha1(name) AS name_hash,             -- Spark-compat function
+    length(name) AS name_length          -- DataFusion function
+FROM users;
+```
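+
+For embedders that build their own `SessionState`, the helper functions added in `ballista_core::extension` register the same function set programmatically. A minimal sketch, mirroring the tests in `extension.rs` (the function names come from this change):
+
+```rust
+use ballista_core::extension::{
+    ballista_aggregate_functions, ballista_scalar_functions, ballista_window_functions,
+};
+use datafusion::execution::session_state::SessionStateBuilder;
+
+fn main() {
+    // DataFusion defaults plus, when built with `spark-compat`, the
+    // datafusion-spark functions.
+    let state = SessionStateBuilder::new()
+        .with_default_features()
+        .with_scalar_functions(ballista_scalar_functions())
+        .with_aggregate_functions(ballista_aggregate_functions())
+        .with_window_functions(ballista_window_functions())
+        .build();
+
+    assert!(state.scalar_functions().contains_key("abs"));
+}
+```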
+
+## Use Cases
+
+The `spark-compat` feature is useful when:
+
+- **Migrating from Spark**: Easing the transition by providing familiar function names and behaviors
+- **Cross-Platform Queries**: Writing queries that use similar functions across Spark and Ballista environments
+- **Specific Function Needs**: Requiring particular Spark-style functions (like `sha1`, `conv`, etc.) that aren't in DataFusion's default set
+- **Team Familiarity**: Working with a team that is more familiar with Spark's function library
+
+## See Also
+
+- [datafusion-spark crate](https://crates.io/crates/datafusion-spark) - Source of the Spark-compatible functions
+- [Installing with Cargo](deployment/cargo-install.md) - Detailed installation instructions
diff --git a/examples/Cargo.toml b/examples/Cargo.toml
index b91bfc3a2..e5daadc37 100644
--- a/examples/Cargo.toml
+++ b/examples/Cargo.toml
@@ -38,6 +38,14 @@ name = "mtls-cluster"
 path = "examples/mtls-cluster.rs"
 required-features = ["tls"]
 
+[[example]]
+name = "remote-spark-functions"
+required-features = ["ballista-core/spark-compat"]
+
+[[example]]
+name = "standalone-substrait"
+required-features = ["ballista-scheduler/substrait"]
+
 [dependencies]
 # Optional dependency for TLS support
 rustls = { version = "0.23", features = ["ring"], optional = true }
@@ -70,9 +78,8 @@ url = { workspace = true }
 uuid = { workspace = true }
 
 [features]
-default = ["substrait", "standalone"]
+default = ["standalone"]
 standalone = ["ballista/standalone"]
-substrait = ["ballista-scheduler/substrait"]
 testcontainers = []
 # Enable TLS support for the mtls-cluster example
 tls = ["tonic/tls-ring", "dep:rustls"]
diff --git a/examples/README.md b/examples/README.md
index 04854762b..5c20c54ab 100644
--- a/examples/README.md
+++ b/examples/README.md
@@ -269,3 +269,66 @@ async fn main() -> Result<()> {
     Ok(())
 }
 ```
+
+### Distributed datafusion-spark example
+
+
+The release binaries must be built with the spark-compat feature enabled for this example.
+```bash
+cargo build --release --features spark-compat
+```
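+
+A scheduler and at least one executor built with the feature must be running for the example to connect; the example uses `df://localhost:50050`, the scheduler's default port. A minimal local setup might look like this (paths assume the release build above):
+
+```bash
+# start a scheduler (default port 50050) and an executor in the background
+./target/release/ballista-scheduler &
+./target/release/ballista-executor &
+```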
+
+```bash
+cargo run --release --example remote-spark-functions --features="spark-compat"
+```
+
+#### Source code for distributed spark functions example
+
+```rust
+use ballista::datafusion::{
+    common::Result,
+    execution::SessionStateBuilder,
+    prelude::{CsvReadOptions, SessionConfig, SessionContext},
+};
+use ballista::prelude::*;
+use ballista_examples::test_util;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    let config = SessionConfig::new_with_ballista()
+        .with_target_partitions(4)
+        .with_ballista_job_name("Remote Datafusion Spark Example");
+
+    let state = SessionStateBuilder::new()
+        .with_config(config)
+        .with_default_features()
+        .build();
+
+    let ctx = SessionContext::remote_with_state("df://localhost:50050", state).await?;
+
+    let test_data = test_util::examples_test_data();
+
+    ctx.register_csv(
+        "test",
+        &format!("{test_data}/aggregate_test_100.csv"),
+        CsvReadOptions::new(),
+    )
+    .await?;
+
+    let df = ctx
+        .sql(
+            "SELECT 
+            sha1(c13) AS hash, 
+            upper(c13) AS uppercase,
+            length(c13) AS length,
+            expm1(0.001) AS precise_value,
+            exp(0.001) - 1 AS standard_value
+        FROM test",
+        )
+        .await?;
+
+    df.show().await?;
+
+    Ok(())
+}
+```
diff --git a/examples/examples/remote-spark-functions.rs b/examples/examples/remote-spark-functions.rs
new file mode 100644
index 000000000..2243be27f
--- /dev/null
+++ b/examples/examples/remote-spark-functions.rs
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use ballista::datafusion::{
+    common::Result,
+    execution::SessionStateBuilder,
+    prelude::{CsvReadOptions, SessionConfig, SessionContext},
+};
+use ballista::prelude::*;
+use ballista_examples::test_util;
+
+/// This example demonstrates using DataFusion Spark functions via SQL
+#[tokio::main]
+async fn main() -> Result<()> {
+    let config = SessionConfig::new_with_ballista()
+        .with_target_partitions(4)
+        .with_ballista_job_name("Remote Datafusion Spark Example");
+
+    let state = SessionStateBuilder::new()
+        .with_config(config)
+        .with_default_features()
+        .build();
+
+    let ctx = SessionContext::remote_with_state("df://localhost:50050", state).await?;
+
+    let test_data = test_util::examples_test_data();
+
+    ctx.register_csv(
+        "test",
+        &format!("{test_data}/aggregate_test_100.csv"),
+        CsvReadOptions::new(),
+    )
+    .await?;
+
+    let df = ctx
+        .sql(
+            "SELECT 
+            sha1(c13) AS hash, 
+            upper(c13) AS uppercase,
+            length(c13) AS length,
+            expm1(0.001) AS precise_value,
+            exp(0.001) - 1 AS standard_value
+        FROM test",
+        )
+        .await?;
+
+    df.show().await?;
+
+    Ok(())
+}

