This is an automated email from the ASF dual-hosted git repository.
milenkovicm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git
The following commit(s) were added to refs/heads/main by this push:
new 91a712c12 chore: Add standalone example for executing Substrait DDL
and query (#1376)
91a712c12 is described below
commit 91a712c1238c4bd540ffebe46b7888e66a594991
Author: Matt Cuento <[email protected]>
AuthorDate: Sun Jan 25 04:27:21 2026 -0800
chore: Add standalone example for executing Substrait DDL and query (#1376)
---
Cargo.lock | 3 +
examples/Cargo.toml | 7 +-
examples/README.md | 69 ++++
examples/examples/standalone-substrait.rs | 588 ++++++++++++++++++++++++++++++
4 files changed, 666 insertions(+), 1 deletion(-)
diff --git a/Cargo.lock b/Cargo.lock
index 017652481..351b06156 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1014,13 +1014,16 @@ dependencies = [
"ballista-scheduler",
"ctor",
"datafusion",
+ "datafusion-substrait",
"env_logger",
+ "futures",
"log",
"object_store",
"testcontainers-modules",
"tokio",
"tonic",
"url",
+ "uuid",
]
[[package]]
diff --git a/examples/Cargo.toml b/examples/Cargo.toml
index 787f458c0..255b28c79 100644
--- a/examples/Cargo.toml
+++ b/examples/Cargo.toml
@@ -42,7 +42,9 @@ ballista-executor = { path = "../ballista/executor", version
= "52.0.0", default
ballista-scheduler = { path = "../ballista/scheduler", version = "52.0.0",
default-features = false }
ctor = { workspace = true }
datafusion = { workspace = true }
+datafusion-substrait = { workspace = true }
env_logger = { workspace = true }
+futures = { workspace = true }
log = { workspace = true }
object_store = { workspace = true, features = ["aws"] }
testcontainers-modules = { version = "0.14", features = ["minio"] }
@@ -55,7 +57,10 @@ tokio = { workspace = true, features = [
] }
tonic = { workspace = true }
url = { workspace = true }
+uuid = { workspace = true }
[features]
-default = []
+default = ["substrait", "standalone"]
+standalone = ["ballista/standalone"]
+substrait = ["ballista-scheduler/substrait"]
testcontainers = []
diff --git a/examples/README.md b/examples/README.md
index 14604ac2b..04854762b 100644
--- a/examples/README.md
+++ b/examples/README.md
@@ -73,6 +73,75 @@ async fn main() -> Result<()> {
```
+```bash
+cargo run --example standalone-substrait
--features="ballista/standalone","substrait"
+```
+
+### Abridged source code for the standalone Substrait example
+
+```rust
+use std::sync::Arc;
+
+use ballista::datafusion::common::Result;
+use ballista::extension::{Extension, SubstraitSchedulerClient};
+use ballista_core::extension::SessionConfigExt;
+use ballista_examples::test_util;
+use datafusion::catalog::MemoryCatalogProviderList;
+use datafusion::execution::SessionStateBuilder;
+use datafusion::prelude::{SessionConfig, SessionContext};
+use datafusion_substrait::serializer::serialize_bytes;
+use futures::StreamExt;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+ let catalog_list = Arc::new(MemoryCatalogProviderList::new());
+
+ let config = SessionConfig::new_with_ballista();
+ let state = SessionStateBuilder::new()
+ .with_config(config)
+ .with_default_features()
+ .with_catalog_list(catalog_list.clone())
+ .build();
+ let ctx = SessionContext::new_with_state(state.clone());
+
+ // Use any frontend to serialize a Substrait plan
+ let test_data = test_util::examples_test_data();
+ let ddl_plan_bytes = serialize_bytes(&format!(
+ "CREATE EXTERNAL TABLE IF NOT EXISTS another_data \
+ STORED AS PARQUET \
+ LOCATION '{}/alltypes_plain.parquet'",
+ test_data
+ ), &ctx).await?;
+ let select_plan_bytes = serialize_bytes(
+ "SELECT id, string_col FROM another_data",
+ &ctx)
+ .await?;
+
+ let session_id = ctx.session_id();
+ let scheduler_url = Extension::setup_standalone(Some(&state)).await?;
+ let client = SubstraitSchedulerClient::new(scheduler_url,
session_id.to_string()).await?;
+
+ client.execute_query(ddl_plan_bytes).await?;
+ let mut stream = client.execute_query(select_plan_bytes).await?;
+
+ let mut batch_count = 0;
+ let mut total_rows = 0;
+ while let Some(batch_result) = stream.next().await {
+ let batch = batch_result?;
+ batch_count += 1;
+ total_rows += batch.num_rows();
+
+ println!("Batch {}: {} rows", batch_count, batch.num_rows());
+ println!("{:?}", batch);
+ }
+ println!("---------");
+ println!("Query executed successfully!");
+ println!("Total batches: {}, Total rows: {}", batch_count, total_rows);
+
+ Ok(())
+}
+```
+
## Distributed Examples
For background information on the Ballista architecture, refer to
diff --git a/examples/examples/standalone-substrait.rs
b/examples/examples/standalone-substrait.rs
new file mode 100644
index 000000000..5548a9576
--- /dev/null
+++ b/examples/examples/standalone-substrait.rs
@@ -0,0 +1,588 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use ballista::datafusion::common::Result;
+use ballista_core::client::BallistaClient;
+use ballista_core::extension::SessionConfigExt;
+use ballista_core::serde::protobuf::execute_query_params::Query::SubstraitPlan;
+use ballista_core::serde::protobuf::scheduler_grpc_client::SchedulerGrpcClient;
+use ballista_core::serde::protobuf::{
+ ExecuteQueryParams, GetJobStatusParams, GetJobStatusResult,
PartitionLocation,
+ SuccessfulJob, execute_query_result, job_status,
+};
+use ballista_core::utils::{GrpcClientConfig, create_grpc_client_connection};
+use ballista_examples::test_util;
+use datafusion::arrow::array::RecordBatch;
+use datafusion::arrow::error::ArrowError;
+use datafusion::catalog::MemoryCatalogProviderList;
+use datafusion::common::DataFusionError;
+use datafusion::execution::{
+ SendableRecordBatchStream, SessionState, SessionStateBuilder,
+};
+use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet,
MetricBuilder};
+use datafusion::prelude::{SessionConfig, SessionContext};
+use datafusion_substrait::serializer::serialize_bytes;
+use futures::{Stream, StreamExt, TryFutureExt, TryStreamExt};
+use log::{error, info};
+use std::sync::Arc;
+use std::time::{Duration, Instant};
+use tonic::transport::Channel;
+
+/// This example demonstrates executing Substrait plans against external tables
+/// using datafusion-substrait as our frontend.
+///
+/// In a distributed setting, an external catalog would be required to resolve
Substrait
+/// `NamedTable` references. Substrait DDL is currently not supported by
+/// [datafusion_substrait]; producing plans results in a
[LogicalPlan::EmptyRelation] and
+/// consuming DDL Substrait plans will result in an error.
+#[tokio::main]
+async fn main() -> Result<()> {
+ let catalog_list = Arc::new(MemoryCatalogProviderList::new());
+
+ let config = SessionConfig::new_with_ballista();
+ let state = SessionStateBuilder::new()
+ .with_config(config)
+ .with_default_features()
+ .with_catalog_list(catalog_list.clone())
+ .build();
+ let ctx = SessionContext::new_with_state(state.clone());
+
+ // Use any frontend to serialize a Substrait plan
+ let test_data = test_util::examples_test_data();
+ let ddl_plan_bytes = serialize_bytes(
+ &format!(
+ "CREATE EXTERNAL TABLE IF NOT EXISTS another_data \
+ STORED AS PARQUET \
+ LOCATION '{}/alltypes_plain.parquet'",
+ test_data
+ ),
+ &ctx,
+ )
+ .await?;
+ let select_plan_bytes =
+ serialize_bytes("SELECT id, string_col FROM another_data",
&ctx).await?;
+
+ let session_id = ctx.session_id();
+ let scheduler_url = setup_standalone(Some(&state)).await?;
+ let client =
+ SubstraitSchedulerClient::new(scheduler_url,
session_id.to_string()).await?;
+
+ client.execute_query(ddl_plan_bytes).await?;
+ let mut stream = client.execute_query(select_plan_bytes).await?;
+
+ let mut batch_count = 0;
+ let mut total_rows = 0;
+ while let Some(batch_result) = stream.next().await {
+ let batch = batch_result?;
+ batch_count += 1;
+ total_rows += batch.num_rows();
+
+ println!("Batch {}: {} rows", batch_count, batch.num_rows());
+ println!("{:?}", batch);
+ }
+ println!("---------");
+ println!("Query executed successfully!");
+ println!("Total batches: {}, Total rows: {}", batch_count, total_rows);
+
+ Ok(())
+}
+
+/// Wrapper struct to simplify execution of Substrait plans.
+pub struct SubstraitSchedulerClient {
+ scheduler_url: String,
+ session_id: String,
+ grpc_config: GrpcClientConfig,
+ max_message_size: usize,
+ use_flight_transport: bool,
+}
+
+impl SubstraitSchedulerClient {
+ /// Creates a new [SubstraitSchedulerClient] with default configuration.
+ ///
+ /// # Example
+ /// ```no_run
+ /// use ballista::extension::SubstraitSchedulerClient;
+ ///
+ /// # #[tokio::main]
+ /// # async fn main() -> datafusion::error::Result<()> {
+ /// let client = SubstraitSchedulerClient::new(
+ /// "http://localhost:50050".to_string(),
+ /// "session-123".to_string()
+ /// ).await?;
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub async fn new(
+ scheduler_url: String,
+ session_id: String,
+ ) -> datafusion::error::Result<Self> {
+ Ok(Self {
+ scheduler_url,
+ session_id,
+ grpc_config: GrpcClientConfig::default(),
+ max_message_size: 16 * 1024 * 1024, // 16MB
+ use_flight_transport: false,
+ })
+ }
+
+ /// Creates a builder for custom configuration.
+ ///
+ /// # Example
+ /// ```no_run
+ /// use ballista::extension::SubstraitSchedulerClient;
+ /// use ballista_core::utils::GrpcClientConfig;
+ ///
+ /// # #[tokio::main]
+ /// # async fn main() -> datafusion::error::Result<()> {
+ /// let custom_config = GrpcClientConfig {
+ /// connect_timeout_seconds: 60,
+ /// timeout_seconds: 120,
+ /// ..Default::default()
+ /// };
+ ///
+ /// let client = SubstraitSchedulerClient::builder(
+ /// "http://localhost:50050".to_string(),
+ /// "session-123".to_string()
+ /// )
+ /// .with_grpc_config(custom_config)
+ /// .with_max_message_size(32 * 1024 * 1024)
+ /// .build()
+ /// .await?;
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn builder(
+ scheduler_url: String,
+ session_id: String,
+ ) -> SubstraitSchedulerClientBuilder {
+ SubstraitSchedulerClientBuilder::new(scheduler_url, session_id)
+ }
+
+ /// Executes a Substrait query plan and returns a stream of results.
+ ///
+ /// This method:
+ /// 1. Submits the Substrait plan to the scheduler
+ /// 2. Polls until the job completes
+ /// 3. Fetches result partitions from executors
+ /// 4. Returns a lazy stream of RecordBatches
+ ///
+ /// # Arguments
+ /// * `plan` - Serialized Substrait plan bytes
+ ///
+ /// # Returns
+ /// A `SendableRecordBatchStream` that lazily fetches and yields result
batches.
+ ///
+ /// # Example
+ /// ```no_run
+ /// use ballista::extension::SubstraitSchedulerClient;
+ /// use futures::StreamExt;
+ ///
+ /// # #[tokio::main]
+ /// # async fn main() -> datafusion::error::Result<()> {
+ /// let client = SubstraitSchedulerClient::new(
+ /// "http://localhost:50050".to_string(),
+ /// "session-123".to_string()
+ /// ).await?;
+ ///
+ /// let substrait_bytes = vec![]; // Your Substrait plan
+ /// let mut stream = client.execute_query(substrait_bytes).await?;
+ ///
+ /// while let Some(batch) = stream.next().await {
+ /// let batch = batch?;
+ /// println!("Got {} rows", batch.num_rows());
+ /// }
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub async fn execute_query(
+ &self,
+ plan: Vec<u8>,
+ ) -> Result<SendableRecordBatchStream> {
+ let query_start_time = Instant::now();
+
+ let connection =
+ create_grpc_client_connection(self.scheduler_url.clone(),
&self.grpc_config)
+ .await
+ .map_err(|e| {
+ DataFusionError::Execution(format!(
+ "Failed to connect to scheduler: {e:?}"
+ ))
+ })?;
+
+ let mut scheduler = SchedulerGrpcClient::new(connection)
+ .max_encoding_message_size(self.max_message_size)
+ .max_decoding_message_size(self.max_message_size);
+
+ let execute_query_params = ExecuteQueryParams {
+ session_id: self.session_id.clone(),
+ settings: vec![],
+ operation_id: uuid::Uuid::now_v7().to_string(),
+ query: Some(SubstraitPlan(plan)),
+ };
+
+ let response = scheduler
+ .execute_query(execute_query_params)
+ .await
+ .map_err(|e| {
+ DataFusionError::Execution(format!("Failed to execute query:
{e:?}"))
+ })?
+ .into_inner();
+
+ let query_result = match response.result {
+ Some(execute_query_result::Result::Success(success)) => success,
+ Some(execute_query_result::Result::Failure(failure)) => {
+ return Err(DataFusionError::Execution(format!(
+ "Query execution failed: {:?}",
+ failure
+ )));
+ }
+ None => {
+ return Err(DataFusionError::Execution(
+ "No result received from scheduler".to_string(),
+ ));
+ }
+ };
+
+ if query_result.session_id != self.session_id {
+ return Err(DataFusionError::Execution(format!(
+ "Session ID mismatch: expected {}, got {}",
+ self.session_id, query_result.session_id
+ )));
+ }
+
+ let successful_job = Self::poll_job(
+ &mut scheduler,
+ query_result.job_id,
+ None,
+ 0,
+ query_start_time,
+ )
+ .await?;
+
+ let partition_stream = Self::fetch_partitions(
+ successful_job.partition_location,
+ self.max_message_size,
+ self.use_flight_transport,
+ )
+ .await?;
+
+ Ok(Box::pin(SubstraitResultStream::new(partition_stream)))
+ }
+
+ async fn poll_job(
+ scheduler: &mut SchedulerGrpcClient<Channel>,
+ job_id: String,
+ metrics: Option<Arc<ExecutionPlanMetricsSet>>,
+ partition: usize,
+ query_start_time: Instant,
+ ) -> Result<SuccessfulJob> {
+ let mut prev_status: Option<job_status::Status> = None;
+
+ loop {
+ let GetJobStatusResult { status } = scheduler
+ .get_job_status(GetJobStatusParams {
+ job_id: job_id.clone(),
+ })
+ .await
+ .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?
+ .into_inner();
+
+ let status = status.and_then(|s| s.status);
+ let wait_future = tokio::time::sleep(Duration::from_millis(100));
+ let has_status_change = prev_status != status;
+
+ match status {
+ None => {
+ if has_status_change {
+ info!("Job {job_id} is in initialization ...");
+ }
+ wait_future.await;
+ prev_status = status;
+ }
+ Some(job_status::Status::Queued(_)) => {
+ if has_status_change {
+ info!("Job {job_id} is queued...");
+ }
+ wait_future.await;
+ prev_status = status;
+ }
+ Some(job_status::Status::Running(_)) => {
+ if has_status_change {
+ info!("Job {job_id} is running...");
+ }
+ wait_future.await;
+ prev_status = status;
+ }
+ Some(job_status::Status::Failed(err)) => {
+ let msg = format!("Job {} failed: {}", job_id, err.error);
+ error!("{msg}");
+ break Err(DataFusionError::Execution(msg));
+ }
+ Some(job_status::Status::Successful(successful_job)) => {
+ // Calculate job execution time (server-side execution)
+ let job_execution_ms = successful_job
+ .ended_at
+ .saturating_sub(successful_job.started_at);
+ let duration = Duration::from_millis(job_execution_ms);
+
+ info!("Job {job_id} finished executing in {duration:?}");
+
+ // Calculate scheduling time (server-side queue time)
+ // This includes network latency and actual queue time
+ let scheduling_ms = successful_job
+ .started_at
+ .saturating_sub(successful_job.queued_at);
+
+ // Calculate total query time (end-to-end from client
perspective)
+ let total_elapsed = query_start_time.elapsed();
+ let total_ms = total_elapsed.as_millis();
+
+ // Set timing metrics if provided
+ if let Some(ref metrics) = metrics {
+ let metric_job_execution = MetricBuilder::new(metrics)
+ .gauge("job_execution_time_ms", partition);
+ metric_job_execution.set(job_execution_ms as usize);
+
+ let metric_scheduling = MetricBuilder::new(metrics)
+ .gauge("job_scheduling_in_ms", partition);
+ metric_scheduling.set(scheduling_ms as usize);
+
+ let metric_total_time = MetricBuilder::new(metrics)
+ .gauge("total_query_time_ms", partition);
+ metric_total_time.set(total_ms as usize);
+ }
+
+ break Ok(successful_job);
+ }
+ }
+ }
+ }
+
+ async fn fetch_partitions(
+ partition_locations: Vec<PartitionLocation>,
+ max_message_size: usize,
+ use_flight_transport: bool,
+ ) -> Result<std::pin::Pin<Box<dyn Stream<Item = Result<RecordBatch>> +
Send>>> {
+ let streams = partition_locations.into_iter().map(move |partition| {
+ let f = Self::fetch_partition_internal(
+ partition,
+ max_message_size,
+ use_flight_transport,
+ )
+ .map_err(|e| ArrowError::ExternalError(Box::new(e)));
+
+ futures::stream::once(f).try_flatten()
+ });
+
+ Ok(Box::pin(futures::stream::iter(streams).flatten()))
+ }
+
+ async fn fetch_partition_internal(
+ location: PartitionLocation,
+ max_message_size: usize,
+ flight_transport: bool,
+ ) -> Result<SendableRecordBatchStream> {
+ let metadata = location.executor_meta.ok_or_else(|| {
+ DataFusionError::Internal("Received empty executor
metadata".to_owned())
+ })?;
+ let partition_id = location.partition_id.ok_or_else(|| {
+ DataFusionError::Internal("Received empty partition id".to_owned())
+ })?;
+
+ let host = metadata.host.as_str();
+ let port = metadata.port as u16;
+
+ let mut ballista_client = BallistaClient::try_new(host, port,
max_message_size)
+ .await
+ .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;
+
+ ballista_client
+ .fetch_partition(
+ &metadata.id,
+ &partition_id.into(),
+ &location.path,
+ host,
+ port,
+ flight_transport,
+ )
+ .await
+ .map_err(|e| DataFusionError::External(Box::new(e)))
+ }
+}
+
+/// Builder for [SubstraitSchedulerClient]
+pub struct SubstraitSchedulerClientBuilder {
+ scheduler_url: String,
+ session_id: String,
+ grpc_config: Option<GrpcClientConfig>,
+ max_message_size: Option<usize>,
+ use_flight_transport: Option<bool>,
+}
+
+impl SubstraitSchedulerClientBuilder {
+ fn new(scheduler_url: String, session_id: String) -> Self {
+ Self {
+ scheduler_url,
+ session_id,
+ grpc_config: None,
+ max_message_size: None,
+ use_flight_transport: None,
+ }
+ }
+
+ /// Overrides default gRPC configuration
+ pub fn with_grpc_config(mut self, config: GrpcClientConfig) -> Self {
+ self.grpc_config = Some(config);
+ self
+ }
+
+ /// Sets max message size for gRPC client
+ pub fn with_max_message_size(mut self, size: usize) -> Self {
+ self.max_message_size = Some(size);
+ self
+ }
+
+ /// Sets usage of Flight when communicating with executor
+ pub fn with_flight_transport(mut self, use_flight: bool) -> Self {
+ self.use_flight_transport = Some(use_flight);
+ self
+ }
+
+ /// Creates a [SubstraitSchedulerClient] instance
+ pub async fn build(self) ->
datafusion::error::Result<SubstraitSchedulerClient> {
+ Ok(SubstraitSchedulerClient {
+ scheduler_url: self.scheduler_url,
+ session_id: self.session_id,
+ grpc_config: self.grpc_config.unwrap_or_default(),
+ max_message_size: self.max_message_size.unwrap_or(16 * 1024 *
1024),
+ use_flight_transport: self.use_flight_transport.unwrap_or(false),
+ })
+ }
+}
+
+struct SubstraitResultStream<S> {
+ inner: std::pin::Pin<Box<S>>,
+ schema: Option<datafusion::arrow::datatypes::SchemaRef>,
+}
+
+impl<S> SubstraitResultStream<S> {
+ fn new(inner: S) -> Self {
+ Self {
+ inner: Box::pin(inner),
+ schema: None,
+ }
+ }
+}
+
+impl<S> futures::Stream for SubstraitResultStream<S>
+where
+ S: futures::Stream<
+ Item = datafusion::error::Result<
+ datafusion::arrow::record_batch::RecordBatch,
+ >,
+ >,
+{
+ type Item =
datafusion::error::Result<datafusion::arrow::record_batch::RecordBatch>;
+
+ fn poll_next(
+ mut self: std::pin::Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Option<Self::Item>> {
+ match self.inner.as_mut().poll_next(cx) {
+ std::task::Poll::Ready(Some(Ok(batch))) => {
+ if self.schema.is_none() {
+ self.schema = Some(batch.schema());
+ }
+ std::task::Poll::Ready(Some(Ok(batch)))
+ }
+ other => other,
+ }
+ }
+}
+
+impl<S> datafusion::physical_plan::RecordBatchStream for
SubstraitResultStream<S>
+where
+ S: futures::Stream<
+ Item = datafusion::error::Result<
+ datafusion::arrow::record_batch::RecordBatch,
+ >,
+ > + Send,
+{
+ fn schema(&self) -> datafusion::arrow::datatypes::SchemaRef {
+ self.schema.clone().unwrap_or_else(|| {
+ std::sync::Arc::new(datafusion::arrow::datatypes::Schema::empty())
+ })
+ }
+}
+
+/// Creates in-process scheduler and executor, returning the scheduler URL.
+pub async fn setup_standalone(session_state: Option<&SessionState>) ->
Result<String> {
+ use ballista_core::{serde::BallistaCodec, utils::default_config_producer};
+
+ let addr = match session_state {
+ None => ballista_scheduler::standalone::new_standalone_scheduler()
+ .await
+ .map_err(|e| DataFusionError::Configuration(e.to_string()))?,
+ Some(session_state) => {
+
ballista_scheduler::standalone::new_standalone_scheduler_from_state(
+ session_state,
+ )
+ .await
+ .map_err(|e| DataFusionError::Configuration(e.to_string()))?
+ }
+ };
+ let config = session_state
+ .map(|s| s.config().clone())
+ .unwrap_or_else(default_config_producer);
+
+ let scheduler_url = format!("http://localhost:{}", addr.port());
+
+ let scheduler = loop {
+ match SchedulerGrpcClient::connect(scheduler_url.clone()).await {
+ Err(_) => {
+
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
+ log::info!("Attempting to connect to in-proc scheduler...");
+ }
+ Ok(scheduler) => break scheduler,
+ }
+ };
+
+ let concurrent_tasks = config.ballista_standalone_parallelism();
+
+ match session_state {
+ None => {
+ ballista_executor::new_standalone_executor(
+ scheduler,
+ concurrent_tasks,
+ BallistaCodec::default(),
+ )
+ .await
+ .map_err(|e| DataFusionError::Configuration(e.to_string()))?;
+ }
+ Some(session_state) => {
+ ballista_executor::new_standalone_executor_from_state(
+ scheduler,
+ concurrent_tasks,
+ session_state,
+ )
+ .await
+ .map_err(|e| DataFusionError::Configuration(e.to_string()))?;
+ }
+ }
+
+ Ok(scheduler_url)
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]