This is an automated email from the ASF dual-hosted git repository.
milenkovicm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git
The following commit(s) were added to refs/heads/main by this push:
new 38fca8b99 doc: Add Ballista extensions example to the docs. (#1382)
38fca8b99 is described below
commit 38fca8b995697f5ba2e9539f8b94b295f8fa6043
Author: Louis Burke <[email protected]>
AuthorDate: Sun Jan 18 13:51:35 2026 +0000
doc: Add Ballista extensions example to the docs. (#1382)
* Rewords the introduction.
* Rewords the cargo install section.
* Rewords the docker section and updates builder Dockerfile.
* Rewords the docker compose, kubernetes, and scheduler sections.
* Rewords the docker compose, kubernetes, and scheduler sections.
* Adds a link to the docker section for the docker compose section.
* Adds a full stop
* Adds full stop
* Prettifies the docs.
* Rolls back change to builder Dockerfile.
* Adds extension docs markdown
* Adds extension docs markdown
* Adds license to example md
* Pretty the extention example file
* address comments, removing links
---------
Co-authored-by: Marko Milenković <[email protected]>
---
docs/source/index.rst | 1 +
.../user-guide/ballista_extensions.excalidraw.svg | 4 +
docs/source/user-guide/extending-components.md | 18 -
docs/source/user-guide/extensions-example.md | 485 +++++++++++++++++++++
4 files changed, 490 insertions(+), 18 deletions(-)
diff --git a/docs/source/index.rst b/docs/source/index.rst
index 9289eab75..d4c607500 100644
--- a/docs/source/index.rst
+++ b/docs/source/index.rst
@@ -56,6 +56,7 @@ Table of content
user-guide/metrics
user-guide/faq
user-guide/extending-components
+ user-guide/extensions-example
.. _toc.contributors:
diff --git a/docs/source/user-guide/ballista_extensions.excalidraw.svg
b/docs/source/user-guide/ballista_extensions.excalidraw.svg
new file mode 100644
index 000000000..8cac476cc
--- /dev/null
+++ b/docs/source/user-guide/ballista_extensions.excalidraw.svg
@@ -0,0 +1,4 @@
+<?xml version="1.0" standalone="no"?>
+<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN"
"http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd">
+<svg version="1.1" xmlns="http://www.w3.org/2000/svg" viewBox="0 0 1114.5
301.5" width="3343.5" height="904.5"><!-- svg-source:excalidraw
--><metadata><!-- payload-type:application/vnd.excalidraw+json --><!--
payload-version:2 --><!-- payload-start
-->eyJ2ZXJzaW9uIjoiMSIsImVuY29kaW5nIjoiYnN0cmluZyIsImNvbXByZXNzZWQiOnRydWUsImVuY29kZWQiOiJ4nO1cXGtX2spcdTAwMWH+3l/h8nzdzZ55595v3lCrVSpWrOfsxYpcdTAwMTAwXHUwMDFhXHUwMDEzTFx1MDAwMih79b+fXHRaXHUwMDEyyMVcdTAwMDCxW3elqyqT25vM87z3yd9cdTAwMWbW1tbDh761
[...]
+ @font-face { font-family: Excalifont; src:
url(data:font/woff2;base64,d09GMgABAAAAAAzAAA4AAAAAFYwAAAxrAAEAAAAAAAAAAAAAAAAAAAAAAAAAAAAAGhYbg1gcNAZgAGQRCAqdfJYiCywAATYCJANUBCAFgxgHIBuVEKOihLQyyP4iwdiGxdoLHd2IRNQKonCH5Hdwc6znfE/XDc/f7H1Fz9XSAARNYWKz285wR+5gEf+xo/0PpQtdKIIRiq5gBEqWk5hMv2MMuI0n8A8id/tlgQTGr3fcHsLaQmnN22Y9XGH3axIqI15ZE0NWTt8ud3ptrxGkMgJUp/OlnRWzQN734Y+lXpRG8rUd8LWfUdS76m+HEbnm44lNh5l2o2g8HA2Yx1MaEG2JXRHrlXAt6iENAAGAQgVcGASOCIg9niRQPHSaUzKA82ZvraclZ2tFneS2tTcSKHA/N0auo7UR
[...]
\ No newline at end of file
diff --git a/docs/source/user-guide/extending-components.md
b/docs/source/user-guide/extending-components.md
index 60de1b7b1..77036d540 100644
--- a/docs/source/user-guide/extending-components.md
+++ b/docs/source/user-guide/extending-components.md
@@ -230,21 +230,3 @@ let expected = [
assert_batches_eq!(expected, &result);
```
-
-## Example: Client Side Logical/Physical Codec
-
-Default physical and logical codecs can be replaced if needed. For scheduler
and executor procedure is similar to previous example. At the client side
procedure is slightly different, `ballista::prelude::SessionConfigExt` provides
methods to be used to override physical and logical codecs on client side.
-
-```rust
-let session_config = SessionConfig::new_with_ballista()
- .with_information_schema(true)
-
.with_ballista_physical_extension_codec(Arc::new(BetterPhysicalCodec::default()))
-
.with_ballista_logical_extension_codec(Arc::new(BetterLogicalCodec::default()));
-
-let state = SessionStateBuilder::new()
- .with_default_features()
- .with_config(session_config)
- .build();
-
-let ctx: SessionContext = SessionContext::standalone_with_state(state).await?;
-```
diff --git a/docs/source/user-guide/extensions-example.md
b/docs/source/user-guide/extensions-example.md
new file mode 100644
index 000000000..b7c94d9ed
--- /dev/null
+++ b/docs/source/user-guide/extensions-example.md
@@ -0,0 +1,485 @@
+<!---
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+# Extensions Example
+
+This project demonstrates possible extension mechanisms.
+
+The goal of this small project is to enhance Ballista's capabilities by
providing new logical and physical operators,
+utilities, and integration tools to support additional data processing
workflows.
+
+This example will implement a [
+`sample()`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.sample.html)
+operator, which returns a sampled subset of the original `DataFrame`:
+
+```rust
+let ctx = SessionContext::remote_with_state("df://localhost:50050",
state).await?;
+let df = ctx.read_parquet("data/", Default::default ()).await?;
+
+// The `sample` operator, defined in this project,
+// samples 30% of the data and displays the result.
+let df = df.sample(0.30, None) ?;
+```
+
+To implement this functionality, it is necessary to implement a new logical plan
extension and physical operators, and to extend
+`DataFrame` to expose the new operator.
+
+> [!WARNING]
+> Please do not use the implemented sampling operator in production;
statisticians would probably not approve of it.
+
+This demo will provide:
+
+- Custom DataFusion (logical and physical) nodes.
+- Logical and physical extension codecs.
+- Custom protocol buffer definitions.
+- Extension query planner.
+
+## Logical Plan Extension
+
+The first step is to implement a custom logical plan extension:
+
+```rust
+//! This module defines the implementation of the `UserDefinedLogicalNodeCore`
trait for the `Sample` logical plan node.
+//!
+//! The `Sample` node represents a custom logical plan extension for sampling
data within a query plan.
+//!
+use std::{hash::Hash, vec};
+
+use datafusion::{
+ error::DataFusionError,
+ logical_expr::{LogicalPlan, UserDefinedLogicalNodeCore},
+};
+
+#[derive(Debug, Clone, PartialEq, PartialOrd)]
+pub struct Sample {
+ pub fraction: f32,
+ pub seed: Option<i64>,
+ pub input: LogicalPlan,
+}
+
+impl Hash for Sample {
+ fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+ self.seed.hash(state);
+ self.input.hash(state);
+ }
+}
+
+impl Eq for Sample {}
+
+impl Sample {
+ pub fn new(fraction: f32, seed: Option<i64>, input: LogicalPlan) -> Self {
+ Self {
+ fraction,
+ seed,
+ input,
+ }
+ }
+}
+
+impl UserDefinedLogicalNodeCore for Sample {
+ fn name(&self) -> &str {
+ "Sample"
+ }
+
+ fn inputs(&self) -> Vec<&LogicalPlan> {
+ vec![&self.input]
+ }
+
+ fn schema(&self) -> &datafusion::common::DFSchemaRef {
+ self.input.schema()
+ }
+
+ fn expressions(&self) -> Vec<datafusion::prelude::Expr> {
+ vec![]
+ }
+
+ fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result
{
+ f.write_fmt(format_args!(
+ "Sample: fraction: {}, seed: {:?}",
+ self.fraction, self.seed
+ ))?;
+ Ok(())
+ }
+
+ fn with_exprs_and_inputs(
+ &self,
+ _exprs: Vec<datafusion::prelude::Expr>,
+ inputs: Vec<LogicalPlan>,
+ ) -> datafusion::error::Result<Self> {
+ Ok(Self {
+ seed: self.seed,
+ fraction: self.fraction,
+ input: inputs
+ .first()
+ .ok_or(DataFusionError::Plan("expected single
input".to_string()))?
+ .clone(),
+ })
+ }
+}
+```
+
+## DataFrame Extension
+
+To expose this functionality to end users, a DataFrame extension is
implemented. This extension creates a
+`LogicalPlan::Extension(extension)` node:
+
+```rust
+use std::sync::Arc;
+
+use datafusion::{
+ error::DataFusionError,
+ logical_expr::{Extension, LogicalPlan},
+ prelude::DataFrame,
+};
+
+use crate::logical::sample_extension::Sample;
+
+pub trait DataFrameExt {
+ fn sample(self, fraction: f32, seed: Option<i64>) ->
datafusion::error::Result<DataFrame>;
+}
+
+/// Returns a new `DataFrame` containing a random sample of rows from the
original `DataFrame`.
+///
+/// # Arguments
+///
+/// * `fraction` - The fraction of rows to sample, must be in the range (0.0,
1.0].
+/// * `seed` - An optional seed for the random number generator to ensure
reproducibility.
+///
+/// # Errors
+///
+/// Returns a `DataFusionError::Configuration` if `fraction` is not within the
valid range.
+///
+impl DataFrameExt for DataFrame {
+ fn sample(self, fraction: f32, seed: Option<i64>) ->
datafusion::error::Result<DataFrame> {
+ if !(fraction > 0.0 && fraction <= 1.0) {
+ Err(DataFusionError::Configuration(
+ "fraction should be in 0 ..= 1 range".to_string(),
+ ))?
+ }
+
+ if seed.unwrap_or(0) < 0 {
+ Err(DataFusionError::Configuration(
+ "seed should be positive number".to_string(),
+ ))?
+ }
+
+ let (state, input) = self.into_parts();
+
+ let node = Arc::new(Sample {
+ fraction,
+ seed,
+ input,
+ });
+ let extension = Extension { node };
+ let plan = LogicalPlan::Extension(extension);
+
+ Ok(DataFrame::new(state, plan))
+ }
+}
+```
+
+This approach enables the addition of new methods to the DataFusion DataFrame
implementation:
+
+```rust
+let ctx = SessionContext::remote_with_state("df://localhost:50050",
state).await?;
+let df = ctx.read_parquet("data/", Default::default ()).await?;
+
+// The DataFrame extension provides the `sample` method
+let df = df.sample(0.30, None) ?;
+```
+
+
+
+## Logical Extension Codec
+
+With the extension in place, a custom logical extension codec is required to
transmit the client logical plan to the
+scheduler.
+
+The logical extension codec typically consists of two components. The first is a set of Google
Protocol Buffer definitions:
+
+```proto
+message LMessage {
+ oneof Extension {
+ LSample sample = 1;
+ }
+}
+
+message LSample {
+ float fraction = 1;
+ optional int64 seed = 2;
+}
+```
+
+The second component is a `LogicalExtensionCodec` implementation, which extends `BallistaLogicalExtensionCodec` to handle the newly
defined operator messages:
+
+```rust
+#[derive(Debug, Default)]
+pub struct ExtendedBallistaLogicalCodec {
+ inner: BallistaLogicalExtensionCodec,
+}
+
+impl LogicalExtensionCodec for ExtendedBallistaLogicalCodec {
+ fn try_decode(
+ &self,
+ buf: &[u8],
+ inputs: &[datafusion::logical_expr::LogicalPlan],
+ _ctx: &datafusion::prelude::SessionContext,
+ ) -> datafusion::error::Result<datafusion::logical_expr::Extension> {
+ let message =
+ LMessage::decode(buf).map_err(|e|
DataFusionError::Internal(e.to_string()))?;
+
+ match message.extension {
+ Some(Extension::Sample(sample)) => {
+ let node = Arc::new(Sample {
+ input: inputs
+ .first()
+ .ok_or(DataFusionError::Plan("expected
input".to_string()))?
+ .clone(),
+ seed: sample.seed,
+ fraction: sample.fraction,
+ });
+
+ Ok(datafusion::logical_expr::Extension { node })
+ }
+ None => plan_err!("Can't cast logical extension "),
+ }
+ }
+
+ fn try_encode(
+ &self,
+ node: &datafusion::logical_expr::Extension,
+ buf: &mut Vec<u8>,
+ ) -> datafusion::error::Result<()> {
+ if let Some(Sample { seed, fraction, .. }) =
node.node.as_any().downcast_ref::<Sample>() {
+ let sample = LSample {
+ seed: *seed,
+ fraction: *fraction,
+ };
+ let message = LMessage {
+ extension:
Some(super::messages::l_message::Extension::Sample(sample)),
+ };
+
+ message
+ .encode(buf)
+ .map_err(|e| DataFusionError::Internal(e.to_string()))?;
+
+ Ok(())
+ } else {
+ self.inner.try_encode(node, buf)
+ }
+ }
+ // Additional implementation omitted for brevity
+}
+```
+
+In short, this is an implementation of the `LogicalExtensionCodec` trait, which handles
conversion between Rust structures and
+protocol buffer definitions.
+
+## Logical to Physical Plan Translation
+
+Once the logical plan extension is provided, a translation from the logical
node to a physical node is required. The
+transformation is performed by implementing the `ExtensionPlanner` trait:
+
+```rust
+#[derive(Debug, Clone, Default)]
+pub struct CustomPlannerExtension {}
+
+#[async_trait]
+impl ExtensionPlanner for CustomPlannerExtension {
+ async fn plan_extension(
+ &self,
+ _planner: &dyn PhysicalPlanner,
+ node: &dyn UserDefinedLogicalNode,
+ _logical_inputs: &[&LogicalPlan],
+ physical_inputs: &[Arc<dyn ExecutionPlan>],
+ _session_state: &SessionState,
+ ) -> datafusion::error::Result<Option<Arc<dyn ExecutionPlan>>> {
+ if let Some(Sample { fraction, seed, .. }) =
node.as_any().downcast_ref::<Sample>() {
+ let input = physical_inputs
+ .first()
+ .ok_or(DataFusionError::Plan("expected single
input".to_string()))?
+ .clone();
+ let node = SampleExec::new(*fraction, *seed, input);
+ let node = Arc::new(node);
+
+ Ok(Some(node))
+ } else {
+ Ok(None)
+ }
+ }
+}
+```
+
+The custom planner is registered in the session state as follows:
+
+```rust
+let query_planner = Arc::new(QueryPlannerWithExtensions::default ());
+
+let state = SessionStateBuilder::new()
+.with_query_planner(query_planner)
+.with_default_features()
+.build();
+```
+
+Finally, the generated physical plan is serialized using the physical plan
extension codec and
+transmitted to the executor(s). The implementation is an extension of
`BallistaPhysicalExtensionCodec`:
+
+```rust
+#[derive(Debug, Default)]
+pub struct ExtendedBallistaPhysicalCodec {
+ inner: BallistaPhysicalExtensionCodec,
+}
+
+impl PhysicalExtensionCodec for ExtendedBallistaPhysicalCodec {
+ fn try_decode(
+ &self,
+ buf: &[u8],
+ inputs: &[std::sync::Arc<dyn
datafusion::physical_plan::ExecutionPlan>],
+ registry: &dyn datafusion::execution::FunctionRegistry,
+ ) -> datafusion::error::Result<std::sync::Arc<dyn
datafusion::physical_plan::ExecutionPlan>>
+ {
+ let message =
+ PMessage::decode(buf).map_err(|e|
DataFusionError::Internal(e.to_string()))?;
+
+ match message.extension {
+ Some(super::messages::p_message::Extension::Sample(PSample {
+ fraction,
seed, ..
+ })) => {
+ let input = inputs
+ .first()
+ .ok_or(DataFusionError::Plan("expected
input".to_string()))?
+ .clone();
+
+ let node = Arc::new(SampleExec::new(fraction, seed, input));
+
+ Ok(node)
+ }
+
+ Some(super::messages::p_message::Extension::Opaque(opaque)) => {
+ self.inner.try_decode(&opaque, inputs, registry)
+ }
+ None => plan_err!("Can't cast physical extension "),
+ }
+ }
+
+ fn try_encode(
+ &self,
+ node: std::sync::Arc<dyn datafusion::physical_plan::ExecutionPlan>,
+ buf: &mut Vec<u8>,
+ ) -> datafusion::error::Result<()> {
+ if let Some(SampleExec { fraction, seed, .. }) =
node.as_any().downcast_ref::<SampleExec>()
+ {
+ let message = PMessage {
+ extension:
Some(super::messages::p_message::Extension::Sample(PSample {
+ fraction: *fraction,
+ seed: *seed,
+ })),
+ };
+
+ message
+ .encode(buf)
+ .map_err(|e| DataFusionError::Internal(e.to_string()))?;
+
+ Ok(())
+ } else {
+ let mut opaque = vec![];
+ self.inner
+ .try_encode(node, &mut opaque)
+ .map_err(|e| DataFusionError::Internal(e.to_string()))?;
+
+ let message = PMessage {
+ extension:
Some(super::messages::p_message::Extension::Opaque(opaque)),
+ };
+
+ message
+ .encode(buf)
+ .map_err(|e| DataFusionError::Internal(e.to_string()))?;
+
+ Ok(())
+ }
+ }
+}
+
+```
+
+These should be all the moving parts necessary to extend Ballista functionality.
The last step is to
+configure the scheduler and executor to use the new features.
+
+`SchedulerConfig` should be configured by overriding the logical codec, the physical codec, and the
session builder function:
+
+```rust
+let config: SchedulerConfig = SchedulerConfig {
+override_logical_codec: Some(Arc::new(ExtendedBallistaLogicalCodec::default
())),
+override_physical_codec: Some(Arc::new(ExtendedBallistaPhysicalCodec::default
())),
+override_session_builder: Some(Arc::new(extended_state_producer)),
+..Default::default ()
+};
+
+let address = format!("{}:{}", config.bind_host, config.bind_port);
+let address = address
+.parse()
+.map_err( | e: AddrParseError| BallistaError::Configuration(e.to_string())) ?;
+
+let cluster = BallistaCluster::new_from_config( & config).await?;
+
+start_server(cluster, address, Arc::new(config)).await?;
+```
+
+```rust
+pub fn extended_state_producer(config: SessionConfig) ->
datafusion::error::Result<SessionState> {
+ // we need custom query planner to convert logical to physical operator
+ let query_planner = Arc::new(QueryPlannerWithExtensions::default());
+
+ let state = SessionStateBuilder::new()
+ .with_config(config)
+ .with_query_planner(query_planner)
+ .with_default_features()
+ .build();
+
+ Ok(state)
+}
+```
+
+Similarly for `ExecutorProcessConfig`:
+
+```rust
+let config: ExecutorProcessConfig = ExecutorProcessConfig {
+override_logical_codec: Some(Arc::new(ExtendedBallistaLogicalCodec::default
())),
+override_physical_codec: Some(Arc::new(ExtendedBallistaPhysicalCodec::default
())),
+..Default::default ()
+};
+
+start_executor_process(Arc::new(config)).await
+```
+
+## Conclusion
+
+This project demonstrates how to extend Ballista with custom logical and
physical operators, codecs, and planner logic.
+By following the outlined steps, you can introduce new DataFrame operations
and ensure they are supported throughout the
+distributed query lifecycle.
+
+For more details, refer to the source code and the linked example files.
Contributions and feedback are welcome!
+
+---
+
+**Related links:**
+
+- [Ballista Extensions Source
Code](https://github.com/milenkovicm/ballista_extensions)
+- [DataFusion Documentation](https://datafusion.apache.org)
+- [Rust Tonic (GRPC) support](https://docs.rs/tonic/latest/tonic/)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]