This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git
The following commit(s) were added to refs/heads/asf-site by this push:
new ef0434a8c Publish built docs triggered by 38fca8b995697f5ba2e9539f8b94b295f8fa6043
ef0434a8c is described below
commit ef0434a8c9ddb4948a4730ef66cc79ffc0870053
Author: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
AuthorDate: Sun Jan 18 13:51:58 2026 +0000
Publish built docs triggered by 38fca8b995697f5ba2e9539f8b94b295f8fa6043
---
_images/ballista_extensions.excalidraw.svg | 4 +
_sources/index.rst.txt | 1 +
_sources/user-guide/extending-components.md.txt | 18 -
_sources/user-guide/extensions-example.md.txt | 485 ++++++++++++++
community/communication.html | 5 +
contributors-guide/architecture.html | 11 +-
contributors-guide/code-organization.html | 5 +
contributors-guide/development.html | 5 +
genindex.html | 5 +
index.html | 6 +
objects.inv | Bin 765 -> 787 bytes
search.html | 5 +
searchindex.js | 2 +-
user-guide/cli.html | 5 +
user-guide/configs.html | 5 +
user-guide/deployment/cargo-install.html | 5 +
user-guide/deployment/docker-compose.html | 5 +
user-guide/deployment/docker.html | 5 +
user-guide/deployment/index.html | 5 +
user-guide/deployment/kubernetes.html | 5 +
user-guide/deployment/quick-start.html | 5 +
user-guide/extending-components.html | 33 +-
user-guide/extensions-example.html | 830 ++++++++++++++++++++++++
user-guide/faq.html | 5 +
user-guide/introduction.html | 5 +
user-guide/metrics.html | 5 +
user-guide/python.html | 5 +
user-guide/rust.html | 5 +
user-guide/scheduler.html | 5 +
user-guide/tuning-guide.html | 5 +
30 files changed, 1443 insertions(+), 47 deletions(-)
diff --git a/_images/ballista_extensions.excalidraw.svg b/_images/ballista_extensions.excalidraw.svg
new file mode 100644
index 000000000..8cac476cc
--- /dev/null
+++ b/_images/ballista_extensions.excalidraw.svg
@@ -0,0 +1,4 @@
+<?xml version="1.0" standalone="no"?>
+<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN" "http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd">
+<svg version="1.1" xmlns="http://www.w3.org/2000/svg" viewBox="0 0 1114.5
301.5" width="3343.5" height="904.5"><!-- svg-source:excalidraw
--><metadata><!-- payload-type:application/vnd.excalidraw+json --><!--
payload-version:2 --><!-- payload-start
-->eyJ2ZXJzaW9uIjoiMSIsImVuY29kaW5nIjoiYnN0cmluZyIsImNvbXByZXNzZWQiOnRydWUsImVuY29kZWQiOiJ4nO1cXGtX2spcdTAwMWH+3l/h8nzdzZ55595v3lCrVSpWrOfsxYpcdTAwMTAwXHUwMDFhXHUwMDEzTFx1MDAwMih79b+fXHRaXHUwMDEyyMVcdTAwMDCxW3elqyqT25vM87z3yd9cdTAwMWbW1tbDh761
[...]
+ @font-face { font-family: Excalifont; src:
url(data:font/woff2;base64,d09GMgABAAAAAAzAAA4AAAAAFYwAAAxrAAEAAAAAAAAAAAAAAAAAAAAAAAAAAAAAGhYbg1gcNAZgAGQRCAqdfJYiCywAATYCJANUBCAFgxgHIBuVEKOihLQyyP4iwdiGxdoLHd2IRNQKonCH5Hdwc6znfE/XDc/f7H1Fz9XSAARNYWKz285wR+5gEf+xo/0PpQtdKIIRiq5gBEqWk5hMv2MMuI0n8A8id/tlgQTGr3fcHsLaQmnN22Y9XGH3axIqI15ZE0NWTt8ud3ptrxGkMgJUp/OlnRWzQN734Y+lXpRG8rUd8LWfUdS76m+HEbnm44lNh5l2o2g8HA2Yx1MaEG2JXRHrlXAt6iENAAGAQgVcGASOCIg9niRQPHSaUzKA82ZvraclZ2tFneS2tTcSKHA/N0auo7UR
[...]
\ No newline at end of file
diff --git a/_sources/index.rst.txt b/_sources/index.rst.txt
index 9289eab75..d4c607500 100644
--- a/_sources/index.rst.txt
+++ b/_sources/index.rst.txt
@@ -56,6 +56,7 @@ Table of content
user-guide/metrics
user-guide/faq
user-guide/extending-components
+ user-guide/extensions-example
.. _toc.contributors:
diff --git a/_sources/user-guide/extending-components.md.txt b/_sources/user-guide/extending-components.md.txt
index 60de1b7b1..77036d540 100644
--- a/_sources/user-guide/extending-components.md.txt
+++ b/_sources/user-guide/extending-components.md.txt
@@ -230,21 +230,3 @@ let expected = [
assert_batches_eq!(expected, &result);
```
-
-## Example: Client Side Logical/Physical Codec
-
-Default physical and logical codecs can be replaced if needed. For scheduler and executor procedure is similar to previous example. At the client side procedure is slightly different, `ballista::prelude::SessionConfigExt` provides methods to be used to override physical and logical codecs on client side.
-
-```rust
-let session_config = SessionConfig::new_with_ballista()
- .with_information_schema(true)
- .with_ballista_physical_extension_codec(Arc::new(BetterPhysicalCodec::default()))
- .with_ballista_logical_extension_codec(Arc::new(BetterLogicalCodec::default()));
-
-let state = SessionStateBuilder::new()
- .with_default_features()
- .with_config(session_config)
- .build();
-
-let ctx: SessionContext = SessionContext::standalone_with_state(state).await?;
-```
diff --git a/_sources/user-guide/extensions-example.md.txt b/_sources/user-guide/extensions-example.md.txt
new file mode 100644
index 000000000..b7c94d9ed
--- /dev/null
+++ b/_sources/user-guide/extensions-example.md.txt
@@ -0,0 +1,485 @@
+<!---
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+# Extensions Example
+
+This project demonstrates possible extension mechanisms.
+
+The goal of this small project is to enhance Ballista's capabilities by providing new logical and physical operators,
+utilities, and integration tools to support additional data processing workflows.
+
+This example implements a
+[`sample()`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.sample.html)
+operator, similar to the one in Spark, which returns a sampled subset of the original `DataFrame`:
+
+```rust
+let ctx = SessionContext::remote_with_state("df://localhost:50050", state).await?;
+let df = ctx.read_parquet("data/", Default::default()).await?;
+
+// The `sample` operator, defined in this project,
+// samples 30% of the data; `show()` displays the result.
+let df = df.sample(0.30, None)?;
+df.show().await?;
+```
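The page does not show how the physical operator (`SampleExec`) actually picks rows. As a rough illustration of what `sample(fraction, seed)` could mean, here is a std-only sketch of Bernoulli sampling: each row is kept independently with probability `fraction`, and a seeded pseudo-random generator makes the selection reproducible. The `SplitMix64` generator and the `bernoulli_sample` function are hypothetical helpers (not part of Ballista or DataFusion), the seed is a `u64` rather than the `Option<i64>` used above, and the real operator may sample differently.

```rust
/// Minimal SplitMix64 pseudo-random generator (hypothetical helper, not a Ballista API).
struct SplitMix64(u64);

impl SplitMix64 {
    fn next_u64(&mut self) -> u64 {
        self.0 = self.0.wrapping_add(0x9E37_79B9_7F4A_7C15);
        let mut z = self.0;
        z = (z ^ (z >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
        z = (z ^ (z >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
        z ^ (z >> 31)
    }

    /// Uniform float in [0, 1) built from the top 53 bits.
    fn next_f64(&mut self) -> f64 {
        (self.next_u64() >> 11) as f64 / (1u64 << 53) as f64
    }
}

/// Keeps each row independently with probability `fraction` (Bernoulli sampling).
/// The same `seed` always selects the same rows, and row order is preserved.
fn bernoulli_sample<T: Clone>(rows: &[T], fraction: f32, seed: u64) -> Vec<T> {
    let mut rng = SplitMix64(seed);
    rows.iter()
        .filter(|_| rng.next_f64() < fraction as f64)
        .cloned()
        .collect()
}
```

With this scheme the number of rows kept is only approximately `fraction * rows.len()`; an operator that must return an exact count would need a different technique, such as reservoir sampling.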
+
+To implement this functionality, it is necessary to implement a new logical plan extension and a new physical operator,
+and to extend `DataFrame` to expose the new operator.
+
+> [!WARNING]
+> Please do not use this sampling operator in production; statisticians would probably not approve of it.
+
+This demo will provide:
+
+- Custom DataFusion (logical and physical) nodes.
+- Logical and physical extension codecs.
+- Custom protocol buffer definitions.
+- Extension query planner.
+
+## Logical Plan Extension
+
+The first step is to implement a custom logical plan extension:
+
+```rust
+//! This module defines the implementation of the `UserDefinedLogicalNodeCore` trait for the `Sample` logical plan node.
+//!
+//! The `Sample` node represents a custom logical plan extension for sampling data within a query plan.
+//!
+use std::hash::Hash;
+
+use datafusion::{
+    error::DataFusionError,
+    logical_expr::{LogicalPlan, UserDefinedLogicalNodeCore},
+};
+
+#[derive(Debug, Clone, PartialEq, PartialOrd)]
+pub struct Sample {
+    pub fraction: f32,
+    pub seed: Option<i64>,
+    pub input: LogicalPlan,
+}
+
+impl Hash for Sample {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+        self.seed.hash(state);
+        self.input.hash(state);
+    }
+}
+
+// `UserDefinedLogicalNodeCore` requires `Eq`; this assumes `fraction` is never NaN.
+impl Eq for Sample {}
+
+impl Sample {
+    pub fn new(fraction: f32, seed: Option<i64>, input: LogicalPlan) -> Self {
+        Self {
+            fraction,
+            seed,
+            input,
+        }
+    }
+}
+
+impl UserDefinedLogicalNodeCore for Sample {
+    fn name(&self) -> &str {
+        "Sample"
+    }
+
+    fn inputs(&self) -> Vec<&LogicalPlan> {
+        vec![&self.input]
+    }
+
+    // Sampling only drops rows, so the output schema is the input schema.
+    fn schema(&self) -> &datafusion::common::DFSchemaRef {
+        self.input.schema()
+    }
+
+    fn expressions(&self) -> Vec<datafusion::prelude::Expr> {
+        vec![]
+    }
+
+    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(f, "Sample: fraction: {}, seed: {:?}", self.fraction, self.seed)
+    }
+
+    fn with_exprs_and_inputs(
+        &self,
+        _exprs: Vec<datafusion::prelude::Expr>,
+        inputs: Vec<LogicalPlan>,
+    ) -> datafusion::error::Result<Self> {
+        Ok(Self {
+            seed: self.seed,
+            fraction: self.fraction,
+            input: inputs
+                .into_iter()
+                .next()
+                .ok_or(DataFusionError::Plan("expected single input".to_string()))?,
+        })
+    }
+}
+```
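The `Hash` implementation above leaves `fraction` out because `f32` does not implement `Hash`. That is still correct (nodes that compare equal always hash equally), but nodes differing only in `fraction` collide. If that matters, one option is to hash the float's bit pattern with `f32::to_bits`. The std-only sketch below uses a hypothetical `SampleKey` struct in which a `String` stands in for the input plan. Note that `to_bits` distinguishes `0.0` from `-0.0` even though they compare equal, which is harmless here only because the DataFrame extension rejects fractions outside (0, 1].

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical stand-in for the `Sample` node; `input` describes the child plan.
#[derive(Debug, Clone, PartialEq)]
struct SampleKey {
    fraction: f32,
    seed: Option<i64>,
    input: String,
}

impl Hash for SampleKey {
    fn hash<H: Hasher>(&self, state: &mut H) {
        // Hash the raw IEEE-754 bits, since `f32` itself is not `Hash`.
        self.fraction.to_bits().hash(state);
        self.seed.hash(state);
        self.input.hash(state);
    }
}

/// Hashes a value with the standard library's default hasher.
fn hash_of<T: Hash>(value: &T) -> u64 {
    let mut hasher = DefaultHasher::new();
    value.hash(&mut hasher);
    hasher.finish()
}
```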
+
+## DataFrame Extension
+
+To expose this functionality to end users, a DataFrame extension is implemented. This extension creates a
+`LogicalPlan::Extension(extension)` node:
+
+```rust
+use std::sync::Arc;
+
+use datafusion::{
+    error::DataFusionError,
+    logical_expr::{Extension, LogicalPlan},
+    prelude::DataFrame,
+};
+
+use crate::logical::sample_extension::Sample;
+
+pub trait DataFrameExt {
+    /// Returns a new `DataFrame` containing a random sample of rows from the original `DataFrame`.
+    ///
+    /// # Arguments
+    ///
+    /// * `fraction` - The fraction of rows to sample; must be in the range (0.0, 1.0].
+    /// * `seed` - An optional, non-negative seed for the random number generator to ensure reproducibility.
+    ///
+    /// # Errors
+    ///
+    /// Returns a `DataFusionError::Configuration` if `fraction` or `seed` is out of range.
+    fn sample(self, fraction: f32, seed: Option<i64>) -> datafusion::error::Result<DataFrame>;
+}
+
+impl DataFrameExt for DataFrame {
+    fn sample(self, fraction: f32, seed: Option<i64>) -> datafusion::error::Result<DataFrame> {
+        if !(fraction > 0.0 && fraction <= 1.0) {
+            return Err(DataFusionError::Configuration(
+                "fraction should be in (0, 1] range".to_string(),
+            ));
+        }
+
+        if seed.unwrap_or(0) < 0 {
+            return Err(DataFusionError::Configuration(
+                "seed should be a non-negative number".to_string(),
+            ));
+        }
+
+        let (state, input) = self.into_parts();
+
+        let node = Arc::new(Sample {
+            fraction,
+            seed,
+            input,
+        });
+        let extension = Extension { node };
+        let plan = LogicalPlan::Extension(extension);
+
+        Ok(DataFrame::new(state, plan))
+    }
+}
+```
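The range check is written as `!(fraction > 0.0 && fraction <= 1.0)` rather than `fraction <= 0.0 || fraction > 1.0`. The two differ for NaN: every comparison with NaN is false, so only the negated form rejects it. A std-only sketch of the same checks, as a hypothetical `validate_sample_args` function that returns `String` errors instead of `DataFusionError`:

```rust
/// Hypothetical std-only mirror of the argument checks in `DataFrameExt::sample`.
fn validate_sample_args(fraction: f32, seed: Option<i64>) -> Result<(), String> {
    // Negated form: NaN fails `fraction > 0.0`, so it is rejected too.
    if !(fraction > 0.0 && fraction <= 1.0) {
        return Err(format!("fraction out of range: {}", fraction));
    }
    // `None` (no seed) is always accepted; `Some(0)` is accepted as well.
    if seed.unwrap_or(0) < 0 {
        return Err(format!("negative seed: {:?}", seed));
    }
    Ok(())
}
```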
+
+This approach makes new methods available on the DataFusion `DataFrame`:
+
+```rust
+// `DataFrameExt` must be in scope for `sample` to resolve on `DataFrame`.
+let ctx = SessionContext::remote_with_state("df://localhost:50050", state).await?;
+let df = ctx.read_parquet("data/", Default::default()).await?;
+
+// The DataFrame extension provides the `sample` method
+let df = df.sample(0.30, None)?;
+```
+
+
+
+## Logical Extension Codec
+
+With the extension in place, a custom logical extension codec is required to transmit the client's logical plan to the
+scheduler.
+
+The logical extension codec typically consists of two components. The first is a set of Google Protocol Buffer
+message definitions:
+
+```proto
+message LMessage {
+    oneof Extension {
+        LSample sample = 1;
+    }
+}
+
+message LSample {
+    float fraction = 1;
+    optional int64 seed = 2;
+}
+```
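To make the message concrete, here is a hand-written, std-only sketch of how an `LSample` is laid out in the protobuf wire format: `fraction` (field 1) is a fixed 32-bit value, and `seed` (field 2) is a varint written only when present, because it is declared `optional`. In the example itself this is done by generated code (the codec calls `LMessage::encode`), so `encode_lsample` and `put_varint` are purely illustrative, and they encode only the inner `LSample`, not the surrounding `LMessage`.

```rust
/// Appends `value` as a protobuf base-128 varint.
fn put_varint(buf: &mut Vec<u8>, mut value: u64) {
    while value >= 0x80 {
        buf.push((value as u8 & 0x7F) | 0x80);
        value >>= 7;
    }
    buf.push(value as u8);
}

/// Hypothetical hand-rolled encoding of `LSample { fraction, seed }`.
fn encode_lsample(fraction: f32, seed: Option<i64>) -> Vec<u8> {
    let mut buf = Vec::new();
    // proto3 omits non-optional scalar fields that hold their default value (0.0).
    if fraction != 0.0 {
        // Key = (field number << 3) | wire type; wire type 5 is fixed 32-bit.
        buf.push((1 << 3) | 5);
        buf.extend_from_slice(&fraction.to_le_bytes());
    }
    // `optional` gives explicit presence: `Some(0)` is written, `None` is not.
    if let Some(seed) = seed {
        // Wire type 0 is varint; a negative int64 takes 10 bytes.
        buf.push(2 << 3);
        put_varint(&mut buf, seed as u64);
    }
    buf
}
```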
+
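+The second component is a `LogicalExtensionCodec` implementation, `ExtendedBallistaLogicalCodec`, which handles the newly
+defined operator messages and delegates encoding of all other nodes to the wrapped `BallistaLogicalExtensionCodec`: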
+
+```rust
+use std::sync::Arc;
+
+use ballista_core::serde::BallistaLogicalExtensionCodec;
+use datafusion::common::plan_err;
+use datafusion::error::DataFusionError;
+use datafusion_proto::logical_plan::LogicalExtensionCodec;
+use prost::Message;
+
+// Types generated from the protobuf definitions above, and the `Sample` node
+// (module paths are specific to this project).
+use super::messages::{l_message::Extension, LMessage, LSample};
+use crate::logical::sample_extension::Sample;
+
+#[derive(Debug, Default)]
+pub struct ExtendedBallistaLogicalCodec {
+    inner: BallistaLogicalExtensionCodec,
+}
+
+impl LogicalExtensionCodec for ExtendedBallistaLogicalCodec {
+    fn try_decode(
+        &self,
+        buf: &[u8],
+        inputs: &[datafusion::logical_expr::LogicalPlan],
+        _ctx: &datafusion::prelude::SessionContext,
+    ) -> datafusion::error::Result<datafusion::logical_expr::Extension> {
+        let message =
+            LMessage::decode(buf).map_err(|e| DataFusionError::Internal(e.to_string()))?;
+
+        match message.extension {
+            Some(Extension::Sample(sample)) => {
+                let node = Arc::new(Sample {
+                    input: inputs
+                        .first()
+                        .ok_or(DataFusionError::Plan("expected input".to_string()))?
+                        .clone(),
+                    seed: sample.seed,
+                    fraction: sample.fraction,
+                });
+
+                Ok(datafusion::logical_expr::Extension { node })
+            }
+            None => plan_err!("Can't cast logical extension"),
+        }
+    }
+
+    fn try_encode(
+        &self,
+        node: &datafusion::logical_expr::Extension,
+        buf: &mut Vec<u8>,
+    ) -> datafusion::error::Result<()> {
+        if let Some(Sample { seed, fraction, .. }) = node.node.as_any().downcast_ref::<Sample>() {
+            let sample = LSample {
+                seed: *seed,
+                fraction: *fraction,
+            };
+            let message = LMessage {
+                extension: Some(Extension::Sample(sample)),
+            };
+
+            message
+                .encode(buf)
+                .map_err(|e| DataFusionError::Internal(e.to_string()))?;
+
+            Ok(())
+        } else {
+            // Any other extension node is encoded by the wrapped Ballista codec.
+            self.inner.try_encode(node, buf)
+        }
+    }
+    // Additional implementation omitted for brevity
+}
+```
+
+In short, the codec implements the `LogicalExtensionCodec` trait, which handles conversion between Rust structures and
+Protocol Buffer messages.
+
+## Logical to Physical Plan Translation
+
+Once the logical plan extension is in place, the logical node must be translated into a physical node. This
+translation is performed by implementing the `ExtensionPlanner` trait:
+
+```rust
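+use std::sync::Arc;
+
+use async_trait::async_trait;
+use datafusion::error::DataFusionError;
+use datafusion::execution::context::SessionState;
+use datafusion::logical_expr::{LogicalPlan, UserDefinedLogicalNode};
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner};
+// `Sample` and `SampleExec` are imported from this project's own modules.
+
+#[derive(Debug, Clone, Default)]
+pub struct CustomPlannerExtension {}
+
+#[async_trait]
+impl ExtensionPlanner for CustomPlannerExtension {
+    async fn plan_extension(
+        &self,
+        _planner: &dyn PhysicalPlanner,
+        node: &dyn UserDefinedLogicalNode,
+        _logical_inputs: &[&LogicalPlan],
+        physical_inputs: &[Arc<dyn ExecutionPlan>],
+        _session_state: &SessionState,
+    ) -> datafusion::error::Result<Option<Arc<dyn ExecutionPlan>>> {
+        if let Some(Sample { fraction, seed, .. }) = node.as_any().downcast_ref::<Sample>() {
+            let input = physical_inputs
+                .first()
+                .ok_or(DataFusionError::Plan("expected single input".to_string()))?
+                .clone();
+            let node = SampleExec::new(*fraction, *seed, input);
+            let node = Arc::new(node);
+
+            Ok(Some(node))
+        } else {
+            // Returning `None` lets other extension planners handle the node.
+            Ok(None)
+        }
+    }
+}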
+```
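`plan_extension` relies on two conventions: the node is identified by downcasting through `as_any()`, and returning `None` means "not mine", so the next registered extension planner gets a chance. The std-only sketch below models that dispatch loop with hypothetical `Node` and `Planner` traits standing in for `UserDefinedLogicalNode` and `ExtensionPlanner`; it is a simplification, not DataFusion's actual planner code.

```rust
use std::any::Any;

/// Hypothetical stand-in for `UserDefinedLogicalNode`.
trait Node {
    fn as_any(&self) -> &dyn Any;
}

struct Sample {
    fraction: f32,
}

struct OtherNode;

impl Node for Sample {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl Node for OtherNode {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

/// Hypothetical stand-in for `ExtensionPlanner`; the "plan" is just a description.
trait Planner {
    fn plan_extension(&self, node: &dyn Node) -> Option<String>;
}

struct SamplePlanner;

impl Planner for SamplePlanner {
    fn plan_extension(&self, node: &dyn Node) -> Option<String> {
        // Only handle `Sample`; anything else is left for other planners.
        node.as_any()
            .downcast_ref::<Sample>()
            .map(|s| format!("SampleExec(fraction={})", s.fraction))
    }
}

/// Tries each planner in order and stops at the first one that returns `Some`.
fn plan(planners: &[Box<dyn Planner>], node: &dyn Node) -> Result<String, String> {
    planners
        .iter()
        .find_map(|p| p.plan_extension(node))
        .ok_or_else(|| "no planner could handle the node".to_string())
}
```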
+
+The custom planner is registered in the session state through a query planner, `QueryPlannerWithExtensions` (its
+implementation is omitted here):
+
+```rust
+let query_planner = Arc::new(QueryPlannerWithExtensions::default());
+
+let state = SessionStateBuilder::new()
+    .with_query_planner(query_planner)
+    .with_default_features()
+    .build();
+```
+
+Finally, the generated physical plan is serialized using the physical plan extension codec and
+transmitted to the executor(s). The implementation wraps `BallistaPhysicalExtensionCodec`:
+
+```rust
+#[derive(Debug, Default)]
+pub struct ExtendedBallistaPhysicalCodec {
+    inner: BallistaPhysicalExtensionCodec,
+}
+
+impl PhysicalExtensionCodec for ExtendedBallistaPhysicalCodec {
+    fn try_decode(
+        &self,
+        buf: &[u8],
+        inputs: &[std::sync::Arc<dyn datafusion::physical_plan::ExecutionPlan>],
+        registry: &dyn datafusion::execution::FunctionRegistry,
+    ) -> datafusion::error::Result<std::sync::Arc<dyn datafusion::physical_plan::ExecutionPlan>> {
+        let message =
+            PMessage::decode(buf).map_err(|e| DataFusionError::Internal(e.to_string()))?;
+
+        match message.extension {
+            Some(super::messages::p_message::Extension::Sample(PSample {
+                fraction,
+                seed,
+                ..
+            })) => {
+                let input = inputs
+                    .first()
+                    .ok_or(DataFusionError::Plan("expected input".to_string()))?
+                    .clone();
+
+                let node = Arc::new(SampleExec::new(fraction, seed, input));
+
+                Ok(node)
+            }
+
+            // Plans encoded by the wrapped Ballista codec travel as opaque bytes.
+            Some(super::messages::p_message::Extension::Opaque(opaque)) => {
+                self.inner.try_decode(&opaque, inputs, registry)
+            }
+            None => plan_err!("Can't cast physical extension"),
+        }
+    }
+
+    fn try_encode(
+        &self,
+        node: std::sync::Arc<dyn datafusion::physical_plan::ExecutionPlan>,
+        buf: &mut Vec<u8>,
+    ) -> datafusion::error::Result<()> {
+        if let Some(SampleExec { fraction, seed, .. }) = node.as_any().downcast_ref::<SampleExec>()
+        {
+            let message = PMessage {
+                extension: Some(super::messages::p_message::Extension::Sample(PSample {
+                    fraction: *fraction,
+                    seed: *seed,
+                })),
+            };
+
+            message
+                .encode(buf)
+                .map_err(|e| DataFusionError::Internal(e.to_string()))?;
+
+            Ok(())
+        } else {
+            // Encode any other plan with the wrapped codec, then wrap the bytes as
+            // `Opaque` so that `try_decode` can route them back to it.
+            let mut opaque = vec![];
+            self.inner
+                .try_encode(node, &mut opaque)
+                .map_err(|e| DataFusionError::Internal(e.to_string()))?;
+
+            let message = PMessage {
+                extension: Some(super::messages::p_message::Extension::Opaque(opaque)),
+            };
+
+            message
+                .encode(buf)
+                .map_err(|e| DataFusionError::Internal(e.to_string()))?;
+
+            Ok(())
+        }
+    }
+}
+```
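The `Opaque` variant is what lets this codec coexist with Ballista's own: every payload is wrapped in a `PMessage`, so `try_decode` can tell from the outer message alone whether the bytes describe a `SampleExec` or must be handed back to the inner codec. The `PMessage` definition is not shown on this page (judging by the generated `p_message::Extension` enum, it is presumably a `oneof` with `Sample` and `Opaque` variants). The std-only sketch below models the envelope with a one-byte tag instead of protobuf; the `Payload` enum and the tag values are hypothetical.

```rust
/// Hypothetical envelope mirroring the `PMessage` oneof: either our own
/// operator, or bytes produced by the wrapped (inner) codec.
#[derive(Debug, PartialEq)]
enum Payload {
    Sample { fraction: f32, seed: Option<i64> },
    Opaque(Vec<u8>),
}

const TAG_SAMPLE: u8 = 1;
const TAG_OPAQUE: u8 = 2;

fn encode(payload: &Payload) -> Vec<u8> {
    let mut buf = Vec::new();
    match payload {
        Payload::Sample { fraction, seed } => {
            buf.push(TAG_SAMPLE);
            buf.extend_from_slice(&fraction.to_le_bytes());
            // One presence byte, followed by the seed when present.
            match seed {
                Some(s) => {
                    buf.push(1);
                    buf.extend_from_slice(&s.to_le_bytes());
                }
                None => buf.push(0),
            }
        }
        Payload::Opaque(bytes) => {
            buf.push(TAG_OPAQUE);
            buf.extend_from_slice(bytes);
        }
    }
    buf
}

fn decode(buf: &[u8]) -> Result<Payload, String> {
    match buf.split_first() {
        Some((&TAG_SAMPLE, rest)) if rest.len() >= 5 => {
            let fraction = f32::from_le_bytes(rest[..4].try_into().unwrap());
            let seed = match rest[4] {
                0 => None,
                _ => {
                    let bytes: [u8; 8] = rest
                        .get(5..13)
                        .ok_or("truncated seed")?
                        .try_into()
                        .map_err(|_| "bad seed")?;
                    Some(i64::from_le_bytes(bytes))
                }
            };
            Ok(Payload::Sample { fraction, seed })
        }
        // Opaque bytes are returned untouched, ready for the inner codec.
        Some((&TAG_OPAQUE, rest)) => Ok(Payload::Opaque(rest.to_vec())),
        _ => Err("can't decode payload".to_string()),
    }
}
```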
+
+These are all the moving parts needed to extend Ballista's functionality. The last step is to
+configure the scheduler and executor to use the new features.
+
+`SchedulerConfig` should be configured to override the logical codec, the physical codec, and the session builder
+function:
+
+```rust
+let config: SchedulerConfig = SchedulerConfig {
+    override_logical_codec: Some(Arc::new(ExtendedBallistaLogicalCodec::default())),
+    override_physical_codec: Some(Arc::new(ExtendedBallistaPhysicalCodec::default())),
+    override_session_builder: Some(Arc::new(extended_state_producer)),
+    ..Default::default()
+};
+
+let address = format!("{}:{}", config.bind_host, config.bind_port);
+let address = address
+    .parse()
+    .map_err(|e: AddrParseError| BallistaError::Configuration(e.to_string()))?;
+
+let cluster = BallistaCluster::new_from_config(&config).await?;
+
+start_server(cluster, address, Arc::new(config)).await?;
+```
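The address is built with `format!` and parsed with `str::parse`; given the `AddrParseError` in the error mapping, the target is presumably `std::net::SocketAddr`. That parser accepts only IP-literal hosts, so a `bind_host` such as `localhost` fails to parse even though it would resolve elsewhere. A std-only sketch with a hypothetical `bind_address` helper that mirrors those two steps:

```rust
use std::net::{AddrParseError, SocketAddr};

/// Hypothetical helper mirroring the `format!` + `parse` steps above.
fn bind_address(host: &str, port: u16) -> Result<SocketAddr, AddrParseError> {
    format!("{}:{}", host, port).parse()
}
```

For the same reason, IPv6 hosts need square brackets: `"[::1]"` parses, while `"::1"` does not.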
+
+```rust
+pub fn extended_state_producer(config: SessionConfig) -> datafusion::error::Result<SessionState> {
+    // A custom query planner is needed to convert the logical `Sample` node into a physical operator.
+    let query_planner = Arc::new(QueryPlannerWithExtensions::default());
+
+    let state = SessionStateBuilder::new()
+        .with_config(config)
+        .with_query_planner(query_planner)
+        .with_default_features()
+        .build();
+
+    Ok(state)
+}
+```
+
+Similarly, for `ExecutorProcessConfig`:
+
+```rust
+let config: ExecutorProcessConfig = ExecutorProcessConfig {
+    override_logical_codec: Some(Arc::new(ExtendedBallistaLogicalCodec::default())),
+    override_physical_codec: Some(Arc::new(ExtendedBallistaPhysicalCodec::default())),
+    ..Default::default()
+};
+
+start_executor_process(Arc::new(config)).await
+```
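Both configurations follow the same idea: an `Option` field that, when `Some`, replaces the built-in default. Because a plan is encoded on one side and decoded on the other (the logical plan between client and scheduler, the physical plan between scheduler and executors), both sides of each hop need matching codecs. A simplified, hypothetical model of the override pattern; the `Codec` trait and `ProcessConfig` struct are illustrative and are not Ballista types:

```rust
use std::sync::Arc;

/// Hypothetical codec abstraction.
trait Codec: Send + Sync {
    fn name(&self) -> &'static str;
}

struct DefaultCodec;
struct ExtendedCodec;

impl Codec for DefaultCodec {
    fn name(&self) -> &'static str {
        "default"
    }
}

impl Codec for ExtendedCodec {
    fn name(&self) -> &'static str {
        "extended"
    }
}

/// Hypothetical config with an optional override, in the style of `override_physical_codec`.
#[derive(Default)]
struct ProcessConfig {
    override_codec: Option<Arc<dyn Codec>>,
}

impl ProcessConfig {
    /// Uses the override when present, otherwise falls back to the default codec.
    fn codec(&self) -> Arc<dyn Codec> {
        match &self.override_codec {
            Some(codec) => Arc::clone(codec),
            None => Arc::new(DefaultCodec),
        }
    }
}
```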
+
+## Conclusion
+
+This project demonstrates how to extend Ballista with custom logical and physical operators, codecs, and planner logic.
+By following the outlined steps, you can introduce new DataFrame operations and ensure they are supported throughout the
+distributed query lifecycle.
+
+For more details, refer to the source code and the linked example files. Contributions and feedback are welcome!
+
+---
+
+**Related links:**
+
+- [Ballista Extensions Source Code](https://github.com/milenkovicm/ballista_extensions)
+- [DataFusion Documentation](https://datafusion.apache.org)
+- [Rust Tonic (gRPC) support](https://docs.rs/tonic/latest/tonic/)
diff --git a/community/communication.html b/community/communication.html
index 9fbffa329..5ac0d0ee9 100644
--- a/community/communication.html
+++ b/community/communication.html
@@ -181,6 +181,11 @@
Extending Ballista Scheduler And Executors
</a>
</li>
+ <li class="toctree-l1">
+ <a class="reference internal" href="../user-guide/extensions-example.html">
+ Extensions Example
+ </a>
+ </li>
</ul>
<p aria-level="2" class="caption" role="heading">
<span class="caption-text">
diff --git a/contributors-guide/architecture.html b/contributors-guide/architecture.html
index d21c1d129..0fa0d0a8f 100644
--- a/contributors-guide/architecture.html
+++ b/contributors-guide/architecture.html
@@ -34,7 +34,7 @@
<link rel="index" title="Index" href="../genindex.html" />
<link rel="search" title="Search" href="../search.html" />
<link rel="next" title="Ballista Code Organization" href="code-organization.html" />
- <link rel="prev" title="Extending Ballista Scheduler And Executors" href="../user-guide/extending-components.html" />
+ <link rel="prev" title="Extensions Example" href="../user-guide/extensions-example.html" />
<meta name="viewport" content="width=device-width, initial-scale=1" />
<meta name="docsearch:language" content="en">
@@ -182,6 +182,11 @@
Extending Ballista Scheduler And Executors
</a>
</li>
+ <li class="toctree-l1">
+ <a class="reference internal" href="../user-guide/extensions-example.html">
+ Extensions Example
+ </a>
+ </li>
</ul>
<p aria-level="2" class="caption" role="heading">
<span class="caption-text">
@@ -544,11 +549,11 @@ and <a class="reference external" href="https://github.com/apache/datafusion-bal
<!-- Previous / next buttons -->
<div class='prev-next-area'>
- <a class='left-prev' id="prev-link" href="../user-guide/extending-components.html" title="previous page">
+ <a class='left-prev' id="prev-link" href="../user-guide/extensions-example.html" title="previous page">
<i class="fas fa-angle-left"></i>
<div class="prev-next-info">
<p class="prev-next-subtitle">previous</p>
- <p class="prev-next-title">Extending Ballista Scheduler And Executors</p>
+ <p class="prev-next-title">Extensions Example</p>
</div>
</a>
<a class='right-next' id="next-link" href="code-organization.html" title="next page">
diff --git a/contributors-guide/code-organization.html b/contributors-guide/code-organization.html
index 23621c8bf..cc463d977 100644
--- a/contributors-guide/code-organization.html
+++ b/contributors-guide/code-organization.html
@@ -182,6 +182,11 @@
Extending Ballista Scheduler And Executors
</a>
</li>
+ <li class="toctree-l1">
+ <a class="reference internal" href="../user-guide/extensions-example.html">
+ Extensions Example
+ </a>
+ </li>
</ul>
<p aria-level="2" class="caption" role="heading">
<span class="caption-text">
diff --git a/contributors-guide/development.html b/contributors-guide/development.html
index 364f18250..e7e33d69d 100644
--- a/contributors-guide/development.html
+++ b/contributors-guide/development.html
@@ -182,6 +182,11 @@
Extending Ballista Scheduler And Executors
</a>
</li>
+ <li class="toctree-l1">
+ <a class="reference internal" href="../user-guide/extensions-example.html">
+ Extensions Example
+ </a>
+ </li>
</ul>
<p aria-level="2" class="caption" role="heading">
<span class="caption-text">
diff --git a/genindex.html b/genindex.html
index 9518f56aa..affddf4dd 100644
--- a/genindex.html
+++ b/genindex.html
@@ -179,6 +179,11 @@
Extending Ballista Scheduler And Executors
</a>
</li>
+ <li class="toctree-l1">
+ <a class="reference internal" href="user-guide/extensions-example.html">
+ Extensions Example
+ </a>
+ </li>
</ul>
<p aria-level="2" class="caption" role="heading">
<span class="caption-text">
diff --git a/index.html b/index.html
index 077e88608..bf9736d73 100644
--- a/index.html
+++ b/index.html
@@ -181,6 +181,11 @@
Extending Ballista Scheduler And Executors
</a>
</li>
+ <li class="toctree-l1">
+ <a class="reference internal" href="user-guide/extensions-example.html">
+ Extensions Example
+ </a>
+ </li>
</ul>
<p aria-level="2" class="caption" role="heading">
<span class="caption-text">
@@ -322,6 +327,7 @@
<li class="toctree-l1"><a class="reference internal" href="user-guide/metrics.html">Ballista Scheduler Metrics</a></li>
<li class="toctree-l1"><a class="reference internal" href="user-guide/faq.html">Frequently Asked Questions</a></li>
<li class="toctree-l1"><a class="reference internal" href="user-guide/extending-components.html">Extending Ballista Scheduler And Executors</a></li>
+<li class="toctree-l1"><a class="reference internal" href="user-guide/extensions-example.html">Extensions Example</a></li>
</ul>
</div>
<div class="toctree-wrapper compound" id="toc-contributors">
diff --git a/objects.inv b/objects.inv
index 451829d3e..f887278bd 100644
Binary files a/objects.inv and b/objects.inv differ
diff --git a/search.html b/search.html
index e4409eec5..e9cc6d948 100644
--- a/search.html
+++ b/search.html
@@ -186,6 +186,11 @@
Extending Ballista Scheduler And Executors
</a>
</li>
+ <li class="toctree-l1">
+ <a class="reference internal" href="user-guide/extensions-example.html">
+ Extensions Example
+ </a>
+ </li>
</ul>
<p aria-level="2" class="caption" role="heading">
<span class="caption-text">
diff --git a/searchindex.js b/searchindex.js
index 325691b0e..99c3de495 100644
--- a/searchindex.js
+++ b/searchindex.js
@@ -1 +1 @@
-Search.setIndex({"alltitles": {"Apache DataFusion Ballista": [[4, null]],
"Arrow-native": [[1, "arrow-native"]], "Autoscaling Executors": [[11,
"autoscaling-executors"]], "Ballista Architecture": [[1, null]], "Ballista Code
Organization": [[2, null]], "Ballista Command-line Interface": [[5, null]],
"Ballista Configuration Settings": [[6, "ballista-configuration-settings"]],
"Ballista Development": [[3, null]], "Ballista Python Bindings": [[17, null]],
"Ballista Quickstart": [[12, null]], [...]
\ No newline at end of file
+Search.setIndex({"alltitles": {"Apache DataFusion Ballista": [[4, null]],
"Arrow-native": [[1, "arrow-native"]], "Autoscaling Executors": [[11,
"autoscaling-executors"]], "Ballista Architecture": [[1, null]], "Ballista Code
Organization": [[2, null]], "Ballista Command-line Interface": [[5, null]],
"Ballista Configuration Settings": [[6, "ballista-configuration-settings"]],
"Ballista Development": [[3, null]], "Ballista Python Bindings": [[18, null]],
"Ballista Quickstart": [[12, null]], [...]
\ No newline at end of file
diff --git a/user-guide/cli.html b/user-guide/cli.html
index 5227a93f1..ec4d34bd0 100644
--- a/user-guide/cli.html
+++ b/user-guide/cli.html
@@ -182,6 +182,11 @@
Extending Ballista Scheduler And Executors
</a>
</li>
+ <li class="toctree-l1">
+ <a class="reference internal" href="extensions-example.html">
+ Extensions Example
+ </a>
+ </li>
</ul>
<p aria-level="2" class="caption" role="heading">
<span class="caption-text">
diff --git a/user-guide/configs.html b/user-guide/configs.html
index a888eefe7..5eaf3b185 100644
--- a/user-guide/configs.html
+++ b/user-guide/configs.html
@@ -182,6 +182,11 @@
Extending Ballista Scheduler And Executors
</a>
</li>
+ <li class="toctree-l1">
+ <a class="reference internal" href="extensions-example.html">
+ Extensions Example
+ </a>
+ </li>
</ul>
<p aria-level="2" class="caption" role="heading">
<span class="caption-text">
diff --git a/user-guide/deployment/cargo-install.html b/user-guide/deployment/cargo-install.html
index d8e12e23b..d71e57d6c 100644
--- a/user-guide/deployment/cargo-install.html
+++ b/user-guide/deployment/cargo-install.html
@@ -182,6 +182,11 @@
Extending Ballista Scheduler And Executors
</a>
</li>
+ <li class="toctree-l1">
+ <a class="reference internal" href="../extensions-example.html">
+ Extensions Example
+ </a>
+ </li>
</ul>
<p aria-level="2" class="caption" role="heading">
<span class="caption-text">
diff --git a/user-guide/deployment/docker-compose.html b/user-guide/deployment/docker-compose.html
index 0afcc8370..7ae44a87d 100644
--- a/user-guide/deployment/docker-compose.html
+++ b/user-guide/deployment/docker-compose.html
@@ -182,6 +182,11 @@
Extending Ballista Scheduler And Executors
</a>
</li>
+ <li class="toctree-l1">
+ <a class="reference internal" href="../extensions-example.html">
+ Extensions Example
+ </a>
+ </li>
</ul>
<p aria-level="2" class="caption" role="heading">
<span class="caption-text">
diff --git a/user-guide/deployment/docker.html b/user-guide/deployment/docker.html
index c71f26af7..b221735f2 100644
--- a/user-guide/deployment/docker.html
+++ b/user-guide/deployment/docker.html
@@ -182,6 +182,11 @@
Extending Ballista Scheduler And Executors
</a>
</li>
+ <li class="toctree-l1">
+ <a class="reference internal" href="../extensions-example.html">
+ Extensions Example
+ </a>
+ </li>
</ul>
<p aria-level="2" class="caption" role="heading">
<span class="caption-text">
diff --git a/user-guide/deployment/index.html b/user-guide/deployment/index.html
index 75126f99b..6de1ba619 100644
--- a/user-guide/deployment/index.html
+++ b/user-guide/deployment/index.html
@@ -182,6 +182,11 @@
Extending Ballista Scheduler And Executors
</a>
</li>
+ <li class="toctree-l1">
+ <a class="reference internal" href="../extensions-example.html">
+ Extensions Example
+ </a>
+ </li>
</ul>
<p aria-level="2" class="caption" role="heading">
<span class="caption-text">
diff --git a/user-guide/deployment/kubernetes.html b/user-guide/deployment/kubernetes.html
index 118087914..833ae2d7d 100644
--- a/user-guide/deployment/kubernetes.html
+++ b/user-guide/deployment/kubernetes.html
@@ -182,6 +182,11 @@
Extending Ballista Scheduler And Executors
</a>
</li>
+ <li class="toctree-l1">
+ <a class="reference internal" href="../extensions-example.html">
+ Extensions Example
+ </a>
+ </li>
</ul>
<p aria-level="2" class="caption" role="heading">
<span class="caption-text">
diff --git a/user-guide/deployment/quick-start.html b/user-guide/deployment/quick-start.html
index d9e443d9d..960a2fab3 100644
--- a/user-guide/deployment/quick-start.html
+++ b/user-guide/deployment/quick-start.html
@@ -182,6 +182,11 @@
Extending Ballista Scheduler And Executors
</a>
</li>
+ <li class="toctree-l1">
+ <a class="reference internal" href="../extensions-example.html">
+ Extensions Example
+ </a>
+ </li>
</ul>
<p aria-level="2" class="caption" role="heading">
<span class="caption-text">
diff --git a/user-guide/extending-components.html b/user-guide/extending-components.html
index ea17cc214..c97ed451c 100644
--- a/user-guide/extending-components.html
+++ b/user-guide/extending-components.html
@@ -33,7 +33,7 @@
<script src="../_static/sphinx_highlight.js?v=dc90522c"></script>
<link rel="index" title="Index" href="../genindex.html" />
<link rel="search" title="Search" href="../search.html" />
- <link rel="next" title="Ballista Architecture" href="../contributors-guide/architecture.html" />
+ <link rel="next" title="Extensions Example" href="extensions-example.html" />
<link rel="prev" title="Frequently Asked Questions" href="faq.html" />
<meta name="viewport" content="width=device-width, initial-scale=1" />
<meta name="docsearch:language" content="en">
@@ -182,6 +182,11 @@
Extending Ballista Scheduler And Executors
</a>
</li>
+ <li class="toctree-l1">
+ <a class="reference internal" href="extensions-example.html">
+ Extensions Example
+ </a>
+ </li>
</ul>
<p aria-level="2" class="caption" role="heading">
<span class="caption-text">
@@ -277,11 +282,6 @@
</li>
</ul>
</li>
- <li class="toc-h2 nav-item toc-entry">
- <a class="reference internal nav-link" href="#example-client-side-logical-physical-codec">
- Example: Client Side Logical/Physical Codec
- </a>
- </li>
</ul>
</nav>
@@ -534,23 +534,6 @@ new configuration extensions, object stores, logical and physical codecs …</p>
</div>
</section>
</section>
-<section id="example-client-side-logical-physical-codec">
-<h2>Example: Client Side Logical/Physical Codec<a class="headerlink" href="#example-client-side-logical-physical-codec" title="Link to this heading">¶</a></h2>
-<p>Default physical and logical codecs can be replaced if needed. For scheduler and executor procedure is similar to previous example. At the client side procedure is slightly different, <code class="docutils literal notranslate"><span class="pre">ballista::prelude::SessionConfigExt</span></code> provides methods to be used to override physical and logical codecs on client side.</p>
-<div class="highlight-rust notranslate"><div
class="highlight"><pre><span></span><span class="kd">let</span><span class="w">
</span><span class="n">session_config</span><span class="w"> </span><span
class="o">=</span><span class="w"> </span><span
class="n">SessionConfig</span><span class="p">::</span><span
class="n">new_with_ballista</span><span class="p">()</span>
-<span class="w"> </span><span class="p">.</span><span
class="n">with_information_schema</span><span class="p">(</span><span
class="kc">true</span><span class="p">)</span>
-<span class="w"> </span><span class="p">.</span><span
class="n">with_ballista_physical_extension_codec</span><span
class="p">(</span><span class="n">Arc</span><span class="p">::</span><span
class="n">new</span><span class="p">(</span><span
class="n">BetterPhysicalCodec</span><span class="p">::</span><span
class="n">default</span><span class="p">()))</span>
-<span class="w"> </span><span class="p">.</span><span
class="n">with_ballista_logical_extension_codec</span><span
class="p">(</span><span class="n">Arc</span><span class="p">::</span><span
class="n">new</span><span class="p">(</span><span
class="n">BetterLogicalCodec</span><span class="p">::</span><span
class="n">default</span><span class="p">()));</span>
-
-<span class="kd">let</span><span class="w"> </span><span
class="n">state</span><span class="w"> </span><span class="o">=</span><span
class="w"> </span><span class="n">SessionStateBuilder</span><span
class="p">::</span><span class="n">new</span><span class="p">()</span>
-<span class="w"> </span><span class="p">.</span><span
class="n">with_default_features</span><span class="p">()</span>
-<span class="w"> </span><span class="p">.</span><span
class="n">with_config</span><span class="p">(</span><span
class="n">session_config</span><span class="p">)</span>
-<span class="w"> </span><span class="p">.</span><span
class="n">build</span><span class="p">();</span>
-
-<span class="kd">let</span><span class="w"> </span><span
class="n">ctx</span><span class="p">:</span><span class="w"> </span><span
class="nc">SessionContext</span><span class="w"> </span><span
class="o">=</span><span class="w"> </span><span
class="n">SessionContext</span><span class="p">::</span><span
class="n">standalone_with_state</span><span class="p">(</span><span
class="n">state</span><span class="p">).</span><span
class="k">await</span><span class="o">?</span><span class="p">;</span>
-</pre></div>
-</div>
-</section>
</section>
@@ -566,10 +549,10 @@ new configuration extensions, object stores, logical and
physical codecs …</p>
<p class="prev-next-title">Frequently Asked Questions</p>
</div>
</a>
- <a class='right-next' id="next-link"
href="../contributors-guide/architecture.html" title="next page">
+ <a class='right-next' id="next-link" href="extensions-example.html"
title="next page">
<div class="prev-next-info">
<p class="prev-next-subtitle">next</p>
- <p class="prev-next-title">Ballista Architecture</p>
+ <p class="prev-next-title">Extensions Example</p>
</div>
<i class="fas fa-angle-right"></i>
</a>
diff --git a/user-guide/extensions-example.html
b/user-guide/extensions-example.html
new file mode 100644
index 000000000..8f0547ea6
--- /dev/null
+++ b/user-guide/extensions-example.html
@@ -0,0 +1,830 @@
+<!DOCTYPE html>
+
+<html lang="en" data-content_root="../">
+ <head>
+ <meta charset="utf-8" />
+ <meta name="viewport" content="width=device-width, initial-scale=1.0"
/><meta name="viewport" content="width=device-width, initial-scale=1" />
+
+ <title>Extensions Example — Apache DataFusion Ballista
documentation</title>
+
+ <link href="../_static/styles/theme.css?digest=1999514e3f237ded88cf"
rel="stylesheet">
+<link
href="../_static/styles/pydata-sphinx-theme.css?digest=1999514e3f237ded88cf"
rel="stylesheet">
+
+
+ <link rel="stylesheet"
+ href="../_static/vendor/fontawesome/5.13.0/css/all.min.css">
+ <link rel="preload" as="font" type="font/woff2" crossorigin
+ href="../_static/vendor/fontawesome/5.13.0/webfonts/fa-solid-900.woff2">
+ <link rel="preload" as="font" type="font/woff2" crossorigin
+ href="../_static/vendor/fontawesome/5.13.0/webfonts/fa-brands-400.woff2">
+
+
+
+
+
+ <link rel="stylesheet" type="text/css"
href="../_static/pygments.css?v=8f2a1f02" />
+ <link rel="stylesheet" type="text/css"
href="../_static/styles/pydata-sphinx-theme.css?v=1140d252" />
+ <link rel="stylesheet" type="text/css"
href="../_static/theme_overrides.css?v=ef9fea58" />
+
+ <link rel="preload" as="script"
href="../_static/scripts/pydata-sphinx-theme.js?digest=1999514e3f237ded88cf">
+
+ <script src="../_static/documentation_options.js?v=8a448e45"></script>
+ <script src="../_static/doctools.js?v=9bcbadda"></script>
+ <script src="../_static/sphinx_highlight.js?v=dc90522c"></script>
+ <link rel="index" title="Index" href="../genindex.html" />
+ <link rel="search" title="Search" href="../search.html" />
+ <link rel="next" title="Ballista Architecture"
href="../contributors-guide/architecture.html" />
+ <link rel="prev" title="Extending Ballista Scheduler And Executors"
href="extending-components.html" />
+ <meta name="viewport" content="width=device-width, initial-scale=1" />
+ <meta name="docsearch:language" content="en">
+
+
+ <!-- Google Analytics -->
+
+ </head>
+ <body data-spy="scroll" data-target="#bd-toc-nav" data-offset="80">
+
+ <div class="container-fluid" id="banner"></div>
+
+
+
+
+ <div class="container-xl">
+ <div class="row">
+
+
+ <!-- Only show if we have sidebars configured, else just a small
margin -->
+ <div class="col-12 col-md-3 bd-sidebar">
+ <div class="sidebar-start-items">
+<a class="navbar-brand" href="../index.html">
+ <img src="../_static/images/ballista-logo.png" class="logo" alt="logo">
+</a>
+
+<form class="bd-search d-flex align-items-center" action="../search.html"
method="get">
+ <i class="icon fas fa-search"></i>
+ <input type="search" class="form-control" name="q" id="search-input"
placeholder="Search the docs ..." aria-label="Search the docs ..."
autocomplete="off" >
+</form>
+
+<nav class="bd-links" id="bd-docs-nav" aria-label="Main navigation">
+ <div class="bd-toc-item active">
+
+ <p aria-level="2" class="caption" role="heading">
+ <span class="caption-text">
+ User Guide
+ </span>
+</p>
+<ul class="nav bd-sidenav">
+ <li class="toctree-l1">
+ <a class="reference internal" href="introduction.html">
+ Introduction
+ </a>
+ </li>
+</ul>
+<p aria-level="2" class="caption" role="heading">
+ <span class="caption-text">
+ Cluster Deployment
+ </span>
+</p>
+<ul class="nav bd-sidenav">
+ <li class="toctree-l1 has-children">
+ <a class="reference internal" href="deployment/index.html">
+ Deployment
+ </a>
+ <input class="toctree-checkbox" id="toctree-checkbox-1"
name="toctree-checkbox-1" type="checkbox"/>
+ <label for="toctree-checkbox-1">
+ <i class="fas fa-chevron-down">
+ </i>
+ </label>
+ <ul>
+ <li class="toctree-l2">
+ <a class="reference internal" href="deployment/quick-start.html">
+ Quick Start
+ </a>
+ </li>
+ <li class="toctree-l2">
+ <a class="reference internal" href="deployment/cargo-install.html">
+ Cargo Install
+ </a>
+ </li>
+ <li class="toctree-l2">
+ <a class="reference internal" href="deployment/docker.html">
+ Docker
+ </a>
+ </li>
+ <li class="toctree-l2">
+ <a class="reference internal" href="deployment/docker-compose.html">
+ Docker Compose
+ </a>
+ </li>
+ <li class="toctree-l2">
+ <a class="reference internal" href="deployment/kubernetes.html">
+ Kubernetes
+ </a>
+ </li>
+ </ul>
+ </li>
+ <li class="toctree-l1">
+ <a class="reference internal" href="scheduler.html">
+ Scheduler
+ </a>
+ </li>
+</ul>
+<p aria-level="2" class="caption" role="heading">
+ <span class="caption-text">
+ Clients
+ </span>
+</p>
+<ul class="nav bd-sidenav">
+ <li class="toctree-l1">
+ <a class="reference internal" href="python.html">
+ Python
+ </a>
+ </li>
+ <li class="toctree-l1">
+ <a class="reference internal" href="rust.html">
+ Rust
+ </a>
+ </li>
+ <li class="toctree-l1">
+ <a class="reference internal" href="cli.html">
+ SQL CLI
+ </a>
+ </li>
+</ul>
+<p aria-level="2" class="caption" role="heading">
+ <span class="caption-text">
+ Reference
+ </span>
+</p>
+<ul class="current nav bd-sidenav">
+ <li class="toctree-l1">
+ <a class="reference internal" href="configs.html">
+ Configuration
+ </a>
+ </li>
+ <li class="toctree-l1">
+ <a class="reference internal" href="tuning-guide.html">
+ Tuning Guide
+ </a>
+ </li>
+ <li class="toctree-l1">
+ <a class="reference internal" href="metrics.html">
+ Ballista Scheduler Metrics
+ </a>
+ </li>
+ <li class="toctree-l1">
+ <a class="reference internal" href="faq.html">
+ Frequently Asked Questions
+ </a>
+ </li>
+ <li class="toctree-l1">
+ <a class="reference internal" href="extending-components.html">
+ Extending Ballista Scheduler And Executors
+ </a>
+ </li>
+ <li class="toctree-l1 current active">
+ <a class="current reference internal" href="#">
+ Extensions Example
+ </a>
+ </li>
+</ul>
+<p aria-level="2" class="caption" role="heading">
+ <span class="caption-text">
+ Contributors Guide
+ </span>
+</p>
+<ul class="nav bd-sidenav">
+ <li class="toctree-l1">
+ <a class="reference internal" href="../contributors-guide/architecture.html">
+ Ballista Architecture
+ </a>
+ </li>
+ <li class="toctree-l1">
+ <a class="reference internal"
href="../contributors-guide/code-organization.html">
+ Ballista Code Organization
+ </a>
+ </li>
+ <li class="toctree-l1">
+ <a class="reference internal" href="../contributors-guide/development.html">
+ Ballista Development
+ </a>
+ </li>
+ <li class="toctree-l1">
+ <a class="reference external"
href="https://github.com/apache/datafusion-ballista/">
+ Source code
+ </a>
+ </li>
+</ul>
+<p aria-level="2" class="caption" role="heading">
+ <span class="caption-text">
+ Community
+ </span>
+</p>
+<ul class="nav bd-sidenav">
+ <li class="toctree-l1">
+ <a class="reference internal" href="../community/communication.html">
+ Communication
+ </a>
+ </li>
+ <li class="toctree-l1">
+ <a class="reference external"
href="https://github.com/apache/datafusion-ballista/issues">
+ Issue tracker
+ </a>
+ </li>
+ <li class="toctree-l1">
+ <a class="reference external"
href="https://github.com/apache/datafusion-ballista/blob/main/CODE_OF_CONDUCT.md">
+ Code of conduct
+ </a>
+ </li>
+</ul>
+
+
+ </div>
+</nav>
+ </div>
+ <div class="sidebar-end-items">
+ </div>
+ </div>
+
+
+
+
+ <div class="d-none d-xl-block col-xl-2 bd-toc">
+
+
+ <div class="toc-item">
+
+<div class="tocsection onthispage pt-5 pb-3">
+ <i class="fas fa-list"></i> On this page
+</div>
+
+<nav id="bd-toc-nav">
+ <ul class="visible nav section-nav flex-column">
+ <li class="toc-h2 nav-item toc-entry">
+ <a class="reference internal nav-link" href="#logical-plan-extension">
+ Logical Plan Extension
+ </a>
+ </li>
+ <li class="toc-h2 nav-item toc-entry">
+ <a class="reference internal nav-link" href="#dataframe-extension">
+ DataFrame Extension
+ </a>
+ </li>
+ <li class="toc-h2 nav-item toc-entry">
+ <a class="reference internal nav-link" href="#logical-extension-codec">
+ Logical Extension Codec
+ </a>
+ </li>
+ <li class="toc-h2 nav-item toc-entry">
+ <a class="reference internal nav-link"
href="#logical-to-physical-plan-translation">
+ Logical to Physical Plan Translation
+ </a>
+ </li>
+ <li class="toc-h2 nav-item toc-entry">
+ <a class="reference internal nav-link" href="#conclusion">
+ Conclusion
+ </a>
+ </li>
+</ul>
+
+</nav>
+ </div>
+
+ <div class="toc-item">
+
+
+<div class="tocsection editthispage">
+ <a
href="https://github.com/apache/datafusion-ballista/edit/main/docs/source/user-guide/extensions-example.md">
+ <i class="fas fa-pencil-alt"></i> Edit this page
+ </a>
+</div>
+
+ </div>
+
+
+ </div>
+
+
+
+
+
+
+ <main class="col-12 col-md-9 col-xl-7 py-md-5 pl-md-5 pr-md-4
bd-content" role="main">
+
+ <div>
+
+ <!---
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<section id="extensions-example">
+<h1>Extensions Example<a class="headerlink" href="#extensions-example"
title="Link to this heading">¶</a></h1>
+<p>This project demonstrates possible extension mechanisms.</p>
+<p>The goal of this small project is to enhance Ballista’s capabilities by
providing new logical and physical operators,
+utilities, and integration tools to support additional data processing
workflows.</p>
+<p>This example implements a <a class="reference external" href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.sample.html">
+<code class="docutils literal notranslate"><span class="pre">sample()</span></code></a>
+operator, which returns a sampled subset of the original <code class="docutils literal notranslate"><span class="pre">DataFrame</span></code>:</p>
+<div class="highlight-rust notranslate"><div
class="highlight"><pre><span></span><span class="kd">let</span><span class="w">
</span><span class="n">ctx</span><span class="w"> </span><span
class="o">=</span><span class="w"> </span><span
class="n">SessionContext</span><span class="p">::</span><span
class="n">remote_with_state</span><span class="p">(</span><span
class="s">"df://localhost:50050"</span><span class="p">,</span><span
class="w"> </span><span class="n">state</span><span [...]
+<span class="kd">let</span><span class="w"> </span><span
class="n">df</span><span class="w"> </span><span class="o">=</span><span
class="w"> </span><span class="n">ctx</span><span class="p">.</span><span
class="n">read_parquet</span><span class="p">(</span><span
class="s">"data/"</span><span class="p">,</span><span class="w">
</span><span class="nb">Default</span><span class="p">::</span><span
class="n">default</span><span class="w"> </span><span
class="p">()).</span><span clas [...]
+
+<span class="c1">// The `sample` operator, defined in this project,</span>
+<span class="c1">// samples 30% of the data.</span>
+<span class="kd">let</span><span class="w"> </span><span class="n">df</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">df</span><span class="p">.</span><span class="n">sample</span><span class="p">(</span><span class="mf">0.30</span><span class="p">,</span><span class="w"> </span><span class="nb">None</span><span class="p">)</span><span class="o">?</span><span class="p">;</span>
+</pre></div>
+</div>
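+<p>The physical operator itself is not shown in this excerpt. As a rough, hypothetical illustration of one way a fraction-based sampler can work, the stdlib-only sketch below keeps each row independently with probability <code>fraction</code> (Bernoulli sampling), driven by a small seeded SplitMix64 generator. The generator, the fallback seed of 0, and the function names are assumptions for illustration, not the example's actual implementation.</p>

```rust
/// Hypothetical seeded PRNG (SplitMix64); the real operator may use a different RNG.
struct SplitMix64 {
    state: u64,
}

impl SplitMix64 {
    fn new(seed: u64) -> Self {
        Self { state: seed }
    }

    /// Returns the next pseudo-random u64.
    fn next_u64(&mut self) -> u64 {
        self.state = self.state.wrapping_add(0x9E37_79B9_7F4A_7C15);
        let mut z = self.state;
        z = (z ^ (z >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
        z = (z ^ (z >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
        z ^ (z >> 31)
    }

    /// Returns a float uniformly distributed in [0.0, 1.0).
    fn next_f64(&mut self) -> f64 {
        // Keep the top 53 bits so the value is exactly representable as f64.
        (self.next_u64() >> 11) as f64 / (1u64 << 53) as f64
    }
}

/// Keeps each row independently with probability `fraction` (Bernoulli sampling).
/// Assumption: a missing seed falls back to 0 in this sketch.
fn bernoulli_sample<T: Clone>(rows: &[T], fraction: f32, seed: Option<i64>) -> Vec<T> {
    let mut rng = SplitMix64::new(seed.unwrap_or(0) as u64);
    rows.iter()
        .filter(|_| rng.next_f64() < fraction as f64)
        .cloned()
        .collect()
}

fn main() {
    let rows: Vec<u32> = (0..10_000).collect();
    let sampled = bernoulli_sample(&rows, 0.30, Some(42));
    println!("kept {} of {} rows", sampled.len(), rows.len());
}
```

+<p>With a fixed seed the output is reproducible, while the number of kept rows only approximates <code>fraction</code> of the input; a real operator would likely draw a fresh seed when none is given.</p>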
+<p>To implement this functionality, it is necessary to implement a new logical plan extension and new physical operators, and to extend
+<code class="docutils literal notranslate"><span class="pre">DataFrame</span></code> to expose the new operator.</p>
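+<p>Rust does not allow adding inherent methods to a type defined in another crate, so the usual way to "extend" <code>DataFrame</code> is an extension trait that users bring into scope. The sketch below shows the pattern with a hypothetical stand-in type <code>Frame</code> whose plan is just a <code>String</code>; its validation mirrors the rules checked by <code>DataFrameExt::sample</code> on this page (fraction in (0, 1], non-negative seed).</p>

```rust
/// Hypothetical stand-in for a type owned by another crate (such as DataFusion's DataFrame).
#[derive(Debug, Clone, PartialEq)]
pub struct Frame {
    pub plan: String,
}

/// Extension trait: adds a `sample` method to `Frame` without modifying its crate.
pub trait FrameExt {
    fn sample(self, fraction: f32, seed: Option<i64>) -> Result<Frame, String>;
}

impl FrameExt for Frame {
    fn sample(self, fraction: f32, seed: Option<i64>) -> Result<Frame, String> {
        // Reject fractions outside (0, 1]; NaN also fails this check.
        if !(fraction > 0.0 && fraction <= 1.0) {
            return Err("fraction should be in (0, 1] range".to_string());
        }
        if seed.unwrap_or(0) < 0 {
            return Err("seed should be a non-negative number".to_string());
        }
        // Wrap the existing plan in a new node, loosely mirroring LogicalPlan::Extension(...).
        Ok(Frame {
            plan: format!("Sample(fraction={fraction}, seed={seed:?}) <- {}", self.plan),
        })
    }
}

fn main() {
    // The trait must be in scope for `.sample()` to resolve; here it is in the same module.
    let df = Frame { plan: "Scan(data/)".to_string() };
    match df.sample(0.30, None) {
        Ok(sampled) => println!("{}", sampled.plan),
        Err(e) => eprintln!("error: {e}"),
    }
}
```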
+<blockquote>
+<div><p><strong>Warning:</strong><br />
+Please do not use the implemented sampling operator in production; statisticians would probably not approve of it.</p>
+</div></blockquote>
+<p>This demo will provide:</p>
+<ul class="simple">
+<li><p>Custom DataFusion (logical and physical) nodes.</p></li>
+<li><p>Logical and physical extension codecs.</p></li>
+<li><p>Custom protocol buffer definitions.</p></li>
+<li><p>Extension query planner.</p></li>
+</ul>
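+<p>The codecs are needed because the <code>Sample</code> node must be serialized whenever plans are shipped between the client, scheduler, and executors; this example uses custom protocol buffer definitions for that. The stdlib-only sketch below illustrates only the round-trip property a codec must provide, using a hand-rolled byte layout and a hypothetical <code>SampleParams</code> struct. It is not the protobuf encoding the example uses.</p>

```rust
/// Hypothetical parameters mirroring the `fraction` and `seed` fields of `Sample`.
#[derive(Debug, Clone, Copy, PartialEq)]
struct SampleParams {
    fraction: f32,
    seed: Option<i64>,
}

/// Layout: 4 bytes fraction (LE), 1 tag byte (0 = no seed, 1 = seed), then 8 bytes seed (LE).
fn encode(p: &SampleParams) -> Vec<u8> {
    let mut buf = Vec::with_capacity(13);
    buf.extend_from_slice(&p.fraction.to_le_bytes());
    match p.seed {
        Some(s) => {
            buf.push(1);
            buf.extend_from_slice(&s.to_le_bytes());
        }
        None => buf.push(0),
    }
    buf
}

/// Decodes bytes produced by `encode`, rejecting malformed input.
fn decode(buf: &[u8]) -> Result<SampleParams, String> {
    if buf.len() < 5 {
        return Err("buffer too short".to_string());
    }
    let mut fraction_bytes = [0u8; 4];
    fraction_bytes.copy_from_slice(&buf[0..4]);
    let fraction = f32::from_le_bytes(fraction_bytes);
    let seed = match (buf[4], buf.len()) {
        (0, 5) => None,
        (1, 13) => {
            let mut seed_bytes = [0u8; 8];
            seed_bytes.copy_from_slice(&buf[5..13]);
            Some(i64::from_le_bytes(seed_bytes))
        }
        _ => return Err("invalid seed encoding".to_string()),
    };
    Ok(SampleParams { fraction, seed })
}

fn main() {
    let original = SampleParams { fraction: 0.30, seed: Some(42) };
    let bytes = encode(&original);
    let decoded = decode(&bytes).expect("round-trip should succeed");
    assert_eq!(original, decoded);
    println!("{} bytes, round-trip ok: {:?}", bytes.len(), decoded);
}
```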
+<section id="logical-plan-extension">
+<h2>Logical Plan Extension<a class="headerlink" href="#logical-plan-extension"
title="Link to this heading">¶</a></h2>
+<p>The first step is to implement a custom logical plan extension:</p>
+<div class="highlight-rust notranslate"><div
class="highlight"><pre><span></span><span class="sd">//! This module defines
the implementation of the `UserDefinedLogicalNodeCore` trait for the `Sample`
logical plan node.</span>
+<span class="sd">//!</span>
+<span class="sd">//! The `Sample` node represents a custom logical plan
extension for sampling data within a query plan.</span>
+<span class="sd">//!</span>
+<span class="k">use</span><span class="w"> </span><span
class="n">std</span><span class="p">::{</span><span class="n">hash</span><span
class="p">::</span><span class="n">Hash</span><span class="p">,</span><span
class="w"> </span><span class="n">vec</span><span class="p">};</span>
+
+<span class="k">use</span><span class="w"> </span><span
class="n">datafusion</span><span class="p">::{</span>
+<span class="w"> </span><span class="n">error</span><span
class="p">::</span><span class="n">DataFusionError</span><span
class="p">,</span>
+<span class="w"> </span><span class="n">logical_expr</span><span
class="p">::{</span><span class="n">LogicalPlan</span><span
class="p">,</span><span class="w"> </span><span
class="n">UserDefinedLogicalNodeCore</span><span class="p">},</span>
+<span class="p">};</span>
+
+<span class="cp">#[derive(Debug, Clone, PartialEq, PartialOrd)]</span>
+<span class="k">pub</span><span class="w"> </span><span
class="k">struct</span><span class="w"> </span><span
class="nc">Sample</span><span class="w"> </span><span class="p">{</span>
+<span class="w"> </span><span class="k">pub</span><span class="w">
</span><span class="n">fraction</span><span class="p">:</span><span class="w">
</span><span class="kt">f32</span><span class="p">,</span>
+<span class="w"> </span><span class="k">pub</span><span class="w">
</span><span class="n">seed</span><span class="p">:</span><span class="w">
</span><span class="nb">Option</span><span class="o"><</span><span
class="kt">i64</span><span class="o">></span><span class="p">,</span>
+<span class="w"> </span><span class="k">pub</span><span class="w">
</span><span class="n">input</span><span class="p">:</span><span class="w">
</span><span class="nc">LogicalPlan</span><span class="p">,</span>
+<span class="p">}</span>
+
+<span class="k">impl</span><span class="w"> </span><span
class="n">Hash</span><span class="w"> </span><span class="k">for</span><span
class="w"> </span><span class="n">Sample</span><span class="w"> </span><span
class="p">{</span>
+<span class="w"> </span><span class="k">fn</span><span class="w">
</span><span class="nf">hash</span><span class="o"><</span><span
class="n">H</span><span class="p">:</span><span class="w"> </span><span
class="nc">std</span><span class="p">::</span><span class="n">hash</span><span
class="p">::</span><span class="n">Hasher</span><span
class="o">></span><span class="p">(</span><span class="o">&</span><span
class="bp">self</span><span class="p">,</span><span class="w"> </span>< [...]
+<span class="w"> </span><span class="bp">self</span><span
class="p">.</span><span class="n">seed</span><span class="p">.</span><span
class="n">hash</span><span class="p">(</span><span class="n">state</span><span
class="p">);</span>
+<span class="w"> </span><span class="bp">self</span><span
class="p">.</span><span class="n">input</span><span class="p">.</span><span
class="n">hash</span><span class="p">(</span><span class="n">state</span><span
class="p">);</span>
+<span class="w"> </span><span class="p">}</span>
+<span class="p">}</span>
+
+<span class="k">impl</span><span class="w"> </span><span
class="nb">Eq</span><span class="w"> </span><span class="k">for</span><span
class="w"> </span><span class="n">Sample</span><span class="w"> </span><span
class="p">{}</span>
+
+<span class="k">impl</span><span class="w"> </span><span
class="n">Sample</span><span class="w"> </span><span class="p">{</span>
+<span class="w"> </span><span class="k">pub</span><span class="w">
</span><span class="k">fn</span><span class="w"> </span><span
class="nf">new</span><span class="p">(</span><span
class="n">fraction</span><span class="p">:</span><span class="w"> </span><span
class="kt">f32</span><span class="p">,</span><span class="w"> </span><span
class="n">seed</span><span class="p">:</span><span class="w"> </span><span
class="nb">Option</span><span class="o"><</span><span
class="kt">i64</span><s [...]
+<span class="w"> </span><span class="bp">Self</span><span class="w">
</span><span class="p">{</span>
+<span class="w"> </span><span class="n">fraction</span><span
class="p">,</span>
+<span class="w"> </span><span class="n">seed</span><span
class="p">,</span>
+<span class="w"> </span><span class="n">input</span><span
class="p">,</span>
+<span class="w"> </span><span class="p">}</span>
+<span class="w"> </span><span class="p">}</span>
+<span class="p">}</span>
+
+<span class="k">impl</span><span class="w"> </span><span
class="n">UserDefinedLogicalNodeCore</span><span class="w"> </span><span
class="k">for</span><span class="w"> </span><span class="n">Sample</span><span
class="w"> </span><span class="p">{</span>
+<span class="w"> </span><span class="k">fn</span><span class="w">
</span><span class="nf">name</span><span class="p">(</span><span
class="o">&</span><span class="bp">self</span><span class="p">)</span><span
class="w"> </span><span class="p">-></span><span class="w"> </span><span
class="kp">&</span><span class="kt">str</span><span class="w"> </span><span
class="p">{</span>
+<span class="w"> </span><span class="s">"Sample"</span>
+<span class="w"> </span><span class="p">}</span>
+
+<span class="w"> </span><span class="k">fn</span><span class="w">
</span><span class="nf">inputs</span><span class="p">(</span><span
class="o">&</span><span class="bp">self</span><span class="p">)</span><span
class="w"> </span><span class="p">-></span><span class="w"> </span><span
class="nb">Vec</span><span class="o"><&</span><span
class="n">LogicalPlan</span><span class="o">></span><span class="w">
</span><span class="p">{</span>
+<span class="w"> </span><span class="fm">vec!</span><span
class="p">[</span><span class="o">&</span><span class="bp">self</span><span
class="p">.</span><span class="n">input</span><span class="p">]</span>
+<span class="w"> </span><span class="p">}</span>
+
+<span class="w"> </span><span class="k">fn</span><span class="w">
</span><span class="nf">schema</span><span class="p">(</span><span
class="o">&</span><span class="bp">self</span><span class="p">)</span><span
class="w"> </span><span class="p">-></span><span class="w"> </span><span
class="kp">&</span><span class="nc">datafusion</span><span
class="p">::</span><span class="n">common</span><span class="p">::</span><span
class="n">DFSchemaRef</span><span class="w"> </span><span [...]
+<span class="w"> </span><span class="bp">self</span><span
class="p">.</span><span class="n">input</span><span class="p">.</span><span
class="n">schema</span><span class="p">()</span>
+<span class="w"> </span><span class="p">}</span>
+
+<span class="w"> </span><span class="k">fn</span><span class="w">
</span><span class="nf">expressions</span><span class="p">(</span><span
class="o">&</span><span class="bp">self</span><span class="p">)</span><span
class="w"> </span><span class="p">-></span><span class="w"> </span><span
class="nb">Vec</span><span class="o"><</span><span
class="n">datafusion</span><span class="p">::</span><span
class="n">prelude</span><span class="p">::</span><span
class="n">Expr</span><span c [...]
+<span class="w"> </span><span class="fm">vec!</span><span
class="p">[]</span>
+<span class="w"> </span><span class="p">}</span>
+
+<span class="w"> </span><span class="k">fn</span><span class="w">
</span><span class="nf">fmt_for_explain</span><span class="p">(</span><span
class="o">&</span><span class="bp">self</span><span class="p">,</span><span
class="w"> </span><span class="n">f</span><span class="p">:</span><span
class="w"> </span><span class="kp">&</span><span class="nc">mut</span><span
class="w"> </span><span class="n">std</span><span class="p">::</span><span
class="n">fmt</span><span class="p">::</ [...]
+<span class="w"> </span><span class="n">f</span><span
class="p">.</span><span class="n">write_fmt</span><span class="p">(</span><span
class="fm">format_args!</span><span class="p">(</span>
+<span class="w"> </span><span class="s">"Sample: fraction: {},
seed: {:?}"</span><span class="p">,</span>
+<span class="w"> </span><span class="bp">self</span><span
class="p">.</span><span class="n">fraction</span><span class="p">,</span><span
class="w"> </span><span class="bp">self</span><span class="p">.</span><span
class="n">seed</span>
+<span class="w"> </span><span class="p">))</span><span
class="o">?</span><span class="p">;</span>
+<span class="w"> </span><span class="nb">Ok</span><span
class="p">(())</span>
+<span class="w"> </span><span class="p">}</span>
+
+<span class="w"> </span><span class="k">fn</span><span class="w">
</span><span class="nf">with_exprs_and_inputs</span><span class="p">(</span>
+<span class="w"> </span><span class="o">&</span><span
class="bp">self</span><span class="p">,</span>
+<span class="w"> </span><span class="n">_exprs</span><span
class="p">:</span><span class="w"> </span><span class="nb">Vec</span><span
class="o"><</span><span class="n">datafusion</span><span
class="p">::</span><span class="n">prelude</span><span class="p">::</span><span
class="n">Expr</span><span class="o">></span><span class="p">,</span>
+<span class="w"> </span><span class="n">inputs</span><span
class="p">:</span><span class="w"> </span><span class="nb">Vec</span><span
class="o"><</span><span class="n">LogicalPlan</span><span
class="o">></span><span class="p">,</span>
+<span class="w"> </span><span class="p">)</span><span class="w">
</span><span class="p">-></span><span class="w"> </span><span
class="nc">datafusion</span><span class="p">::</span><span
class="n">error</span><span class="p">::</span><span
class="nb">Result</span><span class="o"><</span><span
class="bp">Self</span><span class="o">></span><span class="w"> </span><span
class="p">{</span>
+<span class="w"> </span><span class="nb">Ok</span><span
class="p">(</span><span class="bp">Self</span><span class="w"> </span><span
class="p">{</span>
+<span class="w"> </span><span class="n">seed</span><span
class="p">:</span><span class="w"> </span><span class="nc">self</span><span
class="p">.</span><span class="n">seed</span><span class="p">,</span>
+<span class="w"> </span><span class="n">fraction</span><span
class="p">:</span><span class="w"> </span><span class="nc">self</span><span
class="p">.</span><span class="n">fraction</span><span class="p">,</span>
+<span class="w"> </span><span class="n">input</span><span
class="p">:</span><span class="w"> </span><span class="nc">inputs</span>
+<span class="w"> </span><span class="p">.</span><span
class="n">first</span><span class="p">()</span>
+<span class="w"> </span><span class="p">.</span><span
class="n">ok_or</span><span class="p">(</span><span
class="n">DataFusionError</span><span class="p">::</span><span
class="n">Plan</span><span class="p">(</span><span class="s">"expected
single input"</span><span class="p">.</span><span
class="n">to_string</span><span class="p">()))</span><span class="o">?</span>
+<span class="w"> </span><span class="p">.</span><span
class="n">clone</span><span class="p">(),</span>
+<span class="w"> </span><span class="p">})</span>
+<span class="w"> </span><span class="p">}</span>
+<span class="p">}</span>
+</pre></div>
+</div>
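+<p>The <code>Hash</code> implementation above leaves out <code>fraction</code> because <code>f32</code> implements neither <code>Hash</code> nor <code>Eq</code>. That stays consistent with equality, but nodes that differ only in <code>fraction</code> get the same hash. If that matters, one option is to hash the float's bit pattern via <code>f32::to_bits</code>, normalizing <code>-0.0</code> to <code>0.0</code> first. The sketch below uses a hypothetical <code>SampleKey</code> struct in which a <code>String</code> stands in for the child <code>LogicalPlan</code>.</p>

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical simplified node: `input` stands in for the child `LogicalPlan`.
#[derive(Debug, Clone, PartialEq)]
struct SampleKey {
    fraction: f32,
    seed: Option<i64>,
    input: String,
}

// Sound only while `fraction` is never NaN (NaN != NaN breaks Eq's reflexivity);
// in this example the DataFrame-level validation rejects NaN fractions.
impl Eq for SampleKey {}

impl Hash for SampleKey {
    fn hash<H: Hasher>(&self, state: &mut H) {
        // -0.0 == 0.0 but their bit patterns differ, so normalize before hashing
        // to preserve "a == b implies hash(a) == hash(b)".
        let bits = if self.fraction == 0.0 { 0.0f32.to_bits() } else { self.fraction.to_bits() };
        bits.hash(state);
        self.seed.hash(state);
        self.input.hash(state);
    }
}

/// Hashes any `Hash` value with the standard library's default hasher.
fn hash_of<T: Hash>(value: &T) -> u64 {
    let mut hasher = DefaultHasher::new();
    value.hash(&mut hasher);
    hasher.finish()
}

fn main() {
    let a = SampleKey { fraction: 0.3, seed: Some(1), input: "scan".to_string() };
    let b = SampleKey { fraction: 0.5, ..a.clone() };
    println!("{} {}", hash_of(&a), hash_of(&b));
}
```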
+</section>
+<section id="dataframe-extension">
+<h2>DataFrame Extension<a class="headerlink" href="#dataframe-extension"
title="Link to this heading">¶</a></h2>
+<p>To expose this functionality to end users, a DataFrame extension is implemented. This extension wraps the existing plan in a
+<code class="docutils literal notranslate"><span class="pre">LogicalPlan::Extension(extension)</span></code> node:</p>
+<div class="highlight-rust notranslate"><div
class="highlight"><pre><span></span><span class="k">use</span><span class="w">
</span><span class="n">std</span><span class="p">::</span><span
class="n">sync</span><span class="p">::</span><span class="n">Arc</span><span
class="p">;</span>
+
+<span class="k">use</span><span class="w"> </span><span
class="n">datafusion</span><span class="p">::{</span>
+<span class="w"> </span><span class="n">error</span><span
class="p">::</span><span class="n">DataFusionError</span><span
class="p">,</span>
+<span class="w"> </span><span class="n">logical_expr</span><span
class="p">::{</span><span class="n">Extension</span><span
class="p">,</span><span class="w"> </span><span
class="n">LogicalPlan</span><span class="p">},</span>
+<span class="w"> </span><span class="n">prelude</span><span
class="p">::</span><span class="n">DataFrame</span><span class="p">,</span>
+<span class="p">};</span>
+
+<span class="k">use</span><span class="w"> </span><span
class="k">crate</span><span class="p">::</span><span
class="n">logical</span><span class="p">::</span><span
class="n">sample_extension</span><span class="p">::</span><span
class="n">Sample</span><span class="p">;</span>
+
+<span class="k">pub</span><span class="w"> </span><span
class="k">trait</span><span class="w"> </span><span
class="n">DataFrameExt</span><span class="w"> </span><span class="p">{</span>
+<span class="w"> </span><span class="k">fn</span><span class="w">
</span><span class="nf">sample</span><span class="p">(</span><span
class="bp">self</span><span class="p">,</span><span class="w"> </span><span
class="n">fraction</span><span class="p">:</span><span class="w"> </span><span
class="kt">f32</span><span class="p">,</span><span class="w"> </span><span
class="n">seed</span><span class="p">:</span><span class="w"> </span><span
class="nb">Option</span><span class="o"><</span> [...]
+<span class="p">}</span>
+
+<span class="sd">/// Returns a new `DataFrame` containing a random sample of
rows from the original `DataFrame`.</span>
+<span class="sd">///</span>
+<span class="sd">/// # Arguments</span>
+<span class="sd">///</span>
+<span class="sd">/// * `fraction` - The fraction of rows to sample, must be in
the range (0.0, 1.0].</span>
+<span class="sd">/// * `seed` - An optional seed for the random number
generator to ensure reproducibility.</span>
+<span class="sd">///</span>
+<span class="sd">/// # Errors</span>
+<span class="sd">///</span>
+<span class="sd">/// Returns a `DataFusionError::Configuration` if `fraction` is not within the valid range, or if `seed` is negative.</span>
+<span class="sd">///</span>
+<span class="k">impl</span><span class="w"> </span><span
class="n">DataFrameExt</span><span class="w"> </span><span
class="k">for</span><span class="w"> </span><span
class="n">DataFrame</span><span class="w"> </span><span class="p">{</span>
+<span class="w"> </span><span class="k">fn</span><span class="w">
</span><span class="nf">sample</span><span class="p">(</span><span
class="bp">self</span><span class="p">,</span><span class="w"> </span><span
class="n">fraction</span><span class="p">:</span><span class="w"> </span><span
class="kt">f32</span><span class="p">,</span><span class="w"> </span><span
class="n">seed</span><span class="p">:</span><span class="w"> </span><span
class="nb">Option</span><span class="o"><</span> [...]
+<span class="w"> </span><span class="k">if</span><span class="w">
</span><span class="o">!</span><span class="p">(</span><span
class="n">fraction</span><span class="w"> </span><span
class="o">></span><span class="w"> </span><span class="mf">0.0</span><span
class="w"> </span><span class="o">&&</span><span class="w">
</span><span class="n">fraction</span><span class="w"> </span><span
class="o"><=</span><span class="w"> </span><span class="mf">1.0</span><span
class="p"> [...]
+<span class="w"> </span><span class="nb">Err</span><span
class="p">(</span><span class="n">DataFusionError</span><span
class="p">::</span><span class="n">Configuration</span><span class="p">(</span>
+<span class="w"> </span><span class="s">"fraction should be in (0, 1] range"</span><span class="p">.</span><span class="n">to_string</span><span class="p">(),</span>
+<span class="w"> </span><span class="p">))</span><span
class="o">?</span>
+<span class="w"> </span><span class="p">}</span>
+
+<span class="w"> </span><span class="k">if</span><span class="w">
</span><span class="n">seed</span><span class="p">.</span><span
class="n">unwrap_or</span><span class="p">(</span><span
class="mi">0</span><span class="p">)</span><span class="w"> </span><span
class="o"><</span><span class="w"> </span><span class="mi">0</span><span
class="w"> </span><span class="p">{</span>
+<span class="w"> </span><span class="nb">Err</span><span
class="p">(</span><span class="n">DataFusionError</span><span
class="p">::</span><span class="n">Configuration</span><span class="p">(</span>
+<span class="w"> </span><span class="s">"seed should be a non-negative number"</span><span class="p">.</span><span class="n">to_string</span><span class="p">(),</span>
+<span class="w"> </span><span class="p">))</span><span
class="o">?</span>
+<span class="w"> </span><span class="p">}</span>
+
+<span class="w"> </span><span class="kd">let</span><span class="w">
</span><span class="p">(</span><span class="n">state</span><span
class="p">,</span><span class="w"> </span><span class="n">input</span><span
class="p">)</span><span class="w"> </span><span class="o">=</span><span
class="w"> </span><span class="bp">self</span><span class="p">.</span><span
class="n">into_parts</span><span class="p">();</span>
+
+<span class="w"> </span><span class="kd">let</span><span class="w">
</span><span class="n">node</span><span class="w"> </span><span
class="o">=</span><span class="w"> </span><span class="n">Arc</span><span
class="p">::</span><span class="n">new</span><span class="p">(</span><span
class="n">Sample</span><span class="w"> </span><span class="p">{</span>
+<span class="w"> </span><span class="n">fraction</span><span
class="p">,</span>
+<span class="w"> </span><span class="n">seed</span><span
class="p">,</span>
+<span class="w"> </span><span class="n">input</span><span
class="p">,</span>
+<span class="w"> </span><span class="p">});</span>
+<span class="w"> </span><span class="kd">let</span><span class="w">
</span><span class="n">extension</span><span class="w"> </span><span
class="o">=</span><span class="w"> </span><span class="n">Extension</span><span
class="w"> </span><span class="p">{</span><span class="w"> </span><span
class="n">node</span><span class="w"> </span><span class="p">};</span>
+<span class="w"> </span><span class="kd">let</span><span class="w">
</span><span class="n">plan</span><span class="w"> </span><span
class="o">=</span><span class="w"> </span><span
class="n">LogicalPlan</span><span class="p">::</span><span
class="n">Extension</span><span class="p">(</span><span
class="n">extension</span><span class="p">);</span>
+
+<span class="w"> </span><span class="nb">Ok</span><span
class="p">(</span><span class="n">DataFrame</span><span
class="p">::</span><span class="n">new</span><span class="p">(</span><span
class="n">state</span><span class="p">,</span><span class="w"> </span><span
class="n">plan</span><span class="p">))</span>
+<span class="w"> </span><span class="p">}</span>
+<span class="p">}</span>
+</pre></div>
+</div>
+<p>This approach makes new methods available on the DataFusion
DataFrame:</p>
+<div class="highlight-rust notranslate"><div
class="highlight"><pre><span></span><span class="kd">let</span><span class="w">
</span><span class="n">ctx</span><span class="w"> </span><span
class="o">=</span><span class="w"> </span><span
class="n">SessionContext</span><span class="p">::</span><span
class="n">remote_with_state</span><span class="p">(</span><span
class="s">"df://localhost:50050"</span><span class="p">,</span><span
class="w"> </span><span class="n">state</span><span [...]
+<span class="kd">let</span><span class="w"> </span><span
class="n">df</span><span class="w"> </span><span class="o">=</span><span
class="w"> </span><span class="n">ctx</span><span class="p">.</span><span
class="n">read_parquet</span><span class="p">(</span><span
class="s">"data/"</span><span class="p">,</span><span class="w">
</span><span class="nb">Default</span><span class="p">::</span><span
class="n">default</span><span
class="p">()).</span><span clas [...]
+
+<span class="c1">// The DataFrame extension provides the `sample` method</span>
+<span class="kd">let</span><span class="w"> </span><span
class="n">df</span><span class="w"> </span><span class="o">=</span><span
class="w"> </span><span class="n">df</span><span class="p">.</span><span
class="n">sample</span><span class="p">(</span><span
class="mf">0.30</span><span class="p">,</span><span class="w"> </span><span
class="nb">None</span><span class="p">)</span><span class="w"> </span><span
class="o">?</span><span class="p">;</span>
+</pre></div>
+</div>
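+<p>The extension-trait pattern behind <code>sample</code> can be sketched without DataFusion. In this minimal, std-only sketch, <code>Frame</code>, <code>Plan</code> and <code>FrameExt</code> are hypothetical stand-ins for <code>DataFrame</code>, <code>LogicalPlan</code> and the actual extension trait, and <code>fraction</code> is assumed to be <code>f32</code> to match the <code>float</code> field of <code>LSample</code> defined in the next section. The checks mirror the ones above: <code>fraction</code> must lie in (0, 1], so 0 itself is rejected, and a present <code>seed</code> must not be negative.</p>

```rust
/// Hypothetical, simplified logical plan: a scan, or a sample node that
/// wraps an input plan (playing the role of `LogicalPlan::Extension`).
#[derive(Debug, Clone, PartialEq)]
pub enum Plan {
    Scan(String),
    Sample {
        fraction: f32,
        seed: Option<i64>,
        input: Box<Plan>,
    },
}

/// Hypothetical stand-in for DataFusion's `DataFrame`. In real code the type
/// lives in another crate, which is why an extension trait is used.
#[derive(Debug, Clone, PartialEq)]
pub struct Frame {
    pub plan: Plan,
}

/// Extension trait that adds a `sample` method to `Frame`.
pub trait FrameExt {
    fn sample(self, fraction: f32, seed: Option<i64>) -> Result<Frame, String>;
}

impl FrameExt for Frame {
    fn sample(self, fraction: f32, seed: Option<i64>) -> Result<Frame, String> {
        // Negated range check: NaN fails both comparisons, so it is rejected too.
        if !(fraction > 0.0 && fraction <= 1.0) {
            return Err("fraction should be in the (0, 1] range".to_string());
        }
        // A missing seed is accepted; a present one must not be negative.
        if seed.unwrap_or(0) < 0 {
            return Err("seed should be a non-negative number".to_string());
        }
        // Wrap the current plan as the input of the new node.
        let input = Box::new(self.plan);
        Ok(Frame {
            plan: Plan::Sample { fraction, seed, input },
        })
    }
}

fn main() {
    let df = Frame { plan: Plan::Scan("data/".to_string()) };
    // `sample` resolves because `FrameExt` is in scope.
    let df = df.sample(0.30, None).expect("valid arguments");
    println!("{:?}", df.plan);
}
```

+<p>Writing the range test as a negated conjunction, rather than testing the two out-of-range conditions directly, means a NaN fraction is rejected as well, because every comparison with NaN evaluates to false.</p>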
+<p><img alt="Ballista extensions diagram" src="../_images/ballista_extensions.excalidraw.svg"
/></p>
+</section>
+<section id="logical-extension-codec">
+<h2>Logical Extension Codec<a class="headerlink"
href="#logical-extension-codec" title="Link to this heading">¶</a></h2>
+<p>With the extension in place, a custom logical extension codec is required
to serialize the client's logical plan and send it to the
+scheduler.</p>
+<p>The logical extension codec typically consists of two components. The first
is a set of Google Protocol Buffer message definitions:</p>
+<div class="highlight-proto notranslate"><div
class="highlight"><pre><span></span><span class="kd">message</span><span
class="w"> </span><span class="nc">LMessage</span><span class="w"> </span><span
class="p">{</span>
+<span class="w"> </span><span class="k">oneof</span><span class="w">
</span><span class="n">Extension</span><span class="w"> </span><span
class="p">{</span>
+<span class="w"> </span><span class="n">LSample</span><span class="w">
</span><span class="na">sample</span><span class="w"> </span><span
class="o">=</span><span class="w"> </span><span class="mi">1</span><span
class="p">;</span>
+<span class="w"> </span><span class="p">}</span>
+<span class="p">}</span>
+
+<span class="kd">message</span><span class="w"> </span><span
class="nc">LSample</span><span class="w"> </span><span class="p">{</span>
+<span class="w"> </span><span class="kt">float</span><span class="w">
</span><span class="na">fraction</span><span class="w"> </span><span
class="o">=</span><span class="w"> </span><span class="mi">1</span><span
class="p">;</span>
+<span class="w"> </span><span class="k">optional</span><span class="w">
</span><span class="kt">int64</span><span class="w"> </span><span
class="na">seed</span><span class="w"> </span><span class="o">=</span><span
class="w"> </span><span class="mi">2</span><span class="p">;</span>
+<span class="p">}</span>
+</pre></div>
+</div>
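+<p>To make the wire format concrete, the following std-only sketch hand-encodes and decodes these two messages using proto3 rules. It is for illustration only: in the codec below, the work is done by the prost-generated types via <code>encode</code> and <code>decode</code>. The helper function names are hypothetical, and the sketch assumes the usual prost mapping of <code>float</code> to <code>f32</code> and <code>optional int64</code> to an optional <code>i64</code>.</p>

```rust
/// Appends `value` to `buf` as a base-128 varint.
fn put_varint(buf: &mut Vec<u8>, mut value: u64) {
    while value >= 0x80 {
        buf.push(((value as u8) & 0x7F) | 0x80);
        value >>= 7;
    }
    buf.push(value as u8);
}

/// Reads a base-128 varint at `*pos`, advancing `*pos` past it.
fn get_varint(buf: &[u8], pos: &mut usize) -> Option<u64> {
    let mut value = 0u64;
    for shift in (0..64).step_by(7) {
        let byte = *buf.get(*pos)?;
        *pos += 1;
        value |= u64::from(byte & 0x7F) << shift;
        if (byte & 0x80) == 0 {
            return Some(value);
        }
    }
    None // longer than 10 bytes: malformed
}

/// Encodes `LSample { float fraction = 1; optional int64 seed = 2; }`.
fn encode_l_sample(fraction: f32, seed: Option<i64>, buf: &mut Vec<u8>) {
    // proto3 skips a plain scalar field that holds its default value (0.0).
    if fraction != 0.0 {
        buf.push((1 << 3) | 5); // key: field 1, wire type 5 (fixed 32-bit)
        buf.extend_from_slice(&fraction.to_le_bytes());
    }
    // `optional` fields track presence, so `Some(0)` is still written.
    if let Some(seed) = seed {
        buf.push(2 << 3); // key: field 2, wire type 0 (varint)
        put_varint(buf, seed as u64); // negative int64 values take 10 bytes
    }
}

/// Encodes an `LMessage` whose `Extension` oneof holds `sample` (field 1).
fn encode_l_message_sample(fraction: f32, seed: Option<i64>) -> Vec<u8> {
    let mut inner = Vec::new();
    encode_l_sample(fraction, seed, &mut inner);
    let mut buf = vec![(1 << 3) | 2]; // key: field 1, wire type 2 (length-delimited)
    put_varint(&mut buf, inner.len() as u64);
    buf.extend_from_slice(&inner);
    buf
}

/// Decodes an `LSample`, rejecting unknown fields (real decoders skip them).
fn decode_l_sample(buf: &[u8]) -> Option<(f32, Option<i64>)> {
    let (mut fraction, mut seed, mut pos) = (0.0f32, None, 0usize);
    while pos < buf.len() {
        let key = get_varint(buf, &mut pos)?;
        match (key >> 3, key & 7) {
            (1, 5) => {
                let b = buf.get(pos..pos + 4)?;
                fraction = f32::from_le_bytes([b[0], b[1], b[2], b[3]]);
                pos += 4;
            }
            (2, 0) => seed = Some(get_varint(buf, &mut pos)? as i64),
            _ => return None,
        }
    }
    Some((fraction, seed))
}

/// Decodes an `LMessage` that is expected to carry the `sample` variant.
fn decode_l_message_sample(buf: &[u8]) -> Option<(f32, Option<i64>)> {
    let mut pos = 0;
    if get_varint(buf, &mut pos)? != ((1 << 3) | 2) {
        return None;
    }
    let len = get_varint(buf, &mut pos)? as usize;
    decode_l_sample(buf.get(pos..pos + len)?)
}

fn main() {
    let bytes = encode_l_message_sample(0.30, Some(42));
    println!("{:02x?}", bytes);
    assert_eq!(decode_l_message_sample(&bytes), Some((0.30, Some(42))));
}
```

+<p>A <code>seed</code> of <code>Some(0)</code> is still written because <code>optional</code> fields track presence, whereas a <code>fraction</code> of 0.0 would be omitted as the proto3 default; the <code>sample</code> validation rejects 0.0 anyway.</p>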
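+<p>The codec itself, <code class="docutils literal notranslate"><span
class="pre">ExtendedBallistaLogicalCodec</span></code>, implements the <code class="docutils
literal notranslate"><span
class="pre">LogicalExtensionCodec</span></code> trait by wrapping <code class="docutils literal notranslate"><span
class="pre">BallistaLogicalExtensionCodec</span></code> and handling the newly defined
operator messages:</p>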
+<div class="highlight-rust notranslate"><div
class="highlight"><pre><span></span><span class="cp">#[derive(Debug,
Default)]</span>
+<span class="k">pub</span><span class="w"> </span><span
class="k">struct</span><span class="w"> </span><span
class="nc">ExtendedBallistaLogicalCodec</span><span class="w"> </span><span
class="p">{</span>
+<span class="w"> </span><span class="n">inner</span><span
class="p">:</span><span class="w"> </span><span
class="nc">BallistaLogicalExtensionCodec</span><span class="p">,</span>
+<span class="p">}</span>
+
+<span class="k">impl</span><span class="w"> </span><span
class="n">LogicalExtensionCodec</span><span class="w"> </span><span
class="k">for</span><span class="w"> </span><span
class="n">ExtendedBallistaLogicalCodec</span><span class="w"> </span><span
class="p">{</span>
+<span class="w"> </span><span class="k">fn</span><span class="w">
</span><span class="nf">try_decode</span><span class="p">(</span>
+<span class="w"> </span><span class="o">&</span><span
class="bp">self</span><span class="p">,</span>
+<span class="w"> </span><span class="n">buf</span><span
class="p">:</span><span class="w"> </span><span class="kp">&</span><span
class="p">[</span><span class="kt">u8</span><span class="p">],</span>
+<span class="w"> </span><span class="n">inputs</span><span
class="p">:</span><span class="w"> </span><span class="kp">&</span><span
class="p">[</span><span class="n">datafusion</span><span
class="p">::</span><span class="n">logical_expr</span><span
class="p">::</span><span class="n">LogicalPlan</span><span class="p">],</span>
+<span class="w"> </span><span class="n">_ctx</span><span
class="p">:</span><span class="w"> </span><span class="kp">&</span><span
class="nc">datafusion</span><span class="p">::</span><span
class="n">prelude</span><span class="p">::</span><span
class="n">SessionContext</span><span class="p">,</span>
+<span class="w"> </span><span class="p">)</span><span class="w">
</span><span class="p">-></span><span class="w"> </span><span
class="nc">datafusion</span><span class="p">::</span><span
class="n">error</span><span class="p">::</span><span
class="nb">Result</span><span class="o"><</span><span
class="n">datafusion</span><span class="p">::</span><span
class="n">logical_expr</span><span class="p">::</span><span
class="n">Extension</span><span class="o">></span><span class="w"> </ [...]
+<span class="w"> </span><span class="kd">let</span><span class="w">
</span><span class="n">message</span><span class="w"> </span><span
class="o">=</span>
+<span class="w"> </span><span class="n">LMessage</span><span
class="p">::</span><span class="n">decode</span><span class="p">(</span><span
class="n">buf</span><span class="p">).</span><span
class="n">map_err</span><span class="p">(</span><span class="o">|</span><span
class="n">e</span><span class="o">|</span><span class="w"> </span><span
class="n">DataFusionError</span><span class="p">::</span><span
class="n">Internal</span><span class="p">(</span><span class="n">e</span><span
[...]
+
+<span class="w"> </span><span class="k">match</span><span class="w">
</span><span class="n">message</span><span class="p">.</span><span
class="n">extension</span><span class="w"> </span><span class="p">{</span>
+<span class="w"> </span><span class="nb">Some</span><span
class="p">(</span><span class="n">Extension</span><span
class="p">::</span><span class="n">Sample</span><span class="p">(</span><span
class="n">sample</span><span class="p">))</span><span class="w"> </span><span
class="o">=></span><span class="w"> </span><span class="p">{</span>
+<span class="w"> </span><span class="kd">let</span><span
class="w"> </span><span class="n">node</span><span class="w"> </span><span
class="o">=</span><span class="w"> </span><span class="n">Arc</span><span
class="p">::</span><span class="n">new</span><span class="p">(</span><span
class="n">Sample</span><span class="w"> </span><span class="p">{</span>
+<span class="w"> </span><span class="n">input</span><span
class="p">:</span><span class="w"> </span><span class="nc">inputs</span>
+<span class="w"> </span><span class="p">.</span><span
class="n">first</span><span class="p">()</span>
+<span class="w"> </span><span class="p">.</span><span
class="n">ok_or</span><span class="p">(</span><span
class="n">DataFusionError</span><span class="p">::</span><span
class="n">Plan</span><span class="p">(</span><span class="s">"expected
input"</span><span class="p">.</span><span class="n">to_string</span><span
class="p">()))</span><span class="o">?</span>
+<span class="w"> </span><span class="p">.</span><span
class="n">clone</span><span class="p">(),</span>
+<span class="w"> </span><span class="n">seed</span><span
class="p">:</span><span class="w"> </span><span class="nc">sample</span><span
class="p">.</span><span class="n">seed</span><span class="p">,</span>
+<span class="w"> </span><span
class="n">fraction</span><span class="p">:</span><span class="w"> </span><span
class="nc">sample</span><span class="p">.</span><span
class="n">fraction</span><span class="p">,</span>
+<span class="w"> </span><span class="p">});</span>
+
+<span class="w"> </span><span class="nb">Ok</span><span
class="p">(</span><span class="n">datafusion</span><span
class="p">::</span><span class="n">logical_expr</span><span
class="p">::</span><span class="n">Extension</span><span class="w">
</span><span class="p">{</span><span class="w"> </span><span
class="n">node</span><span class="w"> </span><span class="p">})</span>
+<span class="w"> </span><span class="p">}</span>
+<span class="w"> </span><span class="nb">None</span><span
class="w"> </span><span class="o">=></span><span class="w"> </span><span
class="n">plan_err</span><span class="o">!</span><span class="p">(</span><span
class="s">"Can't cast logical extension "</span><span
class="p">),</span>
+<span class="w"> </span><span class="p">}</span>
+<span class="w"> </span><span class="p">}</span>
+
+<span class="w"> </span><span class="k">fn</span><span class="w">
</span><span class="nf">try_encode</span><span class="p">(</span>
+<span class="w"> </span><span class="o">&</span><span
class="bp">self</span><span class="p">,</span>
+<span class="w"> </span><span class="n">node</span><span
class="p">:</span><span class="w"> </span><span class="kp">&</span><span
class="nc">datafusion</span><span class="p">::</span><span
class="n">logical_expr</span><span class="p">::</span><span
class="n">Extension</span><span class="p">,</span>
+<span class="w"> </span><span class="n">buf</span><span
class="p">:</span><span class="w"> </span><span class="kp">&</span><span
class="nc">mut</span><span class="w"> </span><span class="nb">Vec</span><span
class="o"><</span><span class="kt">u8</span><span class="o">></span><span
class="p">,</span>
+<span class="w"> </span><span class="p">)</span><span class="w">
</span><span class="p">-></span><span class="w"> </span><span
class="nc">datafusion</span><span class="p">::</span><span
class="n">error</span><span class="p">::</span><span
class="nb">Result</span><span class="o"><</span><span
class="p">()</span><span class="o">></span><span class="w"> </span><span
class="p">{</span>
+<span class="w"> </span><span class="k">if</span><span class="w">
</span><span class="kd">let</span><span class="w"> </span><span
class="nb">Some</span><span class="p">(</span><span
class="n">Sample</span><span class="w"> </span><span class="p">{</span><span
class="w"> </span><span class="n">seed</span><span class="p">,</span><span
class="w"> </span><span class="n">fraction</span><span class="p">,</span><span
class="w"> </span><span class="o">..</span><span class="w"> </span><span [...]
+<span class="w"> </span><span class="kd">let</span><span class="w">
</span><span class="n">sample</span><span class="w"> </span><span
class="o">=</span><span class="w"> </span><span class="n">LSample</span><span
class="w"> </span><span class="p">{</span>
+<span class="w"> </span><span class="n">seed</span><span
class="p">:</span><span class="w"> </span><span class="o">*</span><span
class="n">seed</span><span class="p">,</span>
+<span class="w"> </span><span class="n">fraction</span><span
class="p">:</span><span class="w"> </span><span class="o">*</span><span
class="n">fraction</span><span class="p">,</span>
+<span class="w"> </span><span class="p">};</span>
+<span class="w"> </span><span class="kd">let</span><span class="w">
</span><span class="n">message</span><span class="w"> </span><span
class="o">=</span><span class="w"> </span><span class="n">LMessage</span><span
class="w"> </span><span class="p">{</span>
+<span class="w"> </span><span class="n">extension</span><span
class="p">:</span><span class="w"> </span><span class="nb">Some</span><span
class="p">(</span><span class="k">super</span><span class="p">::</span><span
class="n">messages</span><span class="p">::</span><span
class="n">l_message</span><span class="p">::</span><span
class="n">Extension</span><span class="p">::</span><span
class="n">Sample</span><span class="p">(</span><span
class="n">sample</span><span class="p"> [...]
+<span class="w"> </span><span class="p">};</span>
+
+<span class="w"> </span><span class="n">message</span>
+<span class="w"> </span><span class="p">.</span><span
class="n">encode</span><span class="p">(</span><span class="n">buf</span><span
class="p">)</span>
+<span class="w"> </span><span class="p">.</span><span
class="n">map_err</span><span class="p">(</span><span class="o">|</span><span
class="n">e</span><span class="o">|</span><span class="w"> </span><span
class="n">DataFusionError</span><span class="p">::</span><span
class="n">Internal</span><span class="p">(</span><span class="n">e</span><span
class="p">.</span><span class="n">to_string</span><span
class="p">()))</span><span class="o">?</span><span class="p">;</span>
+
+<span class="w"> </span><span class="nb">Ok</span><span
class="p">(())</span>
+<span class="w"> </span><span class="p">}</span><span class="w">
</span><span class="k">else</span><span class="w"> </span><span
class="p">{</span>
+<span class="w"> </span><span class="bp">self</span><span
class="p">.</span><span class="n">inner</span><span class="p">.</span><span
class="n">try_encode</span><span class="p">(</span><span
class="n">node</span><span class="p">,</span><span class="w"> </span><span
class="n">buf</span><span class="p">)</span>
+<span class="w"> </span><span class="p">}</span>
+<span class="w"> </span><span class="p">}</span>
+<span class="w"> </span><span class="c1">// Additional implementation
omitted for brevity</span>
+<span class="p">}</span>
+</pre></div>
+</div>
+<p>In short, implementing the <code class="docutils literal
notranslate"><span class="pre">LogicalExtensionCodec</span></code> trait provides
the conversion between Rust structures and
+protocol buffer definitions.</p>
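+<p>The wrap-and-delegate structure of <code>ExtendedBallistaLogicalCodec</code> can be illustrated with a std-only sketch. <code>LogicalNode</code>, <code>Codec</code>, <code>Scan</code>, <code>Sample</code>, <code>Unknown</code>, <code>BaseCodec</code> and <code>ExtendedCodec</code> are hypothetical, simplified stand-ins (encode side only), and the byte layout is invented; the real codec writes protobuf messages. Recognising the node by downcasting through <code>as_any()</code> mirrors how user-defined nodes are typically identified in DataFusion.</p>

```rust
use std::any::Any;
use std::fmt::Debug;

/// Hypothetical stand-in for DataFusion's `UserDefinedLogicalNode`.
trait LogicalNode: Debug {
    fn as_any(&self) -> &dyn Any;
}

/// Hypothetical stand-in for `LogicalExtensionCodec` (encode side only).
trait Codec {
    fn try_encode(&self, node: &dyn LogicalNode, buf: &mut Vec<u8>) -> Result<(), String>;
}

#[derive(Debug)]
struct Scan { table: String } // a node the base codec knows
#[derive(Debug)]
struct Sample { fraction: f32, seed: Option<i64> } // the new extension node
#[derive(Debug)]
struct Unknown; // a node no codec can encode

impl LogicalNode for Scan { fn as_any(&self) -> &dyn Any { self } }
impl LogicalNode for Sample { fn as_any(&self) -> &dyn Any { self } }
impl LogicalNode for Unknown { fn as_any(&self) -> &dyn Any { self } }

/// Plays the role of `BallistaLogicalExtensionCodec`.
#[derive(Debug, Default)]
struct BaseCodec;

impl Codec for BaseCodec {
    fn try_encode(&self, node: &dyn LogicalNode, buf: &mut Vec<u8>) -> Result<(), String> {
        match node.as_any().downcast_ref::<Scan>() {
            Some(scan) => {
                buf.push(0); // invented tag for Scan
                buf.extend_from_slice(scan.table.as_bytes());
                Ok(())
            }
            None => Err(format!("cannot encode {node:?}")),
        }
    }
}

/// Plays the role of `ExtendedBallistaLogicalCodec`: handles `Sample`
/// itself and delegates every other node to the wrapped codec.
#[derive(Debug, Default)]
struct ExtendedCodec { inner: BaseCodec }

impl Codec for ExtendedCodec {
    fn try_encode(&self, node: &dyn LogicalNode, buf: &mut Vec<u8>) -> Result<(), String> {
        if let Some(Sample { fraction, seed }) = node.as_any().downcast_ref::<Sample>() {
            buf.push(1); // invented tag for Sample
            buf.extend_from_slice(&fraction.to_le_bytes());
            match seed {
                Some(s) => {
                    buf.push(1); // seed present
                    buf.extend_from_slice(&s.to_le_bytes());
                }
                None => buf.push(0), // seed absent
            }
            Ok(())
        } else {
            self.inner.try_encode(node, buf)
        }
    }
}

fn main() {
    let codec = ExtendedCodec::default();
    let mut buf = Vec::new();
    codec.try_encode(&Sample { fraction: 0.3, seed: Some(7) }, &mut buf).unwrap();
    codec.try_encode(&Scan { table: "t".into() }, &mut buf).unwrap();
    assert!(codec.try_encode(&Unknown, &mut buf).is_err());
    println!("{} bytes encoded", buf.len());
}
```

+<p>Because the extended codec only claims the nodes it recognises, operators already supported by the wrapped codec keep working unchanged.</p>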
+</section>
+<section id="logical-to-physical-plan-translation">
+<h2>Logical to Physical Plan Translation<a class="headerlink"
href="#logical-to-physical-plan-translation" title="Link to this
heading">¶</a></h2>
+<p>Once the logical plan extension is provided, the logical node must be translated
into a physical node. The
+transformation is performed by implementing the <code class="docutils literal
notranslate"><span class="pre">ExtensionPlanner</span></code> trait:</p>
+<div class="highlight-rust notranslate"><div
class="highlight"><pre><span></span><span class="cp">#[derive(Debug, Clone,
Default)]</span>
+<span class="k">pub</span><span class="w"> </span><span
class="k">struct</span><span class="w"> </span><span
class="nc">CustomPlannerExtension</span><span class="w"> </span><span
class="p">{}</span>
+
+<span class="cp">#[async_trait]</span>
+<span class="k">impl</span><span class="w"> </span><span
class="n">ExtensionPlanner</span><span class="w"> </span><span
class="k">for</span><span class="w"> </span><span
class="n">CustomPlannerExtension</span><span class="w"> </span><span
class="p">{</span>
+<span class="w"> </span><span class="k">async</span><span class="w">
</span><span class="k">fn</span><span class="w"> </span><span
class="nf">plan_extension</span><span class="p">(</span>
+<span class="w"> </span><span class="o">&</span><span
class="bp">self</span><span class="p">,</span>
+<span class="w"> </span><span class="n">_planner</span><span
class="p">:</span><span class="w"> </span><span class="kp">&</span><span
class="nc">dyn</span><span class="w"> </span><span
class="n">PhysicalPlanner</span><span class="p">,</span>
+<span class="w"> </span><span class="n">node</span><span
class="p">:</span><span class="w"> </span><span class="kp">&</span><span
class="nc">dyn</span><span class="w"> </span><span
class="n">UserDefinedLogicalNode</span><span class="p">,</span>
+<span class="w"> </span><span class="n">_logical_inputs</span><span
class="p">:</span><span class="w"> </span><span class="kp">&</span><span
class="p">[</span><span class="o">&</span><span
class="n">LogicalPlan</span><span class="p">],</span>
+<span class="w"> </span><span class="n">physical_inputs</span><span
class="p">:</span><span class="w"> </span><span class="kp">&</span><span
class="p">[</span><span class="n">Arc</span><span class="o"><</span><span
class="k">dyn</span><span class="w"> </span><span
class="n">ExecutionPlan</span><span class="o">></span><span
class="p">],</span>
+<span class="w"> </span><span class="n">_session_state</span><span
class="p">:</span><span class="w"> </span><span class="kp">&</span><span
class="nc">SessionState</span><span class="p">,</span>
+<span class="w"> </span><span class="p">)</span><span class="w">
</span><span class="p">-></span><span class="w"> </span><span
class="nc">datafusion</span><span class="p">::</span><span
class="n">error</span><span class="p">::</span><span
class="nb">Result</span><span class="o"><</span><span
class="nb">Option</span><span class="o"><</span><span
class="n">Arc</span><span class="o"><</span><span class="k">dyn</span><span
class="w"> </span><span class="n">ExecutionPlan</span> [...]
+<span class="w"> </span><span class="k">if</span><span class="w">
</span><span class="kd">let</span><span class="w"> </span><span
class="nb">Some</span><span class="p">(</span><span
class="n">Sample</span><span class="w"> </span><span class="p">{</span><span
class="w"> </span><span class="n">fraction</span><span class="p">,</span><span
class="w"> </span><span class="n">seed</span><span class="p">,</span><span
class="w"> </span><span class="o">..</span><span class="w"> </span><span [...]
+<span class="w"> </span><span class="kd">let</span><span class="w">
</span><span class="n">input</span><span class="w"> </span><span
class="o">=</span><span class="w"> </span><span class="n">physical_inputs</span>
+<span class="w"> </span><span class="p">.</span><span
class="n">first</span><span class="p">()</span>
+<span class="w"> </span><span class="p">.</span><span
class="n">ok_or</span><span class="p">(</span><span
class="n">DataFusionError</span><span class="p">::</span><span
class="n">Plan</span><span class="p">(</span><span class="s">"expected
single input"</span><span class="p">.</span><span
class="n">to_string</span><span class="p">()))</span><span class="o">?</span>
+<span class="w"> </span><span class="p">.</span><span
class="n">clone</span><span class="p">();</span>
+<span class="w"> </span><span class="kd">let</span><span class="w">
</span><span class="n">node</span><span class="w"> </span><span
class="o">=</span><span class="w"> </span><span
class="n">SampleExec</span><span class="p">::</span><span
class="n">new</span><span class="p">(</span><span class="o">*</span><span
class="n">fraction</span><span class="p">,</span><span class="w"> </span><span
class="o">*</span><span class="n">seed</span><span class="p">,</span><span
class="w"> </sp [...]
+<span class="w"> </span><span class="kd">let</span><span class="w">
</span><span class="n">node</span><span class="w"> </span><span
class="o">=</span><span class="w"> </span><span class="n">Arc</span><span
class="p">::</span><span class="n">new</span><span class="p">(</span><span
class="n">node</span><span class="p">);</span>
+
+<span class="w"> </span><span class="nb">Ok</span><span
class="p">(</span><span class="nb">Some</span><span class="p">(</span><span
class="n">node</span><span class="p">))</span>
+<span class="w"> </span><span class="p">}</span><span class="w">
</span><span class="k">else</span><span class="w"> </span><span
class="p">{</span>
+<span class="w"> </span><span class="nb">Ok</span><span
class="p">(</span><span class="nb">None</span><span class="p">)</span>
+<span class="w"> </span><span class="p">}</span>
+<span class="w"> </span><span class="p">}</span>
+<span class="p">}</span>
+</pre></div>
+</div>
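+<p>The contract of <code>plan_extension</code> is that it returns <code>Ok(Some(..))</code> for nodes it understands and <code>Ok(None)</code> otherwise, leaving the node to other planners. The std-only sketch below models that contract with hypothetical names (<code>Planner</code>, <code>CustomPlanner</code>, <code>PhysicalNode</code>, <code>plan_node</code>) and <code>dyn Any</code> in place of <code>dyn UserDefinedLogicalNode</code>. The first-match-wins loop is a simplified assumption about how registered extension planners are consulted, not DataFusion's actual code.</p>

```rust
use std::any::Any;

/// Hypothetical stand-in for a physical operator such as `SampleExec`.
#[derive(Debug, PartialEq)]
enum PhysicalNode {
    SampleExec { fraction: f32, seed: Option<i64> },
}

/// Simplified logical extension node, mirroring `Sample` above.
struct Sample {
    fraction: f32,
    seed: Option<i64>,
}

/// A logical node that no installed planner understands.
struct Other;

/// Simplified `ExtensionPlanner`: `Ok(None)` means "not my node".
trait Planner {
    fn plan_extension(&self, node: &dyn Any) -> Result<Option<PhysicalNode>, String>;
}

struct CustomPlanner;

impl Planner for CustomPlanner {
    fn plan_extension(&self, node: &dyn Any) -> Result<Option<PhysicalNode>, String> {
        match node.downcast_ref::<Sample>() {
            Some(Sample { fraction, seed }) => Ok(Some(PhysicalNode::SampleExec {
                fraction: *fraction,
                seed: *seed,
            })),
            // Not ours: let the next planner try.
            None => Ok(None),
        }
    }
}

/// Consults planners in order; the first `Some` wins. An error is
/// returned if no planner recognises the node.
fn plan_node(planners: &[&dyn Planner], node: &dyn Any) -> Result<PhysicalNode, String> {
    for planner in planners {
        if let Some(plan) = planner.plan_extension(node)? {
            return Ok(plan);
        }
    }
    Err("no installed planner could convert the custom node".to_string())
}

fn main() {
    let planners: [&dyn Planner; 1] = [&CustomPlanner];
    let plan = plan_node(&planners, &Sample { fraction: 0.3, seed: None });
    println!("{plan:?}");
    assert!(plan_node(&planners, &Other).is_err());
}
```

+<p>Returning <code>Ok(None)</code> instead of an error for unrecognised nodes is what lets several independent extension planners coexist in one session.</p>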
+<p>The custom planner is registered in the session state as follows:</p>
+<div class="highlight-rust notranslate"><div
class="highlight"><pre><span></span><span class="kd">let</span><span class="w">
</span><span class="n">query_planner</span><span class="w"> </span><span
class="o">=</span><span class="w"> </span><span class="n">Arc</span><span
class="p">::</span><span class="n">new</span><span class="p">(</span><span
class="n">QueryPlannerWithExtensions</span><span class="p">::</span><span
class="n">default</span><span class="p">());</span>
+
+<span class="kd">let</span><span class="w"> </span><span
class="n">state</span><span class="w"> </span><span class="o">=</span><span
class="w"> </span><span class="n">SessionStateBuilder</span><span
class="p">::</span><span class="n">new</span><span class="p">()</span>
+<span class="p">.</span><span class="n">with_query_planner</span><span
class="p">(</span><span class="n">query_planner</span><span class="p">)</span>
+<span class="p">.</span><span class="n">with_default_features</span><span
class="p">()</span>
+<span class="p">.</span><span class="n">build</span><span class="p">();</span>
+</pre></div>
+</div>
+<p>Finally, the generated physical plan is serialized using the physical plan
extension codec and
+transmitted to the executor(s). The implementation wraps <code
class="docutils literal notranslate"><span
class="pre">BallistaPhysicalExtensionCodec</span></code>, handling the new operator itself and
delegating opaque messages to the wrapped codec:</p>
+<div class="highlight-rust notranslate"><div
class="highlight"><pre><span></span><span class="cp">#[derive(Debug,
Default)]</span>
+<span class="k">pub</span><span class="w"> </span><span
class="k">struct</span><span class="w"> </span><span
class="nc">ExtendedBallistaPhysicalCodec</span><span class="w"> </span><span
class="p">{</span>
+<span class="w"> </span><span class="n">inner</span><span
class="p">:</span><span class="w"> </span><span
class="nc">BallistaPhysicalExtensionCodec</span><span class="p">,</span>
+<span class="p">}</span>
+
+<span class="k">impl</span><span class="w"> </span><span
class="n">PhysicalExtensionCodec</span><span class="w"> </span><span
class="k">for</span><span class="w"> </span><span
class="n">ExtendedBallistaPhysicalCodec</span><span class="w"> </span><span
class="p">{</span>
+<span class="w"> </span><span class="k">fn</span><span class="w">
</span><span class="nf">try_decode</span><span class="p">(</span>
+<span class="w"> </span><span class="o">&</span><span
class="bp">self</span><span class="p">,</span>
+<span class="w"> </span><span class="n">buf</span><span
class="p">:</span><span class="w"> </span><span class="kp">&</span><span
class="p">[</span><span class="kt">u8</span><span class="p">],</span>
+<span class="w"> </span><span class="n">inputs</span><span
class="p">:</span><span class="w"> </span><span class="kp">&</span><span
class="p">[</span><span class="n">std</span><span class="p">::</span><span
class="n">sync</span><span class="p">::</span><span class="n">Arc</span><span
class="o"><</span><span class="k">dyn</span><span class="w"> </span><span
class="n">datafusion</span><span class="p">::</span><span
class="n">physical_plan</span><span class="p">::</span><span [...]
+<span class="w"> </span><span class="n">registry</span><span
class="p">:</span><span class="w"> </span><span class="kp">&</span><span
class="nc">dyn</span><span class="w"> </span><span
class="n">datafusion</span><span class="p">::</span><span
class="n">execution</span><span class="p">::</span><span
class="n">FunctionRegistry</span><span class="p">,</span>
+<span class="w"> </span><span class="p">)</span><span class="w">
</span><span class="p">-></span><span class="w"> </span><span
class="nc">datafusion</span><span class="p">::</span><span
class="n">error</span><span class="p">::</span><span
class="nb">Result</span><span class="o"><</span><span
class="n">std</span><span class="p">::</span><span class="n">sync</span><span
class="p">::</span><span class="n">Arc</span><span class="o"><</span><span
class="k">dyn</span><span class="w [...]
+<span class="w"> </span><span class="p">{</span>
+<span class="w"> </span><span class="kd">let</span><span class="w">
</span><span class="n">message</span><span class="w"> </span><span
class="o">=</span>
+<span class="w"> </span><span class="n">PMessage</span><span
class="p">::</span><span class="n">decode</span><span class="p">(</span><span
class="n">buf</span><span class="p">).</span><span
class="n">map_err</span><span class="p">(</span><span class="o">|</span><span
class="n">e</span><span class="o">|</span><span class="w"> </span><span
class="n">DataFusionError</span><span class="p">::</span><span
class="n">Internal</span><span class="p">(</span><span class="n">e</span><span
[...]
+
+<span class="w"> </span><span class="k">match</span><span class="w">
</span><span class="n">message</span><span class="p">.</span><span
class="n">extension</span><span class="w"> </span><span class="p">{</span>
+<span class="w"> </span><span class="nb">Some</span><span
class="p">(</span><span class="k">super</span><span class="p">::</span><span
class="n">messages</span><span class="p">::</span><span
class="n">p_message</span><span class="p">::</span><span
class="n">Extension</span><span class="p">::</span><span
class="n">Sample</span><span class="p">(</span><span
class="n">PSample</span><span class="w"> </span><span class="p">{</span>
+<span class="w">
</span><span class="n">fraction</span><span class="p">,</span><span
class="w"> </span><span class="n">seed</span><span class="p">,</span><span
class="w"> </span><span class="o">..</span>
+<span class="w">
</span><span class="p">}))</span><span class="w"> </span><span
class="o">=></span><span class="w"> </span><span class="p">{</span>
+<span class="w"> </span><span class="kd">let</span><span
class="w"> </span><span class="n">input</span><span class="w"> </span><span
class="o">=</span><span class="w"> </span><span class="n">inputs</span>
+<span class="w"> </span><span class="p">.</span><span
class="n">first</span><span class="p">()</span>
+<span class="w"> </span><span class="p">.</span><span
class="n">ok_or</span><span class="p">(</span><span
class="n">DataFusionError</span><span class="p">::</span><span
class="n">Plan</span><span class="p">(</span><span class="s">"expected
input"</span><span class="p">.</span><span class="n">to_string</span><span
class="p">()))</span><span class="o">?</span>
+<span class="w"> </span><span class="p">.</span><span
class="n">clone</span><span class="p">();</span>
+
+<span class="w"> </span><span class="kd">let</span><span
class="w"> </span><span class="n">node</span><span class="w"> </span><span
class="o">=</span><span class="w"> </span><span class="n">Arc</span><span
class="p">::</span><span class="n">new</span><span class="p">(</span><span
class="n">SampleExec</span><span class="p">::</span><span
class="n">new</span><span class="p">(</span><span
class="n">fraction</span><span class="p">,</span><span class="w"> </span><span
class="n" [...]
+
+<span class="w"> </span><span class="nb">Ok</span><span
class="p">(</span><span class="n">node</span><span class="p">)</span>
+<span class="w"> </span><span class="p">}</span>
+
+<span class="w"> </span><span class="nb">Some</span><span
class="p">(</span><span class="k">super</span><span class="p">::</span><span
class="n">messages</span><span class="p">::</span><span
class="n">p_message</span><span class="p">::</span><span
class="n">Extension</span><span class="p">::</span><span
class="n">Opaque</span><span class="p">(</span><span
class="n">opaque</span><span class="p">))</span><span class="w"> </span><span
class="o">=></span><span class="w"> </span [...]
+<span class="w"> </span><span class="bp">self</span><span
class="p">.</span><span class="n">inner</span><span class="p">.</span><span
class="n">try_decode</span><span class="p">(</span><span
class="o">&</span><span class="n">opaque</span><span
class="p">,</span><span class="w"> </span><span class="n">inputs</span><span
class="p">,</span><span class="w"> </span><span class="n">registry</span><span
class="p">)</span>
+<span class="w"> </span><span class="p">}</span>
+<span class="w"> </span><span class="nb">None</span><span
class="w"> </span><span class="o">=></span><span class="w"> </span><span
class="n">plan_err</span><span class="o">!</span><span class="p">(</span><span
class="s">"Can't cast physical extension "</span><span
class="p">),</span>
+<span class="w"> </span><span class="p">}</span>
+<span class="w"> </span><span class="p">}</span>
+
+<span class="w"> </span><span class="k">fn</span><span class="w">
</span><span class="nf">try_encode</span><span class="p">(</span>
+<span class="w"> </span><span class="o">&</span><span
class="bp">self</span><span class="p">,</span>
+<span class="w"> </span><span class="n">node</span><span
class="p">:</span><span class="w"> </span><span class="nc">std</span><span
class="p">::</span><span class="n">sync</span><span class="p">::</span><span
class="n">Arc</span><span class="o"><</span><span class="k">dyn</span><span
class="w"> </span><span class="n">datafusion</span><span
class="p">::</span><span class="n">physical_plan</span><span
class="p">::</span><span class="n">ExecutionPlan</span><span class="o">></sp
[...]
+<span class="w"> </span><span class="n">buf</span><span
class="p">:</span><span class="w"> </span><span class="kp">&</span><span
class="nc">mut</span><span class="w"> </span><span class="nb">Vec</span><span
class="o"><</span><span class="kt">u8</span><span class="o">></span><span
class="p">,</span>
+<span class="w"> </span><span class="p">)</span><span class="w">
</span><span class="p">-></span><span class="w"> </span><span
class="nc">datafusion</span><span class="p">::</span><span
class="n">error</span><span class="p">::</span><span
class="nb">Result</span><span class="o"><</span><span
class="p">()</span><span class="o">></span><span class="w"> </span><span
class="p">{</span>
+<span class="w"> </span><span class="k">if</span><span class="w">
</span><span class="kd">let</span><span class="w"> </span><span
class="nb">Some</span><span class="p">(</span><span
class="n">SampleExec</span><span class="w"> </span><span
class="p">{</span><span class="w"> </span><span class="n">fraction</span><span
class="p">,</span><span class="w"> </span><span class="n">seed</span><span
class="p">,</span><span class="w"> </span><span class="o">..</span><span
class="w"> </span>< [...]
+<span class="w"> </span><span class="p">{</span>
+<span class="w"> </span><span class="kd">let</span><span class="w">
</span><span class="n">message</span><span class="w"> </span><span
class="o">=</span><span class="w"> </span><span class="n">PMessage</span><span
class="w"> </span><span class="p">{</span>
+<span class="w"> </span><span class="n">extension</span><span
class="p">:</span><span class="w"> </span><span class="nb">Some</span><span
class="p">(</span><span class="k">super</span><span class="p">::</span><span
class="n">messages</span><span class="p">::</span><span
class="n">p_message</span><span class="p">::</span><span
class="n">Extension</span><span class="p">::</span><span
class="n">Sample</span><span class="p">(</span><span
class="n">PSample</span><span class="w" [...]
+<span class="w"> </span><span
class="n">fraction</span><span class="p">:</span><span class="w"> </span><span
class="o">*</span><span class="n">fraction</span><span class="p">,</span>
+<span class="w"> </span><span class="n">seed</span><span
class="p">:</span><span class="w"> </span><span class="o">*</span><span
class="n">seed</span><span class="p">,</span>
+<span class="w"> </span><span class="p">})),</span>
+<span class="w"> </span><span class="p">};</span>
+
+<span class="w"> </span><span class="n">message</span>
+<span class="w"> </span><span class="p">.</span><span
class="n">encode</span><span class="p">(</span><span class="n">buf</span><span
class="p">)</span>
+<span class="w"> </span><span class="p">.</span><span
class="n">map_err</span><span class="p">(</span><span class="o">|</span><span
class="n">e</span><span class="o">|</span><span class="w"> </span><span
class="n">DataFusionError</span><span class="p">::</span><span
class="n">Internal</span><span class="p">(</span><span class="n">e</span><span
class="p">.</span><span class="n">to_string</span><span
class="p">()))</span><span class="o">?</span><span class="p">;</span>
+
+<span class="w"> </span><span class="nb">Ok</span><span
class="p">(())</span>
+<span class="w"> </span><span class="p">}</span><span class="w">
</span><span class="k">else</span><span class="w"> </span><span
class="p">{</span>
+<span class="w"> </span><span class="kd">let</span><span class="w">
</span><span class="k">mut</span><span class="w"> </span><span
class="n">opaque</span><span class="w"> </span><span class="o">=</span><span
class="w"> </span><span class="fm">vec!</span><span class="p">[];</span>
+<span class="w"> </span><span class="bp">self</span><span
class="p">.</span><span class="n">inner</span>
+<span class="w"> </span><span class="p">.</span><span
class="n">try_encode</span><span class="p">(</span><span
class="n">node</span><span class="p">,</span><span class="w"> </span><span
class="o">&</span><span class="k">mut</span><span class="w"> </span><span
class="n">opaque</span><span class="p">)</span>
+<span class="w"> </span><span class="p">.</span><span
class="n">map_err</span><span class="p">(</span><span class="o">|</span><span
class="n">e</span><span class="o">|</span><span class="w"> </span><span
class="n">DataFusionError</span><span class="p">::</span><span
class="n">Internal</span><span class="p">(</span><span class="n">e</span><span
class="p">.</span><span class="n">to_string</span><span
class="p">()))</span><span class="o">?</span><span class="p">;</span>
+
+<span class="w"> </span><span class="kd">let</span><span class="w">
</span><span class="n">message</span><span class="w"> </span><span
class="o">=</span><span class="w"> </span><span class="n">PMessage</span><span
class="w"> </span><span class="p">{</span>
+<span class="w"> </span><span class="n">extension</span><span
class="p">:</span><span class="w"> </span><span class="nb">Some</span><span
class="p">(</span><span class="k">super</span><span class="p">::</span><span
class="n">messages</span><span class="p">::</span><span
class="n">p_message</span><span class="p">::</span><span
class="n">Extension</span><span class="p">::</span><span
class="n">Opaque</span><span class="p">(</span><span
class="n">opaque</span><span class="p"> [...]
+<span class="w"> </span><span class="p">};</span>
+
+<span class="w"> </span><span class="n">message</span>
+<span class="w"> </span><span class="p">.</span><span
class="n">encode</span><span class="p">(</span><span class="n">buf</span><span
class="p">)</span>
+<span class="w"> </span><span class="p">.</span><span
class="n">map_err</span><span class="p">(</span><span class="o">|</span><span
class="n">e</span><span class="o">|</span><span class="w"> </span><span
class="n">DataFusionError</span><span class="p">::</span><span
class="n">Internal</span><span class="p">(</span><span class="n">e</span><span
class="p">.</span><span class="n">to_string</span><span
class="p">()))</span><span class="o">?</span><span class="p">;</span>
+
+<span class="w"> </span><span class="nb">Ok</span><span
class="p">(())</span>
+<span class="w"> </span><span class="p">}</span>
+<span class="w"> </span><span class="p">}</span>
+<span class="p">}</span>
+</pre></div>
+</div>
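The `try_encode` implementation above wraps every node in a single `PMessage` envelope. A `SampleExec` is written as the dedicated `Sample` variant carrying its `fraction` and `seed`. Any other node is encoded by the inner codec and stored as `Opaque` bytes, which `try_decode` hands back to that inner codec. The following is a rough, dependency-free sketch of this envelope pattern. It replaces the prost-generated types with a hypothetical `Envelope` enum and a made-up layout that uses a one-byte tag. The field types (`f64`, `u64`) are assumptions, and none of this is Ballista's actual wire format.

```rust
/// Hypothetical stand-in for the generated `PMessage` / `p_message::Extension` types.
#[derive(Debug, PartialEq)]
enum Envelope {
    // Node owned by the extension codec (mirrors `Extension::Sample`).
    Sample { fraction: f64, seed: u64 },
    // Bytes produced by the inner (default) codec (mirrors `Extension::Opaque`).
    Opaque(Vec<u8>),
}

const TAG_SAMPLE: u8 = 0;
const TAG_OPAQUE: u8 = 1;

impl Envelope {
    /// Writes a one-byte tag followed by the variant's payload.
    fn encode(&self, buf: &mut Vec<u8>) {
        match self {
            Envelope::Sample { fraction, seed } => {
                buf.push(TAG_SAMPLE);
                buf.extend_from_slice(&fraction.to_le_bytes());
                buf.extend_from_slice(&seed.to_le_bytes());
            }
            Envelope::Opaque(bytes) => {
                buf.push(TAG_OPAQUE);
                buf.extend_from_slice(bytes);
            }
        }
    }

    /// Reads the tag back and rebuilds the variant; unknown tags are rejected,
    /// much like the `plan_err!` branch of `try_decode`.
    fn decode(buf: &[u8]) -> Result<Envelope, String> {
        match buf.split_first() {
            Some((&TAG_SAMPLE, rest)) if rest.len() == 16 => {
                let mut fraction = [0u8; 8];
                let mut seed = [0u8; 8];
                fraction.copy_from_slice(&rest[..8]);
                seed.copy_from_slice(&rest[8..]);
                Ok(Envelope::Sample {
                    fraction: f64::from_le_bytes(fraction),
                    seed: u64::from_le_bytes(seed),
                })
            }
            Some((&TAG_OPAQUE, rest)) => Ok(Envelope::Opaque(rest.to_vec())),
            _ => Err("Can't cast physical extension".to_string()),
        }
    }
}
```

The single envelope type lets decoding branch on one field. Nodes the extension does not know about still round-trip unchanged through the inner codec.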
+<p>These are all the moving parts needed to extend Ballista functionality. The last step is to
+configure the scheduler and executor to use the new features.</p>
+<p><code class="docutils literal notranslate"><span
class="pre">SchedulerConfig</span></code> should be configured to override the logical codec,
the physical codec and the session builder function:</p>
+<div class="highlight-rust notranslate"><div
class="highlight"><pre><span></span><span class="kd">let</span><span class="w">
</span><span class="n">config</span><span class="p">:</span><span class="w">
</span><span class="nc">SchedulerConfig</span><span class="w"> </span><span
class="o">=</span><span class="w"> </span><span
class="n">SchedulerConfig</span><span class="w"> </span><span class="p">{</span>
+<span class="n">override_logical_codec</span><span class="p">:</span><span
class="w"> </span><span class="nb">Some</span><span class="p">(</span><span
class="n">Arc</span><span class="p">::</span><span class="n">new</span><span
class="p">(</span><span class="n">ExtendedBallistaLogicalCodec</span><span
class="p">::</span><span class="n">default</span><span
class="p">())),</span>
+<span class="n">override_physical_codec</span><span class="p">:</span><span
class="w"> </span><span class="nb">Some</span><span class="p">(</span><span
class="n">Arc</span><span class="p">::</span><span class="n">new</span><span
class="p">(</span><span class="n">ExtendedBallistaPhysicalCodec</span><span
class="p">::</span><span class="n">default</span><span
class="p">())),</span>
+<span class="n">override_session_builder</span><span class="p">:</span><span
class="w"> </span><span class="nb">Some</span><span class="p">(</span><span
class="n">Arc</span><span class="p">::</span><span class="n">new</span><span
class="p">(</span><span class="n">extended_state_producer</span><span
class="p">)),</span>
+<span class="o">..</span><span class="nb">Default</span><span
class="p">::</span><span class="n">default</span><span
class="p">()</span>
+<span class="p">};</span>
+
+<span class="kd">let</span><span class="w"> </span><span
class="n">address</span><span class="w"> </span><span class="o">=</span><span
class="w"> </span><span class="fm">format!</span><span class="p">(</span><span
class="s">"{}:{}"</span><span class="p">,</span><span class="w">
</span><span class="n">config</span><span class="p">.</span><span
class="n">bind_host</span><span class="p">,</span><span class="w"> </span><span
class="n">config</span><span class="p">.</span><span clas [...]
+<span class="kd">let</span><span class="w"> </span><span
class="n">address</span><span class="w"> </span><span class="o">=</span><span
class="w"> </span><span class="n">address</span>
+<span class="p">.</span><span class="n">parse</span><span class="p">()</span>
+<span class="p">.</span><span class="n">map_err</span><span
class="p">(</span><span class="o">|</span><span
class="n">e</span><span class="p">:</span><span
class="w"> </span><span class="nc">AddrParseError</span><span
class="o">|</span><span class="w"> </span><span
class="n">BallistaError</span><span class="p">::</span><span
class="n">Configuration</span><span class="p">(</span><span
class="n">e</span><span class="p">.</span><span class="n" [...]
+
+<span class="kd">let</span><span class="w"> </span><span
class="n">cluster</span><span class="w"> </span><span class="o">=</span><span
class="w"> </span><span class="n">BallistaCluster</span><span
class="p">::</span><span class="n">new_from_config</span><span
class="p">(</span><span class="o">&</span><span
class="n">config</span><span class="p">).</span><span
class="k">await</span><span class="o">?</span><span class="p">;</span>
+
+<span class="n">start_server</span><span class="p">(</span><span
class="n">cluster</span><span class="p">,</span><span class="w"> </span><span
class="n">address</span><span class="p">,</span><span class="w"> </span><span
class="n">Arc</span><span class="p">::</span><span class="n">new</span><span
class="p">(</span><span class="n">config</span><span class="p">)).</span><span
class="k">await</span><span class="o">?</span><span class="p">;</span>
+</pre></div>
+</div>
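The scheduler binds to an address built from `config.bind_host` and a port field whose name is cut off in this diff. The error is annotated as `AddrParseError`, so the parse target is presumably `std::net::SocketAddr`. The sketch below isolates that step. `ConfigError` is a hypothetical stand-in for `BallistaError::Configuration`, and `bind_address` is an illustrative helper, not a Ballista function.

```rust
use std::net::{AddrParseError, SocketAddr};

/// Hypothetical stand-in for `BallistaError::Configuration`.
#[derive(Debug, PartialEq)]
enum ConfigError {
    Configuration(String),
}

/// Formats "host:port" and parses it into a socket address. A parse failure
/// is mapped into a configuration error instead of being returned as-is.
fn bind_address(host: &str, port: u16) -> Result<SocketAddr, ConfigError> {
    format!("{}:{}", host, port)
        .parse::<SocketAddr>()
        .map_err(|e: AddrParseError| ConfigError::Configuration(e.to_string()))
}
```

`SocketAddr` parsing accepts only IP literals. As a result, a host name such as `localhost` is rejected at this step rather than resolved.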
+<div class="highlight-rust notranslate"><div
class="highlight"><pre><span></span><span class="k">pub</span><span class="w">
</span><span class="k">fn</span><span class="w"> </span><span
class="nf">extended_state_producer</span><span class="p">(</span><span
class="n">config</span><span class="p">:</span><span class="w"> </span><span
class="nc">SessionConfig</span><span class="p">)</span><span class="w">
</span><span class="p">-></span><span class="w"> </span><span
class="nc">datafusion [...]
+<span class="w"> </span><span class="c1">// a custom query planner is needed to convert the logical operator into a physical one</span>
+<span class="w"> </span><span class="kd">let</span><span class="w">
</span><span class="n">query_planner</span><span class="w"> </span><span
class="o">=</span><span class="w"> </span><span class="n">Arc</span><span
class="p">::</span><span class="n">new</span><span class="p">(</span><span
class="n">QueryPlannerWithExtensions</span><span class="p">::</span><span
class="n">default</span><span class="p">());</span>
+
+<span class="w"> </span><span class="kd">let</span><span class="w">
</span><span class="n">state</span><span class="w"> </span><span
class="o">=</span><span class="w"> </span><span
class="n">SessionStateBuilder</span><span class="p">::</span><span
class="n">new</span><span class="p">()</span>
+<span class="w"> </span><span class="p">.</span><span
class="n">with_config</span><span class="p">(</span><span
class="n">config</span><span class="p">)</span>
+<span class="w"> </span><span class="p">.</span><span
class="n">with_query_planner</span><span class="p">(</span><span
class="n">query_planner</span><span class="p">)</span>
+<span class="w"> </span><span class="p">.</span><span
class="n">with_default_features</span><span class="p">()</span>
+<span class="w"> </span><span class="p">.</span><span
class="n">build</span><span class="p">();</span>
+
+<span class="w"> </span><span class="nb">Ok</span><span
class="p">(</span><span class="n">state</span><span class="p">)</span>
+<span class="p">}</span>
+</pre></div>
+</div>
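`override_session_builder` receives `extended_state_producer` wrapped in `Arc::new`. This works because a plain `fn` item can be coerced into a shared trait object that implements `Fn`. The sketch below shows that coercion with simplified, hypothetical `SessionConfig` and `SessionState` types and an assumed `SessionBuilderFn` alias. The real Ballista field type may differ in its exact signature, for example in the error type.

```rust
use std::sync::Arc;

// Hypothetical, simplified stand-ins for DataFusion's `SessionConfig` / `SessionState`.
#[derive(Clone, Debug, Default)]
struct SessionConfig {
    target_partitions: usize,
}

#[derive(Debug)]
struct SessionState {
    config: SessionConfig,
    planner: &'static str,
}

// Assumed shape of the type stored in `override_session_builder`.
type SessionBuilderFn =
    Arc<dyn Fn(SessionConfig) -> Result<SessionState, String> + Send + Sync>;

fn extended_state_producer(config: SessionConfig) -> Result<SessionState, String> {
    // Stand-in for `.with_query_planner(...)`: record which planner was installed.
    Ok(SessionState { config, planner: "QueryPlannerWithExtensions" })
}

fn make_builder() -> Option<SessionBuilderFn> {
    // The `fn` item coerces to `Arc<dyn Fn ...>`, mirroring
    // `Some(Arc::new(extended_state_producer))` in the scheduler config.
    Some(Arc::new(extended_state_producer))
}
```

Passing a function rather than a prebuilt state lets the scheduler create a fresh session state for each `SessionConfig` it receives.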
+<p>Similarly, <code class="docutils literal notranslate"><span
class="pre">ExecutorProcessConfig</span></code> should override the logical and physical codecs:</p>
+<div class="highlight-rust notranslate"><div
class="highlight"><pre><span></span><span class="kd">let</span><span class="w">
</span><span class="n">config</span><span class="p">:</span><span class="w">
</span><span class="nc">ExecutorProcessConfig</span><span class="w">
</span><span class="o">=</span><span class="w"> </span><span
class="n">ExecutorProcessConfig</span><span class="w"> </span><span
class="p">{</span>
+<span class="n">override_logical_codec</span><span class="p">:</span><span
class="w"> </span><span class="nb">Some</span><span class="p">(</span><span
class="n">Arc</span><span class="p">::</span><span class="n">new</span><span
class="p">(</span><span class="n">ExtendedBallistaLogicalCodec</span><span
class="p">::</span><span class="n">default</span><span
class="p">())),</span>
+<span class="n">override_physical_codec</span><span class="p">:</span><span
class="w"> </span><span class="nb">Some</span><span class="p">(</span><span
class="n">Arc</span><span class="p">::</span><span class="n">new</span><span
class="p">(</span><span class="n">ExtendedBallistaPhysicalCodec</span><span
class="p">::</span><span class="n">default</span><span
class="p">())),</span>
+<span class="o">..</span><span class="nb">Default</span><span
class="p">::</span><span class="n">default</span><span
class="p">()</span>
+<span class="p">};</span>
+
+<span class="n">start_executor_process</span><span class="p">(</span><span
class="n">Arc</span><span class="p">::</span><span class="n">new</span><span
class="p">(</span><span class="n">config</span><span class="p">)).</span><span
class="k">await</span>
+</pre></div>
+</div>
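Both configs use Rust's struct update syntax. Only the `override_*` fields are set explicitly, and `..Default::default()` supplies everything else. The scheduler and the executors are given the same extended codecs because plans serialized on one side have to be decoded on the other. The following is a minimal sketch of the pattern. It assumes a hypothetical `PhysicalCodec` trait and a trimmed-down `ExecutorConfig`, and its `concurrent_tasks` field is illustrative only.

```rust
use std::sync::Arc;

// Hypothetical codec trait standing in for the Ballista/DataFusion codec traits.
trait PhysicalCodec: Send + Sync {
    fn name(&self) -> &'static str;
}

#[derive(Default)]
struct ExtendedBallistaPhysicalCodec;

impl PhysicalCodec for ExtendedBallistaPhysicalCodec {
    fn name(&self) -> &'static str {
        "extended"
    }
}

// Hypothetical, trimmed-down executor config: `None` means "use the built-in codec".
#[derive(Default)]
struct ExecutorConfig {
    concurrent_tasks: usize,
    override_physical_codec: Option<Arc<dyn PhysicalCodec>>,
}

fn executor_config() -> ExecutorConfig {
    ExecutorConfig {
        override_physical_codec: Some(Arc::new(ExtendedBallistaPhysicalCodec::default())),
        // Every field not listed above keeps its `Default` value.
        ..Default::default()
    }
}
```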
+</section>
+<section id="conclusion">
+<h2>Conclusion<a class="headerlink" href="#conclusion" title="Link to this
heading">¶</a></h2>
+<p>This project demonstrates how to extend Ballista with custom logical and
physical operators, codecs, and planner logic.
+By following the outlined steps, you can introduce new DataFrame operations
and ensure they are supported throughout the
+distributed query lifecycle.</p>
+<p>For more details, refer to the source code and the linked example files.
Contributions and feedback are welcome!</p>
+<hr class="docutils" />
+<p><strong>Related links:</strong></p>
+<ul class="simple">
+<li><p><a class="reference external"
href="https://github.com/milenkovicm/ballista_extensions">Ballista Extensions
Source Code</a></p></li>
+<li><p><a class="reference external"
href="https://datafusion.apache.org">DataFusion Documentation</a></p></li>
+<li><p><a class="reference external"
href="https://docs.rs/tonic/latest/tonic/">Rust Tonic (gRPC)
support</a></p></li>
+</ul>
+</section>
+</section>
+
+
+ </div>
+
+
+ <!-- Previous / next buttons -->
+<div class='prev-next-area'>
+ <a class='left-prev' id="prev-link" href="extending-components.html"
title="previous page">
+ <i class="fas fa-angle-left"></i>
+ <div class="prev-next-info">
+ <p class="prev-next-subtitle">previous</p>
+ <p class="prev-next-title">Extending Ballista Scheduler And
Executors</p>
+ </div>
+ </a>
+ <a class='right-next' id="next-link"
href="../contributors-guide/architecture.html" title="next page">
+ <div class="prev-next-info">
+ <p class="prev-next-subtitle">next</p>
+ <p class="prev-next-title">Ballista Architecture</p>
+ </div>
+ <i class="fas fa-angle-right"></i>
+ </a>
+</div>
+
+ </main>
+
+
+ </div>
+ </div>
+
+ <script
src="../_static/scripts/pydata-sphinx-theme.js?digest=1999514e3f237ded88cf"></script>
+
+<!-- Based on pydata_sphinx_theme/footer.html -->
+<footer class="footer mt-5 mt-md-0">
+ <div class="container">
+
+ <div class="footer-item">
+ <p class="copyright">
+ © Copyright 2019-2024, Apache Software Foundation.<br>
+</p>
+ </div>
+
+ <div class="footer-item">
+ <p class="sphinx-version">
+Created using <a href="http://sphinx-doc.org/">Sphinx</a> 8.1.3.<br>
+</p>
+ </div>
+
+ <div class="footer-item">
+ <p>Apache DataFusion Ballista, Arrow Ballista, Apache, the Apache
feather logo, and the Apache DataFusion Ballista project logo</p>
+ <p>are either registered trademarks or trademarks of The Apache Software
Foundation in the United States and other countries.</p>
+ </div>
+ </div>
+</footer>
+
+
+ </body>
+</html>
\ No newline at end of file
diff --git a/user-guide/faq.html b/user-guide/faq.html
index 3d119ed88..b9f9b374f 100644
--- a/user-guide/faq.html
+++ b/user-guide/faq.html
@@ -182,6 +182,11 @@
Extending Ballista Scheduler And Executors
</a>
</li>
+ <li class="toctree-l1">
+ <a class="reference internal" href="extensions-example.html">
+ Extensions Example
+ </a>
+ </li>
</ul>
<p aria-level="2" class="caption" role="heading">
<span class="caption-text">
diff --git a/user-guide/introduction.html b/user-guide/introduction.html
index 789eaeac1..0edc22c0e 100644
--- a/user-guide/introduction.html
+++ b/user-guide/introduction.html
@@ -182,6 +182,11 @@
Extending Ballista Scheduler And Executors
</a>
</li>
+ <li class="toctree-l1">
+ <a class="reference internal" href="extensions-example.html">
+ Extensions Example
+ </a>
+ </li>
</ul>
<p aria-level="2" class="caption" role="heading">
<span class="caption-text">
diff --git a/user-guide/metrics.html b/user-guide/metrics.html
index d11954875..ea0b85b89 100644
--- a/user-guide/metrics.html
+++ b/user-guide/metrics.html
@@ -182,6 +182,11 @@
Extending Ballista Scheduler And Executors
</a>
</li>
+ <li class="toctree-l1">
+ <a class="reference internal" href="extensions-example.html">
+ Extensions Example
+ </a>
+ </li>
</ul>
<p aria-level="2" class="caption" role="heading">
<span class="caption-text">
diff --git a/user-guide/python.html b/user-guide/python.html
index 476975e73..c1edb58c0 100644
--- a/user-guide/python.html
+++ b/user-guide/python.html
@@ -182,6 +182,11 @@
Extending Ballista Scheduler And Executors
</a>
</li>
+ <li class="toctree-l1">
+ <a class="reference internal" href="extensions-example.html">
+ Extensions Example
+ </a>
+ </li>
</ul>
<p aria-level="2" class="caption" role="heading">
<span class="caption-text">
diff --git a/user-guide/rust.html b/user-guide/rust.html
index 62268ab34..ecf668f2c 100644
--- a/user-guide/rust.html
+++ b/user-guide/rust.html
@@ -182,6 +182,11 @@
Extending Ballista Scheduler And Executors
</a>
</li>
+ <li class="toctree-l1">
+ <a class="reference internal" href="extensions-example.html">
+ Extensions Example
+ </a>
+ </li>
</ul>
<p aria-level="2" class="caption" role="heading">
<span class="caption-text">
diff --git a/user-guide/scheduler.html b/user-guide/scheduler.html
index cc023e124..a85a52015 100644
--- a/user-guide/scheduler.html
+++ b/user-guide/scheduler.html
@@ -182,6 +182,11 @@
Extending Ballista Scheduler And Executors
</a>
</li>
+ <li class="toctree-l1">
+ <a class="reference internal" href="extensions-example.html">
+ Extensions Example
+ </a>
+ </li>
</ul>
<p aria-level="2" class="caption" role="heading">
<span class="caption-text">
diff --git a/user-guide/tuning-guide.html b/user-guide/tuning-guide.html
index b2485c51e..24a6e7647 100644
--- a/user-guide/tuning-guide.html
+++ b/user-guide/tuning-guide.html
@@ -182,6 +182,11 @@
Extending Ballista Scheduler And Executors
</a>
</li>
+ <li class="toctree-l1">
+ <a class="reference internal" href="extensions-example.html">
+ Extensions Example
+ </a>
+ </li>
</ul>
<p aria-level="2" class="caption" role="heading">
<span class="caption-text">