This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new e9e8f9a7 feat: executor support pluggable arrow flight server (#1170)
e9e8f9a7 is described below

commit e9e8f9a783d9dbc8b40f3e57e2a7294788df6b93
Author: Marko Milenković <milenkov...@users.noreply.github.com>
AuthorDate: Fri Jan 24 20:31:21 2025 +0000

    feat: executor support pluggable arrow flight server (#1170)
---
 ballista/executor/src/config.rs           |   1 +
 ballista/executor/src/executor_process.rs | 115 +++++++++++++++++++++---------
 ballista/executor/src/lib.rs              |  15 ++++
 3 files changed, 96 insertions(+), 35 deletions(-)

diff --git a/ballista/executor/src/config.rs b/ballista/executor/src/config.rs
index 91b54732..02ff0869 100644
--- a/ballista/executor/src/config.rs
+++ b/ballista/executor/src/config.rs
@@ -54,6 +54,7 @@ impl TryFrom<Config> for ExecutorProcessConfig {
             override_runtime_producer: None,
             override_logical_codec: None,
             override_physical_codec: None,
+            override_arrow_flight_service: None,
         })
     }
 }
diff --git a/ballista/executor/src/executor_process.rs b/ballista/executor/src/executor_process.rs
index bfd76b05..b8edc596 100644
--- a/ballista/executor/src/executor_process.rs
+++ b/ballista/executor/src/executor_process.rs
@@ -63,8 +63,8 @@ use crate::flight_service::BallistaFlightService;
 use crate::metrics::LoggingMetricsCollector;
 use crate::shutdown::Shutdown;
 use crate::shutdown::ShutdownNotifier;
-use crate::terminate;
 use crate::{execution_loop, executor_server};
+use crate::{terminate, ArrowFlightServerProvider};
 
 pub struct ExecutorProcessConfig {
     pub bind_host: String,
@@ -101,6 +101,8 @@ pub struct ExecutorProcessConfig {
     pub override_logical_codec: Option<Arc<dyn LogicalExtensionCodec>>,
     /// [PhysicalExtensionCodec] override option
     pub override_physical_codec: Option<Arc<dyn PhysicalExtensionCodec>>,
+    /// [ArrowFlightServerProvider] implementation override option
+    pub override_arrow_flight_service: Option<Arc<ArrowFlightServerProvider>>,
 }
 
 impl ExecutorProcessConfig {
@@ -143,6 +145,7 @@ impl Default for ExecutorProcessConfig {
             override_config_producer: None,
             override_logical_codec: None,
             override_physical_codec: None,
+            override_arrow_flight_service: None,
         }
     }
 }
@@ -151,7 +154,7 @@ pub async fn start_executor_process(
     opt: Arc<ExecutorProcessConfig>,
 ) -> ballista_core::error::Result<()> {
     let addr = format!("{}:{}", opt.bind_host, opt.port);
-    let addr = addr.parse().map_err(|e: std::net::AddrParseError| {
+    let address = addr.parse().map_err(|e: std::net::AddrParseError| {
         BallistaError::Configuration(e.to_string())
     })?;
 
@@ -174,9 +177,12 @@ pub async fn start_executor_process(
         opt.concurrent_tasks
     };
 
-    info!("Running with config:");
-    info!("work_dir: {}", work_dir);
-    info!("concurrent_tasks: {}", concurrent_tasks);
+    info!(
+        "Executor starting ... (Datafusion Ballista {})",
+        BALLISTA_VERSION
+    );
+    info!("Executor working directory: {}", work_dir);
+    info!("Executor number of concurrent tasks: {}", concurrent_tasks);
 
     // assign this executor an unique ID
     let executor_id = Uuid::new_v4().to_string();
@@ -261,16 +267,16 @@ pub async fn start_executor_process(
                         "Could not connect to scheduler".to_string(),
                     )
                 }) {
-                Ok(conn) => {
+                Ok(connection) => {
                     info!("Connected to scheduler at {}", scheduler_url);
-                    x = Some(conn);
+                    x = Some(connection);
                 }
                 Err(e) => {
                     warn!(
                         "Failed to connect to scheduler at {} ({}); retrying ...",
                         scheduler_url, e
                     );
-                    std::thread::sleep(time::Duration::from_millis(500));
+                    tokio::time::sleep(time::Duration::from_millis(500)).await;
                 }
             }
         }
@@ -290,13 +296,15 @@ pub async fn start_executor_process(
     let job_data_ttl_seconds = opt.job_data_ttl_seconds;
 
     // Graceful shutdown notification
-    let shutdown_noti = ShutdownNotifier::new();
+    let shutdown_notification = ShutdownNotifier::new();
 
     if opt.job_data_clean_up_interval_seconds > 0 {
         let mut interval_time =
             time::interval(Duration::from_secs(opt.job_data_clean_up_interval_seconds));
-        let mut shuffle_cleaner_shutdown = shutdown_noti.subscribe_for_shutdown();
-        let shuffle_cleaner_complete = shutdown_noti.shutdown_complete_tx.clone();
+
+        let mut shuffle_cleaner_shutdown = shutdown_notification.subscribe_for_shutdown();
+        let shuffle_cleaner_complete = shutdown_notification.shutdown_complete_tx.clone();
+
         tokio::spawn(async move {
             // As long as the shutdown notification has not been received
             while !shuffle_cleaner_shutdown.is_shutdown() {
@@ -338,7 +346,7 @@ pub async fn start_executor_process(
                     executor.clone(),
                     default_codec,
                     stop_send,
-                    &shutdown_noti,
+                    &shutdown_notification,
                 )
                 .await?,
             );
@@ -351,10 +359,19 @@ pub async fn start_executor_process(
             )));
         }
     };
-    service_handlers.push(tokio::spawn(flight_server_run(
-        addr,
-        shutdown_noti.subscribe_for_shutdown(),
-    )));
+    let shutdown = shutdown_notification.subscribe_for_shutdown();
+    let override_flight = opt.override_arrow_flight_service.clone();
+
+    service_handlers.push(match override_flight {
+        None => {
+            info!("Starting built-in arrow flight service");
+            flight_server_task(address, shutdown).await
+        }
+        Some(flight_provider) => {
+            info!("Starting custom, user provided, arrow flight service");
+            (flight_provider)(address, shutdown)
+        }
+    });
 
     let tasks_drained = TasksDrainedFuture(executor);
 
@@ -436,7 +453,7 @@ pub async fn start_executor_process(
         shutdown_complete_tx,
         notify_shutdown,
         ..
-    } = shutdown_noti;
+    } = shutdown_notification;
 
     // When `notify_shutdown` is dropped, all components which have `subscribe`d will
     // receive the shutdown signal and can exit
@@ -451,25 +468,21 @@ pub async fn start_executor_process(
 }
 
 // Arrow flight service
-async fn flight_server_run(
-    addr: SocketAddr,
+async fn flight_server_task(
+    address: SocketAddr,
     mut grpc_shutdown: Shutdown,
-) -> Result<(), BallistaError> {
-    let service = BallistaFlightService::new();
-    let server = FlightServiceServer::new(service);
-    info!(
-        "Ballista v{} Rust Executor Flight Server listening on {:?}",
-        BALLISTA_VERSION, addr
-    );
-
-    let shutdown_signal = grpc_shutdown.recv();
-    let server_future = create_grpc_server()
-        .add_service(server)
-        .serve_with_shutdown(addr, shutdown_signal);
-
-    server_future.await.map_err(|e| {
-        error!("Tonic error, Could not start Executor Flight Server.");
-        BallistaError::TonicError(e)
+) -> JoinHandle<Result<(), BallistaError>> {
+    tokio::spawn(async move {
+        info!("Built-in arrow flight server listening on: {:?}", address);
+
+        let server_future = create_grpc_server()
+            .add_service(FlightServiceServer::new(BallistaFlightService::new()))
+            .serve_with_shutdown(address, grpc_shutdown.recv());
+
+        server_future.await.map_err(|e| {
+            error!("Could not start built-in arrow flight server.");
+            BallistaError::TonicError(e)
+        })
     })
 }
 
@@ -642,4 +655,36 @@ mod tests {
         let count2 = fs::read_dir(work_dir.clone()).unwrap().count();
         assert_eq!(count2, 0);
     }
+
+    #[tokio::test]
+    async fn test_arrow_flight_provider_ergonomics() {
+        let config = crate::executor_process::ExecutorProcessConfig {
+            override_arrow_flight_service: Some(std::sync::Arc::new(
+                move |address, mut grpc_shutdown| {
+                    tokio::spawn(async move {
+                        log::info!(
+                            "custom arrow flight server listening on: {:?}",
+                            address
+                        );
+
+                        let server_future = ballista_core::utils::create_grpc_server()
+                        .add_service(
+                            arrow_flight::flight_service_server::FlightServiceServer::new(
+                                crate::flight_service::BallistaFlightService::new(),
+                            ),
+                        )
+                        .serve_with_shutdown(address, grpc_shutdown.recv());
+
+                        server_future.await.map_err(|e| {
+                            log::error!("Could not start built-in arrow flight server.");
+                            ballista_core::error::BallistaError::TonicError(e)
+                        })
+                    })
+                },
+            )),
+            ..Default::default()
+        };
+
+        assert!(config.override_arrow_flight_service.is_some());
+    }
 }
diff --git a/ballista/executor/src/lib.rs b/ballista/executor/src/lib.rs
index 23e68f85..005c8049 100644
--- a/ballista/executor/src/lib.rs
+++ b/ballista/executor/src/lib.rs
@@ -33,18 +33,33 @@ pub mod terminate;
 mod cpu_bound_executor;
 mod standalone;
 
+use ballista_core::error::BallistaError;
+use std::net::SocketAddr;
+
 pub use standalone::new_standalone_executor;
 pub use standalone::new_standalone_executor_from_builder;
 pub use standalone::new_standalone_executor_from_state;
 
 use log::info;
 
+use crate::shutdown::Shutdown;
 use ballista_core::serde::protobuf::{
     task_status, FailedTask, OperatorMetricsSet, ShuffleWritePartition, SuccessfulTask,
     TaskStatus,
 };
 use ballista_core::serde::scheduler::PartitionId;
 
+/// [ArrowFlightServerProvider] provides a function which creates a new Arrow Flight server.
+///
+/// The function should take two arguments:
+/// [SocketAddr] - the address to bind the server to
+/// [Shutdown] - a shutdown signal to gracefully shutdown the server
+/// Returns a [tokio::task::JoinHandle] which will be registered as service handler
+///
+pub type ArrowFlightServerProvider = dyn Fn(SocketAddr, Shutdown) -> tokio::task::JoinHandle<Result<(), BallistaError>>
+    + Send
+    + Sync;
+
 #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
 pub struct TaskExecutionTimes {
     launch_time: u64,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

Reply via email to