This is an automated email from the ASF dual-hosted git repository.

milenkovicm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new 379752e28 feat: Cluster RPC customisations to support TLS and custom 
headers (#1400)
379752e28 is described below

commit 379752e284defb498d22522112ccf0badbc5607f
Author: Phillip LeBlanc <[email protected]>
AuthorDate: Thu Jan 29 18:17:39 2026 +0900

    feat: Cluster RPC customisations to support TLS and custom headers (#1400)
    
    * Cluster RPC customizations to support TLS and custom headers
    
    * Add TLS support to scheduler flight proxy service
    
    - Update BallistaFlightProxyService to accept use_tls and 
customize_endpoint parameters
    - Add use_tls field to SchedulerConfig with with_use_tls() builder method
    - Unify EndpointOverrideFn type across crates to use 
ballista_core::extension definition
    - Update flight proxy to use https/http scheme based on TLS configuration
    - Apply custom endpoint configuration for TLS certificate setup
---
 Cargo.lock                                         |   5 +
 ballista/core/Cargo.toml                           |   2 +-
 ballista/core/src/client.rs                        |  35 +-
 .../core/src/execution_plans/distributed_query.rs  |  53 +-
 .../core/src/execution_plans/shuffle_reader.rs     |  73 ++-
 ballista/core/src/extension.rs                     | 234 +++++++++
 ballista/core/src/utils.rs                         |  77 ++-
 ballista/executor/src/config.rs                    |   1 +
 ballista/executor/src/execution_loop.rs            |  14 +-
 ballista/executor/src/executor_process.rs          |  74 ++-
 ballista/executor/src/executor_server.rs           |  26 +-
 ballista/scheduler/src/config.rs                   |  25 +
 ballista/scheduler/src/flight_proxy_service.rs     |  40 +-
 ballista/scheduler/src/scheduler_process.rs        |   9 +
 ballista/scheduler/src/state/executor_manager.rs   |  20 +-
 examples/Cargo.toml                                |  12 +
 examples/examples/mtls-cluster.rs                  | 534 +++++++++++++++++++++
 examples/examples/standalone-substrait.rs          |   7 +-
 18 files changed, 1166 insertions(+), 75 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 3fb69107f..1dbca79e0 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1008,17 +1008,21 @@ dependencies = [
 name = "ballista-examples"
 version = "52.0.0"
 dependencies = [
+ "arrow-flight",
  "ballista",
  "ballista-core",
  "ballista-executor",
  "ballista-scheduler",
  "ctor",
  "datafusion",
+ "datafusion-proto",
  "datafusion-substrait",
  "env_logger",
  "futures",
  "log",
  "object_store",
+ "rustls",
+ "tempfile",
  "testcontainers-modules",
  "tokio",
  "tonic",
@@ -5666,6 +5670,7 @@ dependencies = [
  "socket2",
  "sync_wrapper",
  "tokio",
+ "tokio-rustls",
  "tokio-stream",
  "tower",
  "tower-layer",
diff --git a/ballista/core/Cargo.toml b/ballista/core/Cargo.toml
index 5ad3f7dce..22d89b59f 100644
--- a/ballista/core/Cargo.toml
+++ b/ballista/core/Cargo.toml
@@ -62,7 +62,7 @@ prost = { workspace = true }
 prost-types = { workspace = true }
 rand = { workspace = true }
 serde = { workspace = true, features = ["derive"] }
-tokio = { workspace = true }
+tokio = { workspace = true, features = ["rt-multi-thread"] }
 tokio-stream = { workspace = true, features = ["net"] }
 tonic = { workspace = true }
 tonic-prost = { workspace = true }
diff --git a/ballista/core/src/client.rs b/ballista/core/src/client.rs
index 53a9aedfa..c01d431ca 100644
--- a/ballista/core/src/client.rs
+++ b/ballista/core/src/client.rs
@@ -44,8 +44,11 @@ use datafusion::arrow::{
 use datafusion::error::DataFusionError;
 use datafusion::error::Result;
 
+use crate::extension::BallistaConfigGrpcEndpoint;
 use crate::serde::protobuf;
-use crate::utils::{GrpcClientConfig, create_grpc_client_connection};
+
+use crate::utils::create_grpc_client_endpoint;
+
 use datafusion::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
 use futures::{Stream, StreamExt};
 use log::{debug, warn};
@@ -69,17 +72,37 @@ impl BallistaClient {
         host: &str,
         port: u16,
         max_message_size: usize,
+        use_tls: bool,
+        customize_endpoint: Option<Arc<BallistaConfigGrpcEndpoint>>,
     ) -> BResult<Self> {
-        let addr = format!("http://{host}:{port}";);
-        let grpc_config = GrpcClientConfig::default();
+        let scheme = if use_tls { "https" } else { "http" };
+
+        let addr = format!("{scheme}://{host}:{port}");
         debug!("BallistaClient connecting to {addr}");
-        let connection = create_grpc_client_connection(addr.clone(), 
&grpc_config)
-            .await
+
+        let mut endpoint = create_grpc_client_endpoint(addr.clone(), None)
             .map_err(|e| {
                 BallistaError::GrpcConnectionError(format!(
-                    "Error connecting to Ballista scheduler or executor at 
{addr}: {e:?}"
+                    "Error creating endpoint to Ballista scheduler or executor 
at {addr}: {e:?}"
                 ))
             })?;
+
+        if let Some(customize) = customize_endpoint {
+            endpoint = customize
+                .configure_endpoint(endpoint)
+                .map_err(|e| {
+                    BallistaError::GrpcConnectionError(format!(
+                        "Error creating endpoint to Ballista scheduler or 
executor at {addr}: {e:?}"
+                    ))
+                })?;
+        }
+
+        let connection = endpoint.connect().await.map_err(|e| {
+            BallistaError::GrpcConnectionError(format!(
+                "Error connecting to Ballista scheduler or executor at {addr}: 
{e:?}"
+            ))
+        })?;
+
         let flight_client = FlightServiceClient::new(connection)
             .max_decoding_message_size(max_message_size)
             .max_encoding_message_size(max_message_size);
diff --git a/ballista/core/src/execution_plans/distributed_query.rs 
b/ballista/core/src/execution_plans/distributed_query.rs
index e79edad1d..d7edac56b 100644
--- a/ballista/core/src/execution_plans/distributed_query.rs
+++ b/ballista/core/src/execution_plans/distributed_query.rs
@@ -17,6 +17,7 @@
 
 use crate::client::BallistaClient;
 use crate::config::BallistaConfig;
+use crate::extension::{BallistaConfigGrpcEndpoint, SessionConfigExt};
 use crate::serde::protobuf::get_job_status_result::FlightProxy;
 use crate::serde::protobuf::{
     ExecuteQueryParams, GetJobStatusParams, GetJobStatusResult, KeyValuePair,
@@ -24,7 +25,7 @@ use crate::serde::protobuf::{
     scheduler_grpc_client::SchedulerGrpcClient,
 };
 use crate::serde::protobuf::{ExecutorMetadata, SuccessfulJob};
-use crate::utils::{GrpcClientConfig, create_grpc_client_connection};
+use crate::utils::{GrpcClientConfig, create_grpc_client_endpoint};
 use datafusion::arrow::datatypes::SchemaRef;
 use datafusion::arrow::error::ArrowError;
 use datafusion::arrow::record_batch::RecordBatch;
@@ -40,6 +41,7 @@ use datafusion::physical_plan::{
     DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
     SendableRecordBatchStream, Statistics,
 };
+use datafusion::prelude::SessionConfig;
 use datafusion_proto::logical_plan::{
     AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec,
 };
@@ -243,6 +245,8 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for 
DistributedQueryExec<T> {
         let metric_total_bytes =
             MetricBuilder::new(&self.metrics).counter("transferred_bytes", 
partition);
 
+        let session_config = context.session_config().clone();
+
         let stream = futures::stream::once(
             execute_query(
                 self.scheduler_url.clone(),
@@ -252,6 +256,7 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for 
DistributedQueryExec<T> {
                 GrpcClientConfig::from(&self.config),
                 Arc::new(self.metrics.clone()),
                 partition,
+                session_config,
             )
             .map_err(|e| ArrowError::ExternalError(Box::new(e))),
         )
@@ -283,6 +288,7 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for 
DistributedQueryExec<T> {
     }
 }
 
+#[allow(clippy::too_many_arguments)]
 async fn execute_query(
     scheduler_url: String,
     session_id: String,
@@ -291,19 +297,39 @@ async fn execute_query(
     grpc_config: GrpcClientConfig,
     metrics: Arc<ExecutionPlanMetricsSet>,
     partition: usize,
+    session_config: SessionConfig,
 ) -> Result<impl Stream<Item = Result<RecordBatch>> + Send> {
+    let grpc_interceptor = session_config.ballista_grpc_interceptor();
+    let customize_endpoint =
+        session_config.ballista_override_create_grpc_client_endpoint();
+    let use_tls = session_config.ballista_use_tls();
+
     // Capture query submission time for total_query_time_ms
     let query_start_time = std::time::Instant::now();
 
     info!("Connecting to Ballista scheduler at {scheduler_url}");
     // TODO reuse the scheduler to avoid connecting to the Ballista scheduler 
again and again
-    let connection = create_grpc_client_connection(scheduler_url.clone(), 
&grpc_config)
+    let mut endpoint =
+        create_grpc_client_endpoint(scheduler_url.clone(), Some(&grpc_config))
+            .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;
+
+    if let Some(ref customize) = customize_endpoint {
+        endpoint = customize
+            .configure_endpoint(endpoint)
+            .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;
+    }
+
+    let connection = endpoint
+        .connect()
         .await
         .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;
 
-    let mut scheduler = SchedulerGrpcClient::new(connection)
-        .max_encoding_message_size(max_message_size)
-        .max_decoding_message_size(max_message_size);
+    let mut scheduler = SchedulerGrpcClient::with_interceptor(
+        connection,
+        grpc_interceptor.as_ref().clone(),
+    )
+    .max_encoding_message_size(max_message_size)
+    .max_decoding_message_size(max_message_size);
 
     let query_result = scheduler
         .execute_query(query)
@@ -414,6 +440,8 @@ async fn execute_query(
                         true,
                         scheduler_url.clone(),
                         flight_proxy.clone(),
+                        customize_endpoint.clone(),
+                        use_tls,
                     )
                     .map_err(|e| ArrowError::ExternalError(Box::new(e)));
 
@@ -477,6 +505,8 @@ async fn fetch_partition(
     flight_transport: bool,
     scheduler_url: String,
     flight_proxy: Option<FlightProxy>,
+    customize_endpoint: Option<Arc<BallistaConfigGrpcEndpoint>>,
+    use_tls: bool,
 ) -> Result<SendableRecordBatchStream> {
     let metadata = location.executor_meta.ok_or_else(|| {
         DataFusionError::Internal("Received empty executor 
metadata".to_owned())
@@ -491,10 +521,15 @@ async fn fetch_partition(
     let (client_host, client_port) =
         get_client_host_port(&metadata, &scheduler_url, &flight_proxy)?;
 
-    let mut ballista_client =
-        BallistaClient::try_new(client_host.as_str(), client_port, 
max_message_size)
-            .await
-            .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;
+    let mut ballista_client = BallistaClient::try_new(
+        client_host.as_str(),
+        client_port,
+        max_message_size,
+        use_tls,
+        customize_endpoint,
+    )
+    .await
+    .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;
     ballista_client
         .fetch_partition(
             &metadata.id,
diff --git a/ballista/core/src/execution_plans/shuffle_reader.rs 
b/ballista/core/src/execution_plans/shuffle_reader.rs
index 7de252c9c..1b2de8613 100644
--- a/ballista/core/src/execution_plans/shuffle_reader.rs
+++ b/ballista/core/src/execution_plans/shuffle_reader.rs
@@ -33,7 +33,7 @@ use crate::client::BallistaClient;
 use crate::execution_plans::sort_shuffle::{
     get_index_path, is_sort_shuffle_output, stream_sort_shuffle_partition,
 };
-use crate::extension::SessionConfigExt;
+use crate::extension::{BallistaConfigGrpcEndpoint, SessionConfigExt};
 use crate::serde::scheduler::{PartitionLocation, PartitionStats};
 
 use datafusion::arrow::datatypes::SchemaRef;
@@ -169,6 +169,8 @@ impl ExecutionPlan for ShuffleReaderExec {
         let force_remote_read = 
config.ballista_shuffle_reader_force_remote_read();
         let prefer_flight = 
config.ballista_shuffle_reader_remote_prefer_flight();
         let batch_size = config.batch_size();
+        let customize_endpoint = 
config.ballista_override_create_grpc_client_endpoint();
+        let use_tls = config.ballista_use_tls();
 
         if force_remote_read {
             debug!(
@@ -202,6 +204,8 @@ impl ExecutionPlan for ShuffleReaderExec {
             max_message_size,
             force_remote_read,
             prefer_flight,
+            customize_endpoint,
+            use_tls,
         );
 
         let input_stream = Box::pin(RecordBatchStreamAdapter::new(
@@ -404,6 +408,8 @@ fn send_fetch_partitions(
     max_message_size: usize,
     force_remote_read: bool,
     flight_transport: bool,
+    customize_endpoint: Option<Arc<BallistaConfigGrpcEndpoint>>,
+    use_tls: bool,
 ) -> AbortableReceiverStream {
     let (response_sender, response_receiver) = mpsc::channel(max_request_num);
     let semaphore = Arc::new(Semaphore::new(max_request_num));
@@ -420,10 +426,17 @@ fn send_fetch_partitions(
 
     // keep local shuffle files reading in serial order for memory control.
     let response_sender_c = response_sender.clone();
+    let customize_endpoint_c = customize_endpoint.clone();
     spawned_tasks.push(SpawnedTask::spawn(async move {
         for p in local_locations {
             let r = PartitionReaderEnum::Local
-                .fetch_partition(&p, max_message_size, flight_transport)
+                .fetch_partition(
+                    &p,
+                    max_message_size,
+                    flight_transport,
+                    customize_endpoint_c.clone(),
+                    use_tls,
+                )
                 .await;
             if let Err(e) = response_sender_c.send(r).await {
                 error!("Fail to send response event to the channel due to 
{e}");
@@ -434,11 +447,18 @@ fn send_fetch_partitions(
     for p in remote_locations.into_iter() {
         let semaphore = semaphore.clone();
         let response_sender = response_sender.clone();
+        let customize_endpoint_c = customize_endpoint.clone();
         spawned_tasks.push(SpawnedTask::spawn(async move {
             // Block if exceeds max request number.
             let permit = semaphore.acquire_owned().await.unwrap();
             let r = PartitionReaderEnum::FlightRemote
-                .fetch_partition(&p, max_message_size, flight_transport)
+                .fetch_partition(
+                    &p,
+                    max_message_size,
+                    flight_transport,
+                    customize_endpoint_c,
+                    use_tls,
+                )
                 .await;
             // Block if the channel buffer is full.
             if let Err(e) = response_sender.send(r).await {
@@ -465,6 +485,8 @@ trait PartitionReader: Send + Sync + Clone {
         location: &PartitionLocation,
         max_message_size: usize,
         flight_transport: bool,
+        customize_endpoint: Option<Arc<BallistaConfigGrpcEndpoint>>,
+        use_tls: bool,
     ) -> result::Result<SendableRecordBatchStream, BallistaError>;
 }
 
@@ -484,10 +506,19 @@ impl PartitionReader for PartitionReaderEnum {
         location: &PartitionLocation,
         max_message_size: usize,
         flight_transport: bool,
+        customize_endpoint: Option<Arc<BallistaConfigGrpcEndpoint>>,
+        use_tls: bool,
     ) -> result::Result<SendableRecordBatchStream, BallistaError> {
         match self {
             PartitionReaderEnum::FlightRemote => {
-                fetch_partition_remote(location, max_message_size, 
flight_transport).await
+                fetch_partition_remote(
+                    location,
+                    max_message_size,
+                    flight_transport,
+                    customize_endpoint,
+                    use_tls,
+                )
+                .await
             }
             PartitionReaderEnum::Local => 
fetch_partition_local(location).await,
             PartitionReaderEnum::ObjectStoreRemote => {
@@ -501,6 +532,8 @@ async fn fetch_partition_remote(
     location: &PartitionLocation,
     max_message_size: usize,
     flight_transport: bool,
+    customize_endpoint: Option<Arc<BallistaConfigGrpcEndpoint>>,
+    use_tls: bool,
 ) -> result::Result<SendableRecordBatchStream, BallistaError> {
     let metadata = &location.executor_meta;
     let partition_id = &location.partition_id;
@@ -508,18 +541,24 @@ async fn fetch_partition_remote(
     // And we should also avoid to keep alive too many connections for long 
time.
     let host = metadata.host.as_str();
     let port = metadata.port;
-    let mut ballista_client = BallistaClient::try_new(host, port, 
max_message_size)
-        .await
-        .map_err(|error| match error {
-            // map grpc connection error to partition fetch error.
-            BallistaError::GrpcConnectionError(msg) => 
BallistaError::FetchFailed(
-                metadata.id.clone(),
-                partition_id.stage_id,
-                partition_id.partition_id,
-                msg,
-            ),
-            other => other,
-        })?;
+    let mut ballista_client = BallistaClient::try_new(
+        host,
+        port,
+        max_message_size,
+        use_tls,
+        customize_endpoint,
+    )
+    .await
+    .map_err(|error| match error {
+        // map grpc connection error to partition fetch error.
+        BallistaError::GrpcConnectionError(msg) => BallistaError::FetchFailed(
+            metadata.id.clone(),
+            partition_id.stage_id,
+            partition_id.partition_id,
+            msg,
+        ),
+        other => other,
+    })?;
 
     ballista_client
         .fetch_partition(
@@ -1087,6 +1126,8 @@ mod tests {
             4 * 1024 * 1024,
             false,
             true,
+            None,
+            false,
         );
 
         let stream = RecordBatchStreamAdapter::new(
diff --git a/ballista/core/src/extension.rs b/ballista/core/src/extension.rs
index 9021ce2f4..f7b02395d 100644
--- a/ballista/core/src/extension.rs
+++ b/ballista/core/src/extension.rs
@@ -30,7 +30,18 @@ use 
datafusion::execution::session_state::SessionStateBuilder;
 use datafusion_proto::logical_plan::LogicalExtensionCodec;
 use datafusion_proto::physical_plan::PhysicalExtensionCodec;
 use datafusion_proto::protobuf::LogicalPlanNode;
+use std::collections::HashMap;
+use std::error::Error;
 use std::sync::Arc;
+use tonic::codegen::http::HeaderName;
+use tonic::metadata::MetadataMap;
+use tonic::service::Interceptor;
+use tonic::transport::Endpoint;
+use tonic::{Request, Status};
+
+/// Type alias for the endpoint override function used in gRPC client 
configuration
+pub type EndpointOverrideFn =
+    Arc<dyn Fn(Endpoint) -> Result<Endpoint, Box<dyn Error + Send + Sync>> + 
Send + Sync>;
 
 /// Provides methods which adapt [SessionState]
 /// for Ballista usage
@@ -143,6 +154,29 @@ pub trait SessionConfigExt {
         self,
         prefer_flight: bool,
     ) -> Self;
+
+    /// Set user defined metadata keys in Ballista gRPC requests
+    fn with_ballista_grpc_metadata(self, metadata: HashMap<String, String>) -> 
Self;
+
+    /// Get a `tonic` interceptor configured to decorate the provided metadata 
keys
+    fn ballista_grpc_interceptor(&self) -> 
Arc<BallistaGrpcMetadataInterceptor>;
+
+    /// Set a custom endpoint override function for gRPC client endpoint 
configuration
+    fn with_ballista_override_create_grpc_client_endpoint(
+        self,
+        override_f: EndpointOverrideFn,
+    ) -> Self;
+
+    /// Get the custom endpoint override function for gRPC client endpoint 
configuration
+    fn ballista_override_create_grpc_client_endpoint(
+        &self,
+    ) -> Option<Arc<BallistaConfigGrpcEndpoint>>;
+
+    /// Set whether to use TLS for executor connections (cluster-wide setting)
+    fn with_ballista_use_tls(self, use_tls: bool) -> Self;
+
+    /// Get whether to use TLS for executor connections
+    fn ballista_use_tls(&self) -> bool;
 }
 
 /// [SessionConfigHelperExt] is set of [SessionConfig] extension methods
@@ -389,6 +423,44 @@ impl SessionConfigExt for SessionConfig {
                 .set_bool(BALLISTA_SHUFFLE_READER_REMOTE_PREFER_FLIGHT, 
prefer_flight)
         }
     }
+
+    fn with_ballista_grpc_metadata(self, metadata: HashMap<String, String>) -> 
Self {
+        let extension = BallistaGrpcMetadataInterceptor::new(metadata);
+        self.with_extension(Arc::new(extension))
+    }
+
+    fn ballista_grpc_interceptor(&self) -> 
Arc<BallistaGrpcMetadataInterceptor> {
+        self.get_extension::<BallistaGrpcMetadataInterceptor>()
+            .unwrap_or_default()
+    }
+
+    fn with_ballista_override_create_grpc_client_endpoint(
+        self,
+        override_f: Arc<
+            dyn Fn(Endpoint) -> Result<Endpoint, Box<dyn Error + Send + Sync>>
+                + Send
+                + Sync,
+        >,
+    ) -> Self {
+        let extension = BallistaConfigGrpcEndpoint::new(override_f);
+        self.with_extension(Arc::new(extension))
+    }
+
+    fn ballista_override_create_grpc_client_endpoint(
+        &self,
+    ) -> Option<Arc<BallistaConfigGrpcEndpoint>> {
+        self.get_extension::<BallistaConfigGrpcEndpoint>()
+    }
+
+    fn with_ballista_use_tls(self, use_tls: bool) -> Self {
+        self.with_extension(Arc::new(BallistaUseTls(use_tls)))
+    }
+
+    fn ballista_use_tls(&self) -> bool {
+        self.get_extension::<BallistaUseTls>()
+            .map(|ext| ext.0)
+            .unwrap_or(false)
+    }
 }
 
 impl SessionConfigHelperExt for SessionConfig {
@@ -528,6 +600,71 @@ impl BallistaQueryPlannerExtension {
     }
 }
 
+/// Wrapper allowing additional metadata keys to be decorated to the scheduler
+/// gRPC request
+#[derive(Default, Clone)]
+pub struct BallistaGrpcMetadataInterceptor {
+    additional_metadata: HashMap<String, String>,
+}
+
+impl BallistaGrpcMetadataInterceptor {
+    /// Create a new interceptor with additional metadata
+    pub fn new(additional_metadata: HashMap<String, String>) -> Self {
+        Self {
+            additional_metadata,
+        }
+    }
+}
+
+impl Interceptor for BallistaGrpcMetadataInterceptor {
+    fn call(&mut self, mut request: Request<()>) -> Result<Request<()>, 
Status> {
+        if self.additional_metadata.is_empty() {
+            Ok(request)
+        } else {
+            let mut request_headers = 
request.metadata().clone().into_headers();
+            for (k, v) in &self.additional_metadata {
+                request_headers.insert(
+                    HeaderName::from_bytes(k.as_bytes())
+                        .map_err(|e| Status::invalid_argument(e.to_string()))?,
+                    v.parse().map_err(|_e| {
+                        Status::invalid_argument(format!(
+                            "{v} is not a valid header value"
+                        ))
+                    })?,
+                );
+            }
+            *request.metadata_mut() = 
MetadataMap::from_headers(request_headers);
+            Ok(request)
+        }
+    }
+}
+
+/// Wrapper for customizing gRPC client endpoint configuration.
+/// This allows configuring TLS, timeouts, and other transport settings.
+#[derive(Clone)]
+pub struct BallistaConfigGrpcEndpoint {
+    override_f: EndpointOverrideFn,
+}
+
+impl BallistaConfigGrpcEndpoint {
+    /// Create a new endpoint configuration wrapper with the given override 
function
+    pub fn new(override_f: EndpointOverrideFn) -> Self {
+        Self { override_f }
+    }
+
+    /// Apply the custom configuration to an endpoint
+    pub fn configure_endpoint(
+        &self,
+        endpoint: Endpoint,
+    ) -> Result<Endpoint, Box<dyn Error + Send + Sync>> {
+        (self.override_f)(endpoint)
+    }
+}
+
+/// Wrapper for cluster-wide TLS configuration
+#[derive(Clone, Copy)]
+pub struct BallistaUseTls(pub bool);
+
 #[cfg(test)]
 mod test {
     use datafusion::{
@@ -572,4 +709,101 @@ mod test {
                 .any(|p| p.key == "datafusion.catalog.information_schema")
         )
     }
+
+    #[test]
+    fn test_ballista_grpc_metadata_interceptor() {
+        use std::collections::HashMap;
+        use tonic::Request;
+        use tonic::service::Interceptor;
+
+        use super::BallistaGrpcMetadataInterceptor;
+
+        // Test empty interceptor passes through unchanged
+        let mut interceptor = BallistaGrpcMetadataInterceptor::default();
+        let request = Request::new(());
+        let result = interceptor.call(request).unwrap();
+        assert!(result.metadata().is_empty());
+
+        // Test interceptor adds metadata
+        let mut metadata = HashMap::new();
+        metadata.insert("x-api-key".to_string(), "test-key".to_string());
+        metadata.insert("x-custom-header".to_string(), 
"custom-value".to_string());
+
+        let mut interceptor = BallistaGrpcMetadataInterceptor::new(metadata);
+        let request = Request::new(());
+        let result = interceptor.call(request).unwrap();
+
+        assert_eq!(
+            result
+                .metadata()
+                .get("x-api-key")
+                .unwrap()
+                .to_str()
+                .unwrap(),
+            "test-key"
+        );
+        assert_eq!(
+            result
+                .metadata()
+                .get("x-custom-header")
+                .unwrap()
+                .to_str()
+                .unwrap(),
+            "custom-value"
+        );
+    }
+
+    #[test]
+    fn test_ballista_grpc_metadata_via_session_config() {
+        use std::collections::HashMap;
+        use tonic::Request;
+        use tonic::service::Interceptor;
+
+        // Test that metadata set via SessionConfig is accessible via 
interceptor
+        let mut metadata = HashMap::new();
+        metadata.insert("authorization".to_string(), "Bearer 
token123".to_string());
+
+        let config =
+            
SessionConfig::new_with_ballista().with_ballista_grpc_metadata(metadata);
+
+        let interceptor = config.ballista_grpc_interceptor();
+        let mut interceptor = interceptor.as_ref().clone();
+
+        let request = Request::new(());
+        let result = interceptor.call(request).unwrap();
+
+        assert_eq!(
+            result
+                .metadata()
+                .get("authorization")
+                .unwrap()
+                .to_str()
+                .unwrap(),
+            "Bearer token123"
+        );
+    }
+
+    #[test]
+    fn test_ballista_endpoint_override_error_handling() {
+        use std::sync::Arc;
+        use tonic::transport::Endpoint;
+
+        use super::BallistaConfigGrpcEndpoint;
+
+        // Test that errors from override function are propagated
+        let override_fn: super::EndpointOverrideFn =
+            Arc::new(|_ep: Endpoint| Err("TLS configuration failed".into()));
+
+        let config_endpoint = BallistaConfigGrpcEndpoint::new(override_fn);
+        let endpoint = Endpoint::from_static("http://localhost:50051";);
+        let result = config_endpoint.configure_endpoint(endpoint);
+
+        assert!(result.is_err());
+        assert!(
+            result
+                .unwrap_err()
+                .to_string()
+                .contains("TLS configuration failed")
+        );
+    }
 }
diff --git a/ballista/core/src/utils.rs b/ballista/core/src/utils.rs
index 75698233a..e7f4e2a57 100644
--- a/ballista/core/src/utils.rs
+++ b/ballista/core/src/utils.rs
@@ -36,7 +36,7 @@ use std::sync::Arc;
 use std::time::{Duration, SystemTime, UNIX_EPOCH};
 use std::{fs::File, pin::Pin};
 use tonic::codegen::StdError;
-use tonic::transport::{Channel, Error, Server};
+use tonic::transport::{Channel, Endpoint, Error, Server};
 
 /// Configuration for gRPC client connections.
 ///
@@ -222,6 +222,34 @@ where
     endpoint.connect().await
 }
 
+/// Creates a gRPC client endpoint (without connecting) for customization.
+/// This is typically used when TLS or other custom configuration is needed.
+/// If `config` is provided, standard timeout and keepalive settings are 
applied.
+pub fn create_grpc_client_endpoint<D>(
+    dst: D,
+    config: Option<&GrpcClientConfig>,
+) -> std::result::Result<Endpoint, Error>
+where
+    D: std::convert::TryInto<tonic::transport::Endpoint>,
+    D::Error: Into<StdError>,
+{
+    let endpoint = tonic::transport::Endpoint::new(dst)?;
+    if let Some(config) = config {
+        Ok(endpoint
+            
.connect_timeout(Duration::from_secs(config.connect_timeout_seconds))
+            .timeout(Duration::from_secs(config.timeout_seconds))
+            .tcp_nodelay(true)
+            
.tcp_keepalive(Some(Duration::from_secs(config.tcp_keepalive_seconds)))
+            .http2_keep_alive_interval(Duration::from_secs(
+                config.http2_keepalive_interval_seconds,
+            ))
+            .keep_alive_timeout(Duration::from_secs(20))
+            .keep_alive_while_idle(true))
+    } else {
+        Ok(endpoint)
+    }
+}
+
 /// Creates a gRPC server builder with the specified configuration.
 pub fn create_grpc_server(config: &GrpcServerConfig) -> Server {
     Server::builder()
@@ -261,3 +289,50 @@ pub fn get_time_before(interval_seconds: u64) -> u64 {
         .unwrap_or_else(|| Duration::from_secs(0))
         .as_secs()
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_grpc_client_config_from_ballista_config() {
+        let ballista_config = BallistaConfig::default();
+        let grpc_config = GrpcClientConfig::from(&ballista_config);
+
+        // Verify the conversion picks up the right values
+        assert_eq!(
+            grpc_config.connect_timeout_seconds,
+            ballista_config.default_grpc_client_connect_timeout_seconds() as 
u64
+        );
+        assert_eq!(
+            grpc_config.timeout_seconds,
+            ballista_config.default_grpc_client_timeout_seconds() as u64
+        );
+        assert_eq!(
+            grpc_config.tcp_keepalive_seconds,
+            ballista_config.default_grpc_client_tcp_keepalive_seconds() as u64
+        );
+        assert_eq!(
+            grpc_config.http2_keepalive_interval_seconds,
+            
ballista_config.default_grpc_client_http2_keepalive_interval_seconds() as u64
+        );
+    }
+
+    #[test]
+    fn test_create_grpc_client_endpoint_with_config() {
+        let config = GrpcClientConfig {
+            connect_timeout_seconds: 10,
+            timeout_seconds: 30,
+            tcp_keepalive_seconds: 1800,
+            http2_keepalive_interval_seconds: 150,
+        };
+        let result = create_grpc_client_endpoint("http://localhost:50051";, 
Some(&config));
+        assert!(result.is_ok());
+    }
+
+    #[test]
+    fn test_create_grpc_client_endpoint_invalid_url() {
+        let result = create_grpc_client_endpoint("not a valid url", None);
+        assert!(result.is_err());
+    }
+}
diff --git a/ballista/executor/src/config.rs b/ballista/executor/src/config.rs
index 524504c3e..2a58c9e92 100644
--- a/ballista/executor/src/config.rs
+++ b/ballista/executor/src/config.rs
@@ -176,6 +176,7 @@ impl TryFrom<Config> for ExecutorProcessConfig {
             override_logical_codec: None,
             override_physical_codec: None,
             override_arrow_flight_service: None,
+            override_create_grpc_client_endpoint: None,
         })
     }
 }
diff --git a/ballista/executor/src/execution_loop.rs 
b/ballista/executor/src/execution_loop.rs
index 87e34a0ca..0d55793f2 100644
--- a/ballista/executor/src/execution_loop.rs
+++ b/ballista/executor/src/execution_loop.rs
@@ -46,7 +46,7 @@ use std::sync::mpsc::{Receiver, Sender, TryRecvError};
 use std::time::{SystemTime, UNIX_EPOCH};
 use std::{sync::Arc, time::Duration};
 use tokio::sync::{OwnedSemaphorePermit, Semaphore};
-use tonic::transport::Channel;
+use tonic::codegen::{Body, Bytes, StdError};
 
 /// Main execution loop that polls the scheduler for available tasks.
 ///
@@ -56,11 +56,17 @@ use tonic::transport::Channel;
 ///
 /// The loop respects the executor's concurrent task limit via a semaphore,
 /// ensuring no more than the configured number of tasks run simultaneously.
-pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan>(
-    mut scheduler: SchedulerGrpcClient<Channel>,
+pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan, C>(
+    mut scheduler: SchedulerGrpcClient<C>,
     executor: Arc<Executor>,
     codec: BallistaCodec<T, U>,
-) -> Result<(), BallistaError> {
+) -> Result<(), BallistaError>
+where
+    C: tonic::client::GrpcService<tonic::body::Body>,
+    C::Error: Into<StdError>,
+    C::ResponseBody: Body<Data = Bytes> + Send + 'static,
+    <C::ResponseBody as Body>::Error: Into<StdError> + Send,
+{
     let executor_specification: ExecutorSpecification = executor
         .metadata
         .specification
diff --git a/ballista/executor/src/executor_process.rs 
b/ballista/executor/src/executor_process.rs
index 16a5d2e45..702c8976b 100644
--- a/ballista/executor/src/executor_process.rs
+++ b/ballista/executor/src/executor_process.rs
@@ -42,7 +42,7 @@ use datafusion::execution::runtime_env::RuntimeEnvBuilder;
 
 use ballista_core::config::{LogRotationPolicy, TaskSchedulingPolicy};
 use ballista_core::error::BallistaError;
-use ballista_core::extension::SessionConfigExt;
+use ballista_core::extension::{EndpointOverrideFn, SessionConfigExt};
 use ballista_core::serde::protobuf::executor_resource::Resource;
 use ballista_core::serde::protobuf::executor_status::Status;
 use ballista_core::serde::protobuf::{
@@ -53,7 +53,7 @@ use ballista_core::serde::{
     BallistaCodec, BallistaLogicalExtensionCodec, 
BallistaPhysicalExtensionCodec,
 };
 use ballista_core::utils::{
-    GrpcServerConfig, create_grpc_client_connection, create_grpc_server,
+    GrpcClientConfig, GrpcServerConfig, create_grpc_client_endpoint, 
create_grpc_server,
     default_config_producer, get_time_before,
 };
 use ballista_core::{BALLISTA_VERSION, ConfigProducer, RuntimeProducer};
@@ -129,6 +129,8 @@ pub struct ExecutorProcessConfig {
     pub override_physical_codec: Option<Arc<dyn PhysicalExtensionCodec>>,
     /// [ArrowFlightServerProvider] implementation override option
     pub override_arrow_flight_service: Option<Arc<ArrowFlightServerProvider>>,
+    /// Override function for customizing gRPC client endpoints before they 
are used
+    pub override_create_grpc_client_endpoint: Option<EndpointOverrideFn>,
 }
 
 impl ExecutorProcessConfig {
@@ -174,6 +176,7 @@ impl Default for ExecutorProcessConfig {
             override_logical_codec: None,
             override_physical_codec: None,
             override_arrow_flight_service: None,
+            override_create_grpc_client_endpoint: None,
         }
     }
 }
@@ -284,14 +287,28 @@ pub async fn start_executor_process(
     let connect_timeout = opt.scheduler_connect_timeout_seconds as u64;
     let session_config = (executor.config_producer)();
     let ballista_config = session_config.ballista_config();
+    let grpc_config = GrpcClientConfig::from(&ballista_config);
     let connection = if connect_timeout == 0 {
-        create_grpc_client_connection(scheduler_url, 
&(&ballista_config).into())
-            .await
+        let mut endpoint = create_grpc_client_endpoint(scheduler_url, 
Some(&grpc_config))
             .map_err(|_| {
                 BallistaError::GrpcConnectionError(
-                    "Could not connect to scheduler".to_string(),
+                    "Could not create endpoint to scheduler".to_string(),
                 )
-            })
+            })?;
+
+        if let Some(ref override_fn) = 
opt.override_create_grpc_client_endpoint {
+            endpoint = override_fn(endpoint).map_err(|_| {
+                BallistaError::GrpcConnectionError(
+                    "Failed to apply endpoint override".to_string(),
+                )
+            })?;
+        }
+
+        endpoint.connect().await.map_err(|_| {
+            BallistaError::GrpcConnectionError(
+                "Could not connect to scheduler".to_string(),
+            )
+        })
     } else {
         // this feature was added to support docker-compose so that we can 
have the executor
         // wait for the scheduler to start, or at least run for 10 seconds 
before failing so
@@ -301,23 +318,40 @@ pub async fn start_executor_process(
         while x.is_none()
             && Instant::now().elapsed().as_secs() - start_time < 
connect_timeout
         {
-            match create_grpc_client_connection(
-                scheduler_url.clone(),
-                &(&ballista_config).into(),
-            )
-            .await
-            .map_err(|_| {
-                BallistaError::GrpcConnectionError(
-                    "Could not connect to scheduler".to_string(),
-                )
-            }) {
-                Ok(connection) => {
-                    info!("Connected to scheduler at {scheduler_url}");
-                    x = Some(connection);
+            match create_grpc_client_endpoint(scheduler_url.clone(), 
Some(&grpc_config)) {
+                Ok(mut endpoint) => {
+                    if let Some(ref override_fn) =
+                        opt.override_create_grpc_client_endpoint
+                    {
+                        match override_fn(endpoint) {
+                            Ok(overridden_endpoint) => endpoint = 
overridden_endpoint,
+                            Err(e) => {
+                                warn!(
+                                    "Failed to apply endpoint override to 
scheduler at {scheduler_url} ({e}); retrying ..."
+                                );
+                                
tokio::time::sleep(time::Duration::from_millis(500))
+                                    .await;
+                                continue;
+                            }
+                        }
+                    }
+
+                    match endpoint.connect().await {
+                        Ok(connection) => {
+                            info!("Connected to scheduler at {scheduler_url}");
+                            x = Some(connection);
+                        }
+                        Err(e) => {
+                            warn!(
+                                "Failed to connect to scheduler at 
{scheduler_url} ({e}); retrying ..."
+                            );
+                            
tokio::time::sleep(time::Duration::from_millis(500)).await;
+                        }
+                    }
                 }
                 Err(e) => {
                     warn!(
-                        "Failed to connect to scheduler at {scheduler_url} 
({e}); retrying ..."
+                        "Failed to create endpoint to scheduler at 
{scheduler_url} ({e}); retrying ..."
                     );
                     tokio::time::sleep(time::Duration::from_millis(500)).await;
                 }
diff --git a/ballista/executor/src/executor_server.rs 
b/ballista/executor/src/executor_server.rs
index a3da987b8..c01f282ad 100644
--- a/ballista/executor/src/executor_server.rs
+++ b/ballista/executor/src/executor_server.rs
@@ -34,7 +34,7 @@ use tonic::transport::Channel;
 use tonic::{Request, Response, Status};
 
 use ballista_core::error::BallistaError;
-use ballista_core::extension::SessionConfigExt;
+use ballista_core::extension::EndpointOverrideFn;
 use ballista_core::serde::BallistaCodec;
 use ballista_core::serde::protobuf::{
     CancelTasksParams, CancelTasksResult, ExecutorMetric, ExecutorStatus,
@@ -47,10 +47,12 @@ use ballista_core::serde::protobuf::{
 };
 use ballista_core::serde::scheduler::PartitionId;
 use ballista_core::serde::scheduler::TaskDefinition;
+
 use ballista_core::serde::scheduler::from_proto::{
     get_task_definition, get_task_definition_vec,
 };
-use ballista_core::utils::{create_grpc_client_connection, create_grpc_server};
+use ballista_core::utils::{create_grpc_client_endpoint, create_grpc_server};
+
 use dashmap::DashMap;
 use datafusion::execution::TaskContext;
 use datafusion_proto::{logical_plan::AsLogicalPlan, 
physical_plan::AsExecutionPlan};
@@ -113,6 +115,7 @@ pub async fn startup<T: 'static + AsLogicalPlan, U: 'static 
+ AsExecutionPlan>(
         codec,
         config.grpc_max_encoding_message_size as usize,
         config.grpc_max_decoding_message_size as usize,
+        config.override_create_grpc_client_endpoint.clone(),
     );
 
     // 1. Start executor grpc service
@@ -212,6 +215,7 @@ pub struct ExecutorServer<T: 'static + AsLogicalPlan, U: 
'static + AsExecutionPl
     grpc_max_encoding_message_size: usize,
     /// Maximum size for incoming gRPC messages.
     grpc_max_decoding_message_size: usize,
+    override_create_grpc_client_endpoint: Option<EndpointOverrideFn>,
 }
 
 #[derive(Clone)]
@@ -238,6 +242,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> ExecutorServer<T,
         codec: BallistaCodec<T, U>,
         grpc_max_encoding_message_size: usize,
         grpc_max_decoding_message_size: usize,
+        override_create_grpc_client_endpoint: Option<EndpointOverrideFn>,
     ) -> Self {
         Self {
             _start_time: SystemTime::now()
@@ -251,6 +256,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> ExecutorServer<T,
             schedulers: Default::default(),
             grpc_max_encoding_message_size,
             grpc_max_decoding_message_size,
+            override_create_grpc_client_endpoint,
         }
     }
 
@@ -264,11 +270,17 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> ExecutorServer<T,
             Ok(scheduler)
         } else {
             let scheduler_url = format!("http://{scheduler_id}";);
-            let session_config = (self.executor.config_producer)();
-            let ballista_config = session_config.ballista_config();
-            let connection =
-                create_grpc_client_connection(scheduler_url, 
&(&ballista_config).into())
-                    .await?;
+            let mut endpoint = create_grpc_client_endpoint(scheduler_url, 
None)?;
+
+            if let Some(ref override_fn) = 
self.override_create_grpc_client_endpoint {
+                endpoint = override_fn(endpoint).map_err(|e| {
+                    BallistaError::GrpcConnectionError(format!(
+                        "Failed to customize endpoint for scheduler 
{scheduler_id}: {e}"
+                    ))
+                })?;
+            }
+
+            let connection = endpoint.connect().await?;
             let scheduler = SchedulerGrpcClient::new(connection)
                 .max_encoding_message_size(self.grpc_max_encoding_message_size)
                 
.max_decoding_message_size(self.grpc_max_decoding_message_size);
diff --git a/ballista/scheduler/src/config.rs b/ballista/scheduler/src/config.rs
index bbb82f5f5..49d4b3f49 100644
--- a/ballista/scheduler/src/config.rs
+++ b/ballista/scheduler/src/config.rs
@@ -27,6 +27,7 @@
 
 use crate::SessionBuilder;
 use crate::cluster::DistributionPolicy;
+use ballista_core::extension::EndpointOverrideFn;
 use ballista_core::{ConfigProducer, config::TaskSchedulingPolicy};
 use datafusion_proto::logical_plan::LogicalExtensionCodec;
 use datafusion_proto::physical_plan::PhysicalExtensionCodec;
@@ -240,6 +241,10 @@ pub struct SchedulerConfig {
     pub override_logical_codec: Option<Arc<dyn LogicalExtensionCodec>>,
     /// [PhysicalExtensionCodec] override option
     pub override_physical_codec: Option<Arc<dyn PhysicalExtensionCodec>>,
+    /// Override function for customizing gRPC client endpoints before they 
are used
+    pub override_create_grpc_client_endpoint: Option<EndpointOverrideFn>,
+    /// Whether to use TLS when connecting to executors (for flight proxy)
+    pub use_tls: bool,
 }
 
 impl Default for SchedulerConfig {
@@ -266,6 +271,8 @@ impl Default for SchedulerConfig {
             override_session_builder: None,
             override_logical_codec: None,
             override_physical_codec: None,
+            override_create_grpc_client_endpoint: None,
+            use_tls: false,
         }
     }
 }
@@ -385,6 +392,22 @@ impl SchedulerConfig {
         self.override_session_builder = Some(override_session_builder);
         self
     }
+
+    /// Set a custom override function for creating gRPC client endpoints.
+    /// This allows configuring TLS, timeouts, and other transport settings.
+    pub fn with_override_create_grpc_client_endpoint(
+        mut self,
+        override_fn: EndpointOverrideFn,
+    ) -> Self {
+        self.override_create_grpc_client_endpoint = Some(override_fn);
+        self
+    }
+
+    /// Sets whether TLS should be used when connecting to executors (for 
flight proxy).
+    pub fn with_use_tls(mut self, use_tls: bool) -> Self {
+        self.use_tls = use_tls;
+        self
+    }
 }
 
 /// Policy of distributing tasks to available executor slots
@@ -495,6 +518,8 @@ impl TryFrom<Config> for SchedulerConfig {
             override_logical_codec: None,
             override_physical_codec: None,
             override_session_builder: None,
+            override_create_grpc_client_endpoint: None,
+            use_tls: false,
         };
 
         Ok(config)
diff --git a/ballista/scheduler/src/flight_proxy_service.rs 
b/ballista/scheduler/src/flight_proxy_service.rs
index bc5c63153..1513b3197 100644
--- a/ballista/scheduler/src/flight_proxy_service.rs
+++ b/ballista/scheduler/src/flight_proxy_service.rs
@@ -22,13 +22,15 @@ use arrow_flight::{
     HandshakeRequest, HandshakeResponse, PollInfo, PutResult, SchemaResult, 
Ticket,
 };
 use ballista_core::error::BallistaError;
+use ballista_core::extension::BallistaConfigGrpcEndpoint;
 use ballista_core::serde::decode_protobuf;
 use ballista_core::serde::scheduler::Action as BallistaAction;
-use ballista_core::utils::{GrpcClientConfig, create_grpc_client_connection};
+use ballista_core::utils::{GrpcClientConfig, create_grpc_client_endpoint};
 
 use futures::{Stream, TryFutureExt};
 use log::debug;
 use std::pin::Pin;
+use std::sync::Arc;
 use tonic::{Request, Response, Status, Streaming};
 
 /// Service implementing a proxy from scheduler to executor Apache Arrow 
Flight Protocol
@@ -40,16 +42,24 @@ use tonic::{Request, Response, Status, Streaming};
 pub struct BallistaFlightProxyService {
     max_decoding_message_size: usize,
     max_encoding_message_size: usize,
+    /// Whether to use TLS when connecting to executors
+    use_tls: bool,
+    /// Optional function to customize gRPC endpoint configuration (e.g., for 
TLS)
+    customize_endpoint: Option<Arc<BallistaConfigGrpcEndpoint>>,
 }
 
 impl BallistaFlightProxyService {
     pub fn new(
         max_decoding_message_size: usize,
         max_encoding_message_size: usize,
+        use_tls: bool,
+        customize_endpoint: Option<Arc<BallistaConfigGrpcEndpoint>>,
     ) -> Self {
         Self {
             max_decoding_message_size,
             max_encoding_message_size,
+            use_tls,
+            customize_endpoint,
         }
     }
 }
@@ -120,6 +130,8 @@ impl FlightService for BallistaFlightProxyService {
                     *port,
                     self.max_decoding_message_size,
                     self.max_encoding_message_size,
+                    self.use_tls,
+                    self.customize_endpoint.clone(),
                 )
                 .map_err(|e| from_ballista_err(&e))
                 .await?;
@@ -169,16 +181,34 @@ async fn get_flight_client(
     port: u16,
     max_decoding_message_size: usize,
     max_encoding_message_size: usize,
+    use_tls: bool,
+    customize_endpoint: Option<Arc<BallistaConfigGrpcEndpoint>>,
 ) -> Result<FlightServiceClient<tonic::transport::channel::Channel>, 
BallistaError> {
-    let addr = format!("http://{host}:{port}";);
+    let scheme = if use_tls { "https" } else { "http" };
+    let addr = format!("{scheme}://{host}:{port}");
     let grpc_config = GrpcClientConfig::default();
-    let connection = create_grpc_client_connection(addr.clone(), &grpc_config)
-        .await
+
+    let mut endpoint = create_grpc_client_endpoint(addr.clone(), 
Some(&grpc_config))
         .map_err(|e| {
             BallistaError::GrpcConnectionError(format!(
-                "Error connecting to Ballista scheduler or executor at {addr}: 
{e:?}"
+                "Error creating endpoint for Ballista executor at {addr}: 
{e:?}"
+            ))
+        })?;
+
+    if let Some(ref customize) = customize_endpoint {
+        endpoint = customize.configure_endpoint(endpoint).map_err(|e| {
+            BallistaError::GrpcConnectionError(format!(
+                "Error customizing endpoint for Ballista executor at {addr}: 
{e}"
             ))
         })?;
+    }
+
+    let connection = endpoint.connect().await.map_err(|e| {
+        BallistaError::GrpcConnectionError(format!(
+            "Error connecting to Ballista executor at {addr}: {e:?}"
+        ))
+    })?;
+
     let flight_client = FlightServiceClient::new(connection)
         .max_decoding_message_size(max_decoding_message_size)
         .max_encoding_message_size(max_encoding_message_size);
diff --git a/ballista/scheduler/src/scheduler_process.rs 
b/ballista/scheduler/src/scheduler_process.rs
index 679d2c411..3e6a97b2f 100644
--- a/ballista/scheduler/src/scheduler_process.rs
+++ b/ballista/scheduler/src/scheduler_process.rs
@@ -20,6 +20,7 @@ use crate::flight_proxy_service::BallistaFlightProxyService;
 use arrow_flight::flight_service_server::FlightServiceServer;
 use ballista_core::BALLISTA_VERSION;
 use ballista_core::error::BallistaError;
+use ballista_core::extension::BallistaConfigGrpcEndpoint;
 use ballista_core::serde::protobuf::scheduler_grpc_server::SchedulerGrpcServer;
 use ballista_core::serde::{
     BallistaCodec, BallistaLogicalExtensionCodec, 
BallistaPhysicalExtensionCodec,
@@ -102,9 +103,17 @@ pub async fn start_grpc_service<
     match &config.advertise_flight_sql_endpoint {
         Some(proxy) if proxy.is_empty() => {
             info!("Adding embedded flight proxy service on scheduler");
+            // Wrap the endpoint override function in 
BallistaConfigGrpcEndpoint
+            let customize_endpoint = config
+                .override_create_grpc_client_endpoint
+                .clone()
+                .map(|f| Arc::new(BallistaConfigGrpcEndpoint::new(f)));
+
             let flight_proxy = 
FlightServiceServer::new(BallistaFlightProxyService::new(
                 config.grpc_server_max_encoding_message_size as usize,
                 config.grpc_server_max_decoding_message_size as usize,
+                config.use_tls,
+                customize_endpoint,
             ))
             .max_decoding_message_size(
                 config.grpc_server_max_decoding_message_size as usize,
diff --git a/ballista/scheduler/src/state/executor_manager.rs 
b/ballista/scheduler/src/state/executor_manager.rs
index 9ce158fab..3e36537a8 100644
--- a/ballista/scheduler/src/state/executor_manager.rs
+++ b/ballista/scheduler/src/state/executor_manager.rs
@@ -34,9 +34,11 @@ use ballista_core::serde::protobuf::{
     StopExecutorParams, executor_status,
 };
 use ballista_core::serde::scheduler::{ExecutorData, ExecutorMetadata};
+
 use ballista_core::utils::{
-    GrpcClientConfig, create_grpc_client_connection, get_time_before,
+    GrpcClientConfig, create_grpc_client_endpoint, get_time_before,
 };
+
 use dashmap::DashMap;
 use log::{debug, error, info, warn};
 use std::collections::{HashMap, HashSet};
@@ -473,8 +475,20 @@ impl ExecutorManager {
                 "http://{}:{}";,
                 executor_metadata.host, executor_metadata.grpc_port
             );
-            let connection =
-                create_grpc_client_connection(executor_url, 
grpc_client_config).await?;
+            let mut endpoint =
+                create_grpc_client_endpoint(executor_url, 
Some(grpc_client_config))?;
+
+            if let Some(ref override_fn) =
+                self.config.override_create_grpc_client_endpoint
+            {
+                endpoint = override_fn(endpoint).map_err(|e| {
+                    BallistaError::GrpcConnectionError(format!(
+                        "Failed to customize endpoint for executor 
{executor_id}: {e}"
+                    ))
+                })?;
+            }
+
+            let connection = endpoint.connect().await?;
             let client = ExecutorGrpcClient::new(connection);
 
             {
diff --git a/examples/Cargo.toml b/examples/Cargo.toml
index 255b28c79..b91bfc3a2 100644
--- a/examples/Cargo.toml
+++ b/examples/Cargo.toml
@@ -33,20 +33,30 @@ name = "standalone_sql"
 path = "examples/standalone-sql.rs"
 required-features = ["ballista/standalone"]
 
+[[example]]
+name = "mtls-cluster"
+path = "examples/mtls-cluster.rs"
+required-features = ["tls"]
+
 [dependencies]
+# Optional dependency for TLS support
+rustls = { version = "0.23", features = ["ring"], optional = true }
 
 [dev-dependencies]
+arrow-flight = { workspace = true }
 ballista = { path = "../ballista/client", version = "52.0.0" }
 ballista-core = { path = "../ballista/core", version = "52.0.0", 
default-features = false }
 ballista-executor = { path = "../ballista/executor", version = "52.0.0", 
default-features = false }
 ballista-scheduler = { path = "../ballista/scheduler", version = "52.0.0", 
default-features = false }
 ctor = { workspace = true }
 datafusion = { workspace = true }
+datafusion-proto = { workspace = true }
 datafusion-substrait = { workspace = true }
 env_logger = { workspace = true }
 futures = { workspace = true }
 log = { workspace = true }
 object_store = { workspace = true, features = ["aws"] }
+tempfile = { workspace = true }
 testcontainers-modules = { version = "0.14", features = ["minio"] }
 tokio = { workspace = true, features = [
     "macros",
@@ -64,3 +74,5 @@ default = ["substrait", "standalone"]
 standalone = ["ballista/standalone"]
 substrait = ["ballista-scheduler/substrait"]
 testcontainers = []
+# Enable TLS support for the mtls-cluster example
+tls = ["tonic/tls-ring", "dep:rustls"]
diff --git a/examples/examples/mtls-cluster.rs 
b/examples/examples/mtls-cluster.rs
new file mode 100644
index 000000000..c86014bec
--- /dev/null
+++ b/examples/examples/mtls-cluster.rs
@@ -0,0 +1,534 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! # mTLS Cluster Configuration Example
+//!
+//! This example demonstrates how to configure mutual TLS (mTLS) for secure
+//! communication between Ballista scheduler, executors, and clients.
+//!
+//! ## Overview
+//!
+//! mTLS provides two-way authentication:
+//! - The client verifies the server's identity using the CA certificate
+//! - The server verifies the client's identity using client certificates
+//!
+//! ## Architecture
+//!
+//! This example uses pull-based scheduling where:
+//! - The scheduler runs a gRPC server with server-side TLS
+//! - Executors poll the scheduler for work (client TLS) and serve Flight data 
(server TLS)
+//! - Clients connect to the scheduler with client TLS
+//!
+//! ## Running
+//!
+//! ```bash
+//! # Generate certificates
+//! cargo run --example mtls-cluster --features tls -- certs
+//!
+//! # Terminal 1: Start scheduler with TLS
+//! cargo run --example mtls-cluster --features tls -- scheduler
+//!
+//! # Terminal 2: Start executor with TLS  
+//! cargo run --example mtls-cluster --features tls -- executor
+//!
+//! # Terminal 3: Run client query with TLS
+//! cargo run --example mtls-cluster --features tls -- client
+//! ```
+
+use std::net::SocketAddr;
+use std::process::Command;
+use std::sync::Arc;
+
+use arrow_flight::flight_service_server::FlightServiceServer;
+use ballista_core::ConfigProducer;
+use ballista_core::extension::{SessionConfigExt, SessionStateExt};
+use ballista_core::serde::protobuf::executor_resource::Resource;
+use ballista_core::serde::protobuf::scheduler_grpc_client::SchedulerGrpcClient;
+use ballista_core::serde::protobuf::scheduler_grpc_server::SchedulerGrpcServer;
+use ballista_core::serde::protobuf::{
+    ExecutorRegistration, ExecutorResource, ExecutorSpecification,
+};
+use ballista_core::serde::{
+    BallistaCodec, BallistaLogicalExtensionCodec, 
BallistaPhysicalExtensionCodec,
+};
+use ballista_core::utils::create_grpc_client_endpoint;
+use ballista_executor::execution_loop;
+use ballista_executor::executor::Executor;
+use ballista_executor::flight_service::BallistaFlightService;
+use ballista_executor::metrics::LoggingMetricsCollector;
+use ballista_scheduler::cluster::BallistaCluster;
+use ballista_scheduler::config::SchedulerConfig;
+use ballista_scheduler::scheduler_server::SchedulerServer;
+use datafusion::execution::SessionStateBuilder;
+use datafusion::execution::runtime_env::RuntimeEnvBuilder;
+use datafusion::prelude::{SessionConfig, SessionContext};
+use datafusion_proto::protobuf::{LogicalPlanNode, PhysicalPlanNode};
+use log::info;
+use tonic::transport::{
+    Certificate, ClientTlsConfig, Endpoint, Identity, Server, ServerTlsConfig,
+};
+
+/// Directory for certificate files
+const CERTS_DIR: &str = "certs";
+const CA_CERT_PATH: &str = "certs/ca.crt";
+const CA_KEY_PATH: &str = "certs/ca.key";
+const SERVER_CERT_PATH: &str = "certs/server.crt";
+const SERVER_KEY_PATH: &str = "certs/server.key";
+const CLIENT_CERT_PATH: &str = "certs/client.crt";
+const CLIENT_KEY_PATH: &str = "certs/client.key";
+
+/// Holds loaded TLS certificates for mTLS configuration
+#[derive(Clone)]
+struct TlsConfig {
+    ca_cert: Certificate,
+    server_identity: Identity,
+    client_tls: ClientTlsConfig,
+}
+
+impl TlsConfig {
+    fn load() -> Result<Self, Box<dyn std::error::Error>> {
+        let ca_cert_pem = std::fs::read_to_string(CA_CERT_PATH)?;
+        let server_cert_pem = std::fs::read_to_string(SERVER_CERT_PATH)?;
+        let server_key_pem = std::fs::read_to_string(SERVER_KEY_PATH)?;
+        let client_cert_pem = std::fs::read_to_string(CLIENT_CERT_PATH)?;
+        let client_key_pem = std::fs::read_to_string(CLIENT_KEY_PATH)?;
+
+        let ca_cert = Certificate::from_pem(&ca_cert_pem);
+        let server_identity = Identity::from_pem(&server_cert_pem, 
&server_key_pem);
+
+        let client_tls = ClientTlsConfig::new()
+            .ca_certificate(Certificate::from_pem(&ca_cert_pem))
+            .identity(Identity::from_pem(&client_cert_pem, &client_key_pem))
+            .domain_name("localhost");
+
+        Ok(Self {
+            ca_cert,
+            server_identity,
+            client_tls,
+        })
+    }
+
+    fn server_tls_config(&self) -> ServerTlsConfig {
+        ServerTlsConfig::new()
+            .identity(self.server_identity.clone())
+            .client_ca_root(self.ca_cert.clone())
+    }
+}
+
+/// Generate certificates for mTLS using OpenSSL
+fn generate_certs() -> Result<(), Box<dyn std::error::Error>> {
+    std::fs::create_dir_all(CERTS_DIR)?;
+    println!("Created {CERTS_DIR}/ directory");
+
+    let openssl_conf = r#"
+[req]
+distinguished_name = req_distinguished_name
+x509_extensions = v3_ca
+prompt = no
+
+[req_distinguished_name]
+CN = Ballista CA
+
+[v3_ca]
+basicConstraints = critical, CA:TRUE
+keyUsage = critical, keyCertSign, cRLSign
+subjectKeyIdentifier = hash
+
+[server_ext]
+basicConstraints = CA:FALSE
+keyUsage = critical, digitalSignature, keyEncipherment
+extendedKeyUsage = serverAuth, clientAuth
+subjectAltName = @alt_names
+subjectKeyIdentifier = hash
+authorityKeyIdentifier = keyid,issuer
+
+[client_ext]
+basicConstraints = CA:FALSE
+keyUsage = critical, digitalSignature, keyEncipherment
+extendedKeyUsage = clientAuth, serverAuth
+subjectAltName = @alt_names
+subjectKeyIdentifier = hash
+authorityKeyIdentifier = keyid,issuer
+
+[alt_names]
+DNS.1 = localhost
+IP.1 = 127.0.0.1
+"#;
+    let conf_path = format!("{CERTS_DIR}/openssl.cnf");
+    std::fs::write(&conf_path, openssl_conf)?;
+
+    println!("Generating CA...");
+    run_openssl(&["genrsa", "-out", CA_KEY_PATH, "4096"])?;
+    run_openssl(&[
+        "req",
+        "-new",
+        "-x509",
+        "-days",
+        "365",
+        "-key",
+        CA_KEY_PATH,
+        "-out",
+        CA_CERT_PATH,
+        "-config",
+        &conf_path,
+        "-extensions",
+        "v3_ca",
+    ])?;
+
+    println!("Generating server certificate...");
+    run_openssl(&["genrsa", "-out", SERVER_KEY_PATH, "4096"])?;
+    run_openssl(&[
+        "req",
+        "-new",
+        "-key",
+        SERVER_KEY_PATH,
+        "-out",
+        &format!("{CERTS_DIR}/server.csr"),
+        "-subj",
+        "/CN=localhost",
+    ])?;
+    run_openssl(&[
+        "x509",
+        "-req",
+        "-days",
+        "365",
+        "-in",
+        &format!("{CERTS_DIR}/server.csr"),
+        "-CA",
+        CA_CERT_PATH,
+        "-CAkey",
+        CA_KEY_PATH,
+        "-CAcreateserial",
+        "-out",
+        SERVER_CERT_PATH,
+        "-extfile",
+        &conf_path,
+        "-extensions",
+        "server_ext",
+    ])?;
+
+    println!("Generating client certificate...");
+    run_openssl(&["genrsa", "-out", CLIENT_KEY_PATH, "4096"])?;
+    run_openssl(&[
+        "req",
+        "-new",
+        "-key",
+        CLIENT_KEY_PATH,
+        "-out",
+        &format!("{CERTS_DIR}/client.csr"),
+        "-subj",
+        "/CN=ballista-client",
+    ])?;
+    run_openssl(&[
+        "x509",
+        "-req",
+        "-days",
+        "365",
+        "-in",
+        &format!("{CERTS_DIR}/client.csr"),
+        "-CA",
+        CA_CERT_PATH,
+        "-CAkey",
+        CA_KEY_PATH,
+        "-CAcreateserial",
+        "-out",
+        CLIENT_CERT_PATH,
+        "-extfile",
+        &conf_path,
+        "-extensions",
+        "client_ext",
+    ])?;
+
+    // Cleanup
+    for f in ["server.csr", "client.csr", "openssl.cnf", "ca.srl"] {
+        let _ = std::fs::remove_file(format!("{CERTS_DIR}/{f}"));
+    }
+
+    println!("\nCertificates generated in {CERTS_DIR}/");
+    println!("  - ca.crt / ca.key       : Certificate Authority");
+    println!("  - server.crt / server.key : Server certificate (for 
scheduler/executor)");
+    println!("  - client.crt / client.key : Client certificate (for 
executors/clients)");
+    Ok(())
+}
+
+fn run_openssl(args: &[&str]) -> Result<(), Box<dyn std::error::Error>> {
+    let status = Command::new("openssl").args(args).status()?;
+    if !status.success() {
+        return Err(format!("openssl {} failed", args.join(" ")).into());
+    }
+    Ok(())
+}
+
+/// Create a ConfigProducer that configures TLS for task execution (shuffle 
fetching)
+fn create_tls_config_producer(client_tls: ClientTlsConfig) -> ConfigProducer {
+    Arc::new(move || {
+        let tls = client_tls.clone();
+        SessionConfig::new_with_ballista()
+            .with_ballista_use_tls(true)
+            .with_ballista_override_create_grpc_client_endpoint(Arc::new(
+                move |endpoint: Endpoint| {
+                    endpoint.tls_config(tls.clone()).map_err(|e| {
+                        Box::new(e) as Box<dyn std::error::Error + Send + Sync>
+                    })
+                },
+            ))
+    })
+}
+
+/// Start an mTLS-enabled scheduler
+async fn run_scheduler() -> Result<(), Box<dyn std::error::Error>> {
+    let addr: SocketAddr = "0.0.0.0:50050".parse()?;
+    info!("Starting mTLS scheduler on {addr}");
+
+    let tls = TlsConfig::load()?;
+
+    // Configure scheduler with TLS for outbound connections to executors
+    let client_tls = tls.client_tls.clone();
+    let endpoint_override: ballista_scheduler::config::EndpointOverrideFn =
+        Arc::new(move |endpoint: Endpoint| 
endpoint.tls_config(client_tls.clone()));
+
+    let config = SchedulerConfig {
+        bind_host: "0.0.0.0".to_string(),
+        external_host: "localhost".to_string(),
+        bind_port: 50050,
+        override_create_grpc_client_endpoint: Some(endpoint_override),
+        ..Default::default()
+    };
+
+    let cluster = BallistaCluster::new_from_config(&config).await?;
+
+    // Create scheduler server
+    let codec = BallistaCodec::new(
+        Arc::new(BallistaLogicalExtensionCodec::default()),
+        Arc::new(BallistaPhysicalExtensionCodec::default()),
+    );
+    let metrics_collector = 
ballista_scheduler::metrics::default_metrics_collector()?;
+
+    let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
+        SchedulerServer::new(
+            config.scheduler_name(),
+            cluster,
+            codec,
+            Arc::new(config),
+            metrics_collector,
+        );
+    scheduler.init().await?;
+
+    // Build gRPC server with mTLS
+    let scheduler_grpc = SchedulerGrpcServer::new(scheduler)
+        .max_decoding_message_size(16 * 1024 * 1024)
+        .max_encoding_message_size(16 * 1024 * 1024);
+
+    info!("Scheduler listening with mTLS on {addr}");
+
+    Server::builder()
+        .tls_config(tls.server_tls_config())?
+        .add_service(scheduler_grpc)
+        .serve(addr)
+        .await?;
+
+    Ok(())
+}
+
+/// Start an mTLS-enabled executor using pull-based scheduling
+async fn run_executor() -> Result<(), Box<dyn std::error::Error>> {
+    let flight_addr: SocketAddr = "0.0.0.0:50051".parse()?;
+    info!("Starting mTLS executor, flight service on {flight_addr}");
+
+    let tls = TlsConfig::load()?;
+
+    // Create executor
+    let executor_id = uuid::Uuid::new_v4().to_string();
+    let work_dir = tempfile::tempdir()?;
+    let work_dir_str = work_dir.path().to_string_lossy().to_string();
+
+    let executor_meta = ExecutorRegistration {
+        id: executor_id.clone(),
+        host: Some("localhost".to_string()),
+        port: 50051,
+        grpc_port: 0, // Not used in pull-based scheduling
+        specification: Some(ExecutorSpecification {
+            resources: vec![ExecutorResource {
+                resource: Some(Resource::TaskSlots(4)),
+            }],
+        }),
+    };
+
+    let config_producer = create_tls_config_producer(tls.client_tls.clone());
+
+    // Create runtime producer
+    let wd = work_dir_str.clone();
+    let runtime_producer: ballista_core::RuntimeProducer = Arc::new(move |_| {
+        Ok(Arc::new(
+            RuntimeEnvBuilder::new()
+                .with_temp_file_path(wd.clone())
+                .build()?,
+        ))
+    });
+
+    let executor = Arc::new(Executor::new(
+        executor_meta,
+        &work_dir_str,
+        runtime_producer,
+        config_producer,
+        Default::default(), // function_registry
+        Arc::new(LoggingMetricsCollector::default()), // metrics_collector
+        4,                  // concurrent_tasks
+        None,               // execution_engine
+    ));
+
+    // Start Flight service with mTLS for serving shuffle data
+    let flight_service = FlightServiceServer::new(BallistaFlightService::new())
+        .max_decoding_message_size(16 * 1024 * 1024)
+        .max_encoding_message_size(16 * 1024 * 1024);
+
+    // Spawn Flight server with TLS
+    let server_tls = tls.server_tls_config();
+    let flight_handle = tokio::spawn(async move {
+        info!("Executor Flight service listening with mTLS on {flight_addr}");
+        Server::builder()
+            .tls_config(server_tls)
+            .expect("Failed to configure TLS")
+            .add_service(flight_service)
+            .serve(flight_addr)
+            .await
+    });
+
+    // Connect to scheduler with TLS
+    let scheduler_url = "https://localhost:50050";;
+    info!("Connecting to scheduler at {scheduler_url}");
+
+    let endpoint = create_grpc_client_endpoint(scheduler_url.to_string(), 
None)?
+        .tls_config(tls.client_tls.clone())?;
+    let connection = endpoint.connect().await?;
+
+    let scheduler = SchedulerGrpcClient::new(connection)
+        .max_encoding_message_size(16 * 1024 * 1024)
+        .max_decoding_message_size(16 * 1024 * 1024);
+
+    let codec: BallistaCodec<LogicalPlanNode, PhysicalPlanNode> = 
BallistaCodec::new(
+        Arc::new(BallistaLogicalExtensionCodec::default()),
+        Arc::new(BallistaPhysicalExtensionCodec::default()),
+    );
+
+    // Run the pull-based execution loop
+    // This registers the executor and starts polling for tasks
+    info!("Starting execution poll loop...");
+    let poll_handle = tokio::spawn(async move {
+        execution_loop::poll_loop(scheduler, executor, codec).await
+    });
+
+    tokio::select! {
+        result = flight_handle => {
+            result??;
+        }
+        result = poll_handle => {
+            result??;
+        }
+    }
+
+    Ok(())
+}
+
+/// Run a client query against the mTLS-enabled cluster
+async fn run_client() -> Result<(), Box<dyn std::error::Error>> {
+    info!("Connecting to mTLS scheduler at https://localhost:50050";);
+
+    let tls = TlsConfig::load()?;
+
+    // Configure session with mTLS
+    let client_tls = tls.client_tls.clone();
+    let session_config = SessionConfig::new_with_ballista()
+        .with_ballista_use_tls(true)
+        .with_ballista_override_create_grpc_client_endpoint(Arc::new(
+            move |endpoint: Endpoint| {
+                endpoint
+                    .tls_config(client_tls.clone())
+                    .map_err(|e| Box::new(e) as Box<dyn std::error::Error + 
Send + Sync>)
+            },
+        ));
+
+    let session_state = SessionStateBuilder::new()
+        .with_default_features()
+        .with_config(session_config)
+        .build()
+        .upgrade_for_ballista("https://localhost:50050".to_string())?;
+
+    let ctx = SessionContext::new_with_state(session_state);
+
+    info!("Executing query...");
+    let df = ctx.sql("SELECT 1 + 1 as result").await?;
+    df.show().await?;
+
+    Ok(())
+}
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
+    // Install the ring crypto provider for rustls
+    // This is required when using tonic with TLS
+    rustls::crypto::ring::default_provider()
+        .install_default()
+        .expect("Failed to install rustls crypto provider");
+
+    
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info"))
+        .init();
+
+    let args: Vec<String> = std::env::args().collect();
+    let mode = args.get(1).map(|s| s.as_str()).unwrap_or("help");
+
+    match mode {
+        "certs" => generate_certs()?,
+        "scheduler" => run_scheduler().await?,
+        "executor" => run_executor().await?,
+        "client" => run_client().await?,
+        _ => {
+            println!(
+                r#"Usage: {} <certs|scheduler|executor|client>
+
+mTLS Cluster Example
+====================
+
+This example demonstrates how to configure mutual TLS (mTLS) for secure
+communication between Ballista scheduler, executors, and clients.
+
+Commands:
+  certs      Generate TLS certificates (requires openssl)
+  scheduler  Start mTLS-enabled scheduler on port 50050
+  executor   Start mTLS-enabled executor on port 50051  
+  client     Run a test query against the cluster
+
+Quick start:
+  cargo run --example mtls-cluster --features tls -- certs
+  cargo run --example mtls-cluster --features tls -- scheduler  # terminal 1
+  cargo run --example mtls-cluster --features tls -- executor   # terminal 2
+  cargo run --example mtls-cluster --features tls -- client     # terminal 3
+
+How it works:
+  - Generates X.509 v3 certificates with proper extensions (SAN, keyUsage)
+  - Scheduler runs gRPC server with ServerTlsConfig for mTLS
+  - Executor uses pull-based scheduling (polls scheduler for tasks)
+  - Executor's Flight service uses ServerTlsConfig for shuffle data
+  - Client uses SessionConfigExt to configure TLS for all connections
+"#,
+                args.first().unwrap_or(&"mtls-cluster".to_string())
+            );
+        }
+    }
+
+    Ok(())
+}
diff --git a/examples/examples/standalone-substrait.rs 
b/examples/examples/standalone-substrait.rs
index e0b917597..7e8b2036c 100644
--- a/examples/examples/standalone-substrait.rs
+++ b/examples/examples/standalone-substrait.rs
@@ -406,9 +406,10 @@ impl SubstraitSchedulerClient {
         let host = metadata.host.as_str();
         let port = metadata.port as u16;
 
-        let mut ballista_client = BallistaClient::try_new(host, port, 
max_message_size)
-            .await
-            .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;
+        let mut ballista_client =
+            BallistaClient::try_new(host, port, max_message_size, false, None)
+                .await
+                .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;
 
         ballista_client
             .fetch_partition(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to