This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git
The following commit(s) were added to refs/heads/main by this push: new e9e8f9a7 feat: executor support pluggable arrow flight server (#1170) e9e8f9a7 is described below commit e9e8f9a783d9dbc8b40f3e57e2a7294788df6b93 Author: Marko Milenković <milenkov...@users.noreply.github.com> AuthorDate: Fri Jan 24 20:31:21 2025 +0000 feat: executor support pluggable arrow flight server (#1170) --- ballista/executor/src/config.rs | 1 + ballista/executor/src/executor_process.rs | 115 +++++++++++++++++++++--------- ballista/executor/src/lib.rs | 15 ++++ 3 files changed, 96 insertions(+), 35 deletions(-) diff --git a/ballista/executor/src/config.rs b/ballista/executor/src/config.rs index 91b54732..02ff0869 100644 --- a/ballista/executor/src/config.rs +++ b/ballista/executor/src/config.rs @@ -54,6 +54,7 @@ impl TryFrom<Config> for ExecutorProcessConfig { override_runtime_producer: None, override_logical_codec: None, override_physical_codec: None, + override_arrow_flight_service: None, }) } } diff --git a/ballista/executor/src/executor_process.rs b/ballista/executor/src/executor_process.rs index bfd76b05..b8edc596 100644 --- a/ballista/executor/src/executor_process.rs +++ b/ballista/executor/src/executor_process.rs @@ -63,8 +63,8 @@ use crate::flight_service::BallistaFlightService; use crate::metrics::LoggingMetricsCollector; use crate::shutdown::Shutdown; use crate::shutdown::ShutdownNotifier; -use crate::terminate; use crate::{execution_loop, executor_server}; +use crate::{terminate, ArrowFlightServerProvider}; pub struct ExecutorProcessConfig { pub bind_host: String, @@ -101,6 +101,8 @@ pub struct ExecutorProcessConfig { pub override_logical_codec: Option<Arc<dyn LogicalExtensionCodec>>, /// [PhysicalExtensionCodec] override option pub override_physical_codec: Option<Arc<dyn PhysicalExtensionCodec>>, + /// [ArrowFlightServerProvider] implementation override option + pub override_arrow_flight_service: Option<Arc<ArrowFlightServerProvider>>, } impl ExecutorProcessConfig { @@ 
-143,6 +145,7 @@ impl Default for ExecutorProcessConfig { override_config_producer: None, override_logical_codec: None, override_physical_codec: None, + override_arrow_flight_service: None, } } } @@ -151,7 +154,7 @@ pub async fn start_executor_process( opt: Arc<ExecutorProcessConfig>, ) -> ballista_core::error::Result<()> { let addr = format!("{}:{}", opt.bind_host, opt.port); - let addr = addr.parse().map_err(|e: std::net::AddrParseError| { + let address = addr.parse().map_err(|e: std::net::AddrParseError| { BallistaError::Configuration(e.to_string()) })?; @@ -174,9 +177,12 @@ pub async fn start_executor_process( opt.concurrent_tasks }; - info!("Running with config:"); - info!("work_dir: {}", work_dir); - info!("concurrent_tasks: {}", concurrent_tasks); + info!( + "Executor starting ... (Datafusion Ballista {})", + BALLISTA_VERSION + ); + info!("Executor working directory: {}", work_dir); + info!("Executor number of concurrent tasks: {}", concurrent_tasks); // assign this executor an unique ID let executor_id = Uuid::new_v4().to_string(); @@ -261,16 +267,16 @@ pub async fn start_executor_process( "Could not connect to scheduler".to_string(), ) }) { - Ok(conn) => { + Ok(connection) => { info!("Connected to scheduler at {}", scheduler_url); - x = Some(conn); + x = Some(connection); } Err(e) => { warn!( "Failed to connect to scheduler at {} ({}); retrying ...", scheduler_url, e ); - std::thread::sleep(time::Duration::from_millis(500)); + tokio::time::sleep(time::Duration::from_millis(500)).await; } } } @@ -290,13 +296,15 @@ pub async fn start_executor_process( let job_data_ttl_seconds = opt.job_data_ttl_seconds; // Graceful shutdown notification - let shutdown_noti = ShutdownNotifier::new(); + let shutdown_notification = ShutdownNotifier::new(); if opt.job_data_clean_up_interval_seconds > 0 { let mut interval_time = time::interval(Duration::from_secs(opt.job_data_clean_up_interval_seconds)); - let mut shuffle_cleaner_shutdown = shutdown_noti.subscribe_for_shutdown(); 
- let shuffle_cleaner_complete = shutdown_noti.shutdown_complete_tx.clone(); + + let mut shuffle_cleaner_shutdown = shutdown_notification.subscribe_for_shutdown(); + let shuffle_cleaner_complete = shutdown_notification.shutdown_complete_tx.clone(); + tokio::spawn(async move { // As long as the shutdown notification has not been received while !shuffle_cleaner_shutdown.is_shutdown() { @@ -338,7 +346,7 @@ pub async fn start_executor_process( executor.clone(), default_codec, stop_send, - &shutdown_noti, + &shutdown_notification, ) .await?, ); @@ -351,10 +359,19 @@ pub async fn start_executor_process( ))); } }; - service_handlers.push(tokio::spawn(flight_server_run( - addr, - shutdown_noti.subscribe_for_shutdown(), - ))); + let shutdown = shutdown_notification.subscribe_for_shutdown(); + let override_flight = opt.override_arrow_flight_service.clone(); + + service_handlers.push(match override_flight { + None => { + info!("Starting built-in arrow flight service"); + flight_server_task(address, shutdown).await + } + Some(flight_provider) => { + info!("Starting custom, user provided, arrow flight service"); + (flight_provider)(address, shutdown) + } + }); let tasks_drained = TasksDrainedFuture(executor); @@ -436,7 +453,7 @@ pub async fn start_executor_process( shutdown_complete_tx, notify_shutdown, .. 
- } = shutdown_noti; + } = shutdown_notification; // When `notify_shutdown` is dropped, all components which have `subscribe`d will // receive the shutdown signal and can exit @@ -451,25 +468,21 @@ pub async fn start_executor_process( } // Arrow flight service -async fn flight_server_run( - addr: SocketAddr, +async fn flight_server_task( + address: SocketAddr, mut grpc_shutdown: Shutdown, -) -> Result<(), BallistaError> { - let service = BallistaFlightService::new(); - let server = FlightServiceServer::new(service); - info!( - "Ballista v{} Rust Executor Flight Server listening on {:?}", - BALLISTA_VERSION, addr - ); - - let shutdown_signal = grpc_shutdown.recv(); - let server_future = create_grpc_server() - .add_service(server) - .serve_with_shutdown(addr, shutdown_signal); - - server_future.await.map_err(|e| { - error!("Tonic error, Could not start Executor Flight Server."); - BallistaError::TonicError(e) +) -> JoinHandle<Result<(), BallistaError>> { + tokio::spawn(async move { + info!("Built-in arrow flight server listening on: {:?}", address); + + let server_future = create_grpc_server() + .add_service(FlightServiceServer::new(BallistaFlightService::new())) + .serve_with_shutdown(address, grpc_shutdown.recv()); + + server_future.await.map_err(|e| { + error!("Could not start built-in arrow flight server."); + BallistaError::TonicError(e) + }) }) } @@ -642,4 +655,36 @@ mod tests { let count2 = fs::read_dir(work_dir.clone()).unwrap().count(); assert_eq!(count2, 0); } + + #[tokio::test] + async fn test_arrow_flight_provider_ergonomics() { + let config = crate::executor_process::ExecutorProcessConfig { + override_arrow_flight_service: Some(std::sync::Arc::new( + move |address, mut grpc_shutdown| { + tokio::spawn(async move { + log::info!( + "custom arrow flight server listening on: {:?}", + address + ); + + let server_future = ballista_core::utils::create_grpc_server() + .add_service( + arrow_flight::flight_service_server::FlightServiceServer::new( + 
crate::flight_service::BallistaFlightService::new(), + ), + ) + .serve_with_shutdown(address, grpc_shutdown.recv()); + + server_future.await.map_err(|e| { + log::error!("Could not start built-in arrow flight server."); + ballista_core::error::BallistaError::TonicError(e) + }) + }) + }, + )), + ..Default::default() + }; + + assert!(config.override_arrow_flight_service.is_some()); + } } diff --git a/ballista/executor/src/lib.rs b/ballista/executor/src/lib.rs index 23e68f85..005c8049 100644 --- a/ballista/executor/src/lib.rs +++ b/ballista/executor/src/lib.rs @@ -33,18 +33,33 @@ pub mod terminate; mod cpu_bound_executor; mod standalone; +use ballista_core::error::BallistaError; +use std::net::SocketAddr; + pub use standalone::new_standalone_executor; pub use standalone::new_standalone_executor_from_builder; pub use standalone::new_standalone_executor_from_state; use log::info; +use crate::shutdown::Shutdown; use ballista_core::serde::protobuf::{ task_status, FailedTask, OperatorMetricsSet, ShuffleWritePartition, SuccessfulTask, TaskStatus, }; use ballista_core::serde::scheduler::PartitionId; +/// [ArrowFlightServerProvider] provides a function which creates a new Arrow Flight server. +/// +/// The function should take two arguments: +/// [SocketAddr] - the address to bind the server to +/// [Shutdown] - a shutdown signal to gracefully shutdown the server +/// Returns a [tokio::task::JoinHandle] which will be registered as service handler +/// +pub type ArrowFlightServerProvider = dyn Fn(SocketAddr, Shutdown) -> tokio::task::JoinHandle<Result<(), BallistaError>> + + Send + + Sync; + #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct TaskExecutionTimes { launch_time: u64, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org